目录
Python实例题
题目
基于 Apache Kafka 的实时数据流处理平台
问题描述
开发一个基于 Apache Kafka 的实时数据流处理平台,包含以下功能:
- 数据生产者:从多个数据源收集数据
- Kafka 集群:分布式消息队列存储数据流
- 流处理引擎:实时处理和转换数据流
- 数据消费者:将处理后的数据写入目标系统
- 监控与管理:监控 Kafka 集群和数据流处理状态
解题思路
- 搭建 Kafka 集群实现高可用消息队列
- 开发数据生产者从不同数据源收集数据
- 使用 Kafka Streams 或 Apache Flink 实现流处理
- 设计数据消费者将处理结果写入目标系统
- 集成监控工具监控集群和流处理状态
关键代码框架
# 数据生产者示例 - 从API获取数据并发送到Kafka
import requests
import json
from kafka import KafkaProducer
import time
import logging
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# 配置Kafka
KAFKA_BOOTSTRAP_SERVERS = ['localhost:9092']
KAFKA_TOPIC = 'api-data-stream'
# 创建Kafka生产者
producer = KafkaProducer(
bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
key_serializer=lambda k: str(k).encode('utf-8')
)
# API配置
API_URL = 'https://api.example.com/data'
API_KEY = 'your_api_key'
def fetch_data_from_api():
"""从API获取数据"""
headers = {'Authorization': f'Bearer {API_KEY}'}
try:
response = requests.get(API_URL, headers=headers)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
logger.error(f"API请求失败: {e}")
return None
def send_to_kafka(data):
"""将数据发送到Kafka"""
if not data:
return
for item in data:
# 使用数据中的唯一ID作为键
key = item.get('id', None)
try:
# 发送消息到Kafka
future = producer.send(KAFKA_TOPIC, value=item, key=key)
# 等待确认(可选)
record_metadata = future.get(timeout=10)
logger.info(f"消息发送成功 - 主题: {record_metadata.topic}, 分区: {record_metadata.partition}, 偏移量: {record_metadata.offset}")
except Exception as e:
logger.error(f"消息发送失败: {e}")
def run_producer():
"""运行生产者循环"""
logger.info("启动数据生产者...")
try:
while True:
# 从API获取数据
data = fetch_data_from_api()
# 发送数据到Kafka
send_to_kafka(data)
# 等待一段时间再获取下一批数据
time.sleep(60) # 每分钟获取一次数据
except KeyboardInterrupt:
logger.info("生产者被用户中断")
finally:
# 关闭生产者连接
producer.close()
logger.info("生产者已关闭")
if __name__ == "__main__":
run_producer()
# 流处理示例 - 使用Kafka Streams处理实时数据
from kafka import KafkaConsumer, KafkaProducer
from kafka.streams import KafkaStreams, Processor, Stream
import json
import logging
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# 配置Kafka
KAFKA_BOOTSTRAP_SERVERS = ['localhost:9092']
INPUT_TOPIC = 'api-data-stream'
OUTPUT_TOPIC = 'processed-data-stream'
# 定义处理器
class DataProcessor(Processor):
def process(self, key, value):
try:
# 解析JSON数据
data = json.loads(value)
# 数据转换示例 - 添加处理时间戳
data['processed_at'] = str(int(time.time()))
# 数据过滤示例 - 只处理特定类型的数据
if data.get('type') == 'important':
# 将处理后的数据发送到输出主题
self.context.forward(key, json.dumps(data).encode('utf-8'))
except json.JSONDecodeError as e:
logger.error(f"JSON解析错误: {e}")
except Exception as e:
logger.error(f"处理数据时出错: {e}")
# 定义流处理拓扑
def create_stream():
stream_builder = Stream()
# 从输入主题读取数据
stream = stream_builder.stream(INPUT_TOPIC)
# 应用处理器
stream.process(DataProcessor)
# 将结果写入输出主题
stream.to(OUTPUT_TOPIC)
return stream_builder
# 运行流处理应用
def run_stream_processor():
logger.info("启动流处理应用...")
# 创建Kafka配置
config = {
'bootstrap.servers': KAFKA_BOOTSTRAP_SERVERS,
'application.id': 'data-processing-app',
'group.id': 'data-processing-group',
'auto.offset.reset': 'earliest'
}
# 创建并启动流处理应用
stream_builder = create_stream()
kafka_streams = KafkaStreams(stream_builder, config)
try:
kafka_streams.start()
logger.info("流处理应用已启动")
# 保持应用运行
while True:
time.sleep(1)
except KeyboardInterrupt:
logger.info("流处理应用被用户中断")
finally:
# 关闭流处理应用
kafka_streams.close()
logger.info("流处理应用已关闭")
if __name__ == "__main__":
run_stream_processor()
# 数据消费者示例 - 将处理后的数据写入数据库
import json
from kafka import KafkaConsumer
from sqlalchemy import create_engine, Column, Integer, String, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
import datetime
import logging
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# 配置Kafka
KAFKA_BOOTSTRAP_SERVERS = ['localhost:9092']
KAFKA_TOPIC = 'processed-data-stream'
# 配置数据库
DB_URL = 'postgresql://user:password@localhost:5432/stream_data'
engine = create_engine(DB_URL)
Base = declarative_base()
Session = sessionmaker(bind=engine)
# 定义数据模型
class ProcessedData(Base):
__tablename__ = 'processed_data'
id = Column(Integer, primary_key=True)
data_id = Column(String(50), index=True)
type = Column(String(50))
value = Column(String)
processed_at = Column(DateTime)
created_at = Column(DateTime, default=datetime.datetime.utcnow)
# 创建表(如果不存在)
Base.metadata.create_all(engine)
def consume_and_store():
"""消费Kafka消息并存储到数据库"""
logger.info("启动数据消费者...")
# 创建Kafka消费者
consumer = KafkaConsumer(
KAFKA_TOPIC,
bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
auto_offset_reset='earliest',
group_id='database-writer-group',
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
try:
# 消费消息
for message in consumer:
try:
# 获取消息数据
data = message.value
# 创建数据库会话
session = Session()
try:
# 创建数据库记录
db_record = ProcessedData(
data_id=data.get('id'),
type=data.get('type'),
value=json.dumps(data),
processed_at=datetime.datetime.fromtimestamp(int(data.get('processed_at', 0)))
)
# 添加并提交
session.add(db_record)
session.commit()
logger.info(f"成功存储数据到数据库 - ID: {data.get('id')}")
except Exception as e:
logger.error(f"存储数据到数据库失败: {e}")
session.rollback()
finally:
session.close()
except json.JSONDecodeError as e:
logger.error(f"JSON解析错误: {e}")
except Exception as e:
logger.error(f"处理消息时出错: {e}")
except KeyboardInterrupt:
logger.info("消费者被用户中断")
finally:
# 关闭消费者连接
consumer.close()
logger.info("消费者已关闭")
if __name__ == "__main__":
consume_and_store()
难点分析
- Kafka 集群配置:确保高可用性和数据持久性
- 消息序列化与反序列化:处理不同格式的数据
- 流处理语义:实现精确一次处理语义
- 数据一致性:跨多个服务保证数据一致性
- 监控与调优:监控 Kafka 集群性能并进行调优
扩展方向
- 添加更多数据源和目标系统支持
- 实现更复杂的流处理逻辑
- 添加数据分区和负载均衡策略
- 集成分布式追踪系统
- 实现自动扩容和故障恢复机制