RabbitMQ应用
一.工作模式
1.简单模式
P:生产者
C:消费者
Queue:队列
简单模式是一个生产者一个消费者,也可以成为点对点模式。
适用场景:
消息只能被消费一次。
2.工作队列模式
一个生产者和多个消费者,在多个消息的情况下,Queue队列会把消息发给不同的消费者,每个消费者都会收到不同的消息且所有消费者加起来消费的总理就是生产者创建消息的总量。
适用场景:
集群环境中的异步处理。
举个栗子:
比如12306的短信通知服务,订票成功后,订单消息会发送到RabbitMQ,短信服务从RabbitMQ中获取订单信息,并发送通知信息(在短信服务器之间进行任务分配)
3.发布/订阅模式
X:交换机
交换机的类型:
(1)Fanout:广播, 将消息交给所有绑定到交换机的队列(publish/Subscribe模式)
(2)Direct:定向,把消息交给符合routing key队列(routing模式)
Routing Key:路由键,生产者将消息发给交换器时,指定的一个字符串,用来告诉交换机应该如何处理这个消息
Binding Key:绑定,RabbitMQ中通过Binding(绑定)将交换器与队列关联起来,在绑定的时候一般会指定一个Binding key ,这样RabbitMQ就知道如何正确地将消息路由到队列了。
(3)Topic:通配符,把消息交给符合routing pattern(路由模式)的队列(Topics模式)
(4)headers类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配,headers类型的交换器性能会很差,而且也不实用,基本上不会看到它的存在。
Exchange只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息就会丢失。
一个生产者p,多个消费者c1,c2。X代表交换机消息复制多份,每个消费者接收相同的消息。生产者发送一条消息,经过交换机转发到多个不同的队列,多个不同的队列就有多个不同的消费者。
适用场景
信息需要被多个消费者同时接受的场景,如:实时通知或者广播消息。
4.路由模式
路由模式是发布订阅模式的变种,在发布订阅的基础上,增加路由key,发布订阅模式是无条件的将所有消息分发给所有消费者,路由模式是Exchange根据RoutingKey的规则,将数据筛选后发给对应的消息队列。
适用场景:
需要根据特定规则分发消息的场景。
5.通配符模式
路由模式的升级版,在RoutingKey的基础上,增加了通配符的功能,使之更加灵活,Topics和Routing的基本原理相同,即生产者将消息发给交换机,交换机根据RoutingKey将消息转发给与RoutingKey匹配的队列,类似于正则表达式的方式来定义RoutingKey的模式
不同的是,RoutingKey的匹配方式不同,Routing模式是相等匹配,topics模式是通配符匹配。
适用场景
需要灵活匹配和过滤消息的场景
6.RPC模式
RPC通信的过程中,没有生产者和消费者,比较像RPC远程调用,大概是通过两个队列实现了一个可回调的过程。
7.发布确认
Publisher Confirms模式RabbitMQ提供的一种确保消息可靠发送到RabbitMQ服务器的机制。在这种模式下,生产者可以等待RabbitMQ服务器的确认,以确保消息已经被服务器接受并处理。
1.生产者将Channel设置为confirm模式(通过调用channel.confirmSelect()完成)后,发布的每一条消息都会获得一个唯一的ID,生产者可以将这些序列号与消息关联起来,以便跟踪消息的状态。
2.当消息被RabbitMQ服务器接收并处理后,服务器会异步地向生产者发送⼀个确认(ACK)给生产者(包含消息的唯⼀ID),表明消息已经送达.
通过Publisher Confirms模式,生产者可以确保消息被RabbitMQ服务器成功接收,从而避免消息丢失的问题.
适⽤场景:
对数据安全性要求较高的场景.比如金融交易,订单处理
二.使用API
1.工作队列模式
Consumer1
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("1.95.194.117");
connectionFactory.setPort(5672);
connectionFactory.setUsername("");
connectionFactory.setPassword("");
connectionFactory.setVirtualHost("study");
Connection connection = connectionFactory.newConnection();
//2.通信
Channel channel = connection.createChannel();
//3.使用内置交换机
//4.声明队列
channel.queueDeclare("hello",true,false,false,null);
//5.消费消息
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收消息: " + new String(body));
}
};
channel.basicConsume("hello",true,defaultConsumer);
//6.资源关闭
/*channel.close();
connection.close();*/
}
}
Consumer2
public class Consumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("1.95.194.117");
connectionFactory.setPort(5672);
connectionFactory.setUsername("");
connectionFactory.setPassword("");
connectionFactory.setVirtualHost("study");
Connection connection = connectionFactory.newConnection();
//2.通信
Channel channel = connection.createChannel();
//3.使用内置交换机
//4.声明队列
channel.queueDeclare("hello",true,false,false,null);
//5.消费消息
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收消息: " + new String(body));
}
};
channel.basicConsume("hello",true,defaultConsumer);
//6.资源关闭
/*channel.close();
connection.close();*/
}
}
Producer
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//1.建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("1.95.194.117");
connectionFactory.setPort(5672);
connectionFactory.setUsername("");
connectionFactory.setPassword("");
connectionFactory.setVirtualHost("study");
Connection connection = connectionFactory.newConnection();
//2.通信
Channel channel = connection.createChannel();
//3.声明交换机 (使用内置的交换机)
//4.声明队列
channel.queueDeclare("hello",true,false,false,null);
//5.发送消息
for (int i = 0; i < 10; i++) {
String message = "hello RabbitMQ + " + i;
channel.basicPublish("","hello",null,message.getBytes());
}
//6.资源释放
/*channel.close();
connection.close();*/
}
}
2.发布订阅模式
Producer
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//1.建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("1.95.194.117");
connectionFactory.setPort(5672);
connectionFactory.setUsername("");
connectionFactory.setPassword("");
connectionFactory.setVirtualHost("study");
Connection connection = connectionFactory.newConnection();
//2.通信
Channel channel = connection.createChannel();
//3.声明交换机
//交换机名称,交换机类型,可持久化
channel.exchangeDeclare("fanout", BuiltinExchangeType.FANOUT,true);
//4.声明队列
channel.queueDeclare("queue1",true,false,false,null);
channel.queueDeclare("queue2",true,false,false,null);
//5.交换机和队列进行绑定
channel.queueBind("queue1","fanout","");
channel.queueBind("queue2","fanout","");
//6.发送消息
String message = "hello fanou";
channel.basicPublish("fanout","",null,message.getBytes());
//7.资源释放
channel.close();
connection.close();
}
}
Consumer1
package fanout;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("1.95.194.117");
connectionFactory.setPort(5672);
connectionFactory.setUsername("");
connectionFactory.setPassword("");
connectionFactory.setVirtualHost("study");
Connection connection = connectionFactory.newConnection();
//2.通信
Channel channel = connection.createChannel();
//3.声明队列
channel.queueDeclare("queue1",true,false,false,null);
//4.消费消息
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收消息: " + new String(body));
}
};
channel.basicConsume("queue1",true,defaultConsumer);
//5.资源关闭
/*channel.close();
connection.close();*/
}
}
Consumer2
package fanout;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("1.95.194.117");
connectionFactory.setPort(5672);
connectionFactory.setUsername("");
connectionFactory.setPassword("");
connectionFactory.setVirtualHost("study");
Connection connection = connectionFactory.newConnection();
//2.通信
Channel channel = connection.createChannel();
//3.声明队列
channel.queueDeclare("queue2",true,false,false,null);
//4.消费消息
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收消息: " + new String(body));
}
};
channel.basicConsume("queue2",true,defaultConsumer);
//5.资源关闭
/*channel.close();
connection.close();*/
}
}
3.路由模式
Producer
package direct;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* Created with IntelliJ IDEA.
* Description:
* User: hp
* Date: 2025-04-01
* Time: 11:04
*/
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//1.建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("1.95.194.117");
connectionFactory.setPort(5672);
connectionFactory.setUsername("");
connectionFactory.setPassword("");
connectionFactory.setVirtualHost("study");
Connection connection = connectionFactory.newConnection();
//2.通信
Channel channel = connection.createChannel();
//3.声明交换机
channel.exchangeDeclare("direct", BuiltinExchangeType.DIRECT,true);
//4.声明队列
channel.queueDeclare("queue1",true,false,false,null);
channel.queueDeclare("queue2",true,false,false,null);
//5.绑定交换机和队列
channel.queueBind("queue1","direct","a");
channel.queueBind("queue2","direct","a");
channel.queueBind("queue2","direct","b");
channel.queueBind("queue2","direct","c");
//6.发送消息
String message1 = "hello direct my routing key is a" ;
channel.basicPublish("direct","a",null,message1.getBytes());
String message2 = "hello direct my routing key is b" ;
channel.basicPublish("direct","b",null,message2.getBytes());
String message3 = "hello direct my routing key is c" ;
channel.basicPublish("direct","c",null,message3.getBytes());
//7.资源关闭
channel.close();
connection.close();
}
}
Consumer1
package direct;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* Created with IntelliJ IDEA.
* Description:
* User: hp
* Date: 2025-04-01
* Time: 9:53
*/
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("1.95.194.117");
connectionFactory.setPort(5672);
connectionFactory.setUsername("");
connectionFactory.setPassword("");
connectionFactory.setVirtualHost("study");
Connection connection = connectionFactory.newConnection();
//2.通信
Channel channel = connection.createChannel();
//3.声明队列
channel.queueDeclare("queue1",true,false,false,null);
//4.消费消息
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收消息: " + new String(body));
}
};
channel.basicConsume("queue1",true,defaultConsumer);
//5.资源关闭
/*channel.close();
connection.close();*/
}
}
Consumer2
package direct;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* Created with IntelliJ IDEA.
* Description:
* User: hp
* Date: 2025-04-01
* Time: 9:53
*/
public class Consumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("1.95.194.117");
connectionFactory.setPort(5672);
connectionFactory.setUsername("");
connectionFactory.setPassword("");
connectionFactory.setVirtualHost("study");
Connection connection = connectionFactory.newConnection();
//2.通信
Channel channel = connection.createChannel();
//3.声明队列
channel.queueDeclare("queue2",true,false,false,null);
//4.消费消息
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收消息: " + new String(body));
}
};
channel.basicConsume("queue2",true,defaultConsumer);
//5.资源关闭
/*channel.close();
connection.close();*/
}
}
4.通配符模式
- . 是分割符
- *代表一个单词
- #表示多个单词
Producer
package topic;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* Created with IntelliJ IDEA.
* Description:
* User: hp
* Date: 2025-04-01
* Time: 13:52
*/
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//1.建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("1.95.194.117");
connectionFactory.setPort(5672);
connectionFactory.setUsername("");
connectionFactory.setPassword("");
connectionFactory.setVirtualHost("study");
Connection connection = connectionFactory.newConnection();
//2.通信
Channel channel = connection.createChannel();
//3.声明交换机
channel.exchangeDeclare("topic", BuiltinExchangeType.TOPIC,true);
//4.声明队列
channel.queueDeclare("queue1",true,false,false,null);
channel.queueDeclare("queue2",true,false,false,null);
//5.绑定交换机和队列
channel.queueBind("queue1","topic","*.a.*");
channel.queueBind("queue2","topic","*.*.b");
channel.queueBind("queue2","topic","c.#");
//6.发送消息
String message1 = "hello topic my routing key is a" ;
channel.basicPublish("topic","ae.a.f",null,message1.getBytes());//转发Q1
String message2 = "hello topic my routing key is b" ;
channel.basicPublish("topic","ef.a.b",null,message2.getBytes());//转发Q1和Q2
String message3 = "hello topic my routing key is c" ;
channel.basicPublish("topic","c.ef.d",null,message3.getBytes());//转发Q2
//7.资源关闭
channel.close();
connection.close();
}
}
Consumer1
package topic;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* Created with IntelliJ IDEA.
* Description:
* User: hp
* Date: 2025-04-01
* Time: 13:52
*/
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("1.95.194.117");
connectionFactory.setPort(5672);
connectionFactory.setUsername("");
connectionFactory.setPassword("");
connectionFactory.setVirtualHost("study");
Connection connection = connectionFactory.newConnection();
//2.通信
Channel channel = connection.createChannel();
//3.声明队列
channel.queueDeclare("queue1",true,false,false,null);
//4.消费消息
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收消息: " + new String(body));
}
};
channel.basicConsume("queue1",true,defaultConsumer);
//5.资源关闭
/*channel.close();
connection.close();*/
}
}
Consumer2
package topic;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* Created with IntelliJ IDEA.
* Description:
* User: hp
* Date: 2025-04-01
* Time: 13:52
*/
public class Consumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("1.95.194.117");
connectionFactory.setPort(5672);
connectionFactory.setUsername("");
connectionFactory.setPassword("");
connectionFactory.setVirtualHost("study");
Connection connection = connectionFactory.newConnection();
//2.通信
Channel channel = connection.createChannel();
//3.声明队列
channel.queueDeclare("queue2",true,false,false,null);
//4.消费消息
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收消息: " + new String(body));
}
};
channel.basicConsume("queue2",true,defaultConsumer);
//5.资源关闭
/*channel.close();
connection.close();*/
}
}
5.RPC
客户端
(1)发送请求(携带replyTo,CorrelationID)
(2)接收相应(校验(CorrelationID))
package rpc;
import com.rabbitmq.client.*;
import com.rabbitmq.client.impl.AMQBasicProperties;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;
/**
* Created with IntelliJ IDEA.
* Description:
* User: hp
* Date: 2025-04-01
* Time: 16:20
*/
public class RpcClinet {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//1.建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("1.95.194.117");
connectionFactory.setPort(5672);
connectionFactory.setUsername("");
connectionFactory.setPassword("");
connectionFactory.setVirtualHost("study");
Connection connection = connectionFactory.newConnection();
//2.通信
Channel channel = connection.createChannel();
//3.声明队列
channel.queueDeclare("queue1",true,false,false,null);
channel.queueDeclare("queue2",true,false,false,null);
//4.发送请求
//发送的消息内容
String message = "hello RPC";
//设置请求的唯一标识和相关属性
String correlationID = UUID.randomUUID().toString();
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.replyTo(correlationID)
.correlationId("queue1")
.build();
channel.basicPublish("","queue1",props,message.getBytes());
//5.接收响应
//使用阻塞队列来进行处理响应
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(1);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String responseMessage = new String(body);
System.out.println("收到消息: " + responseMessage);
if (correlationID.equals(properties.getCorrelationId())) {
//如果此时服务器返回来的响应的CorrelationID和客户端的相同,则加入阻塞队列中
blockingQueue.offer(responseMessage);
}
}
};
channel.basicConsume("queue2",true,defaultConsumer);
String result = blockingQueue.take();
System.out.println("响应结果: " + result);
}
}
服务器
(1)接收请求,进行响应
(2)发送响应(按照客户端replyTo,设置CorrelationID)
package rpc;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* Created with IntelliJ IDEA.
* Description:
* User: hp
* Date: 2025-04-01
* Time: 16:20
*/
public class RpcServer {
public static void main(String[] args) throws IOException, TimeoutException {
//1.建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("1.95.194.117");
connectionFactory.setPort(5672);
connectionFactory.setUsername("");
connectionFactory.setPassword("");
connectionFactory.setVirtualHost("study");
Connection connection = connectionFactory.newConnection();
//2.通信
Channel channel = connection.createChannel();
//3.接收请求
channel.basicQos(1);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String request = new String(body,"UTF-8");
System.out.println("接收请求: " +request);
String response = "响应成功";
AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
.correlationId(properties.getCorrelationId())
.build();
channel.basicPublish("","queue2",props,response.getBytes());
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume("queue1",false,defaultConsumer);
}
}
6.发布确认模式
存在的三个问题
生产者问题,因为应用程序故障,网络抖动等各种原因,生产者没有成功向broker发送消息。
消息中间件自身问题,生产者成功发送给了Broker,但是Broker没有把消息保存好,导致消息丢失。
消费者问题,Broker发送消息到消费者,消费者在消费消息时,因为没有处理好,导致broker将消费失败的消息从队列中删除了。
单独/批量/异步确认
package publisher;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeoutException;
/**
* Created with IntelliJ IDEA.
* Description:
* User: hp
* Date: 2025-04-01
* Time: 21:14
*/
public class PublisherConfirms {
static Connection createConnection() throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("1.95.194.117");
connectionFactory.setPort(5672);
connectionFactory.setUsername("");
connectionFactory.setPassword("");
connectionFactory.setVirtualHost("study");
Connection connection = connectionFactory.newConnection();
return connection;
}
public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
//单独确认
publishingMessagesIndividually();
//批量确认
publishingMessagesInBatches();
//异步确认
handlingpublisherConfirmsAsynchronously();
}
private static void handlingpublisherConfirmsAsynchronously() throws IOException, TimeoutException, InterruptedException {
try (Connection connection = createConnection()) {
//1.开启信道
Channel channel = connection.createChannel();
//2.设置Confirm模式
channel.confirmSelect();
//3.声明队列
channel.queueDeclare("queue3",true,false,false,null);
//4.监听Confirm
long start = System.currentTimeMillis();
SortedSet<Long> confirmSeqNo = Collections.synchronizedSortedSet(new TreeSet<>());
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
confirmSeqNo.headSet(deliveryTag + 1).clear();
}else {
confirmSeqNo.remove(deliveryTag);
}
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
confirmSeqNo.headSet(deliveryTag + 1).clear();
}else {
confirmSeqNo.remove(deliveryTag);
}
}
});
//5.发送消息
for (int i = 0; i < 10000; i++) {
String message = "hello publisher Confirms " + i;
long seqNo = channel.getNextPublishSeqNo();
channel.basicPublish("","queue3",null,message.getBytes());
confirmSeqNo.add(seqNo);
}
while (!confirmSeqNo.isEmpty()) {
Thread.sleep(10);
}
long end = System.currentTimeMillis();
System.out.println("异步确认策略消息条数: " + 200 + " 耗时: " + (end-start));
}
}
private static void publishingMessagesInBatches() throws IOException, TimeoutException, InterruptedException {
try (Connection connection = createConnection()) {
//1.开启信道
Channel channel = connection.createChannel();
//2.设置Confirm模式
channel.confirmSelect();
//3.声明队列
channel.queueDeclare("queue2",true,false,false,null);
//4.发送消息,等待确认
//当消息数量达到一定的时候就进行确认
long start = System.currentTimeMillis();
int batchSize = 100;
int outstandingMessageCount = 0;
for (int i = 0; i < 10000; i++) {
String message = "hello publisher Confirms " + i;
channel.basicPublish("","queue2",null,message.getBytes());
outstandingMessageCount++;
//等待确认
if (outstandingMessageCount == batchSize) {
channel.waitForConfirms(5000);
outstandingMessageCount = 0;
}
}
if (outstandingMessageCount > 0) {
channel.waitForConfirms(5000);
}
long end = System.currentTimeMillis();
System.out.println("批量确认策略消息条数: " + 200 + " 耗时: " + (end-start));
}
}
private static void publishingMessagesIndividually() throws IOException, TimeoutException, InterruptedException {
try (Connection connection = createConnection()) {
//1.开启通信
Channel channel = connection.createChannel();
//2.设置Confirm模式
channel.confirmSelect();
//3.声明队列
channel.queueDeclare("queue1",true,false,false,null);
long start = System.currentTimeMillis();
//4.发送消息,等待确认
for (int i = 0; i < 10000; i++) {
String message = "hello publisher Confirms " + i;
channel.basicPublish("","queue1",null,message.getBytes());
//等待确认
channel.waitForConfirms(5000);
}
long end = System.currentTimeMillis();
System.out.println("单独确认策略消息条数: " + 200 + " 耗时: " + (end-start));
}
}
}
三.Spring下演示
1.添加RabbitMQ的依赖
2.配置yml
#amqp://username:password@Ip:port/virtual-host
3.代码
String返回消息的内容。
Message ( org.springframework.amqp.core.Message ):Spring AMQP的Message 类,返回原始的消息体以及消息的属性,如消息ID,内容,队列信息等。
Channel ( com.rabbitmq.client.Channel ):RabbitMQ的通道对象,可以用于进行更⾼级的操作,如手动确认消息。
简单的RabbitMQ
Configuration
@Configuration
public class RabbitMQConfig {
@Bean("work")
//选择的Queue是SpringFramwork.anmpq里面的
public Queue workQueue() {
//队列名
return QueueBuilder.durable("queue1").build();
}
}
Producer
package com.example.rabiitmq.controller;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/producer")
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/work")
public String work() {
//使用内置交换机,RoutingKey和队列名相同
rabbitTemplate.convertAndSend("","queue1","hello spring ampq work...");
return "发送成功";
}
}
Consumer
工作队列模式
Consumer
package com.example.rabiitmq.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class WorkListener {
@RabbitListener(queues = "queue1")
public void queueListener1(Message message, Channel channel) {
System.out.println("listener1队列名称queue1,收到的消息: " + message + " Channel: " + channel);
}
@RabbitListener(queues = "queue1")
public void queueListener2(String message) {
System.out.println("listener2队列名称queue1,收到的消息: " + message);
}
}
发布订阅模式
Configuration
@Bean("fanoutQueue1")
public Queue fanoutQueue1() {
return QueueBuilder.durable("fanout.queue1").build();
}
@Bean("fanoutQueue2")
public Queue fanoutQueue2() {
return QueueBuilder.durable("fanout.queue2").build();
}
@Bean("fanoutExchange")
public FanoutExchange fanoutExchange() {
return ExchangeBuilder.fanoutExchange("fanoutExchange").durable(true).build();
}
@Bean("fanoutQueueBinding1")
public Binding fanoutQueueBinding1(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange,@Qualifier("fanoutQueue1") Queue queue) {
return BindingBuilder.bind(queue).to(fanoutExchange);
}
@Bean("fanoutQueueBinding2")
public Binding fanoutQueueBinding2(@Qualifier("fanoutExchange")FanoutExchange fanoutExchange,@Qualifier("fanoutQueue2")Queue queue) {
return BindingBuilder.bind(queue).to(fanoutExchange);
}
Producer
@RequestMapping("/fanout")
public String fanout() {
rabbitTemplate.convertAndSend("fanoutExchange","","hello spring fanout ampq...");
return "发送成功";
}
Listener
package com.example.rabiitmq.listener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class FanoutListener {
@RabbitListener(queues = "fanout.queue1")
public void queueListener1(String message) {
System.out.println("队列名称fanout.queue1,收到的消息: " + message);
}
@RabbitListener(queues = "fanout.queue2")
public void queueListener2(String message) {
System.out.println("队列名称fanout.queue2,收到的消息: " + message);
}
}
路由模式
Configuration
@Bean("directQueue1")
public Queue directQueue1() {
return QueueBuilder.durable("direct.queue1").build();
}
@Bean("directQueue2")
public Queue directQueue2() {
return QueueBuilder.durable("direct.queue2").build();
}
@Bean("directExchange")
public DirectExchange directExchange() {
return ExchangeBuilder.directExchange("directExchange").build();
}
@Bean("directBinding1")
public Binding directBinding1(@Qualifier("directExchange") DirectExchange directExchange,@Qualifier("directQueue1") Queue queue) {
return BindingBuilder.bind(queue).to(directExchange).with("orange");
}
@Bean("directBinding2")
public Binding directBinding2(@Qualifier("directExchange") DirectExchange directExchange,@Qualifier("directQueue2") Queue queue) {
return BindingBuilder.bind(queue).to(directExchange).with("black");
}
@Bean("directBinding3")
public Binding directBinding3(@Qualifier("directExchange") DirectExchange directExchange,@Qualifier("directQueue2") Queue queue) {
return BindingBuilder.bind(queue).to(directExchange).with("orange");
}
Producer
@RequestMapping("/direct/{routingKey}")
public String direct(@PathVariable("routingKey") String routingKey) {
rabbitTemplate.convertAndSend("directExchange",routingKey,"hello spring direct amqp + " + routingKey);
return "发送成功";
}
Listener
@Component
public class directListener {
@RabbitListener(queues = "direct.queue1")
public void queueListener1(String message) {
System.out.println("队列名称direct.queue1,收到的消息: " + message);
}
@RabbitListener(queues = "direct.queue2")
public void queueListener2(String message) {
System.out.println("队列名称direct.queue2,收到的消息: " + message);
}
}
通配符模式
Configuration
@Bean("topicQueue1")
public Queue topicQueue1() {
return QueueBuilder.durable("topic.queue1").build();
}
@Bean("topicQueue2")
public Queue topicQueue2() {
return QueueBuilder.durable("topic.queue2").build();
}
@Bean("topicExchange")
public TopicExchange topicExchange() {
return ExchangeBuilder.topicExchange("topicExchange").durable(true).build();
}
@Bean("topicBinding1")
public Binding topicBinding1(@Qualifier("topicExchange") TopicExchange topicExchange,@Qualifier("topicQueue1") Queue queue) {
return BindingBuilder.bind(queue).to(topicExchange).with("*.orange.*");
}
@Bean("topicBinding2")
public Binding topicBinding2(@Qualifier("topicExchange") TopicExchange topicExchange,@Qualifier("topicQueue2") Queue queue) {
return BindingBuilder.bind(queue).to(topicExchange).with("*.*.rabbit");
}
@Bean("topicBinding3")
public Binding topicBinding3(@Qualifier("topicExchange") TopicExchange topicExchange,@Qualifier("topicQueue2") Queue queue) {
return BindingBuilder.bind(queue).to(topicExchange).with("lazy.#");
}
Producer
@RequestMapping("/topic/{routingKey}")
public String topic(@PathVariable("routingKey") String routingKey) {
rabbitTemplate.convertAndSend("topicExchange",routingKey,"hello spring topic amqp + " + routingKey);
return "发送成功";
}
Listener
@Component
public class topicListener {
@RabbitListener(queues = "topic.queue1")
public void queueListener1(String message) {
System.out.println("队列名称topic.queue1,收到的消息: " + message);
}
@RabbitListener(queues = "topic.queue2")
public void queueListener2(String message) {
System.out.println("队列名称topic.queue2,收到的消息: " + message);
}
}