RabbitMQ Working Modes
The official RabbitMQ tutorials present seven working modes.
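Simple mode
P: the producer, which publishes messages to the queue.
C: the consumer, which fetches messages from the queue and consumes them.
Queue: the message queue, which stores messages.
There is one producer and one consumer, and each message can be consumed only once. This is also known as the point-to-point mode.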
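Work Queues mode
P: the producer, which publishes messages to the queue.
C1, C2: the consumers, which fetch messages from the queue and consume them.
Queue: the message queue, which stores messages.
When the queue holds several messages, they are distributed among the consumers, so each consumer receives different messages and every message goes to only one consumer.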
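The distribution described above can be pictured without a broker. By default RabbitMQ hands the messages of a queue to its consumers in turn (round-robin). The sketch below only models that behavior; `RoundRobinDispatch` and `dispatch` are hypothetical names and not part of the RabbitMQ client.

```java
import java.util.ArrayList;
import java.util.List;

// Broker-free model of round-robin dispatch (hypothetical helper, not a RabbitMQ API).
final class RoundRobinDispatch {

    // Assigns message i to consumer (i mod consumerCount); returns one list per consumer.
    static List<List<String>> dispatch(List<String> messages, int consumerCount) {
        if (consumerCount <= 0) {
            throw new IllegalArgumentException("consumerCount must be positive");
        }
        List<List<String>> perConsumer = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            perConsumer.add(new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            perConsumer.get(i % consumerCount).add(messages.get(i));
        }
        return perConsumer;
    }

    public static void main(String[] args) {
        List<String> messages = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            messages.add("msg-" + i);
        }
        List<List<String>> result = dispatch(messages, 2);
        System.out.println("C1: " + result.get(0));
        System.out.println("C2: " + result.get(1));
    }
}
```

Real consumers that work at different speeds may see a different split once a prefetch limit is configured; the model assumes the default behavior only.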
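Publish/Subscribe mode
P: the producer, which publishes messages to the exchange.
C1, C2: the consumers, which fetch messages from their queues and consume them.
Q1, Q2: the message queues, which store messages.
X: the exchange, which routes the messages published by the producer to the designated queues according to its rules.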
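RabbitMQ has four exchange types, each with its own routing strategy:
Fanout: broadcast. The message is delivered to every queue bound to the exchange (Publish/Subscribe mode).
Direct: exact match. The message is routed to the queues whose binding key equals the message's routing key (Routing mode).
Topic: wildcard match. The message is routed to the queues whose routing pattern matches the message's routing key (Topics mode).
Headers: this type does not use the routing key at all; it matches on the headers attribute of the message instead. It performs poorly and is rarely used.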
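Routing key and binding key:
Routing key: a string the producer specifies when it sends a message to an exchange; it tells the exchange how the message should be routed.
Binding key: a string specified when a queue is bound to an exchange; RabbitMQ compares it with the routing key to decide whether a message goes to that queue.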
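How an exchange combines routing keys and binding keys can be sketched with a small in-memory binding table. This is a simplified model, not RabbitMQ's implementation: `BindingTable`, `bind`, and `route` are hypothetical names, and only the fanout and direct strategies are modeled.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Simplified model of an exchange's binding table (hypothetical, not a RabbitMQ API).
final class BindingTable {
    enum Type { FANOUT, DIRECT }

    private final Type type;
    // Each entry holds {queueName, bindingKey}.
    private final List<String[]> bindings = new ArrayList<>();

    BindingTable(Type type) {
        this.type = type;
    }

    void bind(String queue, String bindingKey) {
        bindings.add(new String[] {queue, bindingKey});
    }

    // Returns the queues that would receive a message published with routingKey.
    Set<String> route(String routingKey) {
        Set<String> targets = new LinkedHashSet<>();
        for (String[] binding : bindings) {
            // Fanout ignores the keys; direct requires an exact match.
            if (type == Type.FANOUT || binding[1].equals(routingKey)) {
                targets.add(binding[0]);
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        BindingTable direct = new BindingTable(Type.DIRECT);
        direct.bind("direct.queue1", "a");
        direct.bind("direct.queue2", "a");
        direct.bind("direct.queue2", "b");
        System.out.println("a -> " + direct.route("a"));
        System.out.println("b -> " + direct.route("b"));
        System.out.println("d -> " + direct.route("d")); // no binding: nothing receives it
    }
}
```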
Routing mode
X: the exchange, which routes messages according to the routing key.
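Topics (wildcard mode)
X: the exchange, which routes messages according to the routing pattern. Routing keys and patterns are made of words separated by dots.
*: matches exactly one word.
#: matches zero or more words.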
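The two wildcards can be illustrated with a small matcher over dot-separated words, where `*` stands for exactly one word and `#` for zero or more words. `TopicMatcher` is a hypothetical helper written for this illustration; the broker performs this matching itself, so application code does not normally need it.

```java
// Illustrative matcher for topic patterns (hypothetical helper, not a RabbitMQ API).
final class TopicMatcher {

    // Returns true if routingKey matches pattern; both are dot-separated word lists.
    static boolean matches(String pattern, String routingKey) {
        return match(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern exhausted: the key must be exhausted too
        }
        if (p[i].equals("#")) {
            // '#' either matches no word (skip it) or absorbs one more word and stays.
            return match(p, i + 1, k, j) || (j < k.length && match(p, i, k, j + 1));
        }
        if (j == k.length) {
            return false; // words left in the pattern but none in the key
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("*.a.*", "ef.a.c"));
        System.out.println(matches("*.*.b", "rr.a.b"));
        System.out.println(matches("c.#", "c.com.ljh"));
        System.out.println(matches("*.a.*", "a.b"));
    }
}
```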
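RPC (remote procedure call mode)
This mode can be understood as communication between a client and a server: the client sends a request, and the server processes it and returns the result.
When sending a request, the client sets correlation_id and reply_to and sends the request to rpc_queue.
The server takes the request from rpc_queue, processes it, and sends the result to the queue named in reply_to.
The client picks out the result that carries its correlation_id.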
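The matching step on the client side can be sketched without a broker: the client remembers each pending correlation ID and completes the matching request when a reply arrives. `PendingReplies`, `register`, and `onReply` are hypothetical names for this illustration; replies with an unknown or missing ID are simply ignored here.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Tracks requests that are still waiting for a reply (hypothetical helper).
final class PendingReplies {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Registers a request under its correlation ID and returns the future reply.
    CompletableFuture<String> register(String correlationId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        return future;
    }

    // Called for every message on the reply queue; returns false if no request matches.
    boolean onReply(String correlationId, String body) {
        CompletableFuture<String> future =
                correlationId == null ? null : pending.remove(correlationId);
        if (future == null) {
            return false;
        }
        future.complete(body);
        return true;
    }

    public static void main(String[] args) {
        PendingReplies replies = new PendingReplies();
        String correlationId = UUID.randomUUID().toString();
        CompletableFuture<String> reply = replies.register(correlationId);

        replies.onReply("some-other-id", "stray reply"); // ignored: ID does not match
        replies.onReply(correlationId, "result");        // completes the pending request
        System.out.println("reply: " + reply.join());
    }
}
```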
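Publisher Confirms mode
In this mode the RabbitMQ server (the broker) sends a confirmation back to the producer, and the producer treats a message as successfully sent only after receiving that confirmation.
If the producer does not receive a confirmation for some reason, or receives a negative acknowledgement, it decides according to business needs whether to resend the message.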
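On the producer side, confirms are usually tracked by sequence number. The sketch below models that bookkeeping without a broker, assuming a confirmation may cover either a single sequence number or every number up to and including it (the `multiple` flag). `OutstandingConfirms` and its methods are hypothetical names for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Tracks published messages that have not been confirmed yet (hypothetical helper).
final class OutstandingConfirms {
    private final ConcurrentNavigableMap<Long, String> outstanding = new ConcurrentSkipListMap<>();

    // Record a message right before it is published.
    void published(long seqNo, String body) {
        outstanding.put(seqNo, body);
    }

    // Positive confirmation: forget the message, or all messages up to seqNo if multiple.
    void ack(long seqNo, boolean multiple) {
        if (multiple) {
            outstanding.headMap(seqNo, true).clear();
        } else {
            outstanding.remove(seqNo);
        }
    }

    // Negative confirmation: return the affected bodies so the caller may resend them.
    List<String> nack(long seqNo, boolean multiple) {
        List<String> rejected = new ArrayList<>();
        if (multiple) {
            ConcurrentNavigableMap<Long, String> head = outstanding.headMap(seqNo, true);
            rejected.addAll(head.values());
            head.clear();
        } else {
            String body = outstanding.remove(seqNo);
            if (body != null) {
                rejected.add(body);
            }
        }
        return rejected;
    }

    int size() {
        return outstanding.size();
    }

    public static void main(String[] args) {
        OutstandingConfirms confirms = new OutstandingConfirms();
        for (long seqNo = 1; seqNo <= 5; seqNo++) {
            confirms.published(seqNo, "msg-" + seqNo);
        }
        confirms.ack(3, true); // confirms 1, 2 and 3 at once
        System.out.println("to resend: " + confirms.nack(4, false));
        System.out.println("still outstanding: " + confirms.size());
    }
}
```

Keeping the message body next to its sequence number is a design choice of this sketch: it lets a nack handler resend the exact message instead of only knowing its number.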
Working Mode Examples
Create a plain Maven project and add the dependency:
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.20.0</version>
</dependency>
Define the Constants class
package mq.Constants;
public class Constants {
public static final String HOST = "47.94.9.33";
public static final int PORT = 5672;
public static final String USER_NAME = "admin";
public static final String PASSWORD = "admin";
public static final String VIRTUAL_HOST = "/";
// * Work Queues mode
public static final String WORK_QUEUE = "work.queue";
// * Publish/Subscribe mode
public static final String FANOUT_EXCHANGE = "fanout.exchange";
public static final String FANOUT_QUEUE1 = "fanout.queue1";
public static final String FANOUT_QUEUE2 = "fanout.queue2";
// * Routing mode
public static final String DIRECT_EXCHANGE = "direct.exchange";
public static final String DIRECT_QUEUE1 = "direct.queue1";
public static final String DIRECT_QUEUE2 = "direct.queue2";
// * Topics (wildcard) mode
public static final String TOPIC_EXCHANGE = "topic.exchange";
public static final String TOPIC_QUEUE1 = "topic.queue1";
public static final String TOPIC_QUEUE2 = "topic.queue2";
// * RPC mode
public static final String RPC_REQUEST_QUEUE = "rpc.request.queue";
public static final String RPC_RESPONSE_QUEUE = "rpc.response.queue";
// * Publisher Confirms mode
public static final String P_CONFIRMS_QUEUE1 = "p.confirms.queue1";
public static final String P_CONFIRMS_QUEUE2 = "p.confirms.queue2";
public static final String P_CONFIRMS_QUEUE3 = "p.confirms.queue3";
}
Simple mode
Producer
package mq.simple;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import mq.Constants.Constants;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class ProducerDemo {
public static void main(String[] args) throws IOException, TimeoutException {
// TODO 1. Create the connection
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST); // ? public IP
connectionFactory.setPort(Constants.PORT); // ? port
connectionFactory.setUsername(Constants.USER_NAME); // ? username
connectionFactory.setPassword(Constants.PASSWORD); // ? password
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); // ? virtual host
Connection connection = connectionFactory.newConnection();
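// TODO 2. Open a channel
Channel channel = connection.createChannel();
// TODO 3. Declare the exchange (skipped here: the built-in default exchange is used)
// TODO 4. Declare the queue
/**
* Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
* Map<String, Object> arguments)
* queue: name of the queue
* durable: whether the queue survives a broker restart
* exclusive: whether the queue is restricted to this connection
* autoDelete: whether the queue is deleted automatically when it is no longer in use
* arguments: additional queue arguments
*/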
channel.queueDeclare("hello", true, false, false, null);
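// TODO 5. Send messages
/**
* void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
* exchange: name of the exchange ("" selects the built-in default exchange)
* routingKey: with the default exchange, the routing key must equal the queue name
* props: message properties
* body: message body
*/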
String msg = "hello rabbitMQ~";
for (int i = 0; i < 1000; i++) {
channel.basicPublish("", "hello", null, msg.getBytes(StandardCharsets.UTF_8));
}
System.out.println("Messages sent successfully: " + msg);
// TODO 6. Release resources
channel.close(); // ! close the channel first
connection.close();
}
}
Consumer
package mq.simple;
import com.rabbitmq.client.*;
import mq.Constants.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConsumerDemo {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// TODO 1. Create the connection
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST); // ? public IP
connectionFactory.setPort(Constants.PORT); // ? port
connectionFactory.setUsername(Constants.USER_NAME); // ? username
connectionFactory.setPassword(Constants.PASSWORD); // ? password
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); // ? virtual host
Connection connection = connectionFactory.newConnection();
// TODO 2. Open a channel
Channel channel = connection.createChannel();
// TODO 3. Declare the queue (optional if it already exists; consuming from a queue that nobody has declared fails)
channel.queueDeclare("hello", true, false, false, null);
// TODO 4. Receive messages
DefaultConsumer consumer = new DefaultConsumer(channel){
/**
* Called whenever a message is delivered from the queue
* @param consumerTag the <i>consumer tag</i> associated with the consumer
* @param envelope packaging data for the message (delivery tag, exchange, routing key, ...)
* @param properties content header data for the message
* @param body the message body (opaque, client-specific byte array)
* @throws IOException IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Received message: " + new String(body));
}
};
channel.basicConsume("hello", true, consumer);
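Thread.sleep(1000); // ! give the asynchronous consumer time to receive messages before closing
// TODO 5. Release resources
channel.close(); // ! close the channel first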
connection.close();
}
}
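The consumer above sleeps for a fixed second before closing the channel, which may be too short or too long. One possible alternative, assuming the number of expected messages is known, is to count deliveries with a `CountDownLatch` and wait with a timeout. `DeliveryLatch` is a hypothetical helper shown here without a broker; in the real consumer, `onDelivery` would be called from `handleDelivery`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: counts deliveries so the main thread can wait for them.
final class DeliveryLatch {
    private final CountDownLatch latch;

    DeliveryLatch(int expectedMessages) {
        this.latch = new CountDownLatch(expectedMessages);
    }

    // Call once per received message, e.g. at the end of handleDelivery.
    void onDelivery() {
        latch.countDown();
    }

    // Returns true if all expected messages arrived within the timeout.
    boolean await(long timeout, TimeUnit unit) {
        try {
            return latch.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        DeliveryLatch deliveries = new DeliveryLatch(3);
        // A plain thread stands in for the client's delivery thread.
        Thread fakeDeliveries = new Thread(() -> {
            for (int i = 0; i < 3; i++) {
                deliveries.onDelivery();
            }
        });
        fakeDeliveries.start();
        System.out.println("all received: " + deliveries.await(1, TimeUnit.SECONDS));
    }
}
```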
Work Queues mode
Producer
package mq.workQueues;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import mq.Constants.Constants;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// TODO 1. Create the connection
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
// TODO 2. Open a channel
Channel channel = connection.createChannel();
// TODO 3. Declare the exchange (skipped here: the built-in default exchange is used)
// TODO 4. Declare the queue
channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);
// TODO 5. Send messages
for (int i = 0; i < 10; i++) {
String msg = "hello work queue mode~:" + i;
channel.basicPublish("", Constants.WORK_QUEUE, null, msg.getBytes(StandardCharsets.UTF_8));
}
System.out.println("Messages sent successfully");
// TODO 6. Release resources
channel.close(); // ! close the channel first
connection.close();
}
}
Consumer 1
package mq.workQueues;
import com.rabbitmq.client.*;
import mq.Constants.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
// TODO 1. Create the connection
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
// TODO 2. Open a channel
Channel channel = connection.createChannel();
// TODO 3. Declare the queue (optional if it already exists; consuming from a queue that nobody has declared fails)
channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);
// TODO 4. Receive messages
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Received message: " + new String(body));
}
};
channel.basicConsume(Constants.WORK_QUEUE, true, consumer);
// TODO 5. Release resources (left commented out so the consumer keeps listening)
// channel.close(); // ! close the channel first
// connection.close();
}
}
Consumer 2
package mq.workQueues;
import com.rabbitmq.client.*;
import mq.Constants.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
// TODO 1. Create the connection
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
// TODO 2. Open a channel
Channel channel = connection.createChannel();
// TODO 3. Declare the queue (optional if it already exists; consuming from a queue that nobody has declared fails)
channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);
// TODO 4. Receive messages
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Received message: " + new String(body));
}
};
channel.basicConsume(Constants.WORK_QUEUE, true, consumer);
// TODO 5. Release resources (left commented out so the consumer keeps listening)
// channel.close(); // ! close the channel first
// connection.close();
}
}
Publish/Subscribe mode
Producer
package mq.fanout;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import mq.Constants.Constants;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// TODO 1. Create the connection
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
// TODO 2. Open a channel
Channel channel = connection.createChannel();
// TODO 3. Declare the exchange
/**
* Exchange.DeclareOk exchangeDeclare(String exchange, the name of the exchange
* BuiltinExchangeType type, the exchange type
* boolean durable, true if we are declaring a durable exchange (the exchange will survive a server restart)
* boolean autoDelete, true if the server should delete the exchange when it is no longer in use
* boolean internal, true if the exchange is internal, it can't be directly published to by a client.
* Map<String, Object> arguments), other properties (construction arguments) for the exchange
*/
channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);
// TODO 4. Declare the queues
channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null);
channel.queueDeclare(Constants.FANOUT_QUEUE2, true, false, false, null);
/**
* Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments)
* queue: the name of the queue
* exchange: the name of the exchange
* routingKey: the routing key to use for the binding
* arguments: other properties (binding parameters)
*/
// TODO 5. Bind the queues to the exchange (a fanout exchange ignores the binding key)
channel.queueBind(Constants.FANOUT_QUEUE1, Constants.FANOUT_EXCHANGE, "");
channel.queueBind(Constants.FANOUT_QUEUE2, Constants.FANOUT_EXCHANGE, "");
// TODO 6. Send messages
for (int i = 0; i < 10; i++) {
String msg = "hello fanout mode~:" + i;
channel.basicPublish(Constants.FANOUT_EXCHANGE, "", null, msg.getBytes(StandardCharsets.UTF_8));
}
System.out.println("Messages sent successfully");
// TODO 7. Release resources
channel.close(); // ! close the channel first
connection.close();
}
}
Consumer 1
package mq.fanout;
import com.rabbitmq.client.*;
import mq.Constants.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
// TODO 1. Create the connection
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
// TODO 2. Open a channel
Channel channel = connection.createChannel();
// TODO 3. Declare the queue (optional if it already exists; consuming from a queue that nobody has declared fails)
channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null);
// TODO 4. Receive messages
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Received message: " + new String(body));
}
};
channel.basicConsume(Constants.FANOUT_QUEUE1, true, consumer);
// TODO 5. Release resources (left commented out so the consumer keeps listening)
// channel.close(); // ! close the channel first
// connection.close();
}
}
Consumer 2
package mq.fanout;
import com.rabbitmq.client.*;
import mq.Constants.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
// TODO 1. Create the connection
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
// TODO 2. Open a channel
Channel channel = connection.createChannel();
// TODO 3. Declare the queue (optional if it already exists; consuming from a queue that nobody has declared fails)
channel.queueDeclare(Constants.FANOUT_QUEUE2, true, false, false, null);
// TODO 4. Receive messages
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Received message: " + new String(body));
}
};
channel.basicConsume(Constants.FANOUT_QUEUE2, true, consumer);
// TODO 5. Release resources (left commented out so the consumer keeps listening)
// channel.close(); // ! close the channel first
// connection.close();
}
}
Routing mode
Producer
package mq.direct;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import mq.Constants.Constants;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// TODO 1. Create the connection
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
// TODO 2. Open a channel
Channel channel = connection.createChannel();
// TODO 3. Declare the exchange
channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true);
// TODO 4. Declare the queues
channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);
channel.queueDeclare(Constants.DIRECT_QUEUE2, true, false, false, null);
// TODO 5. Bind the queues to the exchange
channel.queueBind(Constants.DIRECT_QUEUE1, Constants.DIRECT_EXCHANGE, "a");
channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "a");
channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "b");
channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "c");
// TODO 6. Send messages ("d" has no binding, so messages with that routing key reach no queue)
List<String> list = List.of("a", "b", "c", "d");
Random random = new Random();
for (int i = 0; i < 10; i++) {
String routingKey = list.get(random.nextInt(list.size()));
String msg = "hello routing mode~:" + routingKey;
System.out.println(msg);
channel.basicPublish(Constants.DIRECT_EXCHANGE, routingKey, null, msg.getBytes(StandardCharsets.UTF_8));
}
System.out.println("Messages sent successfully");
// TODO 7. Release resources
channel.close(); // ! close the channel first
connection.close();
}
}
Consumer 1
package mq.direct;
import com.rabbitmq.client.*;
import mq.Constants.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
// TODO 1. Create the connection
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
// TODO 2. Open a channel
Channel channel = connection.createChannel();
// TODO 3. Declare the queue (optional if it already exists; consuming from a queue that nobody has declared fails)
channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);
// TODO 4. Receive messages
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Received message: " + new String(body));
}
};
channel.basicConsume(Constants.DIRECT_QUEUE1, true, consumer);
// TODO 5. Release resources (left commented out so the consumer keeps listening)
// channel.close(); // ! close the channel first
// connection.close();
}
}
Consumer 2
package mq.direct;
import com.rabbitmq.client.*;
import mq.Constants.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
// TODO 1. Create the connection
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
// TODO 2. Open a channel
Channel channel = connection.createChannel();
// TODO 3. Declare the queue (optional if it already exists; consuming from a queue that nobody has declared fails)
channel.queueDeclare(Constants.DIRECT_QUEUE2, true, false, false, null);
// TODO 4. Receive messages
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Received message: " + new String(body));
}
};
channel.basicConsume(Constants.DIRECT_QUEUE2, true, consumer);
// TODO 5. Release resources (left commented out so the consumer keeps listening)
// channel.close(); // ! close the channel first
// connection.close();
}
}
Topics (wildcard mode)
Producer
package mq.topic;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import mq.Constants.Constants;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// TODO 1. Create the connection
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
// TODO 2. Open a channel
Channel channel = connection.createChannel();
// TODO 3. Declare the exchange
channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true);
// TODO 4. Declare the queues
channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);
channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, false, null);
// TODO 5. Bind the queues to the exchange
channel.queueBind(Constants.TOPIC_QUEUE1, Constants.TOPIC_EXCHANGE, "*.a.*");
channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "*.*.b");
channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "c.#");
// TODO 6. Send messages
String msg1 = "hello topic mode~:ef.a.c";
channel.basicPublish(Constants.TOPIC_EXCHANGE, "ef.a.c", null, msg1.getBytes(StandardCharsets.UTF_8)); // ? routed to Q1
String msg2 = "hello topic mode~:rr.a.b";
channel.basicPublish(Constants.TOPIC_EXCHANGE, "rr.a.b", null, msg2.getBytes(StandardCharsets.UTF_8)); // ? routed to Q1 and Q2
String msg3 = "hello topic mode~:c.com.ljh";
channel.basicPublish(Constants.TOPIC_EXCHANGE, "c.com.ljh", null, msg3.getBytes(StandardCharsets.UTF_8)); // ? routed to Q2
System.out.println("Messages sent successfully");
// TODO 7. Release resources
channel.close(); // ! close the channel first
connection.close();
}
}
Consumer 1
package mq.topic;
import com.rabbitmq.client.*;
import mq.Constants.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
// TODO 1. Create the connection
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
// TODO 2. Open a channel
Channel channel = connection.createChannel();
// TODO 3. Declare the queue (optional if it already exists; consuming from a queue that nobody has declared fails)
channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);
// TODO 4. Receive messages
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Received message: " + new String(body));
}
};
channel.basicConsume(Constants.TOPIC_QUEUE1, true, consumer);
// TODO 5. Release resources (left commented out so the consumer keeps listening)
// channel.close(); // ! close the channel first
// connection.close();
}
}
Consumer 2
package mq.topic;
import com.rabbitmq.client.*;
import mq.Constants.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
// TODO 1. Create the connection
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
// TODO 2. Open a channel
Channel channel = connection.createChannel();
// TODO 3. Declare the queue (optional if it already exists; consuming from a queue that nobody has declared fails)
channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, false, null);
// TODO 4. Receive messages
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Received message: " + new String(body));
}
};
channel.basicConsume(Constants.TOPIC_QUEUE2, true, consumer);
// TODO 5. Release resources (left commented out so the consumer keeps listening)
// channel.close(); // ! close the channel first
// connection.close();
}
}
RPC (remote procedure call mode)
Server
package mq.rpc;
import com.rabbitmq.client.*;
import mq.Constants.Constants;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class RPCServer {
public static void main(String[] args) throws IOException, TimeoutException {
// TODO 1. Create the connection
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
// TODO 2. Open a channel
Channel channel = connection.createChannel();
// TODO 2.1 Declare the queues
channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);
channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String request = new String(body);
System.out.println("[RPC Server received request]: " + request);
String response = "Response to request: " + request + " -> 🫡";
// TODO 4. Send the response to the queue named in the request's replyTo property
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder()
.correlationId(properties.getCorrelationId())
.build();
channel.basicPublish("", properties.getReplyTo(), basicProperties, response.getBytes(StandardCharsets.UTF_8));
// TODO 5. Acknowledge the request
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// TODO 3. Receive requests (manual acknowledgement: autoAck is false)
channel.basicConsume(Constants.RPC_REQUEST_QUEUE, false, defaultConsumer);
}
}
Client
package mq.rpc;
import com.rabbitmq.client.*;
import mq.Constants.Constants;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeoutException;
public class RPCClient {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// TODO 1. Create the connection
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
// TODO 2. Open a channel
Channel channel = connection.createChannel();
// TODO 2.1 Declare the queues
channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);
channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);
// TODO 3. Send the request
String correlationId = UUID.randomUUID().toString();
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.correlationId(correlationId) // ? unique ID used to match the response to this request
.replyTo(Constants.RPC_RESPONSE_QUEUE)
.build();
String msg = "hello RPC mode~:" + correlationId;
channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, properties, msg.getBytes(StandardCharsets.UTF_8));
// TODO 4. Receive the response
BlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String resp = new String(body);
System.out.println("Received reply: " + resp);
if (correlationId.equals(properties.getCorrelationId())) { // ! null-safe: a reply may carry no correlation ID
queue.offer(resp);
}
}
};
channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, consumer);
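String res = queue.take(); // ! blocks here until a response with the matching ID arrives
System.out.println("[RPC Client received the response matching its ID]: " + res);
// TODO 5. Release resources so the client can exit
channel.close(); // ! close the channel first
connection.close();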
}
}
Publisher Confirms mode
Publisher confirms
package mq.publisher.confirms;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import mq.Constants.Constants;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeoutException;
public class PublisherConfirms {
private static final int MAX_MESSAGE = 10000;
static Connection createConnection() throws Exception {
// TODO 1. Create the connection
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
return connectionFactory.newConnection();
}
public static void main(String[] args) throws Exception {
// * Strategy #1: Publishing Messages Individually
// publishingMessagesIndividually();
// * Strategy #2: Publishing Messages in Batches
publishingMessagesInBatches();
// * Strategy #3: Handling Publisher Confirms Asynchronously
handlingPublisherConfirmsAsynchronously();
}
private static void handlingPublisherConfirmsAsynchronously() throws Exception {
try (Connection connection = createConnection()) {
// TODO 2. Open a channel
Channel channel = connection.createChannel();
// TODO 3. Enable publisher confirms on the channel
channel.confirmSelect();
// TODO 4. Declare the queue
channel.queueDeclare(Constants.P_CONFIRMS_QUEUE3, true, false, false, null);
SortedSet<Long> sortedSet = Collections.synchronizedSortedSet(new TreeSet<>());
// TODO 5. Listen for acks and nacks from the broker
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
sortedSet.headSet(deliveryTag + 1).clear();
} else {
sortedSet.remove(deliveryTag);
}
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
sortedSet.headSet(deliveryTag + 1).clear();
} else {
sortedSet.remove(deliveryTag);
}
// TODO 5.1 Resend the message if the business logic requires it
}
});
long start = System.currentTimeMillis();
// TODO 6. Send messages
for (int i = 0; i < MAX_MESSAGE; i++) {
String msg = "hello Publisher Confirms~:" + i;
long ackSeq = channel.getNextPublishSeqNo(); // ? sequence number the broker will confirm
sortedSet.add(ackSeq);
channel.basicPublish("", Constants.P_CONFIRMS_QUEUE3, null, msg.getBytes(StandardCharsets.UTF_8));
}
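// ! wait until every outstanding message has been confirmed
while (!sortedSet.isEmpty()) {
Thread.sleep(10); // ! sleep briefly instead of spinning on the CPU
}
long end = System.currentTimeMillis();
System.out.printf("Asynchronous confirm strategy, messages: %d; total time: %d ms\n", MAX_MESSAGE, end - start);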
}
}
private static void publishingMessagesInBatches() throws Exception {
try (Connection connection = createConnection()) {
// TODO 2. Open a channel
Channel channel = connection.createChannel();
// TODO 3. Enable publisher confirms on the channel
channel.confirmSelect();
// TODO 4. Declare the queue
channel.queueDeclare(Constants.P_CONFIRMS_QUEUE2, true, false, false, null);
long start = System.currentTimeMillis();
// TODO 5. Send messages
int batchSize = 100, outstandingMessageCnt = 0;
for (int i = 0; i < MAX_MESSAGE; i++) {
String msg = "hello Publisher Confirms~:" + i;
channel.basicPublish("", Constants.P_CONFIRMS_QUEUE2, null, msg.getBytes(StandardCharsets.UTF_8));
outstandingMessageCnt++;
if (outstandingMessageCnt >= batchSize) {
channel.waitForConfirms(5_000);
outstandingMessageCnt = 0;
}
}
if (outstandingMessageCnt > 0) {
channel.waitForConfirms(5_000);
}
long end = System.currentTimeMillis();
System.out.printf("Batch confirm strategy, messages: %d; total time: %d ms\n", MAX_MESSAGE, end - start);
}
}
private static void publishingMessagesIndividually() throws Exception {
try (Connection connection = createConnection()) {
// TODO 2. Open a channel
Channel channel = connection.createChannel();
// TODO 3. Enable publisher confirms on the channel
channel.confirmSelect();
// TODO 4. Declare the queue
channel.queueDeclare(Constants.P_CONFIRMS_QUEUE1, true, false, false, null);
long start = System.currentTimeMillis();
// TODO 5. Publish messages
for (int i = 0; i < MAX_MESSAGE; i++) {
String msg = "hello Publisher Confirms~:" + i;
channel.basicPublish("", Constants.P_CONFIRMS_QUEUE1, null, msg.getBytes(StandardCharsets.UTF_8));
// TODO 5.1 Wait up to 5 s for the broker's confirmation
channel.waitForConfirms(5_000);
}
long end = System.currentTimeMillis();
System.out.printf("Individual confirm strategy, messages: %d; total time: %d ms\n", MAX_MESSAGE, end - start);
}
}
}