RabbitMQ 工作模式

发布于:2025-09-09 ⋅ 阅读:(22) ⋅ 点赞:(0)

目录

1. Simple (简单模式)

2. Work Queue (工作队列模式)

3. Publish/Subscribe (发布订阅模式)

4. RoutingKey (路由模式)

5. Topics (通配符模式)

6. RPC (RPC 通信)

7. Publisher Confirms (发布确认模式)

7.1 单独确认

7.2 批量确认 

7.3 异步确认

8. Spring Boot 代码案例


rabbitmq 提供了 7 种工作模式, 进项消息传递, 下面我们来一一进行了解.

常量类

public class Constants {

    public static final String HOST = "192.168.100.10";
    public static final int PORT = 5672;
    public static final String USER_NAME = "study";
    public static final String PASSWORD = "study";
    public static final String VIRTUAL_HOST = "bite";

    // 工作模式
    public static final String WORK_QUEUE = "work.queue";

    // 发布订阅模式
    public static final String FANOUT_EXCHANGE = "fanout.exchange";
    public static final String FANOUT_QUEUE1 = "fanout.queue1";
    public static final String FANOUT_QUEUE2 = "fanout.queue2";

    // 路由模式
    public static final String DIRECT_EXCHANGE = "direct.exchange";
    public static final String DIRECT_QUEUE1 = "direct.queue1";
    public static final String DIRECT_QUEUE2 = "direct.queue2";

    // 通配符模式
    public static final String TOPIC_EXCHANGE = "topic.exchange";
    public static final String TOPIC_QUEUE1 = "topic.queue1";
    public static final String TOPIC_QUEUE2 = "topic.queue2";

    // rpc 模式
    public static final String RPC_REQUEST_QUEUE = "rpc.request.queue";
    public static final String RPC_RESPONSE_QUEUE = "rpc.response.queue";


    // 发布确认模式
    public static final String PUBLISHER_CONFIRMS_QUEUE1 = "publisher.confirms.queue1";
    public static final String PUBLISHER_CONFIRMS_QUEUE2 = "publisher.confirms.queue2";
    public static final String PUBLISHER_CONFIRMS_QUEUE3 = "publisher.confirms.queue3";
}

1. Simple (简单模式)

一个生产者, 一个消费者, 点到点模式.

P : 生产者

C : 消费者

hell : 队列

特点 : 一个生产者, 一个消费者, 消息只能被消费一次, 也称为点对点模式

适用场景 : 消息只能被单个消费者处理

生产者 : 

public class ProducerDemo {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.100.10"); // ip地址
        connectionFactory.setPort(5672);             // 端口号
        connectionFactory.setUsername("study");      // 用户名
        connectionFactory.setPassword("study");      // 用户密码
        connectionFactory.setVirtualHost("bite");    // 虚拟主机
        Connection connection = connectionFactory.newConnection();

        // 2. 开启信道
        Channel channel = connection.createChannel();

        // 3. 声明交换机 使用内置的交换机
        // 4. 声明队列 (这一步可以省略, 会随机生成一个名称随机的的队列)
        /**
         * queueDeclare(String queue, boolean durable, boolean exclusive, boolean outDelete,
         *                                  Map<String, Object> argument)
         *
         * 参数说明 :
         * queue : 队列名称
         * durable : 可持久化
         * exclusive : 是否独占
         * outDelete : 是否自动删除
         * argument : 参数
         */
        channel.queueDeclare("hello", true, false, false, null);

        // 5. 发送消息
        /**
         * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
         * 参数说明 :
         * exchange : 交换机名称
         * routingKey : 内置交换机, routingKey 和队列名保持一致
         * props : 属性配置
         * body : 消息
         */
        for (int i = 0; i < 10; i++) {
            String msg = "hello rabbitmq~" + i;
            channel.basicPublish("", "hello", null, msg.getBytes());
        }

        System.out.println("消息发送成功");

        // 6. 资源释放 (这里要先释放 信道, 在 释放连接, 或者直接是方法连接)
        channel.close();
        connection.close();
    }
}

消费者 : 

public class ConsumerDemo {
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // 1.创建连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.100.10");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("study");
        connectionFactory.setPassword("study");
        connectionFactory.setVirtualHost("bite");
        Connection connection = connectionFactory.newConnection();

        // 2. 创建 channel
        Channel channel = connection.createChannel();

        // 3. 声明队列
        channel.queueDeclare("hello", true, false, false, null);

