Spring JMS 使用

发布于:2025-06-12 ⋅ 阅读:(23) ⋅ 点赞:(0)

在这里插入图片描述

一、背景介绍

<dependency> 标签中声明的 spring-jms 是 Spring Framework 提供的一个模块,用于简化 Java Message Service (JMS) 的开发。它本身既不是消息代理,也不属于 ActiveMQ;但它可以与 ActiveMQ 配合使用,以实现 JMS 消息的发送和接收。

解释

  1. Spring JMS:
    • spring-jms 提供了一些便利的功能和抽象,使得使用 JMS 更加简单和直观。它支持消息的发送、接收、事务管理等功能,并提供了模板类(如 JmsTemplate)来简化消息的发送和接收过程。
  2. ActiveMQ:
    • ActiveMQ 是一个流行的开源消息代理,实现了 JMS 规范。它可以作为消息中间件,允许不同的应用程序通过消息进行通信。

二、注意点说明

三、代码

pom.xml

<!-- Spring Framework JMS module; supplies the org.springframework.jms classes
     (JmsTemplate, listener container factories, CachingConnectionFactory) used by the code in this article. -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>4.2.3.RELEASE</version>
</dependency>

mq.properties

# JMS / ActiveMQ connection settings
# Full broker URL. NOTE(review): the ActiveMqConfig class in this article does not read this key;
# it assembles the URL from activemq.protocol, activemq.ip and activemq.port instead.
activemq.brokerURL=tcp://20.150.12.141:18750
# Session cache size passed to the CachingConnectionFactory
activemq.sessionCacheSize=20
# Broker host
activemq.ip=20.150.12.141
# Broker port
activemq.port=18750
# Transport scheme used as the URL prefix (mapped to ActivemqProtocolEnum)
activemq.protocol=tcp

ActiveMqConfig

package com.hero.lte.ems.jms.config;

import com.hero.lte.ems.configuration.DynamicConfig;
import com.hero.lte.ems.configuration.DynamicConfigLoader;
import com.hero.lte.ems.jms.EMSJmsTemplate;
import com.hero.lte.ems.jms.support.enums.ActivemqProtocolEnum;
import com.hero.lte.ems.jms.support.model.ActiveMqInfo;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.jms.pool.PooledConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.connection.CachingConnectionFactory;

import javax.jms.ConnectionFactory;

/**
 * Spring configuration that declares the ActiveMQ connection factories, the JMS template and the
 * listener container factories.
 *
 * <p>All broker settings are read from {@code mq.properties} through {@link DynamicConfigLoader}.
 * Each bean method reads the values it needs itself, so no bean depends on another bean method
 * having run first to populate shared fields.
 */
@Configuration
public class ActiveMqConfig {

    /** Name of the properties resource that holds the ActiveMQ settings. */
    private static final String MQ_CONFIG_FILE = "mq.properties";

    /**
     * Exposes the broker ip, port and protocol as a simple value object.
     *
     * @return the broker connection details read from {@code mq.properties}
     */
    @Bean(name = "activemqInfo")
    public ActiveMqInfo activeMqInfo() {
        DynamicConfig config = loadMqConfig();
        ActiveMqInfo activeMqInfo = new ActiveMqInfo();
        activeMqInfo.setIp(config.getString("activemq.ip"));
        activeMqInfo.setPort(config.getInt("activemq.port"));
        activeMqInfo.setProtocolEnum(ActivemqProtocolEnum.formatEnum(config.getString("activemq.protocol")));
        return activeMqInfo;
    }

