RocketMq多环境自动隔离

发布于:2024-06-17 ⋅ 阅读:(52) ⋅ 点赞:(0)

一、多环境隔离场景

        当多个环境使用同一套rocketmq的服务的时候,如果不对环境进行隔离,将会导致消息被错误的环境消费,因此可以采用两种方式进行隔离。

方式1
        通过$Value注入,但是这个需要每个环境都维护自己的topic等信息,比较麻烦。

方式2:

        通过环境配置进行自动隔离,比如dev、test、pre、prod等不同环境只需要简单配置一个选项,所有的消息将被自动隔离,这样各个环境共用一套rocketmq服务即可,不需要分环境搭建,无论开发、测试都非常简便,整个公司可以共用一套。

二、多环境自动隔离的原理

        多环境隔离,利用BeanPostProcessor的postProcessBeforeInitialization在监听器实例初始前把对应topic、consumerGroup进行修改,发送消息的时候,也根据环境进行区分要发到那个环境的topic和consumerGroup上去。

三、代码示例

  3.1、增加配置文件

# 自定义属性
rocketmq:
  environment:
    # 隔离环境名称,拼接到topic后,xxx_topic_pre,默认空字符串;
    # 也可根据spring.profiles.active的值
    # name: pre
    # 启动隔离,会自动在topic上拼接激活的配置文件,达到自动隔离的效果
    # 默认为true,配置类:RocketMqEnvirIsolationConfig
    isolation: true

  3.2、编写配置类

   原理:

      Spring生命周期中BeanPostProcessor在类初始化前,执行postProcessBeforeInitialization中的内容。使用@Component注解,用来把该类扫描到spring容器中进行统一管理。这要就可以在类初始化前,把监听器类的topic/group/tag等修改成自己想要的,然后实例化的时候用的就是改后值。

import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.annotation.Configuration;
import org.springframework.lang.NonNull;
import org.springframework.util.StringUtils;

/**
 * RocketMQ多环境隔离配置
 * 原理:对于每个配置的Bean在实例化前,拿到Bean的监听器注解把group或者topic改掉
 */
@Configuration
public class RocketMqEnvirIsolationConfig implements BeanPostProcessor {

    @Value("${rocket.environment.isolation:true}")
    private boolean enabledIsolation;

    //Springboot的使用的环境dev、test、pre、prod
    @Value("${spring.prifiles.active}")
    private String environmentName;

    @Override
    public Object postProcessBeforeInitialization(@NonNull Object bean,
                                                  @NonNull String beanName) throws BeansException {
        // DefaultRocketMQListenerContainer是监听器实现类
        if (bean instanceof DefaultRocketMQListenerContainer) {
            DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean;
            // 开启消息隔离情况下获取隔离配置,此处隔离topic,根据自己的需求隔离group或者tag
            if (enabledIsolation && StringUtils.hasText(environmentName)) {
                container.setTopic(String.join("_", container.getTopic(), environmentName));
                container.setConsumerGroup(String.join("_", container.getConsumerGroup(), environmentName));
            }
            return container;
        }
        return bean;
    }
}

 3.3、生产者发送消息

import com.alibaba.fastjson.JSONObject;
import com.codecoord.rocketmq.constant.RocketMqBizConstant;
import com.codecoord.rocketmq.constant.RocketMqDelayLevel;
import com.codecoord.rocketmq.domain.RocketMqMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.time.LocalDate;
import java.time.LocalDateTime;

@RestController
@RequestMapping("/rocketmq")
@Slf4j
public class RocketMqController {

    @Resource(name = "rocketMQTemplate")
    private RocketMQTemplate rocketMqTemplate;

    @Value("${spring.profiles.active}")
    private String environmentName;

    @Value("${rocketmq.isolation:true}")
    private boolean enabledIsolation;


