RabbitMq

发布于:2024-07-23 ⋅ 阅读:(44) ⋅ 点赞:(0)
什么是RabbitMQ,有什么特点
  1. 消息传递模式:RabbitMQ支持多种消息传递模式,包括发布/订阅、点对点和工作队列等,使其更灵活适用于各种消息通信场景。
  2. 消息路由和交换机:RabbitMQ引入交换机的概念,用于将消息路由到一个或多个队列。允许根据消息的内容、标签或路由键进行灵活的消息路由,从而实现更复杂的消息传递逻辑。
  3. 消息确认机制:RabbitMQ支持消息确认机制,消费者可以确定已成功处理消息。确保了消息不会再传递后被重复消费,增加了消息的可靠性。
  4. 消息持久性:RabbitMQ允许消息和队列的持久性设置,确保消息再RabbitMQ重新启动后不会丢失。这对于关键的业务消息非常重要。
RabbitMQ和AMQP是什么关系
  • AMQP:AMQP是一个协议规范,而不是一个具体的消息中间件。它是一个开放的消息产地协议,是一种应用层的标准协议,为面向消息的中间件设计。AMQP提供了一种同统一的消息服务,使得不同程序之间可以通过消息队列进行通信。SpringBoot框架默认就提供了对AMQP协议的支持。
  • RabbitMQ:RabbitMQ是一种开源的消息中间件,是一个具体的软件产品。使用AMQP协议来实现消息传递的标准。并且其也支持其他消息传递协议,如STOMP和MQTT。RabbitMQ基于AMQP协议定义的消息格式和交互流程,实现了消息再生产者、交换机队列之间的传递和处理。
RabbitMQ的核心组件有哪些
  1. Broker:RabbitMQ服务器,负责接收和分发消息的应用。
  2. Virtual Host: 虚拟主机,是RabbitMQ中的逻辑容器,用于隔离不同环境或不同应用程序的信息流。每个虚拟主机都有自己的队列交换机等设置,可以理解为一个独立的RabbitMQ服务。
  3. Connection连接:管理和维护与RabbitMQ服务器的TCP连接,生产者、消费者通过这个连接和Broker建立物理网络连接。
  4. Channel通道:是在Connection内创建的轻量级通信通道,用于进行消息的传输和交互。应用程序通过Channel进行消息的发送和接收。通常一个Connection可以建立多个Channel。
  5. Exchange交换机:交换机是消息的中转站,负责接收来自生产者的消息,并将其路由到一个或多个队列中。RabbitMQ提供多种不同类型的交换机,每种不同类型的交换机都有不同的消息路由规则。
  6. Queue队列:队列是消息的存储位置。每个队列都有一个唯一的名称。消息从交换机路由到队列,然后等待消费者消费。
  7. Binding绑定关系:Binding是Exchange和Queue之间的关联规则,定义了消息如何从交换机路由到特定队列。
