# 增量采集流程时序图 ## 1. 核心逻辑变更 ### 原逻辑(从前往后) ``` offset: 0 → 100 → 200 → ... → total 问题:新数据在末尾,每次都要遍历全部旧数据 ``` ### 新逻辑(从后往前) ``` offset: total-100 → total-200 → ... → 0 优势:先采集最新数据,遇到过期数据即可停止 ``` ## 2. 容器启动与自动采集时序图 ``` ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ Docker │ │ App │ │ Crawler │ │ 八爪鱼API │ │ Kafka │ │ 容器 │ │ FastAPI │ │ Manager │ │ │ │ │ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │ │ │ │ │ │ docker-compose │ │ │ │ │ up │ │ │ │ │───────────────>│ │ │ │ │ │ │ │ │ │ │ lifespan启动 │ │ │ │ │ 读取config.yml │ │ │ │ │───────────────>│ │ │ │ │ │ │ │ │ │ │ 遍历enabled=true的任务 │ │ │ │────────┐ │ │ │ │ │ │ │ │ │ │ │<───────┘ │ │ │ │ │ │ │ │ │ │ 为每个任务创建 │ │ │ │ │ TaskCrawler │ │ │ │ │────────┐ │ │ │ │ │ │ │ │ │ │ │<───────┘ │ │ │ │ │ │ │ │ │ auto_start_all │ │ │ │ │───────────────>│ │ │ │ │ │ │ │ │ │ │ 并行启动所有任务 │ │ │ │═══════════════════════════════>│ │ │ │ │ │ ``` ## 3. 单任务采集流程(从后往前,实时发送) ``` ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ TaskCrawler │ │ 八爪鱼API │ │ DateFilter │ │ Kafka │ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │ │ │ │ │ 1.获取数据总数 │ │ │ │───────────────>│ │ │ │<───────────────│ │ │ │ total=257449 │ │ │ │ │ │ │ │ 2.读取上次进度,计算采集范围 │ │ │ start_offset = total - 100 = 257349 │ │ end_offset = last_start_offset (上次起始位置) │ │────────┐ │ │ │ │<───────┘ │ │ │ │ │ │ │ │ ╔══════════════════════════════════════════════════════════╗ │ ║ 循环:每批请求→过滤→立即发送 ║ │ ╚══════════════════════════════════════════════════════════╝ │ │ │ │ │ 3.请求一批数据 │ │ │ │ offset=257349 │ │ │ │───────────────>│ │ │ │<───────────────│ │ │ │ 返回100条 │ │ │ │ │ │ │ │ 4.过滤数据 │ │ │ │───────────────────────────────>│ │ │<───────────────────────────────│ │ │ 有效数据95条 │ │ │ │ │ │ │ │ 5.立即发送到Kafka (不等待任务结束) │ │────────────────────────────────────────────────>│ │<────────────────────────────────────────────────│ │ 发送成功 │ │ │ │ │ │ │ │ 6.更新offset,保存进度 │ │ │ offset = 257349 - 100 = 257249 │ │ │────────┐ │ │ │ │<───────┘ │ │ │ │ │ │ │ │ 7.检查是否继续 │ │ │ │ offset >= end_offset ? │ │ │────────┐ │ │ │ │<───────┘ 是→继续循环 │ │ │ 否→结束 │ │ │ │ │ │ │ ╔══════════════════════════════════════════════════════════╗ │ ║ 停止条件: ║ │ ║ - offset < end_offset (已采集到上次位置) ║ │ ║ - 首次采集时连续3批全过期 ║ │ ║ - 手动停止 ║ │ ╚══════════════════════════════════════════════════════════╝ │ │ │ │ ``` **关键点:每批数据过滤后立即发送Kafka,不等待整个任务完成** ## 4. 进度记录与增量采集逻辑 ``` ┌─────────────────────────────────────────────────────────────────────────┐ │ 进度记录与增量采集 │ ├─────────────────────────────────────────────────────────────────────────┤ │ │ │ 首次采集: │ │ ┌─────────────────────────────────────────────────────────────────┐ │ │ │ total = 257449 │ │ │ │ start_offset = total - batch_size = 257349 │ │ │ │ end_offset = 0 (采集到最开始,或遇到过期数据停止) │ │ │ │ │ │ │ │ 采集完成后保存: │ │ │ │ - last_start_offset = 257349 (本次采集的起始位置) │ │ │ └─────────────────────────────────────────────────────────────────┘ │ │ │ │ 下次采集: │ │ ┌─────────────────────────────────────────────────────────────────┐ │ │ │ total = 260000 (新增了数据) │ │ │ │ start_offset = total - batch_size = 259900 │ │ │ │ end_offset = last_start_offset = 257349 (上次的起始位置) │ │ │ │ │ │ │ │ 只采集 259900 → 257349 这部分新增数据 │ │ │ └─────────────────────────────────────────────────────────────────┘ │ │ │ │ 流程图: │ │ │ │ 获取 total │ │ │ │ │ ▼ │ │ ┌───────────────────┐ │ │ │ 读取上次进度 │ │ │ │ last_start_offset │ │ │ └───────────────────┘ │ │ │ │ │ ▼ │ │ ┌───────────────────┐ ┌─────────────────────────────────┐ │ │ │last_start_offset │ 是 │ end_offset = last_start_offset │ │ │ │ 存在? │────>│ (从上次位置截止) │ │ │ └───────────────────┘ └─────────────────────────────────┘ │ │ │ 否 │ │ ▼ │ │ ┌───────────────────────────────────────┐ │ │ │ end_offset = 0 │ │ │ │ (首次采集,采集到最开始或遇到过期停止) │ │ │ └───────────────────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌───────────────────┐ │ │ │ start_offset = │ │ │ │ total - batch_size│ │ │ └───────────────────┘ │ │ │ │ │ ▼ │ │ ┌───────────────────────────────────────┐ │ │ │ 从 start_offset 向前采集 │ │ │ │ 直到 offset <= end_offset │ │ │ └───────────────────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌───────────────────────────────────────┐ │ │ │ 保存 last_start_offset = 本次起始位置 │ │ │ └───────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────────────┘ ``` ## 5. 停止条件 采集停止的条件(满足任一即停止): 1. `offset <= end_offset` - 已采集到上次的起始位置 2. 连续3批数据全部过期 - 数据太旧(仅首次采集时生效) 3. 手动调用停止接口 ## 6. 完整流程示例 ### 首次采集 数据总量 `total = 257449`,`batch_size = 100`,无历史进度: | 轮次 | offset | 请求范围 | 有效数据 | 动作 | |------|--------|----------|----------|------| | 1 | 257349 | 257349-257449 | 98 | 发送到Kafka,继续 | | 2 | 257249 | 257249-257349 | 95 | 发送到Kafka,继续 | | ... | ... | ... | ... | ... | | N | 1000 | 1000-1100 | 0 | expired_batches=1 | | N+1 | 900 | 900-1000 | 0 | expired_batches=2 | | N+2 | 800 | 800-900 | 0 | expired_batches=3,**停止** | 保存进度:`last_start_offset = 257349` ### 第二次采集(1小时后) 数据总量 `total = 257600`(新增151条),读取 `last_start_offset = 257349`: | 轮次 | offset | 请求范围 | end_offset | 动作 | |------|--------|----------|------------|------| | 1 | 257500 | 257500-257600 | 257349 | 发送到Kafka,继续 | | 2 | 257400 | 257400-257500 | 257349 | 发送到Kafka,继续 | | 3 | 257300 | 257300-257400 | 257349 | offset < end_offset,**停止** | 保存进度:`last_start_offset = 257500` ## 7. 代码变更点 ### 7.1 progress_store - 保存 last_start_offset ```python # 进度表增加字段 # last_start_offset: 上次采集的起始位置,作为下次采集的截止位置 ``` ### 7.2 crawler.py - TaskCrawler.start() ```python async def start(self): total = await api_client.get_total_count(self.task_id) # 读取上次进度 progress = progress_store.get_progress(self.task_id) last_start_offset = progress.last_start_offset if progress else None # 计算本次采集范围 start_offset = total - self.batch_size # 从最新数据开始 end_offset = last_start_offset if last_start_offset else 0 # 截止到上次起始位置 # 保存本次起始位置 this_start_offset = start_offset current_offset = start_offset expired_batches = 0 while current_offset >= end_offset and self._running: valid_count = await self._crawl_batch(current_offset) # 仅首次采集时检查过期(end_offset=0时) if end_offset == 0: if valid_count == 0: expired_batches += 1 if expired_batches >= 3: break # 连续3批过期,停止 else: expired_batches = 0 current_offset -= self.batch_size # 保存进度,记录本次起始位置供下次使用 progress_store.save_progress( task_id=self.task_id, last_start_offset=this_start_offset, ... ) ``` ### 7.3 main.py - 自动启动 ```python @asynccontextmanager async def lifespan(app: FastAPI): logger.info("服务启动中...") # 自动启动所有任务 from app.services import crawler_manager asyncio.create_task(crawler_manager.start_all()) yield logger.info("服务关闭中...") crawler_manager.stop_all() kafka_service.close() ``` ## 8. 配置说明 ```yaml # config.yml crawler: filter_days: 7 # 数据有效期(天) max_expired_batches: 3 # 连续过期批次阈值,超过则停止 auto_start: true # 容器启动时自动开始采集 ```