SpringBoot 集成RabbitMQ 实现钉钉日报定时发送功能

发布于:2024-11-04 ⋅ 阅读:(60) ⋅ 点赞:(0)

一、RabbitMQ 下载安装

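官网:https://www.rabbitmq.com/docs

注意:下文使用的是 x-delayed-message 类型的交换机,该类型由 RabbitMQ 的延迟消息插件 rabbitmq_delayed_message_exchange 提供。安装 RabbitMQ 后还需要下载该插件并执行 rabbitmq-plugins enable rabbitmq_delayed_message_exchange 启用,否则声明延迟交换机时会失败。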

二、开发步骤:

1. Maven 配置

   		<!-- RabbitMQ support: Spring Boot AMQP starter, version pinned to 2.7.7 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>2.7.7</version>
        </dependency>

2. RabbitMqConfig 配置

package com.lq.common.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;


/**
 * Declares the RabbitMQ topology used for delayed delivery: one custom exchange,
 * one durable queue, and the binding that connects them.
 *
 * <p>NOTE(review): the {@code x-delayed-message} exchange type is not a built-in
 * RabbitMQ type; it is presumably provided by the delayed-message-exchange plugin
 * on the broker — confirm the plugin is enabled in every environment.
 */
@Configuration
public class RabbitMqConfig {

    /** Name of the delayed-message exchange. */
    public static final String DELAY_EXCHANGE = "DelayExchange";

    /** Name of the queue that receives messages once their delay has elapsed. */
    public static final String DELAY_QUEUE = "DelayQueue";

    /** Routing key used both for the binding and by publishers. */
    public static final String ROUTING_KEY = "delay";

    /**
     * Builds the delayed exchange.
     *
     * @return a durable, non-auto-delete custom exchange of type
     *         {@code x-delayed-message} that routes like a {@code direct} exchange
     */
    @Bean
    public CustomExchange customExchange() {
        // "x-delayed-type" tells the delayed exchange which routing semantics to apply.
        Map<String, Object> exchangeArguments = new HashMap<>();
        exchangeArguments.put("x-delayed-type", "direct");

        final boolean durable = true;
        final boolean autoDelete = false;
        return new CustomExchange(DELAY_EXCHANGE, "x-delayed-message", durable, autoDelete, exchangeArguments);
    }

    /**
     * Builds the queue consumed by the delayed-message listener.
     *
     * @return a durable queue named {@link #DELAY_QUEUE}
     */
    @Bean
    public Queue delayQueue() {
        final boolean durable = true;
        return new Queue(DELAY_QUEUE, durable);
    }

    /**
     * Binds {@link #delayQueue()} to {@link #customExchange()} under {@link #ROUTING_KEY}.
     *
     * <p>The method name is kept as-is because it is also the bean name.
     *
     * @return the binding, with no extra binding arguments
     */
    @Bean
    public Binding DelayBinding() {
        Queue targetQueue = delayQueue();
        CustomExchange sourceExchange = customExchange();
        return BindingBuilder
                .bind(targetQueue)
                .to(sourceExchange)
                .with(ROUTING_KEY)
                .noargs();
    }

}

3. RabbitMqUtil 工具类

package com.lq.common.util;

import com.lq.common.config.RabbitMqConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;


@Service
@Slf4j
public class RabbitMqUtil {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    private DateTimeFormatter formatterDateTime = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    @PostConstruct
    public void init(){
        /**
         * 消息发送到交换机成功回调函数
         */
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback(){

            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if (ack){
                    log.info("消息投递到交换机成功");
                }else {
                    log.error("消息投递到交换机失败,原因->{}",cause);
                }
            }
        });
        /**交换机投递到队列失败回调函数**/
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                log.error("投递到队列失败,错误原因->{}",returned);
            }
        });

    }

    /**
     * @Description 发送延迟消息
     * @param content 延迟内容
     * @param delayTime 延迟时间 ,单位ms;  例如 5000 代表 5 秒
     * @Author hqd
     * @Date 2024-10-21
     */
    public Boolean sendDelayMessage(String content,Integer delayTime){
        log.info("消息发送时间->{}",LocalDateTime.now().format(formatterDateTime));

        rabbitTemplate.convertAndSend(RabbitMqConfig.DELAY_EXCHANGE, RabbitMqConfig.ROUTING_KEY, content, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                log.info("延迟时间->{}",delayTime);
                //这个底层就是setHeader("x-delay",i);是一样的 设置延时时间
                message.getMessageProperties().setDelay(delayTime);//单位毫秒
                return message;
            }
        });
        return true;

    }


}

4. DailyDelaySendConsumer 消费者监听

package com.lq.daily.mq.consumer;

import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSONObject;
import com.lq.common.config.RabbitMqConfig;
import com.lq.daily.dto.DailyDelaySendDTO;
import com.lq.daily.service.ILqDailyService;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/**
 * @Description 日报延迟发送消费者
 * @Author hqd
 * @Date 2024-10-21 16:04
 */
@Slf4j
@Component
public class DailyDelaySendConsumer {
    @Autowired
    private ILqDailyService lqDailyService;

    private DateTimeFormatter formatterDateTime = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    @RabbitListener(queues = RabbitMqConfig.DELAY_QUEUE)
    public void dailyDelaySendListener(String content, Channel channel, Message message) throws IOException, InterruptedException{
        log.info("消息接收时间->{}", LocalDateTime.now().format(formatterDateTime));
        log.info("接收消息内容是->{}",content);
        log.info("{}",message.getMessageProperties().getDeliveryTag());
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

        //处理日报发送业务逻辑
        if (StrUtil.isNotBlank(content)&& content.startsWith("{")){
            DailyDelaySendDTO dto = JSONObject.parseObject(content, DailyDelaySendDTO.class);
            if (ObjectUtil.isNotEmpty(dto)){
                lqDailyService.updateDailyDelaySend(dto.getDailyCode(), LocalDateTime.parse(dto.getDelaySendTime(),DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm")));
            }
        }

    }
}

5. 测试延迟发送

   /**
    * Test endpoint: publishes one delayed daily-report message whose delay is the
    * time remaining until the DTO's {@code delaySendTime}.
    *
    * <p>A send time that is already in the past yields a delay of 0 (immediate
    * delivery) instead of a negative value.
    *
    * @throws ArithmeticException if the delay exceeds {@code Integer.MAX_VALUE}
    *         milliseconds (about 24.8 days), rather than silently wrapping around
    */
    @PassToken
    @GetMapping("/testDelayMq")
    @ApiOperation("测试Mq 延迟消息发送")
    public void testDelayMq(){
        DailyDelaySendDTO dto = new DailyDelaySendDTO();
        dto.setDailyCode("DC2024101015135400001");
        dto.setDelaySendTime("2024-10-22 10:58");

        // delaySendTime has minute precision; append seconds so it matches the pattern below.
        LocalDateTime sendTime = LocalDateTime.parse(dto.getDelaySendTime()+":00", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        long between = ChronoUnit.MILLIS.between(LocalDateTime.now(), sendTime);

        // Clamp past times to 0 and fail loudly on int overflow instead of truncating.
        int delayMillis = Math.toIntExact(Math.max(0L, between));

        rabbitMqUtil.sendDelayMessage(JSON.toJSONString(dto), delayMillis);
    }

在这里插入图片描述