"""Kafka服务"""
|
||
import json
|
||
import logging
|
||
from typing import List, Optional
|
||
from kafka import KafkaProducer, KafkaConsumer
|
||
from kafka.errors import KafkaError
|
||
from kafka.admin import KafkaAdminClient, NewTopic
|
||
from app.models import JobData
|
||
from app.core.config import settings
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class KafkaService:
    """Kafka producer/consumer service."""

    def __init__(self):
        self.bootstrap_servers = settings.kafka.bootstrap_servers
        self.topic = settings.kafka.topic
        self.consumer_group = settings.kafka.consumer_group
        # The producer is created lazily on first use; see the `producer` property.
        self._producer: Optional[KafkaProducer] = None
        self._ensure_topic()

    def _ensure_topic(self):
        """Create the topic if it does not already exist."""
        try:
            admin = KafkaAdminClient(
                bootstrap_servers=self.bootstrap_servers,
                client_id="job_crawler_admin"
            )
            try:
                existing_topics = admin.list_topics()
                if self.topic not in existing_topics:
                    topic = NewTopic(name=self.topic, num_partitions=3, replication_factor=1)
                    admin.create_topics([topic])
                    logger.info(f"Created topic: {self.topic}")
            finally:
                admin.close()  # close the admin client even if topic creation fails
        except Exception as e:
            logger.warning(f"Failed to check/create topic: {e}")

    @property
    def producer(self) -> KafkaProducer:
        """Return the shared producer, creating it on first access."""
        if self._producer is None:
            self._producer = KafkaProducer(
                bootstrap_servers=self.bootstrap_servers,
                value_serializer=lambda v: json.dumps(v, ensure_ascii=False).encode('utf-8'),
                key_serializer=lambda k: k.encode('utf-8') if k else None,
                acks='all',  # wait for all in-sync replicas to acknowledge
                retries=3
            )
        return self._producer

    def get_consumer(self, auto_offset_reset: str = 'earliest') -> KafkaConsumer:
        """Return a new consumer subscribed to the service topic."""
        return KafkaConsumer(
            self.topic,
            bootstrap_servers=self.bootstrap_servers,
            group_id=self.consumer_group,
            auto_offset_reset=auto_offset_reset,
            enable_auto_commit=True,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            consumer_timeout_ms=5000  # stop iterating after 5s with no messages
        )

    def produce(self, job_data: JobData) -> bool:
        """Send one job record to Kafka; return True on success."""
        try:
            future = self.producer.send(self.topic, key=job_data.id, value=job_data.model_dump())
            future.get(timeout=10)  # block until the broker acknowledges
            return True
        except KafkaError as e:
            logger.error(f"Failed to send message: {e}")
            return False

    def produce_batch(self, job_list: List[JobData]) -> int:
        """Send a list of job records; return the number sent successfully."""
        success_count = 0
        for job in job_list:
            if self.produce(job):
                success_count += 1
        self.producer.flush()
        return success_count

    def produce_raw(self, raw_data: dict) -> bool:
        """Send a raw (untyped) record to Kafka; return True on success."""
        try:
            # Use an identifying field from the raw record as the message key, if present.
            key = raw_data.get("ACE760", "") or raw_data.get("aca112", "")
            future = self.producer.send(self.topic, key=key, value=raw_data)
            future.get(timeout=10)
            return True
        except KafkaError as e:
            logger.error(f"Failed to send raw message: {e}")
            return False

    def produce_batch_raw(self, raw_data_list: List[dict]) -> int:
        """Send a list of raw records; return the number sent successfully."""
        success_count = 0
        for raw_data in raw_data_list:
            if self.produce_raw(raw_data):
                success_count += 1
        self.producer.flush()
        return success_count

    def consume(self, batch_size: int = 10, timeout_ms: int = 5000) -> List[dict]:
        """Consume up to batch_size messages, waiting at most timeout_ms."""
        messages = []
        consumer = KafkaConsumer(
            self.topic,
            bootstrap_servers=self.bootstrap_servers,
            group_id=self.consumer_group,
            auto_offset_reset='earliest',
            enable_auto_commit=True,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            consumer_timeout_ms=timeout_ms,
            max_poll_records=batch_size
        )
        try:
            for message in consumer:
                messages.append(message.value)
                if len(messages) >= batch_size:
                    break
        except Exception as e:
            logger.debug(f"Consumption timed out or finished: {e}")
        finally:
            consumer.close()
        return messages

    def get_lag(self) -> int:
        """Return total consumer-group lag (end offset minus committed offset) across partitions."""
        try:
            consumer = KafkaConsumer(bootstrap_servers=self.bootstrap_servers, group_id=self.consumer_group)
            try:
                partitions = consumer.partitions_for_topic(self.topic)
                if not partitions:
                    return 0
                tps = [TopicPartition(self.topic, p) for p in partitions]
                end_offsets = consumer.end_offsets(tps)
                total_lag = 0
                for tp in tps:
                    committed = consumer.committed(tp)  # None if nothing committed yet
                    end = end_offsets.get(tp, 0)
                    total_lag += max(0, end - (committed or 0))
                return total_lag
            finally:
                consumer.close()  # close even on early return or error
        except Exception as e:
            logger.warning(f"Failed to get lag: {e}")
            return 0

    def close(self):
        """Close the producer connection."""
        if self._producer:
            self._producer.close()
            self._producer = None


kafka_service = KafkaService()
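
# A minimal usage sketch, kept behind a __main__ guard so importing the module
# has no side effects beyond creating the singleton above. Assumptions not
# taken from this file: a reachable broker at settings.kafka.bootstrap_servers,
# and all payload fields other than "ACE760" are made-up example data.
if __name__ == "__main__":
    if kafka_service.produce_raw({"ACE760": "job-001", "source": "demo"}):
        batch = kafka_service.consume(batch_size=10, timeout_ms=5000)
        print(f"consumed {len(batch)} messages, lag={kafka_service.get_lag()}")
    kafka_service.close()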