From 3cacaf040a35429139b0f80c4a9c7c04a4522e47 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E9=A1=BA=E4=B8=9C?= <577732344@qq.com> Date: Thu, 15 Jan 2026 17:59:12 +0800 Subject: [PATCH] feat(job_crawler): enhance logging and tracking for data filtering and Kafka production - Add logging when API returns empty data to track offset progression - Track expired job count separately from valid filtered jobs - Initialize produced counter to handle cases with no filtered jobs - Consolidate logging into single comprehensive info log per batch - Log includes: total fetched, valid, expired, and Kafka-produced counts - Improves observability for debugging data flow and filtering efficiency --- job_crawler/app/services/crawler.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/job_crawler/app/services/crawler.py b/job_crawler/app/services/crawler.py index 70dbdbd..6e8bf5b 100644 --- a/job_crawler/app/services/crawler.py +++ b/job_crawler/app/services/crawler.py @@ -125,6 +125,7 @@ class TaskCrawler: result = await api_client.fetch_data(self.task_id, offset, self.batch_size) data_list = result.get("data", {}).get("data", []) if not data_list: + logger.info(f"[{self.task_name}] offset={offset}, 返回数据为空") return 0 # 过滤数据 @@ -138,13 +139,16 @@ class TaskCrawler: filtered_jobs.append(job) valid_count = len(filtered_jobs) + expired_count = len(data_list) - valid_count self._total_filtered += valid_count # 立即发送到Kafka + produced = 0 if filtered_jobs: produced = kafka_service.produce_batch(filtered_jobs) self._total_produced += produced - logger.debug(f"[{self.task_name}] offset={offset}, 过滤={valid_count}, 发送={produced}") + + logger.info(f"[{self.task_name}] offset={offset}, 获取={len(data_list)}, 有效={valid_count}, 过期={expired_count}, 发送Kafka={produced}") return valid_count