目录
一、MQ( Message Queue 消息队列)
什么是消息队列
MQ(Message Queue)消息队列(消息中间件),是基础数据结构中“先进先出”的一种数据结构。指把要传输的数据(消息)放在队列中,用队列机制来实现消息传递——生产者产生消息并把消息放入队列,然后由消费者去处理。消费者可以到指定队列拉取消息,或者订阅相应的队列,由MQ服务端给其推送消息,来实现进程之间的通信
消息生产者把消息发送给消息队列,消息队列存储转发消息,消息消费者接收消息、处理消息。
消息队列的作用
一、异步
主业务执行结束后从属业务通过MQ,异步执行,减低业务的响应时间,提高用户体验。
二、解耦
一个业务需要多个模块共同实现,或者一条消息有多个系统需要对应处理,只需要主业务完成以后,发送一条MQ,其余模块消费MQ消息,即可实现业务,降低模块之间的耦合。
三、削峰填谷
高并发情况下,业务异步处理,提供高峰期业务处理能力,避免系统瘫痪。
消息队列的缺点
系统可用性降低。依赖服务也多,服务越容易挂掉。需要考虑MQ瘫痪的情况。
系统引入的外部依赖越多,系统稳定性越差。一旦 MQ 宕机,就会对业务造成影响。如何保证MQ的高可用?(集群)
系统复杂性提高。需要考虑消息丢失、消息重复消费、消息传递的顺序性。
MQ 的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过 MQ 进行异步调用。如何保证消息没有被重复消费?怎么处理消息丢失情况?那么保证消息传递的顺序性?
业务一致性。主业务和从属业务一致性的处理。
A 系统处理完业务,通过 MQ 给B、C、D三个系统发消息数据,如果 B 系统、C 系统处理成功,D 系统处理失败。如何保证消息数据处理的一致性?
MQ的两种实现方式
JMS:即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信
AMQP:即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。
AMQP和JMS的区别和联系
JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式
JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。
JMS规定了两种消息模型(点对点,发布订阅);而AMQP的消息模型更加丰富(常用的有5种)
常见的mq产品
二、docker下安装配置rabbitmq
1、拉取镜像
docker pull rabbitmq:latest
docker pull rabbitmq:3.9.0-management
2、创建并启动容器
docker run -id --name rabbitmq -p 5672:5672 -p 15672:15672 -v /etc/rabbimq:/etc/rabbitmq rabbitmq:3.9.0-management
3、查看容器状态
docker ps
如果容器状态显示为Up
,并且端口映射正确,那么RabbitMQ服务已经成功启动。
4、查看容器日志
docker logs rabbitmq
访问RabbitMQ管理界面
5、创建用户 赋权
# 进入容器
docker exec -it rabbitmq bash
# 在容器内执行
rabbitmqctl add_user admin 123456
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
6、使用浏览器打开RabbitMQ管理界面。
默认情况下,管理界面端口为15672。在浏览器地址栏输入以下URL:
http://<宿主机IP地址>:15672
三、rabbitmq 内部执行流程
rabbtimq的5种消息模型
四、测试第一种simple消息模型
P(producer/ publisher):生产者,一个发送消息的用户应用程序。
C(consumer):消费者,消费和接收有类似的意思,消费者是一个主要用来等待接收消息的用户应用程序
队列(红色区域):rabbitmq内部类似于邮箱的一个概念。虽然消息流经rabbitmq和你的应用程序,但是它们只能存储在队列中。队列只受主机的内存和磁盘限制,实质上是一个大的消息缓冲区。许多生产者可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据。
RabbitMQ是一个消息代理:它接受和转发消息。 你可以把它想象成一个邮局:当你把邮件放在邮箱里时,你可以确定邮差先生最终会把邮件发送给你的收件人。 在这个比喻中,RabbitMQ是邮政信箱,邮局和邮递员。
实现过程:
创建springboot项目的时候选择以下依赖
1、引入jar包
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、封装工具类
/*
rabbitmq工具类
*/
public class MQUtil {
public static Connection getConnection()
throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("1.94.230.82");
factory.setPort(5672);
factory.setVirtualHost("/yan3");
factory.setUsername("admin");
factory.setPassword("123456");
return factory.newConnection();
}
}
3、创建生产者对象
package com.hl.rabbitmq01.simple;
import com.hl.rabbitmq01.util.MQUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*
生产者 javaSE方式简单测试
Simple简单消息模型
生产者----消息队列----消费者
*/
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//1、创建连接
Connection connection = MQUtil.getConnection();
//2、基于连接,创建信道
Channel channel = connection.createChannel();
//3、基于信道,创建队列
/*
参数:
1. queue:队列名称,如果没有一个名字叫simpleQueue01的队列,则会创建该队列,如果有则不会创建
2. durable:是否持久化,当mq重启之后,消息还在
3. exclusive:
* 是否独占。只能有一个消费者监听这队列
4。当Connection关闭时,是否删除队列
autoDelete:是否自动删除。当没有Consumer时,自动删除掉
5. arguments:参数。
*/
channel.queueDeclare("simpleQueue01", false, false, false, null);
//发送消息到消息队列
/*
参数:
1. exchange:交换机名称。简单模式下交换机会使用默认的 ""
2. routingKey:路由名称,简单模式下路由名称使用消息队列名称
3. props:配置信息
4. body:发送消息数据
*/
channel.basicPublish("","simpleQueue01",null,"Hello World".getBytes());
//4、关闭信道,断开连接
channel.close();
connection.close();
}
}
4、创建消费者
package com.hl.rabbitmq01.simple;
import com.hl.rabbitmq01.util.MQUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*
简单消息模型---消费者
生产者---消息队列---消费者
*/
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接
Connection connection = MQUtil.getConnection();
//2.创建信道
Channel channel = connection.createChannel();
//3.创建消费者对象
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
System.out.println("Received: " + new String(body));
}
};
//4.绑定消费者对象和消息队列
/*
参数1:消息队列名称
参数2:是否自动确认消息
参数3:消费者对象
*/
channel.basicConsume("simpleQueue01", true, consumer);
}
}
消息确认机制(ACK) ACKnowledge
自动ACK:消息一旦被接收,消费者自动发送ACK
手动ACK:消息接收后,不会发送ACK,需要手动调用
如果消息不太重要,丢失也没有影响,那么自动ACK会比较方便
如果消息非常重要,不容丢失。那么最好在消费完成后手动ACK,否则接收消息后就自动ACK,RabbitMQ就会把消息从队列中删除。如果此时消费者宕机,那么消息就丢失了。
自动确认消息
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
System.out.println("Received: " + new String(body));
}
};
//4.绑定消费者对象和消息队列
/*
参数1:消息队列名称
参数2:是否自动确认消息
参数3:消费者对象
*/
channel.basicConsume("simpleQueue01", true, consumer);
手动确认消息
channel.basicAck(envelope.getDeliveryTag(), false);
//3.创建消费者对象
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
System.out.println("Received: " + new String(body));
//手动确认消息
//参数1:消息id
// 参数2:是否批量确认消息 false:只确认当前消息 true:批量确认删除 (小于当前消息id的消息,批量确认)
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
//4.绑定消费者对象和消息队列
/*
参数1:消息队列名称
参数2:是否自动确认消息 true自动确认消息 false:手动确认消息
参数3:消费者对象
*/
channel.basicConsume("simpleQueue01", false, consumer);