rocketmq 之 阿里云转本地部署实践总结

发布于:2025-07-06 ⋅ 阅读:(16) ⋅ 点赞:(0)

前言

本地部署参考 这里

MQ部分

代码举例

阿里云

依赖

<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>ons-client</artifactId>
    <version>1.8.4.Final</version>
</dependency>

阿里云配置类(自定义的)

/**
 * Configuration entry describing one RocketMQ topic, as bound from the
 * application's custom MQ configuration.
 */
public class RmqTopicVO {
    /**
     * Topic type.
     * NOTE(review): the consumer configuration code passes this value to
     * SpringContextUtil.getBean(...), so it presumably holds the listener bean name - confirm.
     **/
    private String type;
    /**
     * Topic name.
     **/
    private String topic;

    // Tag filter expression used when subscribing to the topic.
    private String tag;
    /**
     * Whether the message is sent with a delay.
     **/
    private Boolean delayed;
    /**
     * Delay values expressed in hours.
     **/
    private List<Long> hours;
    /**
     * Delay values expressed in minutes.
     **/
    private List<Long> minutes;
    /**
     * Delay values expressed in seconds.
     **/
    private List<Long> seconds;
    /**
     * Delay values expressed in milliseconds.
     **/
    private List<Long> millis;

    /**
     * Number of retries.
     **/
    private Integer retryTimes;
}

生产者代码

/**
 * Creates the Aliyun ONS producer bean.
 *
 * <p>The bean is configured with the shared MQ properties returned by
 * {@code getMqPropertie()}. Its {@code start} and {@code shutdown} methods are
 * declared as the bean's init and destroy methods.
 *
 * @return the configured producer bean
 */
@Bean(initMethod = "start", destroyMethod = "shutdown")
public ProducerBean buildProducer() {
    final ProducerBean producerBean = new ProducerBean();
    producerBean.setProperties(getMqPropertie());
    return producerBean;
}

消费者代码

/**
 * Aliyun ONS consumer configuration: holds the connection settings and the list
 * of topics to subscribe to, and builds the {@link ConsumerBean}.
 */
public class RmqServerConfig {
    private String accessKey;
    private String secretKey;
    private String nameSrvAddr;
    private String groupId;
    // Number of consumer threads; 20 unless overridden.
    private int threadNums = 20;
    private List<RmqTopicVO> topics;

    /**
     * Builds the consumer bean and registers one subscription per configured topic.
     *
     * <p>Each topic's listener is looked up through
     * {@code SpringContextUtil.getBean(topic.getType())}.
     *
     * @return the configured consumer bean, or {@code null} when no topics are configured
     */
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ConsumerBean buildConsumer() {
        ConsumerBean consumerBean = new ConsumerBean();

        // Base connection properties plus the consumer-specific settings.
        Properties consumerProps = getMqPropertie();
        consumerProps.setProperty(PropertyKeyConst.GROUP_ID, groupId);
        consumerProps.setProperty(PropertyKeyConst.ConsumeThreadNums, String.valueOf(threadNums));
        consumerBean.setProperties(consumerProps);

        // Without any configured topic there is nothing to subscribe to.
        if (topics == null || topics.isEmpty()) {
            log.error("没有配置RMQ订阅主题");
            return null;
        }

        // One subscription entry per configured topic.
        Map<Subscription, MessageListener> listenersBySubscription = new HashMap<>(topics.size());
        for (RmqTopicVO topicConfig : topics) {
            Subscription subscription = new Subscription();
            subscription.setTopic(topicConfig.getTopic());
            subscription.setExpression(topicConfig.getTag());
            // The listener bean is fetched from the Spring context by the topic's type.
            MessageListener listener = SpringContextUtil.getBean(topicConfig.getType());
            listenersBySubscription.put(subscription, listener);
            log.info("注册RMQ:{}", topicConfig.getType());
        }
        consumerBean.setSubscriptionTable(listenersBySubscription);
        return consumerBean;
    }

    /**
     * Builds the connection properties shared by producer and consumer:
     * access key, secret key and name server address.
     *
     * @return a new {@link Properties} instance on every call
     */
    public Properties getMqPropertie() {
        Properties baseProps = new Properties();
        baseProps.setProperty(PropertyKeyConst.AccessKey, this.accessKey);
        baseProps.setProperty(PropertyKeyConst.SecretKey, this.secretKey);
        baseProps.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr);
        return baseProps;
    }
}

消息监听类(举例)

