消息可靠投递

发布于:2024-09-18 ⋅ 阅读:(58) ⋅ 点赞:(0)

消息可靠投递

  • 生产者消息投递到 Broker 时,万一网络断了,生产者以为自己投递成功了,消息队列以为没有生产者投递
  • RabbitMQ 提供了两种方式控制可靠投递,confirm 确认模式,return 退回模式
  • RabbitMQ 提供事务机制,但是性能较差,不做讲解,可自行研究

image-20210816144827884

ConfrmCallBack

消息从生产者投递交换机,交换机给生产者一个响应,生产者收到肯定应答,才能保证消息成功投递到交换机,但是如果没有设置持久化,这时候交换机断电重启,仍然丢失,需要做到以下3个方法才能保证可靠投递到交换机

  1. 队列设置持久化
  2. 消息设置持久化
  3. ConfirmCallBack回调

单个同步确认

生产者投递一个消息,交换机回应,生产者确认之后再发布下一个,吞吐量很低

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

//单条同步确认
public class SingleSyncConfirm {
    /**
     * 生产者 → 交换机
     * 生产者投递 1条 消息给交换机
     * 交换机持久化,保存本地之后,给生产者一个应答
     * 生产者接收到成功应答之后,再投递下一条消息
     * 10000条,每条都回执一次 总耗时: 1958 毫秒
     * @param args
     */
    private static final String SINGLE_SYNC_CONFIRM_EXCHANGE = "singleSyncConfirmExchange";
    private static final String SINGLE_SYNC_CONFIRM_QUEUE = "singleSyncConfirmQueue";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        if (true) {
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest");
            factory.setVirtualHost("/");
        }
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //交换机声明,队列声明,交换机绑定队列
        channel.exchangeDeclare(SINGLE_SYNC_CONFIRM_EXCHANGE, BuiltinExchangeType.DIRECT, true);
        channel.queueDeclare(SINGLE_SYNC_CONFIRM_QUEUE,true,false,false,null);
        channel.queueBind(SINGLE_SYNC_CONFIRM_QUEUE,SINGLE_SYNC_CONFIRM_EXCHANGE,SINGLE_SYNC_CONFIRM_QUEUE);
        /**
         * 开启 confirm 确认
         */
        channel.confirmSelect();

        long begin = System.currentTimeMillis();//开始时间

        for (int i = 0; i < 1000; i++) {
            String str = i + "";
            channel.basicPublish(SINGLE_SYNC_CONFIRM_EXCHANGE,SINGLE_SYNC_CONFIRM_QUEUE,null,str.getBytes());
            //生产者等待确认
            boolean b = channel.waitForConfirms();
            if (b) {
                System.out.println("第 " + i + " 条发送成功");
            } else {
                System.out.println("第 " + i + " 条发送失败================");
            }
        }

        long end = System.currentTimeMillis();//结束时间
        System.out.println("总耗时: " + (end - begin) + " 毫秒");
    }
}

批量同步确认

发布批量消息之后,等待,当有某一个故障的时候,不知道是哪个消息出问题

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

//多条同步确认
public class MultipleSyncConfirm {

    private static final String MULTIPLE_SYNC_CONFIRM_EXCHANGE = "multipleSyncConfirmExchange";
    private static final String MULTIPLE_SYNC_CONFIRM_QUEUE = "multipleSyncConfirmQueue";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        if (true) {
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest");
            factory.setVirtualHost("/");
        }
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //交换机声明
        channel.exchangeDeclare(MULTIPLE_SYNC_CONFIRM_EXCHANGE, BuiltinExchangeType.DIRECT, true);
        channel.queueDeclare(MULTIPLE_SYNC_CONFIRM_QUEUE,true,false,false,null);
        channel.queueBind(MULTIPLE_SYNC_CONFIRM_QUEUE,MULTIPLE_SYNC_CONFIRM_EXCHANGE,MULTIPLE_SYNC_CONFIRM_QUEUE);

        /**
         * 开启 confirm 确认
         */
        channel.confirmSelect();

        long begin = System.currentTimeMillis();//开始时间