        // 4. 消费消息 (若是生产者已经声明, 那么可以省略, 要是消费者启动的时候没有队列则会报错)
        /**
         * basicConsume(String queue, boolean autoAck, Consumer callback)
         * queue : 队列名称
         * autoAck : 是否自动确认
         * callback : 接收到消息后, 执行的逻辑
         */
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 从队列中受到消息, 就会执行的方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 这里要是有消息没有接收到, 说明资源可能先一步被释放了
                System.out.println("接收到的消息: " + new String(body));
            }
        };
        channel.basicConsume("hello", true, consumer);
        // 等待程序执行完成
        Thread.sleep(2000);

        // 5. 释放资源
        channel.close();
        connection.close();
    }
}

2. Work Queue (工作队列模式)

一个生产者, 多个消费者, 消息消费不会重复.

假如 P 生产了 10 条消息, 那么由 C1 和 C2 共同消费这 10 条消息.

生产者 : 

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.100.10"); // ip地址
        connectionFactory.setPort(5672);             // 端口号
        connectionFactory.setUsername("study");      // 用户名
        connectionFactory.setPassword("study");      // 用户密码
        connectionFactory.setVirtualHost("bite");    // 虚拟主机
        Connection connection = connectionFactory.newConnection();

        // 2. 开启信道
        Channel channel = connection.createChannel();

        // 3. 声明队列
        channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);

        for (int i = 0; i < 10; i++) {
            String msg = "hello work queue..." + i;
            channel.basicPublish("", Constants.WORK_QUEUE, null, msg.getBytes());
        }

        // 4. 释放资源
        channel.close();
        connection.close();
    }
}

消费者1 : 

public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT);
        connectionFactory.setUsername(Constants.USER_NAME);
        connectionFactory.setPassword(Constants.PASSWORD);
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();

        // 2. 开启信道
        Channel channel = connection.createChannel();

        // 3. 声明队列 使用内置的交换机
        // 如果队列不存在, 则创建, 如果存在, 则不创建
        channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);

        DefaultConsumer consumer = new DefaultConsumer(channel) {

            // 从队列中收到消息, 就会执行的方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到的消息 : " + new String(body));
            }
        };
        channel.basicConsume(Constants.WORK_QUEUE, true, consumer);

        // 4.释放资源
//        channel.close();
//        connection.close();
    }
}

消费者2 : 

public class Consumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {

        // 1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT);
        connectionFactory.setUsername(Constants.USER_NAME);
        connectionFactory.setPassword(Constants.PASSWORD);
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();

        // 2. 开启信道
        Channel channel = connection.createChannel();

        // 3. 声明队列
        channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到消息 : " + new String(body));
            }
        };
        channel.basicConsume(Constants.WORK_QUEUE, true, consumer);

        // 4. 释放资源
//        channel.close();
//        connection.close();
    }
}

这里要注意, 我们要先启动消费者, 然后再启动生产者, 因为如果先启动生产者, 就直接生成 10 条消息, 然后再启动消费者的时候, 先启动的消费者就会直接将我们的消息全都消费完.

3. Publish/Subscribe (发布订阅模式)

生产者将消息发给交换机, 然后交换机将消息发送给所有与他绑定的队列.

Exchange : 交换机 (X),  

作用 : 生产者将消息发送到 Exchange, 有交换机将消息按照一定的规则路由到一个或者多个队列中 (上图中生产者将消息投递到队列中, 实际上这个在 RabbitMQ 中不会发生.) Exchange 只负责转发消息, 不具备存储消息的能力, 因此如果没有任何队列与 Exchange 绑定, 或者没有符合路由规则的队列, 那么消息就会丢失.

Fanout : 广播, 将消息交给所有绑定到交换机的队列 (Publish / Subscibe 模式)

生产者 : 

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME);//账号
        connectionFactory.setPassword(Constants.PASSWORD);  //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();

        // 2. 开启信道
        Channel channel = connection.createChannel();

        // 3. 声明交换机
        // durable true:交换机在服务器重启后依然存在
        // BuiltinExchangeType : 枚举类型
        channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);

        // 4. 声明队列
        channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null);
        channel.queueDeclare(Constants.FANOUT_QUEUE2, true, false, false, null);

        // 5. 交换机和队列进行绑定
        channel.queueBind(Constants.FANOUT_QUEUE1, Constants.FANOUT_EXCHANGE, "");
        channel.queueBind(Constants.FANOUT_QUEUE2, Constants.FANOUT_EXCHANGE, "");

        // 6. 发布消息
        String msg = "hello fanout...";
        channel.basicPublish(Constants.FANOUT_EXCHANGE, "", null, msg.getBytes());

        // 7. 释放资源
        channel.close();
        connection.close();

    }
}

消费者1 : 

public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME);//账号
        connectionFactory.setPassword(Constants.PASSWORD);  //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();

        // 2. 开启信道
        Channel channel = connection.createChannel();

        // 3. 声明队列
        channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null);

        // 4. 消费消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收消息 : " + new String(body));
            }
        };
        channel.basicConsume(Constants.FANOUT_QUEUE1, true, consumer);

        // 5. 释放资源
        channel.close();
        connection.close();
    }
}

