【RabbitMQ】实现RPC通信的完整指南

发布于:2025-05-16 ⋅ 阅读:(13) ⋅ 点赞:(0)

RPC 通信

RPC (Remote Procedure Call), 即远过程调用。它是一种通过网络从远程计算机上请求服务,而不需要了解底层网络的技术

  • 类似 Http 远程调用

RabbitMQ 实现 RPC 通信的过程,大概是通过两个队列实现一个可回调的过程
image.png

  • 注意
    • 没有生产者和消费者,取而代之的是客户端和服务器
    • reply_to:回调队列的名称
    • correlation_id:不能重复,用来确保请求和响应是一对
  • 大概流程:
    1. 客户端发送消息到一个指定的队列,并在消息属性中设置 replyTo 字段,这个字段制定了一个回调队列,服务端处理之后,会把响应结果发送到这个队列
    2. 服务端收到请求后,处理请求并发送响应到 replyTo 指定的回调队列
    3. 客户端再回调队列上等待响应消息,一旦收到响应,客户端会检查消息的 correlationID 属性,以确保它是所期望的响应
      • 等待响应消息,是通过一个阻塞队列来实现
      • 如果没有响应进来,就会一直阻塞。通过一个阻塞队列,来让其等待响应完成
      • 如果阻塞队列里面没有消息,就会一直等待,等到有消息为止

大致流程

  • 客户端:
    1. 发送请求(携带 replyToCorrelationID
    2. 接收响应(校验 correlationID
  • 服务端:
    1. 接收请求,进行响应
    2. 发送响应(按照客户端指定的 replyTo,设置 correlationID

创建相关队列

public static final String RPC_REQUEST_QUEUE = "rpc.request.queue";  
    public static final String RPC_RESPONSE_QUEUE = "rpc.response.queue";  
  • 涉及到两个队列
    • 请求队列
    • 响应队列

客户端代码

客户端代码主要流程如下:

  1. 声明两个队列,包含回调队列 RPC_REQUEST_QUEUE,声明本次请求的唯一标志 correlationID
  2. RPC_REQUEST_QUEUEcorrelationID 配置到要发送的消息队列中
  3. 使用阻塞队列来阻塞当前进程,监听回调队列中的消息,把请求放到阻塞队列中
  4. 使用阻塞队列有消息后,主线程被唤醒,打印返回内容

声明队列

channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);  
channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);

发送请求

//发送请求(使用内置交换机)  
String msg = "hello rpc...";  
//设置请求的唯一标识  
String correlationID = UUID.randomUUID().toString();  
//设置请求的相关属性  
AMQP.BasicProperties props = new AMQP.BasicProperties().builder()  
        .correlationId(correlationID)  
        .replyTo(Constants.RPC_RESPONSE_QUEUE)  
        .build();  
channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, props, msg.getBytes());

接收响应

使用阻塞队列,来存储回调结果

// 接收响应  
// 如果不阻塞,就会从上到下执行完了。所以要使用一个阻塞队列,完成同步机制  
final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);  
DefaultConsumer consumer = new DefaultConsumer(channel){  
    //逻辑是比对 correlationID 是否一致  
    @Override  
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
        String respMsg = new String(body);  
        System.out.println("接收到回调信息:" + respMsg);  
        if (correlationID.equals(properties.getCorrelationId())) {  
            // 如果 correlationID 校验一致,说明就是我们要的响应  
            response.offer(respMsg);  
        }  
    }  
};  
channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, consumer);  


// 获取回调的结果
String result = response.take(); //阻塞队列一直阻塞到这里,直到 response 有值了  
System.out.println("[RPC Client 响应结果]: " + result);

完整代码

package rabbitmq.rpc;  
  
import com.rabbitmq.client.*;  
import rabbitmq.constant.Constants;  
  
import java.io.IOException;  
import java.util.UUID;  
import java.util.concurrent.*;  
  
/**  
 * RPC Client * 1. 发送请求  
 * 2. 接收响应  
 */  
public class RpcClient {  
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {  
        //1. 建立连接  
        ConnectionFactory connectionFactory = new ConnectionFactory();  
        connectionFactory.setHost(Constants.HOST);  
        connectionFactory.setPort(Constants.PORT);  
        connectionFactory.setUsername(Constants.USER_NAME);  
        connectionFactory.setPassword(Constants.PASSWORD);  
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);  
        Connection connection = connectionFactory.newConnection();  
  
        //2. 开启信道  
        Channel channel = connection.createChannel();  
  
