RabbitMQ 消息模式实战:从简单队列到复杂路由(二)

发布于:2025-05-16 ⋅ 阅读:(22) ⋅ 点赞:(0)

进阶探索:工作队列模式

工作队列模式剖析

工作队列模式,也被称为任务队列模式,是对简单队列模式的一种扩展和优化,旨在解决当任务量较大时,单个消费者无法快速处理所有任务的问题 。在工作队列模式中,依然有生产者负责生成并发送任务消息,但消费者不再是单一的个体,而是多个消费者共同协作。

其核心工作原理是负载均衡,生产者将大量的任务消息发送到同一个队列中,多个消费者同时监听这个队列。RabbitMQ 会按照一定的规则,将队列中的消息依次分发给各个消费者,确保每个消费者都能分配到任务,从而实现任务的并行处理 。这种模式下,消息的分发机制主要有轮询分发和公平分发两种。轮询分发是 RabbitMQ 的默认分发方式,它会将消息逐个发送到在序列中的下一个消费者,不考虑每个消费者处理任务的时长等因素,平均每个消费者获取相同数量的消息 。而公平分发则会根据消费者的处理能力进行任务分配,通过设置basicQos方法,限制每个消费者在同一时刻只能处理一条未确认的消息,在这个消费者确认消息之前,不会再发送下一条消息给它,从而将消息分配给下一个不忙的消费者,实现更合理的任务分配 。

实际应用场景

工作队列模式在实际业务中有着广泛的应用。以订单处理场景为例,在电商大促期间,订单量会瞬间暴增,如果仅依靠单个订单处理服务(即单个消费者)来处理这些订单,必然会导致处理速度缓慢,订单积压,严重影响用户体验。而采用工作队列模式,将订单消息发送到队列中,多个订单处理服务(多个消费者)从队列中获取订单消息并并行处理,能够大大提高订单处理的效率,快速响应用户的订单请求 。

在数据处理领域,比如日志分析场景。系统会产生大量的日志数据,需要对这些日志进行分析处理,提取有用的信息。可以将日志数据封装成消息发送到工作队列,多个日志分析服务(消费者)从队列中获取日志消息进行分析,从而实现高效的数据处理,快速得出分析结果,为系统的优化和决策提供支持 。

代码示例展示

下面通过 Java 代码来展示如何实现工作队列模式。假设我们有一个任务队列,生产者将任务消息发送到该队列,多个消费者从队列中获取任务并处理。

生产者代码如下:


import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

public class Producer {

private final static String QUEUE_NAME = "work_queue";

public static void main(String[] argv) throws Exception {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

try (Connection connection = factory.newConnection();

Channel channel = connection.createChannel()) {

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

for (int i = 0; i < 10; i++) {

String message = "Task " + i;

channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));

System.out.println(" [x] Sent '" + message + "'");

}

}

}

}

在生产者代码中,我们创建了一个连接工厂ConnectionFactory,设置了 RabbitMQ 服务器地址为本地localhost。通过工厂创建连接Connection和通道Channel,声明了一个名为work_queue的队列。然后通过循环发送 10 条任务消息到队列中,每条消息的内容为 “Task + 序号” 。

消费者代码如下:


import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.DeliverCallback;

public class Consumer {

private final static String QUEUE_NAME = "work_queue";

public static void main(String[] argv) throws Exception {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

try (Connection connection = factory.newConnection();

Channel channel = connection.createChannel()) {

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 设置同一时刻服务器只发送一条消息给消费者,实现公平分发

channel.basicQos(1);

DeliverCallback deliverCallback = (consumerTag, delivery) -> {

String message = new String(delivery.getBody(), "UTF-8");

System.out.println(" [x] Received '" + message + "'");

// 模拟任务处理时间

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

Thread.currentThread().interrupt();

}

System.out.println(" [x] Done");

// 手动确认消息已被处理

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

};

// 关闭自动确认,改为手动确认

boolean autoAck = false;

channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });

}

}

}

在消费者代码中,同样创建了连接工厂、连接和通道,并声明了与生产者相同的队列work_queue。通过channel.basicQos(1)方法设置了公平分发,确保同一时刻每个消费者只处理一条消息。定义了DeliverCallback回调函数来处理接收到的消息,在回调函数中,打印接收到的消息,通过Thread.sleep(1000)模拟任务处理时间,处理完成后,使用channel.basicAck方法手动确认消息已被处理,参数delivery.getEnvelope().getDeliveryTag()是消息的唯一标识,false表示不批量确认 。最后,通过channel.basicConsume方法开始消费消息,设置autoAck为false,即关闭自动确认,开启手动确认模式 。通过运行多个消费者实例,可以看到任务消息会被均衡地分配到各个消费者进行处理,实现了工作队列模式的负载均衡和并行处理功能。

消息广播:发布订阅模式

发布订阅模式原理

