RabbitMQ快速入门

发布于:2025-06-13 ⋅ 阅读:(23) ⋅ 点赞:(0)

RabbitMQ快速入门

1. docker安装RabbitMQ

rabbitMQ docker镜像官网:https://hub.docker.com/_/rabbitmq/

SpringBoot整合RabbitMQ参考网站:https://docs.spring.io/spring-boot/3.3/reference/messaging/amqp.html

docker安装rabbitMQ

docker run -d --name rabbitmq --restart=always -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 5672:5672 -p 15672:15672 rabbitmq:3.13.7-management

RabbitMQ结构和概念
在这里插入图片描述

RabbitMQ中的几个概念:

  • channel:操作MQ的工具
  • exchange:路由消息到队列中
  • queue:缓存消息
  • virtual host:虚拟主机,是对queue、exchange等资源进行逻辑分组

基本消息队列消息发送流程:

  1. 建立connection;
  2. 创建channel;
  3. 利用channel声明队列;
  4. 利用channel向队列发送消息

基本消息队列消息接收流程:

  1. 建立connection;
  2. 创建channel;
  3. 利用channel声明队列;
  4. 定义consumer的消费行为handleDelivery();
  5. 利用channel将消费者与队列绑定

2. 简单队列模型

在这里插入图片描述

简单队列模型角色:

  • publisher:消息发布者,将消息发送到队列queue
  • queue:消息队列,负责接收并缓存消息
  • consumer:订阅队列,处理队列中的消息

SpringBoot整合RabbitMQ(生产者)

  1. 引入依赖
<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  1. 配置application.yml
  rabbitmq:
    host: 192.168.xx.xx
    port: 5672
    username: admin
    password: admin
    virtual-host: /
  1. 创建RabbitMQ配置类
@Configuration
public class RabbitMQConfig {
    @Bean()
    public Queue simpleQueue(){
        return new Queue("simple_queue",true);
    }
}

对于生产者,当队列不存在时或消息发送时,均会报错。未防止误判,需提前创建Queue的Bean,若该队列不存在,则会自动创建,若存在,则不创建。

  1. 编写消息发送类
import jakarta.annotation.Resource;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;

@Service
public class SimpleQueueService {
    @Resource
    private RabbitTemplate rabbitTemplate;

    public Boolean send(String msg){
        rabbitTemplate.convertAndSend("simple_queue",msg);
        return true;
    }
}

消费者

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class SimpleConsumerService {
    @RabbitListener(queues = "simple_queue")
    public void receive(String msg){
        System.out.println("处理消息:"+msg);
    }
}

添加@RabbitListener注解的方法即为消费者处理函数,queues参数可以为数组(多个队列时),函数的参数类型与发送者的类型一致

3. 工作队列模型

在这里插入图片描述

consumer1和consumer2为合作关系,一起处理queue中的消息。作用:提高消息的处理速度,避免消息的堆积

生产者

public Boolean send(String msg){
	for (int i=0;i<50;i++){
		rabbitTemplate.convertAndSend("simple_queue","message "+i);
	}
	return true;
}

消费者

@Component
public class SimpleConsumerService {
    @RabbitListener(queues = "simple_queue")
    public void receiver1(String msg) throws InterruptedException {
        System.out.println("队列1处理消息:"+msg);
        Thread.sleep(10);
    }

    @RabbitListener(queues = "simple_queue")
    public void receiver2(String msg) throws InterruptedException {
        System.out.println("队列2处理消息:"+msg);
        Thread.sleep(20);
    }
}

默认情况下,队列会以轮询的方式将消息均分给消费者,如果想实现消费者按需处理,可设置prefetch的值

spring:
  application:
    name: hello-world

  rabbitmq:
    host: 192.168.5.3
    port: 5672
    username: admin
    password: admin
    virtual-host: /
    listener:
      simple:
        prefetch: 1 #工作队列的消费者每次可取1条消息,处理完成后再取下一条

4. 发布订阅模型

简单队列模型和工作队列模型只有一个队列,消息只能提交给一个消费者处理。发布订阅模型通过交换机(exchange)可实现将消息推送到多个队列,交给多个消费者处理。
在这里插入图片描述

常见的exchange类型包括:

  • Fanout:广播
  • Direct:路由
  • Topic:话题

exchange只负责消息的转发,而不是存储,路由失败则消息丢失

4.1 Fanout Exchange

Fanout Exchange会将接收到的消息路由到每一个与其绑定的queue

