feat(job_crawler): enhance logging and tracking for data filtering and Kafka production

- Add logging when API returns empty data to track offset progression
- Track expired job count separately from valid filtered jobs
- Initialize produced counter to handle cases with no filtered jobs
- Consolidate logging into single comprehensive info log per batch
- Log includes: total fetched, valid, expired, and Kafka-produced counts
- Improves observability for debugging data flow and filtering efficiency
This commit is contained in:
2026-01-15 17:59:12 +08:00
parent 3acc0a9221
commit 3cacaf040a

View File

@@ -125,6 +125,7 @@ class TaskCrawler:
result = await api_client.fetch_data(self.task_id, offset, self.batch_size) result = await api_client.fetch_data(self.task_id, offset, self.batch_size)
data_list = result.get("data", {}).get("data", []) data_list = result.get("data", {}).get("data", [])
if not data_list: if not data_list:
logger.info(f"[{self.task_name}] offset={offset}, 返回数据为空")
return 0 return 0
# 过滤数据 # 过滤数据
@@ -138,13 +139,16 @@ class TaskCrawler:
filtered_jobs.append(job) filtered_jobs.append(job)
valid_count = len(filtered_jobs) valid_count = len(filtered_jobs)
expired_count = len(data_list) - valid_count
self._total_filtered += valid_count self._total_filtered += valid_count
# 立即发送到Kafka # 立即发送到Kafka
produced = 0
if filtered_jobs: if filtered_jobs:
produced = kafka_service.produce_batch(filtered_jobs) produced = kafka_service.produce_batch(filtered_jobs)
self._total_produced += produced self._total_produced += produced
logger.debug(f"[{self.task_name}] offset={offset}, 过滤={valid_count}, 发送={produced}")
logger.info(f"[{self.task_name}] offset={offset}, 获取={len(data_list)}, 有效={valid_count}, 过期={expired_count}, 发送Kafka={produced}")
return valid_count return valid_count