Files
ocups-kafka/docs/技术方案.md
2026-01-15 22:08:12 +08:00

541 lines
15 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 招聘数据增量采集与消息队列服务技术方案
## 1. 项目概述
### 1.1 需求背景
从八爪鱼API采集招聘数据筛选近7天发布的数据通过内置Kafka服务提供消息队列供外部系统消费。
### 1.2 核心功能
- 增量采集八爪鱼API招聘数据
- 日期过滤(发布日期 + 采集时间均在7天内
- 内置Kafka服务
- 提供REST API消费接口
---
## 2. 系统架构
```
┌─────────────────────────────────────────────────────────────────┐
│ 系统架构图 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────────┐ │
│ │ 八爪鱼API │───▶│ 采集服务 │───▶│ 日期过滤器 │ │
│ │ (数据源) │ │ (增量采集) │ │ (7天内数据) │ │
│ └──────────────┘ └──────────────┘ └────────┬─────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ 内置 Kafka 服务 │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────────┐ │ │
│ │ │ Zookeeper │ │ Broker │ │ Topic:job_data │ │ │
│ │ │ (Docker) │ │ (Docker) │ │ │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────────┘ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ FastAPI 服务 │ │
│ │ ┌─────────────────┐ ┌─────────────────────────────┐ │ │
│ │ │ GET /consume │ │ GET /status │ │ │
│ │ │ (消费数据) │ │ (采集状态/进度) │ │ │
│ │ └─────────────────┘ └─────────────────────────────┘ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
```
---
## 3. 技术选型
| 组件 | 技术方案 | 版本 | 说明 |
|------|---------|------|------|
| 运行环境 | Python | 3.10+ | 主开发语言 |
| HTTP客户端 | httpx | 0.27+ | 异步HTTP请求 |
| 消息队列 | Kafka | 3.6+ | Docker部署 |
| Kafka客户端 | kafka-python | 2.0+ | Python Kafka SDK |
| API框架 | FastAPI | 0.109+ | REST接口 |
| 容器编排 | Docker Compose | 2.0+ | Kafka/Zookeeper部署 |
| 任务调度 | APScheduler | 3.10+ | 定时增量采集 |
| 数据存储 | SQLite | 内置 | 存储采集进度(offset) |
---
## 4. 项目结构
```
job_crawler/
├── app/ # 应用代码
│ ├── api/ # API路由层
│ │ ├── __init__.py
│ │ └── routes.py # 路由定义
│ ├── core/ # 核心配置
│ │ ├── __init__.py
│ │ ├── config.py # 配置管理
│ │ └── logging.py # 日志配置
│ ├── models/ # 数据模型
│ │ ├── __init__.py
│ │ ├── job.py # 招聘数据模型
│ │ ├── progress.py # 进度模型
│ │ └── response.py # 响应模型
│ ├── services/ # 业务服务层
│ │ ├── __init__.py
│ │ ├── api_client.py # 八爪鱼API客户端
│ │ ├── crawler.py # 采集核心逻辑
│ │ ├── kafka_service.py # Kafka服务
│ │ └── progress_store.py # 进度存储
│ ├── utils/ # 工具函数
│ │ ├── __init__.py
│ │ └── date_parser.py # 日期解析
│ ├── __init__.py
│ └── main.py # 应用入口
├── docker-compose.yml # 容器编排含Kafka+App
├── Dockerfile # 应用镜像构建
├── requirements.txt # Python依赖
├── .env.example # 配置模板
├── .dockerignore # Docker忽略文件
└── README.md # 使用说明
```
---
## 5. 核心模块设计
### 5.1 增量采集模块
#### 采集策略
```python
# 增量采集流程
1. 读取上次采集的offset首次为0
2. 调用API: GET /data/all?taskId=xxx&offset={offset}&size=100
3. 解析返回数据过滤近7天数据
4. 推送到Kafka
5. 更新offset = offset + size
6. 循环直到 offset >= total
```
#### 进度持久化
使用SQLite存储采集进度
```sql
CREATE TABLE crawl_progress (
task_id TEXT PRIMARY KEY,
current_offset INTEGER,
total INTEGER,
last_update TIMESTAMP
);
```
### 5.2 日期过滤模块
#### aae397 字段格式解析
| 原始值 | 解析规则 | 示例结果 |
|--------|---------|---------|
| "今天" | 当前日期 | 2026-01-15 |
| "1月13日" | 当年+月日 | 2026-01-13 |
| "1月9日" | 当年+月日 | 2026-01-09 |
#### 过滤逻辑
```python
def is_within_7_days(aae397: str, collect_time: str) -> bool:
"""
判断数据是否在近7天内
条件:发布日期 AND 采集时间 都在7天内
"""
today = datetime.now().date()
seven_days_ago = today - timedelta(days=7)
publish_date = parse_aae397(aae397) # 解析发布日期
collect_date = parse_collect_time(collect_time) # 解析采集时间
return publish_date >= seven_days_ago and collect_date >= seven_days_ago
```
### 5.3 Kafka服务模块
#### Docker Compose配置
```yaml
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
image: confluentinc/cp-kafka:7.5.0
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
depends_on:
- zookeeper
```
#### Topic设计
- Topic名称: `job_data`
- 分区数: 3
- 副本数: 1
- 消息格式: JSON
### 5.4 REST API接口
| 接口 | 方法 | 说明 |
|------|------|------|
| `/consume` | GET | 消费Kafka数据支持batch_size参数 |
| `/consume/stream` | GET | SSE流式消费 |
| `/status` | GET | 查看采集进度和状态 |
| `/crawl/start` | POST | 手动触发采集任务 |
| `/crawl/stop` | POST | 停止采集任务 |
#### 接口详情
**GET /consume**
```json
// Request
GET /consume?batch_size=10
// Response
{
"code": 0,
"data": [
{
"job_title": "机动车司机/驾驶",
"company": "青岛唐盛物流有限公司",
"salary": "1-1.5万",
"location": "青岛黄岛区",
"publish_date": "2026-01-13",
"collect_time": "2026-01-15",
"url": "https://www.zhaopin.com/..."
}
],
"count": 10
}
```
**GET /status**
```json
{
"code": 0,
"data": {
"task_id": "00f3b445-d8ec-44e8-88b2-4b971a228b1e",
"total": 257449,
"current_offset": 156700,
"progress": "60.87%",
"kafka_lag": 1234,
"status": "running",
"last_update": "2026-01-15T10:30:00"
}
}
```
---
## 6. 数据模型
### 6.1 原始数据字段映射
| 原始字段 | 含义 | 输出字段 |
|---------|------|---------|
| Std_class | 职位分类 | job_category |
| aca112 | 职位名称 | job_title |
| AAB004 | 公司名称 | company |
| acb241 | 薪资范围 | salary |
| aab302 | 工作地点 | location |
| aae397 | 发布日期 | publish_date |
| Collect_time | 采集时间 | collect_time |
| ACE760 | 职位链接 | url |
| acb22a | 职位描述 | description |
| Experience | 经验要求 | experience |
| aac011 | 学历要求 | education |
### 6.2 Kafka消息格式
```json
{
"id": "uuid",
"job_category": "机动车司机/驾驶",
"job_title": "保底1万+五险+港内A2驾驶员",
"company": "青岛唐盛物流有限公司",
"salary": "1-1.5万",
"location": "青岛黄岛区",
"publish_date": "2026-01-13",
"collect_time": "2026-01-15",
"url": "https://www.zhaopin.com/...",
"description": "...",
"experience": "5-10年",
"education": "学历不限",
"crawl_time": "2026-01-15T10:30:00"
}
```
---
## 7. 部署流程
### 7.1 Docker Compose 一键部署(推荐)
```bash
# 1. 配置环境变量
cd job_crawler
cp .env.example .env
# 编辑 .env 填入 API_USERNAME 和 API_PASSWORD
# 2. 启动所有服务Zookeeper + Kafka + App
docker-compose up -d
# 3. 查看日志
docker-compose logs -f app
# 4. 停止服务
docker-compose down
```
### 7.2 单独构建镜像
```bash
# 构建镜像
docker build -t job-crawler:latest .
# 推送到私有仓库(可选)
docker tag job-crawler:latest your-registry/job-crawler:latest
docker push your-registry/job-crawler:latest
```
### 7.3 Kubernetes 部署(可选)
```yaml
# 示例 Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
name: job-crawler
spec:
replicas: 1
selector:
matchLabels:
app: job-crawler
template:
spec:
containers:
- name: job-crawler
image: job-crawler:latest
ports:
- containerPort: 8000
env:
- name: KAFKA_BOOTSTRAP_SERVERS
value: "kafka:9092"
envFrom:
- secretRef:
name: job-crawler-secrets
```
### 7.4 服务端口
| 服务 | 端口 | 说明 |
|------|------|------|
| FastAPI | 8000 | HTTP API |
| Kafka | 9092 | 外部访问 |
| Kafka | 29092 | 容器内部访问 |
| Zookeeper | 2181 | Kafka协调 |
---
## 8. 配置说明
### 配置文件 `config/config.yml`
```yaml
# 应用配置
app:
name: job-crawler
version: 1.0.0
debug: false
# 八爪鱼API配置
api:
base_url: https://openapi.bazhuayu.com
task_id: 00f3b445-d8ec-44e8-88b2-4b971a228b1e
username: "your_username"
password: "your_password"
batch_size: 100
# Kafka配置
kafka:
bootstrap_servers: kafka:29092 # Docker内部网络
topic: job_data
consumer_group: job_consumer_group
# 采集配置
crawler:
interval: 300 # 采集间隔(秒)
filter_days: 7 # 过滤天数
# 数据库配置
database:
path: /app/data/crawl_progress.db
```
### 配置加载优先级
1. 环境变量 `CONFIG_PATH` 指定配置文件路径
2. 默认路径 `config/config.yml`
### Docker挂载
```yaml
# docker-compose.yml
volumes:
- ./config:/app/config:ro # 配置文件(只读)
- app_data:/app/data # 数据持久化
```
---
## 9. 异常处理
| 异常场景 | 处理策略 |
|---------|---------|
| API请求失败 | 重试3次指数退避 |
| Token过期 | 返回错误,需手动更新 |
| Kafka连接失败 | 重试连接,数据暂存本地 |
| 日期解析失败 | 记录日志,跳过该条数据 |
---
## 10. 监控指标
- 采集进度百分比
- Kafka消息堆积量(lag)
- 每分钟采集条数
- 过滤后有效数据比例
- API响应时间
---
## 11. 后续扩展
1. **多任务支持**: 支持配置多个taskId并行采集
2. **数据去重**: 基于职位URL去重
3. **告警通知**: 采集异常时发送通知
4. **Web管理界面**: 可视化监控采集状态
---
## 12. Docker 镜像构建
### Dockerfile 说明
```dockerfile
FROM python:3.11-slim
WORKDIR /app
# 安装系统依赖
RUN apt-get update && apt-get install -y --no-install-recommends gcc
# 安装Python依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# 复制应用代码
COPY app/ ./app/
# 创建数据目录
RUN mkdir -p /app/data
# 环境变量
ENV PYTHONPATH=/app
ENV PYTHONUNBUFFERED=1
EXPOSE 8000
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
```
### 构建命令
```bash
# 构建
docker build -t job-crawler:latest .
# 运行测试
docker run --rm -p 8000:8000 \
-e API_USERNAME=xxx \
-e API_PASSWORD=xxx \
-e KAFKA_BOOTSTRAP_SERVERS=host.docker.internal:9092 \
job-crawler:latest
```
---
## 13. 代码分层说明
| 层级 | 目录 | 职责 |
|------|------|------|
| API层 | `app/api/` | 路由定义、请求处理、响应格式化 |
| 服务层 | `app/services/` | 业务逻辑、外部服务调用 |
| 模型层 | `app/models/` | 数据结构定义、数据转换 |
| 工具层 | `app/utils/` | 通用工具函数 |
| 核心层 | `app/core/` | 配置、日志等基础设施 |
---
## 14. 快速启动
```bash
# 1. 配置
cd job_crawler
cp config/config.yml.docker config/config.yml
# 编辑 config/config.yml 填入账号密码
# 2. 一键启动
docker-compose up -d
# 3. 访问API文档
# http://localhost:8000/docs
# 4. 启动采集
curl -X POST http://localhost:8000/crawl/start
# 5. 查看进度
curl http://localhost:8000/status
# 6. 消费数据
curl http://localhost:8000/consume?batch_size=10
```
---
## 15. Token自动刷新机制
系统实现了Token自动管理
1. 首次请求时自动获取Token
2. Token缓存在内存中
3. 请求前检查Token有效期提前5分钟刷新
4. 遇到401错误自动重新获取Token
```python
# app/services/api_client.py 核心逻辑
async def _get_token(self) -> str:
# 检查token是否有效提前5分钟刷新
if self._access_token and time.time() < self._token_expires_at - 300:
return self._access_token
# 调用 /token 接口获取新token
response = await client.post(f"{self.base_url}/token", json={
"username": self.username,
"password": self.password,
"grant_type": "password"
})
self._access_token = token_data.get("access_token")
self._token_expires_at = time.time() + expires_in
```