目录
“选消息队列就像选交通工具:Kafka是货运专列,RabbitMQ是城市地铁,RocketMQ是全能高铁。选错工具?小心你的数据堵在五环!”
一、江湖地位速览
二、核心参数擂台赛
2.1 性能参数对比
public class MQBenchmark {
// 吞吐量 (msg/s)
private static final int KAFKA_THROUGHPUT = 150_000;
private static final int RABBITMQ_THROUGHPUT = 20_000;
private static final int ROCKETMQ_THROUGHPUT = 100_000;
// 延迟 (ms)
private static final double KAFKA_LATENCY = 5.2;
private static final double RABBITMQ_LATENCY = 0.8;
private static final double ROCKETMQ_LATENCY = 3.5;
}
2.2 架构复杂度
三、代码江湖见真章
3.1 Kafka生产者(日志采集)
public class LogProducer {
private static final String BOOTSTRAP_SERVERS = "kafka1:9092,kafka2:9092";
public void sendLog(String logData) {
Properties props = new Properties();
props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
props.put("acks", "all");
try (Producer<String, String> producer = new KafkaProducer<>(props)) {
ProducerRecord<String, String> record =
new ProducerRecord<>("app_logs", logData);
producer.send(record);
}
}
}
3.2 RabbitMQ消费者(订单处理)
public class OrderConsumer {
private static final String EXCHANGE_NAME = "order_exchange";
public void startConsuming() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("rabbitmq-host");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "payment.orders");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
Order order = parseOrder(delivery.getBody());
paymentService.process(order);
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
}
}
}
3.3 RocketMQ事务消息(金融交易)
public class TransactionProducer {
private DefaultMQProducer producer;
public void sendTransaction(TransferOrder order) throws Exception {
Message msg = new Message("TRANSFER_TOPIC",
JSON.toJSONBytes(order));
TransactionSendResult result = producer.sendMessageInTransaction(msg,
new LocalTransactionExecuter() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
return accountService.prepareTransfer(order) ?
LocalTransactionState.COMMIT_MESSAGE :
LocalTransactionState.ROLLBACK_MESSAGE;
}
}, null);
if (result.getLocalTransactionState() == LocalTransactionState.ROLLBACK_MESSAGE) {
alertService.notifyFailedTransaction(order);
}
}
}
四、武林争霸对比表
维度 | Kafka | RabbitMQ | RocketMQ |
---|---|---|---|
吞吐量 | 15万+/秒 | 2万/秒 | 10万+/秒 |
延迟 | 5ms+ | <1ms | 3ms+ |
消息顺序 | 分区内有序 | 无序 | 严格顺序 |
事务支持 | 有限支持 | 无 | 完整支持 |
开发难度 | 高(需理解分区/副本) | 低(AMQP标准) | 中(有中文文档) |
运维成本 | 高(需Zookeeper) | 低(内置管理界面) | 中(需NameServer) |
最佳场景 | 日志收集/流处理 | 企业应用集成 | 金融交易/电商订单 |
五、实战选型指南针
5.1 电商系统架构示例
// 使用Kafka收集用户行为日志
kafkaProducer.send(new UserBehaviorLog(userId, action));
// 通过RabbitMQ处理库存变更
rabbitTemplate.convertAndSend("inventory", "stock.update", stockChange);
// RocketMQ处理支付订单
rocketMQTemplate.sendMessageInTransaction("PAY_ORDER_TOPIC", paymentOrder);
六、性能调优宝典
6.1 Kafka参数调优
props.put("linger.ms", 20); // 适当增加批次等待时间
props.put("batch.size", 16384); // 增大批次大小
props.put("compression.type", "snappy"); // 启用压缩
6.2 RabbitMQ内存控制
// 设置队列最大内存 (50MB)
Map<String, Object> args = new HashMap<>();
args.put("x-max-length-bytes", 50 * 1024 * 1024);
channel.queueDeclare("image_queue", true, false, false, args);
6.3 RocketMQ刷盘策略
// 异步刷盘提升性能(适合允许少量数据丢失的场景)
DefaultMQProducer producer = new DefaultMQProducer("GROUP_NAME");
producer.setFlushDiskType(FlushDiskType.ASYNC_FLUSH);
七、运维监控三件套
- Kafka Eagle:可视化监控平台
- Prometheus+Grafana:通用监控方案
- 阿里云专业版(RocketMQ商业支持)
八、血泪教训清单
- Kafka陷阱:分区数不是越多越好!
// 错误示范:创建1000个分区导致性能下降
new Topic("user_events", 1000, (short)3);
- RabbitMQ内存爆炸:
// 必须设置队列最大长度
channel.queueDeclare("unlimited_queue", false, false, false, null); // 危险操作!
- RocketMQ顺序消费:
// 必须使用MessageQueueOrderly模式
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
// 顺序处理逻辑
}
});
九、未来趋势瞭望
- Serverless化:云原生消息服务
- 智能路由:基于AI的消息分发
- 统一协议:支持多协议转换网关