消息批量发送
生产者使用sendBatch同时发送多个主题消息,给消费者组group1和group2,消费者使用eachBatch接搜消息。
producer.ts
import { CompressionTypes, Kafka } from 'kafkajs';
async function run() {
const kafka = new Kafka({
clientId: 'test5',
brokers: ['localhost:9092'],
connectionTimeout: 1000, // 1 秒连接超时
});
const producer = kafka.producer();
await producer.connect();
for (let i = 1; i <= 10; ++i) {
let name = Math.random().toString().slice(2, -1);
let age = Math.ceil(Math.random() * 40);
let sex = Math.random() * 100 > 50 ? 1 : 0;
let person = {
id: i,
name,
age,
sex,
desc: `id:${i},姓名:${name},年龄:${age},性别:${sex}`,
};
await producer.sendBatch({
// acks = 0:不等待任何副本确认,性能最好,但数据丢失风险最大。
// acks = 1:只等待 leader 副本确认,性能较好,但在 leader 宕机时仍有丢失风险。
// acks = -1 或 acks = all:等待所有副本确认,保证最高数据可靠性,但牺牲吞吐量和增加延迟。
acks: -1,
topicMessages: [1, 2, 3].map((item) => {
return {
topic: `topic${item}`,
messages: [1, 2, 3].map(item => {
return {
value: `${person.id}.${item}@_@${person.name}@_@${person.age}@_@${person.sex}@_@${person.desc}`,
headers: { id: String(item) },
};
}),
};
}),
compression: CompressionTypes.GZIP,
});
console.log(`${person.id}@_@${person.name}@_@${person.age}@_@${person.sex}@_@${person.desc}`);
}
await producer.disconnect();
}
run();
consumer.ts
import { Kafka } from 'kafkajs'
const kafka = new Kafka({
clientId: 'test3',
brokers: ['localhost:9092'],
connectionTimeout: 100, // 0.1 秒连接超时
requestTimeout: 1000,
});
const consumer = kafka.consumer({
groupId: 'group1',
rackId: 'test2.group1.consumer1',
maxBytes: 3 * 1024 * 1024, // 单次 poll 请求从每个分区拉取的最大数据量(默认1MB,单位字节Bytes)
sessionTimeout: 60000, // 消费者心跳超时时间(默认 30s)
rebalanceTimeout: 60000, // rebalance 最大等待时间(默认 60s)
heartbeatInterval: 6000, // 心跳间隔(默认 3s)
maxWaitTimeInMs: 500, // 每次拉取消息等待的时间(默认0.5s)
allowAutoTopicCreation: true, // topic不存在自动创建
});
await consumer.connect();
await consumer.subscribe({ topic: 'topic1', fromBeginning: true });
await consumer.subscribe({ topic: 'topic2', fromBeginning: true });
await consumer.subscribe({ topic: 'topic3', fromBeginning: true });
await consumer.run({
autoCommit: false,
eachBatch: async ({ batch, resolveOffset }) => {
batch.messages.forEach(msg => {
console.log('batch:', batch.topic, msg.headers?.id?.toString?.(), msg.value?.toString());
resolveOffset(batch.lastOffset());
});
},
});
consumer2.ts
import { Kafka } from 'kafkajs'
const kafka = new Kafka({
clientId: 'test4',
brokers: ['localhost:9092'],
connectionTimeout: 100, // 0.1 秒连接超时
requestTimeout: 1000,
});
const consumer = kafka.consumer({
groupId: 'group2',
rackId: 'test2.group2.consumer2',
maxBytes: 3 * 1024 * 1024, // 单次 poll 请求从每个分区拉取的最大数据量(默认1MB,单位字节Bytes)
sessionTimeout: 60000, // 消费者心跳超时时间(默认 30s)
rebalanceTimeout: 60000, // rebalance 最大等待时间(默认 60s)
heartbeatInterval: 6000, // 心跳间隔(默认 3s)
maxWaitTimeInMs: 500, // 每次拉取消息等待的时间(默认0.5s)
allowAutoTopicCreation: true, // topic不存在自动创建
});
await consumer.connect();
await consumer.subscribe({ topic: 'topic1', fromBeginning: true });
await consumer.subscribe({ topic: 'topic2', fromBeginning: true });
await consumer.subscribe({ topic: 'topic3', fromBeginning: true });
await consumer.run({
autoCommit: false, // 禁用自动提交
partitionsConsumedConcurrently: 10, // 控制每次最多消费的分区数
eachBatch: async ({ batch, resolveOffset, heartbeat }) => {
batch.messages.forEach(msg => {
console.log('batch:', msg.value?.toString());
});
resolveOffset(batch.lastOffset());
// await heartbeat();
},
});
事务
生产者启动事务发送消息给两个不同topic的消费者,当本次事务数据全部发送成功并确认后会向下偏移,如果有消息处理不成功会回滚整个事务。
producer.ts
import { CompressionTypes, Kafka } from 'kafkajs';
async function run() {
const kafka = new Kafka({
clientId: 'test6',
brokers: ['localhost:9092'],
connectionTimeout: 1000, // 1 秒连接超时
});
const producer = kafka.producer({
transactionalId: 't1',
maxInFlightRequests: 1, // 仅一个请求在等待响应时发送
idempotent: true, // 启用幂等性,确保消息不会重复,精确一次语义
});
await producer.connect();
for (let i = 1; i <= 10; ++i) {
let name = Math.random().toString().slice(2, -1);
let age = Math.ceil(Math.random() * 40);
let sex = Math.random() * 100 > 50 ? 1 : 0;
let person = {
id: i,
name,
age,
sex,
desc: `id:${i},姓名:${name},年龄:${age},性别:${sex}`,
};
const transaction = await producer.transaction()
try {
await transaction.send({
topic: 'topic1',
compression: CompressionTypes.GZIP,
messages: [
{
value: `transcation,${person.id}.${i}@_@${person.name}@_@${person.age}@_@${person.sex}@_@${person.desc}`,
headers: { id: String(i) },
}
],
});
await transaction.commit();
} catch (e) {
await transaction.abort();
}
console.log(`${person.id}@_@${person.name}@_@${person.age}@_@${person.sex}@_@${person.desc}`);
}
await producer.disconnect();
}
run();
consumer.ts
import { Kafka } from 'kafkajs'
const kafka = new Kafka({
clientId: 'test7',
brokers: ['localhost:9092'],
connectionTimeout: 100, // 0.1 秒连接超时
requestTimeout: 1000,
});
const consumer = kafka.consumer({
groupId: 'group1',
rackId: 'test7.group1.consumer1',
maxBytes: 3 * 1024 * 1024, // 单次 poll 请求从每个分区拉取的最大数据量(默认1MB,单位字节Bytes)
sessionTimeout: 60000, // 消费者心跳超时时间(默认 30s)
rebalanceTimeout: 60000, // rebalance 最大等待时间(默认 60s)
heartbeatInterval: 6000, // 心跳间隔(默认 3s)
maxWaitTimeInMs: 500, // 每次拉取消息等待的时间(默认0.5s)
allowAutoTopicCreation: true, // topic不存在自动创建
});
await consumer.connect();
await consumer.subscribe({ topic: 'topic1', fromBeginning: true });
await consumer.run({
autoCommit: false,
eachMessage: async ({ message, topic, partition }) => {
console.log(topic, partition, message.value?.toString(), message.headers?.id?.toString(), message.offset);
consumer.commitOffsets([{ topic, partition, offset: String(Number(message.offset) + 1) }]);
},
});
consumer2.ts
import { Kafka } from 'kafkajs'
const kafka = new Kafka({
clientId: 'test8',
brokers: ['localhost:9092'],
connectionTimeout: 100, // 0.1 秒连接超时
requestTimeout: 1000,
});
const consumer = kafka.consumer({
groupId: 'group2',
rackId: 'test8.group2.consumer2',
maxBytes: 3 * 1024 * 1024, // 单次 poll 请求从每个分区拉取的最大数据量(默认1MB,单位字节Bytes)
sessionTimeout: 60000, // 消费者心跳超时时间(默认 30s)
rebalanceTimeout: 60000, // rebalance 最大等待时间(默认 60s)
heartbeatInterval: 6000, // 心跳间隔(默认 3s)
maxWaitTimeInMs: 500, // 每次拉取消息等待的时间(默认0.5s)
allowAutoTopicCreation: true, // topic不存在自动创建
});
await consumer.connect();
await consumer.subscribe({ topic: 'topic1', fromBeginning: true });
await consumer.run({
autoCommit: false,
eachMessage: async ({ message, topic, partition }) => {
console.log(topic, partition, message.value?.toString(), message.headers?.id?.toString(), message.offset);
consumer.commitOffsets([{ topic, partition, offset: String(Number(message.offset) + 1) }]);
},
});