RabbitMQ消息可靠性保证机制6--可靠性分析

发布于:2024-12-06 ⋅ 阅读:(23) ⋅ 点赞:(0)

在使用消息中间件的过程中,难免会出现消息错误或者消息丢失等异常情况。这个时候就需要有一个良好的机制来跟踪记录消息的过程(轨迹溯源),帮助我们排查问题。

在RabbitMQ中可以使用Firehose实现消息的跟踪,Firehose可以记录每一次发送或者消息的记录,方便RabbitMQ的使用都进行调试、排错等。

FireHose的原理是将生产者投递给RabbitMQ的消息,或者RabbitMQ投递给消费者的消息按照指定的格式,发送到默认交换器上,这个默认交换器的名称是:amq.rabbitmq.trace它是一个topic类型的交换器。发送到交换器的消息的路由键为publis.{exchangename}deliver.{queuename}。其中exchangename和queuename为交换器和队列名字。分别对应生产者投递到交换器的消息和消费者从队列中获取的消息。

在这里插入图片描述

上图是一个样例。生产者将消息发送至trace.ex交换器,交换器将消息路由至trace.qu这个队列,然后由消息者将消息取走。当消息到达trace.ex这个队列后,消息就会投递一份到名称为amq.rabbitmq.trace的交换器,按收到交换器的名称加上一个前缀变更publish.trace.ex作为路由的KEY,投递一份至publishtrace这个队列中;接收消息也样如此,当消费都取走消息时,会将消息发送一份到名称为amq.rabbitmq.trace的交换器,按消费者队列的名称加一个前缀变成deliver.trace.qu作为路由的KEY,投递至delivertrace这个队列中。

Firehose命令:

# 开启命令
rabbitmqctl trace_on [-p vhose]
# [-p vhose]是可选参数,用来指定虚拟主机的vhose

# 关闭命令
rabbitmqctl trace_off [-p vhose]

Firehose默认情况下处于关闭状态,并且Firehose的状态是非持久化的,会在RabbitMQ服务重启的时候还原成默认的状态。Firehose开启之后会影响RabbitMQ整体服务性能,因为它会引起额外的消息生成、路由和存储 。

7.9.1 Firehose验证

首先开启追溯

[root@nullnull-os rabbitmq]# rabbitmqctl trace_on -p / 
Starting tracing for vhost "/" ...
Trace enabled for vhost /

生产者

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class TraceProduce {
  public static void main(String[] args) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUri("amqp://root:123456@node1:5672/%2f");

    try (Connection connection = factory.newConnection();
        Channel channel = connection.createChannel(); ) {
      // 定义交换机
      channel.exchangeDeclare("trace.ex", BuiltinExchangeType.DIRECT, false, true, null);
      // 定义队列
      channel.queueDeclare("trace.qu", false, false, true, null);
      // 队列绑定
      channel.queueBind("trace.qu", "trace.ex", "");

      // 定义保留数据队列
      channel.queueDeclare("publishtrace", false, false, false, null);
      // 绑定
      channel.queueBind("publishtrace", "amq.rabbitmq.trace", "publish.trace.ex");

      for (int i = 0; i < 100; i++) {
        String msg = "这是发送的消息:" + i;
        channel.basicPublish("trace.ex", "", null, msg.getBytes(StandardCharsets.UTF_8));
      }
    } catch (IOException e) {
      e.printStackTrace();
    } catch (TimeoutException e) {
      e.printStackTrace();
    }
  }
}

检查队列的信息:

[root@nullnull-os rabbitmq]#  rabbitmqctl list_queues name,messages_ready,messages_unacknowledged,messages,consumers  --formatter pretty_table
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
┌──────────────┬────────────────┬─────────────────────────┬──────────┬───────────┐
│ name         │ messages_ready │ messages_unacknowledged │ messages │ consumers │
├──────────────┼────────────────┼─────────────────────────┼──────────┼───────────┤
│ publishtrace │ 10001000         │
├──────────────┼────────────────┼─────────────────────────┼──────────┼───────────┤
│ trace.qu     │ 10001000         │
└──────────────┴────────────────┴─────────────────────────┴──────────┴───────────┘
[root@nullnull-os rabbitmq]# 

这样生产者发送的消息就已经被保存至publishtrace中了,后缀便可以通过检查队列中的消息,检查消息内容。

消费者

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.GetResponse;
import java.nio.charset.StandardCharsets;

public class TraceConsumer {
  public static void main(String[] args) throws Exception {

    // 资源限制
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUri("amqp://root:123456@node1:5672/%2f");

    try (Connection connection = factory.newConnection();
        Channel channel = connection.createChannel(); ) {

      // 定义交换机
      channel.exchangeDeclare("trace.ex", BuiltinExchangeType.DIRECT, false, true, null);
      // 定义队列
      channel.queueDeclare("trace.qu", false, false, true, null);
      // 队列绑定
      channel.queueBind("trace.qu", "trace.ex", "");

      // 定义队列
      channel.queueDeclare("delivertrace", false, false, true, null);
      // 队列绑定
      channel.queueBind("delivertrace", "amq.rabbitmq.trace", "deliver.trace.qu");

      // 接收消息
      for (int i = 0; i < 25; i++) {
        GetResponse getResponse = channel.basicGet("trace.qu", true);
        String msg = new String(getResponse.getBody(), StandardCharsets.UTF_8);
        System.out.println("收到的消息:" + msg);
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}

此处采用的是拉模式,从队列中获取了25条记录,也就是说队列中还剩余75条记录。

首先查看控制台输出:

收到的消息:这是发送的消息:0
收到的消息:这是发送的消息:1
收到的消息:这是发送的消息:2
收到的消息:这是发送的消息:3
......
收到的消息:这是发送的消息:22
收到的消息:这是发送的消息:23
收到的消息:这是发送的消息:24

检查队列的情况:

[root@nullnull-os rabbitmq]#  rabbitmqctl list_queues name,messages_ready,messages_unacknowledged,messages,consumers  --formatter pretty_table
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
┌──────────────┬────────────────┬─────────────────────────┬──────────┬───────────┐
│ name         │ messages_ready │ messages_unacknowledged │ messages │ consumers │
├──────────────┼────────────────┼─────────────────────────┼──────────┼───────────┤
│ delivertrace │ 250250         │
├──────────────┼────────────────┼─────────────────────────┼──────────┼───────────┤
│ publishtrace │ 10001000         │
├──────────────┼────────────────┼─────────────────────────┼──────────┼───────────┤
│ trace.qu     │ 750750         │
└──────────────┴────────────────┴─────────────────────────┴──────────┴───────────┘
[root@nullnull-os rabbitmq]# 

可以发现,拉的消息,都已经被推送到了delivertrace中了。

最后关闭Tracehose

[root@nullnull-os rabbitmq]# rabbitmqctl trace_off -p / 
Stopping tracing for vhost "/" ...
Trace disabled for vhost /

使用Firehose验证完成。


网站公告

今日签到

点亮在社区的每一天
去签到