Files
ocups-kafka/job_crawler/app/services/kafka_service.py
李顺东 ae681575b9 feat(job_crawler): initialize job crawler service with kafka integration
- Add technical documentation (技术方案.md) with system architecture and design details
- Create FastAPI application structure with modular organization (api, core, models, services, utils)
- Implement job data crawler service with incremental collection from third-party API
- Add Kafka service integration with Docker Compose configuration for message queue
- Create data models for job listings, progress tracking, and API responses
- Implement REST API endpoints for data consumption (/consume, /status) and task management
- Add progress persistence layer using SQLite for tracking collection offsets
- Implement date filtering logic to retain only records published within the last 7 days (see the sketch after the commit message)
- Create API client service for third-party data source integration
- Add configuration management with environment-based settings
- Include Docker support with Dockerfile and docker-compose.yml for containerized deployment
- Add logging configuration and utility functions for date parsing
- Include requirements.txt with all Python dependencies and README documentation
2026-01-15 17:09:43 +08:00
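
A minimal, hypothetical sketch of that 7-day filter, assuming the third-party records carry an ISO-8601 timestamp field named published_at (the field name and format are assumptions, not taken from the repo):

from datetime import datetime, timedelta, timezone
from typing import Optional

def within_last_7_days(published_at: str, now: Optional[datetime] = None) -> bool:
    """Return True if the ISO-8601 timestamp is at most 7 days old."""
    now = now or datetime.now(timezone.utc)
    published = datetime.fromisoformat(published_at)
    if published.tzinfo is None:
        # Assume UTC when the source omits timezone information.
        published = published.replace(tzinfo=timezone.utc)
    return now - published <= timedelta(days=7)

# e.g. recent = [job for job in raw_jobs if within_last_7_days(job["published_at"])]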


"""Kafka服务"""
import json
import logging
from typing import List, Optional
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError
from kafka.admin import KafkaAdminClient, NewTopic
from app.models import JobData
from app.core.config import settings
logger = logging.getLogger(__name__)
class KafkaService:
"""Kafka生产者/消费者服务"""
def __init__(self):
self.bootstrap_servers = settings.kafka.bootstrap_servers
self.topic = settings.kafka.topic
self.consumer_group = settings.kafka.consumer_group
self._producer: Optional[KafkaProducer] = None
self._ensure_topic()
def _ensure_topic(self):
"""确保Topic存在"""
try:
admin = KafkaAdminClient(
bootstrap_servers=self.bootstrap_servers,
client_id="job_crawler_admin"
)
existing_topics = admin.list_topics()
if self.topic not in existing_topics:
topic = NewTopic(name=self.topic, num_partitions=3, replication_factor=1)
admin.create_topics([topic])
logger.info(f"创建Topic: {self.topic}")
admin.close()
except Exception as e:
logger.warning(f"检查/创建Topic失败: {e}")
@property
def producer(self) -> KafkaProducer:
"""获取生产者实例"""
if self._producer is None:
self._producer = KafkaProducer(
bootstrap_servers=self.bootstrap_servers,
value_serializer=lambda v: json.dumps(v, ensure_ascii=False).encode('utf-8'),
key_serializer=lambda k: k.encode('utf-8') if k else None,
acks='all',
retries=3
)
return self._producer
def get_consumer(self, auto_offset_reset: str = 'earliest') -> KafkaConsumer:
"""获取消费者实例"""
return KafkaConsumer(
self.topic,
bootstrap_servers=self.bootstrap_servers,
group_id=self.consumer_group,
auto_offset_reset=auto_offset_reset,
enable_auto_commit=True,
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
consumer_timeout_ms=5000
)
def produce(self, job_data: JobData) -> bool:
"""发送消息到Kafka"""
try:
future = self.producer.send(self.topic, key=job_data.id, value=job_data.model_dump())
future.get(timeout=10)
return True
except KafkaError as e:
logger.error(f"发送消息失败: {e}")
return False
def produce_batch(self, job_list: List[JobData]) -> int:
"""批量发送消息"""
success_count = 0
for job in job_list:
if self.produce(job):
success_count += 1
self.producer.flush()
return success_count
def consume(self, batch_size: int = 10, timeout_ms: int = 5000) -> List[dict]:
"""消费消息"""
messages = []
consumer = KafkaConsumer(
self.topic,
bootstrap_servers=self.bootstrap_servers,
group_id=self.consumer_group,
auto_offset_reset='earliest',
enable_auto_commit=True,
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
consumer_timeout_ms=timeout_ms,
max_poll_records=batch_size
)
try:
for message in consumer:
messages.append(message.value)
if len(messages) >= batch_size:
break
except Exception as e:
logger.debug(f"消费超时或完成: {e}")
finally:
consumer.close()
return messages
def get_lag(self) -> int:
"""获取消息堆积量"""
try:
consumer = KafkaConsumer(bootstrap_servers=self.bootstrap_servers, group_id=self.consumer_group)
partitions = consumer.partitions_for_topic(self.topic)
if not partitions:
consumer.close()
return 0
from kafka import TopicPartition
tps = [TopicPartition(self.topic, p) for p in partitions]
end_offsets = consumer.end_offsets(tps)
total_lag = 0
for tp in tps:
committed = consumer.committed(tp)
end = end_offsets.get(tp, 0)
total_lag += max(0, end - (committed or 0))
consumer.close()
return total_lag
except Exception as e:
logger.warning(f"获取lag失败: {e}")
return 0
def close(self):
"""关闭连接"""
if self._producer:
self._producer.close()
self._producer = None
kafka_service = KafkaService()
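
For context, a hedged sketch of how the /consume and /status endpoints listed in the commit message might wire up to kafka_service; the router module, route paths, and response shapes below are assumptions, not the repo's actual API code:

from fastapi import APIRouter, Query

from app.services.kafka_service import kafka_service

router = APIRouter()

@router.get("/consume")
def consume(batch_size: int = Query(10, ge=1, le=100)):
    """Pull up to batch_size job messages from Kafka."""
    messages = kafka_service.consume(batch_size=batch_size)
    return {"count": len(messages), "data": messages}

@router.get("/status")
def status():
    """Report the current consumer-group backlog for the job topic."""
    return {"topic": kafka_service.topic, "lag": kafka_service.get_lag()}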