消费者2 : 

public class Consumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME);//账号
        connectionFactory.setPassword(Constants.PASSWORD);  //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();

        // 2. 开启信道
        Channel channel = connection.createChannel();

        // 3. 声明队列
        channel.queueDeclare(Constants.FANOUT_QUEUE2, true, false, false, null);

        // 4. 消费消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收消息 : " + new String(body));
            }
        };
        channel.basicConsume(Constants.FANOUT_QUEUE2, true, consumer);

        // 5. 释放资源
        channel.close();
        connection.close();
    }
}

这里每个消费者都会消费所有的消息, 并不平均消费.

4. RoutingKey (路由模式)

路由模式会将消息按照 BingingKey 将消息定向发送给队列

RoutingKey : 路由键. 生产者将消息发给交换机时, 指定的一个字符串, 用来告诉交换机应该如何处理这个消息.

BingingKey : 绑定. RabbitMQ 中通过 Binding (绑定) 将交换机与队列关联起来, 在绑定的时候一般会指定一个 Binding Key, 这样 RabbitMQ 就会知道如何正确地将消息路由到队列了.

生产者 : 

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {

        // 1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.100.10"); // ip地址
        connectionFactory.setPort(5672);             // 端口号
        connectionFactory.setUsername("study");      // 用户名
        connectionFactory.setPassword("study");      // 用户密码
        connectionFactory.setVirtualHost("bite");    // 虚拟主机
        Connection connection = connectionFactory.newConnection();

        // 2. 开启信道
        Channel channel = connection.createChannel();

        // 3. 声明交换机
        channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true);

        // 4. 声明队列
        channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);
        channel.queueDeclare(Constants.DIRECT_QUEUE2, true, false, false, null);

        // 5. 绑定交换机和队列
        channel.queueBind(Constants.DIRECT_QUEUE1, Constants.DIRECT_EXCHANGE, "a");
        channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "a");
        channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "b");
        channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "c");

        // 6. 发送消息
        String msg = "hello direct, my routingkey is a....";
        channel.basicPublish(Constants.DIRECT_EXCHANGE,"a", null, msg.getBytes());

        String msg_b = "hello direct, my routingkey is b....";
        channel.basicPublish(Constants.DIRECT_EXCHANGE,"b", null, msg_b.getBytes());

        String msg_c = "hello direct, my routingkey is c....";
        channel.basicPublish(Constants.DIRECT_EXCHANGE,"c", null, msg_c.getBytes());
        System.out.println("消息发送成功");

        // 7. 释放资源
        channel.close();
        connection.close();
    }
}

消费者1 : 

public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {

        // 1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME);//账号
        connectionFactory.setPassword(Constants.PASSWORD);  //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();

        // 2. 开启信道
        Channel channel = connection.createChannel();

        // 3. 声明队列
        channel.queueDeclare(Constants.DIRECT_QUEUE1,true,false,false,null);

        // 4. 消费消息
        DefaultConsumer consumer = new DefaultConsumer(channel){
            //从队列中收到消息, 就会执行的方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到消息:"+ new String(body));
            }
        };
        channel.basicConsume(Constants.DIRECT_QUEUE1, true, consumer);
    }
}

消费者2 : 

public class Consumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {

        // 1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME);//账号
        connectionFactory.setPassword(Constants.PASSWORD);  //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();

        // 2. 开启信道
        Channel channel = connection.createChannel();

        // 3. 声明队列
        channel.queueDeclare(Constants.DIRECT_QUEUE2,true,false,false,null);

        // 4. 消费消息
        DefaultConsumer consumer = new DefaultConsumer(channel){
            //从队列中收到消息, 就会执行的方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到消息:"+ new String(body));
            }
        };
        channel.basicConsume(Constants.DIRECT_QUEUE2, true, consumer);
    }
}

5. Topics (通配符模式)

通配符模式就是在上面路由模式的 Binding Key 基础上加上了通配符. 使 Binding Key 可以匹配更多.

这里的通配符有两种 : 

  • * : 匹配一个单词.
  • # : 匹配多个单词.

