消息中间件

发布于:2025-02-27 ⋅ 阅读:(12) ⋅ 点赞:(0)

1. Apache Kafka

核心特性

  • 优点
    • 高吞吐量(百万级TPS)、低延迟(毫秒级)。
    • 分布式架构,支持水平扩展和高容错性。
    • 持久化存储(基于磁盘的日志结构),支持流式数据处理。
  • 缺点
    • 配置复杂,运维成本高。
    • 不适合低频小消息场景(如秒级任务)。

适用场景

  • 日志收集与分析(如 ELK 架构)。
  • 实时数据管道(如 Flink、Spark Streaming 的数据源)。
  • 事件驱动架构(Event Sourcing)。

工作流程

  1. 生产者将消息发送到指定 Topic。
  2. Broker 集群持久化消息并按 Partition 存储。
  3. 消费者组从 Partition 中按顺序消费消息。

使用方法示例

// 生产者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("my_topic", "key", "message"));

// 消费者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my_topic"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.value());
    }
}

2. RabbitMQ

核心特性

  • 优点
    • 支持 AMQP、MQTT 等多种协议,灵活的路由规则(Exchange 和 Binding)。
    • 提供消息确认(ACK)、死信队列(DLX)等高级功能。
    • 管理界面友好,易于监控。
  • 缺点
    • 单机吞吐量较低(万级 TPS)。
    • 集群扩展性较差(需依赖镜像队列)。

适用场景

  • 任务队列(如异步处理耗时操作)。
  • 复杂路由需求(如消息广播、RPC 调用)。
  • 需要高可靠性的企业级应用。

工作流程

  1. 生产者发送消息到 Exchange。
  2. Exchange 根据 Routing Key 和 Binding 规则将消息路由到 Queue。
  3. 消费者从 Queue 中拉取消息并处理。

使用方法示例

// 配置 Exchange 和 Queue
@Bean
public TopicExchange exchange() {
    return new TopicExchange("my_exchange");
}

@Bean
public Queue queue() {
    return new Queue("my_queue");
}

// 生产者
rabbitTemplate.convertAndSend("my_exchange", "routing.key", "message");

// 消费者
@RabbitListener(queues = "my_queue")
public void handleMessage(String message) {
    System.out.println("Received: " + message);
}

3. ActiveMQ

核心特性

  • 优点
    • 完全支持 JMS 规范,与 Java 生态无缝集成。
    • 支持持久化、事务消息和消息重试。
    • 部署简单,适合中小规模场景。
  • 缺点
    • 吞吐量和扩展性较低(万级 TPS)。
    • 社区活跃度下降(逐渐被 RocketMQ、Kafka 取代)。

适用场景

  • 传统企业级应用(如银行、电信系统)。
  • 需要 JMS 兼容的遗留系统迁移。

工作流程

  1. 生产者通过 ConnectionFactory 创建 Connection 和 Session。
  2. 消息发送到 Queue 或 Topic。
  3. 消费者通过 MessageListener 异步接收消息。

使用方法示例

// 生产者
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("my_queue");
MessageProducer producer = session.createProducer(queue);
TextMessage message = session.createTextMessage("Hello ActiveMQ!");
producer.send(message);

// 消费者
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(msg -> {
    TextMessage textMsg = (TextMessage) msg;
    System.out.println(textMsg.getText());
});

4. RocketMQ

核心特性

  • 优点
    • 高吞吐量(十万级 TPS)、低延迟。
    • 支持事务消息、消息轨迹追踪。
    • 适合金融级高可靠性场景。
  • 缺点
    • 中文文档较少,国际化支持较弱。
    • 功能复杂度较高。

适用场景

  • 电商订单系统(如秒杀、库存扣减)。
  • 金融支付对账。
  • 分布式事务(如基于事务消息的最终一致性)。

工作流程

  1. 生产者发送消息到 Topic。
  2. Broker 集群保证消息的持久化和高可用。
  3. 消费者通过 Push 或 Pull 模式消费消息。

使用方法示例

// 生产者
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message("my_topic", "TagA", "Hello RocketMQ".getBytes());
producer.send(msg);

// 消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("my_topic", "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    System.out.println(new String(msgs.get(0).getBody()));
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();

5. Redis(Pub/Sub 和 Stream)

核心特性

  • 优点
    • 轻量级、高性能(基于内存操作)。
    • 支持发布/订阅(Pub/Sub)和 Stream(类似 Kafka 的持久化队列)。
  • 缺点
    • Pub/Sub 模式不支持消息持久化,Stream 功能较新且社区生态有限。
    • 不适合大规模消息堆积场景。

适用场景

  • 实时消息推送(如聊天室、通知系统)。
  • 简单任务队列(结合 List 结构)。

工作流程(Stream)

  1. 生产者通过 XADD 命令发送消息到 Stream。
  2. 消费者组通过 XREADGROUP 按组消费消息。
  3. 消息确认后通过 XACK 标记为已处理。

使用方法示例(Redis CLI)

# 生产者
XADD my_stream * key1 "value1" key2 "value2"

# 消费者组
XGROUP CREATE my_stream my_group $ MKSTREAM
XREADGROUP GROUP my_group consumer1 COUNT 1 STREAMS my_stream >

对比总结

中间件 吞吐量 延迟 可靠性 适用场景
Kafka 极高 日志、实时流处理
RabbitMQ 复杂路由、企业级应用
ActiveMQ JMS 兼容系统
RocketMQ 极低 极高 金融、电商交易
Redis 极低 实时推送、简单队列

网站公告

今日签到

点亮在社区的每一天
去签到