"""Kafka服务"""
|
|
import json
|
|
import logging
|
|
from typing import List, Optional
|
|
from kafka import KafkaProducer, KafkaConsumer
|
|
from kafka.errors import KafkaError
|
|
from kafka.admin import KafkaAdminClient, NewTopic
|
|
from app.models import JobData
|
|
from app.core.config import settings
|
|
|
|
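# Assumed config shape (defined elsewhere, in app.core.config): `settings.kafka`
# exposes `bootstrap_servers`, `topic`, and `consumer_group`.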
logger = logging.getLogger(__name__)


class KafkaService:
    """Kafka producer/consumer service."""

    def __init__(self):
        self.bootstrap_servers = settings.kafka.bootstrap_servers
        self.topic = settings.kafka.topic
        self.consumer_group = settings.kafka.consumer_group
        # The producer is created lazily on first use; see the `producer` property.
        self._producer: Optional[KafkaProducer] = None
        self._ensure_topic()

    def _ensure_topic(self):
        """Ensure the topic exists, creating it if necessary."""
        try:
            admin = KafkaAdminClient(
                bootstrap_servers=self.bootstrap_servers,
                client_id="job_crawler_admin"
            )
            existing_topics = admin.list_topics()
            if self.topic not in existing_topics:
                topic = NewTopic(name=self.topic, num_partitions=3, replication_factor=1)
                admin.create_topics([topic])
                logger.info(f"Created topic: {self.topic}")
            admin.close()
        except Exception as e:
            # Best-effort: the broker may auto-create the topic, or it may already exist.
            logger.warning(f"Failed to check/create topic: {e}")

    @property
    def producer(self) -> KafkaProducer:
        """Lazily create and return the shared producer instance."""
        if self._producer is None:
            self._producer = KafkaProducer(
                bootstrap_servers=self.bootstrap_servers,
                value_serializer=lambda v: json.dumps(v, ensure_ascii=False).encode('utf-8'),
                key_serializer=lambda k: k.encode('utf-8') if k else None,
                acks='all',  # wait for all in-sync replicas before acknowledging
                retries=3
            )
        return self._producer

    def get_consumer(self, auto_offset_reset: str = 'earliest') -> KafkaConsumer:
        """Create a consumer subscribed to the topic."""
        return KafkaConsumer(
            self.topic,
            bootstrap_servers=self.bootstrap_servers,
            group_id=self.consumer_group,
            auto_offset_reset=auto_offset_reset,
            enable_auto_commit=True,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            consumer_timeout_ms=5000  # stop iterating after 5s with no messages
        )

    def produce(self, job_data: JobData) -> bool:
        """Send a single message to Kafka; returns True on success."""
        try:
            data = job_data.to_dict()
            # Key by document id so records for the same job land in the same partition.
            future = self.producer.send(self.topic, key=data.get("_id"), value=data)
            future.get(timeout=10)  # block until the broker acknowledges
            return True
        except KafkaError as e:
            logger.error(f"Failed to send message: {e}")
            return False

    def produce_batch(self, job_list: List[JobData]) -> int:
        """Send messages in batch; returns the number sent successfully."""
        success_count = 0
        for job in job_list:
            if self.produce(job):
                success_count += 1
        self.producer.flush()
        return success_count

    def consume(self, batch_size: int = 10, timeout_ms: int = 5000) -> List[dict]:
        """Consume up to batch_size messages, waiting at most timeout_ms."""
        messages = []
        consumer = KafkaConsumer(
            self.topic,
            bootstrap_servers=self.bootstrap_servers,
            group_id=self.consumer_group,
            auto_offset_reset='earliest',
            enable_auto_commit=True,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            consumer_timeout_ms=timeout_ms,
            max_poll_records=batch_size
        )
        try:
            # Iteration ends on its own once consumer_timeout_ms elapses without messages.
            for message in consumer:
                messages.append(message.value)
                if len(messages) >= batch_size:
                    break
        except Exception as e:
            logger.debug(f"Consumption timed out or finished: {e}")
        finally:
            consumer.close()
        return messages

    def get_lag(self) -> int:
        """Return the total consumer-group lag (message backlog) across partitions."""
        try:
            consumer = KafkaConsumer(bootstrap_servers=self.bootstrap_servers, group_id=self.consumer_group)
            partitions = consumer.partitions_for_topic(self.topic)
            if not partitions:
                consumer.close()
                return 0
            tps = [TopicPartition(self.topic, p) for p in partitions]
            end_offsets = consumer.end_offsets(tps)
            total_lag = 0
            for tp in tps:
                # Per-partition lag = log end offset - committed offset (0 if none committed).
                committed = consumer.committed(tp)
                end = end_offsets.get(tp, 0)
                total_lag += max(0, end - (committed or 0))
            consumer.close()
            return total_lag
        except Exception as e:
            logger.warning(f"Failed to get lag: {e}")
            return 0

    def close(self):
        """Close the producer connection."""
        if self._producer:
            self._producer.close()
            self._producer = None


# Shared module-level instance; note that constructing it contacts the broker
# via _ensure_topic().
kafka_service = KafkaService()
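

if __name__ == "__main__":
    # Minimal smoke test, assuming a broker is reachable at the configured
    # bootstrap servers; a sketch for local verification, not part of the
    # service API. Producing is omitted because JobData's fields live in
    # app.models and are not shown here.
    logging.basicConfig(level=logging.INFO)
    print(f"Current consumer lag: {kafka_service.get_lag()}")
    batch = kafka_service.consume(batch_size=5, timeout_ms=3000)
    print(f"Consumed {len(batch)} message(s)")
    kafka_service.close()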