    @GetMapping("/sendMsg")
    public String sendMessage() {
       
        String destination = "user_audit_queue";
        //是否租户隔离 
        if(enabledIsolation){
          destination = String.join("_", destination , environmentName);
        }

        RocketMqMessage message = new RocketMqMessage();
        message.setId(System.currentTimeMillis());
        message.setMessage("这是一个测试消息!");
        message.setCurrentDate(LocalDate.now()); // Java时间字段需要单独处理,否则会序列化失败
        message.setCurrentDateTime(LocalDateTime.now());
        message.setVersion("1.0");

        //1、发送同步消息,消息成功发送到Broker时才返回,message可以入参批量消息
        // 通过SendResult来处理发送结果
        // SendResult sendResult = rocketMqTemplate.syncSend(destination, message);

        /// 发送时指定业务key
        /*Message<RocketMqMessage> buildMessage = MessageBuilder.withPayload(message)
                // 设置keys
                .setHeader(RocketMQHeaders.KEYS, message.getId())
                .build();
        SendResult sendResult = rocketMqTemplate.syncSend(destination, buildMessage);*/




        //2、 发送延迟消息
        Message<RocketMqMessage> buildMessage = MessageBuilder.withPayload(message).build();
        SendResult sendResult = rocketMqTemplate.syncSend(destination, buildMessage, 3000, RocketMqDelayLevel.FIVE_SECOND);



        //3、发送同步有序消息,需要指定hashKey,可以用业务唯一键
        // rocketMqTemplate.syncSendOrderly(destination, message, message.getId().toString());



        //4、发送异步消息,消息发送后及时返回,然后通过回调方法通知
        // rocketMqTemplate.asyncSend(destination, message, new SendCallback() {
        //     @Override
        //     public void onSuccess(SendResult sendResult) {
        //         log.info("消息发送成功【{}】", JSONObject.toJSONString(sendResult));
        //     }
        //
        //     @Override
        //     public void onException(Throwable e) {
        //         log.error("消息发送失败【{}】", e.getMessage());
        //     }
        // });



        //5、 发送异步有序消息,需要指定hashKey,可以用业务唯一键
        // rocketMqTemplate.asyncSendOrderly(destination, message, message.getId().toString(), new SendCallback() {
        //     @Override
        //     public void onSuccess(SendResult sendResult) {
        //         log.info("消息发送成功【{}】", JSONObject.toJSONString(sendResult));
        //     }
        //
        //     @Override
        //     public void onException(Throwable e) {
        //         log.error("消息发送失败【{}】", e.getMessage());
        //     }
        // });




        //6、 发送单向消息
        // rocketMqTemplate.sendOneWay(destination, message);



        //7、 发送单向有序消息,通过MessageBuilder构建
        // Message<RocketMqMessage> buildMessage = MessageBuilder.withPayload(message).build();
        // rocketMqTemplate.sendOneWayOrderly(destination, buildMessage, message.getId().toString());



        //8、发送和接收回调消息,需要实现 RocketMQReplyListener 监听器类才可以,否则将会超时错误
        // rocketMqTemplate.sendAndReceive(destination, message, new RocketMQLocalRequestCallback<String>() {
        //     @Override
        //     public void onSuccess(String message) {
        //         log.info("消息发送成功,消息类型【{}】", message);
        //     }
        //
        //     @Override
        //     public void onException(Throwable e) {
        //         log.error("消息发送失败", e);
        //     }
        // });



        //9、 调用抽象类方法发送,最终也是syncSend
        // rocketMqTemplate.convertAndSend(destination, "convertAndSend");



        //10、转换消息和发送,底层使用的是syncSend(destination, message),将会被RocketEntityMessageListener消费
        // Message<RocketMqMessage> buildMessage = MessageBuilder.withPayload(message)
        //         // 设置请求头
        //         .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON_VALUE)
        //         .build();
        // 将会被RocketEntityMessageListener03消费
        // Message<Object> buildMessage = MessageBuilder.withPayload(new Object()).build();
        // rocketMqTemplate.send(destination, buildMessage);



        //11、 发送批量消息,批量消息最终转为单挑进行发送
        // List<Message<String>> msgList = new ArrayList<>();
        // for (int i = 0; i < 10; i++) {
        //     msgList.add(MessageBuilder.withPayload("消息:" + i).build());
        // }
        // rocketMqTemplate.syncSend(destination, msgList);
        return message;
    }



