"""Kafka service: producer/consumer helpers for publishing and reading job data."""
import json
import logging
from typing import List, Optional

from kafka import KafkaProducer, KafkaConsumer, TopicPartition
from kafka.errors import KafkaError
from kafka.admin import KafkaAdminClient, NewTopic

from app.models import JobData
from app.core.config import settings

logger = logging.getLogger(__name__)


def _serialize_value(value) -> bytes:
    """Encode a message value as UTF-8 JSON, keeping non-ASCII characters unescaped."""
    return json.dumps(value, ensure_ascii=False).encode('utf-8')


def _serialize_key(key) -> Optional[bytes]:
    """Encode a message key as UTF-8; falsy keys (``None``, ``""``) become ``None``."""
    return key.encode('utf-8') if key else None


def _deserialize_value(raw: bytes):
    """Decode a UTF-8 JSON message value back into a Python object."""
    return json.loads(raw.decode('utf-8'))


class KafkaService:
    """Kafka producer/consumer service bound to the topic and group from ``settings.kafka``."""

    def __init__(self) -> None:
        """Read connection settings and make a best-effort attempt to create the topic."""
        self.bootstrap_servers = settings.kafka.bootstrap_servers
        self.topic = settings.kafka.topic
        self.consumer_group = settings.kafka.consumer_group
        # The producer is created lazily on first access of ``self.producer``.
        self._producer: Optional[KafkaProducer] = None
        self._ensure_topic()

    def _ensure_topic(self, num_partitions: int = 3, replication_factor: int = 1) -> None:
        """Create ``self.topic`` if it does not exist yet.

        This is best-effort: any failure is logged as a warning and swallowed,
        so constructing the service never raises because of it.

        Args:
            num_partitions: Partition count used when the topic is created.
            replication_factor: Replication factor used when the topic is created.
        """
        try:
            admin = KafkaAdminClient(
                bootstrap_servers=self.bootstrap_servers,
                client_id="job_crawler_admin",
            )
            try:
                if self.topic not in admin.list_topics():
                    new_topic = NewTopic(
                        name=self.topic,
                        num_partitions=num_partitions,
                        replication_factor=replication_factor,
                    )
                    admin.create_topics([new_topic])
                    logger.info(f"创建Topic: {self.topic}")
            finally:
                # Always release the admin connection, even when listing or
                # creating topics fails.
                admin.close()
        except Exception as e:
            logger.warning(f"检查/创建Topic失败: {e}")

    @property
    def producer(self) -> KafkaProducer:
        """Return the shared producer, creating it on first access."""
        if self._producer is None:
            self._producer = KafkaProducer(
                bootstrap_servers=self.bootstrap_servers,
                value_serializer=_serialize_value,
                key_serializer=_serialize_key,
                acks='all',
                retries=3,
            )
        return self._producer

    def get_consumer(
        self,
        auto_offset_reset: str = 'earliest',
        *,
        consumer_timeout_ms: int = 5000,
        max_poll_records: Optional[int] = None,
    ) -> KafkaConsumer:
        """Build a new consumer subscribed to ``self.topic`` in ``self.consumer_group``.

        The caller owns the returned consumer and is responsible for closing it.

        Args:
            auto_offset_reset: Passed through to ``KafkaConsumer``.
            consumer_timeout_ms: Passed through to ``KafkaConsumer``.
            max_poll_records: Passed through to ``KafkaConsumer`` when given;
                otherwise the library default applies.

        Returns:
            A ``KafkaConsumer`` with auto-commit enabled and JSON value decoding.
        """
        config = {
            'bootstrap_servers': self.bootstrap_servers,
            'group_id': self.consumer_group,
            'auto_offset_reset': auto_offset_reset,
            'enable_auto_commit': True,
            'value_deserializer': _deserialize_value,
            'consumer_timeout_ms': consumer_timeout_ms,
        }
        if max_poll_records is not None:
            config['max_poll_records'] = max_poll_records
        return KafkaConsumer(self.topic, **config)

    def produce(self, job_data: JobData) -> bool:
        """Send one job to Kafka and wait up to 10 seconds for the result.

        Args:
            job_data: The job to publish; ``job_data.id`` is the message key and
                ``job_data.model_dump()`` is the message value.

        Returns:
            ``True`` if the send completed, ``False`` if a ``KafkaError`` occurred
            (the error is logged).
        """
        try:
            future = self.producer.send(
                self.topic,
                key=job_data.id,
                value=job_data.model_dump(),
            )
            # Block until the send resolves so the return value reflects the outcome.
            future.get(timeout=10)
            return True
        except KafkaError as e:
            logger.error(f"发送消息失败: {e}")
            return False

    def produce_batch(self, job_list: List[JobData]) -> int:
        """Send every job in ``job_list`` and return how many were sent successfully.

        Args:
            job_list: Jobs to publish, sent one by one via ``produce``.

        Returns:
            The number of jobs for which ``produce`` returned ``True``.
        """
        success_count = sum(1 for job in job_list if self.produce(job))
        # Flush only an already-created producer. Going through ``self.producer``
        # would try to create one again, which connects to the broker even for an
        # empty batch and re-raises a creation error that ``produce`` already
        # handled and reported as a failed send.
        if self._producer is not None:
            self._producer.flush()
        return success_count

    def consume(self, batch_size: int = 10, timeout_ms: int = 5000) -> List[dict]:
        """Read up to ``batch_size`` messages from the topic with a short-lived consumer.

        Args:
            batch_size: Maximum number of messages to return. Values <= 0 yield
                an empty list without contacting the broker.
            timeout_ms: Used as the consumer's ``consumer_timeout_ms``.

        Returns:
            The decoded message values collected before the batch filled up,
            iteration ended, or an error occurred.
        """
        if batch_size <= 0:
            return []

        messages: List[dict] = []
        consumer = self.get_consumer(
            consumer_timeout_ms=timeout_ms,
            max_poll_records=batch_size,
        )
        try:
            for message in consumer:
                messages.append(message.value)
                if len(messages) >= batch_size:
                    break
        except Exception as e:
            # A consumer timeout ends the iteration normally, so reaching this
            # branch means a real failure; report it and return what was read.
            logger.warning(f"消费消息失败: {e}")
        finally:
            consumer.close()
        return messages

    def get_lag(self) -> int:
        """Return the total backlog of the consumer group across all topic partitions.

        Per partition the lag is ``end_offset - committed_offset``, floored at 0.
        NOTE: a partition with no committed offset is counted from offset 0.

        Returns:
            The summed lag, or 0 if the topic has no partitions or any error
            occurs (errors are logged as warnings).
        """
        try:
            consumer = KafkaConsumer(
                bootstrap_servers=self.bootstrap_servers,
                group_id=self.consumer_group,
            )
            try:
                partitions = consumer.partitions_for_topic(self.topic)
                if not partitions:
                    return 0

                tps = [TopicPartition(self.topic, p) for p in partitions]
                end_offsets = consumer.end_offsets(tps)
                total_lag = 0
                for tp in tps:
                    committed = consumer.committed(tp)
                    end = end_offsets.get(tp, 0)
                    total_lag += max(0, end - (committed or 0))
                return total_lag
            finally:
                # Close the probe consumer on every path, including failures.
                consumer.close()
        except Exception as e:
            logger.warning(f"获取lag失败: {e}")
            return 0

    def close(self) -> None:
        """Close the producer if one was created; safe to call repeatedly."""
        if self._producer is None:
            return
        try:
            self._producer.close()
        finally:
            # Drop the reference even if close() fails so a later access of
            # ``self.producer`` builds a fresh instance.
            self._producer = None


# Module-level shared instance. Constructing it runs ``_ensure_topic``, which
# attempts a broker connection at import time (failures are only logged).
kafka_service = KafkaService()