        for (int i = 1; i <= 1000; i++) {
            String str = i + "";
            channel.basicPublish(MULTIPLE_SYNC_CONFIRM_EXCHANGE, MULTIPLE_SYNC_CONFIRM_QUEUE, null, str.getBytes());

            if (i % 100 == 0) {
                //生产者等待确认
                boolean b = channel.waitForConfirms();
                if (b) {
                    System.out.println("第 " + i + " 条发送成功");
                } else {
                    System.out.println("第 " + i + " 条发送失败================");
                }
            }
        }

        long end = System.currentTimeMillis();//结束时间
        System.out.println("总耗时: " + (end - begin) + " 毫秒");
    }
}

异步批量确认

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.Date;
import java.util.concurrent.TimeoutException;

//多条异步确认
public class MultipleAsyncConfirm {

    private static final String MULTIPLE_ASYNC_CONFIRM_EXCHANGE = "multipleAsyncConfirmExchange";
    private static final String MULTIPLE_ASYNC_CONFIRM_QUEUE = "multipleAsyncConfirmQueue";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        if (true) {
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest");
            factory.setVirtualHost("/");
        }
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //交换机声明
        channel.exchangeDeclare(MULTIPLE_ASYNC_CONFIRM_EXCHANGE, BuiltinExchangeType.DIRECT, true);
        channel.queueDeclare(MULTIPLE_ASYNC_CONFIRM_QUEUE, true, false, false, null);
        channel.queueBind(MULTIPLE_ASYNC_CONFIRM_QUEUE, MULTIPLE_ASYNC_CONFIRM_EXCHANGE, MULTIPLE_ASYNC_CONFIRM_QUEUE);

        /**
         * 开启 confirm 确认
         */
        channel.confirmSelect();

        long begin = System.currentTimeMillis();//开始时间


        /**
         * 开启 confirm 回调函数
         */
        ConfirmCallback ackCallBack = new ConfirmCallback() {
            /**
             * 成功回调
             * @param deliveryTag
             * @param multiple      true表示已确认多条,false表示已确认单条
             * @throws IOException
             */
            @Override
            public void handle(long deliveryTag, boolean multiple) throws IOException {
                if (multiple) {
                    System.out.println("已确认多条: " + deliveryTag);
                } else {
                    System.out.println("已确认单条: " + deliveryTag);
                }
            }
        };
        ConfirmCallback nackCallBack = new ConfirmCallback() {
            /**
             * 失败回调
             * @param deliveryTag
             * @param multiple      true表示未确认多条,false表示未确认单条
             * @throws IOException
             */
            @Override
            public void handle(long deliveryTag, boolean multiple) throws IOException {
                if (multiple) {
                    System.out.println("未确认多条: " + deliveryTag);
                } else {
                    System.out.println("未确认多条: " + deliveryTag);
                }
            }
        };
        channel.addConfirmListener(ackCallBack, nackCallBack);

        for (int i = 1; i <= 1000; i++) {
            String str = i + "\t" + new Date().getTime();
            channel.basicPublish(MULTIPLE_ASYNC_CONFIRM_EXCHANGE, MULTIPLE_ASYNC_CONFIRM_QUEUE, null, str.getBytes());
        }

        long end = System.currentTimeMillis();//结束时间
        System.out.println("总耗时: " + (end - begin) + " 毫秒==========================");
    }
}

对比

单条确认:简单,吞吐量低

批量确认:简单,吞吐量比单条确认高,当一批消息中有一个出问题,不知道是哪一个

异步批量确认:代码复杂,吞吐量高

ReturnCallBack

交换机投递到队列


SpringBoot 整合

ConfirmCallback

引入依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置文件中开启
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    publisher-confirm-type: correlated
  • 较低版本中配置是

    publisher-confirms: true
    
  • publisher-confirm-type:取值有3种

    • none 禁用发布确认模式,默认
    • correlated 消息从生产者投递到交换机成功后触发回调
    • simple 类似 correlated ,支持 waitForConfirms() 和 waitForConfirmsOrDie() 方法
配置类

在 Publish/Subscribe 基础之上整合

import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Component
public class ConfirmCallBackImpl implements RabbitTemplate.ConfirmCallback {

