ActiveMQ在Spring Boot中的详细使用指南

发布于:2025-07-08 ⋅ 阅读:(30) ⋅ 点赞:(0)

📋 目录

🚀 ActiveMQ简介

什么是ActiveMQ?

核心概念

🏗️ 基础架构组件

📝 重要概念解释

ActiveMQ vs 其他消息中间件

🔧 环境搭建

1. ActiveMQ服务端安装

Docker方式(推荐初学者)

手动安装方式

2. 验证安装

访问Web管理界面

连接参数

测试连接

🏗️ Spring Boot集成配置

1. 添加依赖

2. 配置文件

3. ActiveMQ配置类

 

📨 基础消息收发

1. 创建消息实体类

2. 消息生产者服务

3. 消息消费者服务

4. 测试控制器

🎯 消息模式详解

1. 点对点模式(Queue)

特点

实现示例

2. 发布订阅模式(Topic)

特点

实现示例

🚀 高级特性

1. 消息选择器(Message Selector)

基于消息属性过滤

2. 事务消息

JMS事务配置

3. 消息确认机制

手动确认模式

4. 死信队列(DLQ)

死信队列配置

🖥️ Web管理界面

ActiveMQ Web Console使用

1. 访问管理界面

2. 主要功能

队列管理

主题管理

连接监控

消息浏览

2. 自定义监控页面

创建监控控制器

创建监控页面

📊 监控和管理

1. Spring Boot Actuator集成

添加Actuator依赖

配置监控端点

自定义健康指示器

2. 性能监控

消息处理性能监控

3. 日志配置

logback-spring.xml

🎯 实战案例

⚡ 性能优化

💡 最佳实践

❓ 常见问题解决


​​​​​​​

🚀 ActiveMQ简介

什么是ActiveMQ?

ActiveMQ是Apache软件基金会开发的开源消息中间件,是Java消息服务(JMS)的完整实现,具有高性能、可靠性强、易于使用的特点。

核心概念

🏗️ 基础架构组件
Producer(生产者):负责发送消息
Consumer(消费者):负责接收和处理消息
Broker(代理):消息服务器,负责存储和转发消息
Destination(目的地):消息的目标,包括Queue和Topic
📝 重要概念解释
  • Queue(队列):点对点模式,一条消息只能被一个消费者消费
  • Topic(主题):发布订阅模式,一条消息可以被多个消费者消费
  • JMS(Java Message Service):Java消息服务API标准
  • Message(消息):传输的数据单元
  • Session(会话):生产和消费消息的上下文
  • Connection(连接):客户端与消息服务器的网络连接

ActiveMQ vs 其他消息中间件

特性 ActiveMQ RabbitMQ RocketMQ
开发语言 Java Erlang Java
协议支持 JMS、AMQP、STOMP AMQP 自定义协议
管理界面 Web Console Management UI Console
集群支持
事务支持
消息持久化

🔧 环境搭建

1. ActiveMQ服务端安装

Docker方式(推荐初学者)
# 1. 拉取ActiveMQ镜像
docker pull webcenter/activemq:latest

# 2. 启动ActiveMQ容器
docker run -d \
  --name activemq \
  -p 61616:61616 \
  -p 8161:8161 \
  webcenter/activemq:latest

# 3. 查看容器状态
docker ps | grep activemq

# 4. 查看日志
docker logs activemq
手动安装方式
# 1. 下载ActiveMQ
wget https://archive.apache.org/dist/activemq/5.17.3/apache-activemq-5.17.3-bin.tar.gz

# 2. 解压
tar -zxvf apache-activemq-5.17.3-bin.tar.gz

# 3. 启动ActiveMQ
cd apache-activemq-5.17.3
./bin/activemq start

# 4. 停止ActiveMQ
./bin/activemq stop

# 5. 查看状态
./bin/activemq status

2. 验证安装

访问Web管理界面
URL: http://localhost:8161/admin
默认用户名: admin
默认密码: admin
连接参数
JMS连接URL: tcp://localhost:61616
Web管理端口: 8161
JMX端口: 1099
测试连接
# 使用ActiveMQ自带的测试工具
cd apache-activemq-5.17.3