    /**
     * Creates the raw ActiveMQ connection factory pointing at the configured broker.
     *
     * @return the connection factory with async send enabled
     */
    @Bean
    public ActiveMQConnectionFactory connectionFactory() {
        String brokerUrl = buildBrokerUrl(loadMqConfig());
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerUrl);
        activeMQConnectionFactory.setUseAsyncSend(true);
        // NOTE(review): trusting all packages disables ActiveMQ's class filter for ObjectMessage
        // payloads; confirm that every producer on this broker is trusted.
        activeMQConnectionFactory.setTrustAllPackages(true);
        return activeMQConnectionFactory;
    }

    /**
     * Wraps the raw connection factory in a pooled factory. None of the other beans declared in
     * this class take this bean as a parameter.
     *
     * @param activeMQconnectionFactory the raw ActiveMQ connection factory
     * @return the pooled connection factory (max 100 connections, expiry timeout 20000)
     */
    @Bean
    public PooledConnectionFactory pooledConnectionFactory(ActiveMQConnectionFactory activeMQconnectionFactory) {
        PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
        pooledConnectionFactory.setConnectionFactory(activeMQconnectionFactory);
        pooledConnectionFactory.setMaxConnections(100);
        pooledConnectionFactory.setExpiryTimeout(20000);
        return pooledConnectionFactory;
    }

    /**
     * Wraps the raw connection factory in a Spring caching connection factory whose session cache
     * size comes from {@code activemq.sessionCacheSize}.
     *
     * @param activeMQconnectionFactory the raw ActiveMQ connection factory
     * @return the caching connection factory used by the template and the listener containers
     */
    @Bean
    public CachingConnectionFactory cachingConnectionFactory(ActiveMQConnectionFactory activeMQconnectionFactory) {
        // Read the value here rather than from a field populated by another bean method.
        int sessionCacheSize = loadMqConfig().getInt("activemq.sessionCacheSize");
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setSessionCacheSize(sessionCacheSize);
        connectionFactory.setTargetConnectionFactory(activeMQconnectionFactory);
        return connectionFactory;
    }

    /**
     * Creates the JMS template in publish/subscribe mode with a receive timeout of 1000 and
     * explicit QoS enabled.
     *
     * @param connectionFactory the caching connection factory
     * @return the configured template
     */
    @Bean
    public EMSJmsTemplate jmsTemplate(CachingConnectionFactory connectionFactory) {
        EMSJmsTemplate jmsTemplate = new EMSJmsTemplate();
        jmsTemplate.setPubSubDomain(true);
        jmsTemplate.setReceiveTimeout(1000);
        jmsTemplate.setExplicitQosEnabled(true);
        jmsTemplate.setConnectionFactory(connectionFactory);
        return jmsTemplate;
    }

    /**
     * Listener container factory with the publish/subscribe domain enabled.
     *
     * @param activeMQConnectionFactory the caching connection factory
     * @return the listener container factory for topics
     */
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerTopic(CachingConnectionFactory activeMQConnectionFactory) {
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        bean.setPubSubDomain(true);
        bean.setConnectionFactory(activeMQConnectionFactory);
        return bean;
    }

    /**
     * Listener container factory that leaves the publish/subscribe flag at its default.
     *
     * @param activeMQConnectionFactory the caching connection factory
     * @return the listener container factory for queues
     */
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerQueue(CachingConnectionFactory activeMQConnectionFactory) {
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        bean.setConnectionFactory(activeMQConnectionFactory);
        return bean;
    }

    /** Loads the ActiveMQ settings from {@code mq.properties}. */
    private static DynamicConfig loadMqConfig() {
        return DynamicConfigLoader.load(MQ_CONFIG_FILE);
    }

    /** Builds the broker URL as {@code protocol://ip:port} from the given settings. */
    private static String buildBrokerUrl(DynamicConfig config) {
        return config.getString("activemq.protocol")
                + "://" + config.getString("activemq.ip")
                + ":" + config.getString("activemq.port");
    }

}

JmsSender

package com.hero.lte.ems.cdc.jms;

import com.alibaba.fastjson.JSON;
import com.hero.lte.ems.cdc.constant.CdcConstant;
import com.hero.lte.ems.cdc.producer.ActiveMQMessageProducer;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.JmsException;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import javax.jms.*;
import java.util.UUID;