RabbitMQ的交换机的类型
  • Direct Exchange:直连交换机,这种交换机根据消息的路由键(Routing Key)将消息发送到与之完全匹配的队列。只有当消息的路由键与队列绑定的路由键相同时,消息才会被路由到队列,是一种简单的路由策略,适用于点对点通信。

    如:当一个队列绑定到交换机要求路由键为“key”,则只会转发RoutingKey标记为“key”的消息,不会转发“key1”等等。是完全匹配、单播的模式。

    请添加图片描述

    //配置类
    import org.springframework.amqp.core.*;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class DirectExchangeConfig {
    
        public static final String DIRECT_EXCHANGE_NAME = "direct-exchange";
        public static final String QUEUE_A_NAME = "queue-a";
        public static final String QUEUE_B_NAME = "queue-b";
        public static final String ROUTING_KEY_A = "key-a";
        public static final String ROUTING_KEY_B = "key-b";
    
        @Bean
        DirectExchange directExchange() {
            return new DirectExchange(DIRECT_EXCHANGE_NAME);
        }
    
        @Bean
        Queue queueA() {
            return new Queue(QUEUE_A_NAME);
        }
    
        @Bean
        Queue queueB() {
            return new Queue(QUEUE_B_NAME);
        }
    
        @Bean
        Binding bindQueueAToDirect(Queue queueA, DirectExchange directExchange) {
            return BindingBuilder.bind(queueA).to(directExchange).with(ROUTING_KEY_A);
        }
    
        @Bean
        Binding bindQueueBToDirect(Queue queueB, DirectExchange directExchange) {
            return BindingBuilder.bind(queueB).to(directExchange).with(ROUTING_KEY_B);
        }
    }
    
    //生产者
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    @Component
    public class DirectProducer {
    
        private final AmqpTemplate rabbitTemplate;
    
        @Autowired
        public DirectProducer(AmqpTemplate rabbitTemplate) {
            this.rabbitTemplate = rabbitTemplate;
        }
    
        public void send(String routingKey, String message) {
            rabbitTemplate.convertAndSend(DirectExchangeConfig.DIRECT_EXCHANGE_NAME, routingKey, message);
        }
    }
    //消费者
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    public class DirectConsumerA {
    
        @RabbitListener(queues = DirectExchangeConfig.QUEUE_A_NAME)
        public void receiveA(String in) {
            System.out.println("Queue A received: '" + in + "'");
        }
    }
    
    @Component
    public class DirectConsumerB {
    
        @RabbitListener(queues = DirectExchangeConfig.QUEUE_B_NAME)
        public void receiveB(String in) {
            System.out.println("Queue B received: '" + in + "'");
        }
    }
    
  • Topic Exchange:主题交换机,这种交换机通过通配符匹配,根据消息的路由键与队列绑定时指定的路由模式匹配程度(#表示一个或多个词,*表示一个词。),将消息路由到一个或者是多个队列。多用于发布/订阅模式和复杂的消息路由需求。

    1. Topic中,将routingKey通过“.”来分为多个部分

    2. “*”:代表一个部分

    3. “#”:代表0个或多个部分(如果绑定的路由键为‘#’时,则接受所有消息,因为路由键所有都匹配)

      请添加图片描述

    然后发送一条信息,routingkey为”key1.key2.key3.key4",那么根据"."将这个路由键分为了4个部分,此条路由键,将会匹配:

    1. key1.key2.key3.:成功匹配,因为可以代表一个部
    2. key1.#:成功匹配,因为#可以代表0或多个部分
    3. *key2.*.key4: 成功匹配,因为第一和第三部分分别为key1和key3,且为4个部分,刚好匹配4.#.key3.key4:成功匹配,#可以代表多个部分,正好匹配中了我们的key1和key2如果发送消息routingkey为"key1",那么将只能匹配中key1.#,#可以代表0个部分
    //配置类
    @Configuration
    public class TopicExchangeConfig {
    
        public static final String TOPIC_EXCHANGE_NAME = "topic-exchange";
        public static final String QUEUE_A_NAME = "queue-a";
        public static final String QUEUE_B_NAME = "queue-b";
    
        @Bean
        TopicExchange topicExchange() {
            return new TopicExchange(TOPIC_EXCHANGE_NAME);
        }
    
        @Bean
        Queue queueA() {
            return new Queue(QUEUE_A_NAME);
        }
    
        @Bean
        Queue queueB() {
            return new Queue(QUEUE_B_NAME);
        }
    
        @Bean
        Binding bindQueueAToTopic(Queue queueA, TopicExchange topicExchange) {
            return BindingBuilder.bind(queueA).to(topicExchange).with("key.*");
        }
    
        @Bean
        Binding bindQueueBToTopic(Queue queueB, TopicExchange topicExchange) {
            return BindingBuilder.bind(queueB).to(topicExchange).with("*.key");
        }
    }//队列queue-a绑定到主题交换机上的key.*模式,这意味着任何以key.开头的routing key都会被路由到queue-a。队列queue-b绑定到*.key模式,这意味着任何以.key结尾的routing key都会被路由到queue-b
    
    //生产者
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    @Component
    public class TopicProducer {
    
        private final AmqpTemplate rabbitTemplate;
    
        @Autowired
        public TopicProducer(AmqpTemplate rabbitTemplate) {
            this.rabbitTemplate = rabbitTemplate;
        }
    
        public void send(String routingKey, String message) {
            rabbitTemplate.convertAndSend(TopicExchangeConfig.TOPIC_EXCHANGE_NAME, routingKey, message);
        }
    }//当 routingKey="hello.key" 则会路由到queueB
    
    //消费者
    @Component
    public class TopicConsumerA {
    
        @RabbitListener(queues = TopicExchangeConfig.QUEUE_A_NAME)
        public void receiveA(String in) {
            System.out.println("Queue A received: '" + in + "'");
        }
    }
    
    @Component
    public class TopicConsumerB {
    
        @RabbitListener(queues = TopicExchangeConfig.QUEUE_B_NAME)
        public void receiveB(String in) {
            System.out.println("Queue B received: '" + in + "'");
        }
    }
    
  • Headers Exchange:头交换机,不处理路由键, 而是根据发送的消息内容中的headers属性进行匹配 。只有当消息的标头和绑定规则完全匹配时,消息才会被路由到队列。适用于需要复杂消息匹配的场景。

    消费方指定的headers中必须包含一个“x-match“的键。
    键"x-match"的值有2个

    1. x-match=all :表示所有的键值对都匹配才能接受到消息
    2. x-match =any:表示只要有键值对匹配就能接受到消息

    请添加图片描述

    //配置类
    @Bean
        HeadersExchange headersExchange() {
            return new HeadersExchange(HEADER_EXCHANGE_NAME);
        }
        @Bean
        Queue queue() {
            return new Queue(QUEUE_NAME);
        }
        @Bean
        Binding binding(Queue queue, HeadersExchange headersExchange) {
            return BindingBuilder.bind(queue).to(headersExchange).whereAll(new String[]{"x-match", "id"}).matches("true", "123");
        }//whereAll:必须匹配所有的键值对  whereAny:至少有一个匹配的键值对
    
    //生产者
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    @Component
    public class HeaderProducer {
    
        private final RabbitTemplate rabbitTemplate;
    
        @Autowired
        public HeaderProducer(RabbitTemplate rabbitTemplate) {
            this.rabbitTemplate = rabbitTemplate;
        }
    
        public void send(String message) {
            MessageProperties props = new MessageProperties();
            props.setHeader("x-match", "true");
            props.setHeader("id", "123");
            Message msg = new Message(message.getBytes(), props);
            rabbitTemplate.convertAndSend(HeaderExchangeConfig.HEADER_EXCHANGE_NAME, "", msg);
        }
    }
    //消费者
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    public class HeaderConsumer {
    
        @RabbitListener(queues = HeaderExchangeConfig.QUEUE_NAME)
        public void receive(String in) {
            System.out.println("Received: '" + in + "'");
        }
    }
    
  • Fanout Exchange:扇形交换机,采用广播的方式,根据绑定的交换机路由到与之对应的所有队列。用于发布/订阅模式,其中一个消息被广播给所有订阅者。

    请添加图片描述

    //配置类   
    public static final String FANOUT_EXCHANGE_NAME = "fanout-exchange";
        public static final String QUEUE_A_NAME = "queue-a";
        public static final String QUEUE_B_NAME = "queue-b";
    
        @Bean
        FanoutExchange fanoutExchange() {
            return new FanoutExchange(FANOUT_EXCHANGE_NAME);
        }
    
        @Bean
        Queue queueA() {
            return new Queue(QUEUE_A_NAME);
        }
    
        @Bean
        Queue queueB() {
            return new Queue(QUEUE_B_NAME);
        }
    
        @Bean
        Binding bindQueueAToFanout(Queue queueA, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(queueA).to(fanoutExchange);
        }
    
        @Bean
        Binding bindQueueBToFanout(Queue queueB, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(queueB).to(fanoutExchange);
        }
    
    //生产者
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    @Component
    public class FanoutProducer {
    
        private final AmqpTemplate rabbitTemplate;
    
        @Autowired
        public FanoutProducer(AmqpTemplate rabbitTemplate) {
            this.rabbitTemplate = rabbitTemplate;
        }
    
        public void send(String message) {
            rabbitTemplate.convertAndSend(FanoutExchangeConfig.FANOUT_EXCHANGE_NAME, "", message);
        }
    }
    //消费者
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    public class FanoutConsumerA {
    
        @RabbitListener(queues = FanoutExchangeConfig.QUEUE_A_NAME)
        public void receiveA(String in) {
            System.out.println("Queue A received: '" + in + "'");
        }
    }
    
    @Component
    public class FanoutConsumerB {
    
        @RabbitListener(queues = FanoutExchangeConfig.QUEUE_B_NAME)
        public void receiveB(String in) {
            System.out.println("Queue B received: '" + in + "'");
        }
    }
    
    RabbitMq如何保证消息不丢失?
    • 消息丢失得原因分析如下图:

      请添加图片描述

从图可知 生产者的消息到达消费者会经过两次网络传输,并且会在RabbitMq服务器中进行路由。

所以一般有以下三种丢失的场景:

  1. 生产者消息发送到RabbitMq服务器过程中出现消息丢失。可能由于网络波动,或者是服务器宕机。
  2. RabbitMQ 服务器消息持久化出现消息丢失。 消息发送到 RabbitMQ 之后,未能及时存储完成持久化,RabbitMQ 服务器出现宕机重启,消息出现丢失。
  3. 消费者拉取消息过程以及拿到消息后出现消息丢失。消费者从 RabbitMQ 服务器获取到消息过程出现网络波动等问题可能出现消息丢失;消费者拿到消息后但是消费者未能正常消费,导致丢失,可能是消费者出现处理异常又或者是消费者宕机。

针对上述三种消息丢失场景,RabbitMQ 提供了相应的解决方案,confirm 消息确认机制(生产者),消息持久化机制(RabbitMQ 服务),ACK事务机制(消费者)

confirm消息确认机制(生产者)

Confirm 模式是 RabbitMQ 提供的一种消息可靠性保障机制。当生产者通过 Confirm 模式发送消息时,它会等待 RabbitMQ 的确认,确保消息已经被正确地投递到了指定的 Exchange 中。消息正确投递到 queue 时,会返回 ack。
消息没有正确投递到 queue 时,会返回 nack。如果 exchange 没有绑定 queue,也会出现消息丢失。

application.yml中需要配置publisher-confirm-type

  1. none
    • 这是默认设置,意味着不启用发布确认。在这种模式下,消息发送后,无论是否到达 RabbitMQ 服务器,都不会有确认信息返回。这通常用于性能优化场景,因为没有确认机制会带来更高的消息吞吐量。
  2. simple
    • 在这种模式下,RabbitMQ 会为每个发送的消息提供一个确认。当消息被 RabbitMQ 接收并持久化后,会发送一个确认给生产者。但是,simple 模式不会关联每个消息的确认信息,也就是说,你无法知道具体是哪一个消息被确认了。这对于不需要精细控制的消息发布场景来说足够了。
  3. correlated
    • 这是最强大的确认模式。在 correlated 模式下,每个消息都有一个唯一的标识符(CorrelationData),这样就可以跟踪每个消息的确认状态。当消息被确认时,RabbitMQ 会发送回一个包含该消息标识符的确认信息。这样,你就可以确切地知道哪些消息被成功接收,哪些可能失败了。这对于需要高可靠性和精确消息追踪的应用场景非常有用。

示例代码如下:

配置类代码如下:

@Configuration
public class RabbitConfig {

    @Bean
    public AmqpTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                System.out.println("Message with id: " + correlationData.getId() + " successfully sent");
            } else {
                System.out.println("Message with id: " + correlationData.getId() + " failed to send due to: " + cause);
            }
        });
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            System.out.println("Returned message: " + new String(message.getBody()));
            System.out.println("Exchange: " + exchange);
            System.out.println("Routing key: " + routingKey);
            System.out.println("Reply code: " + replyCode);
            System.out.println("Reply text: " + replyText);
        });
        return rabbitTemplate;
    }

    @Bean
    public MessageConverter producerJackson2MessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

