个人名片:
博主:酒徒ᝰ.
个人简介:沉醉在酒中,借着一股酒劲,去拼搏一个未来。
本篇励志:三人行,必有我师焉。
本项目基于B站黑马程序员Java《SpringCloud微服务技术栈》,SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式
【SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,系统详解springcloud微服务技术栈课程|黑马程序员Java微服务】 点击观看
三、SpringAMQP
5.发布、订阅模型-Topic
RabbitMQ是一种高效、可靠、灵活的消息队列中间件,用于实现分布式系统中的数据交换和消息处理。在RabbitMQ中,Topic是一种常见的消息路由模式,它允许生产者将消息发送到多个队列,同时允许多个消费者从这些队列中接收和处理消息。
一、RabbitMQ Topic模式概述
RabbitMQ的Topic模式是指消息按照一定的规则被路由到一个或多个队列中,消费者通过订阅这些队列来接收和处理消息。在Topic模式下,生产者将消息发送到一个或多个Topic,每个Topic可以有一个或多个消费者订阅。当消息被发布到Topic时,RabbitMQ会将消息路由到订阅了该Topic的所有消费者。
二、RabbitMQ Topic模式工作原理
- 消息路由
在Topic模式下,生产者将消息发送到特定的Topic。消费者通过订阅该Topic来接收和处理消息。当消费者成功处理消息后,RabbitMQ会从队列中删除该消息,使其无法被其他消费者接收。
- 消息持久化
RabbitMQ的Topic模式支持消息的持久化存储,即消息可以保存在磁盘上,以防止在系统崩溃时丢失数据。此外,Topic模式还支持消息的内存缓存,以提高读取效率。
三、RabbitMQ Topic模式的优点
(1)发布/订阅模式:Topic模式实现了发布/订阅的消息路由方式,生产者和消费者之间解耦,消费者可以动态地订阅或取消订阅Topic,使得系统的灵活性和可扩展性得到提高。
(2)多路广播:Topic模式可以将消息发送到多个队列,从而实现多路广播的效果,使得多个消费者可以同时接收和处理同一份消息。
(3)负载均衡:由于消息可以被路由到多个队列,不同的消费者可以并行处理消息,从而提高了系统的吞吐量和负载均衡的能力。
(4)持久化存储:Topic模式支持消息的持久化存储,提高了系统的可靠性和稳定性。
三、RabbitMQ Topic模式的缺点
(1)消息重复:由于Topic可以将消息路由到多个队列,可能会导致同一个消息被多个消费者接收和处理,从而产生重复的处理结果。
(2)订阅管理:Topic模式需要消费者手动订阅或取消订阅Topic,增加了系统的复杂性和管理难度。
五、RabbitMQ Topic模式应用场景
- 分布式系统:在分布式系统中,通常需要将消息发送到多个节点进行处理,Topic模式可以将消息路由到多个队列,从而实现分布式处理的效果。
- 解耦和灵活性:在一些场景下,需要将生产者和消费者解耦,使得消费者可以动态地加入或离开系统。Topic模式可以满足这种需求,消费者可以随时订阅或取消订阅Topic。
- 多通道通信:在多通道通信场景中,不同的通道需要处理不同类型的消息,Topic模式可以将不同类型的消息路由到不同的队列,从而实现多通道通信。
- 数据流处理:在数据流处理场景中,需要将数据流分成不同的部分进行处理,Topic模式可以将数据流路由到不同的队列,从而实现数据流的分布式处理。
总结
RabbitMQ的Topic模式是一种灵活的消息路由模式,它实现了发布/订阅的消息路由方式,支持多路广播、负载均衡和持久化存储等特性。然而,Topic模式也存在消息重复和订阅管理复杂等缺点。在实际应用中,需要根据具体场景选择合适的消息路由模式并进行优化。
发布订阅-TopicExchange
TopicExchange与DirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以 . 分割。
Queue与Exchange指定BindingKey时可以使用通配符:
#:代指0个或多个单词
*:代指一个单词
案例:利用SpringAMQP演示TopicExchange的使用
实现思路如下:
并利用@RabbitListener声明Exchange、Queue、RoutingKey
在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2
在publisher中编写测试方法,向itcast. topic发送消息
步骤1:在consumer服务声明Exchange、Queue
在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2,
并利用@RabbitListener声明Exchange、Queue、RoutingKey
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
key = "china.#"
))
public void listenTopicQueue1(String msg){
System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
key = "#.news"
))
public void listenTopicQueue2(String msg){
System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue3"),
exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
key = "china.*"
))
public void listenTopicQueue3(String msg){
System.out.println("消费者接收到topic.queue3的消息:【" + msg + "】");
}
步骤2:在publisher服务发送消息到TopicExchange
在publisher服务的SpringAmqpTest类中添加测试方法:
@Test
public void testTopicExchange() {
String exchangeName = "itcast.topic";
String message = "hello, jack";
rabbitTemplate.convertAndSend(exchangeName, "china.new", message);
}
总结:
描述下Direct交换机与Topic交换机的差异?
Topic交换机接收的消息RoutingKey必须是多个单词,以 . 分割
Topic交换机与队列绑定时的bindingKey可以指定通配符
#:代表0个或多个词
*:代表1个词