Python自定义消费Kafka至HDFS

发布于:2025-04-03 ⋅ 阅读:(28) ⋅ 点赞:(0)

📌Python自定义消费Kafka至HDFS

当Flume消费Kafka出现问题且间隔很长时间才被发现时,需要通过Python脚本将历史未消费的数据重新消费并写入到指定位置,之后再在数仓进行建表等操作。具体流程与代码如下【Kafka --> Python脚本 --> HDFS】


from kafka import KafkaConsumer
import json
from datetime import datetime
import time
from hdfs import InsecureClient

##### 自定义消费KAFKA 数据 #####

def consume_kafka_topic(bootstrap_servers, topic_name, batch_size=300000):
    """
    Consume a Kafka topic and write the messages to HDFS as newline-delimited
    JSON text files, one file per `batch_size` messages.

    Intended for re-consuming a historical backlog after a Flume outage:
    Kafka --> this script --> HDFS.

    Args:
        bootstrap_servers (str): Comma-separated Kafka broker addresses.
        topic_name (str): Topic to consume.
        batch_size (int): Messages per output file. Defaults to 300000,
            matching the original hard-coded threshold.
    """
    # HDFS client (replace the NameNode HTTP address and user with real values).
    hdfs_client = InsecureClient('http://xxxxxxxxx:9870/', user='xxx')

    # Filter window as millisecond epoch timestamps.
    # NOTE(review): datetime(2025, 3, 1) is naive, i.e. interpreted in the
    # machine's local timezone — confirm this matches the timezone of the
    # messages' 'time' field (the original mentioned a +28800 adjustment).
    start_time = int(datetime(2025, 3, 1).timestamp() * 1000)
    end_time = int(datetime(2025, 3, 31).timestamp() * 1000)

    consumer = KafkaConsumer(
        topic_name,
        bootstrap_servers=bootstrap_servers,
        auto_offset_reset='earliest',  # replay from the oldest retained offset
        enable_auto_commit=True,
        group_id='test-group4',
        value_deserializer=lambda x: json.loads(x.decode('utf-8'))
    )

    buffer = []    # pending JSON lines for the current output file
    count = 0      # number of messages currently buffered
    file_num = 1   # sequence number used in output file names

    def _flush(tag="文件"):
        """Write the current buffer to one HDFS file and reset batch state."""
        nonlocal buffer, count, file_num
        filename = f"kafka_data_{file_num}.txt"
        # BUG FIX: the original built `filename` but never interpolated it, so
        # every batch was written to the same literal path and overwritten —
        # only the last batch survived. Use the per-batch filename.
        hdfs_path = f"/tmp/mqtt_drive_data1/{filename}"
        # utf-8-sig prepends a BOM to each file; kept to preserve the original
        # output format.
        hdfs_client.write(hdfs_path, "".join(buffer).encode('utf-8-sig'), overwrite=True)
        print(f"已写入{tag}: hdfs://{hdfs_path} | 消息数: {count}")
        buffer = []
        count = 0
        file_num += 1

    print(f"开始消费Topic: {topic_name},时间范围: {datetime.fromtimestamp(start_time/1000)} ~ {datetime.fromtimestamp(end_time/1000)}")

    try:
        # NOTE(review): this loop never exits on its own; after the backlog is
        # drained it blocks waiting for new messages. Stop it with Ctrl-C,
        # which triggers the KeyboardInterrupt handler below.
        for message in consumer:
            # 'time' is assumed to be a millisecond epoch timestamp inside the
            # JSON payload; messages without it default to 0 and are skipped.
            msg_time = message.value.get('time', 0)

            # Keep only messages inside [start_time, end_time).
            if start_time <= msg_time < end_time:
                # One JSON object per output line (newline-delimited JSON).
                buffer.append(json.dumps(message.value, ensure_ascii=False) + "\n")
                count += 1

                if count >= batch_size:
                    _flush()

    except KeyboardInterrupt:
        print("\n用户中断消费,正在保存剩余数据...")
    finally:
        # Flush whatever is left in the buffer as a final, smaller file.
        if count > 0:
            _flush(tag="剩余文件")

        consumer.close()
        # NOTE(review): hdfs.InsecureClient does not document a close() method;
        # guard the call so cleanup never raises AttributeError.
        close = getattr(hdfs_client, 'close', None)
        if callable(close):
            close()
        print("消费任务完成")

if __name__ == "__main__":
    # Cluster endpoints and target topic (placeholders — fill in before use).
    kafka_bootstrap = "xxxxxxxx:9092,xxxxxxx:9092,xxxxxxx:9092"
    topic = "mqtt_drive"

    # Kick off the backfill consumer.
    consume_kafka_topic(kafka_bootstrap, topic)