RabbitMQ01——基础概念、docker配置rabbitmq、内部执行流程、五种消息类型、测试第一种消息类型

发布于:2025-07-20 ⋅ 阅读:(17) ⋅ 点赞:(0)

目录

一、MQ( Message Queue 消息队列)

什么是消息队列

消息队列的作用

消息队列的缺点

MQ的两种实现方式

AMQP和JMS的区别和联系

常见的mq产品

二、docker下安装配置rabbitmq

1、拉取镜像

2、创建并启动容器

3、查看容器状态

4、查看容器日志

5、创建用户 赋权

6、使用浏览器打开RabbitMQ管理界面。

三、rabbitmq 内部执行流程

rabbtimq的5种消息模型

四、测试第一种simple消息模型 

实现过程:

1、引入jar包

2、封装工具类

3、创建生产者对象

消息确认机制(ACK) ACKnowledge

自动确认消息

手动确认消息


一、MQ( Message Queue 消息队列)

什么是消息队列

MQ(Message Queue)消息队列(消息中间件),是基础数据结构中“先进先出”的一种数据结构。指把要传输的数据(消息)放在队列中,用队列机制来实现消息传递——生产者产生消息并把消息放入队列,然后由消费者去处理。消费者可以到指定队列拉取消息,或者订阅相应的队列,由MQ服务端给其推送消息,来实现进程之间的通信

消息生产者把消息发送给消息队列,消息队列存储转发消息,消息消费者接收消息、处理消息。

消息队列的作用

一、异步

主业务执行结束后从属业务通过MQ,异步执行,减低业务的响应时间,提高用户体验。

二、解耦

一个业务需要多个模块共同实现,或者一条消息有多个系统需要对应处理,只需要主业务完成以后,发送一条MQ,其余模块消费MQ消息,即可实现业务,降低模块之间的耦合。

三、削峰填谷

高并发情况下,业务异步处理,提供高峰期业务处理能力,避免系统瘫痪。

消息队列的缺点

  • 系统可用性降低。依赖服务也多,服务越容易挂掉。需要考虑MQ瘫痪的情况。

    系统引入的外部依赖越多,系统稳定性越差。一旦 MQ 宕机,就会对业务造成影响。如何保证MQ的高可用?(集群)

  • 系统复杂性提高。需要考虑消息丢失、消息重复消费、消息传递的顺序性。

    MQ 的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过 MQ 进行异步调用。如何保证消息没有被重复消费?怎么处理消息丢失情况?那么保证消息传递的顺序性?

  • 业务一致性。主业务和从属业务一致性的处理。

    A 系统处理完业务,通过 MQ 给B、C、D三个系统发消息数据,如果 B 系统、C 系统处理成功,D 系统处理失败。如何保证消息数据处理的一致性?

MQ的两种实现方式

JMS:即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信

AMQP:即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。

AMQP和JMS的区别和联系

  • JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式

  • JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。

  • JMS规定了两种消息模型(点对点,发布订阅);而AMQP的消息模型更加丰富(常用的有5种)

常见的mq产品

二、docker下安装配置rabbitmq

1、拉取镜像

docker pull rabbitmq:latest
​
docker pull rabbitmq:3.9.0-management

2、创建并启动容器

docker run -id --name rabbitmq -p 5672:5672 -p 15672:15672 -v /etc/rabbimq:/etc/rabbitmq  rabbitmq:3.9.0-management

3、查看容器状态

docker ps

如果容器状态显示为Up,并且端口映射正确,那么RabbitMQ服务已经成功启动。

4、查看容器日志

docker logs rabbitmq

访问RabbitMQ管理界面

5、创建用户 赋权

# 进入容器
docker exec -it rabbitmq bash
​
# 在容器内执行
rabbitmqctl add_user admin 123456
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

 

6、使用浏览器打开RabbitMQ管理界面。

默认情况下,管理界面端口为15672。在浏览器地址栏输入以下URL:

http://<宿主机IP地址>:15672

三、rabbitmq 内部执行流程

rabbtimq的5种消息模型