/**
 * JMS消息发送者
 */
/**
 * JMS message sender.
 *
 * <p>Sends text messages through the injected {@link JmsTemplate} and, for the single-argument
 * overload, through the injected {@code ActiveMQMessageProducer}. JMS failures are logged and not
 * propagated to the caller.
 */
@Service
public class JmsSender {

    private static final Logger log = LoggerFactory.getLogger(JmsSender.class);

    /** Random key generated once per sender instance and passed with every producer send. */
    private final String key = UUID.randomUUID().toString();

    @Resource
    ActiveMQMessageProducer activeMQMessageProducer;

    @Resource
    private JmsTemplate jmsTemplate;

    /**
     * Sends the message to {@code CdcConstant.JMS_CDC} through the message producer.
     *
     * @param message the message content
     */
    public void send(String message) {
        activeMQMessageProducer.send(CdcConstant.JMS_CDC, key, message);
    }

    /**
     * Pushes a message to the named destination.
     *
     * <p>A {@code String} payload is sent as-is; any other payload is serialized to JSON.
     * A {@link JmsException} is logged and swallowed.
     *
     * @param destName       destination name
     * @param obj            message content
     * @param isPubSubDomain {@code true} to send to a topic, {@code false} to send to a queue;
     *                       must not be {@code null} because it is auto-unboxed
     */
    public void send(final String destName, final Object obj, final Boolean isPubSubDomain) {
        try {
            final Destination destination;
            if (isPubSubDomain) {
                destination = new ActiveMQTopic(destName);
            } else {
                destination = new ActiveMQQueue(destName);
            }

            jmsTemplate.send(destination, new MessageCreator() {
                @Override
                public Message createMessage(Session session) throws JMSException {
                    TextMessage msg = session.createTextMessage();
                    if (obj instanceof String) {
                        msg.setText(obj.toString());
                    } else {
                        msg.setText(JSON.toJSONString(obj));
                    }
                    log.debug("Send {}-{}", destName, obj);
                    return msg;
                }
            });
        } catch (JmsException e) {
            log.error(" jms push info to client error : ", e);
        }
    }

    /**
     * Pushes a message to the named topic.
     *
     * @param destName topic name
     * @param obj      message content (a {@code String} is sent as-is, anything else as JSON)
     */
    public void send(final String destName, final Object obj) {
        send(destName, obj, true);
    }

    /**
     * Pushes a message and waits for the reply.
     *
     * <p>The payload is always serialized to JSON. The destination name is handed to the template
     * as a string, so the template's own settings decide how it is resolved.
     *
     * @param topic destination name
     * @param obj   message content (sent as JSON)
     * @return the reply text, or {@code null} when no reply arrived, the reply is not a
     *         {@link TextMessage}, or a JMS error occurred
     */
    public String sendAndReceive(final String topic, final Object obj) {
        try {
            Message message = jmsTemplate.sendAndReceive(topic, new MessageCreator() {
                @Override
                public Message createMessage(Session session) throws JMSException {
                    TextMessage msg = session.createTextMessage();
                    msg.setText(JSON.toJSONString(obj));
                    log.debug("sendAndReceive {}-{}", topic, obj);
                    return msg;
                }
            });
            if (message instanceof TextMessage) {
                return ((TextMessage) message).getText();
            }
            if (message != null) {
                // A blind cast here would throw ClassCastException past the error handling below.
                log.warn("sendAndReceive {} ignored non-text reply of type {}", topic, message.getClass().getName());
            }
        } catch (JmsException | JMSException e) {
            log.error(" jms push info to client error : ", e);
        }
        return null;
    }

}

ActiveMqInfo

package com.hero.lte.ems.jms.support.model;

import com.hero.lte.ems.jms.support.enums.ActivemqProtocolEnum;

import java.io.Serializable;

