接收rabbitmq消息

发布于:2025-06-12 ⋅ 阅读:(18) ⋅ 点赞:(0)

以下是一个使用纯Java(非Spring Boot)接收RabbitMQ消息的完整实现,包含Maven依赖和持续监听消息的循环:

1. 首先添加Maven依赖 (pom.xml)

<dependencies>
    <!-- RabbitMQ Java Client -->
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.16.0</version>
    </dependency>
    
    <!-- 日志框架 -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>2.0.7</version>
    </dependency>
</dependencies>

2. RabbitMQ消息接收器实现

import com.rabbitmq.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitMQReceiver {

    private static final Logger logger = LoggerFactory.getLogger(RabbitMQReceiver.class);
    
    private final ConnectionFactory factory;
    private Connection connection;
    private Channel channel;
    private volatile boolean running = true;

    public RabbitMQReceiver(String host, int port, String username, String password) {
        factory = new ConnectionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(username);
        factory.setPassword(password);
    }

    public void startListening(String queueName) {
        try {
            // 建立连接
            connection = factory.newConnection();
            channel = connection.createChannel();
            
            // 声明队列(如果不存在则创建)
            channel.queueDeclare(queueName, true, false, false, null);
            logger.info("连接到队列: {}", queueName);

            // 设置每次只接收一条消息(公平分发)
            channel.basicQos(1);

            // 创建消费者
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                logger.info("收到消息 [{}]: {}", delivery.getEnvelope().getDeliveryTag(), message);
                
                try {
                    // 在这里处理你的业务逻辑
                    processMessage(message);
                    
                    // 手动确认消息
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                } catch (Exception e) {
                    logger.error("消息处理失败", e);
                    // 处理失败时拒绝消息(不重新入队)
                    channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
                }
            };

            // 取消消费者回调
            CancelCallback cancelCallback = consumerTag -> {
                logger.warn("消费者被取消: {}", consumerTag);
            };

            // 开始消费消息
            channel.basicConsume(queueName, false, deliverCallback, cancelCallback);
            
            logger.info("开始监听消息... (按CTRL+C停止)");

            // 保持程序运行
            while (running) {
                Thread.sleep(1000); // 防止CPU空转
            }
            
        } catch (IOException | TimeoutException | InterruptedException e) {
            logger.error("RabbitMQ连接失败", e);
        } finally {
            closeResources();
        }
    }

    private void processMessage(String message) {
        // 这里是你的业务逻辑处理
        logger.info("处理消息: {}", message);
        // 示例:打印消息长度
        System.out.println("消息长度: " + message.length());
    }

    public void stop() {
        running = false;
        logger.info("停止监听...");
    }

    private void closeResources() {
        try {
            if (channel != null && channel.isOpen()) {
                channel.close();
            }
            if (connection != null && connection.isOpen()) {
                connection.close();
            }
            logger.info("RabbitMQ连接已关闭");
        } catch (IOException | TimeoutException e) {
            logger.error("关闭资源时出错", e);
        }
    }

    public static void main(String[] args) {
        // 配置RabbitMQ连接参数
        String host = "localhost";
        int port = 5672;
        String username = "guest";
        String password = "guest";
        String queueName = "my_queue";
        
        RabbitMQReceiver receiver = new RabbitMQReceiver(host, port, username, password);
        
        // 添加关闭钩子
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            receiver.stop();
            receiver.closeResources();
        }));
        
        // 开始监听
        receiver.startListening(queueName);
    }
}

关键功能说明:

  1. 持续监听机制

    while (running) {
        Thread.sleep(1000); // 防止CPU空转
    }
    

    使用running标志控制循环,优雅退出

  2. 消息处理流程

    • 声明队列确保存在
    • 设置QoS为1(公平分发)
    • 使用DeliverCallback处理消息
    • 手动消息确认(ACK/NACK)
    • 异常处理与错误恢复
  3. 资源管理

    • 使用finally块确保关闭连接
    • 添加Shutdown Hook处理程序终止
    • 线程安全的状态管理(volatile running
  4. 日志记录

    • 使用SLF4J进行日志记录
    • 关键操作都有日志输出

使用说明:

  1. 启动消费者

    mvn compile exec:java -Dexec.mainClass="RabbitMQReceiver"
    
  2. 发送测试消息(使用RabbitMQ管理界面或命令行工具):

    rabbitmqadmin publish exchange=amq.default routing_key=my_queue payload="hello world"
    
  3. 停止程序

    • CTRL+C优雅停止
    • 程序会自动关闭连接

自定义配置:

  1. 修改连接参数

    String host = "your.rabbitmq.host";
    int port = 5672;
    String username = "your_user";
    String password = "your_password";
    String queueName = "your_queue_name";
    
  2. 自定义消息处理
    修改processMessage方法实现你的业务逻辑:

    private void processMessage(String message) {
        // 示例:解析JSON消息
        // JSONObject json = new JSONObject(message);
        // System.out.println("收到订单: " + json.getString("orderId"));
        
        // 你的实际业务逻辑
    }
    
  3. 配置调整

    • 修改channel.basicQos()调整预取数量
    • 修改basicNackrequeue参数控制是否重新入队
    • 添加交换机绑定逻辑(如果需要)

这个实现遵循了RabbitMQ最佳实践,包括:

  • 手动消息确认
  • 公平分发(QoS设置)
  • 连接和通道的异常处理
  • 资源清理
  • 优雅关闭机制

如果需要处理更复杂的场景(如多个队列、消息持久化、死信队列等),可以在channel.queueDeclarechannel.basicConsume方法中添加相应参数。