Listener method could not be invoked with the incoming message

发布于:2025-05-18 ⋅ 阅读:(17) ⋅ 点赞:(0)

问题描述

  生产者方代码:

    private void rollbackOrder(long orderId, CorrelationData correlationData) {
        rabbitTemplate.convertAndSend("order-rollback-exchange",
                "rollback.order",
                new QuotaRollbackTO(orderId,null,null),
                correlationData);
    }

  消费者方代码:

    @RabbitListener(queues = "rollback.order.queue")
    public void handleRollback(QuotaRollbackTO quotaRollbackTO, Message message, Channel channel) throws Exception {
        try {
            //回滚订单的逻辑
            orderMainMapper.deleteById(quotaRollbackTO.getOrderId());
            orderProductMapper.deleteById(quotaRollbackTO.getOrderId());

            // 手动ACK
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            System.err.println("消息处理失败:" + e.getMessage());
            // 拒绝消息并转入死信队列
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
        }
    }

  Rabbit MQ发送消息报错:

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void org.ragdollcat.order.mqlistener.OrderMQListener.handleRollback(org.ragdollcat.order.to.QuotaRollbackTO,org.springframework.amqp.core.Message,com.rabbitmq.client.Channel) throws java.lang.Exception]


原因分析:

  由于未配置 RabbitMQ 使用 JSON 消息转换器,Spring Boot 默认采用了 SimpleMessageConverter,导致无法将 JSON 数据反序列化为 QuotaRollbackTO 对象,从而造成监听方法调用失败。在错误信息中,可能会发现以下提示:

Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [org.ragdollcat.order.to.QuotaRollbackTO] for GenericMessage [payload=byte[234], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=rollback.order, spring_listener_return_correlation=4e708ed8-bfde-4f43-9282-4701c1b5ef19, amqp_receivedExchange=order-rollback-exchange, spring_returned_message_correlation=4591a034-2ee5-4d4c-8a3c-4c95a3b43572, amqp_deliveryTag=1, amqp_consumerQueue=rollback.order.queue, amqp_redelivered=false, id=58a03a74-a8d4-329c-7d26-c229977f4c15, amqp_consumerTag=amq.ctag-0deI5Ts_m6_K32RdEYyDZA, contentType=application/x-java-serialized-object, timestamp=1747560067493}]

  证实了发送端把对象以JDK默认方式(ObjectOutputStream)序列化成 byte[],接收端尝试用 JSON 的方式将这个 byte[] 解析成 QuotaRollbackTO,失败。


解决方案:

  在配置类中,显式使用 JSON 消息转换器,为 RabbitTemplate 设置 Jackson2JsonMessageConverter(或放在Spring Boot的引导类中也可以,引导类本身也是作为一个配置类),这样后续注入的就是自定义的RabbitTemplate

@Configuration
public class RabbitTemplateConfig {

    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,MessageConverter jacksonMessageConverter) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

        // 开启强制投递,必须配置这个 ReturnCallback 才会生效!
        rabbitTemplate.setMandatory(true);

        // 设置 ConfirmCallback
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                System.out.println(" 交换机已收到消息:" + correlationData);
            } else {
                System.err.println(" 交换机未收到消息:" + cause + ",相关数据:" + correlationData);
            }
        });

        // 设置 ReturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            System.err.println(" 消息未成功路由到队列:");
            System.err.println("  原始消息:" + new String(message.getBody()));
            System.err.println("  应答码:" + replyCode);
            System.err.println("  原因:" + replyText);
            System.err.println("  交换机:" + exchange);
            System.err.println("  路由键:" + routingKey);
        });
	
			//为 RabbitTemplate 设置 `Jackson2JsonMessageConverter`
        rabbitTemplate.setMessageConverter(jacksonMessageConverter);

        return rabbitTemplate;
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            ConnectionFactory connectionFactory,
            MessageConverter jacksonMessageConverter) {

        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(jacksonMessageConverter);
        return factory;
    }

}

  同样地在发送消息时,不应采用对象转换为JSON字符串的写法:

rabbitTemplate.convertAndSend("rollback.order", JSON.toJSONString(rollbackTO));

验证方式:

  临时打印发送内容 headers:

rabbitTemplate.setReturnsCallback(returned -> {
    Message returnedMessage = returned.getMessage();
    System.out.println("发送内容类型: " + returnedMessage.getMessageProperties().getContentType());
});

  看到的 contentType 应该是:

“application/json”


网站公告

今日签到

点亮在社区的每一天
去签到