/**
 * Serializable value object holding the ActiveMQ broker ip, port and transport protocol.
 */
public class ActiveMqInfo implements Serializable
{
    private static final long serialVersionUID = -2965965930227658765L;

    /** Broker host. */
    private String ip;

    /** Broker port. */
    private int port;

    /** Transport protocol of the broker connection. */
    private ActivemqProtocolEnum protocolEnum;

    /** @return the broker host */
    public String getIp()
    {
        return ip;
    }

    /** @param ip the broker host */
    public void setIp(String ip)
    {
        this.ip = ip;
    }

    /** @return the broker port */
    public int getPort()
    {
        return port;
    }

    /** @param port the broker port */
    public void setPort(int port)
    {
        this.port = port;
    }

    /** @return the transport protocol, possibly {@code null} if never set */
    public ActivemqProtocolEnum getProtocolEnum()
    {
        return protocolEnum;
    }

    /** @param protocolEnum the transport protocol */
    public void setProtocolEnum(ActivemqProtocolEnum protocolEnum)
    {
        this.protocolEnum = protocolEnum;
    }

    /**
     * Renders the object as {@code ActiveMqInfo:[ip:<ip>,port:<port>,protocolEnum:<protocol>]}.
     *
     * @return the string form with every field name followed by a colon
     */
    @Override
    public String toString()
    {
        StringBuilder sb = new StringBuilder();
        sb.append("ActiveMqInfo:[");
        sb.append("ip:").append(ip).append(",");
        sb.append("port:").append(port).append(",");
        sb.append("protocolEnum:").append(protocolEnum);
        sb.append("]");
        return sb.toString();
    }
}

ActivemqProtocolEnum

package com.hero.lte.ems.jms.support.enums;

import java.io.Serializable;

/**
 * Transport protocols supported for the ActiveMQ broker connection.
 */
public enum ActivemqProtocolEnum implements Serializable {
    /**
     * Tcp
     */
    TCP("tcp");

    /** Protocol name as used in the broker URL scheme; fixed at construction. */
    private final String protocol;

    ActivemqProtocolEnum(String protocol)
    {
        this.protocol = protocol;
    }

    /** @return the protocol name, e.g. {@code tcp} */
    public String getProtocol()
    {
        return protocol;
    }

    /** @return the protocol name, same value as {@link #getProtocol()} */
    @Override
    public String toString()
    {
        return protocol;
    }

    /**
     * Looks up the constant whose protocol name matches the given text, ignoring case.
     *
     * @param protocol the protocol name to look up; may be {@code null}
     * @return the matching constant, or {@code null} when nothing matches
     */
    public static ActivemqProtocolEnum formatEnum(String protocol)
    {
        for (ActivemqProtocolEnum protocolItem : ActivemqProtocolEnum.values())
        {
            if (protocolItem.protocol.equalsIgnoreCase(protocol))
            {
                return protocolItem;
            }
        }
        return null;
    }

}

生产者

import com.hero.lte.ems.jms.JmsSender;

// Publishes the payload to the OBSERVE_REPORT_RECEIVE_JMS destination via the two-argument send overload.
// NOTE(review): the send methods of the JmsSender class shown in this article are instance methods,
// so `JmsSender` here presumably stands for an injected JmsSender bean — confirm before copying.
JmsSender.send(JmsConstant.OBSERVE_REPORT_RECEIVE_JMS, json);

消费者

/**
 * Consumes messages from the {@code JmsConstant.OBSERVE_REPORT_RECEIVE_JMS} destination through
 * the listener container factory bean named {@code jmsListenerContainerTopic}.
 *
 * @param msg the message payload delivered as a String
 */
@JmsListener(destination = JmsConstant.OBSERVE_REPORT_RECEIVE_JMS, containerFactory = "jmsListenerContainerTopic")
    public void receive(String msg) {
        logger.debug("receive message={}", msg);
        // The remaining handling is elided in the article.
        ......
    }