# 启动消费者
./bin/activemq consumer

# 启动生产者(新开终端)
./bin/activemq producer

🏗️ Spring Boot集成配置

1. 添加依赖

<dependencies>
    <!-- Spring Boot Starter Web -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    
    <!-- Spring Boot ActiveMQ Starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-activemq</artifactId>
    </dependency>
    
    <!-- ActiveMQ连接池 -->
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-pool</artifactId>
    </dependency>
    
    <!-- JSON处理 -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
    
    <!-- 开发工具 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-devtools</artifactId>
        <scope>runtime</scope>
        <optional>true</optional>
    </dependency>
</dependencies>

2. 配置文件

application.yml
# ActiveMQ配置
spring:
  activemq:
    broker-url: tcp://localhost:61616  # ActiveMQ服务器地址
    user: admin                        # 用户名
    password: admin                    # 密码
    in-memory: false                   # 不使用内存模式
    pool:
      enabled: true                    # 启用连接池
      max-connections: 50              # 最大连接数
      idle-timeout: 30000              # 空闲超时时间(毫秒)
    packages:
      trust-all: true                  # 信任所有包(开发环境)
      trusted: com.example.model       # 信任的包(生产环境推荐)

  # JMS配置
  jms:
    pub-sub-domain: false              # false=Queue模式,true=Topic模式
    template:
      default-destination: default-queue  # 默认目的地
      delivery-mode: persistent        # 消息持久化模式
      priority: 100                    # 消息优先级(0-255)
      time-to-live: 36000000          # 消息存活时间(毫秒)
      receive-timeout: 10000           # 接收超时时间(毫秒)

# 应用配置
server:
  port: 8080

# 日志配置
logging:
  level:
    org.apache.activemq: INFO
    org.springframework.jms: DEBUG
    com.example: DEBUG
application.properties(可选)
# ActiveMQ连接配置
spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin
spring.activemq.in-memory=false

# 连接池配置
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=50
spring.activemq.pool.idle-timeout=30000

# JMS配置
spring.jms.pub-sub-domain=false
spring.jms.template.default-destination=default-queue
spring.jms.template.delivery-mode=persistent

3. ActiveMQ配置类

基础配置
package com.example.config;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageType;

@Configuration
@EnableJms  // 启用JMS
public class ActiveMQConfig {
    
    @Value("${spring.activemq.broker-url}")
    private String brokerUrl;
    
    @Value("${spring.activemq.user}")
    private String username;
    
    @Value("${spring.activemq.password}")
    private String password;
    
    /**
     * ActiveMQ连接工厂
     */
    @Bean
    public ActiveMQConnectionFactory activeMQConnectionFactory() {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
        factory.setBrokerURL(brokerUrl);
        factory.setUserName(username);
        factory.setPassword(password);
        
        // 信任所有包(开发环境)
        factory.setTrustAllPackages(true);
        
        // 生产环境推荐指定信任的包
        // factory.setTrustedPackages(Arrays.asList("com.example.model"));
        
        return factory;
    }
    
    /**
     * 连接池工厂
     */
    @Bean
    public PooledConnectionFactory pooledConnectionFactory() {
        PooledConnectionFactory pooledFactory = new PooledConnectionFactory();
        pooledFactory.setConnectionFactory(activeMQConnectionFactory());
        pooledFactory.setMaxConnections(50);  // 最大连接数
        pooledFactory.setIdleTimeout(30000);  // 空闲超时
        return pooledFactory;
    }
    
    /**
     * JmsTemplate - 用于发送消息
     */
    @Bean
    public JmsTemplate jmsTemplate() {
        JmsTemplate template = new JmsTemplate();
        template.setConnectionFactory(pooledConnectionFactory());
        template.setMessageConverter(jacksonJmsMessageConverter());
        template.setDeliveryPersistent(true);  // 消息持久化
        template.setSessionTransacted(true);   // 启用事务
        return template;
    }
    