生产者 : 

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME);//账号
        connectionFactory.setPassword(Constants.PASSWORD);  //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();

        // 2. 开启信道
        Channel channel = connection.createChannel();

        // 3. 声明交换机
        channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true);

        // 4. 声明队列
        channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);
        channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, false, null);

        // 5. 绑定交换机和队列
        channel.queueBind(Constants.TOPIC_QUEUE1, Constants.TOPIC_EXCHANGE, "*.a.*");
        channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "*.*.b");
        channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "c.#");

        // 6. 发送消息
        String msg = "hello topic, my routingkey is ae.a.f....";
        channel.basicPublish(Constants.TOPIC_EXCHANGE,"ae.a.f", null, msg.getBytes());  //转发到Q1

        String msg_b = "hello topic, my routingkey is ef.a.b....";
        channel.basicPublish(Constants.TOPIC_EXCHANGE,"ef.a.b", null, msg_b.getBytes()); //转发到Q1和Q2

        String msg_c = "hello topic, my routingkey is c.ef.d....";
        channel.basicPublish(Constants.TOPIC_EXCHANGE,"c.ef.d", null, msg_c.getBytes());//转发Q2
        System.out.println("消息发送成功");

        // 7. 释放资源
        channel.close();
        connection.close();
    }
}

消费者1 : 

public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME);//账号
        connectionFactory.setPassword(Constants.PASSWORD);  //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();

        // 2. 开启信道
        Channel channel = connection.createChannel();

        // 3. 声明队列
        channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);

        // 4. 消费消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收消息 : " + new String(body));
            }
        };
        channel.basicConsume(Constants.TOPIC_QUEUE1, true, consumer);

        // 5. 释放资源
//        channel.close();
//        connection.close();
    }
}

消费者2 : 

public class Consumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME);//账号
        connectionFactory.setPassword(Constants.PASSWORD);  //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();

        // 2. 开启信道
        Channel channel = connection.createChannel();

        // 3. 声明队列
        channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, false, null);

        // 4. 消费消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收消息 : " + new String(body));
            }
        };
        channel.basicConsume(Constants.TOPIC_QUEUE2, true, consumer);

        // 5. 释放资源
//        channel.close();
//        connection.close();
    }
}

6. RPC (RPC 通信)

RPC 通信是在客户端和服务端中进行的, 并在通信的过程中使用 reply_to 和 correlation_id 进行队列选择 和 身份验证.

在 客户端发起 RPC 通信的时候, 在发送的请求中会包含 reply_to 和 correlation_id. 

  • reply : 指定了服务器在发送响应时, 使用的是哪一个队列.
  • correlation : 代表了这次会话的 id. 客户端在拿到消息后, 会将自己发送出去的 correlation_id 和这次消息中的 correlation_id 进行比较, 要是相同, 就表示这个响应与客户端发送的请求匹配, 就能继续进行下面的操作, 要是不相同, 就代表不是客户端所期待的响应.

客户端 : 

public class RpcClient {
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

        // 1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT);
        connectionFactory.setUsername(Constants.USER_NAME);
        connectionFactory.setPassword(Constants.PASSWORD);
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();

        // 2. 开启信道
        Channel channel = connection.createChannel();

        // 3. 队列声明
        channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);
        channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);

        // 4. 发送请求
        String msg = "hello rpc...";
        // 设置请求的唯一标识
        String correlationID = UUID.randomUUID().toString();
        // 设置请求的相关属性
        AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
                .correlationId(correlationID)
                .replyTo(Constants.RPC_RESPONSE_QUEUE)
                .build();
        channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, props, msg.getBytes());

        // 5. 接收响应
        // 使用阻塞队列, 来存储响应信息
        final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String respMsg = new String(body);
                System.out.println("接收到回调消息: "+ respMsg);
                if (correlationID.equals(properties.getCorrelationId())){
                    //如果correlationID校验一致
                    response.offer(respMsg);
                }
            }
        };
        channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, consumer);
        String result = response.take();
        System.out.println("[RPC Client 响应结果] : "+ result);
    }
}

服务端 : 

public class RpcServer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME);//账号
        connectionFactory.setPassword(Constants.PASSWORD);  //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();

        // 2. 开启信道
        Channel channel = connection.createChannel();

        // 3. 接收请求
        // 设置同时只能获取一个消息
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String request = new String(body, StandardCharsets.UTF_8);
                System.out.println("接收到请求 : "+ request);
                String response = "针对 request : "+ request +", 响应成功";
                AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder()
                        .correlationId(properties.getCorrelationId())
                        .build();
                channel.basicPublish("", Constants.RPC_RESPONSE_QUEUE, basicProperties, response.getBytes());
                // 手动确认信息,
                // envelope.getDeliveryTag() : 确认哪一条消息
                // multiple : 是否批量确认
                channel.basicAck(envelope.getDeliveryTag(), false);

            }
        };
        channel.basicConsume(Constants.RPC_REQUEST_QUEUE, false, consumer);
    }
}