@Slf4j
@Component
@RequiredArgsConstructor
public class MyListener implements MessageListener {
    private final MyService myService;

    @Override
    public Action consume(Message message, ConsumeContext consumeContext) {
        try {
            String body = new String(message.getBody(), StandardCharsets.UTF_8);
            log.info("收到通知, body:{}", body);
            myService.test(body);
            return Action.CommitMessage;
        } catch (Exception e) {
            log.error("mq发生异常", e);
            return Action.ReconsumeLater;
        }
    }
}

本地部署版

依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.9.8</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-acl -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-acl</artifactId>
    <version>4.9.8</version>
</dependency>

生产者代码

    // Name server address, injected from the rocketmq.name-server property.
    @Value("${rocketmq.name-server}")
    private String myNameSrvAddr;
    // Access key for the producer, injected from rocketmq.producer.access-key.
    @Value("${rocketmq.producer.access-key}")
    private String myAccessKey;
    // Secret key for the producer, injected from rocketmq.producer.secret-key.
    @Value("${rocketmq.producer.secret-key}")
    private String mySecretKey;
    // Application name, used as the prefix of the producer group name.
    @Value("${spring.application.name}")
    private String projectName;

    /**
     * Creates the producer for the self-hosted RocketMQ cluster.
     *
     * <p>The producer authenticates with an ACL hook built from the configured
     * access/secret key and has message trace enabled. Its group name is
     * {@code <projectName>_<5 random hex chars>} so that every producer instance
     * gets its own group. {@code start} and {@code shutdown} are declared as the
     * bean's init and destroy methods.
     *
     * @return the configured, not yet started producer
     */
    @Bean( initMethod = "start", destroyMethod = "shutdown")
    public DefaultMQProducer mqProducer() {
        // The self-hosted setup needs a unique producer group per producer instance.
        String suffix = UUID.randomUUID().toString().replace("-", "").substring(0, 5);
        String producerGroup = projectName + "_" + suffix;

        // RPC hook carrying the ACL credentials.
        AclClientRPCHook aclHook = new AclClientRPCHook(new SessionCredentials(myAccessKey, mySecretKey));

        // Pass the group at construction time instead of constructing with null and
        // setting it afterwards.
        // NOTE(review): with message trace enabled, the trace dispatcher appears to be
        // created inside this constructor from the group passed here - confirm against
        // the rocketmq-client 4.9.8 source.
        DefaultMQProducer producer = new DefaultMQProducer(producerGroup, aclHook, true, null);
        producer.setNamesrvAddr(myNameSrvAddr);
        producer.setVipChannelEnabled(false);
        return producer;
    }

