Centos 7下使用C++使用Rdkafka库实现生产者消费者

发布于:2025-07-11 ⋅ 阅读:(15) ⋅ 点赞:(0)

1. 了解 Kafka

Apache Kafka 是一个分布式流处理平台,核心功能包括:

  • 发布/订阅消息系统:解耦生产者和消费者

  • 分布式存储:持久化、容错的消息存储

  • 流处理:实时处理数据流

核心概念

概念 说明
Broker Kafka 集群中的单个服务器节点
Topic 消息的逻辑分类(如:user_activity
Partition Topic 的分区(并行处理单位),消息按顺序存储
Producer 向 Topic 发布消息的客户端
Consumer 订阅 Topic 并处理消息的客户端
Consumer Group 多个消费者协同消费同一 Topic(每个分区只被组内一个消费者消费)
Offset 消息在分区中的唯一位置标识

2. 了解 rdkafka

rdkafka 是 Kafka 的 C/C++ 客户端库,提供:

  • 高性能生产/消费 API(支持 C/C++/Python 等)

  • 特性:

    • 异步/同步发送模式

    • 自动负载均衡

    • 消息压缩(gzip, snappy, lz4)

    • SASL 认证

    • 精确一次语义(EOS)

  • 开源地址:edenhill/librdkafka

3. 代码实现

以下是使用 librdkafka 的 C++ 接口操作 Kafka 的生产者和消费者完整实现:

生产者代码 (producer.cpp)
#include <iostream>
#include <string>
#include <sstream>
#include <librdkafka/rdkafkacpp.h>

class ProducerDeliveryReportCb : public RdKafka::DeliveryReportCb {
public:
    void dr_cb(RdKafka::Message &message) {
        if (message.err()) {
            std::cerr << "消息发送失败: " << message.errstr() << std::endl;
        } else {
            std::cout << "消息发送成功: " << message.topic_name() 
                      << " [" << message.partition() << "] @ " 
                      << message.offset() << std::endl;
        }
    }
};

int main() {
    // 1. 创建配置对象
    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    std::string errstr;

    // 2. 设置配置参数
    if (conf->set("bootstrap.servers", "localhost:9092", errstr) != RdKafka::Conf::CONF_OK) {
        std::cerr << "配置错误: " << errstr << std::endl;
        return 1;
    }
    
    // 设置消息确认模式 (all = 最高可靠性)
    if (conf->set("acks", "all", errstr) != RdKafka::Conf::CONF_OK) {
        std::cerr << "配置错误: " << errstr << std::endl;
        return 1;
    }

    // 3. 创建生产者实例
    ProducerDeliveryReportCb delivery_cb;
    if (conf->set("dr_cb", &delivery_cb, errstr) != RdKafka::Conf::CONF_OK) {
        std::cerr << "配置回调错误: " << errstr << std::endl;
        return 1;
    }

    RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
    if (!producer) {
        std::cerr << "创建生产者失败: " << errstr << std::endl;
        return 1;
    }
    delete conf;

    // 4. 创建Topic对象
    RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
    RdKafka::Topic *topic = RdKafka::Topic::create(producer,"cpp_test_topic",tconf,errstr);
    if (!topic) {
        std::cerr << "创建Topic失败: " << errstr << std::endl;
        delete tconf;
        return 1;
    }
    delete tconf;

    // 5. 生产消息
    for (int i = 0; i < 10; ++i) {
        std::string key = "key-" + std::to_string(i);
        std::string payload = "Message #" + std::to_string(i);
        
        // 发送消息
        RdKafka::ErrorCode resp = producer->produce(
            topic, 
            RdKafka::Topic::PARTITION_UA, // 自动分区分配
            RdKafka::Producer::RK_MSG_COPY,
            const_cast<char*>(payload.c_str()), payload.size(),
            const_cast<char*>(key.c_str()), key.size(),
            NULL
        );

        if (resp != RdKafka::ERR_NO_ERROR) {
            std::cerr << "生产消息失败: " << RdKafka::err2str(resp) << std::endl;
        } else {
            std::cout << "已发送: " << payload << std::endl;
        }
        
        // 处理事件队列
        producer->poll(0);
    }

    // 6. 等待所有消息完成发送
    while (producer->outq_len() > 0) {
        std::cout << "等待发送队列: " << producer->outq_len() << std::endl;
        producer->poll(100);
    }

    // 7. 清理资源
    delete topic;
    delete producer;

    return 0;
}
消费者代码 (consumer.cpp)
#include <iostream>
#include <string>
#include <csignal>
#include <vector>
#include <librdkafka/rdkafkacpp.h>

bool running = true;

void stop(int sig) {
    running = false;
}

class ConsumerEventCb : public RdKafka::EventCb {
public:
    void event_cb(RdKafka::Event &event) {
        switch (event.type()) {
            case RdKafka::Event::EVENT_ERROR:
                std::cerr << "错误: " << RdKafka::err2str(event.err()) << std::endl;
                break;
            case RdKafka::Event::EVENT_LOG:
                std::cout << "日志: " << event.str() << std::endl;
                break;
            default:
                std::cout << "事件: " << event.type() << ": " << event.str() << std::endl;
                break;
        }
    }
};

int main() {
    // 注册信号处理
    signal(SIGINT, stop);
    signal(SIGTERM, stop);

    // 1. 创建配置对象
    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    std::string errstr;

    // 2. 设置配置参数
    if (conf->set("bootstrap.servers", "localhost:9092", errstr) != RdKafka::Conf::CONF_OK) {
        std::cerr << "配置错误: " << errstr << std::endl;
        return 1;
    }

    // 设置消费组
    if (conf->set("group.id", "cpp_consumer_group", errstr) != RdKafka::Conf::CONF_OK) {
        std::cerr << "配置错误: " << errstr << std::endl;
        return 1;
    }

    // 从最早的消息开始消费
    if (conf->set("auto.offset.reset", "earliest", errstr) != RdKafka::Conf::CONF_OK) {
        std::cerr << "配置错误: " << errstr << std::endl;
        return 1;
    }

    // 3. 设置事件回调
    ConsumerEventCb event_cb;
    if (conf->set("event_cb", &event_cb, errstr) != RdKafka::Conf::CONF_OK) {
        std::cerr << "设置回调失败: " << errstr << std::endl;
        return 1;
    }

    // 4. 创建消费者实例
    RdKafka::KafkaConsumer *consumer = RdKafka::KafkaConsumer::create(conf, errstr);
    if (!consumer) {
        std::cerr << "创建消费者失败: " << errstr << std::endl;
        return 1;
    }
    delete conf;

    // 5. 订阅Topic
    std::vector<std::string> topics;
    topics.push_back("cpp_test_topic");
    RdKafka::ErrorCode resp = consumer->subscribe(topics);
    if (resp != RdKafka::ERR_NO_ERROR) {
        std::cerr << "订阅失败: " << RdKafka::err2str(resp) << std::endl;
        return 1;
    }

    // 6. 消费消息
    while (running) {
        // 等待消息 (1000ms超时)
        RdKafka::Message *msg = consumer->consume(1000);
        
        switch (msg->err()) {
            case RdKafka::ERR__TIMED_OUT:
                break;  // 超时继续
                
            case RdKafka::ERR_NO_ERROR:
                // 成功消费到消息
                std::cout << "收到消息: "
                          << "主题: " << msg->topic_name() 
                          << " | 分区: [" << msg->partition() << "]"
                          << " | 偏移量: " << msg->offset() << std::endl;
                          
                if (msg->key()) {
                    std::cout << "键: " << *msg->key() << " => ";
                }
                
                std::cout << "值: " << static_cast<const char*>(msg->payload()) 
                          << std::endl;
                break;
                
            default:
                std::cerr << "消费错误: " << msg->errstr() << std::endl;
                break;
        }
        
        // 手动提交偏移量
        consumer->commitAsync(msg);
        delete msg;
    }

    // 7. 关闭消费者
    consumer->close();
    delete consumer;

    return 0;
}
编译运行

# 编译生产者
g++ -o producer producer.cpp -lrdkafka++ -lrdkafka -lpthread -lz -ldl

# 编译消费者
g++ -o consumer consumer.cpp -lrdkafka++ -lrdkafka -lpthread -lz -ldl


网站公告

今日签到

点亮在社区的每一天
去签到