本文使用 Spring Boot 集成 IBM MQ 的客户端, 简单地实现消息的接收和发送逻辑.
pom依赖
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
</dependency>
<dependency>
<groupId>javax.jms</groupId>
<artifactId>javax.jms-api</artifactId>
</dependency>
<dependency>
<groupId>com.ibm.mq</groupId>
<artifactId>com.ibm.mq.allclient</artifactId>
<version>9.0.4.0</version>
</dependency>
配置文件
这里的配置和上文中创建的队列信息一致
server.port=9111
## mq的连接ip
project.mq.host=192.168.0.106
## mq的连接端口
project.mq.port=1414
## mq的队列管理器名字
project.mq.queue-manager=QM144
## mq的java客户端连接通道
project.mq.channel=SYSTEM_DEF_SVRCONN
## mq的用户名
project.mq.username=mqm
## mq的密码
project.mq.password=Paic2023q2
## mq的接收超时时间(单位: 毫秒)
project.mq.receive-timeout=20000
## 收发队列名称(可选)
project.mq.queuename=QUEUE1
配置类
/**
 * Spring configuration that wires an IBM MQ client connection into Spring JMS.
 *
 * <p>The beans form a chain: the raw {@link MQQueueConnectionFactory} is wrapped by a
 * {@link UserCredentialsConnectionFactoryAdapter} (which supplies username/password),
 * which is in turn wrapped by a {@link CachingConnectionFactory}. The caching factory is
 * the {@code @Primary} connection factory and backs both the transaction manager and
 * the {@link JmsTemplate}.
 *
 * <p>All connection settings are read from the {@code project.mq.*} properties.
 */
@Configuration
public class JmsConfig {

    /** Host name or IP address of the MQ server. */
    @Value("${project.mq.host}")
    private String host;

    /** Listener port of the MQ server. */
    @Value("${project.mq.port}")
    private Integer port;

    /** Name of the queue manager to connect to. */
    @Value("${project.mq.queue-manager}")
    private String queueManager;

    /** Name of the channel used for the client connection. */
    @Value("${project.mq.channel}")
    private String channel;

    /** User name applied to every connection created through the credentials adapter. */
    @Value("${project.mq.username}")
    private String username;

    /** Password applied to every connection created through the credentials adapter. */
    @Value("${project.mq.password}")
    private String password;

    /** Receive timeout handed to {@link JmsTemplate#setReceiveTimeout(long)}. */
    @Value("${project.mq.receive-timeout}")
    private long receiveTimeout;

    /**
     * Creates the underlying IBM MQ connection factory in client transport mode.
     *
     * @return the configured MQ connection factory
     * @throws IllegalStateException if any connection setting is rejected by the MQ client;
     *         failing here prevents a partially configured factory from being published
     */
    @Bean
    public MQQueueConnectionFactory mqQueueConnectionFactory() {
        MQQueueConnectionFactory factory = new MQQueueConnectionFactory();
        factory.setHostName(host);
        try {
            factory.setTransportType(WMQConstants.WMQ_CM_CLIENT);
            // CCSID 1208 is UTF-8
            factory.setCCSID(1208);
            factory.setChannel(channel);
            factory.setPort(port);
            factory.setQueueManager(queueManager);
        } catch (Exception e) {
            // Do not continue with a half-configured factory: surface the problem at startup.
            // The password is deliberately left out of the message.
            throw new IllegalStateException(
                    "Failed to configure IBM MQ connection factory for " + host + ":" + port
                            + ", queue manager " + queueManager + ", channel " + channel, e);
        }
        return factory;
    }

    /**
     * Wraps the MQ connection factory so that every connection is opened with the
     * configured username and password.
     *
     * @param mqQueueConnectionFactory the raw MQ connection factory
     * @return the credentials-applying adapter
     */
    @Bean
    UserCredentialsConnectionFactoryAdapter userCredentialsConnectionFactoryAdapter(MQQueueConnectionFactory mqQueueConnectionFactory) {
        UserCredentialsConnectionFactoryAdapter adapter = new UserCredentialsConnectionFactoryAdapter();
        adapter.setUsername(username);
        adapter.setPassword(password);
        adapter.setTargetConnectionFactory(mqQueueConnectionFactory);
        return adapter;
    }

