Files
ocups-kafka/job_crawler/app/api/routes.py
李顺东 ae681575b9 feat(job_crawler): initialize job crawler service with kafka integration
- Add technical documentation (技术方案.md) with system architecture and design details
- Create FastAPI application structure with modular organization (api, core, models, services, utils)
- Implement job data crawler service with incremental collection from third-party API
- Add Kafka service integration with Docker Compose configuration for message queue
- Create data models for job listings, progress tracking, and API responses
- Implement REST API endpoints for data consumption (/consume, /status) and task management
- Add progress persistence layer using SQLite for tracking collection offsets
- Implement date filtering logic to extract data published within the last 7 days (sketched below the commit summary)
- Create API client service for third-party data source integration
- Add configuration management with environment-based settings
- Include Docker support with Dockerfile and docker-compose.yml for containerized deployment
- Add logging configuration and utility functions for date parsing
- Include requirements.txt with all Python dependencies and README documentation
2026-01-15 17:09:43 +08:00
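
The 7-day publish-date filter described above does not appear in this routes file. As a rough sketch only, under assumptions not taken from the repository (the field name `publish_time`, the date format, and the helper names are illustrative), such a filter might look like:

# Illustrative sketch only -- not code from this repository. Assumes each record
# carries a "publish_time" string in local time formatted as "%Y-%m-%d %H:%M:%S".
from datetime import datetime, timedelta


def is_recent(publish_time: str, days: int = 7, fmt: str = "%Y-%m-%d %H:%M:%S") -> bool:
    """Return True if publish_time falls within the last `days` days."""
    published = datetime.strptime(publish_time, fmt)
    return published >= datetime.now() - timedelta(days=days)


def filter_recent(records: list) -> list:
    """Keep only records published within the last 7 days; drop unparseable ones."""
    kept = []
    for record in records:
        try:
            if is_recent(record["publish_time"]):
                kept.append(record)
        except (KeyError, ValueError):
            continue  # missing or malformed date: treat as out of range
    return kept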

113 lines · 4.2 KiB · Python

"""API路由"""
import json
import logging
from typing import Optional
from fastapi import APIRouter, Query, BackgroundTasks, HTTPException
from fastapi.responses import StreamingResponse
from app.models import ApiResponse, ConsumeResponse, StatusResponse
from app.services import crawler_manager, kafka_service
logger = logging.getLogger(__name__)
router = APIRouter()
@router.get("/", response_model=ApiResponse)
async def root():
"""服务状态"""
return ApiResponse(message="招聘数据采集服务运行中", data={"version": "1.0.0"})
@router.get("/health")
async def health_check():
"""健康检查"""
return {"status": "healthy"}
@router.get("/status", response_model=StatusResponse)
async def get_status(task_id: Optional[str] = Query(None, description="任务ID不传则返回所有任务状态")):
"""获取采集状态"""
status = crawler_manager.get_status(task_id)
return StatusResponse(data=status)
@router.get("/tasks", response_model=ApiResponse)
async def list_tasks():
"""获取所有任务列表"""
tasks = [
{"task_id": tid, "task_name": c.task_name, "is_running": c.is_running}
for tid, c in crawler_manager.get_all_crawlers().items()
]
return ApiResponse(data={"tasks": tasks, "count": len(tasks)})
@router.post("/crawl/start", response_model=ApiResponse)
async def start_crawl(
background_tasks: BackgroundTasks,
task_id: Optional[str] = Query(None, description="任务ID不传则启动所有任务"),
reset: bool = Query(False, description="是否重置进度从头开始")
):
"""启动采集任务"""
if task_id:
# 启动单个任务
crawler = crawler_manager.get_crawler(task_id)
if not crawler:
raise HTTPException(status_code=404, detail=f"任务不存在: {task_id}")
if crawler.is_running:
raise HTTPException(status_code=400, detail=f"任务已在运行中: {task_id}")
background_tasks.add_task(crawler_manager.start_task, task_id, reset)
return ApiResponse(message=f"任务 {task_id} 已启动", data={"task_id": task_id, "reset": reset})
else:
# 启动所有任务
background_tasks.add_task(crawler_manager.start_all, reset)
return ApiResponse(message="所有任务已启动", data={"reset": reset})
@router.post("/crawl/stop", response_model=ApiResponse)
async def stop_crawl(
task_id: Optional[str] = Query(None, description="任务ID不传则停止所有任务")
):
"""停止采集任务"""
if task_id:
crawler = crawler_manager.get_crawler(task_id)
if not crawler:
raise HTTPException(status_code=404, detail=f"任务不存在: {task_id}")
if not crawler.is_running:
raise HTTPException(status_code=400, detail=f"任务未在运行: {task_id}")
crawler_manager.stop_task(task_id)
return ApiResponse(message=f"任务 {task_id} 正在停止")
else:
if not crawler_manager.is_any_running:
raise HTTPException(status_code=400, detail="没有正在运行的任务")
crawler_manager.stop_all()
return ApiResponse(message="所有任务正在停止")
@router.get("/consume", response_model=ConsumeResponse)
async def consume_data(
batch_size: int = Query(10, ge=1, le=100, description="批量大小"),
timeout: int = Query(5000, ge=1000, le=30000, description="超时时间(毫秒)")
):
"""消费Kafka数据"""
try:
messages = kafka_service.consume(batch_size, timeout)
return ConsumeResponse(data=messages, count=len(messages))
except Exception as e:
logger.error(f"消费数据失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/consume/stream")
async def consume_stream():
"""SSE流式消费"""
async def event_generator():
consumer = kafka_service.get_consumer()
try:
for message in consumer:
data = json.dumps(message.value, ensure_ascii=False)
yield f"data: {data}\n\n"
except Exception as e:
logger.error(f"流式消费错误: {e}")
finally:
consumer.close()
return StreamingResponse(event_generator(), media_type="text/event-stream")
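
For reference, a minimal client sketch against these endpoints. It assumes the service is reachable at http://localhost:8000, that the third-party `requests` package is installed, and that a task with ID "job_task" exists on the server; none of these values come from the repository.

import json

import requests

BASE = "http://localhost:8000"  # assumed host/port

# Start one crawl task in the background, resetting its saved progress.
print(requests.post(f"{BASE}/crawl/start", params={"task_id": "job_task", "reset": True}).json())

# Poll the status of that task.
print(requests.get(f"{BASE}/status", params={"task_id": "job_task"}).json())

# Pull a batch of up to 20 messages through the REST consume endpoint.
batch = requests.get(f"{BASE}/consume", params={"batch_size": 20, "timeout": 5000}).json()
print(batch.get("count"), "messages")

# Or read the SSE stream; each event is a "data: <json>" line followed by a blank line.
with requests.get(f"{BASE}/consume/stream", stream=True) as stream:
    for line in stream.iter_lines(decode_unicode=True):
        if line and line.startswith("data: "):
            print(json.loads(line[len("data: "):]))

The streaming loop mirrors the SSE framing produced by `consume_stream`: the client only needs to strip the "data: " prefix and parse the remaining JSON.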