四、测试第一种simple消息模型 

  • P(producer/ publisher):生产者,一个发送消息的用户应用程序。

  • C(consumer):消费者,消费和接收有类似的意思,消费者是一个主要用来等待接收消息的用户应用程序

  • 队列(红色区域):rabbitmq内部类似于邮箱的一个概念。虽然消息流经rabbitmq和你的应用程序,但是它们只能存储在队列中。队列只受主机的内存和磁盘限制,实质上是一个大的消息缓冲区。许多生产者可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据。

  • RabbitMQ是一个消息代理:它接受和转发消息。 你可以把它想象成一个邮局:当你把邮件放在邮箱里时,你可以确定邮差先生最终会把邮件发送给你的收件人。 在这个比喻中,RabbitMQ是邮政信箱,邮局和邮递员。

实现过程:

创建springboot项目的时候选择以下依赖

1、引入jar包

 <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
 </dependency>

2、封装工具类

/*
rabbitmq工具类
 */
public class MQUtil {
    public static Connection getConnection() 
            throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("1.94.230.82");
        factory.setPort(5672);
        factory.setVirtualHost("/yan3");
        factory.setUsername("admin");
        factory.setPassword("123456");
        return factory.newConnection();
    }
}

3、创建生产者对象

package com.hl.rabbitmq01.simple;

import com.hl.rabbitmq01.util.MQUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/*
生产者  javaSE方式简单测试
Simple简单消息模型
生产者----消息队列----消费者
 */
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1、创建连接
        Connection connection = MQUtil.getConnection();
        //2、基于连接,创建信道
        Channel channel = connection.createChannel();
        //3、基于信道,创建队列
            /*
        参数:
            1. queue:队列名称,如果没有一个名字叫simpleQueue01的队列,则会创建该队列,如果有则不会创建
            2. durable:是否持久化,当mq重启之后,消息还在
            3. exclusive:
                * 是否独占。只能有一个消费者监听这队列
            4。当Connection关闭时,是否删除队列
               autoDelete:是否自动删除。当没有Consumer时,自动删除掉
            5. arguments:参数。
         */
        channel.queueDeclare("simpleQueue01", false, false, false, null);
        //发送消息到消息队列
           /*
         参数:
            1. exchange:交换机名称。简单模式下交换机会使用默认的 ""
            2. routingKey:路由名称,简单模式下路由名称使用消息队列名称
            3. props:配置信息
            4. body:发送消息数据
         */
        channel.basicPublish("","simpleQueue01",null,"Hello World".getBytes());
        //4、关闭信道,断开连接
        channel.close();
        connection.close();
    }
}

4、创建消费者 

package com.hl.rabbitmq01.simple;

import com.hl.rabbitmq01.util.MQUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/*
简单消息模型---消费者
生产者---消息队列---消费者
 */
public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接
        Connection connection = MQUtil.getConnection();
        //2.创建信道
        Channel channel = connection.createChannel();
        //3.创建消费者对象
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                System.out.println("Received: " + new String(body));
            }
        };
        //4.绑定消费者对象和消息队列
        /*
        参数1:消息队列名称
        参数2:是否自动确认消息
        参数3:消费者对象
         */
        channel.basicConsume("simpleQueue01", true, consumer);
    }
}

消息确认机制(ACK) ACKnowledge

  • 自动ACK:消息一旦被接收,消费者自动发送ACK

  • 手动ACK:消息接收后,不会发送ACK,需要手动调用

  • 如果消息不太重要,丢失也没有影响,那么自动ACK会比较方便

  • 如果消息非常重要,不容丢失。那么最好在消费完成后手动ACK,否则接收消息后就自动ACK,RabbitMQ就会把消息从队列中删除。如果此时消费者宕机,那么消息就丢失了。

自动确认消息

  DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                System.out.println("Received: " + new String(body));
            }
        };
        //4.绑定消费者对象和消息队列
        /*
        参数1:消息队列名称
        参数2:是否自动确认消息  
        参数3:消费者对象
         */
        channel.basicConsume("simpleQueue01", true, consumer);

手动确认消息

channel.basicAck(envelope.getDeliveryTag(), false);

   //3.创建消费者对象
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {

                System.out.println("Received: " + new String(body));
                //手动确认消息
                //参数1:消息id
                // 参数2:是否批量确认消息  false:只确认当前消息  true:批量确认删除 (小于当前消息id的消息,批量确认)
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        //4.绑定消费者对象和消息队列
        /*
        参数1:消息队列名称
        参数2:是否自动确认消息  true自动确认消息  false:手动确认消息
        参数3:消费者对象
         */
        channel.basicConsume("simpleQueue01", false, consumer);


网站公告

今日签到

点亮在社区的每一天
去签到