1 引入依赖 <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.3.1</version> </dependency> 2配置 rocketmq: name-server: 192.168.150.50:9876 producer: group: dist-test # 生产者组 pull-consumer: # pull模式消费者 group: test topic: MyTopic 3 启动类引入配置 创建主题: [root@localhost bin]# sh mqadmin updateTopic -n 192.168.150.50:9876 -b 192.168.150.50:10911 -t MyTopic -w 4 -r 4 create topic to 192.168.150.50:10911 success. TopicConfig [topicName=MyTopic, readQueueNums=4, writeQueueNums=4, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false, attributes={}] -n 192.168.150.50:9876:Name Server地址。 -b 192.168.150.50:10911:Broker地址。 -t MyTopic:主题名称。 -w 4:写队列数量。 -r 4:读队列数量。 引入配置类 @SpringBootApplication @ImportAutoConfiguration({RocketMQAutoConfiguration.class}) 4 测试,用SendController测试发送,用SendController测试接收 // RocketMQReplyListener<T, R> 是一个接口,需要返回值的监听器,两个泛型分别是接收消息的类型和返回值类型,对应的发送者rocketMQTemplate.sendAndReceive // RocketMQListener<T> 无需返回值 T 为接收消息的类型,对应的发送者rocketMQTemplate.convertAndSend broker的配置也可以不用配置 namesrvAddr= autoCreateTopicEnable=true brokerIP1=192.168.150.50
import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping("/rocketmq") public class SendController { @Autowired private RocketMQTemplate rocketMQTemplate; @GetMapping("/send") public String send() { /** * 普通消息:只负责发送无需等待响应 * 参数1:topic:tag tag可省略不写 * 参数2:Object类型的消息体 * 消费监听器:一般配合着 RocketMQListener<T> 使用 */ rocketMQTemplate.convertAndSend("MyTopic2", "hello world"); // Message message = new Message("topic", "tag", "key", "message body".getBytes()); // message.putUserProperty("REPLY_TO_CLIENT", "yourClientID"); // Set the reply property // producer.send(message); return "success"; } @GetMapping("/send2") public String send2() { /** * 普通消息:等待消费者响应 * 参数信息和上面一样 * 消费者监听器:一般配合着 RocketMQReplyListener<T, R> 使用 */ String res = rocketMQTemplate.sendAndReceive("MyTopic", "hello RocketMQ", String.class); return res; } }
import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.springframework.stereotype.Component; // RocketMQReplyListener<T, R> 是一个接口,需要返回值的监听器,两个泛型分别是接收消息的类型和返回值类型,对应的发送者rocketMQTemplate.sendAndReceive // RocketMQListener<T> 无需返回值 T 为接收消息的类型,对应的发送者rocketMQTemplate.convertAndSend @Slf4j @Component @RocketMQMessageListener(topic = "MyTopic", consumerGroup = "test_consumer") public class TestRocketMQMessageListener implements RocketMQReplyListener<String, String> { @Override public String onMessage(String s) { log.info("接收到RocketMQ消息[topic={}] ======> {}", "test", s); return SendStatus.SEND_OK.name(); } }
import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; // RocketMQReplyListener<T, R> 是一个接口,需要返回值的监听器,两个泛型分别是接收消息的类型和返回值类型,对应的发送者rocketMQTemplate.sendAndReceive // RocketMQListener<T> 无需返回值 T 为接收消息的类型,对应的发送者rocketMQTemplate.convertAndSend @Slf4j @Component @RocketMQMessageListener(topic = "MyTopic2", consumerGroup = "test_consumer2") public class TestRocketMQMessageListener2 implements RocketMQListener<String> { @Override public void onMessage(String s) { log.info("接收到RocketMQ消息[topic={}] ======> {}", "test", s); } }