Using Kafka in Node.js (4): Batch Message Sending and Transactions

Published: 2025-02-19

Batch Message Sending

The producer uses sendBatch to send messages to multiple topics in a single call; consumer groups group1 and group2 subscribe to those topics, and their consumers receive the messages with eachBatch.

producer.ts

import { CompressionTypes, Kafka } from 'kafkajs';


async function run() {
  const kafka = new Kafka({
    clientId: 'test5',
    brokers: ['localhost:9092'],
    connectionTimeout: 1000, // 1-second connection timeout
  });

  const producer = kafka.producer();

  await producer.connect();

  for (let i = 1; i <= 10; ++i) {
    let name = Math.random().toString().slice(2, -1);
    let age = Math.ceil(Math.random() * 40);
    let sex = Math.random() * 100 > 50 ? 1 : 0;
    let person = {
      id: i,
      name,
      age,
      sex,
      desc: `id:${i},name:${name},age:${age},sex:${sex}`,
    };

    await producer.sendBatch({
      // acks = 0: do not wait for any acknowledgment; best performance, highest risk of data loss.
      // acks = 1: wait for the leader replica only; good performance, but data can be lost if the leader crashes.
      // acks = -1 (or all): wait for all in-sync replicas; strongest durability, at the cost of throughput and latency.
      acks: -1,
      topicMessages: [1, 2, 3].map((topicNum) => {
        return {
          topic: `topic${topicNum}`,
          messages: [1, 2, 3].map((msgNum) => {
            return {
              value: `${person.id}.${msgNum}@_@${person.name}@_@${person.age}@_@${person.sex}@_@${person.desc}`,
              headers: { id: String(msgNum) },
            };
          }),
        };
      }),
      compression: CompressionTypes.GZIP,
    });

    console.log(`${person.id}@_@${person.name}@_@${person.age}@_@${person.sex}@_@${person.desc}`);
  }

  await producer.disconnect();
}

run();
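
The consumers below rely on allowAutoTopicCreation, so topic1 through topic3 would otherwise be created implicitly with broker defaults. Creating them up front with the kafkajs admin client gives control over partitioning; here is a minimal sketch, assuming a single local broker (the partition and replication values are illustrative, not from the original setup):

import { Kafka } from 'kafkajs';

async function createTopics() {
  const kafka = new Kafka({ clientId: 'admin', brokers: ['localhost:9092'] });
  const admin = kafka.admin();

  await admin.connect();
  // Create topic1..topic3; resolves to false if the topics already exist.
  await admin.createTopics({
    topics: [1, 2, 3].map((n) => ({
      topic: `topic${n}`,
      numPartitions: 3,      // assumed value, for illustration only
      replicationFactor: 1,  // assumed: single-broker local setup
    })),
  });
  await admin.disconnect();
}

createTopics();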

consumer.ts

import { Kafka } from 'kafkajs';


const kafka = new Kafka({
  clientId: 'test3',
  brokers: ['localhost:9092'],
  connectionTimeout: 100, // 0.1-second connection timeout
  requestTimeout: 1000,
});

const consumer = kafka.consumer({
  groupId: 'group1',
  rackId: 'test2.group1.consumer1',
  maxBytes: 3 * 1024 * 1024,  // max bytes to accumulate per fetch response (default 10 MB)
  sessionTimeout: 60000,      // consumer session timeout (default 30 s)
  rebalanceTimeout: 60000,    // maximum time to wait for a rebalance (default 60 s)
  heartbeatInterval: 6000,    // heartbeat interval (default 3 s)
  maxWaitTimeInMs: 500,       // max time to wait for fetch data (default 5 s)
  allowAutoTopicCreation: true, // auto-create topics that do not exist
});

await consumer.connect();

await consumer.subscribe({ topic: 'topic1', fromBeginning: true });
await consumer.subscribe({ topic: 'topic2', fromBeginning: true });
await consumer.subscribe({ topic: 'topic3', fromBeginning: true });

await consumer.run({
  autoCommit: false,
  eachBatch: async ({ batch, resolveOffset }) => {
    batch.messages.forEach(msg => {
      console.log('batch:', batch.topic, msg.headers?.id?.toString(), msg.value?.toString());
      resolveOffset(msg.offset); // mark each message as processed, not just the batch tail
    });
  },
});
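
With autoCommit: false, resolveOffset only marks messages as processed inside kafkajs; nothing is committed back to the broker, so the group would restart from its last committed position. A minimal sketch of committing explicitly from eachBatch with consumer.commitOffsets, which ignores the auto-commit settings (committing lastOffset + 1, the next offset to read):

await consumer.run({
  autoCommit: false,
  eachBatch: async ({ batch, resolveOffset }) => {
    for (const msg of batch.messages) {
      resolveOffset(msg.offset); // mark the message as processed
    }
    // Commit the next offset to read for this topic-partition.
    await consumer.commitOffsets([{
      topic: batch.topic,
      partition: batch.partition,
      offset: String(Number(batch.lastOffset()) + 1),
    }]);
  },
});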

consumer2.ts

import { Kafka } from 'kafkajs';


const kafka = new Kafka({
  clientId: 'test4',
  brokers: ['localhost:9092'],
  connectionTimeout: 100, // 0.1-second connection timeout
  requestTimeout: 1000,
});

const consumer = kafka.consumer({
  groupId: 'group2',
  rackId: 'test2.group2.consumer2',
  maxBytes: 3 * 1024 * 1024,  // max bytes to accumulate per fetch response (default 10 MB)
  sessionTimeout: 60000,      // consumer session timeout (default 30 s)
  rebalanceTimeout: 60000,    // maximum time to wait for a rebalance (default 60 s)
  heartbeatInterval: 6000,    // heartbeat interval (default 3 s)
  maxWaitTimeInMs: 500,       // max time to wait for fetch data (default 5 s)
  allowAutoTopicCreation: true, // auto-create topics that do not exist
});

await consumer.connect();

await consumer.subscribe({ topic: 'topic1', fromBeginning: true });
await consumer.subscribe({ topic: 'topic2', fromBeginning: true });
await consumer.subscribe({ topic: 'topic3', fromBeginning: true });

await consumer.run({
  autoCommit: false,  // disable auto-commit
  partitionsConsumedConcurrently: 10, // process up to 10 partitions concurrently
  eachBatch: async ({ batch, resolveOffset, heartbeat }) => {
    batch.messages.forEach(msg => {
      console.log('batch:', msg.value?.toString());
    });
    resolveOffset(batch.lastOffset()); // mark the whole batch as processed
    // await heartbeat(); // call during long-running batches to keep the session alive
  },
});
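
When a batch takes a long time to process, the heartbeat function that kafkajs passes to eachBatch keeps the consumer in the group; without it, processing that exceeds sessionTimeout gets the consumer evicted and triggers a rebalance. A minimal sketch, where handleMessage is a hypothetical placeholder for slow per-message work:

await consumer.run({
  autoCommit: false,
  eachBatch: async ({ batch, resolveOffset, heartbeat }) => {
    for (const msg of batch.messages) {
      await handleMessage(msg);  // hypothetical slow processing step
      resolveOffset(msg.offset); // mark progress as each message completes
      await heartbeat();         // keep the session alive between messages
    }
  },
});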


Transactions

The producer starts a transaction and sends messages that are consumed by consumers in two different consumer groups. Once every message in the transaction has been sent and acknowledged, the transaction is committed and the consumers' offsets advance; if any message fails, the entire transaction is aborted and rolled back.

producer.ts

import { CompressionTypes, Kafka } from 'kafkajs';


async function run() {
  const kafka = new Kafka({
    clientId: 'test6',
    brokers: ['localhost:9092'],
    connectionTimeout: 1000, // 1-second connection timeout
  });

  const producer = kafka.producer({
    transactionalId: 't1',
    maxInFlightRequests: 1, // at most one in-flight request, preserving message ordering
    idempotent: true,       // enable idempotence so retries cannot duplicate messages (exactly-once semantics)
  });

  await producer.connect();

  for (let i = 1; i <= 10; ++i) {
    let name = Math.random().toString().slice(2, -1);
    let age = Math.ceil(Math.random() * 40);
    let sex = Math.random() * 100 > 50 ? 1 : 0;
    let person = {
      id: i,
      name,
      age,
      sex,
      desc: `id:${i},name:${name},age:${age},sex:${sex}`,
    };

    const transaction = await producer.transaction(); // begin a new transaction

    try {
      await transaction.send({
        topic: 'topic1',
        compression: CompressionTypes.GZIP,
        messages: [
          {
            value: `transaction,${person.id}.${i}@_@${person.name}@_@${person.age}@_@${person.sex}@_@${person.desc}`,
            headers: { id: String(i) },
          }
        ],
      });

      await transaction.commit(); // all messages in the transaction become visible atomically
    } catch (e) {
      await transaction.abort(); // roll back every message sent in this transaction
    }

    console.log(`${person.id}@_@${person.name}@_@${person.age}@_@${person.sex}@_@${person.desc}`);
  }

  await producer.disconnect();
}

run();
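
Transactions also support the consume-transform-produce pattern: transaction.sendOffsets commits a consumer group's offsets inside the producer's transaction, so the input offsets and the output messages succeed or fail together. A minimal sketch (the consumerGroupId, source topic, and offset value are illustrative assumptions):

const transaction = await producer.transaction();

try {
  await transaction.send({
    topic: 'topic1',
    messages: [{ value: 'transformed message' }],
  });

  // Commit the source offsets as part of the same transaction.
  await transaction.sendOffsets({
    consumerGroupId: 'group1', // assumed group of the upstream consumer
    topics: [{ topic: 'source-topic', partitions: [{ partition: 0, offset: '42' }] }],
  });

  await transaction.commit();
} catch (e) {
  await transaction.abort();
}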

consumer.ts

import { Kafka } from 'kafkajs';


const kafka = new Kafka({
  clientId: 'test7',
  brokers: ['localhost:9092'],
  connectionTimeout: 100, // 0.1-second connection timeout
  requestTimeout: 1000,
});

const consumer = kafka.consumer({
  groupId: 'group1',
  rackId: 'test7.group1.consumer1',
  maxBytes: 3 * 1024 * 1024,  // max bytes to accumulate per fetch response (default 10 MB)
  sessionTimeout: 60000,      // consumer session timeout (default 30 s)
  rebalanceTimeout: 60000,    // maximum time to wait for a rebalance (default 60 s)
  heartbeatInterval: 6000,    // heartbeat interval (default 3 s)
  maxWaitTimeInMs: 500,       // max time to wait for fetch data (default 5 s)
  allowAutoTopicCreation: true, // auto-create topics that do not exist
});

await consumer.connect();

await consumer.subscribe({ topic: 'topic1', fromBeginning: true });

await consumer.run({
  autoCommit: false,
  eachMessage: async ({ message, topic, partition }) => {
    console.log(topic, partition, message.value?.toString(), message.headers?.id?.toString(), message.offset);
    // Commit the offset of the next message to read.
    await consumer.commitOffsets([{ topic, partition, offset: String(Number(message.offset) + 1) }]);
  },
});

consumer2.ts

import { Kafka } from 'kafkajs';


const kafka = new Kafka({
  clientId: 'test8',
  brokers: ['localhost:9092'],
  connectionTimeout: 100, // 0.1-second connection timeout
  requestTimeout: 1000,
});

const consumer = kafka.consumer({
  groupId: 'group2',
  rackId: 'test8.group2.consumer2',
  maxBytes: 3 * 1024 * 1024,  // max bytes to accumulate per fetch response (default 10 MB)
  sessionTimeout: 60000,      // consumer session timeout (default 30 s)
  rebalanceTimeout: 60000,    // maximum time to wait for a rebalance (default 60 s)
  heartbeatInterval: 6000,    // heartbeat interval (default 3 s)
  maxWaitTimeInMs: 500,       // max time to wait for fetch data (default 5 s)
  allowAutoTopicCreation: true, // auto-create topics that do not exist
});

await consumer.connect();

await consumer.subscribe({ topic: 'topic1', fromBeginning: true });

await consumer.run({
  autoCommit: false,
  eachMessage: async ({ message, topic, partition }) => {
    console.log(topic, partition, message.value?.toString(), message.headers?.id?.toString(), message.offset);
    // Commit the offset of the next message to read.
    await consumer.commitOffsets([{ topic, partition, offset: String(Number(message.offset) + 1) }]);
  },
});
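
One detail worth making explicit: kafkajs consumers use read-committed isolation by default (readUncommitted: false), so the messages from any aborted transaction above are never delivered to these consumers. A minimal sketch spelling out the default:

const consumer = kafka.consumer({
  groupId: 'group2',
  // Default behavior, shown explicitly: skip messages from aborted or
  // still-open transactions; set to true to read them anyway.
  readUncommitted: false,
});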