发布订阅模式是 RabbitMQ 中一种非常灵活且强大的消息通信模式,它打破了一对一或一对多的常规消息传递限制,实现了消息的广播式分发 。在这个模式中,交换机(Exchange)扮演着核心角色,它是消息的中转站,负责接收生产者发送的消息,并根据特定的规则将消息路由到一个或多个队列中 。

与简单队列模式和工作队列模式不同,发布订阅模式中生产者不再直接将消息发送到队列,而是发送到交换机。交换机不存储消息,它仅仅根据自身的类型和绑定规则来决定如何处理接收到的消息 。在发布订阅模式中,通常使用扇形交换机(Fanout Exchange)。扇形交换机的工作方式非常简单直接,它会将接收到的每一条消息无条件地广播到所有与它绑定的队列中,不关心消息的路由键(routing key),就像一个广播电台,只要有消息到达,就会向所有听众广播 。

当生产者将消息发送到扇形交换机后,交换机会迅速将消息复制多份,分别发送到与之绑定的各个队列。每个队列都可以有一个或多个消费者,这些消费者可以独立地从队列中获取消息并进行处理 。这样,一条消息就可以被多个不同的消费者同时接收和处理,实现了消息的广播效果,满足了多系统、多模块同时对同一消息感兴趣的业务需求 。

应用场景举例

发布订阅模式在实际业务中有广泛的应用。以电商系统中的实时通知场景为例,当用户完成一笔订单支付后,系统需要向多个模块发送通知。通过发布订阅模式,订单支付成功的消息被发送到交换机,与交换机绑定的库存管理队列、物流调度队列、用户积分队列等多个队列都会接收到该消息 。库存管理模块的消费者接收到消息后,会对商品库存进行扣减操作;物流调度模块的消费者会根据订单信息安排发货;用户积分模块的消费者则会为用户增加相应的积分 。这样,通过一次消息发布,多个相关业务模块都能及时获取到关键信息并进行相应处理,实现了系统间的高效协作和信息共享 。

在消息推送场景中,发布订阅模式同样发挥着重要作用。比如在一个新闻资讯平台,当有新的新闻发布时,生产者将新闻消息发送到交换机,与交换机绑定的各个用户设备队列都会收到该消息 。不同用户设备(手机端、PC 端等)上的消费者接收到消息后,会将最新的新闻推送给用户,使用户能够及时获取到感兴趣的内容 。这种方式大大提高了消息推送的效率和覆盖面,确保了信息的及时传递 。

代码实现步骤

下面通过 Java 代码来展示如何实现发布订阅模式。假设我们有一个生产者将消息发送到交换机,多个消费者从与交换机绑定的队列中获取消息。

首先,在 Maven 项目中引入 RabbitMQ 客户端依赖,确保pom.xml文件中有如下配置:


<dependency>

<groupId>com.rabbitmq</groupId>

<artifactId>amqp-client</artifactId>

<version>5.16.0</version>

</dependency>

生产者代码如下:


import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

public class Publisher {

private final static String EXCHANGE_NAME = "fanout_exchange";

public static void main(String[] argv) throws Exception {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

try (Connection connection = factory.newConnection();

Channel channel = connection.createChannel()) {

// 声明交换机,类型为fanout

channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

String message = "Publish/Subscribe Message";

// 发送消息到交换机,routingKey为空

channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));

System.out.println(" [x] Sent '" + message + "'");

}

}

}

在生产者代码中,创建了连接工厂、连接和通道后,使用channel.exchangeDeclare方法声明了一个名为fanout_exchange的扇形交换机。然后通过channel.basicPublish方法将消息发送到交换机,由于是扇形交换机,不关心路由键,所以第二个参数为空字符串 。

消费者代码如下:


import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.DeliverCallback;

public class Subscriber {

private final static String EXCHANGE_NAME = "fanout_exchange";

public static void main(String[] argv) throws Exception {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

try (Connection connection = factory.newConnection();

Channel channel = connection.createChannel()) {

// 声明交换机,确保与生产者一致

channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

// 声明一个临时队列,队列名称随机生成

String queueName = channel.queueDeclare().getQueue();

// 将队列绑定到交换机,routingKey为空

channel.queueBind(queueName, EXCHANGE_NAME, "");

System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

DeliverCallback deliverCallback = (consumerTag, delivery) -> {

String message = new String(delivery.getBody(), "UTF-8");

System.out.println(" [x] Received '" + message + "'");

};

channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });

}

}

}

消费者代码中,同样先声明了扇形交换机fanout_exchange。然后使用channel.queueDeclare()方法创建了一个临时队列,队列名称是随机生成的,并且该队列具有独占、自动删除的特性,即当消费者断开连接时,队列会自动被删除 。接着通过channel.queueBind方法将临时队列绑定到交换机上,routingKey 为空 。最后定义了消息处理回调函数DeliverCallback,当接收到消息时,会打印出接收到的消息内容 。使用channel.basicConsume方法开始消费消息,设置为自动确认模式 。运行多个消费者实例,当生产者发送消息后,可以看到每个消费者都能接收到相同的消息,成功实现了发布订阅模式 。


网站公告

今日签到

点亮在社区的每一天
去签到