    /**
     * @param correlationData 相关数据,一般存储一个id,用来辨识唯一性
     * @param ack             确认,如果成功返回 true,如果失败返回 false
     * @param cause           原因,失败原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println("correlationData = " + correlationData);
        System.out.println("ack = " + ack);
        System.out.println("cause = " + cause);

        if (ack) {
            System.out.println("投递到交换机成功");
        } else {
            System.out.println("投递失败,原因是:\t" + cause);
            System.out.println("将失败的消息 " + correlationData.getId() + " 保存到数据库");
        }
    }
}
//设置确认回调
rabbitTemplate.setConfirmCallback(confirmCallBackI);

image-20210816182706901

因为是和之前的一起整合,所以设置 ConfirmCallBack 在注入 RabbitTemplate 时一起设置
也可以使用匿内部类或者Lambda表达式
测试
  • http://localhost:8080/confirm/test/abc 正常投递
  • http://localhost:8080/confirm/test-fail/abc 失败投递
import com.example.constants.Constants;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

@RestController
@RequestMapping("confirm")
public class ConfirmController {
    /**
     * 注入 RabbitTemplate 模板对象
     */
    @Resource
    private RabbitTemplate rabbitTemplate;


    @RequestMapping("test/{name}")
    public String send(@PathVariable("name") String name) {

        /**
         * 发送消息
         */
        String str = name + "\t" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss_SSS").format(new Date());

        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(Constants.MY_FANOUT_EXCHANGE, "", str, correlationData);

        try {
            /**
             * 睡眠,程序还没运行结束
             * 确认回调已经执行,说明是异步的
             */
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("消息发送成功!\t" + str);
        return "success";
    }

    /**
     * 测试失败
     *
     * @param name
     * @return
     */
    @RequestMapping("test-fail/{name}")
    public String sendFail(@PathVariable("name") String name) {
        String str = name + "\t" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss_SSS").format(new Date());

        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(UUID.randomUUID().toString());
        /**
        * 创建一个不存在的交换机名称,用来测试投递失败
        */
        String errorExchangeName = Constants.MY_FANOUT_EXCHANGE + "abc";
        rabbitTemplate.convertAndSend(errorExchangeName, "", str, correlationData);

        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("消息发送成功!\t" + str);
        return "success";
    }
}

ReturnCallBack

引入依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置文件中开启
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    publisher-returns: true
配置类
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Component
public class ReturnsCallBackImpl implements RabbitTemplate.ReturnsCallback {
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        System.out.println("returnedMessage = " + returnedMessage);
        System.out.println("returnedMessage.getMessage() = " + returnedMessage.getMessage());
        System.out.println("returnedMessage.getReplyCode() = " + returnedMessage.getReplyCode());
        System.out.println("returnedMessage.getReplyText() = " + returnedMessage.getReplyText());
        System.out.println("returnedMessage.getExchange() = " + returnedMessage.getExchange());
        System.out.println("returnedMessage.getRoutingKey() = " + returnedMessage.getRoutingKey());
    }
}
//设置确认回调
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnsCallback(returnsCallBackImpl);

image-20210817032048020

测试
  • 在 Direct 模式下,修改路由为一个不存在的路由
import com.example.constants.Constants;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

@RestController
@RequestMapping("return")
public class ReturnController {
    /**
     * 注入 RabbitTemplate 模板对象
     */
    @Resource
    private RabbitTemplate rabbitTemplate;


    @RequestMapping("test/{name}")
    public String send(@PathVariable("name") String name) {

        /**
         * 发送消息
         */
        String str = name + "\t" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss_SSS").format(new Date());

        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(UUID.randomUUID().toString());

        rabbitTemplate.convertAndSend(Constants.MY_DIRECT_EXCHANGE, "sms", str, correlationData);
        rabbitTemplate.convertAndSend(Constants.MY_DIRECT_EXCHANGE, "email", str, correlationData);

        try {
            /**
             * 睡眠,程序还没运行结束
             * 确认回调已经执行,说明是异步的
             */
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("消息发送成功!\t" + str);
        return "success";
    }

    /**
     * 测试失败
     *
     * @param name
     * @return
     */
    @RequestMapping("test-fail/{name}")
    public String sendFail(@PathVariable("name") String name) {
        String str = name + "\t" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss_SSS").format(new Date());

        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(UUID.randomUUID().toString());
        /**
         * 使用错误的路由键,用来测试交换机不能达到队列
         */
        String routingKey = "abc";
        rabbitTemplate.convertAndSend(Constants.MY_DIRECT_EXCHANGE, routingKey, str, correlationData);

        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("消息发送成功!\t" + str);
        return "success";
    }
}


今日签到

点亮在社区的每一天
去签到