文章目录
一、背景介绍
<dependency> 标签中列出的 spring-jms 是 Spring Framework 提供的一个模块,用于简化 Java Message Service (JMS) 的开发。它本身并不是 ActiveMQ 的 JMS 推送,但可以与 ActiveMQ 一起使用,以实现 JMS 消息的发送和接收。
解释
Spring JMS
:- spring-jms 提供了一些便利的功能和抽象,使得使用 JMS 更加简单和直观。它支持消息的发送、接收、事务管理等功能,并提供了模板类(如 JmsTemplate)来简化消息的发送和接收过程。
ActiveMQ
:- ActiveMQ 是一个流行的开源消息代理,支持 JMS 协议。它可以作为消息中间件,允许不同的应用程序通过消息进行通信。
二、注意点说明
三、代码
pom.xml
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>4.2.3.RELEASE</version>
</dependency>
mq.properties
# jms
activemq.brokerURL=tcp://20.150.12.141:18750
activemq.sessionCacheSize=20
activemq.ip=20.150.12.141
activemq.port=18750
activemq.protocol=tcp
ActiveMqConfig
package com.hero.lte.ems.jms.config;
import com.hero.lte.ems.configuration.DynamicConfig;
import com.hero.lte.ems.configuration.DynamicConfigLoader;
import com.hero.lte.ems.jms.EMSJmsTemplate;
import com.hero.lte.ems.jms.support.enums.ActivemqProtocolEnum;
import com.hero.lte.ems.jms.support.model.ActiveMqInfo;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.jms.pool.PooledConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.connection.CachingConnectionFactory;
import javax.jms.ConnectionFactory;
@Configuration
public class ActiveMqConfig {
private String brokerURL;
private Integer sessionCacheSize;
@Bean(name = "activemqInfo")
public ActiveMqInfo activeMqInfo(){
ActiveMqInfo activeMqInfo = new ActiveMqInfo();
DynamicConfig config = DynamicConfigLoader.load("mq.properties");
activeMqInfo.setIp(config.getString("activemq.ip"));
activeMqInfo.setPort(config.getInt("activemq.port"));
activeMqInfo.setProtocolEnum(ActivemqProtocolEnum.formatEnum(config.getString("activemq.protocol")));
return activeMqInfo;
}
@Bean
public ActiveMQConnectionFactory connectionFactory() {
DynamicConfig config = DynamicConfigLoader.load("mq.properties");
brokerURL = config.getString("activemq.protocol")+"://"+config.getString("activemq.ip")+":"+config.getString("activemq.port");
sessionCacheSize = config.getInt("activemq.sessionCacheSize");
ActiveMQConnectionFactory activeMQConnectionFactory
= new ActiveMQConnectionFactory(brokerURL);
activeMQConnectionFactory.setUseAsyncSend(true);
activeMQConnectionFactory.setTrustAllPackages(true);
return activeMQConnectionFactory;
}
@Bean
public PooledConnectionFactory pooledConnectionFactory(ActiveMQConnectionFactory activeMQconnectionFactory){
DynamicConfig config = DynamicConfigLoader.load("mq.properties");
brokerURL = config.getString("activemq.protocol")+"://"+config.getString("activemq.ip")+":"+config.getString("activemq.port");
sessionCacheSize = config.getInt("activemq.sessionCacheSize");
PooledConnectionFactory pooledConnectionFactory
= new PooledConnectionFactory();
pooledConnectionFactory.setConnectionFactory(activeMQconnectionFactory);
pooledConnectionFactory.setMaxConnections(100);
pooledConnectionFactory.setExpiryTimeout(20000);
return pooledConnectionFactory;
}
@Bean
public CachingConnectionFactory cachingConnectionFactory(ActiveMQConnectionFactory activeMQconnectionFactory) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setSessionCacheSize(sessionCacheSize);
connectionFactory.setTargetConnectionFactory(activeMQconnectionFactory);
return connectionFactory;
}
@Bean
public EMSJmsTemplate jmsTemplate(CachingConnectionFactory connectionFactory) {
EMSJmsTemplate jmsTemplate = new EMSJmsTemplate();
jmsTemplate.setPubSubDomain(true);
jmsTemplate.setReceiveTimeout(1000);
jmsTemplate.setExplicitQosEnabled(true);
jmsTemplate.setConnectionFactory(connectionFactory);
return jmsTemplate;
}
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerTopic(CachingConnectionFactory activeMQConnectionFactory) {
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
bean.setPubSubDomain(true);
bean.setConnectionFactory(activeMQConnectionFactory);
return bean;
}
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerQueue(CachingConnectionFactory activeMQConnectionFactory) {
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
bean.setConnectionFactory(activeMQConnectionFactory);
return bean;
}
}
JmsSender
package com.hero.lte.ems.cdc.jms;
import com.alibaba.fastjson.JSON;
import com.hero.lte.ems.cdc.constant.CdcConstant;
import com.hero.lte.ems.cdc.producer.ActiveMQMessageProducer;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.JmsException;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import javax.jms.*;
import java.util.UUID;
/**
* JMS消息发送者
*/
@Service
public class JmsSender {
private final String key = UUID.randomUUID().toString();
@Resource
ActiveMQMessageProducer activeMQMessageProducer;
private static Logger log = LoggerFactory.getLogger(JmsSender.class);
@Resource
private JmsTemplate jmsTemplate;
public void send(String message) {
activeMQMessageProducer.send(CdcConstant.JMS_CDC, key, message);
}
/**
* 推送消息
*
* @param destName 消息主题
* @param obj 消息内容 (json格式发送出去)
*/
public void send(final String destName, final Object obj, final Boolean isPubSubDomain) {
// EMSJmsTemplate jmsTemplate = SpringContextHolder.getBean("jmsTemplate");
try {
Destination destination = null;
if (isPubSubDomain) {
destination = new ActiveMQTopic(destName);
} else {
destination = new ActiveMQQueue(destName);
}
jmsTemplate.send(destination, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
TextMessage msg = session.createTextMessage();
if (obj instanceof String) {
msg.setText(obj.toString());
} else {
msg.setText(JSON.toJSONString(obj));
}
log.debug("Send {}-{}", destName, obj);
return msg;
}
});
} catch (JmsException e) {
log.error(" jms push info to client error : ", e);
}
}
/**
* 推送消息
*
* @param destName 消息主题 topic
* @param obj 消息内容 (json格式发送出去)
*/
public void send(final String destName, final Object obj) {
send(destName, obj, true);
}
/**
* 推送消息并监听响应(订阅模式)
*
* @param topic 消息主题
* @param obj 消息内容 (json格式发送出去)
* @return
*/
public String sendAndReceive(final String topic, final Object obj) {
// EMSJmsTemplate jmsTemplate = (EMSJmsTemplate) SpringContextHolder.getBean("jmsTemplate");
try {
Message message = jmsTemplate.sendAndReceive(topic, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
TextMessage msg = session.createTextMessage();
// 设置消息内容
msg.setText(JSON.toJSONString(obj));
log.debug("sendAndReceive {}-{}", topic, obj);
return msg;
}
});
if (message != null) {
return ((TextMessage) message).getText();
}
} catch (JmsException | JMSException e) {
log.error(" jms push info to client error : ", e);
}
return null;
}
}
ActiveMqInfo
package com.hero.lte.ems.jms.support.model;
import com.hero.lte.ems.jms.support.enums.ActivemqProtocolEnum;
import java.io.Serializable;
public class ActiveMqInfo implements Serializable
{
/**
*
*/
private static final long serialVersionUID = -2965965930227658765L;
private String ip;
private int port;
private ActivemqProtocolEnum protocolEnum;
public String getIp()
{
return ip;
}
public void setIp(String ip)
{
this.ip = ip;
}
public int getPort()
{
return port;
}
public void setPort(int port)
{
this.port = port;
}
public ActivemqProtocolEnum getProtocolEnum()
{
return protocolEnum;
}
public void setProtocolEnum(ActivemqProtocolEnum protocolEnum)
{
this.protocolEnum = protocolEnum;
}
@Override
public String toString()
{
StringBuilder sb = new StringBuilder();
sb.append("ActiveMqInfo:[");
sb.append("ip:").append(ip).append(",");
sb.append("port").append(port).append(",");
sb.append("protocolEnum").append(protocolEnum);
sb.append("]");
return sb.toString();
}
}
ActivemqProtocolEnum
package com.hero.lte.ems.jms.support.enums;
import java.io.Serializable;
public enum ActivemqProtocolEnum implements Serializable {
/**
* Tcp
*/
TCP("tcp");
private String protocol;
public String getProtocol()
{
return protocol;
}
private ActivemqProtocolEnum(String protocol)
{
this.protocol = protocol;
}
@Override
public String toString()
{
return protocol;
}
public static ActivemqProtocolEnum formatEnum(String protocol)
{
for (ActivemqProtocolEnum protocolItem : ActivemqProtocolEnum.values())
{
if (protocolItem.protocol.equalsIgnoreCase(protocol))
{
return protocolItem;
}
}
return null;
}
}
生产者
import com.hero.lte.ems.jms.JmsSender;
JmsSender.send(JmsConstant.OBSERVE_REPORT_RECEIVE_JMS, json);
消费者
@JmsListener(destination = JmsConstant.OBSERVE_REPORT_RECEIVE_JMS, containerFactory = "jmsListenerContainerTopic")
public void receive(String msg) {
logger.debug("receive message={}", msg);
......
}