为啥使用阻塞队列来存储 RPC 响应消息呢 ? 

  • 线程同步机制 : RPC  客户端发送请求后需要等待服务端响应结果, 消息消费是异步进行的, 阻塞的队列提供了线程安全的阻塞等待机制, 确保主线能正确等待并获取到响应结果.
  • 容量控制 : 限定队列大小为 1, 因为 RPC 通常是一次请求对应一次响应, 避免了不必要的内存占用和响应堆积.
  • 简化异步处理 : response.take() 方法可以使客户端在没有收到响应的响应时, 阻塞在 response.take() , 直到有相应的响应, 程序才执行结束.

7. Publisher Confirms (发布确认模式)

因为在生产者向 broker 发送消息的时候, 我们的应用程序 (也就是生产者)可能因为故障, 或者网络抖动等各种原因, 生产者没有成功向 broker 发送消息.

针对这个问题, 我们就可以使用 发布确认模式来解决这个问题. 

什么是发布确认模式呢 ? 

        生产者将信道设置成 confirm (确认) 模式, 一旦信道进入 confirm 模式, 所有在该信道上面发布的消息都会被指派一个唯一的 ID (从1开始), 一旦消息被投递到所有匹配的队列之后, RabbitMQ 就会发送一个确认给生产者 (包含消息的唯一 ID), 这就使得生产者知道消息已经正确到达目的队列了, 如果消息和队列是可持久化的, 那么确认消息会在将消息写入磁盘之后发出. broker 回传给生产者的确认消息中.

        当消息被 Broker 接受并处理后, 会异步地给生产者发送 ack, 表名消息已经到达. 若失败, 就会发送 nack, 于是生产者可以根据 broker 返回的是 ack 还是 nack 来判断消息是否被成功处理.

        这里的消息确认不等同于消息消费

这里 RabbitMQ 给我们提供了三种确认方式 : 单独确认, 批量确认, 异步确认.

7.1 单独确认

生产每发送一条消息, 就会等待 Broker 返回 ack, nack

    /**
     * 单独确认
     */
    private static void publishingMessagesIndividually() throws Exception {

        try(Connection connection = createConnection()) {

            // 1. 开启信道
            Channel channel = connection.createChannel();

            // 2. 设置信道为confirm模式
            channel.confirmSelect();

            // 3. 声明队列
            channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE1, true, false, false, null);

            // 4. 发送消息, 并等待确认
            long start = System.currentTimeMillis();
            for (int i = 0; i < MESSAGE_COUNT; i++) {

                String msg = "hello publisher confirms" + i;
                channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE1, null, msg.getBytes());

                // 等待 broker确认
                // 最长等待时间 5 s
                channel.waitForConfirmsOrDie(5000);
            }
            long end = System.currentTimeMillis();
            System.out.printf("单独确认策略, 消息条数 : %d, 耗时 : %d ms \n",MESSAGE_COUNT, end - start);
        }
    }

这里我们会发先要是一条一条的消息确认, 会非常耗时. 于是有两种策略可以改进这种耗时

  • Publishing Messages in Batches (批量确认) :每发送一批消息后, 调用channel.waitForConfirms(), 等待服务器的确认返回.
  • Handling Publisher Confirms Asynchronously (异步确认) : 提供一个回调方法, 服务端确认了一条或者多条消息后客户端会回这个方法进行处理.

7.2 批量确认 

在单股确认的基础上, 我们设置了一次可以确认的最大个数 batchSize, 当发送的消息个数到达了 batchSize 时, 进行批量确认. 最后在将没有被确认的消息进行统一处理.

    /**
     * 批量确认
     * @throws Exception
     */
    private static void publishingMessagesInBatches() throws Exception{

        try(Connection connection = createConnection()) {

            // 1. 开启信道
            Channel channel = connection.createChannel();

            // 2. 设置信道为confirm模式
            channel.confirmSelect();

            // 3. 声明队列
            channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE2, true, false, false, null);

            // 4. 发送消息, 并进行确认
            long start = System.currentTimeMillis();
            
            // 指定一次性处理的消息
            int batchSize = 100;
            int outstandingMessageCount = 0;
            
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String msg = "hello publisher confirms" +  i;
                channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE2, null, msg.getBytes());
                outstandingMessageCount++;
                
                // 当消息数量到达指定一次处理的消息数的时候, 批量一起处理
                if (outstandingMessageCount == batchSize){
                    channel.waitForConfirmsOrDie(5000);
                    outstandingMessageCount = 0;
                }
            }
            
            // 将最后没有到指定的消息数的消息, 一起处理
            if (outstandingMessageCount > 0){
                channel.waitForConfirmsOrDie(5000);
            }
            long end = System.currentTimeMillis();
            System.out.printf("批量确认策略, 消息条数 : %d, 耗时 : %d ms \n",MESSAGE_COUNT, end - start);
        }
    }

