diff --git a/docs/采集流程时序图.md b/docs/采集流程时序图.md new file mode 100644 index 0000000..a9a04cc --- /dev/null +++ b/docs/采集流程时序图.md @@ -0,0 +1,288 @@ +# 增量采集流程时序图 + +## 1. 核心逻辑变更 + +### 原逻辑(从前往后) +``` +offset: 0 → 100 → 200 → ... → total +问题:新数据在末尾,每次都要遍历全部旧数据 +``` + +### 新逻辑(从后往前) +``` +offset: total-100 → total-200 → ... → 0 +优势:先采集最新数据,遇到过期数据即可停止 +``` + +## 2. 容器启动与自动采集时序图 + +``` +┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ +│ Docker │ │ App │ │ Crawler │ │ 八爪鱼API │ │ Kafka │ +│ 容器 │ │ FastAPI │ │ Manager │ │ │ │ │ +└──────┬──────┘ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ + │ │ │ │ │ + │ docker-compose │ │ │ │ + │ up │ │ │ │ + │───────────────>│ │ │ │ + │ │ │ │ │ + │ │ lifespan启动 │ │ │ + │ │ 读取config.yml │ │ │ + │ │───────────────>│ │ │ + │ │ │ │ │ + │ │ │ 遍历enabled=true的任务 │ + │ │ │────────┐ │ │ + │ │ │ │ │ │ + │ │ │<───────┘ │ │ + │ │ │ │ │ + │ │ │ 为每个任务创建 │ │ + │ │ │ TaskCrawler │ │ + │ │ │────────┐ │ │ + │ │ │ │ │ │ + │ │ │<───────┘ │ │ + │ │ │ │ │ + │ │ auto_start_all │ │ │ + │ │───────────────>│ │ │ + │ │ │ │ │ + │ │ │ 并行启动所有任务 │ + │ │ │═══════════════════════════════>│ + │ │ │ │ │ +``` + +## 3. 单任务采集流程(从后往前,实时发送) + +``` +┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ +│ TaskCrawler │ │ 八爪鱼API │ │ DateFilter │ │ Kafka │ +└──────┬──────┘ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ + │ │ │ │ + │ 1.获取数据总数 │ │ │ + │───────────────>│ │ │ + │<───────────────│ │ │ + │ total=257449 │ │ │ + │ │ │ │ + │ 2.读取上次进度,计算采集范围 │ │ + │ start_offset = total - 100 = 257349 │ + │ end_offset = last_start_offset (上次起始位置) │ + │────────┐ │ │ │ + │<───────┘ │ │ │ + │ │ │ │ + │ ╔══════════════════════════════════════════════════════════╗ + │ ║ 循环:每批请求→过滤→立即发送 ║ + │ ╚══════════════════════════════════════════════════════════╝ + │ │ │ │ + │ 3.请求一批数据 │ │ │ + │ offset=257349 │ │ │ + │───────────────>│ │ │ + │<───────────────│ │ │ + │ 返回100条 │ │ │ + │ │ │ │ + │ 4.过滤数据 │ │ │ + │───────────────────────────────>│ │ + │<───────────────────────────────│ │ + │ 有效数据95条 │ │ │ + │ │ │ │ + │ 5.立即发送到Kafka (不等待任务结束) │ + │────────────────────────────────────────────────>│ + │<────────────────────────────────────────────────│ + │ 发送成功 │ │ │ + │ │ │ │ + │ 6.更新offset,保存进度 │ │ + │ offset = 257349 - 100 = 257249 │ │ + │────────┐ │ │ │ + │<───────┘ │ │ │ + │ │ │ │ + │ 7.检查是否继续 │ │ │ + │ offset >= end_offset ? │ │ + │────────┐ │ │ │ + │<───────┘ 是→继续循环 │ │ + │ 否→结束 │ │ + │ │ │ │ + │ ╔══════════════════════════════════════════════════════════╗ + │ ║ 停止条件: ║ + │ ║ - offset < end_offset (已采集到上次位置) ║ + │ ║ - 首次采集时连续3批全过期 ║ + │ ║ - 手动停止 ║ + │ ╚══════════════════════════════════════════════════════════╝ + │ │ │ │ +``` + +**关键点:每批数据过滤后立即发送Kafka,不等待整个任务完成** + +## 4. 
进度记录与增量采集逻辑 + +``` +┌─────────────────────────────────────────────────────────────────────────┐ +│ 进度记录与增量采集 │ +├─────────────────────────────────────────────────────────────────────────┤ +│ │ +│ 首次采集: │ +│ ┌─────────────────────────────────────────────────────────────────┐ │ +│ │ total = 257449 │ │ +│ │ start_offset = total - batch_size = 257349 │ │ +│ │ end_offset = 0 (采集到最开始,或遇到过期数据停止) │ │ +│ │ │ │ +│ │ 采集完成后保存: │ │ +│ │ - last_start_offset = 257349 (本次采集的起始位置) │ │ +│ └─────────────────────────────────────────────────────────────────┘ │ +│ │ +│ 下次采集: │ +│ ┌─────────────────────────────────────────────────────────────────┐ │ +│ │ total = 260000 (新增了数据) │ │ +│ │ start_offset = total - batch_size = 259900 │ │ +│ │ end_offset = last_start_offset = 257349 (上次的起始位置) │ │ +│ │ │ │ +│ │ 只采集 259900 → 257349 这部分新增数据 │ │ +│ └─────────────────────────────────────────────────────────────────┘ │ +│ │ +│ 流程图: │ +│ │ +│ 获取 total │ +│ │ │ +│ ▼ │ +│ ┌───────────────────┐ │ +│ │ 读取上次进度 │ │ +│ │ last_start_offset │ │ +│ └───────────────────┘ │ +│ │ │ +│ ▼ │ +│ ┌───────────────────┐ ┌─────────────────────────────────┐ │ +│ │last_start_offset │ 是 │ end_offset = last_start_offset │ │ +│ │ 存在? │────>│ (从上次位置截止) │ │ +│ └───────────────────┘ └─────────────────────────────────┘ │ +│ │ 否 │ +│ ▼ │ +│ ┌───────────────────────────────────────┐ │ +│ │ end_offset = 0 │ │ +│ │ (首次采集,采集到最开始或遇到过期停止) │ │ +│ └───────────────────────────────────────┘ │ +│ │ │ +│ ▼ │ +│ ┌───────────────────┐ │ +│ │ start_offset = │ │ +│ │ total - batch_size│ │ +│ └───────────────────┘ │ +│ │ │ +│ ▼ │ +│ ┌───────────────────────────────────────┐ │ +│ │ 从 start_offset 向前采集 │ │ +│ │ 直到 offset <= end_offset │ │ +│ └───────────────────────────────────────┘ │ +│ │ │ +│ ▼ │ +│ ┌───────────────────────────────────────┐ │ +│ │ 保存 last_start_offset = 本次起始位置 │ │ +│ └───────────────────────────────────────┘ │ +│ │ +└─────────────────────────────────────────────────────────────────────────┘ +``` + +## 5. 停止条件 + +采集停止的条件(满足任一即停止): +1. `offset <= end_offset` - 已采集到上次的起始位置 +2. 连续3批数据全部过期 - 数据太旧(仅首次采集时生效) +3. 手动调用停止接口 + +## 6. 完整流程示例 + +### 首次采集 +数据总量 `total = 257449`,`batch_size = 100`,无历史进度: + +| 轮次 | offset | 请求范围 | 有效数据 | 动作 | +|------|--------|----------|----------|------| +| 1 | 257349 | 257349-257449 | 98 | 发送到Kafka,继续 | +| 2 | 257249 | 257249-257349 | 95 | 发送到Kafka,继续 | +| ... | ... | ... | ... | ... | +| N | 1000 | 1000-1100 | 0 | expired_batches=1 | +| N+1 | 900 | 900-1000 | 0 | expired_batches=2 | +| N+2 | 800 | 800-900 | 0 | expired_batches=3,**停止** | + +保存进度:`last_start_offset = 257349` + +### 第二次采集(1小时后) +数据总量 `total = 257600`(新增151条),读取 `last_start_offset = 257349`: + +| 轮次 | offset | 请求范围 | end_offset | 动作 | +|------|--------|----------|------------|------| +| 1 | 257500 | 257500-257600 | 257349 | 发送到Kafka,继续 | +| 2 | 257400 | 257400-257500 | 257349 | 发送到Kafka,继续 | +| 3 | 257300 | 257300-257400 | 257349 | offset < end_offset,**停止** | + +保存进度:`last_start_offset = 257500` + +## 7. 
代码变更点 + +### 7.1 progress_store - 保存 last_start_offset +```python +# 进度表增加字段 +# last_start_offset: 上次采集的起始位置,作为下次采集的截止位置 +``` + +### 7.2 crawler.py - TaskCrawler.start() +```python +async def start(self): + total = await api_client.get_total_count(self.task_id) + + # 读取上次进度 + progress = progress_store.get_progress(self.task_id) + last_start_offset = progress.last_start_offset if progress else None + + # 计算本次采集范围 + start_offset = total - self.batch_size # 从最新数据开始 + end_offset = last_start_offset if last_start_offset else 0 # 截止到上次起始位置 + + # 保存本次起始位置 + this_start_offset = start_offset + + current_offset = start_offset + expired_batches = 0 + + while current_offset >= end_offset and self._running: + valid_count = await self._crawl_batch(current_offset) + + # 仅首次采集时检查过期(end_offset=0时) + if end_offset == 0: + if valid_count == 0: + expired_batches += 1 + if expired_batches >= 3: + break # 连续3批过期,停止 + else: + expired_batches = 0 + + current_offset -= self.batch_size + + # 保存进度,记录本次起始位置供下次使用 + progress_store.save_progress( + task_id=self.task_id, + last_start_offset=this_start_offset, + ... + ) +``` + +### 7.3 main.py - 自动启动 +```python +@asynccontextmanager +async def lifespan(app: FastAPI): + logger.info("服务启动中...") + + # 自动启动所有任务 + from app.services import crawler_manager + asyncio.create_task(crawler_manager.start_all()) + + yield + + logger.info("服务关闭中...") + crawler_manager.stop_all() + kafka_service.close() +``` + +## 8. 配置说明 + +```yaml +# config.yml +crawler: + filter_days: 7 # 数据有效期(天) + max_expired_batches: 3 # 连续过期批次阈值,超过则停止 + auto_start: true # 容器启动时自动开始采集 +``` diff --git a/job_crawler/app/core/config.py b/job_crawler/app/core/config.py index d8c75ca..f28abee 100644 --- a/job_crawler/app/core/config.py +++ b/job_crawler/app/core/config.py @@ -37,6 +37,8 @@ class CrawlerConfig(BaseModel): interval: int = 300 filter_days: int = 7 max_workers: int = 5 + max_expired_batches: int = 3 # 连续过期批次阈值 + auto_start: bool = True # 容器启动时自动开始采集 class DatabaseConfig(BaseModel): diff --git a/job_crawler/app/main.py b/job_crawler/app/main.py index c20a225..03c654f 100644 --- a/job_crawler/app/main.py +++ b/job_crawler/app/main.py @@ -1,4 +1,5 @@ """FastAPI应用入口""" +import asyncio import logging from contextlib import asynccontextmanager from fastapi import FastAPI @@ -15,8 +16,18 @@ logger = logging.getLogger(__name__) async def lifespan(app: FastAPI): """应用生命周期管理""" logger.info("服务启动中...") + + # 自动启动所有采集任务 + if settings.crawler.auto_start: + from app.services import crawler_manager + logger.info("自动启动采集任务...") + asyncio.create_task(crawler_manager.start_all()) + yield + logger.info("服务关闭中...") + from app.services import crawler_manager + crawler_manager.stop_all() kafka_service.close() diff --git a/job_crawler/app/models/progress.py b/job_crawler/app/models/progress.py index 96f4016..e33c053 100644 --- a/job_crawler/app/models/progress.py +++ b/job_crawler/app/models/progress.py @@ -1,11 +1,12 @@ """采集进度模型""" from pydantic import BaseModel +from typing import Optional class CrawlProgress(BaseModel): """采集进度""" task_id: str - current_offset: int = 0 + last_start_offset: Optional[int] = None # 上次采集的起始位置,作为下次的截止位置 total: int = 0 last_update: str = "" status: str = "idle" # idle, running, completed, error @@ -15,7 +16,7 @@ class CrawlStatus(BaseModel): """采集状态响应""" task_id: str total: int - current_offset: int + last_start_offset: Optional[int] = None progress: str kafka_lag: int = 0 status: str diff --git a/job_crawler/app/services/crawler.py b/job_crawler/app/services/crawler.py index 6b5511f..70dbdbd 
100644 --- a/job_crawler/app/services/crawler.py +++ b/job_crawler/app/services/crawler.py @@ -1,4 +1,4 @@ -"""多任务增量采集核心逻辑""" +"""多任务增量采集核心逻辑 - 从后往前采集""" import asyncio import logging from typing import Dict, Optional @@ -8,19 +8,20 @@ from app.services.kafka_service import kafka_service from app.services.progress_store import progress_store from app.utils import is_within_days from app.models import JobData -from app.core.config import settings, TaskConfig +from app.core.config import settings logger = logging.getLogger(__name__) class TaskCrawler: - """单个任务采集器""" + """单个任务采集器 - 从后往前采集""" - def __init__(self, task_config: TaskConfig): + def __init__(self, task_config): self.task_id = task_config.id self.task_name = task_config.name or task_config.id self.batch_size = settings.api.batch_size self.filter_days = settings.crawler.filter_days + self.max_expired_batches = settings.crawler.max_expired_batches self._running = False self._total_filtered = 0 self._total_produced = 0 @@ -30,7 +31,7 @@ class TaskCrawler: return self._running async def start(self, reset: bool = False): - """开始采集""" + """开始采集 - 从后往前""" if self._running: logger.warning(f"[{self.task_name}] 任务已在运行中") return @@ -38,43 +39,80 @@ class TaskCrawler: self._running = True self._total_filtered = 0 self._total_produced = 0 - logger.info(f"[{self.task_name}] 开始采集任务") + logger.info(f"[{self.task_name}] 开始采集任务(从后往前)") try: if reset: progress_store.reset_progress(self.task_id) - current_offset = 0 - else: - progress = progress_store.get_progress(self.task_id) - current_offset = progress.current_offset if progress else 0 + # 获取数据总数 total = await api_client.get_total_count(self.task_id) - logger.info(f"[{self.task_name}] 数据总数: {total}, 当前偏移: {current_offset}") + logger.info(f"[{self.task_name}] 数据总数: {total}") - if current_offset >= total: - logger.info(f"[{self.task_name}] 数据已全部采集完成") - progress_store.save_progress(self.task_id, current_offset, total, "completed", - self._total_filtered, self._total_produced) + if total == 0: + logger.info(f"[{self.task_name}] 无数据可采集") self._running = False return - while current_offset < total and self._running: + # 读取上次进度,确定截止位置 + progress = progress_store.get_progress(self.task_id) + last_start_offset = progress.last_start_offset if progress else None + + # 计算本次采集范围 + start_offset = max(0, total - self.batch_size) # 从最新数据开始 + end_offset = last_start_offset if last_start_offset is not None else 0 + + # 如果没有新数据 + if start_offset <= end_offset: + logger.info(f"[{self.task_name}] 无新数据,start={start_offset}, end={end_offset}") + self._running = False + return + + logger.info(f"[{self.task_name}] 采集范围: {start_offset} → {end_offset}") + + # 记录本次起始位置 + this_start_offset = start_offset + current_offset = start_offset + expired_batches = 0 + is_first_crawl = last_start_offset is None + + while current_offset >= end_offset and self._running: try: - await self._crawl_batch(current_offset) - current_offset += self.batch_size - progress_store.save_progress(self.task_id, current_offset, total, "running", - self._total_filtered, self._total_produced) - progress_pct = min(100, current_offset / total * 100) - logger.info(f"[{self.task_name}] 进度: {progress_pct:.2f}% ({current_offset}/{total})") + # 采集一批数据并立即发送Kafka + valid_count = await self._crawl_batch(current_offset) + + # 仅首次采集时检查连续过期 + if is_first_crawl: + if valid_count == 0: + expired_batches += 1 + logger.info(f"[{self.task_name}] 连续过期批次: {expired_batches}") + if expired_batches >= self.max_expired_batches: + logger.info(f"[{self.task_name}] 
连续{self.max_expired_batches}批过期,停止采集") + break + else: + expired_batches = 0 + + current_offset -= self.batch_size + + # 计算进度 + crawled = this_start_offset - current_offset + total_to_crawl = this_start_offset - end_offset + progress_pct = min(100, crawled / total_to_crawl * 100) if total_to_crawl > 0 else 100 + logger.info(f"[{self.task_name}] 进度: {progress_pct:.1f}% (offset={current_offset})") + await asyncio.sleep(0.5) except Exception as e: logger.error(f"[{self.task_name}] 采集批次失败: {e}") await asyncio.sleep(5) - status = "completed" if current_offset >= total else "stopped" - progress_store.save_progress(self.task_id, current_offset, total, status, - self._total_filtered, self._total_produced) - logger.info(f"[{self.task_name}] 采集任务 {status}") + # 保存进度 + status = "completed" if current_offset < end_offset or not self._running else "stopped" + progress_store.save_progress( + self.task_id, this_start_offset, total, status, + self._total_filtered, self._total_produced + ) + logger.info(f"[{self.task_name}] 采集完成,状态={status},过滤={self._total_filtered},发送={self._total_produced}") + except Exception as e: logger.error(f"[{self.task_name}] 采集任务异常: {e}") progress_store.save_progress(self.task_id, 0, 0, "error", @@ -82,26 +120,33 @@ class TaskCrawler: finally: self._running = False - async def _crawl_batch(self, offset: int): - """采集一批数据""" + async def _crawl_batch(self, offset: int) -> int: + """采集一批数据,过滤后立即发送Kafka,返回有效数据数量""" result = await api_client.fetch_data(self.task_id, offset, self.batch_size) data_list = result.get("data", {}).get("data", []) if not data_list: - return + return 0 + # 过滤数据 filtered_jobs = [] for raw in data_list: aae397 = raw.get("aae397", "") collect_time = raw.get("Collect_time", "") if is_within_days(aae397, collect_time, self.filter_days): job = JobData.from_raw(raw) - job.task_id = self.task_id # 添加任务ID标识 + job.task_id = self.task_id filtered_jobs.append(job) - self._total_filtered += len(filtered_jobs) + valid_count = len(filtered_jobs) + self._total_filtered += valid_count + + # 立即发送到Kafka if filtered_jobs: produced = kafka_service.produce_batch(filtered_jobs) self._total_produced += produced + logger.debug(f"[{self.task_name}] offset={offset}, 过滤={valid_count}, 发送={produced}") + + return valid_count def stop(self): """停止采集""" @@ -114,19 +159,20 @@ class TaskCrawler: if not stats: return { "task_id": self.task_id, "task_name": self.task_name, - "total": 0, "current_offset": 0, "progress": "0%", + "total": 0, "last_start_offset": None, "progress": "-", "status": "idle", "last_update": "", - "filtered_count": 0, "produced_count": 0 + "filtered_count": 0, "produced_count": 0, "is_running": self._running } - total = stats.get("total", 0) - current = stats.get("current_offset", 0) - progress = f"{min(100, current / total * 100):.2f}%" if total > 0 else "0%" return { "task_id": self.task_id, "task_name": self.task_name, - "total": total, "current_offset": current, "progress": progress, - "status": stats.get("status", "idle"), "last_update": stats.get("last_update", ""), + "total": stats.get("total", 0), + "last_start_offset": stats.get("last_start_offset"), + "progress": "-", + "status": "running" if self._running else stats.get("status", "idle"), + "last_update": stats.get("last_update", ""), "filtered_count": stats.get("filtered_count", 0), - "produced_count": stats.get("produced_count", 0) + "produced_count": stats.get("produced_count", 0), + "is_running": self._running } @@ -145,11 +191,9 @@ class CrawlerManager: logger.info(f"初始化任务采集器: {task.name} ({task.id})") def 
get_crawler(self, task_id: str) -> Optional[TaskCrawler]: - """获取指定任务的采集器""" return self._crawlers.get(task_id) def get_all_crawlers(self) -> Dict[str, TaskCrawler]: - """获取所有采集器""" return self._crawlers async def start_task(self, task_id: str, reset: bool = False) -> bool: @@ -166,15 +210,12 @@ class CrawlerManager: async def start_all(self, reset: bool = False): """启动所有任务""" - tasks = [] for task_id, crawler in self._crawlers.items(): if not crawler.is_running: - tasks.append(crawler.start(reset)) - if tasks: - await asyncio.gather(*tasks) + logger.info(f"自动启动任务: {crawler.task_name}") + asyncio.create_task(crawler.start(reset)) def stop_task(self, task_id: str) -> bool: - """停止单个任务""" crawler = self._crawlers.get(task_id) if not crawler: return False @@ -182,17 +223,13 @@ class CrawlerManager: return True def stop_all(self): - """停止所有任务""" for crawler in self._crawlers.values(): crawler.stop() def get_status(self, task_id: str = None) -> dict: - """获取状态""" if task_id: crawler = self._crawlers.get(task_id) return crawler.get_status() if crawler else {} - - # 返回所有任务状态 return { "tasks": [c.get_status() for c in self._crawlers.values()], "kafka_lag": kafka_service.get_lag(), @@ -201,7 +238,6 @@ class CrawlerManager: @property def is_any_running(self) -> bool: - """是否有任务在运行""" return any(c.is_running for c in self._crawlers.values()) diff --git a/job_crawler/app/services/progress_store.py b/job_crawler/app/services/progress_store.py index c0c7b8d..3844314 100644 --- a/job_crawler/app/services/progress_store.py +++ b/job_crawler/app/services/progress_store.py @@ -25,7 +25,7 @@ class ProgressStore: conn.execute(""" CREATE TABLE IF NOT EXISTS crawl_progress ( task_id TEXT PRIMARY KEY, - current_offset INTEGER DEFAULT 0, + last_start_offset INTEGER, total INTEGER DEFAULT 0, last_update TEXT, status TEXT DEFAULT 'idle', @@ -53,27 +53,27 @@ class ProgressStore: if row: return CrawlProgress( task_id=row["task_id"], - current_offset=row["current_offset"], + last_start_offset=row["last_start_offset"], total=row["total"], last_update=row["last_update"] or "", status=row["status"] ) return None - def save_progress(self, task_id: str, offset: int, total: int, + def save_progress(self, task_id: str, last_start_offset: int, total: int, status: str = "running", filtered_count: int = 0, produced_count: int = 0): """保存采集进度""" now = datetime.now().isoformat() with self._get_conn() as conn: conn.execute(""" INSERT INTO crawl_progress - (task_id, current_offset, total, last_update, status, filtered_count, produced_count) + (task_id, last_start_offset, total, last_update, status, filtered_count, produced_count) VALUES (?, ?, ?, ?, ?, ?, ?) 
ON CONFLICT(task_id) DO UPDATE SET - current_offset = excluded.current_offset, total = excluded.total, + last_start_offset = excluded.last_start_offset, total = excluded.total, last_update = excluded.last_update, status = excluded.status, filtered_count = excluded.filtered_count, produced_count = excluded.produced_count - """, (task_id, offset, total, now, status, filtered_count, produced_count)) + """, (task_id, last_start_offset, total, now, status, filtered_count, produced_count)) conn.commit() def get_stats(self, task_id: str) -> dict: diff --git a/job_crawler/config/config.yml b/job_crawler/config/config.yml index 8fe38e3..ed354bc 100644 --- a/job_crawler/config/config.yml +++ b/job_crawler/config/config.yml @@ -26,7 +26,7 @@ api: # Kafka配置 kafka: - bootstrap_servers: localhost:9092 + bootstrap_servers: kafka:29092 topic: job_data consumer_group: job_consumer_group @@ -35,6 +35,8 @@ crawler: interval: 300 # 采集间隔(秒) filter_days: 7 # 过滤天数 max_workers: 5 # 最大并行任务数 + max_expired_batches: 3 # 连续过期批次阈值(首次采集时生效) + auto_start: true # 容器启动时自动开始采集 # 数据库配置 database: diff --git a/job_crawler/config/config.yml.docker b/job_crawler/config/config.yml.docker index 981406b..ec8f33a 100644 --- a/job_crawler/config/config.yml.docker +++ b/job_crawler/config/config.yml.docker @@ -33,6 +33,8 @@ crawler: interval: 300 filter_days: 7 max_workers: 5 + max_expired_batches: 3 # 连续过期批次阈值(首次采集时生效) + auto_start: true # 容器启动时自动开始采集 # 数据库配置 database:
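
**补充示例一:从后往前增量采集的最小示意**

下面是一段独立可运行的 Python 草图(同步实现,用回调代替真实的八爪鱼 API、过滤器和 Kafka),仅用于演示第 4~6 节描述的 offset 计算与停止条件;函数名 `crawl_backwards` 及其参数均为示意,并非 `crawler.py` 中 `TaskCrawler` 的实际代码:

```python
# 示意代码:演示“从后往前”增量采集的范围计算与停止条件,非仓库中的实际实现
from typing import Callable, List, Optional

BATCH_SIZE = 100
MAX_EXPIRED_BATCHES = 3   # 连续过期批次阈值(仅首次采集时生效)

def crawl_backwards(total: int,
                    last_start_offset: Optional[int],
                    fetch_batch: Callable[[int, int], List[dict]],
                    filter_batch: Callable[[List[dict]], List[dict]],
                    send_batch: Callable[[List[dict]], int]) -> int:
    """从最新数据往旧数据方向采集,返回本次起始位置(供下次作为截止位置)。"""
    if total <= 0:
        return last_start_offset or 0

    start_offset = max(0, total - BATCH_SIZE)                        # 从最新一批开始
    end_offset = last_start_offset if last_start_offset is not None else 0
    if last_start_offset is not None and start_offset <= end_offset:
        return last_start_offset                                     # 没有新增数据

    is_first_crawl = last_start_offset is None
    offset = start_offset
    expired_batches = 0

    while offset >= end_offset:
        raw = fetch_batch(offset, BATCH_SIZE)
        valid = filter_batch(raw)
        if valid:
            send_batch(valid)                                        # 每批过滤后立即发送

        if is_first_crawl:                                           # 仅首次采集检查连续过期
            expired_batches = 0 if valid else expired_batches + 1
            if expired_batches >= MAX_EXPIRED_BATCHES:
                break

        offset -= BATCH_SIZE                                         # 向旧数据方向推进

    return start_offset                                              # 保存为 last_start_offset
```

按第 6 节的数字验证:首次调用传 `total=257449, last_start_offset=None`,返回 257349;一小时后传 `total=257600, last_start_offset=257349`,只会请求 offset 257500 和 257400 两批,随后因 257300 < 257349 停止,返回 257500。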
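
**补充示例二:日期过滤的示意实现**

时序图中的 DateFilter 对应 `app/utils` 里的 `is_within_days`,其真实实现与字段格式以仓库代码为准;下面是一个示意写法,假设 `aae397` / `Collect_time` 为 `"YYYY-MM-DD"` 或 `"YYYY-MM-DD HH:MM:SS"` 字符串,语义为"任一时间落在最近 `filter_days` 天内即保留":

```python
# 示意代码:日期过滤逻辑草图,字段格式与判定规则均为假设
from datetime import datetime, timedelta
from typing import Optional

def _parse(ts: str) -> Optional[datetime]:
    """尝试按两种常见格式解析时间字符串,失败返回 None。"""
    for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d"):
        try:
            return datetime.strptime(ts, fmt)
        except ValueError:
            continue
    return None

def is_within_days(aae397: str, collect_time: str, days: int = 7) -> bool:
    """任一时间字段在最近 days 天内即视为有效数据(示意逻辑)。"""
    cutoff = datetime.now() - timedelta(days=days)
    for ts in (aae397, collect_time):
        dt = _parse(ts or "")
        if dt is not None and dt >= cutoff:
            return True
    return False
```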
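
**补充示例三:每批过滤后立即发送 Kafka**

"不等待整个任务完成、每批立即发送"这一点可以用如下草图说明。这里以 kafka-python 客户端演示,且假设每条数据已序列化为 dict;项目中 `kafka_service.produce_batch` 的实际实现与所用客户端库以仓库为准,`kafka:29092` 取自 config.yml 中的 Docker 网络地址:

```python
# 示意代码:逐批立即发送到 Kafka,非 kafka_service 的实际实现
import json
from kafka import KafkaProducer   # pip install kafka-python

producer = KafkaProducer(
    bootstrap_servers="kafka:29092",   # 与 config.yml 中的 bootstrap_servers 一致
    value_serializer=lambda v: json.dumps(v, ensure_ascii=False).encode("utf-8"),
)

def produce_batch(jobs: list, topic: str = "job_data") -> int:
    """把一批过滤后的数据(dict 列表)逐条发送到 Kafka,返回发送条数。"""
    for job in jobs:
        producer.send(topic, value=job)
    producer.flush()   # 本批发送完立即刷出,不等待整个采集任务结束
    return len(jobs)
```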