零基础学习RabbitMQ(4)--RabbitMQ快速入门

发布于:2025-06-30 ⋅ 阅读:(17) ⋅ 点赞:(0)

我们先创建一个Maven项目引入对应的依赖:


<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.20.0</version>
</dependency>

1. 创建生产者

1.1 建立连接 
//1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //设置ip
        connectionFactory.setHost("162.14.99.141");
        //设置端口号,默认是5672
        connectionFactory.setPort(5672);
        //设置账户密码
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        //设置要使用的虚拟机
        connectionFactory.setVirtualHost("ty");
        //获取连接
        Connection connection = connectionFactory.newConnection();
1.2 创建Channel
//2.创建Channel
        Channel channel = connection.createChannel();
1.3 声明交换机

这里我们直接使用内置的交换击就无需声明

1.4 声明队列

        //4.声明队列
        /**
         * queueDeclare(String queue, boolean durable, boolean exclusive,
         * boolean autoDelete, Map<String, Object> arguments) throws IOException;
         * 参数说明
         * queue:队列名称
         * durable: 是否可持久化,持久化的队列会存储在磁盘,服务器重启后消息不丢失
         * exclusive:是否独占,只能有一个消费者监听队列,
         * autoDelete:是否自动删除,当没有Consumer时自动删除
         * arguments:其它的参数,如最大值,有效期等等
         * */
        channel.queueDeclare("queue1", true, false, false, null);

如果queue1不存在则会创建该队列

1.5 发送消息
        //5.发送消息
        /**
         * void basicPublish(String exchange, String routingKey, boolean mandatory,
         *      boolean immediate, BasicProperties props, byte[] body)
         * 参数说明
         * exchange:交换机名称
         * routingKey:路由名称,routingKey = 队列名称
         * props:配置信息
         * body:发送消息的数据
         */
        String msg = "hello world";
        //这里的""代表内置交换机,使用内置交换机时,routingKey要和队列名称一样才可以路由到对应的队列
        channel.basicPublish("", "queue1", null, msg.getBytes());

 

这里的""代表使用的这个默认的交换机 

1.6 资源释放 
        //6.资源释放
        channel.close();
        connection.close();

整体代码:


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //设置ip
        connectionFactory.setHost("162.14.99.141");
        //设置端口号,默认是5672
        connectionFactory.setPort(5672);
        //设置账户密码
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        //设置要使用的虚拟机
        connectionFactory.setVirtualHost("ty");
        //获取连接
        Connection connection = connectionFactory.newConnection();

        //2.创建Channel
        Channel channel = connection.createChannel();

        //3.声明交换机,这里使用内置的交换机

        //4.声明队列
        /**
         * queueDeclare(String queue, boolean durable, boolean exclusive,
         * boolean autoDelete, Map<String, Object> arguments) throws IOException;
         * 参数说明
         * queue:队列名称
         * durable: 是否可持久化,持久化的队列会存储在磁盘,服务器重启后消息不丢失
         * exclusive:是否独占,只能有一个消费者监听队列,
         * autoDelete:是否自动删除,当没有Consumer时自动删除
         * arguments:其它的参数,如最大值,有效期等等
         * */
        channel.queueDeclare("queue1", true, false, false, null);

        //5.发送消息
        /**
         * void basicPublish(String exchange, String routingKey, boolean mandatory,
         *      boolean immediate, BasicProperties props, byte[] body)
         * 参数说明
         * exchange:交换机名称
         * routingKey:路由名称,routingKey = 队列名称
         * props:配置信息
         * body:发送消息的数据
         */
        String msg = "hello world";
        //这里的""代表内置交换机,使用内置交换机时,routingKey要和队列名称一样才可以路由到对应的队列
        channel.basicPublish("", "queue1", null, msg.getBytes());

        //6.资源释放
        channel.close();
        connection.close();
    }
}

我们运行代码后来到管理界面:

可以看到队列被创建成功了,并且有一条消息,由于我们在程序末尾释放了连接所有这里是没有Connections和Channels的。我们如果把释放资源的代码注释掉,再次运行:

 

 

2. 创建消费者

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 创建连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("162.14.99.141");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("ty");
        Connection connection = connectionFactory.newConnection();
        //2. 创建Channel
        Channel channel = connection.createChannel();
        //3. 声明队列
        channel.queueDeclare("queue1", true, false, false, null);
        //4. 消费信息

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             * void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
             * 定义如何处理接收到的消息,接收到消息时会自动 调用该方法
             * 参数说明
             * consumerTag: 消费者标签,
             * properties: 配置信息
             * body: 消息的具体内容
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到:" + new String(body));
            }
        };
        //Consumer 用于定义消费者的行为,从RabbitMQ接收消息时,需要提供一个实现了Consumer接口的对象
        //DefaultConsumer是RabbitMQ提供的一个默认消费者,实现了Consumer接口
        /**
         * String basicConsume(String queue, boolean autoAck, Consumer callback);
         * 参数说明
         * queue: 消费的队列的名称
         * autoAck: 是否要自动确认
         * callback: 接收到消息后的执行逻辑
         */
        channel.basicConsume("queue1", true, consumer);
        //5. 释放资源
        channel.close();
        connection.close();
    }
}

 

注意:调用basicConsume()方法后,消费者会不断从队列中获取数据,这个操作是由另一个线程执行的,和主线程是异步的,直到连接被释放,这里从调用basicConsume到连接关闭中间消费的数据数量和打印出来的日志不一定是吻合的,有的消息的日志可能还没有打印出来,主线程就关闭了

这里我们在队列中添加一万条消息再调用消费者来演示:

消费者运行结果:

可以看到只打印了一条数据,但是我们通过可视化界面观察队列:

 

发现其中还有9050条数据。

我们可以把关闭连接的代码注释再次运行:

可以看到成功打印出了所有的日志 


网站公告

今日签到

点亮在社区的每一天
去签到