From 1a5d14e0e7652be7b33319e47a65f2b76e117117 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E9=A1=BA=E4=B8=9C?= <577732344@qq.com> Date: Thu, 15 Jan 2026 22:08:12 +0800 Subject: [PATCH] Revert "rabbitmq" This reverts commit 0976909cc8b7fa744e48c886ed5cd07d2ae682c8. --- docs/技术方案.md | 555 +++++++++++++--------- docs/采集流程时序图.md | 313 +++++++----- job_crawler/app/api/routes.py | 27 +- job_crawler/app/core/config.py | 15 +- job_crawler/app/main.py | 4 +- job_crawler/app/models/job.py | 68 ++- job_crawler/app/models/progress.py | 2 +- job_crawler/app/services/__init__.py | 4 +- job_crawler/app/services/crawler.py | 13 +- job_crawler/app/services/kafka_service.py | 3 +- job_crawler/config/config.yml | 13 +- job_crawler/config/config.yml.docker | 13 +- job_crawler/docker-compose.yml | 51 +- job_crawler/requirements.txt | 3 +- 14 files changed, 665 insertions(+), 419 deletions(-) diff --git a/docs/技术方案.md b/docs/技术方案.md index d0cffba..48b267d 100644 --- a/docs/技术方案.md +++ b/docs/技术方案.md @@ -3,13 +3,12 @@ ## 1. 项目概述 ### 1.1 需求背景 -从八爪鱼API采集招聘数据,筛选近7天发布的数据,通过RabbitMQ消息队列提供数据消费接口,支持消息级别TTL自动过期。 +从八爪鱼API采集招聘数据,筛选近7天发布的数据,通过内置Kafka服务提供消息队列,供外部系统消费。 ### 1.2 核心功能 -- 增量采集八爪鱼API招聘数据(从后往前采集,最新数据优先) +- 增量采集八爪鱼API招聘数据 - 日期过滤(发布日期 + 采集时间均在7天内) -- RabbitMQ消息队列(支持消息TTL,7天自动过期) -- 容器启动自动开始采集 +- 内置Kafka服务 - 提供REST API消费接口 --- @@ -23,18 +22,16 @@ │ │ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────────┐ │ │ │ 八爪鱼API │───▶│ 采集服务 │───▶│ 日期过滤器 │ │ -│ │ (数据源) │ │ (从后往前) │ │ (7天内数据) │ │ +│ │ (数据源) │ │ (增量采集) │ │ (7天内数据) │ │ │ └──────────────┘ └──────────────┘ └────────┬─────────┘ │ │ │ │ │ ▼ │ │ ┌──────────────────────────────────────────────────────────┐ │ -│ │ RabbitMQ 服务 │ │ -│ │ ┌─────────────────────────────────────────────────────┐ │ │ -│ │ │ Queue: job_data │ │ │ -│ │ │ - 消息TTL: 7天 (604800000ms) │ │ │ -│ │ │ - 过期消息自动删除 │ │ │ -│ │ │ - 持久化存储 │ │ │ -│ │ └─────────────────────────────────────────────────────┘ │ │ +│ │ 内置 Kafka 服务 │ │ +│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────────┐ │ │ +│ │ │ Zookeeper │ │ Broker │ │ Topic:job_data │ │ │ +│ │ │ (Docker) │ │ (Docker) │ │ │ │ │ +│ │ └─────────────┘ └─────────────┘ └─────────────────┘ │ │ │ └──────────────────────────────────────────────────────────┘ │ │ │ │ │ ▼ │ @@ -55,13 +52,14 @@ | 组件 | 技术方案 | 版本 | 说明 | |------|---------|------|------| -| 运行环境 | Python | 3.11+ | 主开发语言 | +| 运行环境 | Python | 3.10+ | 主开发语言 | | HTTP客户端 | httpx | 0.27+ | 异步HTTP请求 | -| 消息队列 | RabbitMQ | 3.12+ | 支持消息级别TTL | -| MQ客户端 | pika | 1.3+ | Python RabbitMQ SDK | +| 消息队列 | Kafka | 3.6+ | Docker部署 | +| Kafka客户端 | kafka-python | 2.0+ | Python Kafka SDK | | API框架 | FastAPI | 0.109+ | REST接口 | -| 容器编排 | Docker Compose | 2.0+ | 服务部署 | -| 数据存储 | SQLite | 内置 | 存储采集进度 | +| 容器编排 | Docker Compose | 2.0+ | Kafka/Zookeeper部署 | +| 任务调度 | APScheduler | 3.10+ | 定时增量采集 | +| 数据存储 | SQLite | 内置 | 存储采集进度(offset) | --- @@ -86,42 +84,36 @@ job_crawler/ │ │ ├── __init__.py │ │ ├── api_client.py # 八爪鱼API客户端 │ │ ├── crawler.py # 采集核心逻辑 -│ │ ├── rabbitmq_service.py # RabbitMQ服务 +│ │ ├── kafka_service.py # Kafka服务 │ │ └── progress_store.py # 进度存储 │ ├── utils/ # 工具函数 │ │ ├── __init__.py │ │ └── date_parser.py # 日期解析 │ ├── __init__.py │ └── main.py # 应用入口 -├── config/ # 配置文件 -│ ├── config.yml # 运行配置 -│ └── config.yml.docker # Docker配置模板 -├── docker-compose.yml # 容器编排 +├── docker-compose.yml # 容器编排(含Kafka+App) ├── Dockerfile # 应用镜像构建 -├── deploy.sh # 部署脚本(Linux) -├── deploy.bat # 部署脚本(Windows) ├── requirements.txt # Python依赖 -└── README.md # 使用说明 +├── .env.example # 配置模板 +├── .dockerignore # Docker忽略文件 +└── README.md # 使用说明 ``` - --- ## 5. 核心模块设计 ### 5.1 增量采集模块 -#### 采集策略(从后往前) +#### 采集策略 ```python # 增量采集流程 -1. 获取数据总数 total -2. 读取上次采集的起始位置 last_start_offset -3. 计算本次采集范围: - - start_offset = total - batch_size (从最新数据开始) - - end_offset = last_start_offset (截止到上次位置) -4. 循环采集: offset 从 start_offset 递减到 end_offset -5. 每批数据过滤后立即发送到RabbitMQ -6. 采集完成后保存 last_start_offset = 本次起始位置 +1. 读取上次采集的offset(首次为0) +2. 调用API: GET /data/all?taskId=xxx&offset={offset}&size=100 +3. 解析返回数据,过滤近7天数据 +4. 推送到Kafka +5. 更新offset = offset + size +6. 循环直到 offset >= total ``` #### 进度持久化 @@ -129,12 +121,9 @@ job_crawler/ ```sql CREATE TABLE crawl_progress ( task_id TEXT PRIMARY KEY, - last_start_offset INTEGER, -- 上次采集的起始位置 + current_offset INTEGER, total INTEGER, - last_update TIMESTAMP, - status TEXT, - filtered_count INTEGER, - produced_count INTEGER + last_update TIMESTAMP ); ``` @@ -150,58 +139,59 @@ CREATE TABLE crawl_progress ( #### 过滤逻辑 ```python -def is_within_days(aae397: str, collect_time: str, days: int = 7) -> bool: +def is_within_7_days(aae397: str, collect_time: str) -> bool: """ - 判断数据是否在指定天数内 - 条件:发布日期 AND 采集时间 都在N天内 + 判断数据是否在近7天内 + 条件:发布日期 AND 采集时间 都在7天内 """ today = datetime.now().date() - cutoff_date = today - timedelta(days=days) + seven_days_ago = today - timedelta(days=7) - publish_date = parse_aae397(aae397) - collect_date = parse_collect_time(collect_time) + publish_date = parse_aae397(aae397) # 解析发布日期 + collect_date = parse_collect_time(collect_time) # 解析采集时间 - return publish_date >= cutoff_date and collect_date >= cutoff_date + return publish_date >= seven_days_ago and collect_date >= seven_days_ago ``` -### 5.3 RabbitMQ服务模块 +### 5.3 Kafka服务模块 -#### 消息TTL机制 -```python -# 队列声明时设置消息TTL -channel.queue_declare( - queue='job_data', - durable=True, - arguments={ - 'x-message-ttl': 604800000 # 7天(毫秒) - } -) +#### Docker Compose配置 +```yaml +version: '3.8' +services: + zookeeper: + image: confluentinc/cp-zookeeper:7.5.0 + ports: + - "2181:2181" + environment: + ZOOKEEPER_CLIENT_PORT: 2181 -# 发送消息时也设置TTL(双重保障) -channel.basic_publish( - exchange='', - routing_key='job_data', - body=message, - properties=pika.BasicProperties( - delivery_mode=2, # 持久化 - expiration='604800000' # 7天 - ) -) + kafka: + image: confluentinc/cp-kafka:7.5.0 + ports: + - "9092:9092" + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + depends_on: + - zookeeper ``` -#### 优势 -- 消息级别TTL,精确控制每条消息的过期时间 -- 过期消息自动删除,无需手动清理 -- 队列中始终保持最近7天的有效数据 +#### Topic设计 +- Topic名称: `job_data` +- 分区数: 3 +- 副本数: 1 +- 消息格式: JSON ### 5.4 REST API接口 | 接口 | 方法 | 说明 | |------|------|------| -| `/consume` | GET | 消费队列数据,支持batch_size参数 | -| `/queue/size` | GET | 获取队列消息数量 | +| `/consume` | GET | 消费Kafka数据,支持batch_size参数 | +| `/consume/stream` | GET | SSE流式消费 | | `/status` | GET | 查看采集进度和状态 | -| `/tasks` | GET | 获取任务列表 | | `/crawl/start` | POST | 手动触发采集任务 | | `/crawl/stop` | POST | 停止采集任务 | @@ -217,17 +207,13 @@ GET /consume?batch_size=10 "code": 0, "data": [ { - "_id": "uuid", - "_task_id": "00f3b445-...", - "_crawl_time": "2026-01-15T10:30:00", - "Std_class": "机动车司机/驾驶", - "aca112": "保底1万+五险+港内A2驾驶员", - "AAB004": "青岛唐盛物流有限公司", - "acb241": "1-1.5万", - "aab302": "青岛黄岛区", - "aae397": "1月13日", - "Collect_time": "2026-01-15", - ... + "job_title": "机动车司机/驾驶", + "company": "青岛唐盛物流有限公司", + "salary": "1-1.5万", + "location": "青岛黄岛区", + "publish_date": "2026-01-13", + "collect_time": "2026-01-15", + "url": "https://www.zhaopin.com/..." } ], "count": 10 @@ -239,20 +225,13 @@ GET /consume?batch_size=10 { "code": 0, "data": { - "tasks": [ - { - "task_id": "00f3b445-...", - "task_name": "青岛招聘数据", - "total": 270000, - "last_start_offset": 269900, - "status": "completed", - "filtered_count": 15000, - "produced_count": 15000, - "is_running": false - } - ], - "queue_size": 12345, - "running_count": 0 + "task_id": "00f3b445-d8ec-44e8-88b2-4b971a228b1e", + "total": 257449, + "current_offset": 156700, + "progress": "60.87%", + "kafka_lag": 1234, + "status": "running", + "last_update": "2026-01-15T10:30:00" } } ``` @@ -261,47 +240,113 @@ GET /consume?batch_size=10 ## 6. 数据模型 -### 6.1 原始数据保留 -数据采集后保留原始字段名,仅添加元数据: +### 6.1 原始数据字段映射 -| 字段 | 说明 | -|------|------| -| _id | 唯一标识(UUID) | -| _task_id | 任务ID | -| _crawl_time | 入库时间 | -| 其他字段 | 保留原始API返回的所有字段 | +| 原始字段 | 含义 | 输出字段 | +|---------|------|---------| +| Std_class | 职位分类 | job_category | +| aca112 | 职位名称 | job_title | +| AAB004 | 公司名称 | company | +| acb241 | 薪资范围 | salary | +| aab302 | 工作地点 | location | +| aae397 | 发布日期 | publish_date | +| Collect_time | 采集时间 | collect_time | +| ACE760 | 职位链接 | url | +| acb22a | 职位描述 | description | +| Experience | 经验要求 | experience | +| aac011 | 学历要求 | education | -### 6.2 RabbitMQ消息格式 +### 6.2 Kafka消息格式 ```json { - "_id": "uuid", - "_task_id": "00f3b445-d8ec-44e8-88b2-4b971a228b1e", - "_crawl_time": "2026-01-15T10:30:00", - "Std_class": "机动车司机/驾驶", - "aca112": "保底1万+五险+港内A2驾驶员", - "AAB004": "青岛唐盛物流有限公司", - "AAB019": "民营", - "acb241": "1-1.5万", - "aab302": "青岛黄岛区", - "AAE006": "青岛市黄岛区...", - "aae397": "1月13日", - "Collect_time": "2026-01-15", - "ACE760": "https://www.zhaopin.com/...", - "acb22a": "岗位职责...", - "Experience": "5-10年", - "aac011": "学历不限", - "acb240": "1人", - "AAB022": "交通/运输/物流", - "Num_employers": "20-99人", - "AAE004": "张先生/HR", - "AAB092": "公司简介..." + "id": "uuid", + "job_category": "机动车司机/驾驶", + "job_title": "保底1万+五险+港内A2驾驶员", + "company": "青岛唐盛物流有限公司", + "salary": "1-1.5万", + "location": "青岛黄岛区", + "publish_date": "2026-01-13", + "collect_time": "2026-01-15", + "url": "https://www.zhaopin.com/...", + "description": "...", + "experience": "5-10年", + "education": "学历不限", + "crawl_time": "2026-01-15T10:30:00" } ``` +--- + +## 7. 部署流程 + +### 7.1 Docker Compose 一键部署(推荐) + +```bash +# 1. 配置环境变量 +cd job_crawler +cp .env.example .env +# 编辑 .env 填入 API_USERNAME 和 API_PASSWORD + +# 2. 启动所有服务(Zookeeper + Kafka + App) +docker-compose up -d + +# 3. 查看日志 +docker-compose logs -f app + +# 4. 停止服务 +docker-compose down +``` + +### 7.2 单独构建镜像 + +```bash +# 构建镜像 +docker build -t job-crawler:latest . + +# 推送到私有仓库(可选) +docker tag job-crawler:latest your-registry/job-crawler:latest +docker push your-registry/job-crawler:latest +``` + +### 7.3 Kubernetes 部署(可选) + +```yaml +# 示例 Deployment +apiVersion: apps/v1 +kind: Deployment +metadata: + name: job-crawler +spec: + replicas: 1 + selector: + matchLabels: + app: job-crawler + template: + spec: + containers: + - name: job-crawler + image: job-crawler:latest + ports: + - containerPort: 8000 + env: + - name: KAFKA_BOOTSTRAP_SERVERS + value: "kafka:9092" + envFrom: + - secretRef: + name: job-crawler-secrets +``` + +### 7.4 服务端口 +| 服务 | 端口 | 说明 | +|------|------|------| +| FastAPI | 8000 | HTTP API | +| Kafka | 9092 | 外部访问 | +| Kafka | 29092 | 容器内部访问 | +| Zookeeper | 2181 | Kafka协调 | --- -## 7. 配置说明 +## 8. 配置说明 ### 配置文件 `config/config.yml` @@ -315,124 +360,133 @@ app: # 八爪鱼API配置 api: base_url: https://openapi.bazhuayu.com + task_id: 00f3b445-d8ec-44e8-88b2-4b971a228b1e username: "your_username" password: "your_password" batch_size: 100 - # 多任务配置 - tasks: - - id: "00f3b445-d8ec-44e8-88b2-4b971a228b1e" - name: "青岛招聘数据" - enabled: true - - id: "task-id-2" - name: "任务2" - enabled: false -# RabbitMQ配置 -rabbitmq: - host: rabbitmq # Docker内部服务名 - port: 5672 - username: guest - password: guest - queue: job_data - message_ttl: 604800000 # 消息过期时间:7天(毫秒) +# Kafka配置 +kafka: + bootstrap_servers: kafka:29092 # Docker内部网络 + topic: job_data + consumer_group: job_consumer_group # 采集配置 crawler: - filter_days: 7 # 数据有效期(天) - max_expired_batches: 3 # 连续过期批次阈值 - max_workers: 5 # 最大并行任务数 - auto_start: true # 容器启动时自动开始采集 + interval: 300 # 采集间隔(秒) + filter_days: 7 # 过滤天数 # 数据库配置 database: - path: data/crawl_progress.db + path: /app/data/crawl_progress.db +``` + +### 配置加载优先级 + +1. 环境变量 `CONFIG_PATH` 指定配置文件路径 +2. 默认路径 `config/config.yml` + +### Docker挂载 + +```yaml +# docker-compose.yml +volumes: + - ./config:/app/config:ro # 配置文件(只读) + - app_data:/app/data # 数据持久化 ``` --- -## 8. 部署流程 - -### 8.1 Docker Compose 一键部署 - -```bash -# 1. 配置 -cd job_crawler -cp config/config.yml.docker config/config.yml -# 编辑 config/config.yml 填入账号密码 - -# 2. 构建镜像 -./deploy.sh build - -# 3. 启动服务 -./deploy.sh up - -# 4. 查看日志 -./deploy.sh logs - -# 5. 查看状态 -./deploy.sh status -``` - -### 8.2 部署脚本命令 - -| 命令 | 说明 | -|------|------| -| `./deploy.sh build` | 构建镜像 | -| `./deploy.sh up` | 启动服务 | -| `./deploy.sh down` | 停止服务 | -| `./deploy.sh restart` | 重启应用 | -| `./deploy.sh logs` | 查看应用日志 | -| `./deploy.sh status` | 查看服务状态 | -| `./deploy.sh reset` | 清理数据卷并重启 | - -### 8.3 服务端口 - -| 服务 | 端口 | 说明 | -|------|------|------| -| FastAPI | 8000 | HTTP API | -| RabbitMQ | 5672 | AMQP协议 | -| RabbitMQ | 15672 | 管理界面 | - -### 8.4 访问地址 - -- API文档: http://localhost:8000/docs -- RabbitMQ管理界面: http://localhost:15672 (guest/guest) - ---- - -## 9. 数据流向 - -``` -八爪鱼API → 采集服务(过滤7天内数据) → RabbitMQ(TTL=7天) → 第三方消费 - ↓ - 过期自动删除 -``` - ---- - -## 10. Token自动刷新机制 - -系统实现了Token自动管理: - -1. 首次请求时自动获取Token -2. Token缓存在内存中 -3. 请求前检查Token有效期(提前5分钟刷新) -4. 遇到401错误自动重新获取Token - ---- - -## 11. 异常处理 +## 9. 异常处理 | 异常场景 | 处理策略 | |---------|---------| | API请求失败 | 重试3次,指数退避 | -| Token过期 | 自动刷新Token | -| RabbitMQ连接失败 | 自动重连 | +| Token过期 | 返回错误,需手动更新 | +| Kafka连接失败 | 重试连接,数据暂存本地 | | 日期解析失败 | 记录日志,跳过该条数据 | --- -## 12. 快速启动 +## 10. 监控指标 + +- 采集进度百分比 +- Kafka消息堆积量(lag) +- 每分钟采集条数 +- 过滤后有效数据比例 +- API响应时间 + +--- + +## 11. 后续扩展 + +1. **多任务支持**: 支持配置多个taskId并行采集 +2. **数据去重**: 基于职位URL去重 +3. **告警通知**: 采集异常时发送通知 +4. **Web管理界面**: 可视化监控采集状态 + +--- + +## 12. Docker 镜像构建 + +### Dockerfile 说明 + +```dockerfile +FROM python:3.11-slim + +WORKDIR /app + +# 安装系统依赖 +RUN apt-get update && apt-get install -y --no-install-recommends gcc + +# 安装Python依赖 +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +# 复制应用代码 +COPY app/ ./app/ + +# 创建数据目录 +RUN mkdir -p /app/data + +# 环境变量 +ENV PYTHONPATH=/app +ENV PYTHONUNBUFFERED=1 + +EXPOSE 8000 + +CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"] +``` + +### 构建命令 + +```bash +# 构建 +docker build -t job-crawler:latest . + +# 运行测试 +docker run --rm -p 8000:8000 \ + -e API_USERNAME=xxx \ + -e API_PASSWORD=xxx \ + -e KAFKA_BOOTSTRAP_SERVERS=host.docker.internal:9092 \ + job-crawler:latest +``` + +--- + +## 13. 代码分层说明 + +| 层级 | 目录 | 职责 | +|------|------|------| +| API层 | `app/api/` | 路由定义、请求处理、响应格式化 | +| 服务层 | `app/services/` | 业务逻辑、外部服务调用 | +| 模型层 | `app/models/` | 数据结构定义、数据转换 | +| 工具层 | `app/utils/` | 通用工具函数 | +| 核心层 | `app/core/` | 配置、日志等基础设施 | + +--- + +## 14. 快速启动 ```bash # 1. 配置 @@ -441,15 +495,46 @@ cp config/config.yml.docker config/config.yml # 编辑 config/config.yml 填入账号密码 # 2. 一键启动 -./deploy.sh build -./deploy.sh up +docker-compose up -d -# 3. 查看采集日志 -./deploy.sh logs +# 3. 访问API文档 +# http://localhost:8000/docs -# 4. 消费数据 +# 4. 启动采集 +curl -X POST http://localhost:8000/crawl/start + +# 5. 查看进度 +curl http://localhost:8000/status + +# 6. 消费数据 curl http://localhost:8000/consume?batch_size=10 - -# 5. 查看队列大小 -curl http://localhost:8000/queue/size +``` + +--- + +## 15. Token自动刷新机制 + +系统实现了Token自动管理: + +1. 首次请求时自动获取Token +2. Token缓存在内存中 +3. 请求前检查Token有效期(提前5分钟刷新) +4. 遇到401错误自动重新获取Token + +```python +# app/services/api_client.py 核心逻辑 +async def _get_token(self) -> str: + # 检查token是否有效(提前5分钟刷新) + if self._access_token and time.time() < self._token_expires_at - 300: + return self._access_token + + # 调用 /token 接口获取新token + response = await client.post(f"{self.base_url}/token", json={ + "username": self.username, + "password": self.password, + "grant_type": "password" + }) + + self._access_token = token_data.get("access_token") + self._token_expires_at = time.time() + expires_in ``` diff --git a/docs/采集流程时序图.md b/docs/采集流程时序图.md index 7267e6b..a9a04cc 100644 --- a/docs/采集流程时序图.md +++ b/docs/采集流程时序图.md @@ -1,58 +1,68 @@ # 增量采集流程时序图 -## 1. 核心逻辑 +## 1. 核心逻辑变更 -### 采集方向(从后往前) +### 原逻辑(从前往后) ``` -offset: total-100 → total-200 → ... → last_start_offset -优势:先采集最新数据,下次只采集新增部分 +offset: 0 → 100 → 200 → ... → total +问题:新数据在末尾,每次都要遍历全部旧数据 ``` -### 消息队列 -- 使用 RabbitMQ,支持消息级别 TTL -- 消息过期时间:7天,过期自动删除 -- 每批数据过滤后立即发送,不等待任务结束 +### 新逻辑(从后往前) +``` +offset: total-100 → total-200 → ... → 0 +优势:先采集最新数据,遇到过期数据即可停止 +``` -## 2. 容器启动与自动采集 +## 2. 容器启动与自动采集时序图 ``` -┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ -│ Docker │ │ App │ │ Crawler │ │ RabbitMQ │ -│ 容器 │ │ FastAPI │ │ Manager │ │ │ -└──────┬──────┘ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ - │ │ │ │ - │ docker-compose │ │ │ - │ up │ │ │ - │───────────────>│ │ │ - │ │ │ │ - │ │ lifespan启动 │ │ - │ │ auto_start=true│ │ - │ │───────────────>│ │ - │ │ │ │ - │ │ │ 遍历enabled任务 │ - │ │ │ 创建TaskCrawler │ - │ │ │────────┐ │ - │ │ │<───────┘ │ - │ │ │ │ - │ │ │ 并行启动所有任务│ - │ │ │═══════════════>│ - │ │ │ │ +┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ +│ Docker │ │ App │ │ Crawler │ │ 八爪鱼API │ │ Kafka │ +│ 容器 │ │ FastAPI │ │ Manager │ │ │ │ │ +└──────┬──────┘ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ + │ │ │ │ │ + │ docker-compose │ │ │ │ + │ up │ │ │ │ + │───────────────>│ │ │ │ + │ │ │ │ │ + │ │ lifespan启动 │ │ │ + │ │ 读取config.yml │ │ │ + │ │───────────────>│ │ │ + │ │ │ │ │ + │ │ │ 遍历enabled=true的任务 │ + │ │ │────────┐ │ │ + │ │ │ │ │ │ + │ │ │<───────┘ │ │ + │ │ │ │ │ + │ │ │ 为每个任务创建 │ │ + │ │ │ TaskCrawler │ │ + │ │ │────────┐ │ │ + │ │ │ │ │ │ + │ │ │<───────┘ │ │ + │ │ │ │ │ + │ │ auto_start_all │ │ │ + │ │───────────────>│ │ │ + │ │ │ │ │ + │ │ │ 并行启动所有任务 │ + │ │ │═══════════════════════════════>│ + │ │ │ │ │ ``` ## 3. 单任务采集流程(从后往前,实时发送) ``` ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ -│ TaskCrawler │ │ 八爪鱼API │ │ DateFilter │ │ RabbitMQ │ +│ TaskCrawler │ │ 八爪鱼API │ │ DateFilter │ │ Kafka │ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │ │ │ │ │ 1.获取数据总数 │ │ │ │───────────────>│ │ │ │<───────────────│ │ │ - │ total=270000 │ │ │ + │ total=257449 │ │ │ │ │ │ │ │ 2.读取上次进度,计算采集范围 │ │ - │ start_offset = total - 100 = 269900 │ + │ start_offset = total - 100 = 257349 │ │ end_offset = last_start_offset (上次起始位置) │ │────────┐ │ │ │ │<───────┘ │ │ │ @@ -62,36 +72,44 @@ offset: total-100 → total-200 → ... → last_start_offset │ ╚══════════════════════════════════════════════════════════╝ │ │ │ │ │ 3.请求一批数据 │ │ │ - │ offset=269900 │ │ │ + │ offset=257349 │ │ │ │───────────────>│ │ │ │<───────────────│ │ │ │ 返回100条 │ │ │ │ │ │ │ - │ 4.过滤数据(7天内有效) │ │ + │ 4.过滤数据 │ │ │ │───────────────────────────────>│ │ │<───────────────────────────────│ │ - │ 有效95条,过期5条 │ │ + │ 有效数据95条 │ │ │ │ │ │ │ - │ 5.立即发送到RabbitMQ │ │ - │ (消息TTL=7天,过期自动删除) │ │ + │ 5.立即发送到Kafka (不等待任务结束) │ │────────────────────────────────────────────────>│ │<────────────────────────────────────────────────│ │ 发送成功 │ │ │ │ │ │ │ - │ 6.更新offset,继续循环 │ │ - │ offset = 269900 - 100 = 269800 │ │ + │ 6.更新offset,保存进度 │ │ + │ offset = 257349 - 100 = 257249 │ │ │────────┐ │ │ │ │<───────┘ │ │ │ │ │ │ │ - │ 7.检查停止条件 │ │ │ - │ offset >= end_offset ? 继续 │ │ - │ offset < end_offset ? 停止 │ │ + │ 7.检查是否继续 │ │ │ + │ offset >= end_offset ? │ │ │────────┐ │ │ │ - │<───────┘ │ │ │ + │<───────┘ 是→继续循环 │ │ + │ 否→结束 │ │ + │ │ │ │ + │ ╔══════════════════════════════════════════════════════════╗ + │ ║ 停止条件: ║ + │ ║ - offset < end_offset (已采集到上次位置) ║ + │ ║ - 首次采集时连续3批全过期 ║ + │ ║ - 手动停止 ║ + │ ╚══════════════════════════════════════════════════════════╝ │ │ │ │ ``` -## 4. 进度记录与增量采集 +**关键点:每批数据过滤后立即发送Kafka,不等待整个任务完成** + +## 4. 进度记录与增量采集逻辑 ``` ┌─────────────────────────────────────────────────────────────────────────┐ @@ -100,96 +118,171 @@ offset: total-100 → total-200 → ... → last_start_offset │ │ │ 首次采集: │ │ ┌─────────────────────────────────────────────────────────────────┐ │ -│ │ total = 270000 │ │ -│ │ start_offset = total - 100 = 269900 │ │ -│ │ end_offset = 0 (首次采集,遇到连续过期数据停止) │ │ +│ │ total = 257449 │ │ +│ │ start_offset = total - batch_size = 257349 │ │ +│ │ end_offset = 0 (采集到最开始,或遇到过期数据停止) │ │ │ │ │ │ -│ │ 采集完成后保存: last_start_offset = 269900 │ │ +│ │ 采集完成后保存: │ │ +│ │ - last_start_offset = 257349 (本次采集的起始位置) │ │ │ └─────────────────────────────────────────────────────────────────┘ │ │ │ │ 下次采集: │ │ ┌─────────────────────────────────────────────────────────────────┐ │ -│ │ total = 270500 (新增500条) │ │ -│ │ start_offset = 270500 - 100 = 270400 │ │ -│ │ end_offset = 269900 (上次的起始位置) │ │ +│ │ total = 260000 (新增了数据) │ │ +│ │ start_offset = total - batch_size = 259900 │ │ +│ │ end_offset = last_start_offset = 257349 (上次的起始位置) │ │ │ │ │ │ -│ │ 只采集 270400 → 269900 这部分新增数据 │ │ -│ │ 采集完成后保存: last_start_offset = 270400 │ │ +│ │ 只采集 259900 → 257349 这部分新增数据 │ │ │ └─────────────────────────────────────────────────────────────────┘ │ │ │ +│ 流程图: │ +│ │ +│ 获取 total │ +│ │ │ +│ ▼ │ +│ ┌───────────────────┐ │ +│ │ 读取上次进度 │ │ +│ │ last_start_offset │ │ +│ └───────────────────┘ │ +│ │ │ +│ ▼ │ +│ ┌───────────────────┐ ┌─────────────────────────────────┐ │ +│ │last_start_offset │ 是 │ end_offset = last_start_offset │ │ +│ │ 存在? │────>│ (从上次位置截止) │ │ +│ └───────────────────┘ └─────────────────────────────────┘ │ +│ │ 否 │ +│ ▼ │ +│ ┌───────────────────────────────────────┐ │ +│ │ end_offset = 0 │ │ +│ │ (首次采集,采集到最开始或遇到过期停止) │ │ +│ └───────────────────────────────────────┘ │ +│ │ │ +│ ▼ │ +│ ┌───────────────────┐ │ +│ │ start_offset = │ │ +│ │ total - batch_size│ │ +│ └───────────────────┘ │ +│ │ │ +│ ▼ │ +│ ┌───────────────────────────────────────┐ │ +│ │ 从 start_offset 向前采集 │ │ +│ │ 直到 offset <= end_offset │ │ +│ └───────────────────────────────────────┘ │ +│ │ │ +│ ▼ │ +│ ┌───────────────────────────────────────┐ │ +│ │ 保存 last_start_offset = 本次起始位置 │ │ +│ └───────────────────────────────────────┘ │ +│ │ └─────────────────────────────────────────────────────────────────────────┘ ``` ## 5. 停止条件 采集停止的条件(满足任一即停止): -1. `offset < end_offset` - 已采集到上次的起始位置 +1. `offset <= end_offset` - 已采集到上次的起始位置 2. 连续3批数据全部过期 - 数据太旧(仅首次采集时生效) 3. 手动调用停止接口 -## 6. 消息过期机制 +## 6. 完整流程示例 -``` -┌─────────────────────────────────────────────────────────────────────────┐ -│ RabbitMQ 消息TTL │ -├─────────────────────────────────────────────────────────────────────────┤ -│ │ -│ 消息发送时设置 TTL = 7天 (604800000ms) │ -│ │ -│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ -│ │ 消息A │ │ 消息B │ │ 消息C │ │ 消息D │ │ -│ │ 1月8日 │ │ 1月10日 │ │ 1月14日 │ │ 1月15日 │ │ -│ │ 已过期 │ │ 即将过期 │ │ 有效 │ │ 有效 │ │ -│ │ 自动删除 │ │ │ │ │ │ │ │ -│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ -│ ↓ │ -│ RabbitMQ自动清理 │ -│ │ -│ 优势: │ -│ - 消息级别TTL,精确控制每条消息的过期时间 │ -│ - 过期消息自动删除,无需手动清理 │ -│ - 队列中始终保持最近7天的有效数据 │ -│ │ -└─────────────────────────────────────────────────────────────────────────┘ +### 首次采集 +数据总量 `total = 257449`,`batch_size = 100`,无历史进度: + +| 轮次 | offset | 请求范围 | 有效数据 | 动作 | +|------|--------|----------|----------|------| +| 1 | 257349 | 257349-257449 | 98 | 发送到Kafka,继续 | +| 2 | 257249 | 257249-257349 | 95 | 发送到Kafka,继续 | +| ... | ... | ... | ... | ... | +| N | 1000 | 1000-1100 | 0 | expired_batches=1 | +| N+1 | 900 | 900-1000 | 0 | expired_batches=2 | +| N+2 | 800 | 800-900 | 0 | expired_batches=3,**停止** | + +保存进度:`last_start_offset = 257349` + +### 第二次采集(1小时后) +数据总量 `total = 257600`(新增151条),读取 `last_start_offset = 257349`: + +| 轮次 | offset | 请求范围 | end_offset | 动作 | +|------|--------|----------|------------|------| +| 1 | 257500 | 257500-257600 | 257349 | 发送到Kafka,继续 | +| 2 | 257400 | 257400-257500 | 257349 | 发送到Kafka,继续 | +| 3 | 257300 | 257300-257400 | 257349 | offset < end_offset,**停止** | + +保存进度:`last_start_offset = 257500` + +## 7. 代码变更点 + +### 7.1 progress_store - 保存 last_start_offset +```python +# 进度表增加字段 +# last_start_offset: 上次采集的起始位置,作为下次采集的截止位置 ``` +### 7.2 crawler.py - TaskCrawler.start() +```python +async def start(self): + total = await api_client.get_total_count(self.task_id) + + # 读取上次进度 + progress = progress_store.get_progress(self.task_id) + last_start_offset = progress.last_start_offset if progress else None + + # 计算本次采集范围 + start_offset = total - self.batch_size # 从最新数据开始 + end_offset = last_start_offset if last_start_offset else 0 # 截止到上次起始位置 + + # 保存本次起始位置 + this_start_offset = start_offset + + current_offset = start_offset + expired_batches = 0 + + while current_offset >= end_offset and self._running: + valid_count = await self._crawl_batch(current_offset) + + # 仅首次采集时检查过期(end_offset=0时) + if end_offset == 0: + if valid_count == 0: + expired_batches += 1 + if expired_batches >= 3: + break # 连续3批过期,停止 + else: + expired_batches = 0 + + current_offset -= self.batch_size + + # 保存进度,记录本次起始位置供下次使用 + progress_store.save_progress( + task_id=self.task_id, + last_start_offset=this_start_offset, + ... + ) +``` -## 7. 配置说明 +### 7.3 main.py - 自动启动 +```python +@asynccontextmanager +async def lifespan(app: FastAPI): + logger.info("服务启动中...") + + # 自动启动所有任务 + from app.services import crawler_manager + asyncio.create_task(crawler_manager.start_all()) + + yield + + logger.info("服务关闭中...") + crawler_manager.stop_all() + kafka_service.close() +``` + +## 8. 配置说明 ```yaml # config.yml - -# RabbitMQ配置 -rabbitmq: - host: rabbitmq # Docker内部服务名 - port: 5672 - username: guest - password: guest - queue: job_data - message_ttl: 604800000 # 消息过期时间:7天(毫秒) - -# 采集配置 crawler: filter_days: 7 # 数据有效期(天) - max_expired_batches: 3 # 连续过期批次阈值(首次采集时生效) + max_expired_batches: 3 # 连续过期批次阈值,超过则停止 auto_start: true # 容器启动时自动开始采集 ``` - -## 8. API接口 - -| 接口 | 方法 | 说明 | -|------|------|------| -| `/status` | GET | 获取采集状态 | -| `/tasks` | GET | 获取任务列表 | -| `/crawl/start` | POST | 启动采集任务 | -| `/crawl/stop` | POST | 停止采集任务 | -| `/consume` | GET | 消费队列数据 | -| `/queue/size` | GET | 获取队列消息数量 | - -## 9. 数据流向 - -``` -八爪鱼API → 采集服务(过滤7天内数据) → RabbitMQ(TTL=7天) → 第三方消费 - ↓ - 过期自动删除 -``` diff --git a/job_crawler/app/api/routes.py b/job_crawler/app/api/routes.py index b36a6ed..8ffbcfd 100644 --- a/job_crawler/app/api/routes.py +++ b/job_crawler/app/api/routes.py @@ -5,7 +5,7 @@ from typing import Optional from fastapi import APIRouter, Query, BackgroundTasks, HTTPException from fastapi.responses import StreamingResponse from app.models import ApiResponse, ConsumeResponse, StatusResponse -from app.services import crawler_manager, rabbitmq_service +from app.services import crawler_manager, kafka_service logger = logging.getLogger(__name__) router = APIRouter() @@ -84,18 +84,29 @@ async def stop_crawl( @router.get("/consume", response_model=ConsumeResponse) async def consume_data( - batch_size: int = Query(10, ge=1, le=100, description="批量大小") + batch_size: int = Query(10, ge=1, le=100, description="批量大小"), + timeout: int = Query(5000, ge=1000, le=30000, description="超时时间(毫秒)") ): - """消费RabbitMQ数据""" + """消费Kafka数据""" try: - messages = rabbitmq_service.consume(batch_size) + messages = kafka_service.consume(batch_size, timeout) return ConsumeResponse(data=messages, count=len(messages)) except Exception as e: logger.error(f"消费数据失败: {e}") raise HTTPException(status_code=500, detail=str(e)) -@router.get("/queue/size") -async def get_queue_size(): - """获取队列消息数量""" - return {"queue_size": rabbitmq_service.get_queue_size()} +@router.get("/consume/stream") +async def consume_stream(): + """SSE流式消费""" + async def event_generator(): + consumer = kafka_service.get_consumer() + try: + for message in consumer: + data = json.dumps(message.value, ensure_ascii=False) + yield f"data: {data}\n\n" + except Exception as e: + logger.error(f"流式消费错误: {e}") + finally: + consumer.close() + return StreamingResponse(event_generator(), media_type="text/event-stream") diff --git a/job_crawler/app/core/config.py b/job_crawler/app/core/config.py index 8721bbe..f28abee 100644 --- a/job_crawler/app/core/config.py +++ b/job_crawler/app/core/config.py @@ -27,13 +27,10 @@ class ApiConfig(BaseModel): tasks: List[TaskConfig] = [] -class RabbitMQConfig(BaseModel): - host: str = "localhost" - port: int = 5672 - username: str = "guest" - password: str = "guest" - queue: str = "job_data" - message_ttl: int = 604800000 # 7天(毫秒) +class KafkaConfig(BaseModel): + bootstrap_servers: str = "localhost:9092" + topic: str = "job_data" + consumer_group: str = "job_consumer_group" class CrawlerConfig(BaseModel): @@ -52,7 +49,7 @@ class Settings(BaseModel): """应用配置""" app: AppConfig = AppConfig() api: ApiConfig = ApiConfig() - rabbitmq: RabbitMQConfig = RabbitMQConfig() + kafka: KafkaConfig = KafkaConfig() crawler: CrawlerConfig = CrawlerConfig() database: DatabaseConfig = DatabaseConfig() @@ -74,7 +71,7 @@ class Settings(BaseModel): return cls( app=AppConfig(**data.get('app', {})), api=api_config, - rabbitmq=RabbitMQConfig(**data.get('rabbitmq', {})), + kafka=KafkaConfig(**data.get('kafka', {})), crawler=CrawlerConfig(**data.get('crawler', {})), database=DatabaseConfig(**data.get('database', {})) ) diff --git a/job_crawler/app/main.py b/job_crawler/app/main.py index 1cb5804..03c654f 100644 --- a/job_crawler/app/main.py +++ b/job_crawler/app/main.py @@ -6,7 +6,7 @@ from fastapi import FastAPI from app.core.config import settings from app.core.logging import setup_logging from app.api import router -from app.services import rabbitmq_service +from app.services import kafka_service setup_logging() logger = logging.getLogger(__name__) @@ -28,7 +28,7 @@ async def lifespan(app: FastAPI): logger.info("服务关闭中...") from app.services import crawler_manager crawler_manager.stop_all() - rabbitmq_service.close() + kafka_service.close() app = FastAPI( diff --git a/job_crawler/app/models/job.py b/job_crawler/app/models/job.py index 8870a92..2d6ed6c 100644 --- a/job_crawler/app/models/job.py +++ b/job_crawler/app/models/job.py @@ -1,24 +1,60 @@ """招聘数据模型""" +from pydantic import BaseModel from datetime import datetime import uuid -class JobData: - """招聘数据 - 保留原始数据格式""" +class JobData(BaseModel): + """招聘数据模型""" + id: str = "" + task_id: str = "" # 任务ID + job_category: str = "" # Std_class - 职位分类 + job_title: str = "" # aca112 - 职位名称 + company: str = "" # AAB004 - 公司名称 + company_type: str = "" # AAB019 - 企业类型 + salary: str = "" # acb241 - 薪资范围 + location: str = "" # aab302 - 工作地点 + address: str = "" # AAE006 - 详细地址 + publish_date: str = "" # aae397 - 发布日期 + collect_time: str = "" # Collect_time - 采集时间 + url: str = "" # ACE760 - 职位链接 + description: str = "" # acb22a - 职位描述 + experience: str = "" # Experience - 经验要求 + education: str = "" # aac011 - 学历要求 + headcount: str = "" # acb240 - 招聘人数 + industry: str = "" # AAB022 - 行业 + company_size: str = "" # Num_employers - 公司规模 + contact: str = "" # AAE004 - 联系人 + company_intro: str = "" # AAB092 - 公司简介 + crawl_time: str = "" # 入库时间 - def __init__(self, raw_data: dict, task_id: str = ""): - self.raw_data = raw_data - self.task_id = task_id - # 添加元数据 - self.raw_data["_id"] = str(uuid.uuid4()) - self.raw_data["_task_id"] = task_id - self.raw_data["_crawl_time"] = datetime.now().isoformat() - - def to_dict(self) -> dict: - """转换为字典(原始数据 + 元数据)""" - return self.raw_data + def __init__(self, **data): + super().__init__(**data) + if not self.id: + self.id = str(uuid.uuid4()) + if not self.crawl_time: + self.crawl_time = datetime.now().isoformat() @classmethod - def from_raw(cls, raw: dict, task_id: str = "") -> "JobData": - """从原始API数据创建""" - return cls(raw.copy(), task_id) + def from_raw(cls, raw: dict) -> "JobData": + """从原始API数据转换""" + return cls( + job_category=raw.get("Std_class", ""), + job_title=raw.get("aca112", ""), + company=raw.get("AAB004", ""), + company_type=raw.get("AAB019", "").strip(), + salary=raw.get("acb241", ""), + location=raw.get("aab302", ""), + address=raw.get("AAE006", ""), + publish_date=raw.get("aae397", ""), + collect_time=raw.get("Collect_time", ""), + url=raw.get("ACE760", ""), + description=raw.get("acb22a", ""), + experience=raw.get("Experience", ""), + education=raw.get("aac011", ""), + headcount=raw.get("acb240", ""), + industry=raw.get("AAB022", ""), + company_size=raw.get("Num_employers", ""), + contact=raw.get("AAE004", ""), + company_intro=raw.get("AAB092", ""), + ) diff --git a/job_crawler/app/models/progress.py b/job_crawler/app/models/progress.py index 81fd7cb..e33c053 100644 --- a/job_crawler/app/models/progress.py +++ b/job_crawler/app/models/progress.py @@ -18,7 +18,7 @@ class CrawlStatus(BaseModel): total: int last_start_offset: Optional[int] = None progress: str - queue_size: int = 0 + kafka_lag: int = 0 status: str last_update: str filtered_count: int = 0 diff --git a/job_crawler/app/services/__init__.py b/job_crawler/app/services/__init__.py index b5a232d..62d8396 100644 --- a/job_crawler/app/services/__init__.py +++ b/job_crawler/app/services/__init__.py @@ -1,12 +1,12 @@ """服务模块""" from .api_client import api_client, BazhuayuClient -from .rabbitmq_service import rabbitmq_service, RabbitMQService +from .kafka_service import kafka_service, KafkaService from .progress_store import progress_store, ProgressStore from .crawler import crawler_manager, CrawlerManager, TaskCrawler __all__ = [ "api_client", "BazhuayuClient", - "rabbitmq_service", "RabbitMQService", + "kafka_service", "KafkaService", "progress_store", "ProgressStore", "crawler_manager", "CrawlerManager", "TaskCrawler" ] diff --git a/job_crawler/app/services/crawler.py b/job_crawler/app/services/crawler.py index 16027ea..6e8bf5b 100644 --- a/job_crawler/app/services/crawler.py +++ b/job_crawler/app/services/crawler.py @@ -4,7 +4,7 @@ import logging from typing import Dict, Optional from concurrent.futures import ThreadPoolExecutor from app.services.api_client import api_client -from app.services.rabbitmq_service import rabbitmq_service +from app.services.kafka_service import kafka_service from app.services.progress_store import progress_store from app.utils import is_within_days from app.models import JobData @@ -134,20 +134,21 @@ class TaskCrawler: aae397 = raw.get("aae397", "") collect_time = raw.get("Collect_time", "") if is_within_days(aae397, collect_time, self.filter_days): - job = JobData.from_raw(raw, self.task_id) + job = JobData.from_raw(raw) + job.task_id = self.task_id filtered_jobs.append(job) valid_count = len(filtered_jobs) expired_count = len(data_list) - valid_count self._total_filtered += valid_count - # 立即发送到RabbitMQ + # 立即发送到Kafka produced = 0 if filtered_jobs: - produced = rabbitmq_service.produce_batch(filtered_jobs) + produced = kafka_service.produce_batch(filtered_jobs) self._total_produced += produced - logger.info(f"[{self.task_name}] offset={offset}, 获取={len(data_list)}, 有效={valid_count}, 过期={expired_count}, 发送MQ={produced}") + logger.info(f"[{self.task_name}] offset={offset}, 获取={len(data_list)}, 有效={valid_count}, 过期={expired_count}, 发送Kafka={produced}") return valid_count @@ -235,7 +236,7 @@ class CrawlerManager: return crawler.get_status() if crawler else {} return { "tasks": [c.get_status() for c in self._crawlers.values()], - "queue_size": rabbitmq_service.get_queue_size(), + "kafka_lag": kafka_service.get_lag(), "running_count": sum(1 for c in self._crawlers.values() if c.is_running) } diff --git a/job_crawler/app/services/kafka_service.py b/job_crawler/app/services/kafka_service.py index b55ce5f..328275b 100644 --- a/job_crawler/app/services/kafka_service.py +++ b/job_crawler/app/services/kafka_service.py @@ -66,8 +66,7 @@ class KafkaService: def produce(self, job_data: JobData) -> bool: """发送消息到Kafka""" try: - data = job_data.to_dict() - future = self.producer.send(self.topic, key=data.get("_id"), value=data) + future = self.producer.send(self.topic, key=job_data.id, value=job_data.model_dump()) future.get(timeout=10) return True except KafkaError as e: diff --git a/job_crawler/config/config.yml b/job_crawler/config/config.yml index debac56..ed354bc 100644 --- a/job_crawler/config/config.yml +++ b/job_crawler/config/config.yml @@ -24,14 +24,11 @@ api: name: "任务3" enabled: false -# RabbitMQ配置 -rabbitmq: - host: rabbitmq - port: 5672 - username: guest - password: guest - queue: job_data - message_ttl: 604800000 # 消息过期时间:7天(毫秒) +# Kafka配置 +kafka: + bootstrap_servers: kafka:29092 + topic: job_data + consumer_group: job_consumer_group # 采集配置 crawler: diff --git a/job_crawler/config/config.yml.docker b/job_crawler/config/config.yml.docker index 8fa4226..ec8f33a 100644 --- a/job_crawler/config/config.yml.docker +++ b/job_crawler/config/config.yml.docker @@ -22,14 +22,11 @@ api: name: "任务2" enabled: false -# RabbitMQ配置 -rabbitmq: - host: rabbitmq - port: 5672 - username: guest - password: guest - queue: job_data - message_ttl: 604800000 # 消息过期时间:7天(毫秒) +# Kafka配置(Docker内部网络) +kafka: + bootstrap_servers: kafka:29092 + topic: job_data + consumer_group: job_consumer_group # 采集配置 crawler: diff --git a/job_crawler/docker-compose.yml b/job_crawler/docker-compose.yml index 95f4e06..bb01b21 100644 --- a/job_crawler/docker-compose.yml +++ b/job_crawler/docker-compose.yml @@ -1,23 +1,51 @@ +version: '3.8' + services: - rabbitmq: - image: rabbitmq:3.12-management - container_name: job-rabbitmq + zookeeper: + image: confluentinc/cp-zookeeper:7.5.0 + container_name: job-zookeeper ports: - - "5672:5672" - - "15672:15672" + - "2181:2181" environment: - RABBITMQ_DEFAULT_USER: guest - RABBITMQ_DEFAULT_PASS: guest + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 volumes: - - rabbitmq_data:/var/lib/rabbitmq + - zookeeper_data:/var/lib/zookeeper/data healthcheck: - test: ["CMD", "rabbitmq-diagnostics", "check_running"] + test: ["CMD", "nc", "-z", "localhost", "2181"] interval: 10s timeout: 5s retries: 5 networks: - job-network + kafka: + image: confluentinc/cp-kafka:7.5.0 + container_name: job-kafka + ports: + - "9092:9092" + - "29092:29092" + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true" + volumes: + - kafka_data:/var/lib/kafka/data + depends_on: + zookeeper: + condition: service_healthy + healthcheck: + test: ["CMD", "kafka-topics", "--bootstrap-server", "localhost:9092", "--list"] + interval: 10s + timeout: 10s + retries: 5 + networks: + - job-network + app: image: job-crawler:latest container_name: job-crawler @@ -29,7 +57,7 @@ services: - ./config:/app/config:ro - app_data:/app/data depends_on: - rabbitmq: + kafka: condition: service_healthy restart: unless-stopped networks: @@ -40,5 +68,6 @@ networks: driver: bridge volumes: - rabbitmq_data: + zookeeper_data: + kafka_data: app_data: diff --git a/job_crawler/requirements.txt b/job_crawler/requirements.txt index 0851c08..685058b 100644 --- a/job_crawler/requirements.txt +++ b/job_crawler/requirements.txt @@ -1,7 +1,8 @@ fastapi==0.109.0 uvicorn==0.27.0 httpx==0.27.0 -pika==1.3.2 +kafka-python==2.0.2 apscheduler==3.10.4 pydantic==2.5.3 +python-dotenv==1.0.0 PyYAML==6.0.1