diff --git a/job_crawler/app/services/crawler.py b/job_crawler/app/services/crawler.py
index 6e8bf5b..bbc06ed 100644
--- a/job_crawler/app/services/crawler.py
+++ b/job_crawler/app/services/crawler.py
@@ -7,7 +7,6 @@ from app.services.api_client import api_client
 from app.services.kafka_service import kafka_service
 from app.services.progress_store import progress_store
 from app.utils import is_within_days
-from app.models import JobData
 from app.core.config import settings
 
 logger = logging.getLogger(__name__)
@@ -128,24 +127,24 @@ class TaskCrawler:
             logger.info(f"[{self.task_name}] offset={offset}, response data is empty")
             return 0
 
-        # Filter the data
-        filtered_jobs = []
+        # Filter the data, keeping the original payload
+        filtered_raw_data = []
         for raw in data_list:
             aae397 = raw.get("aae397", "")
             collect_time = raw.get("Collect_time", "")
             if is_within_days(aae397, collect_time, self.filter_days):
-                job = JobData.from_raw(raw)
-                job.task_id = self.task_id
-                filtered_jobs.append(job)
+                # Attach task_id to the raw record
+                raw["task_id"] = self.task_id
+                filtered_raw_data.append(raw)
 
-        valid_count = len(filtered_jobs)
+        valid_count = len(filtered_raw_data)
         expired_count = len(data_list) - valid_count
         self._total_filtered += valid_count
 
-        # Send to Kafka immediately
+        # Send to Kafka immediately (raw data)
         produced = 0
-        if filtered_jobs:
-            produced = kafka_service.produce_batch(filtered_jobs)
+        if filtered_raw_data:
+            produced = kafka_service.produce_batch_raw(filtered_raw_data)
         self._total_produced += produced
 
         logger.info(f"[{self.task_name}] offset={offset}, fetched={len(data_list)}, valid={valid_count}, expired={expired_count}, sent_to_kafka={produced}")
diff --git a/job_crawler/app/services/kafka_service.py b/job_crawler/app/services/kafka_service.py
index 328275b..2bad660 100644
--- a/job_crawler/app/services/kafka_service.py
+++ b/job_crawler/app/services/kafka_service.py
@@ -82,6 +82,27 @@ class KafkaService:
         self.producer.flush()
         return success_count
 
+    def produce_raw(self, raw_data: dict) -> bool:
+        """Send a single raw record to Kafka."""
+        try:
+            # Use a field from the raw record as the message key, falling back if absent
+            key = raw_data.get("ACE760", "") or raw_data.get("aca112", "")
+            future = self.producer.send(self.topic, key=key, value=raw_data)
+            future.get(timeout=10)
+            return True
+        except KafkaError as e:
+            logger.error(f"Failed to send raw message: {e}")
+            return False
+
+    def produce_batch_raw(self, raw_data_list: List[dict]) -> int:
+        """Send a batch of raw records."""
+        success_count = 0
+        for raw_data in raw_data_list:
+            if self.produce_raw(raw_data):
+                success_count += 1
+        self.producer.flush()
+        return success_count
+
     def consume(self, batch_size: int = 10, timeout_ms: int = 5000) -> List[dict]:
         """Consume messages."""
        messages = []
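
Note on serialization: produce_raw() hands producer.send() a str key and a dict value. With kafka-python that only works if the KafkaProducer was constructed with key and value serializers; the diff does not show the producer setup, so the sketch below is an assumption about what that configuration would need to look like, not this repo's actual code (the bootstrap_servers value and the JSON encoding are illustrative):

    import json
    from kafka import KafkaProducer

    producer = KafkaProducer(
        bootstrap_servers="localhost:9092",          # assumption: the real value presumably comes from settings
        key_serializer=lambda k: k.encode("utf-8"),  # str key -> bytes
        value_serializer=lambda v: json.dumps(v, ensure_ascii=False).encode("utf-8"),  # dict -> JSON bytes
    )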
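
For reference, a hypothetical caller mirroring what the crawler now does. The field names ACE760, aae397, and Collect_time come from the diff; the literal values and the task id are made up:

    raw_records = [
        {"ACE760": "job-001", "aae397": "20240101", "Collect_time": "2024-01-01 10:00:00"},
    ]
    for raw in raw_records:
        raw["task_id"] = "task-42"  # the crawler attaches its own task_id before sending
    sent = kafka_service.produce_batch_raw(raw_records)  # number of records successfully sent

One design observation: because produce_raw() blocks on future.get(timeout=10) for every record, produce_batch_raw() sends sequentially, one broker round-trip per record; the trailing flush() is then mostly redundant. If throughput matters, collecting the futures first and resolving them after the loop would let kafka-python batch the sends.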