1. Apache Kafka
核心特性
- 优点:
- 高吞吐量(百万级TPS)、低延迟(毫秒级)。
- 分布式架构,支持水平扩展和高容错性。
- 持久化存储(基于磁盘的日志结构),支持流式数据处理。
- 缺点:
- 配置复杂,运维成本高。
- 不适合低频小消息场景(如秒级任务)。
适用场景
- 日志收集与分析(如 ELK 架构)。
- 实时数据管道(如 Flink、Spark Streaming 的数据源)。
- 事件驱动架构(Event Sourcing)。
工作流程
- 生产者将消息发送到指定 Topic。
- Broker 集群持久化消息并按 Partition 存储。
- 消费者组从 Partition 中按顺序消费消息。
使用方法示例
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("my_topic", "key", "message"));
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my_topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
}
2. RabbitMQ
核心特性
- 优点:
- 支持 AMQP、MQTT 等多种协议,灵活的路由规则(Exchange 和 Binding)。
- 提供消息确认(ACK)、死信队列(DLX)等高级功能。
- 管理界面友好,易于监控。
- 缺点:
- 单机吞吐量较低(万级 TPS)。
- 集群扩展性较差(需依赖镜像队列)。
适用场景
- 任务队列(如异步处理耗时操作)。
- 复杂路由需求(如消息广播、RPC 调用)。
- 需要高可靠性的企业级应用。
工作流程
- 生产者发送消息到 Exchange。
- Exchange 根据 Routing Key 和 Binding 规则将消息路由到 Queue。
- 消费者从 Queue 中拉取消息并处理。
使用方法示例
@Bean
public TopicExchange exchange() {
return new TopicExchange("my_exchange");
}
@Bean
public Queue queue() {
return new Queue("my_queue");
}
rabbitTemplate.convertAndSend("my_exchange", "routing.key", "message");
@RabbitListener(queues = "my_queue")
public void handleMessage(String message) {
System.out.println("Received: " + message);
}
3. ActiveMQ
核心特性
- 优点:
- 完全支持 JMS 规范,与 Java 生态无缝集成。
- 支持持久化、事务消息和消息重试。
- 部署简单,适合中小规模场景。
- 缺点:
- 吞吐量和扩展性较低(万级 TPS)。
- 社区活跃度下降(逐渐被 RocketMQ、Kafka 取代)。
适用场景
- 传统企业级应用(如银行、电信系统)。
- 需要 JMS 兼容的遗留系统迁移。
工作流程
- 生产者通过 ConnectionFactory 创建 Connection 和 Session。
- 消息发送到 Queue 或 Topic。
- 消费者通过 MessageListener 异步接收消息。
使用方法示例
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("my_queue");
MessageProducer producer = session.createProducer(queue);
TextMessage message = session.createTextMessage("Hello ActiveMQ!");
producer.send(message);
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(msg -> {
TextMessage textMsg = (TextMessage) msg;
System.out.println(textMsg.getText());
});
4. RocketMQ
核心特性
- 优点:
- 高吞吐量(十万级 TPS)、低延迟。
- 支持事务消息、消息轨迹追踪。
- 适合金融级高可靠性场景。
- 缺点:
适用场景
- 电商订单系统(如秒杀、库存扣减)。
- 金融支付对账。
- 分布式事务(如基于事务消息的最终一致性)。
工作流程
- 生产者发送消息到 Topic。
- Broker 集群保证消息的持久化和高可用。
- 消费者通过 Push 或 Pull 模式消费消息。
使用方法示例
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message("my_topic", "TagA", "Hello RocketMQ".getBytes());
producer.send(msg);
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("my_topic", "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
System.out.println(new String(msgs.get(0).getBody()));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
5. Redis(Pub/Sub 和 Stream)
核心特性
- 优点:
- 轻量级、高性能(基于内存操作)。
- 支持发布/订阅(Pub/Sub)和 Stream(类似 Kafka 的持久化队列)。
- 缺点:
- Pub/Sub 模式不支持消息持久化,Stream 功能较新且社区生态有限。
- 不适合大规模消息堆积场景。
适用场景
- 实时消息推送(如聊天室、通知系统)。
- 简单任务队列(结合 List 结构)。
工作流程(Stream)
- 生产者通过
XADD
命令发送消息到 Stream。
- 消费者组通过
XREADGROUP
按组消费消息。
- 消息确认后通过
XACK
标记为已处理。
使用方法示例(Redis CLI)
XADD my_stream * key1 "value1" key2 "value2"
XGROUP CREATE my_stream my_group $ MKSTREAM
XREADGROUP GROUP my_group consumer1 COUNT 1 STREAMS my_stream >
对比总结
中间件 |
吞吐量 |
延迟 |
可靠性 |
适用场景 |
Kafka |
极高 |
低 |
高 |
日志、实时流处理 |
RabbitMQ |
中 |
低 |
高 |
复杂路由、企业级应用 |
ActiveMQ |
低 |
中 |
中 |
JMS 兼容系统 |
RocketMQ |
高 |
极低 |
极高 |
金融、电商交易 |
Redis |
高 |
极低 |
低 |
实时推送、简单队列 |