    /**
     * Creates the primary, caching connection factory on top of the credentials adapter.
     *
     * @param userCredentialsConnectionFactoryAdapter the adapter supplying credentials
     * @return the caching connection factory (session cache size 500, reconnect on exception)
     */
    @Bean
    @Primary
    public CachingConnectionFactory cachingConnectionFactory(UserCredentialsConnectionFactoryAdapter userCredentialsConnectionFactoryAdapter) {
        CachingConnectionFactory cachingFactory = new CachingConnectionFactory();
        cachingFactory.setTargetConnectionFactory(userCredentialsConnectionFactoryAdapter);
        cachingFactory.setSessionCacheSize(500);
        cachingFactory.setReconnectOnException(true);
        return cachingFactory;
    }

    /**
     * Creates a JMS transaction manager bound to the caching connection factory.
     *
     * @param cachingConnectionFactory the primary connection factory
     * @return the JMS transaction manager
     */
    @Bean
    public PlatformTransactionManager jmsTransactionManager(CachingConnectionFactory cachingConnectionFactory) {
        JmsTransactionManager transactionManager = new JmsTransactionManager();
        transactionManager.setConnectionFactory(cachingConnectionFactory);
        return transactionManager;
    }

    /**
     * Creates the {@link JmsTemplate} used for sending and receiving messages.
     *
     * <p>The return type is the concrete {@link JmsTemplate} rather than the
     * {@code JmsOperations} interface so that injection points declared as either type
     * resolve reliably; the bean name ({@code jmsOperations}) is unchanged.
     *
     * @param cachingConnectionFactory the primary connection factory
     * @return the template, configured with the {@code project.mq.receive-timeout} value
     */
    @Bean
    public JmsTemplate jmsOperations(CachingConnectionFactory cachingConnectionFactory) {
        JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory);
        jmsTemplate.setReceiveTimeout(receiveTimeout);
        return jmsTemplate;
    }
}
发送消息
/**
 * Sends the given text to {@code QUEUE1} as a non-persistent JMS text message
 * tagged with correlation ID {@code 1111}.
 *
 * @param msg the text payload to send, bound from the request parameter
 */
public void sendMq(@RequestParam String msg) {
    log.info("获得报文并开始向mq发送:"+msg);

    // The delivery mode set on the template is only applied when explicit QoS is enabled.
    jmsTemplate.setExplicitQosEnabled(true);
    jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

    jmsTemplate.send("QUEUE1", jmsSession -> {
        TextMessage outbound = jmsSession.createTextMessage(msg);
        outbound.setJMSCorrelationID("1111");
        return outbound;
    });

    // Optional synchronous read of a message with correlation ID 2222 (left disabled):
    // Message message = jmsTemplate.receiveSelected("QUEUE1", "JMSCorrelationID='2222'");
    // System.err.println("---->" + message.toString());
    log.info("======推送mq报文完成======");
}
接收消息
/**
 * JMS listener that consumes messages whose {@code JMSCorrelationID} is {@code 1111}
 * from the queue named by {@code project.mq.queuename} and answers each one by sending
 * a text message to {@code QUEUE1}.
 *
 * <p>The reply carries correlation ID {@code 2222}, which does not match this
 * listener's own selector.
 */
@Slf4j
@Component
public class MQListener extends MessageListenerAdapter {

    @Autowired
    private JmsTemplate jmsTemplate;

    @Autowired
    JmsOperations jmsOperations;

    /**
     * Logs the received message and sends the fixed reply {@code "return data"}.
     *
     * @param message the message delivered by the listener container
     */
    @Override
    @JmsListener(destination = "${project.mq.queuename}", selector = "JMSCorrelationID='1111'")
    public void onMessage(Message message) {
        log.info("从MQ接收的message报文:"+message);

        jmsTemplate.send("QUEUE1", jmsSession -> {
            TextMessage reply = jmsSession.createTextMessage("return data");
            reply.setJMSCorrelationID("2222");
            return reply;
        });
    }
}
代码地址: https://gitee.com/sharloon/ibm-mq80-demo.git
本文由mdnice多平台发布