Microservices: Message Queues (MQ)

Published: 2025-06-15

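1. Introduction to MQ

        A message queue is asynchronous-communication middleware for distributed systems. It uses the producer-consumer model so that services can exchange messages without depending on each other directly.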
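The producer-consumer idea can be illustrated without any broker. The following is a minimal in-memory sketch that uses a JDK BlockingQueue in place of a real message queue; the class name InMemoryQueueSketch and the method run are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical illustration: a JDK queue stands in for the broker.
final class InMemoryQueueSketch {

    // Sends every message through the queue and returns what the consumer received.
    static List<String> run(List<String> messages) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> received = new ArrayList<>();

        // Consumer: only knows the queue, not the producer.
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < messages.size(); i++) {
                    received.add(queue.take()); // blocks until a message is available
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        // Producer: only knows the queue, not the consumer.
        try {
            for (String message : messages) {
                queue.put(message);
            }
            consumer.join(); // wait until the consumer has drained the queue
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while running the sketch", e);
        }
        return received;
    }
}
```

The producer and the consumer share nothing except the queue, which is the decoupling that a real broker such as RabbitMQ provides across processes and machines.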

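Core uses

  • Service decoupling
  • Asynchronous processing
  • Traffic peak shaving (buffering bursts of requests)
  • Data synchronization
  • Eventual consistency

Message queue patterns

  • Publish/subscribe: one-to-many broadcast
  • Work queue: competing consumers
  • Dead-letter queue: handles messages that failed processing
  • Delay queue: scheduled (delayed) task processing
  • Message replay: Kafka can re-consume messages starting from a given offset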

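2. Getting started with MQ

        Use Spring AMQP to implement the basic HelloWorld message queue setup: one producer, one queue, one consumer.

2.1 Start MQ

        Open the RabbitMQ installation directory and enter the command (rabbitmq-server start) to start the broker.

Then open localhost:15672 in a browser (this requires the rabbitmq_management plugin to be enabled). The username and password are both guest; by default the guest account can only log in from localhost.

2.2 Import dependencies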

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.gaohe</groupId>
    <artifactId>clouddemo</artifactId>
    <packaging>pom</packaging>
    <version>0.0.1-SNAPSHOT</version>
    <modules>
        <module>publisher</module>
        <module>consumer</module>
    </modules>
    <name>clouddemo</name>
    <description>clouddemo</description>
    <properties>
        <java.version>17</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <spring-boot.version>3.3.3</spring-boot.version>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
    </properties>
    <dependencies>
        <!-- AMQP starter, which includes the RabbitMQ client -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <!-- Lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!-- Jackson, used later for JSON message conversion -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <build>
        <plugins>

            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>${spring-boot.version}</version>
                <configuration>
                    <mainClass>com.gaohe.clouddemo.ClouddemoApplication</mainClass>
                    <skip>true</skip>
                </configuration>
                <executions>
                    <execution>
                        <id>repackage</id>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

2.3 Configure the connection in the yml configuration file

spring:
  rabbitmq:
    host: localhost # host name
    port: 5672 # AMQP port
    virtual-host: / # virtual host
    username: guest # user name
    password: guest # password
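The five settings above can also be written as one AMQP URI, the form accepted by RabbitMQ clients. The sketch below only illustrates how the pieces fit together: the class AmqpUriSketch and its build method are hypothetical and not part of Spring or the RabbitMQ client, and URLEncoder applies form encoding, which is adequate for these simple values but is not a general-purpose URI encoder. Note that the default virtual host / has to be percent-encoded.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: assembles an AMQP URI from the individual connection settings.
final class AmqpUriSketch {

    static String build(String user, String password, String host, int port, String virtualHost) {
        return "amqp://" + encode(user) + ":" + encode(password)
                + "@" + host + ":" + port
                + "/" + encode(virtualHost); // "/" becomes "%2F"
    }

    // Form encoding; sufficient for this illustration only.
    private static String encode(String value) {
        return URLEncoder.encode(value, StandardCharsets.UTF_8);
    }
}
```

With the values from the yml file, build("guest", "guest", "localhost", 5672, "/") yields amqp://guest:guest@localhost:5672/%2F.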

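2.4 In publisher, use RabbitTemplate to send a message to the hello.queue queue

        The queue hello.queue must already exist on the broker (for example, created in the management console); a message published to a queue that does not exist is silently dropped.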

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@Slf4j
@SpringBootTest(classes = PublisherApplication.class)
public class PublisherTest {

    @Autowired
    public RabbitTemplate rabbitTemplate;

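    // Send a message
    @Test
    public void test1() {
        // 1. Name of the target queue
        String queueName1 = "hello.queue";
        // 2. Message to send
        String msg = "Hello, I have a hoodie";
        // 3. Send through the default exchange, using the queue name as the routing key
        rabbitTemplate.convertAndSend(queueName1, msg);
    }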

}

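2.5 In the consumer service, write the consumption logic and listen on the hello.queue queue

        The two listener methods below listen on the same queue, so they compete for messages: each message is delivered to only one of them.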

package com.gaohe.consumer.lisenner;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class HelloLisenner {

    @RabbitListener(queues = "hello.queue")
    public void helloQueueLisenner(String msg){
        System.out.println("helloQueueLisenner"+msg);
    }
    @RabbitListener(queues = "hello.queue")
    public void helloQueueLisenner2(String msg){
        System.out.println("helloQueueLisenner2"+msg);
    }

}
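When several consumers listen on one queue, RabbitMQ by default hands messages to them in turn; the exact split in a real system also depends on prefetch settings and on how fast each consumer acknowledges. The following in-memory sketch shows only the idealized round-robin case. The class RoundRobinSketch and the method dispatch are hypothetical names used for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of idealized round-robin dispatch among competing consumers.
final class RoundRobinSketch {

    // Returns, for each consumer index, the messages that consumer would receive.
    static List<List<String>> dispatch(List<String> messages, int consumerCount) {
        List<List<String>> perConsumer = new ArrayList<>();
        for (int i = 0; i < consumerCount; i++) {
            perConsumer.add(new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            // Each message goes to exactly one consumer, never to both.
            perConsumer.get(i % consumerCount).add(messages.get(i));
        }
        return perConsumer;
    }
}
```

With four messages and two consumers, the first consumer gets the first and third message and the second consumer gets the second and fourth, which matches the alternating output typically seen from the two listener methods.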

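3. Exchanges

        An exchange is the message-routing hub of the messaging system: it receives the messages sent by producers and routes them to one or more queues according to specific rules.

Common exchange types include:

  • Fanout: broadcast
  • Direct: routing
  • Topic: topic (pattern matching)

3.1 Broadcast exchange (FanoutExchange)

  • In the consumer service, create a class, annotate it, and declare the exchange, the queues, and the binding objects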
package com.gaohe.consumer.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfig {

    // Declare the fanout exchange
    @Bean
    public FanoutExchange fanout1(){
        return new FanoutExchange("itgaohe.fanout");
    }
    // Declare the first queue
    @Bean
    public Queue queue1(){
        return new Queue("fanout.queue1");
    }
    // Bind the first queue to the exchange
    @Bean
    public Binding binding1(FanoutExchange fanout1){
        return BindingBuilder.bind(queue1()).to(fanout1);
    }
    // Declare the second queue
    @Bean
    public Queue queue2(){
        return new Queue("fanout.queue2");
    }
    // Bind the second queue to the exchange
    @Bean
    public Binding binding2(FanoutExchange fanout1){
        return BindingBuilder.bind(queue2()).to(fanout1);
    }
}

 

  • In the consumer service, add listener methods to a listener class
package com.gaohe.consumer.lisenner;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class FanoutLisenner {

    @RabbitListener(queues = "fanout.queue1")
    public void fanoutQueueLisenner(String msg){
        System.out.println("fanoutQueueLisenner:"+msg);
    }
    @RabbitListener(queues = "fanout.queue2")
    public void fanoutQueueLisenner2(String msg){
        System.out.println("fanoutQueueLisenner2:"+msg);
    }
}

 

  • In the publisher service, create a test class to test it

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@Slf4j
@SpringBootTest(classes = PublisherApplication.class)
public class PublisherTest {

    @Autowired
    public RabbitTemplate rabbitTemplate;


    @Test
    public void test3(){
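//        1. Name of the target exchange
        String exName ="itgaohe.fanout";
//        2. Message to send
        String msg = "Hello";
//        3. Send; a fanout exchange ignores the routing key, so it is left empty
        rabbitTemplate.convertAndSend(exName,"",msg);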
    }

}
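The fanout behavior can be modeled in a few lines: every bound queue receives a copy of each message, and the routing key plays no role. This is a minimal in-memory sketch, not RabbitMQ's implementation; FanoutSketch and its methods are hypothetical names, while the queue names in the usage note are the ones declared in FanoutConfig above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a fanout exchange.
final class FanoutSketch {
    // queue name -> messages delivered to that queue
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    void bind(String queueName) {
        queues.putIfAbsent(queueName, new ArrayList<>());
    }

    // The routing key is accepted but deliberately ignored.
    void publish(String routingKey, String message) {
        for (List<String> queue : queues.values()) {
            queue.add(message);
        }
    }

    List<String> messagesIn(String queueName) {
        return List.copyOf(queues.getOrDefault(queueName, List.of()));
    }
}
```

After binding fanout.queue1 and fanout.queue2 and publishing one message with an empty routing key, both queues contain that message, which is why both listener methods print it.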

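3.2 Routing exchange (DirectExchange)

        Exchanges and queues do not have to be declared in a separate configuration class; they can also be declared with annotations directly on the listener class. A direct exchange delivers a message only to the queues whose binding key equals the message's routing key.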

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class DirectLisenner {


    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue("direct.queue1"),
                    exchange = @Exchange(value = "itgaohe.direct",type = ExchangeTypes.DIRECT),
                    key = {"blue","red"}
            )

    )
    public void directQueueLisenner(String msg){
        System.out.println("directQueueLisenner"+msg);
    }

    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue("direct.queue2"),
                    exchange = @Exchange(value = "itgaohe.direct",type = ExchangeTypes.DIRECT),
                    key = {"yellow","red"}
            )

    )
    public void directQueueLisenner2(String msg){
        System.out.println("directQueueLisenner2"+msg);
    }

}

         Test it from the publisher test class

package com.gaohe.publisher;


import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@Slf4j
@SpringBootTest(classes = PublisherApplication.class)
public class PublisherTest {

    @Autowired
    public RabbitTemplate rabbitTemplate;


    @Test
    public void test3(){
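//        1. Name of the target exchange
        String exName ="itgaohe.direct";
//        2. Message to send
        String msg = "I LOVE YOU ";
//        3. Send with routing key "yellow"; only direct.queue2 is bound with that key
        rabbitTemplate.convertAndSend(exName,"yellow",msg);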
    }

}
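To see which queue receives which message, the direct exchange can be modeled as a lookup from routing key to bound queues. The sketch below is an in-memory illustration under that assumption, not RabbitMQ's implementation; DirectRoutingSketch and its methods are hypothetical names, and the bindings in the usage note mirror the @QueueBinding annotations above.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical in-memory model of a direct exchange.
final class DirectRoutingSketch {
    // binding key -> names of the queues bound with that key (sorted for stable output)
    private final Map<String, Set<String>> bindings = new HashMap<>();

    void bind(String queueName, String... keys) {
        for (String key : keys) {
            bindings.computeIfAbsent(key, k -> new TreeSet<>()).add(queueName);
        }
    }

    // A message is routed only to queues whose binding key equals the routing key exactly.
    List<String> route(String routingKey) {
        return List.copyOf(bindings.getOrDefault(routingKey, Set.of()));
    }
}
```

With direct.queue1 bound to blue and red, and direct.queue2 bound to yellow and red, the routing key yellow reaches only direct.queue2, red reaches both queues, and an unbound key such as green reaches none.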

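3.3 Topic exchange (TopicExchange)

        In a topic binding key, words are separated by dots; * matches exactly one word and # matches zero or more words.

  • Listener class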
package com.gaohe.consumer.lisenner;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Component
public class TopicLisenner {


    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue("topic.queue1"),
                    exchange = @Exchange(value = "itgaohe.topic",type = ExchangeTypes.TOPIC),
                    key = {"china.#","#.weather"}
            )

    )
    public void topicQueueLisenner(String msg){
        System.out.println("topicQueueLisenner"+msg);
    }

    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue("topic.queue2"),
                    exchange = @Exchange(value = "itgaohe.topic",type = ExchangeTypes.TOPIC),
                    key = {"us.#","#.weather"}
            )

    )
    public void topicQueueLisenner2(String msg){
        System.out.println("topicQueueLisenner2"+msg);
    }

}
  • Test class
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@Slf4j
@SpringBootTest(classes = PublisherApplication.class)
public class PublisherTest {

    @Autowired
    public RabbitTemplate rabbitTemplate;


    @Test
    public void test4(){
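//        1. Name of the target exchange
        String exName ="itgaohe.topic";
//        2. Message to send
        String msg = "hello world6666";
//        3. Send; "aa.weather" matches the "#.weather" binding of both queues
        rabbitTemplate.convertAndSend(exName,"aa.weather",msg);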
    }
}
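The wildcard rules of a topic exchange can be checked with a small matcher. The following is a minimal sketch of the matching rules only (words separated by dots, * for exactly one word, # for zero or more words), not RabbitMQ's implementation; the class TopicMatcherSketch and the method matches are hypothetical names.

```java
// Hypothetical matcher for AMQP-style topic binding keys.
final class TopicMatcherSketch {

    static boolean matches(String bindingKey, String routingKey) {
        String[] pattern = bindingKey.isEmpty() ? new String[0] : bindingKey.split("\\.");
        String[] words = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return match(pattern, 0, words, 0);
    }

    private static boolean match(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            return w == words.length; // pattern used up: match only if no words remain
        }
        if (pattern[p].equals("#")) {
            // '#' may absorb zero or more words; try every possible split
            for (int next = w; next <= words.length; next++) {
                if (match(pattern, p + 1, words, next)) {
                    return true;
                }
            }
            return false;
        }
        if (w == words.length) {
            return false; // pattern still expects a word, but none is left
        }
        if (pattern[p].equals("*") || pattern[p].equals(words[w])) {
            return match(pattern, p + 1, words, w + 1);
        }
        return false;
    }
}
```

For the test message above, the routing key aa.weather matches #.weather but neither china.# nor us.#. Since both topic.queue1 and topic.queue2 are bound with #.weather, both listeners receive the message.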

        The routing (Direct) and broadcast (Fanout) exchanges are the ones used most often.

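4. MQ message converters

        A message converter is the data-format conversion layer of the messaging middleware. During message production and consumption it is responsible for:

  • Serializing and deserializing between Java objects and the message body

  • Automatic handling of message attributes (headers/properties)

  • Converting between different data formats

Configuring a message converter

  • Import the dependency in the parent project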
<dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
</dependency>
  • Configure a message converter bean in both the publisher and the consumer
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MqConfig {
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}
  • Define a consumer that listens on the queue and consumes the message

  • Test
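Why bother replacing the converter? Without a converter bean, Spring AMQP falls back to SimpleMessageConverter, which sends Serializable objects using JDK serialization. The sketch below compares the two encodings for a small map using only the JDK. It is an illustration under stated assumptions: PayloadSizeSketch, jdkSerialize and toJson are hypothetical names, and toJson is a hand-rolled stand-in for Jackson that handles only flat maps of strings and numbers without escaping.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.Map;

// Hypothetical comparison of JDK serialization and a JSON encoding of the same payload.
final class PayloadSizeSketch {

    // Encodes the value the way JDK serialization does (binary, Java-specific).
    static byte[] jdkSerialize(Object value) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Simplified stand-in for Jackson: flat map, String or numeric values, no escaping.
    static String toJson(Map<String, Object> flat) {
        StringBuilder json = new StringBuilder("{");
        for (Map.Entry<String, Object> entry : flat.entrySet()) {
            if (json.length() > 1) {
                json.append(',');
            }
            json.append('"').append(entry.getKey()).append("\":");
            Object value = entry.getValue();
            if (value instanceof String) {
                json.append('"').append(value).append('"');
            } else {
                json.append(value);
            }
        }
        return json.append('}').toString();
    }
}
```

For a map holding a name and an age, the JSON text is a short readable string, while the JDK-serialized form is a larger binary payload that only Java consumers with matching classes can read. That difference is the usual motivation for configuring Jackson2JsonMessageConverter on both sides.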

 