        //3. 声明队列  
        channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);  
        channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);  
  
        //4. 发送请求(使用内置交换机)  
        String msg = "hello rpc...";  
        //设置请求的唯一标识  
        String correlationID = UUID.randomUUID().toString();  
        //设置请求的相关属性  
        AMQP.BasicProperties props = new AMQP.BasicProperties().builder()  
                .correlationId(correlationID)  
                .replyTo(Constants.RPC_RESPONSE_QUEUE)  
                .build();  
        channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, props, msg.getBytes());  
  
        //5. 接收响应  
        // 如果不阻塞,就会从上到下执行完了。所以要使用一个阻塞队列,完成同步机制  
        final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);  
        DefaultConsumer consumer = new DefaultConsumer(channel){  
            //逻辑是比对 correlationID 是否一致  
            @Override  
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
                String respMsg = new String(body);  
                System.out.println("接收到回调信息:" + respMsg);  
                if (correlationID.equals(properties.getCorrelationId())) {  
                    // 如果 correlationID 校验一致,说明就是我们要的响应  
                    response.offer(respMsg);  
                }  
            }  
        };  
        channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, consumer);  
        String result = response.take(); //阻塞队列一直阻塞到这里,直到 response 有值了  
        System.out.println("[RPC Client 响应结果]: " + result);  
    }  
}

服务端代码

服务端代码主要流程如下:

  1. 接收消息
  2. 根据消息内容进行相应处理,把应答结果返回到回调队列中

设置同时只能获取一个消息

//设置一次只能接收一条消息  
channel.basicQos(1);

如果不设置 basicQosRabbitMQ 会使用默认的 Qos 设置,其 prefetchCount 默认值为 0

  • prefetchCount0 时,RabbitMQ 会根据内部实现和当前的网络状况等因素,可能会同时发送多条消息给消费者
  • 这意味着在默认情况下,消费者可能会同时接收到多条消息,但具体数量不是严格保证的,可能会有所波动

RPC 模式下,同上期望的是一对一的消息处理,即一个请求对应一个相应。消费者在处理完一个消息并确认之后,才会接收到下一条消息

接收消息

接收消息,并做出相应处理

DefaultConsumer consumer = new DefaultConsumer(channel) {  
    @Override  
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
        String request = new String(body, "UTF-8");  
        System.out.println("接收到请求:" + request);  
        String responses = "针对 request:" + request + ",响应成功";  
        AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder()  
                .correlationId(properties.getCorrelationId())  
                .build();  
        channel.basicPublish("", Constants.RPC_RESPONSE_QUEUE, basicProperties, responses.getBytes());  
        channel.basicAck(envelope.getDeliveryTag(), false);  
  
    }  
};  
channel.basicConsume(Constants.RPC_REQUEST_QUEUE, false, consumer);

RabbitMQ 消息确定机制

  • RabbitMQ 中,basicConsume 方法的 autoAck 参数用于指定消费者是否应该自动向消息对类确认消息
    • 自动确认(autoAck=true):消息对类在将消息发送给消费者之后,会立即从内存中删除该消息。这意味着,如果消费者处理消息失败,消息将丢失,因为消息队列认为消息已经被成功消费
    • 手动确认(autoAck=false):消息队列在将消息发送给消费者之后,需要消费者显式地调用 basicAck 方法来确认消息。手动确认提供了更高的可靠性,确保消息不会意外丢失,适用于消息处理重要且需要确保每个消息都被正确处理的场景

完整代码

package rabbitmq.rpc;  
  
import com.rabbitmq.client.*;  
import rabbitmq.constant.Constants;  
  
import java.io.IOException;  
import java.util.concurrent.TimeoutException;  
  
/**  
 * RPC server * 1. 接收请求  
 * 2. 发送响应  
 */  
public class RpcServer {  
    public static void main(String[] args) throws IOException, TimeoutException {  
  
        //1. 建立连接  
        ConnectionFactory connectionFactory = new ConnectionFactory();  
        connectionFactory.setHost(Constants.HOST);  
        connectionFactory.setPort(Constants.PORT);  
        connectionFactory.setUsername(Constants.USER_NAME);  
        connectionFactory.setPassword(Constants.PASSWORD);  
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);  
        Connection connection = connectionFactory.newConnection();  
  
        //2. 开启信道  
        Channel channel = connection.createChannel();  
  
        //3. 接收请求  
        //设置一次只能接收一条消息  
        channel.basicQos(1);  
        DefaultConsumer consumer = new DefaultConsumer(channel) {  
            @Override  
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
                String request = new String(body, "UTF-8");  
                System.out.println("接收到请求:" + request);  
                String responses = "针对 request:" + request + ",响应成功";  
                AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder()  
                        .correlationId(properties.getCorrelationId())  
                        .build();  
                channel.basicPublish("", Constants.RPC_RESPONSE_QUEUE, basicProperties, responses.getBytes());  
                channel.basicAck(envelope.getDeliveryTag(), false);  
  
            }  
        };  
        channel.basicConsume(Constants.RPC_REQUEST_QUEUE, false, consumer);  
    }  
}

运行程序

启动客户端

image.png

启动服务端

运行服务端

接收到请求:hello rpc...

网站公告

今日签到

点亮在社区的每一天
去签到