    /**
     * 直接将对象进行传输,也可以自己进行json转化后传输
     */
    @RequestMapping("/messageExt/message")
    public SendResult convertAndSend() {
        String destination = "user_audit_topic";
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("type", "messageExt");
        return rocketMqTemplate.syncSend(destination, jsonObject);
    }
}

3.4、消费者监听消息

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import org.jeecg.common.base.BaseMap;


@Component
@RocketMQMessageListener(topic = "user_audit_queue",consumerGroup = "user_audit_queue")
@Slf4j
public class UserAuditQueueListener implements RocketMQListener<BaseMap> {


    @Override
    public void onMessage(BaseMap baseMap) {
       //消费者监听到消息进行消费
       ....
    }

}

四、消息中时间类型的支持

RocketMQ内置使用的转换器是RocketMQMessageConverter中MessageConverterConfiguration方法,转换JSON时使用的是MappingJackson2MessageConverter,但是其不支持Java的时间类型,比如LocalDate、Date等,当消息实体中存在上面的时间类型字段时将会报以下错误:

java.lang.RuntimeException: cannot convert message to class com.codecoord.rocketmq.domain.RocketMqMessage
  at org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer.doConvertMessage(DefaultRocketMQListenerContainer.java:486) ~[rocketmq-spring-boot-2.2.1.jar:2.2.1]
  at org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer.handleMessage(DefaultRocketMQListenerContainer.java:399) ~[rocketmq-spring-boot-2.2.1.jar:2.2.1]
  at org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer.access$100(DefaultRocketMQListenerContainer.java:71) ~[rocketmq-spring-boot-2.2.1.jar:2.2.1]
  at org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer$DefaultMessageListenerConcurrently.consumeMessage(DefaultRocketMQListenerContainer.java:359) ~[rocketmq-spring-boot-2.2.1.jar:2.2.1]
  at org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService$ConsumeRequest.run(ConsumeMessageConcurrentlyService.java:392) [rocketmq-client-4.9.1.jar:4.9.1]
  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_231]
  at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) [na:1.8.0_231]
  at java.util.concurrent.FutureTask.run(FutureTask.java) [na:1.8.0_231]
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_231]
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_231]
  at java.lang.Thread.run(Thread.java:748) [na:1.8.0_231]
Caused by: org.springframework.messaging.converter.MessageConversionException: Could not read JSON: Expected array or string.

 

     所以需要自定义消息转换器,将MappingJackson2MessageConverter进行替换,然后添加支持时间模块,代码如下:

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.converter.MessageConverter;

import java.util.List;

/**
 * 序列化器处理
 */
@Configuration
public class RocketMqConfig {

    /**
     * 解决RocketMQ Jackson不支持Java时间类型配置
     */
    @Bean
    @Primary
    public RocketMQMessageConverter createRocketMQMessageConverter() {
        RocketMQMessageConverter converter = new RocketMQMessageConverter();
        CompositeMessageConverter compositeMessageConverter = (CompositeMessageConverter) converter.getMessageConverter();
        List<MessageConverter> messageConverterList = compositeMessageConverter.getConverters();
        for (MessageConverter messageConverter : messageConverterList) {
            if (messageConverter instanceof MappingJackson2MessageConverter) {
                MappingJackson2MessageConverter jackson2MessageConverter = (MappingJackson2MessageConverter) messageConverter;
                ObjectMapper objectMapper = jackson2MessageConverter.getObjectMapper();
                objectMapper.registerModules(new JavaTimeModule());
            }
        }
        return converter;
    }
}

网站公告

今日签到

点亮在社区的每一天
去签到