- Add technical documentation (技术方案.md) with system architecture and design details - Create FastAPI application structure with modular organization (api, core, models, services, utils) - Implement job data crawler service with incremental collection from third-party API - Add Kafka service integration with Docker Compose configuration for message queue - Create data models for job listings, progress tracking, and API responses - Implement REST API endpoints for data consumption (/consume, /status) and task management - Add progress persistence layer using SQLite for tracking collection offsets - Implement date filtering logic to extract data published within 7 days - Create API client service for third-party data source integration - Add configuration management with environment-based settings - Include Docker support with Dockerfile and docker-compose.yml for containerized deployment - Add logging configuration and utility functions for date parsing - Include requirements.txt with all Python dependencies and README documentation
15 KiB
15 KiB
招聘数据增量采集与消息队列服务技术方案
1. 项目概述
1.1 需求背景
从八爪鱼API采集招聘数据,筛选近7天发布的数据,通过内置Kafka服务提供消息队列,供外部系统消费。
1.2 核心功能
- 增量采集八爪鱼API招聘数据
- 日期过滤(发布日期 + 采集时间均在7天内)
- 内置Kafka服务
- 提供REST API消费接口
2. 系统架构
┌─────────────────────────────────────────────────────────────────┐
│ 系统架构图 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────────┐ │
│ │ 八爪鱼API │───▶│ 采集服务 │───▶│ 日期过滤器 │ │
│ │ (数据源) │ │ (增量采集) │ │ (7天内数据) │ │
│ └──────────────┘ └──────────────┘ └────────┬─────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ 内置 Kafka 服务 │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────────┐ │ │
│ │ │ Zookeeper │ │ Broker │ │ Topic:job_data │ │ │
│ │ │ (Docker) │ │ (Docker) │ │ │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────────┘ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ FastAPI 服务 │ │
│ │ ┌─────────────────┐ ┌─────────────────────────────┐ │ │
│ │ │ GET /consume │ │ GET /status │ │ │
│ │ │ (消费数据) │ │ (采集状态/进度) │ │ │
│ │ └─────────────────┘ └─────────────────────────────┘ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
3. 技术选型
| 组件 | 技术方案 | 版本 | 说明 |
|---|---|---|---|
| 运行环境 | Python | 3.10+ | 主开发语言 |
| HTTP客户端 | httpx | 0.27+ | 异步HTTP请求 |
| 消息队列 | Kafka | 3.6+ | Docker部署 |
| Kafka客户端 | kafka-python | 2.0+ | Python Kafka SDK |
| API框架 | FastAPI | 0.109+ | REST接口 |
| 容器编排 | Docker Compose | 2.0+ | Kafka/Zookeeper部署 |
| 任务调度 | APScheduler | 3.10+ | 定时增量采集 |
| 数据存储 | SQLite | 内置 | 存储采集进度(offset) |
4. 项目结构
job_crawler/
├── app/ # 应用代码
│ ├── api/ # API路由层
│ │ ├── __init__.py
│ │ └── routes.py # 路由定义
│ ├── core/ # 核心配置
│ │ ├── __init__.py
│ │ ├── config.py # 配置管理
│ │ └── logging.py # 日志配置
│ ├── models/ # 数据模型
│ │ ├── __init__.py
│ │ ├── job.py # 招聘数据模型
│ │ ├── progress.py # 进度模型
│ │ └── response.py # 响应模型
│ ├── services/ # 业务服务层
│ │ ├── __init__.py
│ │ ├── api_client.py # 八爪鱼API客户端
│ │ ├── crawler.py # 采集核心逻辑
│ │ ├── kafka_service.py # Kafka服务
│ │ └── progress_store.py # 进度存储
│ ├── utils/ # 工具函数
│ │ ├── __init__.py
│ │ └── date_parser.py # 日期解析
│ ├── __init__.py
│ └── main.py # 应用入口
├── docker-compose.yml # 容器编排(含Kafka+App)
├── Dockerfile # 应用镜像构建
├── requirements.txt # Python依赖
├── .env.example # 配置模板
├── .dockerignore # Docker忽略文件
└── README.md # 使用说明
5. 核心模块设计
5.1 增量采集模块
采集策略
# 增量采集流程
1. 读取上次采集的offset(首次为0)
2. 调用API: GET /data/all?taskId=xxx&offset={offset}&size=100
3. 解析返回数据,过滤近7天数据
4. 推送到Kafka
5. 更新offset = offset + size
6. 循环直到 offset >= total
进度持久化
使用SQLite存储采集进度:
CREATE TABLE crawl_progress (
task_id TEXT PRIMARY KEY,
current_offset INTEGER,
total INTEGER,
last_update TIMESTAMP
);
5.2 日期过滤模块
aae397 字段格式解析
| 原始值 | 解析规则 | 示例结果 |
|---|---|---|
| "今天" | 当前日期 | 2026-01-15 |
| "1月13日" | 当年+月日 | 2026-01-13 |
| "1月9日" | 当年+月日 | 2026-01-09 |
过滤逻辑
def is_within_7_days(aae397: str, collect_time: str) -> bool:
"""
判断数据是否在近7天内
条件:发布日期 AND 采集时间 都在7天内
"""
today = datetime.now().date()
seven_days_ago = today - timedelta(days=7)
publish_date = parse_aae397(aae397) # 解析发布日期
collect_date = parse_collect_time(collect_time) # 解析采集时间
return publish_date >= seven_days_ago and collect_date >= seven_days_ago
5.3 Kafka服务模块
Docker Compose配置
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
image: confluentinc/cp-kafka:7.5.0
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
depends_on:
- zookeeper
Topic设计
- Topic名称:
job_data - 分区数: 3
- 副本数: 1
- 消息格式: JSON
5.4 REST API接口
| 接口 | 方法 | 说明 |
|---|---|---|
/consume |
GET | 消费Kafka数据,支持batch_size参数 |
/consume/stream |
GET | SSE流式消费 |
/status |
GET | 查看采集进度和状态 |
/crawl/start |
POST | 手动触发采集任务 |
/crawl/stop |
POST | 停止采集任务 |
接口详情
GET /consume
// Request
GET /consume?batch_size=10
// Response
{
"code": 0,
"data": [
{
"job_title": "机动车司机/驾驶",
"company": "青岛唐盛物流有限公司",
"salary": "1-1.5万",
"location": "青岛黄岛区",
"publish_date": "2026-01-13",
"collect_time": "2026-01-15",
"url": "https://www.zhaopin.com/..."
}
],
"count": 10
}
GET /status
{
"code": 0,
"data": {
"task_id": "00f3b445-d8ec-44e8-88b2-4b971a228b1e",
"total": 257449,
"current_offset": 156700,
"progress": "60.87%",
"kafka_lag": 1234,
"status": "running",
"last_update": "2026-01-15T10:30:00"
}
}
6. 数据模型
6.1 原始数据字段映射
| 原始字段 | 含义 | 输出字段 |
|---|---|---|
| Std_class | 职位分类 | job_category |
| aca112 | 职位名称 | job_title |
| AAB004 | 公司名称 | company |
| acb241 | 薪资范围 | salary |
| aab302 | 工作地点 | location |
| aae397 | 发布日期 | publish_date |
| Collect_time | 采集时间 | collect_time |
| ACE760 | 职位链接 | url |
| acb22a | 职位描述 | description |
| Experience | 经验要求 | experience |
| aac011 | 学历要求 | education |
6.2 Kafka消息格式
{
"id": "uuid",
"job_category": "机动车司机/驾驶",
"job_title": "保底1万+五险+港内A2驾驶员",
"company": "青岛唐盛物流有限公司",
"salary": "1-1.5万",
"location": "青岛黄岛区",
"publish_date": "2026-01-13",
"collect_time": "2026-01-15",
"url": "https://www.zhaopin.com/...",
"description": "...",
"experience": "5-10年",
"education": "学历不限",
"crawl_time": "2026-01-15T10:30:00"
}
7. 部署流程
7.1 Docker Compose 一键部署(推荐)
# 1. 配置环境变量
cd job_crawler
cp .env.example .env
# 编辑 .env 填入 API_USERNAME 和 API_PASSWORD
# 2. 启动所有服务(Zookeeper + Kafka + App)
docker-compose up -d
# 3. 查看日志
docker-compose logs -f app
# 4. 停止服务
docker-compose down
7.2 单独构建镜像
# 构建镜像
docker build -t job-crawler:latest .
# 推送到私有仓库(可选)
docker tag job-crawler:latest your-registry/job-crawler:latest
docker push your-registry/job-crawler:latest
7.3 Kubernetes 部署(可选)
# 示例 Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
name: job-crawler
spec:
replicas: 1
selector:
matchLabels:
app: job-crawler
template:
spec:
containers:
- name: job-crawler
image: job-crawler:latest
ports:
- containerPort: 8000
env:
- name: KAFKA_BOOTSTRAP_SERVERS
value: "kafka:9092"
envFrom:
- secretRef:
name: job-crawler-secrets
7.4 服务端口
| 服务 | 端口 | 说明 |
|---|---|---|
| FastAPI | 8000 | HTTP API |
| Kafka | 9092 | 外部访问 |
| Kafka | 29092 | 容器内部访问 |
| Zookeeper | 2181 | Kafka协调 |
8. 配置说明
配置文件 config/config.yml
# 应用配置
app:
name: job-crawler
version: 1.0.0
debug: false
# 八爪鱼API配置
api:
base_url: https://openapi.bazhuayu.com
task_id: 00f3b445-d8ec-44e8-88b2-4b971a228b1e
username: "your_username"
password: "your_password"
batch_size: 100
# Kafka配置
kafka:
bootstrap_servers: kafka:29092 # Docker内部网络
topic: job_data
consumer_group: job_consumer_group
# 采集配置
crawler:
interval: 300 # 采集间隔(秒)
filter_days: 7 # 过滤天数
# 数据库配置
database:
path: /app/data/crawl_progress.db
配置加载优先级
- 环境变量
CONFIG_PATH指定配置文件路径 - 默认路径
config/config.yml
Docker挂载
# docker-compose.yml
volumes:
- ./config:/app/config:ro # 配置文件(只读)
- app_data:/app/data # 数据持久化
9. 异常处理
| 异常场景 | 处理策略 |
|---|---|
| API请求失败 | 重试3次,指数退避 |
| Token过期 | 返回错误,需手动更新 |
| Kafka连接失败 | 重试连接,数据暂存本地 |
| 日期解析失败 | 记录日志,跳过该条数据 |
10. 监控指标
- 采集进度百分比
- Kafka消息堆积量(lag)
- 每分钟采集条数
- 过滤后有效数据比例
- API响应时间
11. 后续扩展
- 多任务支持: 支持配置多个taskId并行采集
- 数据去重: 基于职位URL去重
- 告警通知: 采集异常时发送通知
- Web管理界面: 可视化监控采集状态
12. Docker 镜像构建
Dockerfile 说明
FROM python:3.11-slim
WORKDIR /app
# 安装系统依赖
RUN apt-get update && apt-get install -y --no-install-recommends gcc
# 安装Python依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# 复制应用代码
COPY app/ ./app/
# 创建数据目录
RUN mkdir -p /app/data
# 环境变量
ENV PYTHONPATH=/app
ENV PYTHONUNBUFFERED=1
EXPOSE 8000
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
构建命令
# 构建
docker build -t job-crawler:latest .
# 运行测试
docker run --rm -p 8000:8000 \
-e API_USERNAME=xxx \
-e API_PASSWORD=xxx \
-e KAFKA_BOOTSTRAP_SERVERS=host.docker.internal:9092 \
job-crawler:latest
13. 代码分层说明
| 层级 | 目录 | 职责 |
|---|---|---|
| API层 | app/api/ |
路由定义、请求处理、响应格式化 |
| 服务层 | app/services/ |
业务逻辑、外部服务调用 |
| 模型层 | app/models/ |
数据结构定义、数据转换 |
| 工具层 | app/utils/ |
通用工具函数 |
| 核心层 | app/core/ |
配置、日志等基础设施 |
14. 快速启动
# 1. 配置
cd job_crawler
cp config/config.yml.docker config/config.yml
# 编辑 config/config.yml 填入账号密码
# 2. 一键启动
docker-compose up -d
# 3. 访问API文档
# http://localhost:8000/docs
# 4. 启动采集
curl -X POST http://localhost:8000/crawl/start
# 5. 查看进度
curl http://localhost:8000/status
# 6. 消费数据
curl http://localhost:8000/consume?batch_size=10
15. Token自动刷新机制
系统实现了Token自动管理:
- 首次请求时自动获取Token
- Token缓存在内存中
- 请求前检查Token有效期(提前5分钟刷新)
- 遇到401错误自动重新获取Token
# app/services/api_client.py 核心逻辑
async def _get_token(self) -> str:
# 检查token是否有效(提前5分钟刷新)
if self._access_token and time.time() < self._token_expires_at - 300:
return self._access_token
# 调用 /token 接口获取新token
response = await client.post(f"{self.base_url}/token", json={
"username": self.username,
"password": self.password,
"grant_type": "password"
})
self._access_token = token_data.get("access_token")
self._token_expires_at = time.time() + expires_in