    /**
     * Queue模式的监听器工厂
     */
    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(pooledConnectionFactory());
        factory.setMessageConverter(jacksonJmsMessageConverter());
        factory.setPubSubDomain(false);  // Queue模式
        factory.setSessionTransacted(true);  // 启用事务
        factory.setConcurrency("3-10");     // 并发消费者数量
        return factory;
    }
    
    /**
     * Topic模式的监听器工厂
     */
    @Bean
    public DefaultJmsListenerContainerFactory topicListenerContainerFactory() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(pooledConnectionFactory());
        factory.setMessageConverter(jacksonJmsMessageConverter());
        factory.setPubSubDomain(true);   // Topic模式
        factory.setSessionTransacted(true);
        factory.setConcurrency("3-10");
        return factory;
    }
    
    /**
     * 消息转换器 - 支持对象序列化
     */
    @Bean
    public MappingJackson2MessageConverter jacksonJmsMessageConverter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        converter.setTargetType(MessageType.TEXT);  // 使用文本消息
        converter.setTypeIdPropertyName("_type");   // 类型标识属性
        return converter;
    }
}

📨 基础消息收发

1. 创建消息实体类

package com.example.model;

import java.io.Serializable;
import java.time.LocalDateTime;

/**
 * 用户消息实体
 */
public class UserMessage implements Serializable {
    private static final long serialVersionUID = 1L;
    
    private Long id;
    private String username;
    private String email;
    private String action;
    private LocalDateTime timestamp;
    private String description;
    
    // 无参构造函数(JSON反序列化需要)
    public UserMessage() {}
    
    // 全参构造函数
    public UserMessage(Long id, String username, String email, String action, String description) {
        this.id = id;
        this.username = username;
        this.email = email;
        this.action = action;
        this.description = description;
        this.timestamp = LocalDateTime.now();
    }
    
    // Getter和Setter方法
    public Long getId() { return id; }
    public void setId(Long id) { this.id = id; }
    
    public String getUsername() { return username; }
    public void setUsername(String username) { this.username = username; }
    
    public String getEmail() { return email; }
    public void setEmail(String email) { this.email = email; }
    
    public String getAction() { return action; }
    public void setAction(String action) { this.action = action; }
    
    public LocalDateTime getTimestamp() { return timestamp; }
    public void setTimestamp(LocalDateTime timestamp) { this.timestamp = timestamp; }
    
    public String getDescription() { return description; }
    public void setDescription(String description) { this.description = description; }
    
    @Override
    public String toString() {
        return "UserMessage{" +
                "id=" + id +
                ", username='" + username + '\'' +
                ", email='" + email + '\'' +
                ", action='" + action + '\'' +
                ", timestamp=" + timestamp +
                ", description='" + description + '\'' +
                '}';
    }
}

/**
 * 订单消息实体
 */
public class OrderMessage implements Serializable {
    private static final long serialVersionUID = 1L;
    
    private String orderId;
    private String userId;
    private String productName;
    private Integer quantity;
    private Double totalPrice;
    private String status;
    private LocalDateTime createTime;
    
    // 构造函数
    public OrderMessage() {}
    
    public OrderMessage(String orderId, String userId, String productName, 
                       Integer quantity, Double totalPrice, String status) {
        this.orderId = orderId;
        this.userId = userId;
        this.productName = productName;
        this.quantity = quantity;
        this.totalPrice = totalPrice;
        this.status = status;
        this.createTime = LocalDateTime.now();
    }
    
    // Getter和Setter方法省略...
    
    @Override
    public String toString() {
        return "OrderMessage{" +
                "orderId='" + orderId + '\'' +
                ", userId='" + userId + '\'' +
                ", productName='" + productName + '\'' +
                ", quantity=" + quantity +
                ", totalPrice=" + totalPrice +
                ", status='" + status + '\'' +
                ", createTime=" + createTime +
                '}';
    }
}

2. 消息生产者服务

package com.example.service;

import com.example.model.OrderMessage;
import com.example.model.UserMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;

import javax.jms.Queue;
import javax.jms.Topic;

/**
 * 消息生产者服务
 */
@Service
public class MessageProducerService {
    
    @Autowired
    private JmsTemplate jmsTemplate;
    