在这里插入图片描述

案例

  1. 在Consumer服务中添加配置类,声明FanoutExchange和队列Queue,同时绑定FanoutExchange和Queue
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {
//    声明exchange交换机
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("fanout");
    }
//      声明队列1
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1",true);
    }
    //      声明队列2
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2",true);
    }
//    绑定队列1和交换机
    @Bean
    public Binding fanoutBinding1(Queue fanoutQueue1,FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }
    //    绑定队列2和交换机
    @Bean
    public Binding fanoutBinding2(Queue fanoutQueue2,FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}
  1. 编写消费者
public class SimpleConsumerService {
    @RabbitListener(queues = "fanout.queue1")
    public void receiver1(String msg) throws InterruptedException {
        System.out.println("队列1处理消息:"+msg);
        Thread.sleep(10);
    }

    @RabbitListener(queues = "fanout.queue2")
    public void receiver2(String msg) throws InterruptedException {
        System.out.println("队列2处理消息:"+msg);
        Thread.sleep(20);
    }
}

3.编写生产者

@Service
public class SimpleQueueService {
    @Resource
    private RabbitTemplate rabbitTemplate;

    public Boolean send(String msg){
//        rabbitTemplate.convertAndSend("simple_queue",msg);
        for (int i=0;i<10;i++){
            rabbitTemplate.convertAndSend("fanout","","message "+i);
        }
        return true;
    }
}

4.2 DirectExchange

DirectExchange会将接收到的消息根据规则路由到指定的Queue,因此称为路由模型。

在这里插入图片描述

  • 每一个Queue都与Exchange设置一个BindingKey(Exchange和Queue默认BindingKey为交换机或Queue名称)
  • 生产者发布消息时,指定消息的RoutingKey
  • Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

案例

  1. 在consumer服务中声明两个消费者,分别监听queue1和queue2,并利用@RabbitListener声明Exchange,Queue、RoutingKey
@Component
public class SimpleConsumerService {
    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),
            exchange = @Exchange(name = "direct",type = ExchangeTypes.DIRECT),
            key = {"red","blue"})) 	//key为数组,为Queue设置的RoutingKey
    public void receiver1(String msg) throws InterruptedException {
        System.out.println("队列1处理消息:"+msg);
        Thread.sleep(10);
    }

    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),
            exchange = @Exchange(name = "direct",type = ExchangeTypes.DIRECT),
            key = {"red","yellow"}))	//key为数组,为Queue设置的RoutingKey
    public void receiver2(String msg) throws InterruptedException {
        System.out.println("队列2处理消息:"+msg);
        Thread.sleep(20);
    }
}
  1. 编写生产者
@Service
public class SimpleQueueService {
    @Resource
    private RabbitTemplate rabbitTemplate;
    public Boolean send(String msg){
        for (int i=0;i<10;i++){
            rabbitTemplate.convertAndSend("direct","blue","message "+i);
        }
        return true;
    }
}

4.3 TopicExchange

TopicExchange与DirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以==.==分割

在这里插入图片描述

Queue与Exchange指定BingKey时可以使用通配符:

#:代指0个或者多个单词

*:代指一个单词

案例

  1. 消费者
@Component
public class SimpleConsumerService {
    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),
            exchange = @Exchange(name = "topic",type = ExchangeTypes.TOPIC),
            key = "hunan.#"))
    public void receiver1(String msg) throws InterruptedException {
        System.out.println("队列1处理消息:"+msg);
        Thread.sleep(10);
    }

    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),
            exchange = @Exchange(name = "topic",type = ExchangeTypes.TOPIC),
            key = "#.news"))
    public void receiver2(String msg) throws InterruptedException {
        System.out.println("队列2处理消息:"+msg);
        Thread.sleep(20);
    }
}
  1. 生产者
 public Boolean send(String msg){
     for (int i=0;i<10;i++){
         rabbitTemplate.convertAndSend("topic","hunan.flowers","message "+i);
     }
     return true;
}

5. SpringAMQP消息转化器

Spring的消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。

如果要修改只需要定义一个MessageConverter 类型的Bean即可。推荐用JSON方式序列化,步骤如下:

  1. 在publisher服务中引入依赖
<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.11.3</version>
</dependency>
  1. 在publisher服务中声明MessageConverter
@Bean
public MessageConverter jsonMessageConverter(){
    return new Jackson2JsonMessageConverter();
}