Files
ocups-kafka/docs/技术方案.md
2026-01-15 22:08:12 +08:00

15 KiB
Raw Permalink Blame History

招聘数据增量采集与消息队列服务技术方案

1. 项目概述

1.1 需求背景

从八爪鱼API采集招聘数据筛选近7天发布的数据通过内置Kafka服务提供消息队列供外部系统消费。

1.2 核心功能

  • 增量采集八爪鱼API招聘数据
  • 日期过滤(发布日期 + 采集时间均在7天内
  • 内置Kafka服务
  • 提供REST API消费接口

2. 系统架构

┌─────────────────────────────────────────────────────────────────┐
│                         系统架构图                               │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────────┐  │
│  │  八爪鱼API   │───▶│  采集服务    │───▶│   日期过滤器     │  │
│  │  (数据源)    │    │  (增量采集)  │    │  (7天内数据)     │  │
│  └──────────────┘    └──────────────┘    └────────┬─────────┘  │
│                                                    │            │
│                                                    ▼            │
│  ┌──────────────────────────────────────────────────────────┐  │
│  │                    内置 Kafka 服务                        │  │
│  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────────┐   │  │
│  │  │  Zookeeper  │  │   Broker    │  │  Topic:job_data │   │  │
│  │  │  (Docker)   │  │  (Docker)   │  │                 │   │  │
│  │  └─────────────┘  └─────────────┘  └─────────────────┘   │  │
│  └──────────────────────────────────────────────────────────┘  │
│                                                    │            │
│                                                    ▼            │
│  ┌──────────────────────────────────────────────────────────┐  │
│  │                    FastAPI 服务                           │  │
│  │  ┌─────────────────┐  ┌─────────────────────────────┐    │  │
│  │  │ GET /consume    │  │ GET /status                 │    │  │
│  │  │ (消费数据)      │  │ (采集状态/进度)             │    │  │
│  │  └─────────────────┘  └─────────────────────────────┘    │  │
│  └──────────────────────────────────────────────────────────┘  │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

3. 技术选型

组件 技术方案 版本 说明
运行环境 Python 3.10+ 主开发语言
HTTP客户端 httpx 0.27+ 异步HTTP请求
消息队列 Kafka 3.6+ Docker部署
Kafka客户端 kafka-python 2.0+ Python Kafka SDK
API框架 FastAPI 0.109+ REST接口
容器编排 Docker Compose 2.0+ Kafka/Zookeeper部署
任务调度 APScheduler 3.10+ 定时增量采集
数据存储 SQLite 内置 存储采集进度(offset)

4. 项目结构

job_crawler/
├── app/                        # 应用代码
│   ├── api/                    # API路由层
│   │   ├── __init__.py
│   │   └── routes.py           # 路由定义
│   ├── core/                   # 核心配置
│   │   ├── __init__.py
│   │   ├── config.py           # 配置管理
│   │   └── logging.py          # 日志配置
│   ├── models/                 # 数据模型
│   │   ├── __init__.py
│   │   ├── job.py              # 招聘数据模型
│   │   ├── progress.py         # 进度模型
│   │   └── response.py         # 响应模型
│   ├── services/               # 业务服务层
│   │   ├── __init__.py
│   │   ├── api_client.py       # 八爪鱼API客户端
│   │   ├── crawler.py          # 采集核心逻辑
│   │   ├── kafka_service.py    # Kafka服务
│   │   └── progress_store.py   # 进度存储
│   ├── utils/                  # 工具函数
│   │   ├── __init__.py
│   │   └── date_parser.py      # 日期解析
│   ├── __init__.py
│   └── main.py                 # 应用入口
├── docker-compose.yml          # 容器编排含Kafka+App
├── Dockerfile                  # 应用镜像构建
├── requirements.txt            # Python依赖
├── .env.example               # 配置模板
├── .dockerignore              # Docker忽略文件
└── README.md                  # 使用说明

5. 核心模块设计

5.1 增量采集模块

采集策略

# 增量采集流程
1. 读取上次采集的offset首次为0
2. 调用API: GET /data/all?taskId=xxx&offset={offset}&size=100
3. 解析返回数据过滤近7天数据
4. 推送到Kafka
5. 更新offset = offset + size
6. 循环直到 offset >= total

进度持久化

使用SQLite存储采集进度

CREATE TABLE crawl_progress (
    task_id TEXT PRIMARY KEY,
    current_offset INTEGER,
    total INTEGER,
    last_update TIMESTAMP
);

5.2 日期过滤模块

aae397 字段格式解析

原始值 解析规则 示例结果
"今天" 当前日期 2026-01-15
"1月13日" 当年+月日 2026-01-13
"1月9日" 当年+月日 2026-01-09

过滤逻辑

def is_within_7_days(aae397: str, collect_time: str) -> bool:
    """
    判断数据是否在近7天内
    条件:发布日期 AND 采集时间 都在7天内
    """
    today = datetime.now().date()
    seven_days_ago = today - timedelta(days=7)
    
    publish_date = parse_aae397(aae397)  # 解析发布日期
    collect_date = parse_collect_time(collect_time)  # 解析采集时间
    
    return publish_date >= seven_days_ago and collect_date >= seven_days_ago

5.3 Kafka服务模块

Docker Compose配置

version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    depends_on:
      - zookeeper

Topic设计

  • Topic名称: job_data
  • 分区数: 3
  • 副本数: 1
  • 消息格式: JSON

5.4 REST API接口

接口 方法 说明
/consume GET 消费Kafka数据支持batch_size参数
/consume/stream GET SSE流式消费
/status GET 查看采集进度和状态
/crawl/start POST 手动触发采集任务
/crawl/stop POST 停止采集任务

接口详情

GET /consume

// Request
GET /consume?batch_size=10

// Response
{
  "code": 0,
  "data": [
    {
      "job_title": "机动车司机/驾驶",
      "company": "青岛唐盛物流有限公司",
      "salary": "1-1.5万",
      "location": "青岛黄岛区",
      "publish_date": "2026-01-13",
      "collect_time": "2026-01-15",
      "url": "https://www.zhaopin.com/..."
    }
  ],
  "count": 10
}

GET /status

{
  "code": 0,
  "data": {
    "task_id": "00f3b445-d8ec-44e8-88b2-4b971a228b1e",
    "total": 257449,
    "current_offset": 156700,
    "progress": "60.87%",
    "kafka_lag": 1234,
    "status": "running",
    "last_update": "2026-01-15T10:30:00"
  }
}

6. 数据模型

6.1 原始数据字段映射

原始字段 含义 输出字段
Std_class 职位分类 job_category
aca112 职位名称 job_title
AAB004 公司名称 company
acb241 薪资范围 salary
aab302 工作地点 location
aae397 发布日期 publish_date
Collect_time 采集时间 collect_time
ACE760 职位链接 url
acb22a 职位描述 description
Experience 经验要求 experience
aac011 学历要求 education

6.2 Kafka消息格式

{
  "id": "uuid",
  "job_category": "机动车司机/驾驶",
  "job_title": "保底1万+五险+港内A2驾驶员",
  "company": "青岛唐盛物流有限公司",
  "salary": "1-1.5万",
  "location": "青岛黄岛区",
  "publish_date": "2026-01-13",
  "collect_time": "2026-01-15",
  "url": "https://www.zhaopin.com/...",
  "description": "...",
  "experience": "5-10年",
  "education": "学历不限",
  "crawl_time": "2026-01-15T10:30:00"
}

7. 部署流程

7.1 Docker Compose 一键部署(推荐)

# 1. 配置环境变量
cd job_crawler
cp .env.example .env
# 编辑 .env 填入 API_USERNAME 和 API_PASSWORD

# 2. 启动所有服务Zookeeper + Kafka + App
docker-compose up -d

# 3. 查看日志
docker-compose logs -f app

# 4. 停止服务
docker-compose down

7.2 单独构建镜像

# 构建镜像
docker build -t job-crawler:latest .

# 推送到私有仓库(可选)
docker tag job-crawler:latest your-registry/job-crawler:latest
docker push your-registry/job-crawler:latest

7.3 Kubernetes 部署(可选)

# 示例 Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: job-crawler
spec:
  replicas: 1
  selector:
    matchLabels:
      app: job-crawler
  template:
    spec:
      containers:
      - name: job-crawler
        image: job-crawler:latest
        ports:
        - containerPort: 8000
        env:
        - name: KAFKA_BOOTSTRAP_SERVERS
          value: "kafka:9092"
        envFrom:
        - secretRef:
            name: job-crawler-secrets

7.4 服务端口

服务 端口 说明
FastAPI 8000 HTTP API
Kafka 9092 外部访问
Kafka 29092 容器内部访问
Zookeeper 2181 Kafka协调

8. 配置说明

配置文件 config/config.yml

# 应用配置
app:
  name: job-crawler
  version: 1.0.0
  debug: false

# 八爪鱼API配置
api:
  base_url: https://openapi.bazhuayu.com
  task_id: 00f3b445-d8ec-44e8-88b2-4b971a228b1e
  username: "your_username"
  password: "your_password"
  batch_size: 100

# Kafka配置
kafka:
  bootstrap_servers: kafka:29092  # Docker内部网络
  topic: job_data
  consumer_group: job_consumer_group

# 采集配置
crawler:
  interval: 300          # 采集间隔(秒)
  filter_days: 7         # 过滤天数

# 数据库配置
database:
  path: /app/data/crawl_progress.db

配置加载优先级

  1. 环境变量 CONFIG_PATH 指定配置文件路径
  2. 默认路径 config/config.yml

Docker挂载

# docker-compose.yml
volumes:
  - ./config:/app/config:ro    # 配置文件(只读)
  - app_data:/app/data         # 数据持久化

9. 异常处理

异常场景 处理策略
API请求失败 重试3次指数退避
Token过期 返回错误,需手动更新
Kafka连接失败 重试连接,数据暂存本地
日期解析失败 记录日志,跳过该条数据

10. 监控指标

  • 采集进度百分比
  • Kafka消息堆积量(lag)
  • 每分钟采集条数
  • 过滤后有效数据比例
  • API响应时间

11. 后续扩展

  1. 多任务支持: 支持配置多个taskId并行采集
  2. 数据去重: 基于职位URL去重
  3. 告警通知: 采集异常时发送通知
  4. Web管理界面: 可视化监控采集状态

12. Docker 镜像构建

Dockerfile 说明

FROM python:3.11-slim

WORKDIR /app

# 安装系统依赖
RUN apt-get update && apt-get install -y --no-install-recommends gcc

# 安装Python依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 复制应用代码
COPY app/ ./app/

# 创建数据目录
RUN mkdir -p /app/data

# 环境变量
ENV PYTHONPATH=/app
ENV PYTHONUNBUFFERED=1

EXPOSE 8000

CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]

