我们先创建一个Maven项目引入对应的依赖:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.20.0</version>
</dependency>
1. 创建生产者
1.1 建立连接
//1. 建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置ip
connectionFactory.setHost("162.14.99.141");
//设置端口号,默认是5672
connectionFactory.setPort(5672);
//设置账户密码
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
//设置要使用的虚拟机
connectionFactory.setVirtualHost("ty");
//获取连接
Connection connection = connectionFactory.newConnection();
1.2 创建Channel
//2.创建Channel
Channel channel = connection.createChannel();
1.3 声明交换机
这里我们直接使用内置的交换击就无需声明
1.4 声明队列
//4.声明队列
/**
* queueDeclare(String queue, boolean durable, boolean exclusive,
* boolean autoDelete, Map<String, Object> arguments) throws IOException;
* 参数说明
* queue:队列名称
* durable: 是否可持久化,持久化的队列会存储在磁盘,服务器重启后消息不丢失
* exclusive:是否独占,只能有一个消费者监听队列,
* autoDelete:是否自动删除,当没有Consumer时自动删除
* arguments:其它的参数,如最大值,有效期等等
* */
channel.queueDeclare("queue1", true, false, false, null);
如果queue1不存在则会创建该队列
1.5 发送消息
//5.发送消息
/**
* void basicPublish(String exchange, String routingKey, boolean mandatory,
* boolean immediate, BasicProperties props, byte[] body)
* 参数说明
* exchange:交换机名称
* routingKey:路由名称,routingKey = 队列名称
* props:配置信息
* body:发送消息的数据
*/
String msg = "hello world";
//这里的""代表内置交换机,使用内置交换机时,routingKey要和队列名称一样才可以路由到对应的队列
channel.basicPublish("", "queue1", null, msg.getBytes());
这里的""代表使用的这个默认的交换机
1.6 资源释放
//6.资源释放
channel.close();
connection.close();
整体代码:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//1. 建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置ip
connectionFactory.setHost("162.14.99.141");
//设置端口号,默认是5672
connectionFactory.setPort(5672);
//设置账户密码
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
//设置要使用的虚拟机
connectionFactory.setVirtualHost("ty");
//获取连接
Connection connection = connectionFactory.newConnection();
//2.创建Channel
Channel channel = connection.createChannel();
//3.声明交换机,这里使用内置的交换机
//4.声明队列
/**
* queueDeclare(String queue, boolean durable, boolean exclusive,
* boolean autoDelete, Map<String, Object> arguments) throws IOException;
* 参数说明
* queue:队列名称
* durable: 是否可持久化,持久化的队列会存储在磁盘,服务器重启后消息不丢失
* exclusive:是否独占,只能有一个消费者监听队列,
* autoDelete:是否自动删除,当没有Consumer时自动删除
* arguments:其它的参数,如最大值,有效期等等
* */
channel.queueDeclare("queue1", true, false, false, null);
//5.发送消息
/**
* void basicPublish(String exchange, String routingKey, boolean mandatory,
* boolean immediate, BasicProperties props, byte[] body)
* 参数说明
* exchange:交换机名称
* routingKey:路由名称,routingKey = 队列名称
* props:配置信息
* body:发送消息的数据
*/
String msg = "hello world";
//这里的""代表内置交换机,使用内置交换机时,routingKey要和队列名称一样才可以路由到对应的队列
channel.basicPublish("", "queue1", null, msg.getBytes());
//6.资源释放
channel.close();
connection.close();
}
}
我们运行代码后来到管理界面:
可以看到队列被创建成功了,并且有一条消息,由于我们在程序末尾释放了连接所有这里是没有Connections和Channels的。我们如果把释放资源的代码注释掉,再次运行:
2. 创建消费者
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//1. 创建连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("162.14.99.141");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("ty");
Connection connection = connectionFactory.newConnection();
//2. 创建Channel
Channel channel = connection.createChannel();
//3. 声明队列
channel.queueDeclare("queue1", true, false, false, null);
//4. 消费信息
DefaultConsumer consumer = new DefaultConsumer(channel) {
/**
* void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
* 定义如何处理接收到的消息,接收到消息时会自动 调用该方法
* 参数说明
* consumerTag: 消费者标签,
* properties: 配置信息
* body: 消息的具体内容
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到:" + new String(body));
}
};
//Consumer 用于定义消费者的行为,从RabbitMQ接收消息时,需要提供一个实现了Consumer接口的对象
//DefaultConsumer是RabbitMQ提供的一个默认消费者,实现了Consumer接口
/**
* String basicConsume(String queue, boolean autoAck, Consumer callback);
* 参数说明
* queue: 消费的队列的名称
* autoAck: 是否要自动确认
* callback: 接收到消息后的执行逻辑
*/
channel.basicConsume("queue1", true, consumer);
//5. 释放资源
channel.close();
connection.close();
}
}
注意:调用basicConsume()方法后,消费者会不断从队列中获取数据,这个操作是由另一个线程执行的,和主线程是异步的,直到连接被释放,这里从调用basicConsume到连接关闭中间消费的数据数量和打印出来的日志不一定是吻合的,有的消息的日志可能还没有打印出来,主线程就关闭了
这里我们在队列中添加一万条消息再调用消费者来演示:
消费者运行结果:
可以看到只打印了一条数据,但是我们通过可视化界面观察队列:
发现其中还有9050条数据。
我们可以把关闭连接的代码注释再次运行:
可以看到成功打印出了所有的日志