设置Kafka
按照Docker搭建kafka环境文档安装Kafka
创建主题
docker run --rm -it --net=host lensesio/fast-data-dev bash
kafka-topics --create --topic notification-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
新建项目
mkdir kafka-sample-project
cd kafka-sample-project/
npm init -y
npm install kafkajs typescript ts-node
创建配置文件
tsconfig.json
{
"compilerOptions": {
"target": "ES2020",
"module": "NodeNext",
"moduleResolution": "NodeNext",
"esModuleInterop": true,
"strict": true,
"sourceMap": true,
"removeComments": true,
"forceConsistentCasingInFileNames": true,
"skipLibCheck": true
},
"include": ["*.ts"],
"exclude": ["node_modules", "dist"]
}
package.json
{
"name": "kafka-sample-project",
"version": "1.0.0",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1",
"start:producer": "ts-node producer.ts",
"start:consumer": "ts-node consumer.ts"
},
"keywords": [],
"author": "",
"license": "ISC",
"description": "",
"dependencies": {
"kafkajs": "^2.2.4",
"typescript": "^5.8.2"
},
"devDependencies": {
"ts-node": "^10.9.2"
}
}
config.ts
/**
* Kafka配置文件
* 集中管理Kafka相关的配置参数
*/
// Kafka连接配置
export const kafkaConfig = {
brokers: ['localhost:9092'],
clientId: {
producer: 'sample-producer',
consumer: 'sample-consumer'
},
consumerGroup: 'notification-group',
};
// 主题配置
export const topics = {
email: 'email-topic',
sms: 'sms-topic'
};
// 消息类型定义
export interface EmailPayload {
to: string;
from: string;
subject: string;
body: string;
}
export interface SmsPayload {
phoneNumber: string;
message: string;
}
export type NotificationPayload = EmailPayload | SmsPayload;
创建Kafka生产者
import { Kafka, Producer } from 'kafkajs';
import { kafkaConfig, topics, EmailPayload, SmsPayload } from './config';
// 创建Kafka实例
const kafka = new Kafka({
clientId: kafkaConfig.clientId.producer,
brokers: kafkaConfig.brokers,
});
const producer: Producer = kafka.producer();
const runProducer = async (): Promise<void> => {
await producer.connect();
const sendMessage = async (topic: string, message: string): Promise<void> => {
await producer.send({
topic,
messages: [{ value: message }],
});
};
const sendNotification = async <T extends EmailPayload | SmsPayload>(topic: string, payload: T): Promise<void> => {
const message = JSON.stringify(payload);
await sendMessage(topic, message);
console.log(`Message sent to ${topic}: ${message}`);
};
try {
// 发送邮件通知示例
const emailPayload: EmailPayload = {
to: 'receiver@example.com',
from: 'sender@example.com',
subject: 'Sample Email',
body: 'This is a sample email notification',
};
await sendNotification(topics.email, emailPayload);
// 发送短信通知示例
const smsPayload: SmsPayload = {
phoneNumber: '1234567890',
message: 'This is a sample SMS notification',
};
await sendNotification(topics.sms, smsPayload);
} catch (error) {
console.error('Error sending notifications:', error);
throw error; // 重新抛出错误以便在外层处理
}
await producer.disconnect();
};
runProducer()
.then(() => {
console.log('Producer completed successfully');
})
.catch((error) => {
console.error('Failed to run kafka producer', error);
process.exit(1);
});
创建Kafka消费者
import { type EachMessagePayload, Kafka, Consumer } from 'kafkajs';
// 从正确的配置文件路径导入
import { kafkaConfig, topics, EmailPayload, SmsPayload } from './config';
// 创建Kafka实例
const kafka = new Kafka({
clientId: kafkaConfig.clientId.consumer,
brokers: kafkaConfig.brokers,
});
const consumer: Consumer = kafka.consumer({ groupId: kafkaConfig.consumerGroup });
/**
* 处理接收到的消息
*/
const handleMessage = async ({ topic, partition, message }: EachMessagePayload): Promise<void> => {
try {
// 检查 message.value 是否为 null
const messageContent = message.value?.toString() ?? 'No message content';
console.log(`Received message from topic '${topic}': ${messageContent}`);
// 尝试解析消息内容
let parsedMessage: EmailPayload | SmsPayload | null = null;
try {
if (message.value) {
parsedMessage = JSON.parse(messageContent);
}
} catch (parseError) {
console.error(`Failed to parse message from topic ${topic}:`, parseError);
// 解析失败时不提交偏移量,让消息可以重新处理
return;
}
// 根据主题处理不同类型的消息
if (topic === topics.email) {
// 处理邮件通知
console.log('Handling email notification:', messageContent);
// 这里可以添加实际的邮件处理逻辑
} else if (topic === topics.sms) {
// 处理短信通知
console.log('Handling SMS notification:', messageContent);
// 这里可以添加实际的短信处理逻辑
} else {
console.log('Unknown topic:', topic);
}
// 只有在成功处理消息后才提交偏移量
await consumer.commitOffsets([{ topic, partition, offset: message.offset }]);
} catch (error) {
console.error(`Error processing message from topic ${topic}:`, error);
// 处理失败时不提交偏移量,让消息可以重新处理
}
};
/**
* 运行消费者服务
*/
const runConsumer = async (): Promise<void> => {
try {
await consumer.connect();
// 订阅主题
await consumer.subscribe({ topic: topics.email });
await consumer.subscribe({ topic: topics.sms });
console.log(`Consumer subscribed to topics: ${topics.email}, ${topics.sms}`);
await consumer.run({
eachMessage: handleMessage,
});
} catch (error) {
console.error('Error running consumer:', error);
throw error;
}
};
// 添加优雅关闭处理
const gracefulShutdown = async (): Promise<void> => {
try {
console.log('Shutting down consumer...');
await consumer.disconnect();
console.log('Consumer disconnected');
process.exit(0);
} catch (error) {
console.error('Error during shutdown:', error);
process.exit(1);
}
};
// 监听进程终止信号
process.on('SIGINT', gracefulShutdown);
process.on('SIGTERM', gracefulShutdown);
runConsumer()
.then(() => {
console.log('Consumer is running...');
})
.catch((error) => {
console.error('Failed to run kafka consumer', error);
process.exit(1);
});
运行项目
新建2个终端,运行命令:
npm run start:producer
npm run start:consumer
可以看到终端输出正常
详细代码:https://github.com/wan88888/kafka-sample-project