7.3 异步确认

在异步确认中, 我们需要实现 现 ConfirmListener 接口中的 handleAck 和 handleNack. 就表示了服务器返回 ack 和 nack 时客户端需要进行的操作. 这里我们使用 个 ConcurrentNavigableMap 或者 SortedSet 来存储 deliveryTag 和 msg. 当服务器返回 ack 时, 就将 deliveryTag 从 map 中删除, 若返回 nack, 将 deliveryTag 从 map 中删除后进行重新发送.... 最后通过判断 map 是否为空来判断所有的消息是否成功被服务器消费了.

    /**
     * 异步确认
     */
    private static void handlingPublisherConfirmsAsynchronously() throws Exception{

        try (Connection connection = createConnection()){

            // 1. 开启信道
            Channel channel = connection.createChannel();

            // 2. 设置信道为confirm模式
            channel.confirmSelect();

            // 3. 声明队列
            channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE3, true, false, false, null);

            // 4. 监听confirm
            long start = System.currentTimeMillis();

//            // 有序集合, 存储未确认的消息 ID
//            SortedSet<Long> confirmSeqNo = Collections.synchronizedSortedSet(new TreeSet<>());

            // 存放 deliveryTag 和 msg 的集合
            // key 为 deliveryTag,value 为对应的 msg
            /**
             * 选择 ConcurrentNavigableMap 而不是 ConcurrentHashMap:
             * 基于跳表实现,清除所有 <= deliveryTag 的时间复杂度为 O(log n)
             * 线程安全
             * 保证了原子性
             */
            ConcurrentNavigableMap<Long, String> map = new ConcurrentSkipListMap<>();

            channel.addConfirmListener(new ConfirmListener() {

                /**
                 * 消息成功确认
                 * @param deliveryTag : 表示当前消息
                 * @param multiple : true 表示批量删除;false 表示值删除当前 deliveryTag
                 */
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {

                    // confirmSeqNo.headSet(n) : 返当前集中小于 n 的集合
                    if (multiple){

                        // 批量确认 : 将集合中⼩于等于当前序号 deliveryTag 元素的集合清除,表⽰
                        // 这批序号的消息都已经被 ack 了
//                        confirmSeqNo.headSet(deliveryTag + 1).clear();

                        // true : 表示包含了 deliveryTag
                        map.headMap(deliveryTag, true).clear();
                    }else {
                        // 单条确认 : 将当前的 deliveryTag 从集合中移除
//                        confirmSeqNo.remove(deliveryTag);
                        map.remove(deliveryTag);
                    }
                }

                /**
                 * 消息确认失败
                  * @param deliveryTag
                 * @param multiple
                 */
                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {

                    if (multiple) {
                        // 批量处理:获取所有需要重发的消息
                        ConcurrentNavigableMap<Long, String> failedMessages = map.headMap(deliveryTag, true);

                        // 先保存需要重发的消息内容
                        Map<Long, String> toResend = new HashMap<>(failedMessages);

                        // 清除失败记录
                        failedMessages.clear();

                        // 重新发送所有失败的消息
                        for (String msg : toResend.values()) {
                            long newDeliveryTag = channel.getNextPublishSeqNo();
                            channel.basicPublish("", Constants.PUBLISHER_CONFIRMS_QUEUE3, null, msg.getBytes(StandardCharsets.UTF_8));
                            map.put(newDeliveryTag, msg);
                        }
                    } else {
                        // 单条处理:获取失败消息内容
                        String msg = map.get(deliveryTag);

                        // 清除失败记录
                        map.remove(deliveryTag);

                        // 重新发送
                        if (msg != null) {
                            long newDeliveryTag = channel.getNextPublishSeqNo();
                            channel.basicPublish("", Constants.PUBLISHER_CONFIRMS_QUEUE3, null, msg.getBytes(StandardCharsets.UTF_8));
                            map.put(newDeliveryTag, msg);
                        }
                    }
                }
            });


            // 5. 发送消息
            for (int i = 0; i < MESSAGE_COUNT; i++) {

                String msg = "hello publisher confirms" + i;

                // 获取该条消息的唯一序列号, 并为下一条消息分配唯一序列号
                long seqNo = channel.getNextPublishSeqNo();
                channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE3, null, msg.getBytes());

                // 将序号存入集合中
//                confirmSeqNo.add(seqNo);
                map.put(seqNo, msg);

            }

            // 保证消息都已经确认
//            while (!confirmSeqNo.isEmpty()){
//                Thread.sleep(10);
//            }
            while (!map.isEmpty()) {
                Thread.sleep(10);
            }

            long end = System.currentTimeMillis();
            System.out.printf("异步确认策略, 消息条数: %d, 耗时: %d ms \n",MESSAGE_COUNT, end - start);

        }
    }

