使用Kafka和kafkajs构建示例项目

发布于:2025-04-04 ⋅ 阅读:(11) ⋅ 点赞:(0)

设置Kafka

请先按照《Docker搭建Kafka环境》一文安装并启动Kafka。

创建主题

# Start an interactive shell in the fast-data-dev image on the host network
docker run --rm -it --net=host lensesio/fast-data-dev bash
# Create the two topics that config.ts refers to (email-topic and sms-topic)
kafka-topics --create --topic email-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
kafka-topics --create --topic sms-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

新建项目

# Create the project directory and enter it
mkdir kafka-sample-project
cd kafka-sample-project/
# Generate a package.json with default values
npm init -y
# Install the Kafka client plus the TypeScript tooling used by the npm scripts
npm install kafkajs typescript ts-node

创建配置文件

tsconfig.json

{
  "compilerOptions": {
    "target": "ES2020",
    "module": "NodeNext",
    "moduleResolution": "NodeNext",
    "esModuleInterop": true,
    "strict": true,
    "sourceMap": true,
    "removeComments": true,
    "forceConsistentCasingInFileNames": true,
    "skipLibCheck": true
  },
  "include": ["*.ts"],
  "exclude": ["node_modules", "dist"]
}

package.json

{
  "name": "kafka-sample-project",
  "version": "1.0.0",
  "main": "index.js",
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1",
    "start:producer": "ts-node producer.ts",
    "start:consumer": "ts-node consumer.ts"
  },
  "keywords": [],
  "author": "",
  "license": "ISC",
  "description": "",
  "dependencies": {
    "kafkajs": "^2.2.4",
    "typescript": "^5.8.2"
  },
  "devDependencies": {
    "ts-node": "^10.9.2"
  }
}

config.ts

/**
 * Kafka配置文件
 * 集中管理Kafka相关的配置参数
 */

// Kafka connection settings read by both producer.ts and consumer.ts.
export const kafkaConfig = {
  // Broker addresses passed to the Kafka client constructor.
  brokers: ['localhost:9092'],
  // Client ids, one per role.
  clientId: {
    producer: 'sample-producer',
    consumer: 'sample-consumer'
  },
  // Group id used when the consumer is created.
  consumerGroup: 'notification-group',
};

// Topic names, one per notification channel.
export const topics = {
  email: 'email-topic',
  sms: 'sms-topic'
};

// Message payload type definitions.

/** Payload of an email notification; published as JSON on the email topic. */
export interface EmailPayload {
  /** Recipient address. */
  to: string;
  /** Sender address. */
  from: string;
  /** Subject line. */
  subject: string;
  /** Message body text. */
  body: string;
}

/** Payload of an SMS notification; published as JSON on the SMS topic. */
export interface SmsPayload {
  /** Destination phone number, kept as a string. */
  phoneNumber: string;
  /** Text of the SMS. */
  message: string;
}

/** Any payload that can be sent as a notification. */
export type NotificationPayload = EmailPayload | SmsPayload;

创建Kafka生产者(producer.ts)

import { Kafka, Producer } from 'kafkajs';
import { kafkaConfig, topics, EmailPayload, SmsPayload } from './config';

// Kafka client configured with the producer's client id and the shared broker list.
const kafka = new Kafka({
  clientId: kafkaConfig.clientId.producer,
  brokers: kafkaConfig.brokers,
});

// Producer handle used by runProducer.
const producer: Producer = kafka.producer();

/**
 * Connects the producer, publishes one sample email notification and one
 * sample SMS notification, then disconnects.
 *
 * The disconnect lives in a `finally` block so the broker connection is
 * released even when a send fails; previously a failed send skipped it.
 *
 * @throws Re-throws any error raised while sending, after logging it, so the
 *   caller can decide how to exit.
 */
const runProducer = async (): Promise<void> => {
  await producer.connect();

  // Publishes a single string value to the given topic.
  const sendMessage = async (topic: string, message: string): Promise<void> => {
    await producer.send({
      topic,
      messages: [{ value: message }],
    });
  };

  // Serialises the payload as JSON, publishes it and logs what was sent.
  const sendNotification = async (topic: string, payload: EmailPayload | SmsPayload): Promise<void> => {
    const message = JSON.stringify(payload);
    await sendMessage(topic, message);
    console.log(`Message sent to ${topic}: ${message}`);
  };

  try {
    // Sample email notification.
    const emailPayload: EmailPayload = {
      to: 'receiver@example.com',
      from: 'sender@example.com',
      subject: 'Sample Email',
      body: 'This is a sample email notification',
    };
    await sendNotification(topics.email, emailPayload);

    // Sample SMS notification.
    const smsPayload: SmsPayload = {
      phoneNumber: '1234567890',
      message: 'This is a sample SMS notification',
    };
    await sendNotification(topics.sms, smsPayload);
  } catch (error) {
    console.error('Error sending notifications:', error);
    throw error; // let the caller handle the failure
  } finally {
    // Runs on success and on failure alike, so the connection is never leaked.
    await producer.disconnect();
  }
};

// Entry point: run the producer once, report the outcome, and exit with a
// non-zero status code when anything fails.
void (async (): Promise<void> => {
  try {
    await runProducer();
    console.log('Producer completed successfully');
  } catch (error) {
    console.error('Failed to run kafka producer', error);
    process.exit(1);
  }
})();

创建Kafka消费者(consumer.ts)

import { type EachMessagePayload, Kafka, Consumer } from 'kafkajs';
// 从正确的配置文件路径导入
import { kafkaConfig, topics, EmailPayload, SmsPayload } from './config';

// Kafka client configured with the consumer's client id and the shared broker list.
const kafka = new Kafka({
  clientId: kafkaConfig.clientId.consumer,
  brokers: kafkaConfig.brokers,
});

// Consumer handle that joins the group named in kafkaConfig.consumerGroup.
const consumer: Consumer = kafka.consumer({ groupId: kafkaConfig.consumerGroup });

/**
 * Handles one record delivered by the consumer.
 *
 * Logs the raw value, checks that it is valid JSON, dispatches on the topic
 * name, and finally commits the offset of the NEXT record to read
 * (`message.offset + 1`). Committing `message.offset` itself would make the
 * last handled record be delivered again after a restart.
 *
 * @param payload - The topic, partition and message supplied by kafkajs.
 * @throws Re-throws any processing or commit error after logging it, so the
 *   failure reaches kafkajs instead of being swallowed here.
 */
const handleMessage = async ({ topic, partition, message }: EachMessagePayload): Promise<void> => {
  try {
    // message.value may be null; fall back to a placeholder for logging.
    const messageContent = message.value?.toString() ?? 'No message content';
    console.log(`Received message from topic '${topic}': ${messageContent}`);

    // Validate that the value is well-formed JSON before handling it.
    if (message.value) {
      try {
        JSON.parse(messageContent);
      } catch (parseError) {
        console.error(`Failed to parse message from topic ${topic}:`, parseError);
        // Malformed JSON cannot succeed on a retry, so the record is skipped
        // without an explicit commit.
        // NOTE(review): kafkajs auto-commits resolved offsets by default, so
        // this record is likely still marked consumed unless `autoCommit: false`
        // is passed to consumer.run — confirm against the kafkajs docs.
        return;
      }
    }

    // Dispatch on the topic the record arrived on.
    if (topic === topics.email) {
      console.log('Handling email notification:', messageContent);
      // Real email handling would go here.
    } else if (topic === topics.sms) {
      console.log('Handling SMS notification:', messageContent);
      // Real SMS handling would go here.
    } else {
      console.log('Unknown topic:', topic);
    }

    // Offsets are decimal strings; BigInt avoids precision loss on large values.
    const nextOffset = (BigInt(message.offset) + 1n).toString();
    await consumer.commitOffsets([{ topic, partition, offset: nextOffset }]);
  } catch (error) {
    console.error(`Error processing message from topic ${topic}:`, error);
    // Propagate so the record is not silently treated as handled.
    throw error;
  }
};

/**
 * Starts the consumer: connects, subscribes to the email and SMS topics (in
 * that order) and registers handleMessage as the per-message callback.
 *
 * @throws Re-throws any startup error after logging it.
 */
const runConsumer = async (): Promise<void> => {
  const subscribedTopics = [topics.email, topics.sms];

  try {
    await consumer.connect();

    // Subscribe one topic at a time, preserving the original order.
    for (const topic of subscribedTopics) {
      await consumer.subscribe({ topic });
    }

    console.log(`Consumer subscribed to topics: ${topics.email}, ${topics.sms}`);

    await consumer.run({ eachMessage: handleMessage });
  } catch (error) {
    console.error('Error running consumer:', error);
    throw error;
  }
};

/**
 * Disconnects the consumer and terminates the process: exit code 0 when the
 * disconnect succeeds, 1 when it fails.
 */
const gracefulShutdown = async (): Promise<void> => {
  let exitCode = 0;

  try {
    console.log('Shutting down consumer...');
    await consumer.disconnect();
    console.log('Consumer disconnected');
  } catch (error) {
    console.error('Error during shutdown:', error);
    exitCode = 1;
  }

  process.exit(exitCode);
};

// Disconnect cleanly when the process receives a termination signal.
for (const signal of ['SIGINT', 'SIGTERM'] as const) {
  process.on(signal, gracefulShutdown);
}

// Entry point: start the consumer and exit with a non-zero status code when
// startup fails.
void (async (): Promise<void> => {
  try {
    await runConsumer();
    console.log('Consumer is running...');
  } catch (error) {
    console.error('Failed to run kafka consumer', error);
    process.exit(1);
  }
})();

运行项目

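新建2个终端。先在一个终端启动消费者(npm run start:consumer),待其完成订阅后,再在另一个终端启动生产者(npm run start:producer);消费者订阅时未设置 fromBeginning,首次启动时默认只消费之后新产生的消息,若先运行生产者,已发送的消息不会被收到。所用命令如下: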

npm run start:producer
npm run start:consumer

可以看到终端输出正常

详细代码:https://github.com/wan88888/kafka-sample-project


网站公告

今日签到

点亮在社区的每一天
去签到