ORIGIN DATA
This commit is contained in:
@@ -7,7 +7,6 @@ from app.services.api_client import api_client
|
|||||||
from app.services.kafka_service import kafka_service
|
from app.services.kafka_service import kafka_service
|
||||||
from app.services.progress_store import progress_store
|
from app.services.progress_store import progress_store
|
||||||
from app.utils import is_within_days
|
from app.utils import is_within_days
|
||||||
from app.models import JobData
|
|
||||||
from app.core.config import settings
|
from app.core.config import settings
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
@@ -128,24 +127,24 @@ class TaskCrawler:
|
|||||||
logger.info(f"[{self.task_name}] offset={offset}, 返回数据为空")
|
logger.info(f"[{self.task_name}] offset={offset}, 返回数据为空")
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
# 过滤数据
|
# 过滤数据,保留原始内容
|
||||||
filtered_jobs = []
|
filtered_raw_data = []
|
||||||
for raw in data_list:
|
for raw in data_list:
|
||||||
aae397 = raw.get("aae397", "")
|
aae397 = raw.get("aae397", "")
|
||||||
collect_time = raw.get("Collect_time", "")
|
collect_time = raw.get("Collect_time", "")
|
||||||
if is_within_days(aae397, collect_time, self.filter_days):
|
if is_within_days(aae397, collect_time, self.filter_days):
|
||||||
job = JobData.from_raw(raw)
|
# 添加 task_id 到原始数据
|
||||||
job.task_id = self.task_id
|
raw["task_id"] = self.task_id
|
||||||
filtered_jobs.append(job)
|
filtered_raw_data.append(raw)
|
||||||
|
|
||||||
valid_count = len(filtered_jobs)
|
valid_count = len(filtered_raw_data)
|
||||||
expired_count = len(data_list) - valid_count
|
expired_count = len(data_list) - valid_count
|
||||||
self._total_filtered += valid_count
|
self._total_filtered += valid_count
|
||||||
|
|
||||||
# 立即发送到Kafka
|
# 立即发送到Kafka(原始数据)
|
||||||
produced = 0
|
produced = 0
|
||||||
if filtered_jobs:
|
if filtered_raw_data:
|
||||||
produced = kafka_service.produce_batch(filtered_jobs)
|
produced = kafka_service.produce_batch_raw(filtered_raw_data)
|
||||||
self._total_produced += produced
|
self._total_produced += produced
|
||||||
|
|
||||||
logger.info(f"[{self.task_name}] offset={offset}, 获取={len(data_list)}, 有效={valid_count}, 过期={expired_count}, 发送Kafka={produced}")
|
logger.info(f"[{self.task_name}] offset={offset}, 获取={len(data_list)}, 有效={valid_count}, 过期={expired_count}, 发送Kafka={produced}")
|
||||||
|
|||||||
@@ -82,6 +82,27 @@ class KafkaService:
|
|||||||
self.producer.flush()
|
self.producer.flush()
|
||||||
return success_count
|
return success_count
|
||||||
|
|
||||||
|
def produce_raw(self, raw_data: dict) -> bool:
    """Send one raw record to Kafka and wait for the send to complete.

    The message key is taken from the record's "ACE760" field, falling
    back to "aca112"; when both are missing or falsy the key is an empty
    string. The record itself is passed unchanged as the message value.

    Args:
        raw_data: Raw record to publish to ``self.topic``.

    Returns:
        True when the send future resolves without error, False when a
        KafkaError is raised (the error is logged).
    """
    # Prefer ACE760 as the key; only look at aca112 when ACE760 is falsy.
    message_key = raw_data.get("ACE760", "")
    if not message_key:
        message_key = raw_data.get("aca112", "")

    try:
        pending = self.producer.send(
            self.topic, key=message_key, value=raw_data
        )
        # Block on the future (timeout=10) so a failure surfaces here
        # rather than being lost in the background.
        pending.get(timeout=10)
    except KafkaError as err:
        logger.error(f"发送原始消息失败: {err}")
        return False
    return True
|
||||||
|
|
||||||
|
def produce_batch_raw(self, raw_data_list: List[dict]) -> int:
    """Send a list of raw records to Kafka one by one.

    Each record is handed to ``produce_raw``; the producer is flushed once
    after all records have been attempted.

    Args:
        raw_data_list: Raw records to publish.

    Returns:
        The number of records for which ``produce_raw`` reported success.
    """
    delivered = sum(
        1 for record in raw_data_list if self.produce_raw(record)
    )
    self.producer.flush()
    return delivered
|
||||||
|
|
||||||
def consume(self, batch_size: int = 10, timeout_ms: int = 5000) -> List[dict]:
|
def consume(self, batch_size: int = 10, timeout_ms: int = 5000) -> List[dict]:
|
||||||
"""消费消息"""
|
"""消费消息"""
|
||||||
messages = []
|
messages = []
|
||||||
|
|||||||
Reference in New Issue
Block a user