消费者代码

    // Name server address, injected from the rocketmq.name-server property.
    @Value("${rocketmq.name-server}")
    private String myNameSrvAddr;
    // Access key for the consumers, injected from rocketmq.consumer.access-key.
    @Value("${rocketmq.consumer.access-key}")
    private String myAccessKey;
    // Secret key for the consumers, injected from rocketmq.consumer.secret-key.
    @Value("${rocketmq.consumer.secret-key}")
    private String mySecretKey;
    // Consumers created by buildConsumer(); kept so destroy() can shut them down.
    private List<DefaultMQPushConsumer> consumers;
    // Application name, used as part of each consumer group name.
    @Value("${spring.application.name}")
    private String projectName;
    /**
     * Creates and starts one push consumer per configured topic.
     *
     * <p>Topics without a name or without a listener bean are skipped. A failure
     * while creating or starting one consumer is logged and does not prevent the
     * remaining topics from being processed. The started consumers are also stored
     * in the {@code consumers} field.
     *
     * @return the consumers that were started successfully; empty (never
     *         {@code null}) when no topics are configured
     */
    @Bean
    public List<DefaultMQPushConsumer> buildConsumer() {
        // Guard against a missing topic configuration instead of failing with a NullPointerException.
        if (topics == null || topics.isEmpty()) {
            log.error("No RMQ topics configured, no consumer will be started");
            List<DefaultMQPushConsumer> none = new ArrayList<>();
            this.consumers = none;
            return none;
        }
        List<DefaultMQPushConsumer> consumers = new ArrayList<>(topics.size());
        for (RmqTopicVO topicVO : topics) {
            String topic = topicVO.getTopic();
            if (!StringUtils.hasText(topic)){
                continue;
            }
            // The listener bean is looked up by the topic's type.
            MessageListenerConcurrently topicListenerBean = SpringContextUtil.getBean(topicVO.getType());
            if (Objects.isNull(topicListenerBean)){
                log.error("topic:[ {} ] no listener object", topic);
                continue;
            }
            // Subscribe to every tag when none is configured.
            String tag = StringUtils.hasText(topicVO.getTag()) ? topicVO.getTag() : "*";
            try {
                AclClientRPCHook aclHook = new AclClientRPCHook(new SessionCredentials(myAccessKey, mySecretKey));
                // Group name is derived from topic and application: <topic>_<projectName>_group.
                String groupName = String.join("_", topic, projectName, "group");
                DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName, aclHook, new AllocateMessageQueueAveragely(), true, null);
                consumer.setNamesrvAddr(myNameSrvAddr);
                consumer.subscribe(topic, tag);
                // One message per listener invocation, so a failure only affects that message.
                consumer.setConsumeMessageBatchMaxSize(1);
                consumer.setConsumeThreadMin(threadNums);
                consumer.setConsumeThreadMax(threadNums);
                // Maximum number of redelivery attempts.
                consumer.setMaxReconsumeTimes(3);
                consumer.registerMessageListener(topicListenerBean);
                consumer.start();
                consumers.add(consumer);
            }catch (Exception e){
                log.error("创建topic:[ {} ]消费者异常",topic, e);
            }
        }
        // Keep a reference so destroy() can shut the consumers down.
        this.consumers = consumers;
        return consumers;
    }

    /**
     * Shuts down every consumer that was started by {@code buildConsumer()}.
     *
     * <p>A failure while shutting down one consumer is logged and does not stop
     * the remaining consumers from being shut down.
     */
    @PreDestroy
    public void destroy() {
        log.info("进入销毁消费者组流程");
        if (consumers != null) {
            for (DefaultMQPushConsumer consumer : consumers) {
                try {
                    consumer.shutdown();
                } catch (Exception e) {
                    // A failed shutdown is an error condition; log it at ERROR so it is not
                    // lost among informational output.
                    log.error("关闭消费者组异常", e);
                }
            }
        }
    }

消息监听类(举例)

@Slf4j
@Component
@RequiredArgsConstructor
public class MyListener implements MessageListenerConcurrently {
    private final MyService myService;

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        for (MessageExt msg : list) {
            boolean success = onMessage(msg);
            if (!success) {
                // 只要有一条消息处理失败,整个批次返回RECONSUME_LATER
                // 这样这一批消息会被整体重试
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        }
        // 所有消息都成功处理
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    private boolean onMessage(MessageExt message) {
        try {
            String body = new String(message.getBody(), StandardCharsets.UTF_8);
            log.info("收到通知,body:{}", body);
            myService.test(body);
        } catch (Exception e) {
            log.error("mq发生异常, topic: {}, msgKey: {}", message.getTopic(), message.getKeys(), e);
            // 抛出异常,RocketMQ会自动重试
            return false;
        }
        return true;
    }
}

问题点

  1. 生产者和消费者组一定不能共用,要不然会出现各种问题,如果你没出现问题,那可能是你测试的不够多
  2. 多个服务都需要消费同一条消息时,要为每个服务设置不同的消费者组;MQ默认是集群消费(同一个组内每条消息只会被其中一个实例消费),不是广播消费。
  3. 消费时,为了避免批量消费中有一个错误导致所有回滚,可以设置每批次拉取一条消息
  4. 消息延迟问题:RocketMQ 4.x 系列只支持固定的延迟级别,不能设置任意的具体延迟时间(5.x 系列支持)

RocketMQ-MQTT部分

由于系统需要知道设备上下线,处理业务,所以基于1.0.1版本修改了代码

修改部分

1.心跳检测日志输出到指定文件(这个自己看着改就好,应该很简单)
2.给某个topic发送设备上下线消息
3.记录设备连接日志,方便运维

给某个topic发送设备上下线消息

增加参数

service.conf配置文件中添加设备上下线通知topic
connectionNotifyTopic= ,默认iot (具体看下面相关代码)

消息类:

/**
 * Event payload describing a client connection status change
 * (connect / disconnect / tcpclean).
 */
public class ClientStatusEventVO implements Serializable {
    public static final String CONNECT_TAG = "connect";
    public static final String DISCONNECT_TAG = "disconnect";
    public static final String TCP_CLEAN_TAG = "tcpclean";