生产者代码如下:

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class MessageProducerService {

    private final RabbitTemplate rabbitTemplate;

    @Autowired
    public MessageProducerService(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    @Transactional
    public void sendMessage(String message) {
        try {
            // 发送消息
            rabbitTemplate.convertAndSend("exchange-name", "routing-key", message);
            // 业务逻辑
            doBusinessLogic();
        } catch (Exception e) {
            // 处理异常
            throw new RuntimeException(e);
        }
    }

    private void doBusinessLogic() {
        // 业务逻辑代码
    }
}

RabbitTemplate在Spring AMQP中提供了多种机制来确认消息是否成功传递给RabbitMQ以及消息是否成功路由到预期的队列。setConfirmCallbacksetReturnCallback是其中的两种重要机制,它们分别在不同的情况下被触发,用于处理消息的确认和返回。

  1. setConfirmCallback

setConfirmCallback方法用于设置一个回调,当消息被投递到RabbitMQ的Broker后,Broker会发送一个确认信号回给生产者,表明消息已经被接收。这个确认信号包含了消息的唯一标识符,称为CorrelationData,通常是一个UUID或其他可识别的标识符,以及一个布尔值ack,表明消息是否被成功接收。

触发时机

  • 消息到达Broker:当消息到达Broker并被放置在队列中或在尝试放置消息时失败,Broker会发送确认信号。
  • 消息持久化:如果消息被设置为持久化的,Broker在将消息写入磁盘后才发送确认信号。

使用场景

  • 确认消息投递:在高可用性或关键业务流程中,需要确认每条消息都被Broker接收。
  • 性能优化:在高吞吐量的应用中,可以使用发布确认的批量确认模式以减少网络开销。
  1. setReturnCallback

setReturnCallback方法用于设置一个回调,当消息无法被正确路由到任何一个队列时,Broker会将消息返回给生产者。这通常发生在以下情况:

  • Routing Key不存在:如果使用Direct或Fanout交换机时,没有队列与指定的Routing Key绑定。
  • Routing Key不匹配:如果使用Topic或Header交换机时,消息的Routing Key与队列绑定的模式不匹配。
  • Mandatory标志位:当发送消息时设置了mandatory标志位为true,并且消息不能被正确路由到队列时,Broker会返回消息。
  • Immediate标志位:当发送消息时设置了immediate标志位为true,并且消息不能立即被消费时,Broker会返回消息。

触发时机

  • 消息未被路由:当消息因为上述原因未能被正确路由到队列时,Broker会返回消息。

使用场景

  • 错误检测:在开发阶段或生产环境中,可以帮助检测和调试Routing Key的错误或配置问题。
  • 消息重试或记录:对于未能路由的消息,可以设计逻辑来重新发送消息或记录日志以供后续分析。

综合使用

通常,setConfirmCallbacksetReturnCallback会被一起使用,以实现更全面的消息确认和错误处理策略。这确保了消息不仅被Broker接收,而且也被正确地路由到预期的队列中。如果消息未能正确路由,可以通过setReturnCallback回调进行处理,如重新发送消息、记录错误或采取其他补救措施。

注意事项

  • 当使用setConfirmCallback时,需要考虑网络延迟和Broker的确认延迟,这可能影响确认信号的及时性。
  • 当使用setReturnCallback时,需要处理好返回的消息,避免无限循环发送或不当的处理导致的问题。
消息持久化机制(RabbitMQ服务)

生产者确认可以保证消息投递到RabbitMQ的队列之中,但是消息发送到RabbitMQ以后,如果突然宕机,也可能会导致消息丢失。

所以想要确保消息在RabbitMQ中安全保存,必须开启消息持久化机制,这种机制可以分为三种。

  1. 交换机持久化: 交换机持久化意味着交换机的定义在 RabbitMQ 重启后仍然存在,不会丢失 。 即使 RabbitMQ 重启,所有的绑定关系、交换机类型及其属性都会保持不变,确保消息路由规则的连续性 。
  2. 队列持久化: 队列持久化意味着队列的定义在 RabbitMQ 重启后仍然存在,同样不会丢失, 如果在 RabbitMQ 停止前队列中有未被消费的消息,这些消息在 RabbitMQ 重启后仍然会存在于队列中,等待被消费 。
  3. 消息持久化: 消息持久化意味着消息在存储到 RabbitMQ 时会被写入磁盘,而不仅仅是内存中 , 即使 RabbitMQ 服务突然关闭,持久化消息不会丢失,确保了消息的完整性。

以下示例会将三种持久化一同配置:

//Config配置类
@Configuration
public class RabbitConfig {

    public static final String DURABLE_QUEUE_NAME = "durable.queue";
    public static final String DURABLE_EXCHANGE_NAME = "durable.exchange";
    public static final String ROUTING_KEY = "durable.routing.key";

    @Bean
    Queue durableQueue(){
        //通过QueueBuilder.durable创建出的队列就是持久化队列
        return QueueBuilder.durable(DURABLE_QUEUE_NAME).build();
    }

    @Bean
    DirectExchange durableExchange() {
        //三个参数DURABLE_EXCHANGE_NAME表示交换机名称
        //true 代表是持久化 默认为false
        //false 当没有queue与其绑定时不自动删除
        return new DirectExchange(DURABLE_EXCHANGE_NAME, true, false);
    }
    @Bean
    Binding binding(Queue durableQueue, DirectExchange durableExchange) {
        return BindingBuilder.bind(durableQueue).to(durableExchange).with(ROUTING_KEY);
    }

}

//生产者代码
package com.example.demo.demos.service;

import com.example.demo.demos.config.RabbitConfig;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.UUID;

@Service
public class RabbitMQProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage(String message) {
        MessageProperties messageProperties = new MessageProperties();
        //利用SpringAMQP发送消息时,可以设置消息的属性(MessageProperties),指定delivery-mode:
		//1:非持久化
		//2:持久化
        //如下是设置delivery-mode为2
  messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
        Message persistentMessage = new Message(message.getBytes(), messageProperties);
        rabbitTemplate.convertAndSend(RabbitConfig.DURABLE_EXCHANGE_NAME, RabbitConfig.ROUTING_KEY, persistentMessage);
    }
}

上述代码配置了交换机持久化、队列持久化以及消息的持久化。