    /**
     * 发送简单文本消息到Queue
     */
    public void sendTextMessage(String queueName, String message) {
        try {
            jmsTemplate.convertAndSend(queueName, message);
            System.out.println("✅ 文本消息发送成功到Queue: " + queueName);
            System.out.println("   消息内容: " + message);
        } catch (Exception e) {
            System.err.println("❌ 文本消息发送失败: " + e.getMessage());
            throw new RuntimeException("消息发送失败", e);
        }
    }
    
    /**
     * 发送用户消息对象到Queue
     */
    public void sendUserMessage(String queueName, UserMessage userMessage) {
        try {
            jmsTemplate.convertAndSend(queueName, userMessage);
            System.out.println("✅ 用户消息发送成功到Queue: " + queueName);
            System.out.println("   消息内容: " + userMessage);
        } catch (Exception e) {
            System.err.println("❌ 用户消息发送失败: " + e.getMessage());
            throw new RuntimeException("用户消息发送失败", e);
        }
    }
    
    /**
     * 发送订单消息到Queue
     */
    public void sendOrderMessage(String queueName, OrderMessage orderMessage) {
        try {
            jmsTemplate.convertAndSend(queueName, orderMessage);
            System.out.println("✅ 订单消息发送成功到Queue: " + queueName);
            System.out.println("   订单ID: " + orderMessage.getOrderId());
        } catch (Exception e) {
            System.err.println("❌ 订单消息发送失败: " + e.getMessage());
            throw new RuntimeException("订单消息发送失败", e);
        }
    }
    
    /**
     * 发送消息到Topic(发布订阅模式)
     */
    public void sendMessageToTopic(String topicName, Object message) {
        try {
            // 临时设置为Topic模式
            jmsTemplate.setPubSubDomain(true);
            jmsTemplate.convertAndSend(topicName, message);
            // 恢复Queue模式
            jmsTemplate.setPubSubDomain(false);
            
            System.out.println("✅ 消息发布成功到Topic: " + topicName);
            System.out.println("   消息内容: " + message);
        } catch (Exception e) {
            System.err.println("❌ Topic消息发送失败: " + e.getMessage());
            throw new RuntimeException("Topic消息发送失败", e);
        }
    }
    
    /**
     * 发送带优先级的消息
     */
    public void sendPriorityMessage(String queueName, String message, int priority) {
        try {
            jmsTemplate.convertAndSend(queueName, message, messagePostProcessor -> {
                messagePostProcessor.setJMSPriority(priority);
                return messagePostProcessor;
            });
            System.out.println("✅ 优先级消息发送成功: 优先级=" + priority);
            System.out.println("   消息内容: " + message);
        } catch (Exception e) {
            System.err.println("❌ 优先级消息发送失败: " + e.getMessage());
        }
    }
    
    /**
     * 发送延时消息
     */
    public void sendDelayMessage(String queueName, String message, long delayTime) {
        try {
            jmsTemplate.convertAndSend(queueName, message, messagePostProcessor -> {
                messagePostProcessor.setLongProperty("AMQ_SCHEDULED_DELAY", delayTime);
                return messagePostProcessor;
            });
            System.out.println("✅ 延时消息发送成功: 延时=" + delayTime + "毫秒");
            System.out.println("   消息内容: " + message);
        } catch (Exception e) {
            System.err.println("❌ 延时消息发送失败: " + e.getMessage());
        }
    }
    
    /**
     * 批量发送消息
     */
    public void sendBatchMessages(String queueName, String messagePrefix, int count) {
        try {
            for (int i = 1; i <= count; i++) {
                String message = messagePrefix + " #" + i;
                jmsTemplate.convertAndSend(queueName, message);
            }
            System.out.println("✅ 批量消息发送成功: " + count + " 条消息");
        } catch (Exception e) {
            System.err.println("❌ 批量消息发送失败: " + e.getMessage());
        }
    }
}

3. 消息消费者服务

package com.example.service;

import com.example.model.OrderMessage;
import com.example.model.UserMessage;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Service;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;

/**
 * 消息消费者服务
 */
@Service
public class MessageConsumerService {
    