    /** Unique identifier of the TCP connection. */
    private String channelId;
    /** Identifier of the concrete device. */
    private String clientId;
    /** Event type: connect/disconnect/tcpclean. */
    private String eventType;
    /** Event time. */
    private Long time;
    /** Public egress IP address used by the client. */
    private String clientIp;

    public String getChannelId() {
        return this.channelId;
    }

    public void setChannelId(String channelId) {
        this.channelId = channelId;
    }

    public String getClientId() {
        return this.clientId;
    }

    public void setClientId(String clientId) {
        this.clientId = clientId;
    }

    public String getEventType() {
        return this.eventType;
    }

    public void setEventType(String eventType) {
        this.eventType = eventType;
    }

    public Long getTime() {
        return this.time;
    }

    public void setTime(Long time) {
        this.time = time;
    }

    public String getClientIp() {
        return this.clientIp;
    }

    public void setClientIp(String clientIp) {
        this.clientIp = clientIp;
    }

    /**
     * Renders all fields as {@code ClientStatusEventVO{name='value', ...}};
     * unset fields are rendered as {@code null}.
     */
    @Override
    public String toString() {
        return new StringBuilder("ClientStatusEventVO{")
                .append("channelId='").append(channelId).append('\'')
                .append(", clientId='").append(clientId).append('\'')
                .append(", eventType='").append(eventType).append('\'')
                .append(", time=").append(time)
                .append(", clientIp='").append(clientIp).append('\'')
                .append('}')
                .toString();
    }
}

发送消息类

在mqtt-ds中创建
在这里插入图片描述

@Component
public class NotifyConnectionManager {
    private static final Logger logger = LoggerFactory.getLogger(NotifyConnectionManager.class);
    private static final String PRODUCER_GROUP_NAME = "iot_producer_group";
    @Resource
    private ServiceConf serviceConf;
    private DefaultMQProducer defaultMQProducer;

    @PostConstruct
    public void init() throws MQClientException {
        String suffix = UUID.randomUUID().toString().replace("-", "").substring(0, 8);
        String groupName = PRODUCER_GROUP_NAME + "_" + suffix;
        defaultMQProducer = MqFactory.buildDefaultMQProducer(groupName, serviceConf.getProperties());
        defaultMQProducer.setRetryTimesWhenSendAsyncFailed(0);
        defaultMQProducer.start();
        logger.info("NotifyConnectionManager initialize successfully. groupName: {}", groupName);
    }

    public void putMessageToMq(String tag, String msgStr){
        String notifyTopic = serviceConf.getProperties().getProperty("connectionNotifyTopic", "iot");
        String msgKey = UUID.randomUUID().toString().replace("-", "");
        Message rmqMsg = new Message(
                notifyTopic,
                tag,
                msgKey,
                // 消息体
                msgStr.getBytes(StandardCharsets.UTF_8)
        );
        try {
            defaultMQProducer.send(rmqMsg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    logger.info("Send notify msg successfully, topic:{}, tag:{}, msgKey:{}, msg:{}, result:{}", notifyTopic, tag, msgKey, msgStr, sendResult);
                }

                @Override
                public void onException(Throwable throwable) {
                    logger.error("Send notify msg failed, topic:{}, msgKey:{}, msg:{}", notifyTopic, msgKey, msgStr, throwable);
                }
            });
        }catch (Exception e){
            logger.error("Send notify msg failed, topic:{}, msgKey:{}, msg:{}", notifyTopic, msgKey, msgStr, e);
        }
    }
}

在这里插入图片描述
上面三个类中,分别表示连接异常,已连接,连接断开。

ConnectHandler:

/**
 * Handles an exception raised on the channel: publishes a {@code tcpclean}
 * status event, logs the exception unless its message is one of the known
 * simple ones, and closes the connection.
 *
 * <p>Publishing the event is best-effort. A failure there is logged and must
 * never prevent the connection from being closed.
 */
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    try {
        publishTcpCleanEvent(ctx);
    } catch (Exception e) {
        logger.warn(" [ConnectHandler] [tcpclean]: failed to publish tcpclean event, channel:{}", ctx.channel(), e);
    }

    if (cause.getMessage() == null || !simpleExceptions.contains(cause.getMessage())) {
        logger.error("exceptionCaught {}", ctx.channel(), cause);
    }
    channelManager.closeConnect(ctx.channel(), ChannelCloseFrom.SERVER, cause.getMessage());
}

/**
 * Builds the tcpclean status event for the channel and sends it to the notify topic.
 */