构建命令

# 构建
docker build -t job-crawler:latest .

# 运行测试
docker run --rm -p 8000:8000 \
  -e API_USERNAME=xxx \
  -e API_PASSWORD=xxx \
  -e KAFKA_BOOTSTRAP_SERVERS=host.docker.internal:9092 \
  job-crawler:latest

13. 代码分层说明

层级 目录 职责
API层 app/api/ 路由定义、请求处理、响应格式化
服务层 app/services/ 业务逻辑、外部服务调用
模型层 app/models/ 数据结构定义、数据转换
工具层 app/utils/ 通用工具函数
核心层 app/core/ 配置、日志等基础设施

14. 快速启动

# 1. 配置
cd job_crawler
cp config/config.yml.docker config/config.yml
# 编辑 config/config.yml 填入账号密码

# 2. 一键启动
docker-compose up -d

# 3. 访问API文档
# http://localhost:8000/docs

# 4. 启动采集
curl -X POST http://localhost:8000/crawl/start

# 5. 查看进度
curl http://localhost:8000/status

# 6. 消费数据
curl http://localhost:8000/consume?batch_size=10

15. Token自动刷新机制

系统实现了Token自动管理

  1. 首次请求时自动获取Token
  2. Token缓存在内存中
  3. 请求前检查Token有效期提前5分钟刷新
  4. 遇到401错误自动重新获取Token
# app/services/api_client.py 核心逻辑
async def _get_token(self) -> str:
    # 检查token是否有效提前5分钟刷新
    if self._access_token and time.time() < self._token_expires_at - 300:
        return self._access_token
    
    # 调用 /token 接口获取新token
    response = await client.post(f"{self.base_url}/token", json={
        "username": self.username,
        "password": self.password,
        "grant_type": "password"
    })
    
    self._access_token = token_data.get("access_token")
    self._token_expires_at = time.time() + expires_in