<!--AMQP依赖,包含RabbitMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> spring: rabbitmq: host: 192.168.150.101 # 主机名 port: 5672 # 端口 virtual-host: / # 虚拟主机 username: itcast # 用户名 password: 123321 # 密码 listener: simple: prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
消息处理 work 多个消费者共同处理消息处理 Work模型的使用: 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理 - 通过设置prefetch来控制消费者预取的消息数量 @RabbitListener(queues = "simple.queue") rabbitTemplate.convertAndSend(queueName, message + i); 发布订阅的模型 多了一个exchange角色 Fanout Direct Topic FanoutConfig
在consumer中创建一个类,声明队列和交换机:
package cn.itheima.mq.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class FanoutConfig { /** * 声明交换机 * @return Fanout类型交换机 */ @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange("exchange.fanout"); } /** * 第1个队列 */ @Bean public Queue fanoutQueue1(){ return new Queue("fanout.queue1"); } /** * 绑定队列和交换机 */ @Bean public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){ return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange); } /** * 第2个队列 */ @Bean public Queue fanoutQueue2(){ return new Queue("fanout.queue2"); } /** * 绑定队列和交换机 */ @Bean public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){ return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange); } }
3.4.2.消息发送
在publisher服务的SpringAmqpTest类中添加测试方法:
@Test public void testFanoutExchange() { // 队列名称 String exchangeName = "exchange.fanout"; // 消息 String message = "hello, everyone!"; rabbitTemplate.convertAndSend(exchangeName, "", message); }
3.4.3.消息接收
在consumer服务的SpringRabbitListener中添加两个方法,作为消费者:
@RabbitListener(queues = "fanout.queue1") public void listenFanoutQueue1(String msg) { System.out.println("消费者1接收到Fanout消息:【" + msg + "】"); } @RabbitListener(queues = "fanout.queue2") public void listenFanoutQueue2(String msg) { System.out.println("消费者2接收到Fanout消息:【" + msg + "】"); }
在Fanout模式中,一条消息,会被所有订阅的队列都消费
Direct类型不同的消息被不同的队列消费 RoutingKey
只有队列的Routingkey
与消息的 Routing key
完全一致,才会接收到消息
在consumer的SpringRabbitListener中添加两个消费者,同时基于注解来声明队列和交换机:
举例:
item.#
:能够匹配item.spu.insert
或者 item.spu
item.*
:只能匹配item.spu
描述下Direct交换机与Topic交换机的差异?
Topic交换机接收的消息RoutingKey必须是多个单词,以
**.**
分割Topic交换机与队列绑定时的bindingKey可以指定通配符
#
:代表0个或多个词*
:代表1个词
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue1"), exchange = @Exchange(name = "exchange.direct", type = ExchangeTypes.DIRECT), key = {"red", "blue"} )) public void listenDirectQueue1(String msg){ System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】"); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue2"), exchange = @Exchange(name = "exchange.direct", type = ExchangeTypes.DIRECT), key = {"red", "yellow"} )) public void listenDirectQueue2(String msg){ System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】"); }
3.7.2.配置JSON转换器
显然,JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。
在publisher和consumer两个服务中都引入依赖:
<dependency> <groupId>com.fasterxml.jackson.dataformat</groupId> <artifactId>jackson-dataformat-xml</artifactId> <version>2.9.10</version> </dependency>
配置消息转换器。
在启动类中添加一个Bean即可:
@Bean public MessageConverter jsonMessageConverter(){ return new Jackson2JsonMessageConverter(); }