From ae681575b99c589f0379ef4f78702e13e1034c1e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E9=A1=BA=E4=B8=9C?= <577732344@qq.com> Date: Thu, 15 Jan 2026 17:09:43 +0800 Subject: [PATCH] feat(job_crawler): initialize job crawler service with kafka integration MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add technical documentation (技术方案.md) with system architecture and design details - Create FastAPI application structure with modular organization (api, core, models, services, utils) - Implement job data crawler service with incremental collection from third-party API - Add Kafka service integration with Docker Compose configuration for message queue - Create data models for job listings, progress tracking, and API responses - Implement REST API endpoints for data consumption (/consume, /status) and task management - Add progress persistence layer using SQLite for tracking collection offsets - Implement date filtering logic to extract data published within 7 days - Create API client service for third-party data source integration - Add configuration management with environment-based settings - Include Docker support with Dockerfile and docker-compose.yml for containerized deployment - Add logging configuration and utility functions for date parsing - Include requirements.txt with all Python dependencies and README documentation --- docs/技术方案.md | 540 +++++++++++++++++++++ job_crawler/.dockerignore | 17 + job_crawler/Dockerfile | 31 ++ job_crawler/README.md | 135 ++++++ job_crawler/app/__init__.py | 2 + job_crawler/app/api/__init__.py | 4 + job_crawler/app/api/routes.py | 112 +++++ job_crawler/app/core/__init__.py | 5 + job_crawler/app/core/config.py | 89 ++++ job_crawler/app/core/logging.py | 22 + job_crawler/app/main.py | 35 ++ job_crawler/app/models/__init__.py | 13 + job_crawler/app/models/job.py | 60 +++ job_crawler/app/models/progress.py | 24 + job_crawler/app/models/response.py | 23 + job_crawler/app/services/__init__.py | 12 + job_crawler/app/services/api_client.py | 91 ++++ job_crawler/app/services/crawler.py | 209 ++++++++ job_crawler/app/services/kafka_service.py | 138 ++++++ job_crawler/app/services/progress_store.py | 95 ++++ job_crawler/app/utils/__init__.py | 4 + job_crawler/app/utils/date_parser.py | 74 +++ job_crawler/config/config.yml | 41 ++ job_crawler/config/config.yml.docker | 39 ++ job_crawler/docker-compose.yml | 75 +++ job_crawler/requirements.txt | 8 + 26 files changed, 1898 insertions(+) create mode 100644 docs/技术方案.md create mode 100644 job_crawler/.dockerignore create mode 100644 job_crawler/Dockerfile create mode 100644 job_crawler/README.md create mode 100644 job_crawler/app/__init__.py create mode 100644 job_crawler/app/api/__init__.py create mode 100644 job_crawler/app/api/routes.py create mode 100644 job_crawler/app/core/__init__.py create mode 100644 job_crawler/app/core/config.py create mode 100644 job_crawler/app/core/logging.py create mode 100644 job_crawler/app/main.py create mode 100644 job_crawler/app/models/__init__.py create mode 100644 job_crawler/app/models/job.py create mode 100644 job_crawler/app/models/progress.py create mode 100644 job_crawler/app/models/response.py create mode 100644 job_crawler/app/services/__init__.py create mode 100644 job_crawler/app/services/api_client.py create mode 100644 job_crawler/app/services/crawler.py create mode 100644 job_crawler/app/services/kafka_service.py create mode 100644 job_crawler/app/services/progress_store.py create mode 100644 
job_crawler/app/utils/__init__.py create mode 100644 job_crawler/app/utils/date_parser.py create mode 100644 job_crawler/config/config.yml create mode 100644 job_crawler/config/config.yml.docker create mode 100644 job_crawler/docker-compose.yml create mode 100644 job_crawler/requirements.txt diff --git a/docs/技术方案.md b/docs/技术方案.md new file mode 100644 index 0000000..48b267d --- /dev/null +++ b/docs/技术方案.md @@ -0,0 +1,540 @@ +# 招聘数据增量采集与消息队列服务技术方案 + +## 1. 项目概述 + +### 1.1 需求背景 +从八爪鱼API采集招聘数据,筛选近7天发布的数据,通过内置Kafka服务提供消息队列,供外部系统消费。 + +### 1.2 核心功能 +- 增量采集八爪鱼API招聘数据 +- 日期过滤(发布日期 + 采集时间均在7天内) +- 内置Kafka服务 +- 提供REST API消费接口 + +--- + +## 2. 系统架构 + +``` +┌─────────────────────────────────────────────────────────────────┐ +│ 系统架构图 │ +├─────────────────────────────────────────────────────────────────┤ +│ │ +│ ┌──────────────┐ ┌──────────────┐ ┌──────────────────┐ │ +│ │ 八爪鱼API │───▶│ 采集服务 │───▶│ 日期过滤器 │ │ +│ │ (数据源) │ │ (增量采集) │ │ (7天内数据) │ │ +│ └──────────────┘ └──────────────┘ └────────┬─────────┘ │ +│ │ │ +│ ▼ │ +│ ┌──────────────────────────────────────────────────────────┐ │ +│ │ 内置 Kafka 服务 │ │ +│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────────┐ │ │ +│ │ │ Zookeeper │ │ Broker │ │ Topic:job_data │ │ │ +│ │ │ (Docker) │ │ (Docker) │ │ │ │ │ +│ │ └─────────────┘ └─────────────┘ └─────────────────┘ │ │ +│ └──────────────────────────────────────────────────────────┘ │ +│ │ │ +│ ▼ │ +│ ┌──────────────────────────────────────────────────────────┐ │ +│ │ FastAPI 服务 │ │ +│ │ ┌─────────────────┐ ┌─────────────────────────────┐ │ │ +│ │ │ GET /consume │ │ GET /status │ │ │ +│ │ │ (消费数据) │ │ (采集状态/进度) │ │ │ +│ │ └─────────────────┘ └─────────────────────────────┘ │ │ +│ └──────────────────────────────────────────────────────────┘ │ +│ │ +└─────────────────────────────────────────────────────────────────┘ +``` + +--- + +## 3. 技术选型 + +| 组件 | 技术方案 | 版本 | 说明 | +|------|---------|------|------| +| 运行环境 | Python | 3.10+ | 主开发语言 | +| HTTP客户端 | httpx | 0.27+ | 异步HTTP请求 | +| 消息队列 | Kafka | 3.6+ | Docker部署 | +| Kafka客户端 | kafka-python | 2.0+ | Python Kafka SDK | +| API框架 | FastAPI | 0.109+ | REST接口 | +| 容器编排 | Docker Compose | 2.0+ | Kafka/Zookeeper部署 | +| 任务调度 | APScheduler | 3.10+ | 定时增量采集 | +| 数据存储 | SQLite | 内置 | 存储采集进度(offset) | + +--- + +## 4. 项目结构 + +``` +job_crawler/ +├── app/ # 应用代码 +│ ├── api/ # API路由层 +│ │ ├── __init__.py +│ │ └── routes.py # 路由定义 +│ ├── core/ # 核心配置 +│ │ ├── __init__.py +│ │ ├── config.py # 配置管理 +│ │ └── logging.py # 日志配置 +│ ├── models/ # 数据模型 +│ │ ├── __init__.py +│ │ ├── job.py # 招聘数据模型 +│ │ ├── progress.py # 进度模型 +│ │ └── response.py # 响应模型 +│ ├── services/ # 业务服务层 +│ │ ├── __init__.py +│ │ ├── api_client.py # 八爪鱼API客户端 +│ │ ├── crawler.py # 采集核心逻辑 +│ │ ├── kafka_service.py # Kafka服务 +│ │ └── progress_store.py # 进度存储 +│ ├── utils/ # 工具函数 +│ │ ├── __init__.py +│ │ └── date_parser.py # 日期解析 +│ ├── __init__.py +│ └── main.py # 应用入口 +├── docker-compose.yml # 容器编排(含Kafka+App) +├── Dockerfile # 应用镜像构建 +├── requirements.txt # Python依赖 +├── .env.example # 配置模板 +├── .dockerignore # Docker忽略文件 +└── README.md # 使用说明 +``` + +--- + +## 5. 核心模块设计 + +### 5.1 增量采集模块 + +#### 采集策略 +```python +# 增量采集流程 +1. 读取上次采集的offset(首次为0) +2. 调用API: GET /data/all?taskId=xxx&offset={offset}&size=100 +3. 解析返回数据,过滤近7天数据 +4. 推送到Kafka +5. 更新offset = offset + size +6. 
循环直到 offset >= total +``` + +#### 进度持久化 +使用SQLite存储采集进度: +```sql +CREATE TABLE crawl_progress ( + task_id TEXT PRIMARY KEY, + current_offset INTEGER, + total INTEGER, + last_update TIMESTAMP +); +``` + +### 5.2 日期过滤模块 + +#### aae397 字段格式解析 + +| 原始值 | 解析规则 | 示例结果 | +|--------|---------|---------| +| "今天" | 当前日期 | 2026-01-15 | +| "1月13日" | 当年+月日 | 2026-01-13 | +| "1月9日" | 当年+月日 | 2026-01-09 | + +#### 过滤逻辑 +```python +def is_within_7_days(aae397: str, collect_time: str) -> bool: + """ + 判断数据是否在近7天内 + 条件:发布日期 AND 采集时间 都在7天内 + """ + today = datetime.now().date() + seven_days_ago = today - timedelta(days=7) + + publish_date = parse_aae397(aae397) # 解析发布日期 + collect_date = parse_collect_time(collect_time) # 解析采集时间 + + return publish_date >= seven_days_ago and collect_date >= seven_days_ago +``` + +### 5.3 Kafka服务模块 + +#### Docker Compose配置 +```yaml +version: '3.8' +services: + zookeeper: + image: confluentinc/cp-zookeeper:7.5.0 + ports: + - "2181:2181" + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + + kafka: + image: confluentinc/cp-kafka:7.5.0 + ports: + - "9092:9092" + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + depends_on: + - zookeeper +``` + +#### Topic设计 +- Topic名称: `job_data` +- 分区数: 3 +- 副本数: 1 +- 消息格式: JSON + +### 5.4 REST API接口 + +| 接口 | 方法 | 说明 | +|------|------|------| +| `/consume` | GET | 消费Kafka数据,支持batch_size参数 | +| `/consume/stream` | GET | SSE流式消费 | +| `/status` | GET | 查看采集进度和状态 | +| `/crawl/start` | POST | 手动触发采集任务 | +| `/crawl/stop` | POST | 停止采集任务 | + +#### 接口详情 + +**GET /consume** +```json +// Request +GET /consume?batch_size=10 + +// Response +{ + "code": 0, + "data": [ + { + "job_title": "机动车司机/驾驶", + "company": "青岛唐盛物流有限公司", + "salary": "1-1.5万", + "location": "青岛黄岛区", + "publish_date": "2026-01-13", + "collect_time": "2026-01-15", + "url": "https://www.zhaopin.com/..." + } + ], + "count": 10 +} +``` + +**GET /status** +```json +{ + "code": 0, + "data": { + "task_id": "00f3b445-d8ec-44e8-88b2-4b971a228b1e", + "total": 257449, + "current_offset": 156700, + "progress": "60.87%", + "kafka_lag": 1234, + "status": "running", + "last_update": "2026-01-15T10:30:00" + } +} +``` + +--- + +## 6. 数据模型 + +### 6.1 原始数据字段映射 + +| 原始字段 | 含义 | 输出字段 | +|---------|------|---------| +| Std_class | 职位分类 | job_category | +| aca112 | 职位名称 | job_title | +| AAB004 | 公司名称 | company | +| acb241 | 薪资范围 | salary | +| aab302 | 工作地点 | location | +| aae397 | 发布日期 | publish_date | +| Collect_time | 采集时间 | collect_time | +| ACE760 | 职位链接 | url | +| acb22a | 职位描述 | description | +| Experience | 经验要求 | experience | +| aac011 | 学历要求 | education | + +### 6.2 Kafka消息格式 +```json +{ + "id": "uuid", + "job_category": "机动车司机/驾驶", + "job_title": "保底1万+五险+港内A2驾驶员", + "company": "青岛唐盛物流有限公司", + "salary": "1-1.5万", + "location": "青岛黄岛区", + "publish_date": "2026-01-13", + "collect_time": "2026-01-15", + "url": "https://www.zhaopin.com/...", + "description": "...", + "experience": "5-10年", + "education": "学历不限", + "crawl_time": "2026-01-15T10:30:00" +} +``` + +--- + +## 7. 部署流程 + +### 7.1 Docker Compose 一键部署(推荐) + +```bash +# 1. 配置环境变量 +cd job_crawler +cp .env.example .env +# 编辑 .env 填入 API_USERNAME 和 API_PASSWORD + +# 2. 启动所有服务(Zookeeper + Kafka + App) +docker-compose up -d + +# 3. 查看日志 +docker-compose logs -f app + +# 4. 停止服务 +docker-compose down +``` + +### 7.2 单独构建镜像 + +```bash +# 构建镜像 +docker build -t job-crawler:latest . 
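+
+# 可选:构建后快速自测(示例,非必需;假设宿主机上已有可达的 Kafka,
+# 且 config/config.yml 的 kafka.bootstrap_servers 指向它,例如 host.docker.internal:9092)
+# docker run --rm -p 8000:8000 -v $(pwd)/config:/app/config:ro job-crawler:latest
+# curl http://localhost:8000/health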
+
+# 推送到私有仓库(可选)
+docker tag job-crawler:latest your-registry/job-crawler:latest
+docker push your-registry/job-crawler:latest
+```
+
+### 7.3 Kubernetes 部署(可选)
+
+应用只从 `CONFIG_PATH` 指向的 YAML 文件读取配置(账号密码也在其中),因此 K8s 部署时通过 ConfigMap 挂载 `config.yml` 即可:
+
+```yaml
+# 示例 Deployment
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: job-crawler
+spec:
+  replicas: 1
+  selector:
+    matchLabels:
+      app: job-crawler
+  template:
+    metadata:
+      labels:
+        app: job-crawler
+    spec:
+      volumes:
+        - name: config
+          configMap:
+            name: job-crawler-config
+      containers:
+        - name: job-crawler
+          image: job-crawler:latest
+          ports:
+            - containerPort: 8000
+          env:
+            - name: CONFIG_PATH
+              value: /app/config/config.yml
+          volumeMounts:
+            - name: config
+              mountPath: /app/config
+              readOnly: true
+```
+
+### 7.4 服务端口
+
+| 服务 | 端口 | 说明 |
+|------|------|------|
+| FastAPI | 8000 | HTTP API |
+| Kafka | 9092 | 外部访问 |
+| Kafka | 29092 | 容器内部访问 |
+| Zookeeper | 2181 | Kafka协调 |
+
+---
+
+## 8. 配置说明
+
+### 配置文件 `config/config.yml`
+
+```yaml
+# 应用配置
+app:
+  name: job-crawler
+  version: 1.0.0
+  debug: false
+
+# 八爪鱼API配置
+api:
+  base_url: https://openapi.bazhuayu.com
+  username: "your_username"
+  password: "your_password"
+  batch_size: 100
+  # 多任务配置
+  tasks:
+    - id: "00f3b445-d8ec-44e8-88b2-4b971a228b1e"
+      name: "青岛招聘数据"
+      enabled: true
+
+# Kafka配置
+kafka:
+  bootstrap_servers: kafka:29092  # Docker内部网络
+  topic: job_data
+  consumer_group: job_consumer_group
+
+# 采集配置
+crawler:
+  interval: 300      # 采集间隔(秒)
+  filter_days: 7     # 过滤天数
+  max_workers: 5     # 最大并行任务数
+
+# 数据库配置
+database:
+  path: /app/data/crawl_progress.db
+```
+
+### 配置加载优先级
+
+1. 环境变量 `CONFIG_PATH` 指定配置文件路径
+2. 默认路径 `config/config.yml`
+
+### Docker挂载
+
+```yaml
+# docker-compose.yml
+volumes:
+  - ./config:/app/config:ro  # 配置文件(只读)
+  - app_data:/app/data       # 数据持久化
+```
+
+---
+
+## 9. 异常处理
+
+| 异常场景 | 处理策略 |
+|---------|---------|
+| API请求失败 | 重试3次,指数退避 |
+| Token过期 | 遇到401自动清除缓存并重新获取Token(见第15节) |
+| Kafka连接失败 | 重试连接,数据暂存本地 |
+| 日期解析失败 | 记录日志,跳过该条数据 |
+
+---
+
+## 10. 监控指标
+
+- 采集进度百分比
+- Kafka消息堆积量(lag)
+- 每分钟采集条数
+- 过滤后有效数据比例
+- API响应时间
+
+---
+
+## 11. 后续扩展
+
+1. **数据去重**: 基于职位URL去重
+2. **告警通知**: 采集异常时发送通知
+3. **Web管理界面**: 可视化监控采集状态
+
+---
+
+## 12. Docker 镜像构建
+
+### Dockerfile 说明
+
+```dockerfile
+FROM python:3.11-slim
+
+WORKDIR /app
+
+# 安装系统依赖
+RUN apt-get update && apt-get install -y --no-install-recommends gcc
+
+# 安装Python依赖
+COPY requirements.txt .
+RUN pip install --no-cache-dir -r requirements.txt
+
+# 复制应用代码
+COPY app/ ./app/
+
+# 创建数据目录
+RUN mkdir -p /app/data
+
+# 环境变量
+ENV PYTHONPATH=/app
+ENV PYTHONUNBUFFERED=1
+
+EXPOSE 8000
+
+CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
+```
+
+### 构建命令
+
+```bash
+# 构建
+docker build -t job-crawler:latest .
+
+# 运行测试(挂载配置文件;config.yml 中 bootstrap_servers 需指向容器可达的 Kafka,
+# 例如宿主机上的 host.docker.internal:9092)
+docker run --rm -p 8000:8000 \
+  -v $(pwd)/config:/app/config:ro \
+  job-crawler:latest
+```
+
+---
+
+## 13. 代码分层说明
+
+| 层级 | 目录 | 职责 |
+|------|------|------|
+| API层 | `app/api/` | 路由定义、请求处理、响应格式化 |
+| 服务层 | `app/services/` | 业务逻辑、外部服务调用 |
+| 模型层 | `app/models/` | 数据结构定义、数据转换 |
+| 工具层 | `app/utils/` | 通用工具函数 |
+| 核心层 | `app/core/` | 配置、日志等基础设施 |
+
+---
+
+## 14. 快速启动
+
+```bash
+# 1. 配置
+cd job_crawler
+cp config/config.yml.docker config/config.yml
+# 编辑 config/config.yml 填入账号密码
+
+# 2. 一键启动
+docker-compose up -d
+
+# 3. 访问API文档
+# http://localhost:8000/docs
+
+# 4. 启动采集
+curl -X POST http://localhost:8000/crawl/start
+
+# 5. 查看进度
+curl http://localhost:8000/status
+
+# 6. 消费数据
+curl "http://localhost:8000/consume?batch_size=10"
+```
+
+---
+
+## 15. Token自动刷新机制
+
+系统实现了Token自动管理:
+
+1. 首次请求时自动获取Token
+2. Token缓存在内存中
+3. 请求前检查Token有效期(提前5分钟刷新)
+4. 
遇到401错误自动重新获取Token + +```python +# app/services/api_client.py 核心逻辑 +async def _get_token(self) -> str: + # 检查token是否有效(提前5分钟刷新) + if self._access_token and time.time() < self._token_expires_at - 300: + return self._access_token + + # 调用 /token 接口获取新token + response = await client.post(f"{self.base_url}/token", json={ + "username": self.username, + "password": self.password, + "grant_type": "password" + }) + + self._access_token = token_data.get("access_token") + self._token_expires_at = time.time() + expires_in +``` diff --git a/job_crawler/.dockerignore b/job_crawler/.dockerignore new file mode 100644 index 0000000..2491c88 --- /dev/null +++ b/job_crawler/.dockerignore @@ -0,0 +1,17 @@ +__pycache__ +*.pyc +*.pyo +*.pyd +.Python +.env +.venv +env/ +venv/ +.git +.gitignore +*.md +*.db +data/ +.idea/ +.vscode/ +*.log diff --git a/job_crawler/Dockerfile b/job_crawler/Dockerfile new file mode 100644 index 0000000..6bdb36d --- /dev/null +++ b/job_crawler/Dockerfile @@ -0,0 +1,31 @@ +FROM python:3.11-slim + +WORKDIR /app + +# 安装系统依赖 +RUN apt-get update && apt-get install -y --no-install-recommends \ + gcc \ + && rm -rf /var/lib/apt/lists/* + +# 复制依赖文件 +COPY requirements.txt . + +# 安装Python依赖 +RUN pip install --no-cache-dir -r requirements.txt + +# 复制应用代码 +COPY app/ ./app/ + +# 创建目录 +RUN mkdir -p /app/data /app/config + +# 设置环境变量 +ENV PYTHONPATH=/app +ENV PYTHONUNBUFFERED=1 +ENV CONFIG_PATH=/app/config/config.yml + +# 暴露端口 +EXPOSE 8000 + +# 启动命令 +CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"] diff --git a/job_crawler/README.md b/job_crawler/README.md new file mode 100644 index 0000000..1d6a50c --- /dev/null +++ b/job_crawler/README.md @@ -0,0 +1,135 @@ +# 招聘数据增量采集服务 + +从八爪鱼API采集招聘数据,筛选近7天发布的数据,通过内置Kafka服务提供消息队列供外部消费。 + +## 项目结构 + +``` +job_crawler/ +├── app/ # 应用代码 +│ ├── api/ # API路由 +│ ├── core/ # 核心配置 +│ ├── models/ # 数据模型 +│ ├── services/ # 业务服务 +│ ├── utils/ # 工具函数 +│ └── main.py +├── config/ # 配置文件目录(挂载) +│ ├── config.yml # 配置文件 +│ └── config.yml.docker # Docker配置模板 +├── docker-compose.yml +├── Dockerfile +└── requirements.txt +``` + +## 快速开始 + +### 1. 配置 + +```bash +cd job_crawler + +# 复制配置模板 +cp config/config.yml.docker config/config.yml + +# 编辑配置文件,填入账号密码 +vim config/config.yml +``` + +### 2. 启动服务 + +```bash +# 启动所有服务 +docker-compose up -d + +# 查看日志 +docker-compose logs -f app +``` + +### 3. 单独构建镜像 + +```bash +# 构建镜像 +docker build -t job-crawler:latest . 
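+
+# 注意(视部署环境而定):脱离 docker-compose 单独运行时,config.yml.docker 中的
+# kafka:29092 地址无法解析,需要把 config/config.yml 的 kafka.bootstrap_servers
+# 改为容器可达的地址(例如宿主机 Kafka 对应 host.docker.internal:9092)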
+ +# 运行(挂载配置文件) +docker run -d \ + --name job-crawler \ + -p 8000:8000 \ + -v $(pwd)/config:/app/config:ro \ + -v job_data:/app/data \ + job-crawler:latest +``` + +## 配置文件说明 + +`config/config.yml`: + +```yaml +app: + name: job-crawler + debug: false + +api: + base_url: https://openapi.bazhuayu.com + username: "your_username" + password: "your_password" + batch_size: 100 + # 多任务配置 + tasks: + - id: "task-id-1" + name: "青岛招聘数据" + enabled: true + - id: "task-id-2" + name: "上海招聘数据" + enabled: true + - id: "task-id-3" + name: "北京招聘数据" + enabled: false # 禁用 + +kafka: + bootstrap_servers: kafka:29092 + topic: job_data + +crawler: + filter_days: 7 + max_workers: 5 # 最大并行任务数 + +database: + path: /app/data/crawl_progress.db +``` + +## API接口 + +| 接口 | 方法 | 说明 | +|------|------|------| +| `/tasks` | GET | 获取所有任务列表 | +| `/status` | GET | 查看采集状态(支持task_id参数) | +| `/crawl/start` | POST | 启动采集(支持task_id参数) | +| `/crawl/stop` | POST | 停止采集(支持task_id参数) | +| `/consume` | GET | 消费数据 | +| `/health` | GET | 健康检查 | + +### 使用示例 + +```bash +# 查看所有任务 +curl http://localhost:8000/tasks + +# 查看所有任务状态 +curl http://localhost:8000/status + +# 查看单个任务状态 +curl "http://localhost:8000/status?task_id=xxx" + +# 启动所有任务 +curl -X POST http://localhost:8000/crawl/start + +# 启动单个任务 +curl -X POST "http://localhost:8000/crawl/start?task_id=xxx" + +# 停止所有任务 +curl -X POST http://localhost:8000/crawl/stop + +# 消费数据 +curl "http://localhost:8000/consume?batch_size=10" +``` diff --git a/job_crawler/app/__init__.py b/job_crawler/app/__init__.py new file mode 100644 index 0000000..3bd787c --- /dev/null +++ b/job_crawler/app/__init__.py @@ -0,0 +1,2 @@ +"""招聘数据采集服务""" +__version__ = "1.0.0" diff --git a/job_crawler/app/api/__init__.py b/job_crawler/app/api/__init__.py new file mode 100644 index 0000000..339d8cb --- /dev/null +++ b/job_crawler/app/api/__init__.py @@ -0,0 +1,4 @@ +"""API路由模块""" +from .routes import router + +__all__ = ["router"] diff --git a/job_crawler/app/api/routes.py b/job_crawler/app/api/routes.py new file mode 100644 index 0000000..8ffbcfd --- /dev/null +++ b/job_crawler/app/api/routes.py @@ -0,0 +1,112 @@ +"""API路由""" +import json +import logging +from typing import Optional +from fastapi import APIRouter, Query, BackgroundTasks, HTTPException +from fastapi.responses import StreamingResponse +from app.models import ApiResponse, ConsumeResponse, StatusResponse +from app.services import crawler_manager, kafka_service + +logger = logging.getLogger(__name__) +router = APIRouter() + + +@router.get("/", response_model=ApiResponse) +async def root(): + """服务状态""" + return ApiResponse(message="招聘数据采集服务运行中", data={"version": "1.0.0"}) + + +@router.get("/health") +async def health_check(): + """健康检查""" + return {"status": "healthy"} + + +@router.get("/status", response_model=StatusResponse) +async def get_status(task_id: Optional[str] = Query(None, description="任务ID,不传则返回所有任务状态")): + """获取采集状态""" + status = crawler_manager.get_status(task_id) + return StatusResponse(data=status) + + +@router.get("/tasks", response_model=ApiResponse) +async def list_tasks(): + """获取所有任务列表""" + tasks = [ + {"task_id": tid, "task_name": c.task_name, "is_running": c.is_running} + for tid, c in crawler_manager.get_all_crawlers().items() + ] + return ApiResponse(data={"tasks": tasks, "count": len(tasks)}) + + +@router.post("/crawl/start", response_model=ApiResponse) +async def start_crawl( + background_tasks: BackgroundTasks, + task_id: Optional[str] = Query(None, description="任务ID,不传则启动所有任务"), + reset: bool = Query(False, description="是否重置进度从头开始") +): + 
"""启动采集任务""" + if task_id: + # 启动单个任务 + crawler = crawler_manager.get_crawler(task_id) + if not crawler: + raise HTTPException(status_code=404, detail=f"任务不存在: {task_id}") + if crawler.is_running: + raise HTTPException(status_code=400, detail=f"任务已在运行中: {task_id}") + background_tasks.add_task(crawler_manager.start_task, task_id, reset) + return ApiResponse(message=f"任务 {task_id} 已启动", data={"task_id": task_id, "reset": reset}) + else: + # 启动所有任务 + background_tasks.add_task(crawler_manager.start_all, reset) + return ApiResponse(message="所有任务已启动", data={"reset": reset}) + + +@router.post("/crawl/stop", response_model=ApiResponse) +async def stop_crawl( + task_id: Optional[str] = Query(None, description="任务ID,不传则停止所有任务") +): + """停止采集任务""" + if task_id: + crawler = crawler_manager.get_crawler(task_id) + if not crawler: + raise HTTPException(status_code=404, detail=f"任务不存在: {task_id}") + if not crawler.is_running: + raise HTTPException(status_code=400, detail=f"任务未在运行: {task_id}") + crawler_manager.stop_task(task_id) + return ApiResponse(message=f"任务 {task_id} 正在停止") + else: + if not crawler_manager.is_any_running: + raise HTTPException(status_code=400, detail="没有正在运行的任务") + crawler_manager.stop_all() + return ApiResponse(message="所有任务正在停止") + + +@router.get("/consume", response_model=ConsumeResponse) +async def consume_data( + batch_size: int = Query(10, ge=1, le=100, description="批量大小"), + timeout: int = Query(5000, ge=1000, le=30000, description="超时时间(毫秒)") +): + """消费Kafka数据""" + try: + messages = kafka_service.consume(batch_size, timeout) + return ConsumeResponse(data=messages, count=len(messages)) + except Exception as e: + logger.error(f"消费数据失败: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@router.get("/consume/stream") +async def consume_stream(): + """SSE流式消费""" + async def event_generator(): + consumer = kafka_service.get_consumer() + try: + for message in consumer: + data = json.dumps(message.value, ensure_ascii=False) + yield f"data: {data}\n\n" + except Exception as e: + logger.error(f"流式消费错误: {e}") + finally: + consumer.close() + return StreamingResponse(event_generator(), media_type="text/event-stream") diff --git a/job_crawler/app/core/__init__.py b/job_crawler/app/core/__init__.py new file mode 100644 index 0000000..888cd7b --- /dev/null +++ b/job_crawler/app/core/__init__.py @@ -0,0 +1,5 @@ +"""核心模块""" +from .config import settings +from .logging import setup_logging + +__all__ = ["settings", "setup_logging"] diff --git a/job_crawler/app/core/config.py b/job_crawler/app/core/config.py new file mode 100644 index 0000000..d8c75ca --- /dev/null +++ b/job_crawler/app/core/config.py @@ -0,0 +1,89 @@ +"""配置管理""" +import os +import yaml +from typing import Optional, List +from pydantic import BaseModel +from functools import lru_cache + + +class AppConfig(BaseModel): + name: str = "job-crawler" + version: str = "1.0.0" + debug: bool = False + + +class TaskConfig(BaseModel): + """单个任务配置""" + id: str + name: str = "" + enabled: bool = True + + +class ApiConfig(BaseModel): + base_url: str = "https://openapi.bazhuayu.com" + username: str = "" + password: str = "" + batch_size: int = 100 + tasks: List[TaskConfig] = [] + + +class KafkaConfig(BaseModel): + bootstrap_servers: str = "localhost:9092" + topic: str = "job_data" + consumer_group: str = "job_consumer_group" + + +class CrawlerConfig(BaseModel): + interval: int = 300 + filter_days: int = 7 + max_workers: int = 5 + + +class DatabaseConfig(BaseModel): + path: str = "data/crawl_progress.db" + + +class 
Settings(BaseModel): + """应用配置""" + app: AppConfig = AppConfig() + api: ApiConfig = ApiConfig() + kafka: KafkaConfig = KafkaConfig() + crawler: CrawlerConfig = CrawlerConfig() + database: DatabaseConfig = DatabaseConfig() + + @classmethod + def from_yaml(cls, config_path: str) -> "Settings": + """从YAML文件加载配置""" + if not os.path.exists(config_path): + return cls() + + with open(config_path, 'r', encoding='utf-8') as f: + data = yaml.safe_load(f) or {} + + # 解析tasks + api_data = data.get('api', {}) + tasks_data = api_data.pop('tasks', []) + tasks = [TaskConfig(**t) for t in tasks_data] + api_config = ApiConfig(**api_data, tasks=tasks) + + return cls( + app=AppConfig(**data.get('app', {})), + api=api_config, + kafka=KafkaConfig(**data.get('kafka', {})), + crawler=CrawlerConfig(**data.get('crawler', {})), + database=DatabaseConfig(**data.get('database', {})) + ) + + def get_enabled_tasks(self) -> List[TaskConfig]: + """获取启用的任务列表""" + return [t for t in self.api.tasks if t.enabled] + + +@lru_cache() +def get_settings() -> Settings: + """获取配置""" + config_path = os.environ.get("CONFIG_PATH", "config/config.yml") + return Settings.from_yaml(config_path) + + +settings = get_settings() diff --git a/job_crawler/app/core/logging.py b/job_crawler/app/core/logging.py new file mode 100644 index 0000000..754350d --- /dev/null +++ b/job_crawler/app/core/logging.py @@ -0,0 +1,22 @@ +"""日志配置""" +import logging +import sys +from .config import settings + + +def setup_logging(): + """配置日志""" + level = logging.DEBUG if settings.app.debug else logging.INFO + + logging.basicConfig( + level=level, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.StreamHandler(sys.stdout) + ] + ) + + # 降低第三方库日志级别 + logging.getLogger("httpx").setLevel(logging.WARNING) + logging.getLogger("kafka").setLevel(logging.WARNING) + logging.getLogger("uvicorn").setLevel(logging.INFO) diff --git a/job_crawler/app/main.py b/job_crawler/app/main.py new file mode 100644 index 0000000..c20a225 --- /dev/null +++ b/job_crawler/app/main.py @@ -0,0 +1,35 @@ +"""FastAPI应用入口""" +import logging +from contextlib import asynccontextmanager +from fastapi import FastAPI +from app.core.config import settings +from app.core.logging import setup_logging +from app.api import router +from app.services import kafka_service + +setup_logging() +logger = logging.getLogger(__name__) + + +@asynccontextmanager +async def lifespan(app: FastAPI): + """应用生命周期管理""" + logger.info("服务启动中...") + yield + logger.info("服务关闭中...") + kafka_service.close() + + +app = FastAPI( + title="招聘数据采集服务", + description="从八爪鱼API采集招聘数据,通过Kafka提供消费接口", + version=settings.app.version, + lifespan=lifespan +) + +app.include_router(router) + + +if __name__ == "__main__": + import uvicorn + uvicorn.run("app.main:app", host="0.0.0.0", port=8000, reload=True) diff --git a/job_crawler/app/models/__init__.py b/job_crawler/app/models/__init__.py new file mode 100644 index 0000000..b1612da --- /dev/null +++ b/job_crawler/app/models/__init__.py @@ -0,0 +1,13 @@ +"""数据模型""" +from .job import JobData +from .progress import CrawlProgress, CrawlStatus +from .response import ApiResponse, ConsumeResponse, StatusResponse + +__all__ = [ + "JobData", + "CrawlProgress", + "CrawlStatus", + "ApiResponse", + "ConsumeResponse", + "StatusResponse" +] diff --git a/job_crawler/app/models/job.py b/job_crawler/app/models/job.py new file mode 100644 index 0000000..2d6ed6c --- /dev/null +++ b/job_crawler/app/models/job.py @@ -0,0 +1,60 @@ +"""招聘数据模型""" +from pydantic import BaseModel 
+from datetime import datetime +import uuid + + +class JobData(BaseModel): + """招聘数据模型""" + id: str = "" + task_id: str = "" # 任务ID + job_category: str = "" # Std_class - 职位分类 + job_title: str = "" # aca112 - 职位名称 + company: str = "" # AAB004 - 公司名称 + company_type: str = "" # AAB019 - 企业类型 + salary: str = "" # acb241 - 薪资范围 + location: str = "" # aab302 - 工作地点 + address: str = "" # AAE006 - 详细地址 + publish_date: str = "" # aae397 - 发布日期 + collect_time: str = "" # Collect_time - 采集时间 + url: str = "" # ACE760 - 职位链接 + description: str = "" # acb22a - 职位描述 + experience: str = "" # Experience - 经验要求 + education: str = "" # aac011 - 学历要求 + headcount: str = "" # acb240 - 招聘人数 + industry: str = "" # AAB022 - 行业 + company_size: str = "" # Num_employers - 公司规模 + contact: str = "" # AAE004 - 联系人 + company_intro: str = "" # AAB092 - 公司简介 + crawl_time: str = "" # 入库时间 + + def __init__(self, **data): + super().__init__(**data) + if not self.id: + self.id = str(uuid.uuid4()) + if not self.crawl_time: + self.crawl_time = datetime.now().isoformat() + + @classmethod + def from_raw(cls, raw: dict) -> "JobData": + """从原始API数据转换""" + return cls( + job_category=raw.get("Std_class", ""), + job_title=raw.get("aca112", ""), + company=raw.get("AAB004", ""), + company_type=raw.get("AAB019", "").strip(), + salary=raw.get("acb241", ""), + location=raw.get("aab302", ""), + address=raw.get("AAE006", ""), + publish_date=raw.get("aae397", ""), + collect_time=raw.get("Collect_time", ""), + url=raw.get("ACE760", ""), + description=raw.get("acb22a", ""), + experience=raw.get("Experience", ""), + education=raw.get("aac011", ""), + headcount=raw.get("acb240", ""), + industry=raw.get("AAB022", ""), + company_size=raw.get("Num_employers", ""), + contact=raw.get("AAE004", ""), + company_intro=raw.get("AAB092", ""), + ) diff --git a/job_crawler/app/models/progress.py b/job_crawler/app/models/progress.py new file mode 100644 index 0000000..96f4016 --- /dev/null +++ b/job_crawler/app/models/progress.py @@ -0,0 +1,24 @@ +"""采集进度模型""" +from pydantic import BaseModel + + +class CrawlProgress(BaseModel): + """采集进度""" + task_id: str + current_offset: int = 0 + total: int = 0 + last_update: str = "" + status: str = "idle" # idle, running, completed, error + + +class CrawlStatus(BaseModel): + """采集状态响应""" + task_id: str + total: int + current_offset: int + progress: str + kafka_lag: int = 0 + status: str + last_update: str + filtered_count: int = 0 + produced_count: int = 0 diff --git a/job_crawler/app/models/response.py b/job_crawler/app/models/response.py new file mode 100644 index 0000000..417346d --- /dev/null +++ b/job_crawler/app/models/response.py @@ -0,0 +1,23 @@ +"""API响应模型""" +from pydantic import BaseModel +from typing import Optional, Any + + +class ApiResponse(BaseModel): + """通用API响应""" + code: int = 0 + message: str = "success" + data: Optional[Any] = None + + +class ConsumeResponse(BaseModel): + """消费响应""" + code: int = 0 + data: list = [] + count: int = 0 + + +class StatusResponse(BaseModel): + """状态响应""" + code: int = 0 + data: dict = {} diff --git a/job_crawler/app/services/__init__.py b/job_crawler/app/services/__init__.py new file mode 100644 index 0000000..62d8396 --- /dev/null +++ b/job_crawler/app/services/__init__.py @@ -0,0 +1,12 @@ +"""服务模块""" +from .api_client import api_client, BazhuayuClient +from .kafka_service import kafka_service, KafkaService +from .progress_store import progress_store, ProgressStore +from .crawler import crawler_manager, CrawlerManager, TaskCrawler + +__all__ = [ + "api_client", 
"BazhuayuClient", + "kafka_service", "KafkaService", + "progress_store", "ProgressStore", + "crawler_manager", "CrawlerManager", "TaskCrawler" +] diff --git a/job_crawler/app/services/api_client.py b/job_crawler/app/services/api_client.py new file mode 100644 index 0000000..3a8351a --- /dev/null +++ b/job_crawler/app/services/api_client.py @@ -0,0 +1,91 @@ +"""八爪鱼API客户端""" +import httpx +import time +import logging +from typing import Optional, Dict, Any +from app.core.config import settings + +logger = logging.getLogger(__name__) + + +class BazhuayuClient: + """八爪鱼API客户端""" + + def __init__(self): + self.base_url = settings.api.base_url + self.username = settings.api.username + self.password = settings.api.password + self._access_token: Optional[str] = None + self._token_expires_at: float = 0 + + async def _get_token(self) -> str: + """获取访问令牌""" + if self._access_token and time.time() < self._token_expires_at - 300: + return self._access_token + + logger.info("正在获取新的access_token...") + + async with httpx.AsyncClient() as client: + response = await client.post( + f"{self.base_url}/token", + json={ + "username": self.username, + "password": self.password, + "grant_type": "password" + }, + headers={ + "Content-Type": "application/json", + "Accept": "*/*" + }, + timeout=30 + ) + + if response.status_code != 200: + raise Exception(f"获取token失败: {response.status_code} - {response.text}") + + data = response.json() + token_data = data.get("data", {}) + + self._access_token = token_data.get("access_token") + expires_in = int(token_data.get("expires_in", 86400)) + self._token_expires_at = time.time() + expires_in + + logger.info(f"获取token成功,有效期: {expires_in}秒") + return self._access_token + + async def fetch_data(self, task_id: str, offset: int, size: int = 100) -> Dict[str, Any]: + """获取任务数据""" + token = await self._get_token() + + async with httpx.AsyncClient() as client: + response = await client.get( + f"{self.base_url}/data/all", + params={ + "taskId": task_id, + "offset": offset, + "size": size + }, + headers={ + "Authorization": f"Bearer {token}", + "Accept": "*/*" + }, + timeout=60 + ) + + if response.status_code == 401: + self._access_token = None + self._token_expires_at = 0 + return await self.fetch_data(task_id, offset, size) + + if response.status_code != 200: + raise Exception(f"获取数据失败: {response.status_code} - {response.text}") + + return response.json() + + async def get_total_count(self, task_id: str) -> int: + """获取数据总数""" + result = await self.fetch_data(task_id, 0, 1) + return result.get("data", {}).get("total", 0) + + +api_client = BazhuayuClient() diff --git a/job_crawler/app/services/crawler.py b/job_crawler/app/services/crawler.py new file mode 100644 index 0000000..6b5511f --- /dev/null +++ b/job_crawler/app/services/crawler.py @@ -0,0 +1,209 @@ +"""多任务增量采集核心逻辑""" +import asyncio +import logging +from typing import Dict, Optional +from concurrent.futures import ThreadPoolExecutor +from app.services.api_client import api_client +from app.services.kafka_service import kafka_service +from app.services.progress_store import progress_store +from app.utils import is_within_days +from app.models import JobData +from app.core.config import settings, TaskConfig + +logger = logging.getLogger(__name__) + + +class TaskCrawler: + """单个任务采集器""" + + def __init__(self, task_config: TaskConfig): + self.task_id = task_config.id + self.task_name = task_config.name or task_config.id + self.batch_size = settings.api.batch_size + self.filter_days = settings.crawler.filter_days + self._running = 
False + self._total_filtered = 0 + self._total_produced = 0 + + @property + def is_running(self) -> bool: + return self._running + + async def start(self, reset: bool = False): + """开始采集""" + if self._running: + logger.warning(f"[{self.task_name}] 任务已在运行中") + return + + self._running = True + self._total_filtered = 0 + self._total_produced = 0 + logger.info(f"[{self.task_name}] 开始采集任务") + + try: + if reset: + progress_store.reset_progress(self.task_id) + current_offset = 0 + else: + progress = progress_store.get_progress(self.task_id) + current_offset = progress.current_offset if progress else 0 + + total = await api_client.get_total_count(self.task_id) + logger.info(f"[{self.task_name}] 数据总数: {total}, 当前偏移: {current_offset}") + + if current_offset >= total: + logger.info(f"[{self.task_name}] 数据已全部采集完成") + progress_store.save_progress(self.task_id, current_offset, total, "completed", + self._total_filtered, self._total_produced) + self._running = False + return + + while current_offset < total and self._running: + try: + await self._crawl_batch(current_offset) + current_offset += self.batch_size + progress_store.save_progress(self.task_id, current_offset, total, "running", + self._total_filtered, self._total_produced) + progress_pct = min(100, current_offset / total * 100) + logger.info(f"[{self.task_name}] 进度: {progress_pct:.2f}% ({current_offset}/{total})") + await asyncio.sleep(0.5) + except Exception as e: + logger.error(f"[{self.task_name}] 采集批次失败: {e}") + await asyncio.sleep(5) + + status = "completed" if current_offset >= total else "stopped" + progress_store.save_progress(self.task_id, current_offset, total, status, + self._total_filtered, self._total_produced) + logger.info(f"[{self.task_name}] 采集任务 {status}") + except Exception as e: + logger.error(f"[{self.task_name}] 采集任务异常: {e}") + progress_store.save_progress(self.task_id, 0, 0, "error", + self._total_filtered, self._total_produced) + finally: + self._running = False + + async def _crawl_batch(self, offset: int): + """采集一批数据""" + result = await api_client.fetch_data(self.task_id, offset, self.batch_size) + data_list = result.get("data", {}).get("data", []) + if not data_list: + return + + filtered_jobs = [] + for raw in data_list: + aae397 = raw.get("aae397", "") + collect_time = raw.get("Collect_time", "") + if is_within_days(aae397, collect_time, self.filter_days): + job = JobData.from_raw(raw) + job.task_id = self.task_id # 添加任务ID标识 + filtered_jobs.append(job) + + self._total_filtered += len(filtered_jobs) + if filtered_jobs: + produced = kafka_service.produce_batch(filtered_jobs) + self._total_produced += produced + + def stop(self): + """停止采集""" + logger.info(f"[{self.task_name}] 正在停止采集任务...") + self._running = False + + def get_status(self) -> dict: + """获取采集状态""" + stats = progress_store.get_stats(self.task_id) + if not stats: + return { + "task_id": self.task_id, "task_name": self.task_name, + "total": 0, "current_offset": 0, "progress": "0%", + "status": "idle", "last_update": "", + "filtered_count": 0, "produced_count": 0 + } + total = stats.get("total", 0) + current = stats.get("current_offset", 0) + progress = f"{min(100, current / total * 100):.2f}%" if total > 0 else "0%" + return { + "task_id": self.task_id, "task_name": self.task_name, + "total": total, "current_offset": current, "progress": progress, + "status": stats.get("status", "idle"), "last_update": stats.get("last_update", ""), + "filtered_count": stats.get("filtered_count", 0), + "produced_count": stats.get("produced_count", 0) + } + + +class 
CrawlerManager: + """多任务采集管理器""" + + def __init__(self): + self._crawlers: Dict[str, TaskCrawler] = {} + self._executor = ThreadPoolExecutor(max_workers=settings.crawler.max_workers) + self._init_crawlers() + + def _init_crawlers(self): + """初始化所有启用的任务采集器""" + for task in settings.get_enabled_tasks(): + self._crawlers[task.id] = TaskCrawler(task) + logger.info(f"初始化任务采集器: {task.name} ({task.id})") + + def get_crawler(self, task_id: str) -> Optional[TaskCrawler]: + """获取指定任务的采集器""" + return self._crawlers.get(task_id) + + def get_all_crawlers(self) -> Dict[str, TaskCrawler]: + """获取所有采集器""" + return self._crawlers + + async def start_task(self, task_id: str, reset: bool = False) -> bool: + """启动单个任务""" + crawler = self._crawlers.get(task_id) + if not crawler: + logger.error(f"任务不存在: {task_id}") + return False + if crawler.is_running: + logger.warning(f"任务已在运行: {task_id}") + return False + asyncio.create_task(crawler.start(reset)) + return True + + async def start_all(self, reset: bool = False): + """启动所有任务""" + tasks = [] + for task_id, crawler in self._crawlers.items(): + if not crawler.is_running: + tasks.append(crawler.start(reset)) + if tasks: + await asyncio.gather(*tasks) + + def stop_task(self, task_id: str) -> bool: + """停止单个任务""" + crawler = self._crawlers.get(task_id) + if not crawler: + return False + crawler.stop() + return True + + def stop_all(self): + """停止所有任务""" + for crawler in self._crawlers.values(): + crawler.stop() + + def get_status(self, task_id: str = None) -> dict: + """获取状态""" + if task_id: + crawler = self._crawlers.get(task_id) + return crawler.get_status() if crawler else {} + + # 返回所有任务状态 + return { + "tasks": [c.get_status() for c in self._crawlers.values()], + "kafka_lag": kafka_service.get_lag(), + "running_count": sum(1 for c in self._crawlers.values() if c.is_running) + } + + @property + def is_any_running(self) -> bool: + """是否有任务在运行""" + return any(c.is_running for c in self._crawlers.values()) + + +# 全局管理器实例 +crawler_manager = CrawlerManager() diff --git a/job_crawler/app/services/kafka_service.py b/job_crawler/app/services/kafka_service.py new file mode 100644 index 0000000..328275b --- /dev/null +++ b/job_crawler/app/services/kafka_service.py @@ -0,0 +1,138 @@ +"""Kafka服务""" +import json +import logging +from typing import List, Optional +from kafka import KafkaProducer, KafkaConsumer +from kafka.errors import KafkaError +from kafka.admin import KafkaAdminClient, NewTopic +from app.models import JobData +from app.core.config import settings + +logger = logging.getLogger(__name__) + + +class KafkaService: + """Kafka生产者/消费者服务""" + + def __init__(self): + self.bootstrap_servers = settings.kafka.bootstrap_servers + self.topic = settings.kafka.topic + self.consumer_group = settings.kafka.consumer_group + self._producer: Optional[KafkaProducer] = None + self._ensure_topic() + + def _ensure_topic(self): + """确保Topic存在""" + try: + admin = KafkaAdminClient( + bootstrap_servers=self.bootstrap_servers, + client_id="job_crawler_admin" + ) + existing_topics = admin.list_topics() + + if self.topic not in existing_topics: + topic = NewTopic(name=self.topic, num_partitions=3, replication_factor=1) + admin.create_topics([topic]) + logger.info(f"创建Topic: {self.topic}") + admin.close() + except Exception as e: + logger.warning(f"检查/创建Topic失败: {e}") + + @property + def producer(self) -> KafkaProducer: + """获取生产者实例""" + if self._producer is None: + self._producer = KafkaProducer( + bootstrap_servers=self.bootstrap_servers, + value_serializer=lambda v: json.dumps(v, 
ensure_ascii=False).encode('utf-8'), + key_serializer=lambda k: k.encode('utf-8') if k else None, + acks='all', + retries=3 + ) + return self._producer + + def get_consumer(self, auto_offset_reset: str = 'earliest') -> KafkaConsumer: + """获取消费者实例""" + return KafkaConsumer( + self.topic, + bootstrap_servers=self.bootstrap_servers, + group_id=self.consumer_group, + auto_offset_reset=auto_offset_reset, + enable_auto_commit=True, + value_deserializer=lambda m: json.loads(m.decode('utf-8')), + consumer_timeout_ms=5000 + ) + + def produce(self, job_data: JobData) -> bool: + """发送消息到Kafka""" + try: + future = self.producer.send(self.topic, key=job_data.id, value=job_data.model_dump()) + future.get(timeout=10) + return True + except KafkaError as e: + logger.error(f"发送消息失败: {e}") + return False + + def produce_batch(self, job_list: List[JobData]) -> int: + """批量发送消息""" + success_count = 0 + for job in job_list: + if self.produce(job): + success_count += 1 + self.producer.flush() + return success_count + + def consume(self, batch_size: int = 10, timeout_ms: int = 5000) -> List[dict]: + """消费消息""" + messages = [] + consumer = KafkaConsumer( + self.topic, + bootstrap_servers=self.bootstrap_servers, + group_id=self.consumer_group, + auto_offset_reset='earliest', + enable_auto_commit=True, + value_deserializer=lambda m: json.loads(m.decode('utf-8')), + consumer_timeout_ms=timeout_ms, + max_poll_records=batch_size + ) + try: + for message in consumer: + messages.append(message.value) + if len(messages) >= batch_size: + break + except Exception as e: + logger.debug(f"消费超时或完成: {e}") + finally: + consumer.close() + return messages + + def get_lag(self) -> int: + """获取消息堆积量""" + try: + consumer = KafkaConsumer(bootstrap_servers=self.bootstrap_servers, group_id=self.consumer_group) + partitions = consumer.partitions_for_topic(self.topic) + if not partitions: + consumer.close() + return 0 + from kafka import TopicPartition + tps = [TopicPartition(self.topic, p) for p in partitions] + end_offsets = consumer.end_offsets(tps) + total_lag = 0 + for tp in tps: + committed = consumer.committed(tp) + end = end_offsets.get(tp, 0) + total_lag += max(0, end - (committed or 0)) + consumer.close() + return total_lag + except Exception as e: + logger.warning(f"获取lag失败: {e}") + return 0 + + def close(self): + """关闭连接""" + if self._producer: + self._producer.close() + self._producer = None + + +kafka_service = KafkaService() diff --git a/job_crawler/app/services/progress_store.py b/job_crawler/app/services/progress_store.py new file mode 100644 index 0000000..c0c7b8d --- /dev/null +++ b/job_crawler/app/services/progress_store.py @@ -0,0 +1,95 @@ +"""采集进度存储""" +import sqlite3 +import os +import logging +from datetime import datetime +from typing import Optional +from contextlib import contextmanager +from app.models import CrawlProgress +from app.core.config import settings + +logger = logging.getLogger(__name__) + + +class ProgressStore: + """采集进度存储(SQLite)""" + + def __init__(self, db_path: str = None): + self.db_path = db_path or settings.database.path + os.makedirs(os.path.dirname(self.db_path) or ".", exist_ok=True) + self._init_db() + + def _init_db(self): + """初始化数据库""" + with self._get_conn() as conn: + conn.execute(""" + CREATE TABLE IF NOT EXISTS crawl_progress ( + task_id TEXT PRIMARY KEY, + current_offset INTEGER DEFAULT 0, + total INTEGER DEFAULT 0, + last_update TEXT, + status TEXT DEFAULT 'idle', + filtered_count INTEGER DEFAULT 0, + produced_count INTEGER DEFAULT 0 + ) + """) + conn.commit() + + 
@contextmanager + def _get_conn(self): + """获取数据库连接""" + conn = sqlite3.connect(self.db_path) + conn.row_factory = sqlite3.Row + try: + yield conn + finally: + conn.close() + + def get_progress(self, task_id: str) -> Optional[CrawlProgress]: + """获取采集进度""" + with self._get_conn() as conn: + cursor = conn.execute("SELECT * FROM crawl_progress WHERE task_id = ?", (task_id,)) + row = cursor.fetchone() + if row: + return CrawlProgress( + task_id=row["task_id"], + current_offset=row["current_offset"], + total=row["total"], + last_update=row["last_update"] or "", + status=row["status"] + ) + return None + + def save_progress(self, task_id: str, offset: int, total: int, + status: str = "running", filtered_count: int = 0, produced_count: int = 0): + """保存采集进度""" + now = datetime.now().isoformat() + with self._get_conn() as conn: + conn.execute(""" + INSERT INTO crawl_progress + (task_id, current_offset, total, last_update, status, filtered_count, produced_count) + VALUES (?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(task_id) DO UPDATE SET + current_offset = excluded.current_offset, total = excluded.total, + last_update = excluded.last_update, status = excluded.status, + filtered_count = excluded.filtered_count, produced_count = excluded.produced_count + """, (task_id, offset, total, now, status, filtered_count, produced_count)) + conn.commit() + + def get_stats(self, task_id: str) -> dict: + """获取统计信息""" + with self._get_conn() as conn: + cursor = conn.execute("SELECT * FROM crawl_progress WHERE task_id = ?", (task_id,)) + row = cursor.fetchone() + if row: + return dict(row) + return {} + + def reset_progress(self, task_id: str): + """重置采集进度""" + with self._get_conn() as conn: + conn.execute("DELETE FROM crawl_progress WHERE task_id = ?", (task_id,)) + conn.commit() + + +progress_store = ProgressStore() diff --git a/job_crawler/app/utils/__init__.py b/job_crawler/app/utils/__init__.py new file mode 100644 index 0000000..f22ba0c --- /dev/null +++ b/job_crawler/app/utils/__init__.py @@ -0,0 +1,4 @@ +"""工具模块""" +from .date_parser import parse_aae397, parse_collect_time, is_within_days + +__all__ = ["parse_aae397", "parse_collect_time", "is_within_days"] diff --git a/job_crawler/app/utils/date_parser.py b/job_crawler/app/utils/date_parser.py new file mode 100644 index 0000000..3a20b6c --- /dev/null +++ b/job_crawler/app/utils/date_parser.py @@ -0,0 +1,74 @@ +"""日期解析工具""" +import re +from datetime import datetime, timedelta +from typing import Optional + + +def parse_aae397(date_str: str) -> Optional[datetime]: + """ + 解析发布日期字段 aae397 + 支持格式: + - "今天" + - "1月13日" + - "12月31日" + """ + if not date_str: + return None + + date_str = date_str.strip() + today = datetime.now() + + # 处理 "今天" + if date_str == "今天": + return today + + # 处理 "X月X日" 格式 + pattern = r"(\d{1,2})月(\d{1,2})日" + match = re.match(pattern, date_str) + if match: + month = int(match.group(1)) + day = int(match.group(2)) + year = today.year + + try: + parsed_date = datetime(year, month, day) + if parsed_date > today: + parsed_date = datetime(year - 1, month, day) + return parsed_date + except ValueError: + return None + + return None + + +def parse_collect_time(date_str: str) -> Optional[datetime]: + """ + 解析采集时间字段 Collect_time + 格式: "2026-01-15" + """ + if not date_str: + return None + + try: + return datetime.strptime(date_str.strip(), "%Y-%m-%d") + except ValueError: + return None + + +def is_within_days(date_str: str, collect_time_str: str, days: int = 7) -> bool: + """ + 判断数据是否在指定天数内 + 条件: 发布日期 AND 采集时间 都在指定天数内 + """ + today = datetime.now() + 
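+    # 说明: parse_aae397/parse_collect_time 解析出的日期为当天 0 点,而 today 与
+    # cutoff_date 带有当前时刻,因此恰好在 days 天前发布的数据在当天大部分时间会被判为窗口外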
cutoff_date = today - timedelta(days=days) + + publish_date = parse_aae397(date_str) + if publish_date is None: + return False + + collect_date = parse_collect_time(collect_time_str) + if collect_date is None: + return False + + return publish_date >= cutoff_date and collect_date >= cutoff_date diff --git a/job_crawler/config/config.yml b/job_crawler/config/config.yml new file mode 100644 index 0000000..8fe38e3 --- /dev/null +++ b/job_crawler/config/config.yml @@ -0,0 +1,41 @@ +# 招聘数据采集服务配置文件 + +# 应用配置 +app: + name: job-crawler + version: 1.0.0 + debug: false + +# 八爪鱼API配置 +api: + base_url: https://openapi.bazhuayu.com + username: "13051331101" + password: "abc19910515" + batch_size: 100 + # 多任务配置 + tasks: + - id: "00f3b445-d8ec-44e8-88b2-4b971a228b1e" + name: "青岛招聘数据" + enabled: true + - id: "task-id-2" + name: "任务2" + enabled: false + - id: "task-id-3" + name: "任务3" + enabled: false + +# Kafka配置 +kafka: + bootstrap_servers: localhost:9092 + topic: job_data + consumer_group: job_consumer_group + +# 采集配置 +crawler: + interval: 300 # 采集间隔(秒) + filter_days: 7 # 过滤天数 + max_workers: 5 # 最大并行任务数 + +# 数据库配置 +database: + path: data/crawl_progress.db diff --git a/job_crawler/config/config.yml.docker b/job_crawler/config/config.yml.docker new file mode 100644 index 0000000..981406b --- /dev/null +++ b/job_crawler/config/config.yml.docker @@ -0,0 +1,39 @@ +# Docker环境配置文件 +# 复制此文件为 config.yml 并修改账号密码 + +# 应用配置 +app: + name: job-crawler + version: 1.0.0 + debug: false + +# 八爪鱼API配置 +api: + base_url: https://openapi.bazhuayu.com + username: "your_username" + password: "your_password" + batch_size: 100 + # 多任务配置 + tasks: + - id: "00f3b445-d8ec-44e8-88b2-4b971a228b1e" + name: "青岛招聘数据" + enabled: true + - id: "task-id-2" + name: "任务2" + enabled: false + +# Kafka配置(Docker内部网络) +kafka: + bootstrap_servers: kafka:29092 + topic: job_data + consumer_group: job_consumer_group + +# 采集配置 +crawler: + interval: 300 + filter_days: 7 + max_workers: 5 + +# 数据库配置 +database: + path: /app/data/crawl_progress.db diff --git a/job_crawler/docker-compose.yml b/job_crawler/docker-compose.yml new file mode 100644 index 0000000..09f7e41 --- /dev/null +++ b/job_crawler/docker-compose.yml @@ -0,0 +1,75 @@ +version: '3.8' + +services: + zookeeper: + image: confluentinc/cp-zookeeper:7.5.0 + container_name: job-zookeeper + ports: + - "2181:2181" + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + volumes: + - zookeeper_data:/var/lib/zookeeper/data + healthcheck: + test: ["CMD", "nc", "-z", "localhost", "2181"] + interval: 10s + timeout: 5s + retries: 5 + networks: + - job-network + + kafka: + image: confluentinc/cp-kafka:7.5.0 + container_name: job-kafka + ports: + - "9092:9092" + - "29092:29092" + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true" + volumes: + - kafka_data:/var/lib/kafka/data + depends_on: + zookeeper: + condition: service_healthy + healthcheck: + test: ["CMD", "kafka-topics", "--bootstrap-server", "localhost:9092", "--list"] + interval: 10s + timeout: 10s + retries: 5 + networks: + - job-network + + app: + build: + context: . 
+ dockerfile: Dockerfile + container_name: job-crawler + ports: + - "8000:8000" + environment: + - CONFIG_PATH=/app/config/config.yml + volumes: + - ./config:/app/config:ro + - app_data:/app/data + depends_on: + kafka: + condition: service_healthy + restart: unless-stopped + networks: + - job-network + +networks: + job-network: + driver: bridge + +volumes: + zookeeper_data: + kafka_data: + app_data: diff --git a/job_crawler/requirements.txt b/job_crawler/requirements.txt new file mode 100644 index 0000000..685058b --- /dev/null +++ b/job_crawler/requirements.txt @@ -0,0 +1,8 @@ +fastapi==0.109.0 +uvicorn==0.27.0 +httpx==0.27.0 +kafka-python==2.0.2 +apscheduler==3.10.4 +pydantic==2.5.3 +python-dotenv==1.0.0 +PyYAML==6.0.1
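For reference, the `job_data` topic can also be consumed directly from Kafka instead of through `GET /consume`. The sketch below is only illustrative: it assumes the docker-compose setup above with the broker exposed on `localhost:9092`, uses the `kafka-python` client pinned in requirements.txt, and the group id `external_reader` is an arbitrary placeholder.

```python
"""Minimal external consumer sketch for the job_data topic (assumes localhost:9092)."""
import json

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "job_data",                          # topic created by the service
    bootstrap_servers="localhost:9092",  # PLAINTEXT_HOST listener from docker-compose.yml
    group_id="external_reader",          # separate group so GET /consume offsets are unaffected
    auto_offset_reset="earliest",
    enable_auto_commit=True,
    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
    consumer_timeout_ms=10000,           # stop iterating after 10s with no new messages
)

# Each message value is the JSON document produced by KafkaService.produce()
for message in consumer:
    job = message.value
    print(job.get("job_title"), "-", job.get("company"), "-", job.get("publish_date"))

consumer.close()
```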