private void publishTcpCleanEvent(ChannelHandlerContext ctx) {
    String clientId = ChannelInfo.getClientId(ctx.channel());
    String channelId = ChannelInfo.getId(ctx.channel());
    // The remote address may be unavailable when the channel is no longer connected,
    // so resolve the IP defensively instead of casting unconditionally.
    java.net.SocketAddress remote = ctx.channel().remoteAddress();
    String remoteIp = null;
    if (remote instanceof InetSocketAddress && ((InetSocketAddress) remote).getAddress() != null) {
        remoteIp = ((InetSocketAddress) remote).getAddress().getHostAddress();
    }
    long timeStamp = System.currentTimeMillis();
    ClientStatusEventVO clientStatusEventVO = new ClientStatusEventVO();
    clientStatusEventVO.setChannelId(channelId);
    clientStatusEventVO.setClientId(clientId);
    clientStatusEventVO.setClientIp(remoteIp);
    clientStatusEventVO.setEventType(ClientStatusEventVO.TCP_CLEAN_TAG);
    clientStatusEventVO.setTime(timeStamp);
    String jsonString = JSON.toJSONString(clientStatusEventVO);
    // send connection details to topic, default is iot
    notifyConnectionManager.putMessageToMq(ClientStatusEventVO.TCP_CLEAN_TAG, jsonString);

    logger.info(" [ConnectHandler] [tcpclean]: channelId:{}, clientId: {}, remoteIp:{}", channelId, clientId, remoteIp);
}

MqttConnectHandler:

/**
 * Handles an MQTT CONNECT message.
 *
 * <p>Stores keep-alive, client id and clean-session flag on the channel. When the
 * upstream hook failed, a CONNACK carrying the hook's sub code is written and the
 * connection is closed. Otherwise the session is loaded, an optional will message
 * is registered, and a {@code connect} status event is published to the notify
 * topic. The accepting CONNACK is written when the connect future completes.
 *
 * @param ctx                channel handler context of the connecting client
 * @param connectMessage     the received CONNECT message
 * @param upstreamHookResult result of the upstream hook
 */
public void doHandler(ChannelHandlerContext ctx, MqttConnectMessage connectMessage, HookResult upstreamHookResult) {
   final MqttConnectVariableHeader variableHeader = connectMessage.variableHeader();
   final MqttConnectPayload payload = connectMessage.payload();

   // Record the connection attributes on the channel.
   Channel channel = ctx.channel();
   ChannelInfo.setKeepLive(channel, variableHeader.keepAliveTimeSeconds());
   ChannelInfo.setClientId(channel, connectMessage.payload().clientIdentifier());
   ChannelInfo.setCleanSessionFlag(channel, variableHeader.isCleanSession());

   // Upstream hook rejected the connection: answer with its return code and close.
   String remark = upstreamHookResult.getRemark();
   if (!upstreamHookResult.isSuccess()) {
       byte connAckCode = (byte) upstreamHookResult.getSubCode();
       MqttConnectReturnCode mqttConnectReturnCode = MqttConnectReturnCode.valueOf(connAckCode);
       channel.writeAndFlush(MqttMessageFactory.buildConnAckMessage(mqttConnectReturnCode));
       channelManager.closeConnect(channel, ChannelCloseFrom.SERVER, remark);
       return;
   }

   // Future stored on the channel under FUTURE_CONNECT; the CONNACK below is tied to it.
   CompletableFuture<Void> future = new CompletableFuture<>();
   ChannelInfo.setFuture(channel, ChannelInfo.FUTURE_CONNECT, future);

   // use 'scheduler' to separate two i/o: 'ack to client' and 'session-load from rocketmq'
   // The future is completed after 1 second if nothing else completed it earlier.
   scheduler.schedule(() -> {
       if (!future.isDone()) {
           future.complete(null);
       }
   }, 1, TimeUnit.SECONDS);

   try {
       MqttConnAckMessage mqttConnAckMessage = MqttMessageFactory.buildConnAckMessage(MqttConnectReturnCode.CONNECTION_ACCEPTED);
       // Once the future completes, acknowledge the connection if the channel is still active.
       future.thenAccept(aVoid -> {
           if (!channel.isActive()) {
               return;
           }
           ChannelInfo.removeFuture(channel, ChannelInfo.FUTURE_CONNECT);
           channel.writeAndFlush(mqttConnAckMessage);
       });
       sessionLoop.loadSession(ChannelInfo.getClientId(channel), channel);

       // save will message
       WillMessage willMessage = null;
       if (variableHeader.isWillFlag()) {
           // A will flag without topic or payload is rejected; no connect event is published in that case.
           if (payload.willTopic() == null || payload.willMessageInBytes() == null) {
               logger.error("Will message and will topic can not be empty");
               channelManager.closeConnect(channel, ChannelCloseFrom.SERVER, "Will message and will topic can not be empty");
               return;
           }

           willMessage = new WillMessage(payload.willTopic(), payload.willMessageInBytes(), variableHeader.isWillRetain(), variableHeader.willQos());
           sessionLoop.addWillMessage(channel, willMessage);
       }

       // create link successfully , send message to mq
       String clientId = ChannelInfo.getClientId(ctx.channel());
       String channelId = ChannelInfo.getId(ctx.channel());
       String remoteIp = ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress().getHostAddress();
       long timeStamp = System.currentTimeMillis();
       ClientStatusEventVO clientStatusEventVO = new ClientStatusEventVO();
       clientStatusEventVO.setChannelId(channelId);
       clientStatusEventVO.setClientId(clientId);
       clientStatusEventVO.setClientIp(remoteIp);
       clientStatusEventVO.setEventType(ClientStatusEventVO.CONNECT_TAG);
       clientStatusEventVO.setTime(timeStamp);
       String jsonString = JSON.toJSONString(clientStatusEventVO);
       // send connection details to topic, default is iot
       notifyConnectionManager.putMessageToMq(ClientStatusEventVO.CONNECT_TAG, jsonString);

       logger.info(" [MqttConnectHandler] [Connect]: channelId:{}, clientId: {}, remoteIp:{}", channelId, clientId, remoteIp);
   } catch (Exception e) {
       // Any failure in the block above, including building the status event, closes the connection.
       logger.error("Connect:{}", payload.clientIdentifier(), e);
       channelManager.closeConnect(channel, ChannelCloseFrom.SERVER, "ConnectException");
   }
}