上面三种确认模式下, 异步确认的时间消耗是最少的

8. Spring Boot 代码案例

这次案例包括了几种常用的模式 : 工作模式, 发布订阅模式, 路由模式, 通配符模式.

常量声明 : 

public class Constants {

    public static final String WORK_QUEUE = "work.queue";

    // 发布订阅模式
    public static final String FANOUT_QUEUE1 = "fanout.queue1";
    public static final String FANOUT_QUEUE2 = "fanout.queue2";
    public static final String FANOUT_EXCHANGE = "fanout.exchange";
    
    // 路由模式
    public static final String DIRECT_QUEUE1 = "direct.queue1";
    public static final String DIRECT_QUEUE2 = "direct.queue2";
    public static final String DIRECT_EXCHANGE = "direct.exchange";
    
    // 通配符模式
    public static final String TOPIC_QUEUE1 = "topic.queue1";
    public static final String TOPIC_QUEUE2 = "topic.queue2";
    public static final String TOPIC_EXCHANGE = "topic.exchange";

}

队列, 交换机 和 绑定关系声明 : 

@Configuration
public class RabbitMQConfig {

    // 工作模式
    @Bean("workQueue")
    public Queue workQueue() {
        return QueueBuilder.durable(Constants.WORK_QUEUE).build();
    }

    // *************************************************************

    // 发布确认模式
    // 声明队列
    @Bean("fanoutQueue1")
    public Queue fanoutQueue1(){
        return QueueBuilder.durable(Constants.FANOUT_QUEUE1).build();
    }

    @Bean("fanoutQueue2")
    public Queue fanoutQueue2(){
        return QueueBuilder.durable(Constants.FANOUT_QUEUE2).build();
    }

    // 声明交换机
    // durable : true : 持久化
    @Bean("fanoutExchange")
    public FanoutExchange fanoutExchange(){
        return ExchangeBuilder.fanoutExchange(Constants.FANOUT_EXCHANGE).durable(true).build();
    }

    // 队列和交换机绑定
    // @Qualifier : 指定当前要注入的 Bean
    @Bean("fanoutQueueBinding1")
    public Binding fanoutQueueBinding1(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange, @Qualifier("fanoutQueue1") Queue queue){
        // 将名为 fanoutExchange 的交换机和名为 fanoutQueue1 的队里进行绑定
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }
    @Bean("fanoutQueueBinding2")
    public Binding fanoutQueueBinding2(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange, @Qualifier("fanoutQueue2") Queue queue){
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }

    // *************************************************************

    // 路由模式
    @Bean("directQueue1")
    public Queue directQueue1(){
        return QueueBuilder.durable(Constants.DIRECT_QUEUE1).build();
    }

    @Bean("directQueue2")
    public Queue directQueue2(){
        return QueueBuilder.durable(Constants.DIRECT_QUEUE2).build();
    }

    @Bean("directExchange")
    public DirectExchange directExchange(){
        return ExchangeBuilder.directExchange(Constants.DIRECT_EXCHANGE).durable(true).build();
    }

    @Bean("directQueueBinding1")
    public Binding directQueueBinding1(@Qualifier("directExchange") DirectExchange directExchange, @Qualifier("directQueue1") Queue queue){
        return BindingBuilder.bind(queue).to(directExchange).with("orange");
    }

    @Bean("directQueueBinding2")
    public Binding directQueueBinding2(@Qualifier("directExchange") DirectExchange directExchange, @Qualifier("directQueue2") Queue queue){
        return BindingBuilder.bind(queue).to(directExchange).with("black");
    }

    @Bean("directQueueBinding3")
    public Binding directQueueBinding3(@Qualifier("directExchange") DirectExchange directExchange, @Qualifier("directQueue2") Queue queue){
        return BindingBuilder.bind(queue).to(directExchange).with("orange");
    }

    // *************************************************************

    // 通配符模式
    @Bean("topicQueue1")
    public Queue topicQueue1(){
        return QueueBuilder.durable(Constants.TOPIC_QUEUE1).build();
    }

    @Bean("topicQueue2")
    public Queue topicQueue2(){
        return QueueBuilder.durable(Constants.TOPIC_QUEUE2).build();
    }

    @Bean("topicExchange")
    public TopicExchange topicExchange(){
        return ExchangeBuilder.topicExchange(Constants.TOPIC_EXCHANGE).durable(true).build();
    }

    @Bean("topicQueueBinding1")
    public Binding topicQueueBinding1(@Qualifier("topicExchange") TopicExchange topicExchange, @Qualifier("topicQueue1") Queue queue){
        return BindingBuilder.bind(queue).to(topicExchange).with("*.orange.*");
    }

