feat(job_crawler): implement reverse-order incremental crawling with real-time Kafka publishing

- Add comprehensive sequence diagrams documenting container startup, task initialization, and incremental crawling flow
- Implement reverse-order crawling logic (from newest to oldest) so freshly added records are processed first
- Add real-time Kafka message publishing after each batch is filtered, instead of waiting for task completion
- Update progress tracking to store last_start_offset for accurate incremental crawling across sessions
- Enhance crawler service with improved offset calculation and batch processing logic
- Update configuration files to support new crawling parameters and Kafka integration
- Add progress model enhancements to track crawling state and handle edge cases
- Improve main application initialization to properly handle lifespan events and task auto-start (see the lifespan sketch after the diff)

This change enables efficient incremental data collection: new data is prioritized and published immediately, reducing latency and improving system responsiveness (a sketch of the core loop follows below).
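At its core, the new flow walks offsets backwards from the newest record down to the offset where the previous run started, filtering each batch and publishing it to Kafka as soon as it is fetched. The following is a minimal, self-contained sketch of that loop; the fetch/filter/publish stubs and the constants stand in for api_client.fetch_data, is_within_days/filter_days, and kafka_service.produce_batch from this repo, and the numbers are purely illustrative:

import asyncio
from datetime import datetime, timedelta

BATCH_SIZE = 100   # stand-in for settings.api.batch_size
FILTER_DAYS = 7    # stand-in for settings.crawler.filter_days

async def fetch_batch(offset: int, size: int, total: int) -> list[dict]:
    # Stand-in for api_client.fetch_data: fabricate `size` records at `offset`.
    return [{"id": i, "collect_time": datetime.now().isoformat()}
            for i in range(offset, min(offset + size, total))]

def filter_recent(records: list[dict]) -> list[dict]:
    # Stand-in for the is_within_days(filter_days) check.
    cutoff = datetime.now() - timedelta(days=FILTER_DAYS)
    return [r for r in records if datetime.fromisoformat(r["collect_time"]) >= cutoff]

def publish(records: list[dict]) -> int:
    # Stand-in for kafka_service.produce_batch: pretend every record was sent.
    return len(records)

async def crawl_incremental(total: int, last_start_offset: int | None) -> int:
    """Walk offsets from the newest batch down to where the previous run started,
    publishing each filtered batch immediately; return this run's start offset,
    which the caller persists as last_start_offset for the next session."""
    start_offset = max(0, total - BATCH_SIZE)      # newest data sits at the end
    end_offset = last_start_offset if last_start_offset is not None else 0
    if start_offset <= end_offset:
        return end_offset                          # nothing new since the last run
    offset = start_offset
    while offset >= end_offset:
        batch = await fetch_batch(offset, BATCH_SIZE, total)
        valid = filter_recent(batch)
        if valid:
            publish(valid)                         # sent per batch, not at task end
        offset -= BATCH_SIZE
    return start_offset

if __name__ == "__main__":
    # First run: no previous offset, so the whole history is crawled;
    # later runs only cover offsets added since the previous start.
    asyncio.run(crawl_incremental(total=1050, last_start_offset=None))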
2026-01-15 17:46:55 +08:00
parent 63cd432a0c
commit 3acc0a9221
8 changed files with 402 additions and 60 deletions


@@ -1,4 +1,4 @@
"""多任务增量采集核心逻辑"""
"""多任务增量采集核心逻辑 - 从后往前采集"""
import asyncio
import logging
from typing import Dict, Optional
@@ -8,19 +8,20 @@ from app.services.kafka_service import kafka_service
from app.services.progress_store import progress_store
from app.utils import is_within_days
from app.models import JobData
from app.core.config import settings, TaskConfig
from app.core.config import settings
logger = logging.getLogger(__name__)
class TaskCrawler:
"""单个任务采集器"""
"""单个任务采集器 - 从后往前采集"""
def __init__(self, task_config: TaskConfig):
def __init__(self, task_config):
self.task_id = task_config.id
self.task_name = task_config.name or task_config.id
self.batch_size = settings.api.batch_size
self.filter_days = settings.crawler.filter_days
self.max_expired_batches = settings.crawler.max_expired_batches
self._running = False
self._total_filtered = 0
self._total_produced = 0
@@ -30,7 +31,7 @@ class TaskCrawler:
return self._running
async def start(self, reset: bool = False):
"""开始采集"""
"""开始采集 - 从后往前"""
if self._running:
logger.warning(f"[{self.task_name}] 任务已在运行中")
return
@@ -38,43 +39,80 @@ class TaskCrawler:
self._running = True
self._total_filtered = 0
self._total_produced = 0
logger.info(f"[{self.task_name}] 开始采集任务")
logger.info(f"[{self.task_name}] 开始采集任务(从后往前)")
try:
if reset:
progress_store.reset_progress(self.task_id)
current_offset = 0
else:
progress = progress_store.get_progress(self.task_id)
current_offset = progress.current_offset if progress else 0
# Fetch the total record count
total = await api_client.get_total_count(self.task_id)
logger.info(f"[{self.task_name}] 数据总数: {total}, 当前偏移: {current_offset}")
logger.info(f"[{self.task_name}] 数据总数: {total}")
if current_offset >= total:
logger.info(f"[{self.task_name}] 数据已全部采集完成")
progress_store.save_progress(self.task_id, current_offset, total, "completed",
self._total_filtered, self._total_produced)
if total == 0:
logger.info(f"[{self.task_name}] 数据可采集")
self._running = False
return
while current_offset < total and self._running:
# Read the previous progress to determine where this run should stop
progress = progress_store.get_progress(self.task_id)
last_start_offset = progress.last_start_offset if progress else None
# Compute the crawl range for this run
start_offset = max(0, total - self.batch_size) # start from the newest data
end_offset = last_start_offset if last_start_offset is not None else 0
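# On the first run last_start_offset is None, so end_offset falls back to 0 and the
# whole history is crawled; later runs stop where the previous run started.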
# No new data since the last run
if start_offset <= end_offset:
logger.info(f"[{self.task_name}] 无新数据start={start_offset}, end={end_offset}")
self._running = False
return
logger.info(f"[{self.task_name}] 采集范围: {start_offset}{end_offset}")
# 记录本次起始位置
this_start_offset = start_offset
current_offset = start_offset
expired_batches = 0
is_first_crawl = last_start_offset is None
while current_offset >= end_offset and self._running:
try:
await self._crawl_batch(current_offset)
current_offset += self.batch_size
progress_store.save_progress(self.task_id, current_offset, total, "running",
self._total_filtered, self._total_produced)
progress_pct = min(100, current_offset / total * 100)
logger.info(f"[{self.task_name}] 进度: {progress_pct:.2f}% ({current_offset}/{total})")
# 采集一批数据并立即发送Kafka
valid_count = await self._crawl_batch(current_offset)
# Only check for consecutive expired batches on the first crawl
if is_first_crawl:
if valid_count == 0:
expired_batches += 1
logger.info(f"[{self.task_name}] 连续过期批次: {expired_batches}")
if expired_batches >= self.max_expired_batches:
logger.info(f"[{self.task_name}] 连续{self.max_expired_batches}批过期,停止采集")
break
else:
expired_batches = 0
current_offset -= self.batch_size
# Compute progress
crawled = this_start_offset - current_offset
total_to_crawl = this_start_offset - end_offset
progress_pct = min(100, crawled / total_to_crawl * 100) if total_to_crawl > 0 else 100
logger.info(f"[{self.task_name}] 进度: {progress_pct:.1f}% (offset={current_offset})")
await asyncio.sleep(0.5)
except Exception as e:
logger.error(f"[{self.task_name}] 采集批次失败: {e}")
await asyncio.sleep(5)
status = "completed" if current_offset >= total else "stopped"
progress_store.save_progress(self.task_id, current_offset, total, status,
self._total_filtered, self._total_produced)
logger.info(f"[{self.task_name}] 采集任务 {status}")
# 保存进度
status = "completed" if current_offset < end_offset or not self._running else "stopped"
progress_store.save_progress(
self.task_id, this_start_offset, total, status,
self._total_filtered, self._total_produced
)
logger.info(f"[{self.task_name}] 采集完成,状态={status},过滤={self._total_filtered},发送={self._total_produced}")
except Exception as e:
logger.error(f"[{self.task_name}] 采集任务异常: {e}")
progress_store.save_progress(self.task_id, 0, 0, "error",
@@ -82,26 +120,33 @@ class TaskCrawler:
finally:
self._running = False
async def _crawl_batch(self, offset: int):
"""采集一批数据"""
async def _crawl_batch(self, offset: int) -> int:
"""采集一批数据过滤后立即发送Kafka返回有效数据数量"""
result = await api_client.fetch_data(self.task_id, offset, self.batch_size)
data_list = result.get("data", {}).get("data", [])
if not data_list:
return
return 0
# Filter the data
filtered_jobs = []
for raw in data_list:
aae397 = raw.get("aae397", "")
collect_time = raw.get("Collect_time", "")
if is_within_days(aae397, collect_time, self.filter_days):
job = JobData.from_raw(raw)
job.task_id = self.task_id # tag the record with its task ID
job.task_id = self.task_id
filtered_jobs.append(job)
self._total_filtered += len(filtered_jobs)
valid_count = len(filtered_jobs)
self._total_filtered += valid_count
# Publish to Kafka immediately
if filtered_jobs:
produced = kafka_service.produce_batch(filtered_jobs)
self._total_produced += produced
logger.debug(f"[{self.task_name}] offset={offset}, filtered={valid_count}, produced={produced}")
return valid_count
def stop(self):
"""停止采集"""
@@ -114,19 +159,20 @@ class TaskCrawler:
if not stats:
return {
"task_id": self.task_id, "task_name": self.task_name,
"total": 0, "current_offset": 0, "progress": "0%",
"total": 0, "last_start_offset": None, "progress": "-",
"status": "idle", "last_update": "",
"filtered_count": 0, "produced_count": 0
"filtered_count": 0, "produced_count": 0, "is_running": self._running
}
total = stats.get("total", 0)
current = stats.get("current_offset", 0)
progress = f"{min(100, current / total * 100):.2f}%" if total > 0 else "0%"
return {
"task_id": self.task_id, "task_name": self.task_name,
"total": total, "current_offset": current, "progress": progress,
"status": stats.get("status", "idle"), "last_update": stats.get("last_update", ""),
"total": stats.get("total", 0),
"last_start_offset": stats.get("last_start_offset"),
"progress": "-",
"status": "running" if self._running else stats.get("status", "idle"),
"last_update": stats.get("last_update", ""),
"filtered_count": stats.get("filtered_count", 0),
"produced_count": stats.get("produced_count", 0)
"produced_count": stats.get("produced_count", 0),
"is_running": self._running
}
@@ -145,11 +191,9 @@ class CrawlerManager:
logger.info(f"初始化任务采集器: {task.name} ({task.id})")
def get_crawler(self, task_id: str) -> Optional[TaskCrawler]:
"""获取指定任务的采集器"""
return self._crawlers.get(task_id)
def get_all_crawlers(self) -> Dict[str, TaskCrawler]:
"""获取所有采集器"""
return self._crawlers
async def start_task(self, task_id: str, reset: bool = False) -> bool:
@@ -166,15 +210,12 @@ class CrawlerManager:
async def start_all(self, reset: bool = False):
"""启动所有任务"""
tasks = []
for task_id, crawler in self._crawlers.items():
if not crawler.is_running:
tasks.append(crawler.start(reset))
if tasks:
await asyncio.gather(*tasks)
logger.info(f"自动启动任务: {crawler.task_name}")
asyncio.create_task(crawler.start(reset))
def stop_task(self, task_id: str) -> bool:
"""停止单个任务"""
crawler = self._crawlers.get(task_id)
if not crawler:
return False
@@ -182,17 +223,13 @@ class CrawlerManager:
return True
def stop_all(self):
"""停止所有任务"""
for crawler in self._crawlers.values():
crawler.stop()
def get_status(self, task_id: str = None) -> dict:
"""获取状态"""
if task_id:
crawler = self._crawlers.get(task_id)
return crawler.get_status() if crawler else {}
# Return the status of all tasks
return {
"tasks": [c.get_status() for c in self._crawlers.values()],
"kafka_lag": kafka_service.get_lag(),
@@ -201,7 +238,6 @@ class CrawlerManager:
@property
def is_any_running(self) -> bool:
"""是否有任务在运行"""
return any(c.is_running for c in self._crawlers.values())
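
The main.py changes are not shown in this hunk, but the commit message mentions lifespan handling and task auto-start, and start_all() above now schedules each crawler with asyncio.create_task so it returns without blocking. Below is a minimal sketch of how that auto-start might be wired up, assuming a FastAPI app (as the lifespan wording suggests) and an importable crawler_manager singleton; the import path and the AUTO_START flag are assumptions, not the repo's actual names:

from contextlib import asynccontextmanager
from fastapi import FastAPI
from app.services.crawler_service import crawler_manager  # assumed import path

AUTO_START = True  # hypothetical stand-in for the real config flag

@asynccontextmanager
async def lifespan(app: FastAPI):
    if AUTO_START:
        # start_all() fires each task via asyncio.create_task and returns
        # immediately, so application startup is not blocked by the crawl.
        await crawler_manager.start_all()
    yield
    # Crawlers check their running flag between batches, so this stops them cleanly.
    crawler_manager.stop_all()

app = FastAPI(lifespan=lifespan)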