MqttDisconnectHandler:

/**
 * Handles an MQTT DISCONNECT message: publishes a {@code disconnect} status
 * event to the notify topic and closes the connection.
 *
 * <p>Publishing the event is best-effort. A failure there is logged and must
 * never prevent the connection from being closed.
 */
public void doHandler(ChannelHandlerContext ctx, MqttMessage mqttMessage, HookResult upstreamHookResult) {
   try {
       publishDisconnectEvent(ctx);
   } catch (Exception e) {
       logger.warn(" [MqttDisconnectHandler] [Disconnect]: failed to publish disconnect event, channel:{}", ctx.channel(), e);
   }

   channelManager.closeConnect(ctx.channel(), ChannelCloseFrom.CLIENT, "disconnect");
}

/**
 * Builds the disconnect status event for the channel and sends it to the notify topic.
 */
private void publishDisconnectEvent(ChannelHandlerContext ctx) {
   // disconnect manually, send message to mq
   String clientId = ChannelInfo.getClientId(ctx.channel());
   String channelId = ChannelInfo.getId(ctx.channel());
   // The remote address may be unavailable when the channel is no longer connected,
   // so resolve the IP defensively instead of casting unconditionally.
   java.net.SocketAddress remote = ctx.channel().remoteAddress();
   String remoteIp = null;
   if (remote instanceof InetSocketAddress && ((InetSocketAddress) remote).getAddress() != null) {
       remoteIp = ((InetSocketAddress) remote).getAddress().getHostAddress();
   }
   long timeStamp = System.currentTimeMillis();
   ClientStatusEventVO clientStatusEventVO = new ClientStatusEventVO();
   clientStatusEventVO.setChannelId(channelId);
   clientStatusEventVO.setClientId(clientId);
   clientStatusEventVO.setClientIp(remoteIp);
   clientStatusEventVO.setEventType(ClientStatusEventVO.DISCONNECT_TAG);
   clientStatusEventVO.setTime(timeStamp);
   String jsonString = JSON.toJSONString(clientStatusEventVO);
   // send connection details to topic, default is iot
   notifyConnectionManager.putMessageToMq(ClientStatusEventVO.DISCONNECT_TAG, jsonString);
   logger.info(" [MqttDisconnectHandler] [Disconnect]: channelId:{}, clientId: {}, remoteIp:{}", channelId, clientId, remoteIp);
}

这样,就实现了 阿里云的MQTT的disconnect,connect,tcpclean。

效果展示:
在这里插入图片描述