    @Bean("topicQueueBinding2")
    public Binding topicQueueBinding2(@Qualifier("topicExchange") TopicExchange topicExchange, @Qualifier("topicQueue2") Queue queue){
        return BindingBuilder.bind(queue).to(topicExchange).with("*.*.rabbit");
    }

    @Bean("topicQueueBinding3")
    public Binding topicQueueBinding3(@Qualifier("topicExchange") TopicExchange topicExchange, @Qualifier("topicQueue2") Queue queue){
        return BindingBuilder.bind(queue).to(topicExchange).with("lazy.#");
    }
}

生产者 : 

@RequestMapping("/producer")
@RestController
public class ProducerController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // 工作模式
    @RequestMapping("/work")
    public String work() {

        // 使用内置交换机, RoutingKey 和队列名称一致
        rabbitTemplate.convertAndSend("", Constants.WORK_QUEUE, "hello spring amqp : work...");
        return "发送成功";

    }

    // 发布订阅模式
    @RequestMapping("/fanout")
    public String fanout(){
        rabbitTemplate.convertAndSend(Constants.FANOUT_EXCHANGE,"", "hello spring amqp:fanout...");
        return "发送成功";
    }

    // 路由模式
    // @PathVariable : 取路径中的变量
    @RequestMapping("/direct/{routingKey}")
    public String direct(@PathVariable("routingKey") String routingKey){
        rabbitTemplate.convertAndSend(Constants.DIRECT_EXCHANGE, routingKey, "hello spring amqp:direct, my routing key is " + routingKey);
        return "发送成功";
    }

    // 通配符模式
    @RequestMapping("/topic/{routingKey}")
    public String topic(@PathVariable("routingKey") String routingKey){
        rabbitTemplate.convertAndSend(Constants.TOPIC_EXCHANGE, routingKey, "hello spring amqp:topic, my routing key is " + routingKey);
        return "发送成功";
    }
}

消费者 : 

工作模式 : 

@Component
public class WorkListener {

    /**
     * @RabbitListener 是 Spring 框架中用于监听 RabbitMQ 队列的注解
     * 通过这个注解, 可以定义一个方法, 以便从 RabbitMQ 队列中接收消息
     * 该注解支持多种参数类型, 这些参数类型代表了从 RabbitMQ 接收到的消息和相关信息
     *
     * 常用的参数类型 :
     * 1. String : 返回消息的内容
     * 2. Message : SpringAMQP 的 Message 类, 返回原始的消息体以及消息的属性, 如果消息ID, 内容和队列信息等等
     * 3. Channel : RabbitMQ 的通道对,可以用于进行更高级的操作, 如手动确认消息
     */
    @RabbitListener(queues = Constants.WORK_QUEUE)
    public void queueListener1(Message message, Channel channel){
        System.out.println("listener 1 ["+Constants.WORK_QUEUE+"] 接收到消息 : " +message + ",channel:" + channel);
    }

    @RabbitListener(queues = Constants.WORK_QUEUE)
    public void queueListener2(String message){
        System.out.println("listener 2 ["+Constants.WORK_QUEUE+"] 接收到消息 : " + message);
    }
}

发布订阅模式 : 

@Component
public class FanoutListener {

    @RabbitListener(queues = Constants.FANOUT_QUEUE1)
    public void queueListener1(String message){
        System.out.println("队列[" + Constants.FANOUT_QUEUE1 + "] 接收到消息 : " +message);
    }

    @RabbitListener(queues = Constants.FANOUT_QUEUE2)
    public void queueListener2(String message){
        System.out.println("队列[" + Constants.FANOUT_QUEUE2 + "] 接收到消息 : " +message);
    }
}

路由模式 : 

@Component
public class DirectListener {
    @RabbitListener(queues = Constants.DIRECT_QUEUE1)
    public void queueListener1(String message){
        System.out.println("队列[" + Constants.DIRECT_QUEUE1+"] 接收到消息 : " + message);
    }

    @RabbitListener(queues = Constants.DIRECT_QUEUE2)
    public void queueListener2(String message){
        System.out.println("队列[" + Constants.DIRECT_QUEUE2+"] 接收到消息 : " + message);
    }
}

通配符模式 : 

@Component
public class TopicListener {

    @RabbitListener(queues = Constants.TOPIC_QUEUE1)
    public void queueListener1(String message){
        System.out.println("队列[" + Constants.TOPIC_QUEUE1+"] 接收到消息 : " + message);
    }

    @RabbitListener(queues = Constants.TOPIC_QUEUE2)
    public void queueListener2(String message){
        System.out.println("队列" + Constants.TOPIC_QUEUE2+"] 接收到消息 : " + message);
    }
}

网站公告

今日签到

点亮在社区的每一天
去签到