    /**
     * 消费文本消息 - Queue模式
     */
    @JmsListener(destination = "text.queue")
    public void receiveTextMessage(String message) {
        try {
            System.out.println("📨 接收到文本消息: " + message);
            
            // 模拟业务处理
            processTextMessage(message);
            
            System.out.println("✅ 文本消息处理完成");
        } catch (Exception e) {
            System.err.println("❌ 文本消息处理失败: " + e.getMessage());
            throw new RuntimeException("消息处理失败", e);
        }
    }
    
    /**
     * 消费用户消息对象 - Queue模式
     */
    @JmsListener(destination = "user.queue")
    public void receiveUserMessage(UserMessage userMessage) {
        try {
            System.out.println("📨 接收到用户消息: " + userMessage);
            
            // 根据用户行为进行不同处理
            switch (userMessage.getAction().toLowerCase()) {
                case "register":
                    handleUserRegistration(userMessage);
                    break;
                case "login":
                    handleUserLogin(userMessage);
                    break;
                case "logout":
                    handleUserLogout(userMessage);
                    break;
                case "update":
                    handleUserUpdate(userMessage);
                    break;
                default:
                    System.out.println("🤔 未知的用户行为: " + userMessage.getAction());
            }
            
            System.out.println("✅ 用户消息处理完成");
        } catch (Exception e) {
            System.err.println("❌ 用户消息处理失败: " + e.getMessage());
            throw new RuntimeException("用户消息处理失败", e);
        }
    }
    
    /**
     * 消费订单消息 - Queue模式
     */
    @JmsListener(destination = "order.queue")
    public void receiveOrderMessage(OrderMessage orderMessage) {
        try {
            System.out.println("📨 接收到订单消息: " + orderMessage);
            
            // 根据订单状态进行处理
            switch (orderMessage.getStatus().toLowerCase()) {
                case "created":
                    handleOrderCreated(orderMessage);
                    break;
                case "paid":
                    handleOrderPaid(orderMessage);
                    break;
                case "shipped":
                    handleOrderShipped(orderMessage);
                    break;
                case "completed":
                    handleOrderCompleted(orderMessage);
                    break;
                case "cancelled":
                    handleOrderCancelled(orderMessage);
                    break;
                default:
                    System.out.println("🤔 未知的订单状态: " + orderMessage.getStatus());
            }
            
            System.out.println("✅ 订单消息处理完成");
        } catch (Exception e) {
            System.err.println("❌ 订单消息处理失败: " + e.getMessage());
            throw new RuntimeException("订单消息处理失败", e);
        }
    }
    
    /**
     * 消费Topic消息 - 发布订阅模式
     * 多个消费者可以同时接收到同一条消息
     */
    @JmsListener(destination = "news.topic", containerFactory = "topicListenerContainerFactory")
    public void receiveNewsFromTopic(String news) {
        try {
            System.out.println("📡 [新闻订阅者1] 接收到新闻: " + news);
            
            // 处理新闻消息
            processNews(news, "订阅者1");
            
            System.out.println("✅ [新闻订阅者1] 新闻处理完成");
        } catch (Exception e) {
            System.err.println("❌ [新闻订阅者1] 新闻处理失败: " + e.getMessage());
        }
    }
    
    /**
     * 另一个Topic消费者
     */
    @JmsListener(destination = "news.topic", containerFactory = "topicListenerContainerFactory")
    public void receiveNewsFromTopic2(String news) {
        try {
            System.out.println("📡 [新闻订阅者2] 接收到新闻: " + news);
            
            // 处理新闻消息
            processNews(news, "订阅者2");
            
            System.out.println("✅ [新闻订阅者2] 新闻处理完成");
        } catch (Exception e) {
            System.err.println("❌ [新闻订阅者2] 新闻处理失败: " + e.getMessage());
        }
    }
    
    /**
     * 消费原始JMS消息(可以获取更多消息属性)
     */
    @JmsListener(destination = "raw.message.queue")
    public void receiveRawMessage(Message message) {
        try {
            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                String content = textMessage.getText();
                
                // 获取消息属性
                String messageId = message.getJMSMessageID();
                int priority = message.getJMSPriority();
                long timestamp = message.getJMSTimestamp();
                
                System.out.println("

网站公告

今日签到

点亮在社区的每一天
去签到