docker run -d --name rabbitmq -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -v C:/data/rabbitmq:/var/lib/rabbitmq rabbitmq:3-management
15672 JMS 链接端口
5672 AMQP 多平台链接端口
消费者
@Component
@RabbitListener(queues = "message_queue") //队列名
public class MessageMQListener {
@RabbitHandler
public void messageHandler(String body, Message message, Channel channel) throws IOException {
long msgTag = message.getMessageProperties().getDeliveryTag();
System.out.println("msgTag="+msgTag);
System.out.println("message="+message);
System.out.println("body="+body);
//手动ack确认 msgTag 信息 false 表示仅确认当前这条消息
channel.basicAck(msgTag,false);
// 手动拒绝确认 msgTag 信息 false 表示仅确认当前这条消息 true是否重新投递
// channel.basicNack(msgTag,false,true);
}
}
生产者
@Configurable
public class RabbitMqConfig {
public static final String EXCHANGE_NAME = "message_exchange";
public static final String QUEUE = "message_queue";
/**
* topic 交换机
* @return
*/
@Bean
public Exchange messageExchange(){
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
/**
* 队列
* @return
*/
@Bean
public Queue messageQueue(){
return QueueBuilder.durable(QUEUE).build();
}
/**
* 交换机和队列绑定关系
*/
@Bean
public Binding messageBinding(Queue queue, Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("message.#").noargs();
}
}
------------------------------------------------------------------------------------------
@Autowired
private RabbitTemplate rabbitTemplate;
void testSend(){
rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME,"message.XX","新消息");
}
@Test
void testConfirmCallback(){
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("ConfirmCallback--------");
System.out.println("ConfirmCallback--------"+correlationData);
System.out.println("ConfirmCallback--------"+ack);
System.out.println("ConfirmCallback--------"+cause);
if(ack){
System.out.println("发送成功");
}else{
System.out.println("发送失败");
}
}
});
rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME,"message.xx","新消息");
}
@Test
void testReturnCallback(){
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
int code = returnedMessage.getReplyCode();
System.out.println("code = " +code);
System.out.println("returnedMessage = "+returnedMessage );
}
});
// rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME,"x.message.XXX,"新消息");//失败
rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME,"message.XXX","新消息");//成功
}
消息有哪几种情况成为死信
消费者拒收消息(basic.reject/ basic.nack),并且没有重新入队 requeue=false
消息在队列中未被消费,且超过队列或者消息本身的过期时间TTL(time-to-live)
队列的消息长度达到极限
结果:消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列