"""多任务增量采集核心逻辑 - 从后往前采集""" import asyncio import logging from typing import Dict, Optional from concurrent.futures import ThreadPoolExecutor from app.services.api_client import api_client from app.services.kafka_service import kafka_service from app.services.progress_store import progress_store from app.utils import is_within_days from app.core.config import settings logger = logging.getLogger(__name__) class TaskCrawler: """单个任务采集器 - 从后往前采集""" def __init__(self, task_config): self.task_id = task_config.id self.task_name = task_config.name or task_config.id self.batch_size = settings.api.batch_size self.filter_days = settings.crawler.filter_days self.max_expired_batches = settings.crawler.max_expired_batches self._running = False self._total_filtered = 0 self._total_produced = 0 @property def is_running(self) -> bool: return self._running async def start(self, reset: bool = False): """开始采集 - 从后往前""" if self._running: logger.warning(f"[{self.task_name}] 任务已在运行中") return self._running = True self._total_filtered = 0 self._total_produced = 0 logger.info(f"[{self.task_name}] 开始采集任务(从后往前)") try: if reset: progress_store.reset_progress(self.task_id) # 获取数据总数 total = await api_client.get_total_count(self.task_id) logger.info(f"[{self.task_name}] 数据总数: {total}") if total == 0: logger.info(f"[{self.task_name}] 无数据可采集") self._running = False return # 读取上次进度,确定截止位置 progress = progress_store.get_progress(self.task_id) last_start_offset = progress.last_start_offset if progress else None # 计算本次采集范围 start_offset = max(0, total - self.batch_size) # 从最新数据开始 end_offset = last_start_offset if last_start_offset is not None else 0 # 如果没有新数据 if start_offset <= end_offset: logger.info(f"[{self.task_name}] 无新数据,start={start_offset}, end={end_offset}") self._running = False return logger.info(f"[{self.task_name}] 采集范围: {start_offset} → {end_offset}") # 记录本次起始位置 this_start_offset = start_offset current_offset = start_offset expired_batches = 0 is_first_crawl = last_start_offset is None while current_offset >= end_offset and self._running: try: # 采集一批数据并立即发送Kafka valid_count = await self._crawl_batch(current_offset) # 仅首次采集时检查连续过期 if is_first_crawl: if valid_count == 0: expired_batches += 1 logger.info(f"[{self.task_name}] 连续过期批次: {expired_batches}") if expired_batches >= self.max_expired_batches: logger.info(f"[{self.task_name}] 连续{self.max_expired_batches}批过期,停止采集") break else: expired_batches = 0 current_offset -= self.batch_size # 计算进度 crawled = this_start_offset - current_offset total_to_crawl = this_start_offset - end_offset progress_pct = min(100, crawled / total_to_crawl * 100) if total_to_crawl > 0 else 100 logger.info(f"[{self.task_name}] 进度: {progress_pct:.1f}% (offset={current_offset})") await asyncio.sleep(0.5) except Exception as e: logger.error(f"[{self.task_name}] 采集批次失败: {e}") await asyncio.sleep(5) # 保存进度 status = "completed" if current_offset < end_offset or not self._running else "stopped" progress_store.save_progress( self.task_id, this_start_offset, total, status, self._total_filtered, self._total_produced ) logger.info(f"[{self.task_name}] 采集完成,状态={status},过滤={self._total_filtered},发送={self._total_produced}") except Exception as e: logger.error(f"[{self.task_name}] 采集任务异常: {e}") progress_store.save_progress(self.task_id, 0, 0, "error", self._total_filtered, self._total_produced) finally: self._running = False async def _crawl_batch(self, offset: int) -> int: """采集一批数据,过滤后立即发送Kafka,返回有效数据数量""" result = await api_client.fetch_data(self.task_id, offset, self.batch_size) data_list = 
    async def _crawl_batch(self, offset: int) -> int:
        """Crawl one batch, filter it, send it to Kafka immediately, and return the number of valid records."""
        result = await api_client.fetch_data(self.task_id, offset, self.batch_size)
        data_list = result.get("data", {}).get("data", [])
        if not data_list:
            logger.info(f"[{self.task_name}] offset={offset}, empty response")
            return 0

        # Filter the data while keeping the original records intact
        filtered_raw_data = []
        for raw in data_list:
            aae397 = raw.get("aae397", "")
            collect_time = raw.get("Collect_time", "")
            if is_within_days(aae397, collect_time, self.filter_days):
                # Attach task_id to the raw record
                raw["task_id"] = self.task_id
                filtered_raw_data.append(raw)

        valid_count = len(filtered_raw_data)
        expired_count = len(data_list) - valid_count
        self._total_filtered += valid_count

        # Send the raw records to Kafka immediately
        produced = 0
        if filtered_raw_data:
            produced = kafka_service.produce_batch_raw(filtered_raw_data)
            self._total_produced += produced

        logger.info(
            f"[{self.task_name}] offset={offset}, fetched={len(data_list)}, valid={valid_count}, "
            f"expired={expired_count}, sent to Kafka={produced}"
        )
        return valid_count

    def stop(self):
        """Stop crawling."""
        logger.info(f"[{self.task_name}] Stopping crawl task...")
        self._running = False

    def get_status(self) -> dict:
        """Return the crawl status."""
        stats = progress_store.get_stats(self.task_id)
        if not stats:
            return {
                "task_id": self.task_id,
                "task_name": self.task_name,
                "total": 0,
                "last_start_offset": None,
                "progress": "-",
                "status": "idle",
                "last_update": "",
                "filtered_count": 0,
                "produced_count": 0,
                "is_running": self._running
            }
        return {
            "task_id": self.task_id,
            "task_name": self.task_name,
            "total": stats.get("total", 0),
            "last_start_offset": stats.get("last_start_offset"),
            "progress": "-",
            "status": "running" if self._running else stats.get("status", "idle"),
            "last_update": stats.get("last_update", ""),
            "filtered_count": stats.get("filtered_count", 0),
            "produced_count": stats.get("produced_count", 0),
            "is_running": self._running
        }


class CrawlerManager:
    """Manager for multiple crawl tasks."""

    def __init__(self):
        self._crawlers: Dict[str, TaskCrawler] = {}
        self._executor = ThreadPoolExecutor(max_workers=settings.crawler.max_workers)
        self._init_crawlers()

    def _init_crawlers(self):
        """Initialize crawlers for all enabled tasks."""
        for task in settings.get_enabled_tasks():
            self._crawlers[task.id] = TaskCrawler(task)
            logger.info(f"Initialized task crawler: {task.name} ({task.id})")

    def get_crawler(self, task_id: str) -> Optional[TaskCrawler]:
        return self._crawlers.get(task_id)

    def get_all_crawlers(self) -> Dict[str, TaskCrawler]:
        return self._crawlers

    async def start_task(self, task_id: str, reset: bool = False) -> bool:
        """Start a single task."""
        crawler = self._crawlers.get(task_id)
        if not crawler:
            logger.error(f"Task not found: {task_id}")
            return False
        if crawler.is_running:
            logger.warning(f"Task already running: {task_id}")
            return False
        asyncio.create_task(crawler.start(reset))
        return True

    async def start_all(self, reset: bool = False):
        """Start all tasks."""
        for task_id, crawler in self._crawlers.items():
            if not crawler.is_running:
                logger.info(f"Auto-starting task: {crawler.task_name}")
                asyncio.create_task(crawler.start(reset))

    def stop_task(self, task_id: str) -> bool:
        crawler = self._crawlers.get(task_id)
        if not crawler:
            return False
        crawler.stop()
        return True

    def stop_all(self):
        for crawler in self._crawlers.values():
            crawler.stop()

    def get_status(self, task_id: Optional[str] = None) -> dict:
        if task_id:
            crawler = self._crawlers.get(task_id)
            return crawler.get_status() if crawler else {}
        return {
            "tasks": [c.get_status() for c in self._crawlers.values()],
            "kafka_lag": kafka_service.get_lag(),
            "running_count": sum(1 for c in self._crawlers.values() if c.is_running)
        }

    @property
    def is_any_running(self) -> bool:
        return any(c.is_running for c in self._crawlers.values())


# Global manager instance
crawler_manager = CrawlerManager()
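
# Illustrative usage sketch (an assumed addition, not the service's actual wiring):
# a minimal way to drive the global crawler_manager from a standalone asyncio
# entry point. In the real deployment the manager is presumably started from the
# web framework's startup hook; the sleep intervals below are arbitrary choices.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    async def _run_all_tasks():
        # Launch every enabled task; start_all() only schedules the coroutines,
        # so give them a moment to begin before polling their running flags.
        await crawler_manager.start_all(reset=False)
        await asyncio.sleep(1)
        while crawler_manager.is_any_running:
            await asyncio.sleep(5)
        # Aggregated status: per-task stats plus Kafka consumer lag.
        logger.info("Final status: %s", crawler_manager.get_status())

    asyncio.run(_run_all_tasks())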