基于 Node.js，采用 kafka-node 模块实现生产者与消费者

发布于:2023-03-28 ⋅ 阅读:(260) ⋅ 点赞:(0)

生产者代码

/**
 * 生产者
 */

const kafka = require('kafka-node');

// Connection options for the Kafka client: a single broker on localhost.
const conn = { kafkaHost: '127.0.0.1:9092' };

/**
 * Holder for a Kafka client and the producers created from it.
 *
 * A new instance starts with an empty producer registry (`mq_producers`)
 * and a placeholder `client` object.
 */
var MQ = function () {
    Object.assign(this, {
        mq_producers: {},
        client: {},
    });
};

/**
 * Create a Kafka client plus a producer bound to it, and register the producer.
 *
 * The new client replaces `this.client`; the producer is stored in
 * `this.mq_producers` under the fixed key 'common'.
 *
 * @param {object} conn - Options passed straight to `kafka.KafkaClient`.
 * @param {Function} [handler] - Invoked with the producer when it emits 'ready'.
 * @returns {object} The newly created `kafka.Producer`.
 */
MQ.prototype.AddProducer = function (conn, handler){
    console.log('增加生产者',conn, this);

    const client = new kafka.KafkaClient(conn);
    this.client = client;
    const created = new kafka.Producer(client);

    // Hand the producer to the caller only once it reports 'ready'.
    const onReady = () => {
        if (handler) {
            handler(created);
        }
    };

    // Log producer errors so they are at least visible.
    const onError = (err) => {
        console.error('producer error ',err.stack);
    };

    created.on('ready', onReady);
    created.on('error', onError);

    this.mq_producers.common = created;
    return created;
};
// Print the MQ constructor (start-up trace kept from the original script).
console.log(MQ);
const mq = new MQ();

/**
 * Demo driver: once the producer is ready, request creation of the
 * 'broadcast' topic, then publish a JSON test message to partition 0 of
 * that topic every 2 seconds.
 */
mq.AddProducer(conn, function (producer){
    producer.createTopics(['broadcast'], function (createErr){
        if (createErr) {
            // Report the failure but keep going, as the original script did;
            // every send below reports its own error.
            console.error('createTopics error ', createErr);
        }

        setInterval(function (){
            const payload = {
                // kafka-node documents `topic` as a string, not an array.
                topic: 'broadcast',
                messages: [JSON.stringify({"cmd":"testRpc","value":"Hello World"})],
                partition: 0
            };

            // Use the producer handed to this callback rather than looking
            // it up again through mq.mq_producers.
            producer.send([payload], function (err, data){
                if (err) {
                    // Surface send failures instead of logging `undefined`.
                    console.error('send error ', err);
                    return;
                }
                console.log("..... ",data);
            });
        }, 2000);
    });
});

消费者代码

/**
 * 消费者
 */

const kafka = require('kafka-node');

// Connection options for the Kafka client: a single broker on localhost.
const conn = { kafkaHost: '127.0.0.1:9092' };

// Consumer definitions. The script below reads only `topic` and `options`
// of the first entry; `type` and `name` are not read anywhere in this snippet.
const consumers = [
    {
        type: 'consumer',
        options: { autoCommit: true },
        name: 'common',
        topic: [{ topic: 'broadcast', partition: 0 }],
    },
];

/**
 * Holder for a Kafka client and the consumers created from it.
 *
 * A new instance starts with a placeholder `client` object and an empty
 * consumer registry (`mq_consumers`).
 */
let MQ = function () {
    Object.assign(this, {
        client: {},
        mq_consumers: {},
    });
};

/**
 * Create a Kafka client plus a consumer for the given topics, and register it.
 *
 * The new client replaces `this.client`; the consumer is stored in
 * `this.mq_consumers` under the fixed key 'common'.
 *
 * @param {object} conn - Options passed straight to `kafka.KafkaClient`.
 * @param {Array<object>} topics - Topic entries passed to `kafka.Consumer`
 *   (the caller in this script uses `{topic, partition}` objects).
 * @param {object} options - Consumer options passed to `kafka.Consumer`.
 * @param {Function} [handler] - Listener attached to the consumer's 'message' event.
 * @returns {object} The newly created `kafka.Consumer`, so callers can attach
 *   further listeners or close it without reaching into `mq_consumers`.
 */
MQ.prototype.AddConsumer = function (conn, topics, options, handler){
    this.client = new kafka.KafkaClient(conn);
    const consumer = new kafka.Consumer(this.client, topics, options);

    if (handler) {
        consumer.on('message', handler);
    }

    // Log consumer errors so they are at least visible.
    consumer.on('error', function (err){
        console.error('consumer error ',err.stack);
    });

    this.mq_consumers['common'] = consumer;
    return consumer;
};

const mq = new MQ();

// Demo driver: subscribe using the first configured consumer entry and
// print the value of every message received.
mq.AddConsumer(conn, consumers[0].topic, consumers[0].options, function (message){
    console.log(message.value);
});