Files
ocups-kafka/job_crawler/app/services/api_client.py
李顺东 ae681575b9 feat(job_crawler): initialize job crawler service with kafka integration
- Add technical documentation (技术方案.md) with system architecture and design details
- Create FastAPI application structure with modular organization (api, core, models, services, utils)
- Implement job data crawler service with incremental collection from third-party API
- Add Kafka service integration with Docker Compose configuration for message queue
- Create data models for job listings, progress tracking, and API responses
- Implement REST API endpoints for data consumption (/consume, /status) and task management
- Add progress persistence layer using SQLite for tracking collection offsets
- Implement date filtering logic to extract data published within 7 days
- Create API client service for third-party data source integration
- Add configuration management with environment-based settings
- Include Docker support with Dockerfile and docker-compose.yml for containerized deployment
- Add logging configuration and utility functions for date parsing
- Include requirements.txt with all Python dependencies and README documentation
2026-01-15 17:09:43 +08:00

92 lines
3.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""八爪鱼API客户端"""
import httpx
import time
import logging
from typing import Optional, Dict, Any
from app.core.config import settings
logger = logging.getLogger(__name__)


class BazhuayuClient:
    """Bazhuayu (八爪鱼) API client.

    Authenticates with username/password against ``{base_url}/token`` and
    caches the returned access token in memory until shortly before its
    advertised expiry. Task data pages are read from ``{base_url}/data/all``.
    Connection settings come from ``settings.api``.
    """

    # Treat the cached token as expired this many seconds early, so a request
    # does not start with a token that is about to lapse.
    _TOKEN_REFRESH_MARGIN = 300
    # Lifetime (seconds) assumed when the token response omits ``expires_in``.
    _DEFAULT_TOKEN_TTL = 86400

    def __init__(self):
        self.base_url = settings.api.base_url
        self.username = settings.api.username
        self.password = settings.api.password
        # Cached bearer token and its absolute expiry (epoch seconds, time.time()).
        self._access_token: Optional[str] = None
        self._token_expires_at: float = 0

    def _invalidate_token(self) -> None:
        """Drop the cached token so the next request fetches a fresh one."""
        self._access_token = None
        self._token_expires_at = 0

    async def _get_token(self) -> str:
        """Return a usable access token, requesting a new one when needed.

        Returns:
            The cached token while it is more than ``_TOKEN_REFRESH_MARGIN``
            seconds from expiry; otherwise a newly issued token.

        Raises:
            Exception: if the token endpoint answers with a non-200 status,
                or its response carries no ``access_token``.
        """
        still_valid = time.time() < self._token_expires_at - self._TOKEN_REFRESH_MARGIN
        if self._access_token and still_valid:
            return self._access_token

        logger.info("正在获取新的access_token...")
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{self.base_url}/token",
                json={
                    "username": self.username,
                    "password": self.password,
                    "grant_type": "password",
                },
                headers={
                    "Content-Type": "application/json",
                    "Accept": "*/*",
                },
                timeout=30,
            )
        if response.status_code != 200:
            raise Exception(f"获取token失败: {response.status_code} - {response.text}")

        # ``data`` may be absent or explicitly null; treat both as empty.
        token_data = response.json().get("data") or {}
        access_token = token_data.get("access_token")
        if not access_token:
            # Without this check None would be cached as the token and sent
            # as "Bearer None" on every data request.
            raise Exception(f"获取token失败: 响应中缺少access_token - {response.text}")

        raw_expires_in = token_data.get("expires_in")
        expires_in = (
            int(raw_expires_in) if raw_expires_in is not None else self._DEFAULT_TOKEN_TTL
        )

        # Only update the cache once the response has been fully validated.
        self._access_token = access_token
        self._token_expires_at = time.time() + expires_in
        logger.info("获取token成功有效期: %s", expires_in)
        return access_token

    async def _request_data(self, task_id: str, offset: int, size: int) -> "httpx.Response":
        """Send one GET to ``/data/all`` with the current token; return the raw response."""
        token = await self._get_token()
        async with httpx.AsyncClient() as client:
            return await client.get(
                f"{self.base_url}/data/all",
                params={
                    "taskId": task_id,
                    "offset": offset,
                    "size": size,
                },
                headers={
                    "Authorization": f"Bearer {token}",
                    "Accept": "*/*",
                },
                timeout=60,
            )

    async def fetch_data(self, task_id: str, offset: int, size: int = 100) -> Dict[str, Any]:
        """Fetch one page of data for a task.

        Args:
            task_id: Task identifier, sent as the ``taskId`` query parameter.
            offset: Starting offset, sent as ``offset``.
            size: Page size, sent as ``size``.

        Returns:
            The decoded JSON body of the response.

        Raises:
            Exception: on any non-200 response, including a 401 that is still
                returned after one token refresh.
        """
        response = await self._request_data(task_id, offset, size)
        if response.status_code == 401:
            # The cached token was rejected before its advertised expiry.
            # Refresh it and retry exactly once. If the fresh token is also
            # rejected, further retries would just loop against the API.
            logger.warning("access_token被拒绝(401)，刷新token后重试一次")
            self._invalidate_token()
            response = await self._request_data(task_id, offset, size)
        if response.status_code != 200:
            raise Exception(f"获取数据失败: {response.status_code} - {response.text}")
        return response.json()

    async def get_total_count(self, task_id: str) -> int:
        """Return the total number of records for ``task_id``.

        Requests a single-record page and reads ``data.total`` from it.
        A missing or null ``data``/``total`` yields 0.
        """
        result = await self.fetch_data(task_id, 0, 1)
        total = (result.get("data") or {}).get("total")
        return int(total) if total is not None else 0
# Module-level shared instance: every module that imports ``api_client``
# uses the same object, and therefore the same in-memory token cache.
api_client: BazhuayuClient = BazhuayuClient()