文章目录
-
- 一、RabbitMQ简介
- 二、消息的发送与消费
- 三、RabbitMQ进阶
- 二、相关问题
-
- 2.1 RabbitMQ上的一个queue中存放的message是否有数量限制
- 2.2 如何确保消息正确地发送至RabbitMQ*
- 2.3 如何确保消息接收方消费了消息
- 2.4 如何避免消息重复投递或重复消费*
- 2.5 消息基于什么传输
- 2.6 消息怎么路由
- 2.7 如何确保消息不丢失*
- 2.8 如何保证消息的顺序性*
- 2.9 RabbitMQ交换器有哪些类型*
- 2.10 RabbitMQ如何保证数据一致性
- 2.11 RabbitMQ结构*
- 2.12 RabbitMQ队列与消费者的关系
- 2.13 如何保证消息的可靠传输
- 2.14 如何解决消息队列的延时以及过期失效问题
- 2.15 怎么保证消息队列的高可用*
- 2.16 Kafka、ActiveMQ、RabbitMQ、RocketMQ对比
一、RabbitMQ简介
RabbitMQ是采用Erlang语言实现AMQP(高级消息队列协议)的消息中间件,它最初起源于金融系统,用于在分布式系统中存储转发消息。
RabbitMO的具体特点:
1、可靠性:RabbitMO使用一些机制来保证可靠性,如持久化、传输确认及发布确认等。
2、灵活的路由:在消息进入队列之前,通过交换器来路由消息。对于典型的路由功能,RabbitMO已经提供了一些内置的交换器来实现。针对更复杂的路由功能,可以将多个交换器绑定在一起,也可以通过插件机制来实现自己的交换器。
3、扩展性:多个RabbitMO节点可以组成一个集群,也可以根据实际业务情况动态地扩展集群中节点。
4、高可用性:队列可以在集群中的机器上设置镜像,使得在部分节点出现问题的情况下队列仍然可用。
5、支持多种协议:RabbitMQ除了原生支持AMQP协议,还支持STOMP、MQTT等多种消息中间件协议。
6、多语言客户端:RabbitMQ几乎支持所有常用语言,比如Java、Python、Ruby、PHP、C#、JavaScript等。
7、管理界面:RabbitMQ提供了一个易用的用户界面,使得用户可以监控和管理消息、集群中的节点等。
8、插件机制:RabbitMO提供了许多插件,以实现从多方面进行扩展,当然也可以编写自己的插件。
1.1 RabbitMQ概念
1.1.1 生产者和消费者
RabbitMQ整体上是一个生产者与消费者模型,RabbitMQ的整体模型架构:
Producer:生产者,就是投递消息的一方。消息一般可以包含2个部分:消息体(payload)和标签(Label)
。在实际应用中,消息体一般是一个带有业务逻辑结构的数据,比如一个json字符串。当然可以进一步对这个消息体进行序列化操作。
消息的标签用来表述这条消息,比如一个交换器的名称和一个路由键。生产者把消息交由RabbitMO,RabbitMQ之后会根据标签把消息发送给感兴趣的消费者(Consumer)。
Consumer:消费者,就是接收消息的一方。消费者连接到RabbitMQ服务器,并订阅到队列上。当消费者消费一条消息时,只是消费消息的消息体(payload)。在消息路由的过程中,消息的标签会丢弃,存入到队列中的消息只有消息体,消费者也只会消费到消息体,也就不知道消息的生产者是谁,当然消费者也不需要知道。
Broker:消息中间件的服务节点。对于RabbitMQ来说,一个RabbitMQ Broker可以简单地看作一个RabbitMQ服务节点,或者RabbitMQ服务实例。大多数情况下也可以将一个RabbitMQ Broker看作一台RabbitMQ服务器。
生产者将消息存入RabbitMQ Broker,以及消费者从Broker中消费数据的整个流程:
生产者将业务方数据进行可能的包装,之后封装成消息,发送到Broker中。消费者订阅并接收消息,经过可能的解包处理得到原始的数据,之后再进行业务处理逻辑。
这个业务处理逻辑并不一定需要和接收消息的逻辑使用同一个线程。消费者进程可以使用一个线程去接收消息,存入到内存中,比如使用Java中的BlockingQueue。业务处理逻辑使用另一个线程从内存中读取数据,这样可以将应用进一步解耦,提高整个应用的处理效率。
1.1.2 队列
Queue:队列,是RabbitMQ的内部对象,用于存储消息。
多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊(Round-Robin,即轮询)给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理:
RabbitMQ不支持队列层面的广播消费
,如果需要广播消费,需要在其上进行二次开发。
1.1.3 交换器、路由键、绑定键
- Exchange交换器
生产者将消息发送到Exchange(交换器),由交换器将消息路由到一个或者多个队列中。如果路由不到,或许会返回给生产者,或许直接丢弃。
RabbitMQ中的交换器有四种类型,不同的类型有着不同的路由策略。 - RoutingKey路由键
生产者将消息发给交换器的时候,一般会指定一个RoutingKey,用来指定这个消息的路由规则,而这个Routing Key需要与交换器类型和绑定键(BindingKey)联
合使用才能最终生效。
在交换器类型和绑定键(BindingKey)固定的情况下,生产者可以在发送消息给交换器时,通过指定RoutingKey来决定消息流向哪里。 - Binding绑定
RabbitMQ中通过绑定将交换器与队列关联起来,在绑定的时候一般会指定一个绑定键(BindingKey),这样RabbitMQ就知道如何正确地将消息路由到队列了:
所示。
生产者将消息发送给交换器时,需要一个RoutingKey,当BindingKey和RoutingKey相匹配时,消息会被路由到对应的队列中。在绑定多个队列到同一个交换器的时候,这些绑定允许使用相同的BindingKey。
BindingKey并不是在所有的情况下都生效,它依赖于交换器类型,比如fanout类型的交换器就会无视BindingKey,而是将消息路由到所有绑定到该交换器的队列中。
交换器相当于投递包裹的邮箱,RoutingKey相当于填写在包裹上的地址,BindingKey相当于包裹的目的地,当填写在包裹上的地址和实际想要投递的地址相匹配时,那么这个包裹就会被正确投递到目的地,最后这个目的地的“主人”一一队列可以保留这个包裹。如果填写的地址出错,邮递员不能正确投递到目的地,包裹可能会回退给寄件人,也有可能被丢弃。
1.1.4 常用的交换器类型(fanout/direct/topic/heders)
RabbitMQ常用的交换器类型有fanout、direct、topic、headers这四种。
- fanout(与交换器绑定的所有队列)
它会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中。 - direct(路由键和绑定键完全匹配)
它会把消息路由到那些BindingKey和RoutingKey完全匹配的队列中。 - topic(路由键和绑定键模式匹配)
topic类型的交换器在匹配规则上进行了扩展,它与direct类型的交换器相似,也是将消息路由到BindingKey和RoutingKey相匹配的队列中,但这里的匹配规则有些不同,它约定:
RoutingKey为一个点号“.”分隔的字符串(被点号“.”分隔开的每一段独立的字符串称为一个单词),如“com.rabbitmq.client”、“java.util.concurrent”、“com.hidden…client’”;
BindingKey和RoutingKey一样也是点号“.”分隔的字符串;
BindingKey中可以存在两种特殊字符串“*”和“#”,用于做模糊匹配,其中“#”用于匹配一个单词,“#”用于匹配多规格单词(可以是零个)。
示例:路由键为“com.rabbitmq.client’”的消息会同时路由到Queue1和Queue2;
路由键为“com.hidden.client’”的消息只会路由到Queue2中;
路由键为“com.hidden.demo”的消息只会路由到Queue2中;
路由键为“java.rabbitmq.demo”的消息只会路由到Queue1中;
路由键为“java.util.concurrent”的消息将会被丢弃或者返回给生产者(需要设置mandatory参数),因为它没有匹配任何路由键。
- heders(headers属性匹配)
headers类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。在绑定队列和交换器时制定一组键值对,当发送消息到交换器时,RabbitMQ会获取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配队列和交换器绑定时指定的键值对,如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers类型的交换器性能会很差,而且也不实用,基本上不会看到它的存在。
1.1.5 生产者和消费者工作的基本流程
- 生产者发送消息
1、生产者连接到RabbitMQ Broker,建立一个连接(Connection),开启一个信道(Channel)。
2、生产者声明一个交换器,并设置相关属性,比如交换机类型、是否持久化等。
3、生产者声明一个队列并设置相关属性,比如是否排他、是否持久化、是否自动删除等。
4、生产者通过路由键将交换器和队列绑定起来。
5、生产者发送消息至RabbitMO Broker,其中包含路由键、交换器等信息。
6、相应的交换器根据接收到的路由键查找相匹配的队列。
6.1 如果找到,则将从生产者发送过来的消息存入相应的队列中。
6.2 如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者。
7、关闭信道。
8、关闭连接。 - 消费者消费消息
1、消费者连接到RabbitMO Broker,建立一个连接(Connection),开启一个信道(Channel)。
2、消费者向RabbitMO Broker请求消费相应队列中的消息,可能会设置相应的回调函数,以及做一些准备工作。
3、等待RabbitMO Broker回应并投递相应队列中的消息,消费者接收消息。
4、消费者确认(ack)接收到的消息。
5、RabbitMO从队列中删除相应已经被确认的消息。
6、关闭信道。
7、关闭连接。
此处引入了两个新的概念:Connection和Channel。无论是生产者还是消费者,都需要和RabbitMQ Broker建立连接,这个连接就是一条TCP连接,也就是Connection。一旦TCP连接建立起来,客户端紧接着可以创建一个AMQP信道(Channel),每个信道都会被指派一个唯一的D。.信道是建立在Connection之上的虚拟连接,RabbitMQ处理的每条AMQP指令都是通过信道完成的。
RabbitMQ采用类似NIO的做法,选择TCP连接复用,不仅可以减少性能开销,同时也便于管理。
每个线程把持一个信道,所以信道复用了Connection的TCP连接。同时RabbitMQ可以确保每个线程的私密性,就像拥有独立的连接一样。当每个信道的流量不是很大时,复用单一的Connection可以在产生性能瓶颈的情况下有效地节省TcP连接资源。但是当信道本身的流量很大时,这时候多个信道复用一个Connection就会产生性能瓶颈,进而使整体的流量被限制了。此时就需要开辟多个Connection,将这些信道均摊到这些Connection中,至于这些相关的调优策略需要根据业务自身的实际情况进行调节。
1.2 AMQP协议
RabbitMQ就是AMQP协议的Erlang的实现(当然RabbitMQ还支持STOMP2、MQTT3等协议)。AMQP的模型架构和RabbitMQ的模型架构是一样的,生产者将消息发送给交换器,交换器和队列绑定。当生产者发送消息时所携带的RoutingKey与绑定时的BindingKey相匹配时,消息即被存入相应的队列之中。消费者可以订阅相应的队列来获取消息。
AMQP协议本身包括三层。
1、Module Layer:位于协议最高层,主要定义了一些供客户端调用的命令,.客户端可以利用这些命令实现自己的业务逻辑。
2、Session Layer:位于中间层,主要负责将客户端的命令发送给服务器,再将服务端的应答返回给客户端,主要为客户端与服务器之间的通信提供可靠性同步机制和错误处理。
3、Transport Layer::位于最底层,主要传输二进制数据流,提供帧的处理、信道复用、错
误检测和数据表示等。
- 简洁版生产者发送消息代码
Connection connection=factory.newConnection();//创建连接
Channel channel=connection.createChannel();//创建信道
String message ="Hello World!";
channel.basicPublish (EXCHANGE NAME,ROUTING_KEY,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
//关闭资源
channel.close();
connection.close();
- 简洁版消费者消费消息代码
Connection connection = factory.newConnection(addresses);//创建连接
final Channelchannel = connection.createChannel();//创建信道
Consumer consumer = new DefaultConsumer(channel){}//省略实现
channel.basicQos(64);
channel.basicConsume(QUEUE_NAME,consumer);
//等待回调函数执行完毕之后,关闭资源
TimeUnit.SECONDS.sleep(5);
channel.close();
connection.close();
二、消息的发送与消费
2.1 连接RabbitMQ(创建Connection再创建Channel)
连接RabbitMQ的2种方式:
//在给定的参数(P地址、端口号、用户名、密码等)下连接RabbitMQ
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
factory.setvirtualHost(virtualHost);
factory.setHost(IP_ADDRESS);
factory.setport(PORT);
Connection conn = factory.newConnection();
//使用URI的方式连接RabbitMQ
ConnectionFactory factory = new ConnectionFactory();
factory.seturi("amqp://userName:password@ipAddress:portNumber/virtualHost");
Connection conn = factory.newConnection();
Connection接口被用来创建一个Channel:
Channel channel = conn.createChannel();
在创建之后,Channel可以用来发送或者接收消息了。
Connection可以用来创建多个Channel实例,但是Channel实例不能在线程间共享,应用程序应该为每一个线程开辟一个Channel
。多线程间共享Channel实例是非线程安全的。
通常情况下,在调用createxxx或者newXXX方法之后,我们可以简单地认为Connection或者Channel已经成功地处于开启状态。
2.2 使用交换器和队列
在使用交换器和队列之前应该先声明,示例:
channel.exchangeDeclare(exchangeName,"direct",true);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName,exchangeName,routingKey);
上面创建了一个持久化的、非自动删除的、绑定类型为direct的交换器,同时也创建了一个非持久化的、排他的、自动删除的队列(此队列的名称由RabbitMQ自动生成)。这里的交换器和队列也都没有设置特殊的参数。
上面的代码也展示了如何使用路由键将队列和交换器绑定起来。上面声明的队列具备如下特性:只对当前应用中同一个Connection层面可用,同一个Connection的不同Channel可共用,并且也会在应用连接断开时自动删除。
如果要在应用中共享一个队列,可以做如下声明:
channel.exchangeDeclare(exchangeName,"direct",true);
channel.queueDeclare(queueName,true,false,false,null);
channel.queueBind(queueName,exchangeName,routingKey);
这里的队列被声明为持久化的、非排他的、非自动删除的,而且也被分配另一个确定的已知的名称(由客户端分配而非RabbitMQ自动生成)。
生产者和消费者都可以声明一个交换器或者队列。如果尝试声明一个已经存在的交换器或者队列,只要声明的参数完全匹配现存的交换器或者队列,RabbitMQ就可以什么都不做,并成功返回。如果声明的参数不匹配则会抛出异常。
2.2.1 exchangeDeclare声明交换器/exchangeDelete删除交换器
Exchange.DeclareOk exchangeDeclare(String exchange,
String type,boolean durable,
boolean autoDelete,boolean internal,
Map<string,object>arguments)throws IOException;
这个方法的返回值是Exchange.Declare0K,用来标识成功声明了一个交换器。
exchange:交换器的名称。
type:交换器的类型。
durable:设置是否持久化。durable设置为true表示持久化,反之是非持久化。持久化可以将交换器存盘,在服务器重启的时候不会丢失相关信息。
autoDelete:设置是否自动删除。autoDelete设置为true则表示自动删除。自动删除的前提是至少有一个队列或者交换器与这个交换器绑定,之后所有与这个交换器绑定的队列或者交换器都与此解绑。注意不能错误地把这个参数理解为:“当与此交换器连接的客户端都断开时,RabbitMQ会自动删除本交换器”。
internal:设置是否是内置的。如果设置为true,则表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式。
argument:其他一些结构化参数。
有声明创建交换器的方法,当然也有删除交换器的方法:
Exchange.Deleteok exchangeDelete(String exchange)throws IOException;
void exchangeDeleteNoWait(String exchange,boolean ifUnused)throws IOException;
Exchange.Deleteok exchangeDelete(String exchange,boolean ifUnused)throws IOException;
其中exchange表示交换器的名称,而ifUnused用来设置是否在交换器没有被使用的情况下删除。如果isUnused设置为true,则只有在此交换器没有被使用的情况下才会被删除:如果设置false,则无论如何这个交换器都要被删除。
2.2.2 queueDeclare声明队列/queueDelete删除队列/queuePurge清空队列中的内容
Queue.DeclareOk queueDeclare(String queue,
boolean durable,boolean exclusive,
boolean autoDelete,Map<String,Object>arguments)throws IOException;
queue:队列的名称。
durable:设置是否持久化。为true则设置队列为持久化。持久化的队列会存盘,在服务器重启的时候可以保证不丢失相关信息。
exclusive:设置是否排他。为true则设置队列为排他的。如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。这里需要注意三点:排他队列是基于连接(Connection)可见的,同一个连接的不同信道(Channel)是可以同时访问同一连接创建的排他队列:“首次”是指如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同:即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除,这种队列适用于一个客户端同时发送和读取消息的应用场景。
autoDelete:设置是否自动删除。为true则设置队列为自动删除。自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。不能把这个参数错误地理解为:“当连接到此队列的所有客户端断开时,这个队列自动删除”,因为生产者客户端创建这个队列,或者没有消费者客户端与这个队列连接时,都不会自动删除这个队列。
arguments:设置队列的其他一些参数。
生产者和消费者都能够使用queueDeclare来声明一个队列,但是如果消费者在同一个信道上订阅了另一个队列,就无法再声明队列了。必须先取消订阅,然后将信道置为“传输”模式,之后才能是声明队列。
与交换器对应,关于队列也有删除的相应方法:
Queue.Deleteok queueDelete(String queue)throws IOException;
Queue.Deleteok queueDelete(String queue,boolean ifUnused,boolean ifEmpty) throws IOException;
void queueDeleteNoWait(String queue,boolean ifUnused,boolean ifEmpty) throws IOException;
其中queue表示队列的名称,ifUnused可以参考上一小节的交换器。ifEmpty设置为true表示在队列为空(队列里面没有任何消息堆积)的情况下才能够删除。
与队列相关的还有一个有意思的方法queuePurge,区别于queueDelete,这个方法用来清空队列中的内容,而不删除队列本身:
Queue.Purgeok queuePurge(String queue)throws IOException;
2.2.3 queueBind绑定交换器和队列/queueUnbind将交换器和队列解绑/exchangeBind将交换器与交换器绑定
Queue.Bindok queueBind(String queue,
String exchange,String routingKey,
Map<String,Object>arguments)throws IOException;
queue:队列名称;
exchange:交换器的名称;
routingKey:用来绑定队列和交换器的路由键;
argument:定义绑定的一些参数。
不仅可以将队列和交换器绑定起来,也可以将已经被绑定的队列和交换器进行解绑。
Queue.Unbindok queueUnbind(String queue,
String exchange,String routingKey,
Map<String,Object>arguments)throws IOException;
不仅可以将交换器与队列绑定,也可以将交换器与交换器绑定:
Exchange.Bindok exchangeBind(String destination,
String source,String routingKey,
Map<String,object>arguments)throws IOException;
将交换器与交换器绑定之后,消息从source交换器转发到destination交换器,某种程度上来说destination交换器可以看作一个队列。示例代码:
channel.exchangeDeclare("source","direct",false,true,null);
channel.exchangeDeclare("destination","fanout",false,true,null);
channel.exchangeBind("destination","source","exKey");
channel.queueDeclare("queue",false,false,true,null);
channel.queueBind("queue","destination","");
channel.basicPublish ("source","exKey",null,"exToExDemo".getBytes());
上述代码的含义:生产者发送消息至交换器source中,交换器source根据路由键找到与其匹配的另一个交换器destination,并把消息转发到destination中,进而存储在destination绑定的队列queue中。
2.2.4 何时创建队列
RabbitMQ的消息存储在队列中,交换器的使用并不真正耗费服务器的性能,而队列会。
按照RabbitMQ官方建议,生产者和消费者都应该尝试创建(这里指声明操作)队列。
预先创建好资源,可以确保交换器和队列之间正确地绑定匹配。很多时候,由于人为因素、代码缺陷等,发送消息的交换器并没有绑定任何队列,那么消息将会丢失:或者交换器绑定了某个队列,但是发送消息时的路由键无法与现存的队列匹配,那么消息也会丢失。
当然可以配合mandatory参数或者备份交换器来提高程序的健壮性。
2.3 发送消息
如果要发送一个消息,可以使用Channel类的basicPublish方法,比如发送一条内容为“Hello World!”的消息:
byte[]messageBodyBytes = "Hello,world!".getBytes();
channel.basicPublish(exchangeName,routingKey,null,messageBodyBytes);
为了更好地控制发送,可以使用mandatory这个参数,或者可以发送一些特定属性的信息:
channel.basicPublish(exchangeName,routingKey,mandatory,
MessageProperties.PERSISTENT_TEXT_PLAIN,
messageBodyBytes);
上面这行代码发送了一条消息,这条消息的投递模式(delivery mode)设置为2,即消息会被持久化(即存入磁盘)在服务器中。同时这条消息的优先级(priority)设置为l,content-type为“text/plain”。
还可以发送一条带有过期时间(expiration)的消息:
channel.basicPublish(exchangeName,routingKey,
new AMOP.BasicProperties.Builder()
expiration ("60000")
.bui1d()),
messageBodyBytes);
重载的发送方法:
void basicPublish(String exchange,String routingKey,
boolean mandatory,
boolean immediate,BasicProperties props,
byte[]body)throws IOException;
exchange:交换器的名称,指明消息需要发送到哪个交换器中。如果设置为空字符串,则消息会被发送到RabbitMQ默认的交换器中。
routingKey:路由键,交换器根据路由键将消息存储到相应的队列之中。
props:消息的基本属性集。
byte[] body:消息体(payload),真正需要发送的消息。
2.4 消费消息
RabbitMQ的消费模式分两种:推(Push)模式和拉(Pull)模式。推模式采用Basic.Consume进行消费,而拉模式则是调用Basic.Get进行消费。
2.4.1 推模式
接收消息一般通过实现Consumer接口或者继承DefaultConsumer类来实现。当调用与Consumer相关的API方法时,不同的订阅采用不同的消费者标签(consumerTag)来区分彼此,在同一个Channel中的消费者也需要通过唯一的消费者标签以作区分,关键消费代码:
boolean autoAck = false;
channel.basicQos(64);
channel.basicConsume(queueName,autoAck,"myConsumerTag",
new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMOP.BasicProperties properties,
byte[]body) throws IOException{
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
long deliveryTag = envelope.getDeliveryTag();
//(process the message components here ...
channel.basicAck(deliveryTag,false);
}
}
);
上面代码中显式地设置autoAck为false,然后在接收到消息之后进行显式ack操作(channel.basicAck),对于消费者来说这个设置是非常必要的,可以防止消息不必要地丢失。
basicConsume的重载方法:
String basicConsume(String queue,boolean autoAck,String consumerTag,
boolean noLocal,boolean exclusive,
Map<String,Object>arguments,Consumer callback) throws IOException;
queue:队列的名称;
autoAck:设置是否自动确认。建议设成false,即不自动确认;
consumerTag:消费者标签,用来区分多个消费者:
noLocal:设置为true则表示不能将同一个Connection中生产者发送的消息传送给这个Connection中的消费者;
exclusive:设置是否排他:
arguments:设置消费者的其他参数:
callback:设置消费者的回调函数。用来处理RabbitMQ推送过来的消息,比如DefaultConsumer,使用时需要客户端重写(override)其中的方法。
除了重写handleDelivery方法,更复杂的消费者客户端会重写更多的方法,如:
void handleConsumeok(String consumerTag);
void handleCancelok(String consumerTag);
void handleCancel(String consumerTag)throws IOException;
void handleshutdownsignal(String consumerTag,ShutdownsignalException sig);
void handleRecoverok(String consumerTag);
每个Channel都拥有自己独立的线程。最常用的做法是一个Channel对应一个消费者
,也就是意味着消费者彼此之间没有任何关联。当然也可以在一个Channel中维持多个消费者,但是要注意一个问题,如果Channe1中的一个消费者一直在运行,那么其他消费者的callback会被“耽搁”。
2.4.2 拉模式
拉模式的消费方式,通过channel.basicGet方法可以单条地获取消息,其返回值是GetRespone。Channel类的basicGet方法没有其他重载方法,只有:
//queue代表队列的名称
//如果设置autoAck为false,那么同样需要调用channel.basicAck来确认消息已被成功接收
GetResponse basicGet(String queue,boolean autoAck)throws IOException;
示例:
GetResponse response = channel.basicGet (QUEUE NAME,false);
System.out.println(new String(response.getBody()));
channel.basicAck(response.getEnvelope ()getDeliveryTag(),false);
2.5 消费端的确认与拒绝(自动确认/手动确认/拒绝消费)
- 消息确认
为了保证消息从队列可靠地达到消费者,RabbitMQ提供了消息确认机制。消费者在订阅队列时,可以指定autoAck参数,当autoAck等于false时,RabbitMO会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移去消息(实质上是先打上删除标记,之后再删除)。当autoAck等于true时,RabbitMQ会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息。
采用消息确认机制后,只要设置autoAck参数为false,消费者就有足够的时间处理消息(任务),不用担心处理消息过程中消费者进程挂掉后消息丢失的问题,因为RabbitMQ会一直等待持有消息直到消费者显式调用Basic.Ack命令为止。
当autoAck参数置为false,对于RabbitMQ服务端而言,队列中的消息分成了两个部分:一部分是等待投递给消费者的消息;一部分是已经投递给消费者,但是还没有收到消费者确认信号的消息。如果RabbitMO一直没有收到消费者的确认信号,并且消费此消息的消费者已经断开连接,则RabbitMO会安排该消息重新进入队列,等待投递给下一个消费者,当然也有可能还是原来的那个消费者。
RabbitMQ不会为未确认的消息设置过期时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否已经断开,这么设计的原因是RabbitMO允许消费者消费一条消息的时间可以很久很久。 - 消息拒绝
在消费者接收到消息后,如果想明确拒绝当前的消息而不是确认,可以使用Basic.Reject这个命令,消费者客户端可以调用与其对应的channel.basicReject方法来告诉RabbitMQ拒绝这个消息。
Channel类中的basicReject方法:
void basicReject(long deliveryTag,boolean requeue)throws IOException;
deliveryTag可以看作消息的编号,它是一个64位的长整型值,最大值是9223372036854775807。
如果requeue参数设置为true,则RabbitMQ会重新将这条消息存入队列,以便可以发送给下一个订阅的消费者;如果requeue参数设置为false,则RabbitMQ立即会把消息从队列中移除,而不会把它发送给新的消费者。
Basic.Reject命令一次只能拒绝一条消息,如果想要批量拒绝消息,则可以使用Basic.Nack这个命令。消费者客户端可以调用channel.basicNack方法来实现,方法:
void basicNack(long deliveryTag,boolean multiple,boolean requeue)throws IOException;
multiple参数设置为false则表示拒绝编号为deliveryTag的这一条消息,这时候basicNack和basicReject方法一样;multiple参数设置为true则表示拒绝deliveryTag编号之前所有未被当前消费者确认的消息。
对于requeue,AMQP中还有一个命令Basic.Recover具备可重入队列的特性。其对应的客户端方法为:
Basic.Recoverok basicRecover()throws IOException;
Basic.Recoverok basicRecover(boolean requeue)throws IOException;
这个channel.basicRecover方法用来请求RabbitMQ重新发送还未被确认的消息。如果requeue参数设置为true,则未被确认的消息会被重新加入到队列中,这样对于同一条消息来说,可能会被分配给与之前不同的消费者。如果requeue参数设置为false,那么同一条消息会被分配给与之前相同的消费者。默认情况下,如果不设置requeue这个参数,相当于channel.basicRecover(true),即requeue默认为true.
2.6 关闭连接
在应用程序使用完之后,需要关闭连接,释放资源:
channel.close();
conn.close();
显式地关闭Channel是个好习惯,但这不是必须的,在Connection关闭的时候,Channel也会自动关闭。
Connection和Channel所具备的生命周期:
Open:开启状态
,代表当前对象可以使用。
Closing:正在关闭状态
。当前对象被显式地通知调用关闭方法(shutdown),这样就产生了一个关闭请求让其内部对象进行相应的操作,并等待这些关闭操作的完成。
Closed:已经关闭状态
。当前对象已经接收到所有的内部对象已完成关闭动作的通知,并且其也关闭了自身。
Connection和Channel最终都是会成为Closed的状态,不论是程序正常调用的关闭方法,或者是客户端的异常,再或者是发生了网络异常。
在Connection和Channel中,与关闭相关的方法有addShutdownListener
(ShutdownListener listener)、removeShutdownListener (ShutdownListner listener)。当Connection或者Channel的状态转变为Closed的时候会调用ShutdownListener。而且如果将一个ShutdownListener注册到一个已经处于Closed状态的对象(这里特指Connection和Channel对象)时,会立刻调用ShutdownListener。
getCloseReason方法可以让你知道对象关闭的原因;isOpen方法检测对象当前是否处于开启状态;close(int closeCode,String closeMessage)方法显式地通知当前对象执行关闭操作。
ShutdownListener的使用:
connection.addshutdownListener(new ShutdownListener()
public void shutdownCompleted(ShutdownSignalException cause){
}
});
当触发ShutdownListener的时候,就可以获取到ShutdownsignalException,这个ShutdownSignalException包含了关闭的原因,这里原因也可以通过调用前面所提及的getCloseReason方法获取。
ShutdownSignalException提供了多个方法来分析关闭的原因。isHardError方法可以知道是Connection的还是Channel的错误;getReason方法可以获取cause相关的信息,示例:
public void shutdownCompleted(ShutdownsignalException cause)
{
if (cause.isHardError()){
Connection conn = (Connection)cause.getReference();
if (!cause.isInitiatedByApplication()){
Method reason = cause.getReason();
//...
}
}else{
Channel ch = (Channel)cause.getReference();
//...
}
}
三、RabbitMQ进阶
3.1 不可达的消息怎么处理
mandatory和immediate是channel.basicPublish方法中的两个参数,它们都有当消息传递过程中不可达目的地时将消息返回给生产者的功能。RabbitMQ提供的备份交换器(Alternate Exchange)可以将未能被交换器路由的消息(没有绑定队列或者没有匹配的绑定)存储起来,而不用返回给客户端。
3.1.1 mandatory参数
当mandatory参数设为true时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么RabbitMQ会调用Basic.Return命令将消息返回给生产者。当mandatory参数设置为false时,出现上述情形,则消息直接被丢弃。
生产者可以通过调用channel.addReturnListener来添加ReturnListener监听器,来获取到没有被正确路由到合适队列的消息。
channel.basicPublish(EXCHANGE NAME,""true,
MessageProperties.PERSISTENT TEXT PLAIN,
"mandatory test".getBytes());
channel.addReturnListener(new ReturnListener(){
public void handleReturn(int replyCode,String replyText,
String exchange,String routingKey,
AMOP.BasicProperties basicProperties,
byte[]body)throws IOException
String message = new String(body);
System.out,println("Basic.Return返回的结果是:"+message);
});
3.1.2 immediate参数
当immediate参数设为true时,如果交换器在将消息路由到队列时发现队列上并不存在任何消费者,那么这条消息将不会存入队列中。当与路由键匹配的所有队列都没有消费者时,该消息会通过Basic.Return返回至生产者。
mandatory参数告诉服务器至少将该消息路由到一个队列中,否则将消息返回给生产者。immediate参数告诉服务器,如果该消息关联的队列上有消费者,则立刻投递;如果所有匹配的队列上都没有消费者,则直接将消息返还给生产者,不用将消息存入队列而等待消费者了。
RabbitMQ3.0版本开始去掉了对immediate参数的支持,对此RabbitMQ官方解释是:immediate参数会影响镜像队列的性能,增加了代码复杂性,建议采用TTL和DLX的方法替代。
3.1.3 备份交换器
备份交换器,英文名称为Alternate Exchange,简称AE,或者更直白地称之为“备胎交换器”。生产者在发送消息的时候如果不设矍mandatory参数,那么消息在未被路由的情况下将会丢失:如果设置了mandatory参数,那么需要添加ReturnListener的编程逻辑,生产者的代码将变得复杂。如果既不想复杂化生产者的编程逻辑,又不想消息丢失,那么可以使用备份交换器,这样可以将未被路由的消息存储在RabbitMQ中,再在需要的时候去处理这些消息。
可以通过在声明交换器(调用channel.exchangeDeclare方法)的时候添加alternate-exchange参数来实现,也可以通过策略(Policy)的方式实现。如果两者同时使用,则前者的优先级更高,会覆盖掉Policy的设置。
Map<String,Object>args new HashMap<string,Object>();
args.put ("alternate-exchange","myAe");
channel.exchangeDeclare ("normalExchange","direct",true,false,args);
channel.exchangeDeclare ("myAe","fanout",true,false,null);
channel.queueDeclare("normalQueue",true,false,false,null);
channel.queueBind("normalQueue","normalExchange","normalKey");
channel.queueDeclare("unroutedQueue",true,false,false,null);
channel.queueBind("unroutedQueue","myAe","");
上面的代码中声明了两个交换器normalExchange和myAe,分别绑定了normalQueue和unroutedQueue这两个队列,同时将myAe设置为normalExchange的备份交换器。注意myAe的交换器类型为fanout。
在下图中,如果此时发送一条消息到normalExchange上,当路由键等于“normalKey”的时候,消息能正确路由到normalQueue这个队列中。如果路由键设为其他值,比如“errorKey”,即消息不能被正确地路由到与normalExchange绑定的任何队列上,此时就会发送给myAe,进而发送到unroutedQueue这个队列。
备份交换器其实和普通的交换器没有太大的区别,为了方便使用,建议设置为fanout类型,如若想设置为direct或者topic的类型也没有什么不妥。需要注意的是,消息被重新发送到备份交换器时的路由键和从生产者发出的路由键是一样的。
考虑这样一种情况,如果备份交换器的类型是direct,并且有一个与其绑定的队列,假设绑定的路由键是key1,当某条携带路由键为key2的消息被转发到这个备份交换器的时候,备份交换器没有匹配到合适的队列,则消息丢失。如果消息携带的路由键为key1,则可以存储到队列中。
对于备份交换器,总结了以下几种特殊情况:
如果设置的备份交换器不存在,客户端和RabbitMQ服务端都不会有异常出现,此时消息会丢失。
如果备份交换器没有绑定任何队列,客户端和RabbitMQ服务端都不会有异常出现,此时消息会丢失。
如果备份交换器没有任何匹配的队列,客户端和RabbitMQ服务端都不会有异常出现,此时消息会丢失。
如果备份交换器和mandatory参数一起使用,那么mandatory参数无效。
3.2 过期时间TTL
RabbitMQ可以对消息和队列设置TTL。
3.2.1 设置消息的TTL
目前有两种方法可以设置消息的TTL。第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。第二种方法是对消息本身进行单独设置,每条消息的TTL可以不同。如果两种方法一起使用,则消息的TTL以两者之间较小的那个数值为准。消息在队列中的生存时间一旦超过设置的TTL值时,就会变成“死信”(Dead Message),消费者将无法再收到该消息。
通过队列属性设置消息TTL的方法是在channel.queueDeclare方法中加入x-message-ttl参数实现的,这个参数的单位是毫秒。
Map<String,Object> argss = new HashMap<String,Object>()
argss.put("x-message-ttl",6000);
channel.queueDeclare(queueName,durable,exclusive,autoDelete,argss);
如果不设置TTL,则表示此消息不会过期;如果将TTL设置为0,则表示除非此时可以直接将消息投递到消费者,否则该消息会被立即丢弃,这个特性可以部分替代RabbitMQ3.0版本之前的immediate参数,之所以部分代替,是因为immediate参数在投递失败时会用Basic.Return将消息返回(这个功能可以用死信队列来实现)。
针对每条消息设置TTL的方法是在channel.basicPublish方法中加入expiration的属性参数,单位为毫秒。
AMOP.BasicProperties.Builder builder = new AMOP.BasicProperties.Builder();
builder.deliveryMode(2);//持久化消息
builder.expiration("60000");//TTL=60000ms
AMOP.BasicProperties properties = builder.build();
channel.basicPublish(exchangeName,routingKey,mandatory,properties,
"ttlTestMessage".getBytes());
对于第一种设置队列TTL属性的方法,一旦消息过期,就会从队列中抹去,而在第二种方法中,即使消息过期,也不会马上从队列中抹去,因为每条消息是否过期是在即将投递到消费者之前判定的。
第一种方法里,队列中已过期的消息肯定在队列头部,RabbitMQ只要定期从队头开始扫描是否有过期的消息即可。而第二种方法里,每条消息的过期时间不同,如果要删除所有过期消息势必要扫描整个队列,所以不如等到此消息即将被消费时再判定是否过期,如果过期再进行删除即可。
3.2.2 设置队列的TTL
通过channe1.queueDeclare方法中的x-expires参数可以控制队列被自动删除前处于未使用状态的时间。未使用的意思是队列上没有任何的消费者,队列也没有被重新声明,并且在过期时间段内也未调用过Basic.Get命令。
RabbitMQ会确保在过期时间到达后将队列删除,但是不保障删除的动作有多及时。在RabbitMQ重启后,持久化的队列的过期时间会被重新计算。
用于表示过期时间的x-expires参数以毫秒为单位,并且服从和x-message-ttl一样的约束条件,不过不能设置为0。比如该参数设置为1000,则表示该队列如果在1秒钟之内未使用则会被删除。
Map<String,Object>args = new HashMap<String,object>();
args.put("x-expires",1800000);
channel.queueDeclare("myqueue",false,false,false,args);
3.3 死信队列
DLX,全称为Dead-Letter-Exchange,可以称之为死信交换器。当消息在一个队列中变成死信(dead message)之后,它能被重新被发送到另一个交换器中,这个
交换器就是DLX,绑定DLX的队列就称之为死信队列。
消息变成死信一般是由于以下几种情况:
消息被拒绝(Basic.Reject/Basic.Nack),并且设置requeue参数为false;
消息过期:
队列达到最大长度。
DLX也是一个正常的交换器,和一般的交换器没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。当这个队列中存在死信时,RabbitMO就会自动地将这个消息重新发布到设置的DLX上去,进而被路由到另一个队列,即死信队列。可以监听这个队列中的消息以进行相应的处理,这个特性与将消息的TTL设置为0配合使用可以弥补immediate参数的功能。
通过在channel.queueDeclare方法中设置x-dead-letter-exchange参数来为这个队列添加DLX:
channel.exchangeDeclare("dlx exchange","direct");//DLX:dlx exchange
Map<string,Object>args = new HashMap<string,Object>();
args.put("x-dead-letter-exchange","dlx exchange");
//为队列myqueue添加DLX
channel.queueDeclare("myqueue",false,false,false,args);
也可以为这个DLX指定路由键,如果没有特殊指定,则使用原队列的路由键:
args.put("x-dead-letter-routing-key","dlx-routing-key");
对于RabbitMQ来说,DLX是一个非常有用的特性。它可以处理异常情况下,消息不能够被消费者正确消费(消费者调用了Basic.Nack或者Basic.Reject)而被置入死信队列中的情况,后续分析程序可以通过消费这个死信队列中的内容来分析当时所遇到的异常情况,进而可以改善和优化系统。DLX配合TTL使用还可以实现延迟队列的功能。
3.4 延迟队列
延迟队列存储的对象是对应的延迟消息,所谓“延迟消息”是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。
延迟队列的使用场景有很多,比如:
在订单系统中,一个用户下单之后通常有30分钟的时间进行支付,如果30分钟之内没有支付成功,那么这个订单将进行异常处理,这时就可以使用延迟队列来处理这些订单了。
用户希望通过手机远程遥控家里的智能设备在指定的时间进行工作。这时候就可以将用户指令发送到延迟队列,当指令设定的时间到了再将指令推送到智能设备。
RabbitMQ本身没有直接支持延迟队列的功能,但是可以通过DLX和TTL模拟出延迟队列的功能。
在真实应用中,对于延迟队列可以根据延迟时间的长短分为多个等级,一般分为5秒、10秒、30秒、1分钟、5分钟、10分钟、30分钟、1小时这几个维度,当然也可以再细化一下。
在下图中,只设置了5秒、10秒、30秒、1分钟这四个等级。根据应用需求的不同,生产者在发送消息的时候通过设置不同的路由键,以此将消息发送到与交换器绑定的不同的队列中。这里队列分别设置了过期时间为5秒、10秒、30秒、1分钟,同时也分别配置了DLX和相应的死信队列。当相应的消息过期时,就会转存到相应的死信队列(即延迟队列)中,这样消费者根据业务自身的情况,分别选择不同延迟等级的延迟队列进行消费。
3.5 优先级队列
优先级队列,顾名思义,具有高优先级的队列具有高的优先权,优先级高的消息具备优先被消费的特权。可以通过设置队列的x-max-priority参数来实现。
Map<String,object>args = new HashMap<string,object>();
args.put("x-max-priority",10);
channel.queueDeclare("queue.priority",true,false,false,args);
消息发送时可以设置消息当前的优先级:
AMOP.BasicProperties.Builder builder = new AMOP.BasicProperties.Builder();
builder.priority(5);
AMOP.BasicProperties properties = builder.build();
channel.basicPublish("exchange priority","rk priority",properties,("messages
")getBytes());
上面的代码中设置消息的优先级为5。默认最低为0,最高为队列设置的最大优先级。优先级高的消息可以被优先消费,这个也是有前提的:如果在消费者的消费速度大于生产者的速且Broker中没有消息堆积的情况下,对发送的消息设置优先级也就没有什么实际意义。因为生产者刚发送完一条消息就被消费者消费了,那么就相当于Broker中至多只有一条消息,对于单条消息来说优先级是没有什么意义的。
3.6 持久化
持久化可以提高RabbitMO的可靠性,以防在异常情况(重启、关闭、宕机等)下的数据丢失。RabbitMQ的持久化分为三个部分:交换器的持久化、队列的持久化和消息的持久化。
交换器的持久化是通过在声明交换器是将durable参数置为true实现的。如果交换器不设置持久化,那么在RabbitMO服务重启之后,相关的交换器元数据会丢失,不过消息不会丢失,只是不能将消息发送到这个交换器中了。对一个长期使用的交换器来说,建议将其置为持久化的。
队列的持久化是通过在声明队列时将durable参数置为true实现的。如果队列不设置持久化,那么在RabbitMO服务重启之后,相关队列的元数据会丢失,此时数据也会丢失。
队列的持久化能保证其本身的元数据不会因异常情况而丢失,但是并不能保证内部所存储的消息不会丢失。要确保消息不会丢失,需要将其设置为持久化。通过将消息的投递模式(BasicProperties中的deliveryMode属性)设置为2即可实现消息的持久化。
设置了队列和消息的持久化,当RabbitMQ服务重启之后,消息依旧存在。单单只设置队列持久化,重启之后消息会丢失;单单只设置消息的持久化,重启之后队列消失,继而消息也丢失。
将交换器、队列、消息都设置了持久化之后就能百分之百保证数据不丢失了吗?答案是否定的。
首先从消费者来说,如果在订阅消费队列时将autoAck参数设置为true,那么当消费者接收到相关消息之后,还没来得及处理就宕机了,这样也算数据丢失。这种情况很好解决,将autoAck参数设置为false,并进行手动确认。
其次,在持久化的消息正确存入RabbitMQ之后,还需要有一段时间(虽然很短,但是不可忽视)才能存入磁盘之中。RabbitMQ并不会为每条消息都进行同步存盘的处理,可能仅仅保存到操作系统缓存之中而不是物理磁盘之中。如果在这段时间内RabbitMQ服务节点发生了宕机、重启等异常情况,消息保存还没来得及落盘,那么这些消息将会丢失。
这个问题怎么解决呢?这里可以引入RabbitMQ的镜像队列机制,相当
于配置了副本,如果主节点(master)在此特殊时间内挂掉,可以自动切换到从节点(slave),这样有效地保证了高可用性,除非整个集群都挂掉。虽然这样也不能完全保证RabbitMQ消息不丢失,但是配置了镜像队列要比没有配置镜像队列的可靠性要高很多,在实际生产环境中的关键业务队列一般都会设置镜像队列。
还可以在发送端引入事务机制或者发送方确认机制来保证消息已经正确地发送并存储至RabbitMQ中,前提还要保证在调用channel.basicPublish方法的时候交换器能够将消息正确路由到相应的队列之中。
3.7 生产者确认
在使用RabbitMO的时候,可以通过消息持久化操作来解决因为服务器的异常崩溃而导致的消息丢失,除此之外,我们还会遇到一个问题,当消息的生产者将消息发送出去之后,消息到底有没有正确地到达服务器呢?如果不进行特殊配置,默认情况下发送消息的操作是不会返回任何信息给生产者的,也就是默认情况下生产者是不知道消息有没有正确地到达服务器。如果在消息到达服务器之前已经丢失,持久化操作也解决不了这个问题,因为消息根本没有到达服务器,何谈持久化?
RabbitMQ针对这个问题,提供了两种解决方式:
通过事务机制实现;
通过发送方确认(publisher confirm)机制实现。
3.7.1 事务机制
RabbitMQ客户端中与事务机制相关的方法有三个:channel.txSelect、channel.txCommit和channel.txRollback。channel,txSelect用于将当前的信道设置成事务模式,channel.txCommit用于提交事务,channel.txRollback用于事务回滚。在通过channel.txSelect方法开启事务之后,我们便可以发布消息给RabbitMO了,如果事务提交成功,则消息一定到达了RabbitMQ中,如果在事务提交执行之前由于RabbitMQ异常崩溃或者其他原因抛出异常,这个时候我们便可以将其捕获,进而通过执行channel.txRollback方法来实现事务回滚。
channel.txSelect();
channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY,
MessageProperties.PERSISTENT_TEXT_PLAIN,
"transaction messages".getBytes());
channel.txCommit();
可以发现开启事务机制与不开启相比多了四个步骤:
客户端发送Tx.Select,将信道置为事务模式:
Broker回复Tx.Select-Ok,确认已将信道置为事务模式:
在发送完消息之后,客户端发送Tx.Commit提交事务;
Broker回复Tx.Commit-ok,确认事务提交。
上面是事务正常提交,事务回滚如下:
try{
channel.txSelect();
channel.basicPublish(exchange,routingKey,
MessageProperties.PERSISTENT TEXT PLAIN,msg.getBytes());
int result = 1/0;
channel.txCommit();
}catch(Exception e){
e.printstackTrace();
channel.txRollback();
}
事务确实能够解决消息发送方和RabbitMQ之间消息确认的问题,只有消息成功被RabbitMQ接收,事务才能提交成功,否则便可在捕获异常之后进行事务回滚,与此同时可以进行消息重发。但是使用事务机制会“吸干”RabbitMO的性能,那么有没有更好的方法既能保证消息发送方确认消息已经正确送达,又能基本上不带来性能上的损失呢?RabbitMO提供了一个方案,即发送方确认机制。
3.7.2 发送发确认机制
二、相关问题
2.1 RabbitMQ上的一个queue中存放的message是否有数量限制
可以认为是无限制,因为限制取决于机器器的内存,但是消息过多会导致处理效率的下降。
2.2 如何确保消息正确地发送至RabbitMQ*
RabbitMQ使用发送方确认模式,确保消息正确地发送到RabbitMQ。
发送方确认模式:将信道设置成confirm模式(发送方确认模式),则所有在信道上发布的消息都会被指派一个唯一的ID。一旦消息被投递到目的队列后,或者消息被写入磁盘后(可持久化的消息),信道会发送一个确认给生产者(包含消息唯一ID)。如果RabbitMQ发生内部错误从而导致消息丢失,会发送一条nack(not acknowledged,未确认)消息。
发送确认模式是异步的,生产者应用程序在等待确认的同时,可以继续发送消息。当确认消息到达生产者应用程序,生产者应用程序的回调方法就会被触发来处理确认消息。
2.3 如何确保消息接收方消费了消息
接收方消息确认机制:消费者接收每一条消息后都必须进行确认(消息接收和消息确认是两个不同操作)。只有消费者确认了消息,RabbitMQ才能安全地把消息从队列中删除。
这里并没有用到超时机制,RabbitMQ仅通过Consumer的连接中断来确认是否需要重新发送消息。也就是说,只要连接不中断,RabbitMQ给了Consumer足够长的时间来处理理消息。
特殊情况:
- 如果消费者接收到消息,在确认之前断开了连接或取消订阅,RabbitMQ会认为消息没有被分发,然后重新分发给下个订阅的消费者。(可能存在消息重复消费的隐患,需要根据bizId去重)
- 如果消费者接收到消息却没有确认消息,连接也未断开,则RabbitMQ认为该消费者繁忙,将不会给该消费者分发更多的消息。
2.4 如何避免消息重复投递或重复消费*
在消息生产时,MQ内部针对每条生产者发送的消息生成一个inner-msg-id,作为去重和幂等的依据(消息投递失败并重传),避免重复的消息进入队列;
在消息消费时,要求消息体中必须要有一个bizId(对于同一业务全局唯一,如支付ID、订单ID、帖子ID等)作为去重和幂等的依据,避免同一条消息被重复消费。
2.5 消息基于什么传输
由于TCP连接的创建和销毁开销较大,且并发数受系统资源限制,会造成性能瓶颈。RabbitMQ使用信道的方式来传输数据。信道是建立在真实的TCP连接内的虚拟连接,且每条TCP连接上的信道数量量没有限制。
- RabbitMQ采用类似NIO(Non-blocking I/O)做法,选择TCP连接复用,不仅可以减少性能开销,同时也便于管理。
- 每个线程把持一个信道,所以信道复用了Connection的TCP连接。同时RabbitMQ可以确保每个线程的私密性,就像拥有独立的连接一样。
2.6 消息怎么路由
从概念上来说,消息路路由必须有三部分:交换器、路由、绑定。生产者把消息发布到交换器器上;绑定决定了消息如何从交换器路由到特定的队列;消息最终到达队列,并被消费者接收。
1、消息发布到交换器时,消息将拥有一个路由键(routing key),在消息创建时设定。
2、通过队列路由键,可以把队列绑定到交换器器上。
3、消息到达交换器后,RabbitMQ会将消息的路路由键与队列列的路路由键进行匹配(针对不同的交换器有不同的路由规则)。
4、如果能够匹配到队列列,则消息会投递到相应队列列中;如果不能匹配到任何队列列,消息将进入 “黑洞”。
消息提供方 >>> 路由 >>> 一至多个队列。
消息发布到交换器时,消息将拥有一个路由键(routing key),在消息创建时设定。
通过队列路由键,可以把队列绑定到交换器上。
消息到达交换器后,RabbitMQ 会将消息的路由键与队列的路由键进行匹配(针对不同的交换器有不同的路由规则);
常用的交换器主要分为一下三种:
fanout:如果交换器收到消息,将会广播到所有绑定的队列上。
direct:如果路由键完全匹配,消息就被投递到相应的队列。
topic:可以使来自不同源头的消息能够到达同一个队列。 使用topic交换器时,可以使用通配符。
2.7 如何确保消息不丢失*
- 1、生产者弄丢了数据
可以选择用 RabbitMQ 提供的事务功能,就是生产者发送数据之前开启 RabbitMQ 事务channel.txSelect ,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务 channel.txRollback ,然后重试发送消息;如果收到了消息,那么可以提交事务 channel.txCommit 。伪代码:
// 开启事务
channel txSelect
try {
// 发送消息
} catch (Exception ) {
channel txRollback
// 再次重发这条消息
}
// 提交事务
channel txCommit
问题是,RabbitMQ 事务机制(同步)一搞,基本上吞吐量会下来,因为太耗性能。
一般来说,如果你要确保说写 RabbitMQ 的消息别丢,可以开启 confirm 模式,在生产者那里设置开启 confirm 模式之后,你每次写的消息都会分配一个唯一的 id,然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个 ack 消息,告诉你说这个消息 ok 了。如果RabbitMQ 没能处理这个消息,会回调你的一个 nack 接口,告诉你这个消息接收失败,你可以重试。而且你可以结合这个机制自己在内存里维护每个消息 id 的状态,如果超过一定时间还没接收到这个消息的回调,那么你可以重发。
事务机制和 confirm 机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是 confirm 机制是异步的,你发送个消息之后就可以发送下一个消息,然后那个消息 RabbitMQ 接收了之后会异步回调你的一个接口通知你这个消息接收到了。所以一般在生产者这块避免数据丢失,都是用 confirm 机制的。
- 2、RabbitMQ弄丢了数据
这个你必须开启 RabbitMQ 的持久化,就是消息写入之后会持久化到磁盘,哪怕是 RabbitMQ 自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。除非极其罕见的是,RabbitMQ 还没持久化,自己就挂了,可能导致少量数据丢失,但是这个概率较小。
设置持久化有两个步骤:
- 创建 queue 的时候将其设置为持久化。这样就可以保证 RabbitMQ 持久化 queue 的元数据,但是它是不会持久化 queue 里的数据的。
- 第二个是发送消息的时候将消息的 deliveryMode 设置为 2。就是将消息设置为持久化的,此时 RabbitMQ 就会将消息持久化到磁盘上去。
必须要同时设置这两个持久化才行,RabbitMQ 哪怕是挂了,再次重启,也会从磁盘上重启恢复queue,恢复这个 queue 里的数据。
注意,哪怕是你给 RabbitMQ 开启了持久化机制,也有一种可能,就是这个消息写到了RabbitMQ 中,但是还没来得及持久化到磁盘上,结果不巧,此时 RabbitMQ 挂了,就会导致内存里的一点点数据丢失。所以,持久化可以跟生产者那边的 confirm 机制配合起来,只有消息被持久化到磁盘之后,才会通知生产者 ack 了,所以哪怕是在持久化到磁盘之前,RabbitMQ 挂了,数据丢了,生产者收不到 ack ,你也是可以自己重发的。
- 3、消费端弄丢了数据
RabbitMQ 如果丢失了数据,主要是因为你消费的时候,刚消费到,还没处理,结果进程挂了,比如重启了,那么就尴尬了,RabbitMQ 认为你都消费了,这数据就丢了。
这个时候得用 RabbitMQ 提供的 ack 机制,简单来说,就是你必须关闭 RabbitMQ 的自动ack ,可以通过一个 api 来调用就行,然后每次你自己代码里确保处理完的时候,再在程序里 ack 一把。这样的话,如果你还没处理完,不就没有 ack 了?那 RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处理,消息是不会丢的。
2.8 如何保证消息的顺序性*
顺序会错乱的场景:RabbitMQ:一个queue,多个consumer。比如,生产者向 RabbitMQ 里发送了三条数据,顺序依次是 data1/data2/data3,压入的是 RabbitMQ 的一个内存队列。有三个消费者分别从MQ 中消费这三条数据中的一条,结果消费者2先执行完操作,把 data2 存入数据库,然后是 data1/data3。这时顺序就出了问题。
解决:拆分多个 queue,每个 queue 一个 consumer,就是多一些 queue 而已,确实是麻烦点;或者就一个 queue 但是对应一个 consumer,然后这个 consumer 内部用内存队列做排队,然后分发给底层不同的 worker 来处理。
2.9 RabbitMQ交换器有哪些类型*
- fanout交换器:它会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列列中;
- direct交换器:direct类型的交换器路由规则很简单,它会把消息路由到哪些BindingKey和RoutingKey完全匹配的队列中;
- topic交换器:匹配规则比direct更灵活。
- headers交换器:根据发送消息内容的headers属性进行匹配(由于性能很差,不实用)。
常用的交换器:
1、direct:如果路由键完全匹配,消息就被投递到相应的队列。
2、fanout:如果交换器收到消息,将会播到所有绑定的队列上。
3、topic:可以使来自不同源头的消息能够到达同一个队列。 使用topic交换器时,可以使用通配符,比如:“*” 匹配特定位置的任意文本, “.” 把路由键分为了几个部分,“#” 匹配所有规则等。特别注意:发往topic交换器的消息不能随意的设置选择键(routing_key),必须是由"."隔开的一系列的标识符组成。
2.10 RabbitMQ如何保证数据一致性
- 生产者确认机制:消息持久化后异步回调通知生产者,保证消息已经发出去;
- 消息持久化:设置消息持久化;
- 消费者确认机制:消费者成功消费消息之后,手动确认,保证消息已经消费。
2.11 RabbitMQ结构*
Broker:简单来说就是消息队列列服务器器实体。
Exchange:消息交换机,它指定消息按什什么规则,路路由到哪个队列。
Queue:消息队列列载体,每个消息都会被投入到一个或多个队列列。
Binding:绑定,它的作用就是把exchange和queue按照路路由规则绑定起来。
Routing Key:路路由关键字,exchange根据这个关键字进行消息投递。
vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
producer:消息生产者,就是投递消息的程序。
consumer:消息消费者,就是接受消息的程序。
channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。
2.12 RabbitMQ队列与消费者的关系
- 一个队列可以绑定多个消费者;
- 消息默认以循环的方式发送给消费者;
- 消费者收到消息默认自动确认,也可以改成手动确认。
2.13 如何保证消息的可靠传输
数据的丢失问题,可能出现在生产者、MQ、消费者中。
- 生产者丢失
生产者将数据发送到RabbitMQ的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都有可能。此时可以选择用RabbitMQ提供的事务功能,就是生产者发送数据之前开启RabbitMQ事务,然后发送消息。如果消息没有成功被RabbitMQ接收到,那么生产者会收到异常报错,此时就可以回滚事,然后重试发送消息;如果收到了消息,那么可以提交事务。这样做吞吐量会下来,因为太耗性能。
所以你要确保说写RabbitMQ的消息别丢,可以开启confirm模式,在生产者那里设置开启confirm模式之后,你每次写的消息都会分配一个唯一的id,然后如果写入了RabbitMQ中,RabbitMQ会给你回传一个ack消息,告诉你说这个消息ok 了。如果RabbitMQ没能处理这个消息,会回调你一个nack接口,告诉你这个消息接收失败,你可以重试。而且你可以结合这个机制自己在内存里维护每个消息id的状态,如果超过一定时间还没接收到这个消息的回调,那么你可以重发。
事务机制和cnofirm机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是confirm机制是异步的,你发送个消息之后就可以发送下一个消息,然后那个消息RabbitMQ接收了之后会异步回调你一个接口通知你这个消息接收到了。所以一般在生产者这块避免数据丢失,都是用confirm机制的。 - MQ中丢失
RabbitMQ自己弄丢了数据,这个你必须开启RabbitMQ的持久化,就是消息写入之后会持久化到磁盘,哪怕是RabbitMQ挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。
设置持久化有两个步骤:创建queue的时候将其设置为持久化,这样就可以保证RabbitMQ持久化queue的元数据,但是不会持久化queue里的数据。第二个是发送消息的时候将消息的deliveryMode设置为2,就是将消息设置为持久化的,此时RabbitMQ就会将消息持久化到磁盘上去。必须要同时设置这两个持久化才行,RabbitMQ哪怕是挂了,再次重启,也会从磁盘上重启恢复queue,恢复这个queue里的数据。
持久化可以跟生产者那边的confirm机制配合起来,只有消息被持久化到磁盘之后,才会通知生产者ack了,所以哪怕是在持久化到磁盘之前,RabbitMQ挂了,数据丢了,生产者收不到ack,你也是可以自己重发的。注意,哪怕是你给 RabbitMQ开启了持久化机制,也有一种可能,就是这个消息写到了RabbitMQ中,但是还没来得及持久化到磁盘上,结果不巧,此时RabbitMQ 挂了,就会导致内存里的一点点数据丢失。 - 消费端丢失
消费的时候,还没处理,结果进程挂了,比如重启了,RabbitMQ认为你都消费了,这数据就丢了。这个时候得用RabbitMQ提供的ack机制。简单来说,就是你关闭RabbitMQ的自动ack,可以通过一个api来调用就行,然后每次你自己代码里确保处理完的时候,再在程序里ack一把。这样的话,如果你还没处理完,不就没有ack?那RabbitMQ就认为你还没处理完,这个时候RabbitMQ会把这个消费分配给别的consumer去处理,消息是不会丢的。
2.14 如何解决消息队列的延时以及过期失效问题
- 消息积压处理办法:临时紧急扩容
先修复consumer的问题,确保其恢复消费速度,然后将现有cnosumer都停掉。新建一个topic,partition是原来的 10 倍,临时建立好原先10倍的queue数量。然后写一个临时的分发数据的consumer程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的10倍数量的queue。
接着临时征用10倍的机器来部署consumer,每一批consumer消费一个临时queue的数据。这种做法相当于是临时将queue资源和consumer资源扩大10倍,以正常的10倍速度来消费数据。等快速消费完积压数据之后,得恢复原先部署的架构,重新用原先的consumer 机器来消费消息。 - MQ中消息失效
假设你用的是RabbitMQ,RabbtiMQ是可以设置过期时间的,也就是TTL。如果消息在queue中积压超过一定的时间就会被RabbitMQ给清理掉,这个数据就没了。那这就是第二个坑了。这就不是说数据会大量积压在mq里,而是大量的数据会直接搞丢。我们可以采取一个方案,就是批量重导,数据大量积压的时候,我们当时就直接丢弃数据了,然后等过了高峰期以后,就开始写程序,将丢失的那批数据,写个临时程序,一点一点的查出来,然后重新灌入mq里面去,把丢的数据给他补回来。比如1 万个订单积压在mq里面,没有处理,其中1000个订单都丢了,你只能手动写程序把那1000个订单给查出来,手动发到mq里去再补一次。
2.15 怎么保证消息队列的高可用*
RabbitMQ是基于主从(非分布式)做高可用性的。
RabbitMQ有三种模式:单机模式、普通集群模式、镜像集群模式。
- 单机模式
Demo级别的。 - 普通集群模式(无高可用性)
普通集群模式,意思就是在多台机器上启动多个RabbitMQ实例,每个机器启动一个。你创建的queue,只会放在一个RabbitMQ实例上,但是每个实例都同步queue的元数据(元数据可以认为是queue的一些配置信息,通过元数据,可以找到queue所在实例)。你消费的时候,实际上如果连接到了另外一个实例,那么那个实例会从queue所在实例上拉取数据过来。
这种方式确实很麻烦,也没做到所谓的分布式,就是个普通集群。因为这导致你要么消费者每次随机连接一个实例然后拉取数据,要么固定连接那个queue所在实例消费数据,前者有数据拉取的开销,后者导致单实例性能瓶颈。
而且如果那个放queue的实例宕机了,会导致接下来其他实例就无法从那个实例拉取,如果你开启了消息持久化,让RabbitMQ落地存储消息的话,消息不一定会丢,得等这个实例恢复了,然后才可以继续从这个queue拉取数据。
这方案没有什么所谓的高可用性,主要是提高吞吐量,就是说让集群中多个节点来服务某个queue的读写操作。 - 镜像集群模式(高可用性)
在镜像集群模式下,你创建的queue,无论元数据还是queue里的消息都会存在于多个实例上,就是说,每个RabbitMQ节点都有这个queue的一个完整镜像,包含queue的全部数据的意思。然后每次你写消息到queue的时候,都会自动把消息同步到多个实例的queue上。
如何开启这个镜像集群模式呢?其实很简单,RabbitMQ有很好的管理控制台,就是在后台新增一个策略,这个策略是镜像集群模式的策略,指定的时候是可以要求数据同步到所有节点的,也可以要求同步到指定数量的节点,再次创建queue的时候,应用这个策略,就会自动将数据同步到其他的节点上去了。
2.16 Kafka、ActiveMQ、RabbitMQ、RocketMQ对比
ActiveMQ | RabbitMQ | RocketMQ | Kafka | |
---|---|---|---|---|
单机吞吐量 | 比 RabbitMQ低 | 2.6w/s( 消息做持 久化) | 11.6w/s | 17.3w/s |
开发语言 | Java | Erlang | Java | Scala/Java |
主要维护者 | Apache | Mozilla/Spring | Alibaba | Apache |
成熟度 | 成熟 | 成熟 | 开源版本不够成熟 | 比较成熟 |
订阅形式 | 点对点 (p2p)、广 播(发布订阅) | 提供了4 种: direct、topic、headers、fanout。fanout就是广播模式 | 基于topic/messageTag以及按照消息类型、属性进行正则匹配的发布订阅模式 | 基于topic以及按照topic进行正则匹配的发布订阅模式 |
持久化 | 支持少量堆积 | 支持少量堆积 | 支持大量堆积 | 支持大量堆积 |
顺序消息 | 不支持 | 不支持 | 支持 | 支持 |
性能稳定性 | 好 | 好 | 一般 | 较差 |
集群方式 | 支持简单集群模 式,比如’主备’,对高级集群模式支持不好。 | 支持简单集群,'复 制’模式,对高级集群模式支持不 好。 | 常用多对’MasterSlave’模 式,开源版本需手动切换Slave变成Master | 天然的‘LeaderSlave’无状态集群,每台服务器既是Master也是Slave |
管理界面 | 一般 | 较好 | 一般 | 无 |
RabbitMQ:延时低,微妙级延时,社区活跃度高,bug修复及时,而且提供了很友善的后台界面;用Erlang 语言开发,只熟悉 Java 的无法阅读源码和自行修复 bug。
RocketMQ:阿里维护的消息中间件,可以达到十万级的吞吐量,支持分布式事务。
Kafka:分布式的中间件,最大优点是其吞吐量高,一般运用于大数据系统的实时运算和日志采集的场景,功能简单,可靠性高,扩展性高;缺点是可能导致重复消费。
中小型公司,技术实力较为一般,技术挑战不是特别高,用RabbitMQ是不错的选择;大型公司,基础架构研发实力较强,用RocketMQ是很好的选择。如果是大数据领域的实时计算、日志采集等场景,用Kafka是业内标准的,社区活跃度很高。