📋 目录
🚀 ActiveMQ简介
什么是ActiveMQ?
ActiveMQ是Apache软件基金会开发的开源消息中间件,是Java消息服务(JMS)的完整实现,具有高性能、可靠性强、易于使用的特点。
核心概念
🏗️ 基础架构组件
Producer(生产者):负责发送消息
Consumer(消费者):负责接收和处理消息
Broker(代理):消息服务器,负责存储和转发消息
Destination(目的地):消息的目标,包括Queue和Topic
📝 重要概念解释
- Queue(队列):点对点模式,一条消息只能被一个消费者消费
- Topic(主题):发布订阅模式,一条消息可以被多个消费者消费
- JMS(Java Message Service):Java消息服务API标准
- Message(消息):传输的数据单元
- Session(会话):生产和消费消息的上下文
- Connection(连接):客户端与消息服务器的网络连接
ActiveMQ vs 其他消息中间件
特性 | ActiveMQ | RabbitMQ | RocketMQ |
---|---|---|---|
开发语言 | Java | Erlang | Java |
协议支持 | JMS、AMQP、STOMP | AMQP | 自定义协议 |
管理界面 | Web Console | Management UI | Console |
集群支持 | ✅ | ✅ | ✅ |
事务支持 | ✅ | ✅ | ✅ |
消息持久化 | ✅ | ✅ | ✅ |
🔧 环境搭建
1. ActiveMQ服务端安装
Docker方式(推荐初学者)
# 1. 拉取ActiveMQ镜像
docker pull webcenter/activemq:latest
# 2. 启动ActiveMQ容器
docker run -d \
--name activemq \
-p 61616:61616 \
-p 8161:8161 \
webcenter/activemq:latest
# 3. 查看容器状态
docker ps | grep activemq
# 4. 查看日志
docker logs activemq
手动安装方式
# 1. 下载ActiveMQ
wget https://archive.apache.org/dist/activemq/5.17.3/apache-activemq-5.17.3-bin.tar.gz
# 2. 解压
tar -zxvf apache-activemq-5.17.3-bin.tar.gz
# 3. 启动ActiveMQ
cd apache-activemq-5.17.3
./bin/activemq start
# 4. 停止ActiveMQ
./bin/activemq stop
# 5. 查看状态
./bin/activemq status
2. 验证安装
访问Web管理界面
URL: http://localhost:8161/admin
默认用户名: admin
默认密码: admin
连接参数
JMS连接URL: tcp://localhost:61616
Web管理端口: 8161
JMX端口: 1099
测试连接
# 使用ActiveMQ自带的测试工具
cd apache-activemq-5.17.3
# 启动消费者
./bin/activemq consumer
# 启动生产者(新开终端)
./bin/activemq producer
🏗️ Spring Boot集成配置
1. 添加依赖
<dependencies>
<!-- Spring Boot Starter Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Spring Boot ActiveMQ Starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!-- ActiveMQ连接池 -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
</dependency>
<!-- JSON处理 -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<!-- 开发工具 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
</dependencies>
2. 配置文件
application.yml
# ActiveMQ配置
spring:
activemq:
broker-url: tcp://localhost:61616 # ActiveMQ服务器地址
user: admin # 用户名
password: admin # 密码
in-memory: false # 不使用内存模式
pool:
enabled: true # 启用连接池
max-connections: 50 # 最大连接数
idle-timeout: 30000 # 空闲超时时间(毫秒)
packages:
trust-all: true # 信任所有包(开发环境)
trusted: com.example.model # 信任的包(生产环境推荐)
# JMS配置
jms:
pub-sub-domain: false # false=Queue模式,true=Topic模式
template:
default-destination: default-queue # 默认目的地
delivery-mode: persistent # 消息持久化模式
priority: 100 # 消息优先级(0-255)
time-to-live: 36000000 # 消息存活时间(毫秒)
receive-timeout: 10000 # 接收超时时间(毫秒)
# 应用配置
server:
port: 8080
# 日志配置
logging:
level:
org.apache.activemq: INFO
org.springframework.jms: DEBUG
com.example: DEBUG
application.properties(可选)
# ActiveMQ连接配置
spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin
spring.activemq.in-memory=false
# 连接池配置
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=50
spring.activemq.pool.idle-timeout=30000
# JMS配置
spring.jms.pub-sub-domain=false
spring.jms.template.default-destination=default-queue
spring.jms.template.delivery-mode=persistent
3. ActiveMQ配置类
基础配置
package com.example.config;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageType;
@Configuration
@EnableJms // 启用JMS
public class ActiveMQConfig {
@Value("${spring.activemq.broker-url}")
private String brokerUrl;
@Value("${spring.activemq.user}")
private String username;
@Value("${spring.activemq.password}")
private String password;
/**
* ActiveMQ连接工厂
*/
@Bean
public ActiveMQConnectionFactory activeMQConnectionFactory() {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
factory.setBrokerURL(brokerUrl);
factory.setUserName(username);
factory.setPassword(password);
// 信任所有包(开发环境)
factory.setTrustAllPackages(true);
// 生产环境推荐指定信任的包
// factory.setTrustedPackages(Arrays.asList("com.example.model"));
return factory;
}
/**
* 连接池工厂
*/
@Bean
public PooledConnectionFactory pooledConnectionFactory() {
PooledConnectionFactory pooledFactory = new PooledConnectionFactory();
pooledFactory.setConnectionFactory(activeMQConnectionFactory());
pooledFactory.setMaxConnections(50); // 最大连接数
pooledFactory.setIdleTimeout(30000); // 空闲超时
return pooledFactory;
}
/**
* JmsTemplate - 用于发送消息
*/
@Bean
public JmsTemplate jmsTemplate() {
JmsTemplate template = new JmsTemplate();
template.setConnectionFactory(pooledConnectionFactory());
template.setMessageConverter(jacksonJmsMessageConverter());
template.setDeliveryPersistent(true); // 消息持久化
template.setSessionTransacted(true); // 启用事务
return template;
}
/**
* Queue模式的监听器工厂
*/
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(pooledConnectionFactory());
factory.setMessageConverter(jacksonJmsMessageConverter());
factory.setPubSubDomain(false); // Queue模式
factory.setSessionTransacted(true); // 启用事务
factory.setConcurrency("3-10"); // 并发消费者数量
return factory;
}
/**
* Topic模式的监听器工厂
*/
@Bean
public DefaultJmsListenerContainerFactory topicListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(pooledConnectionFactory());
factory.setMessageConverter(jacksonJmsMessageConverter());
factory.setPubSubDomain(true); // Topic模式
factory.setSessionTransacted(true);
factory.setConcurrency("3-10");
return factory;
}
/**
* 消息转换器 - 支持对象序列化
*/
@Bean
public MappingJackson2MessageConverter jacksonJmsMessageConverter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT); // 使用文本消息
converter.setTypeIdPropertyName("_type"); // 类型标识属性
return converter;
}
}
📨 基础消息收发
1. 创建消息实体类
package com.example.model;
import java.io.Serializable;
import java.time.LocalDateTime;
/**
* 用户消息实体
*/
public class UserMessage implements Serializable {
private static final long serialVersionUID = 1L;
private Long id;
private String username;
private String email;
private String action;
private LocalDateTime timestamp;
private String description;
// 无参构造函数(JSON反序列化需要)
public UserMessage() {}
// 全参构造函数
public UserMessage(Long id, String username, String email, String action, String description) {
this.id = id;
this.username = username;
this.email = email;
this.action = action;
this.description = description;
this.timestamp = LocalDateTime.now();
}
// Getter和Setter方法
public Long getId() { return id; }
public void setId(Long id) { this.id = id; }
public String getUsername() { return username; }
public void setUsername(String username) { this.username = username; }
public String getEmail() { return email; }
public void setEmail(String email) { this.email = email; }
public String getAction() { return action; }
public void setAction(String action) { this.action = action; }
public LocalDateTime getTimestamp() { return timestamp; }
public void setTimestamp(LocalDateTime timestamp) { this.timestamp = timestamp; }
public String getDescription() { return description; }
public void setDescription(String description) { this.description = description; }
@Override
public String toString() {
return "UserMessage{" +
"id=" + id +
", username='" + username + '\'' +
", email='" + email + '\'' +
", action='" + action + '\'' +
", timestamp=" + timestamp +
", description='" + description + '\'' +
'}';
}
}
/**
* 订单消息实体
*/
public class OrderMessage implements Serializable {
private static final long serialVersionUID = 1L;
private String orderId;
private String userId;
private String productName;
private Integer quantity;
private Double totalPrice;
private String status;
private LocalDateTime createTime;
// 构造函数
public OrderMessage() {}
public OrderMessage(String orderId, String userId, String productName,
Integer quantity, Double totalPrice, String status) {
this.orderId = orderId;
this.userId = userId;
this.productName = productName;
this.quantity = quantity;
this.totalPrice = totalPrice;
this.status = status;
this.createTime = LocalDateTime.now();
}
// Getter和Setter方法省略...
@Override
public String toString() {
return "OrderMessage{" +
"orderId='" + orderId + '\'' +
", userId='" + userId + '\'' +
", productName='" + productName + '\'' +
", quantity=" + quantity +
", totalPrice=" + totalPrice +
", status='" + status + '\'' +
", createTime=" + createTime +
'}';
}
}
2. 消息生产者服务
package com.example.service;
import com.example.model.OrderMessage;
import com.example.model.UserMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;
import javax.jms.Queue;
import javax.jms.Topic;
/**
* 消息生产者服务
*/
@Service
public class MessageProducerService {
@Autowired
private JmsTemplate jmsTemplate;
/**
* 发送简单文本消息到Queue
*/
public void sendTextMessage(String queueName, String message) {
try {
jmsTemplate.convertAndSend(queueName, message);
System.out.println("✅ 文本消息发送成功到Queue: " + queueName);
System.out.println(" 消息内容: " + message);
} catch (Exception e) {
System.err.println("❌ 文本消息发送失败: " + e.getMessage());
throw new RuntimeException("消息发送失败", e);
}
}
/**
* 发送用户消息对象到Queue
*/
public void sendUserMessage(String queueName, UserMessage userMessage) {
try {
jmsTemplate.convertAndSend(queueName, userMessage);
System.out.println("✅ 用户消息发送成功到Queue: " + queueName);
System.out.println(" 消息内容: " + userMessage);
} catch (Exception e) {
System.err.println("❌ 用户消息发送失败: " + e.getMessage());
throw new RuntimeException("用户消息发送失败", e);
}
}
/**
* 发送订单消息到Queue
*/
public void sendOrderMessage(String queueName, OrderMessage orderMessage) {
try {
jmsTemplate.convertAndSend(queueName, orderMessage);
System.out.println("✅ 订单消息发送成功到Queue: " + queueName);
System.out.println(" 订单ID: " + orderMessage.getOrderId());
} catch (Exception e) {
System.err.println("❌ 订单消息发送失败: " + e.getMessage());
throw new RuntimeException("订单消息发送失败", e);
}
}
/**
* 发送消息到Topic(发布订阅模式)
*/
public void sendMessageToTopic(String topicName, Object message) {
try {
// 临时设置为Topic模式
jmsTemplate.setPubSubDomain(true);
jmsTemplate.convertAndSend(topicName, message);
// 恢复Queue模式
jmsTemplate.setPubSubDomain(false);
System.out.println("✅ 消息发布成功到Topic: " + topicName);
System.out.println(" 消息内容: " + message);
} catch (Exception e) {
System.err.println("❌ Topic消息发送失败: " + e.getMessage());
throw new RuntimeException("Topic消息发送失败", e);
}
}
/**
* 发送带优先级的消息
*/
public void sendPriorityMessage(String queueName, String message, int priority) {
try {
jmsTemplate.convertAndSend(queueName, message, messagePostProcessor -> {
messagePostProcessor.setJMSPriority(priority);
return messagePostProcessor;
});
System.out.println("✅ 优先级消息发送成功: 优先级=" + priority);
System.out.println(" 消息内容: " + message);
} catch (Exception e) {
System.err.println("❌ 优先级消息发送失败: " + e.getMessage());
}
}
/**
* 发送延时消息
*/
public void sendDelayMessage(String queueName, String message, long delayTime) {
try {
jmsTemplate.convertAndSend(queueName, message, messagePostProcessor -> {
messagePostProcessor.setLongProperty("AMQ_SCHEDULED_DELAY", delayTime);
return messagePostProcessor;
});
System.out.println("✅ 延时消息发送成功: 延时=" + delayTime + "毫秒");
System.out.println(" 消息内容: " + message);
} catch (Exception e) {
System.err.println("❌ 延时消息发送失败: " + e.getMessage());
}
}
/**
* 批量发送消息
*/
public void sendBatchMessages(String queueName, String messagePrefix, int count) {
try {
for (int i = 1; i <= count; i++) {
String message = messagePrefix + " #" + i;
jmsTemplate.convertAndSend(queueName, message);
}
System.out.println("✅ 批量消息发送成功: " + count + " 条消息");
} catch (Exception e) {
System.err.println("❌ 批量消息发送失败: " + e.getMessage());
}
}
}
3. 消息消费者服务
package com.example.service;
import com.example.model.OrderMessage;
import com.example.model.UserMessage;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Service;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;
/**
* 消息消费者服务
*/
@Service
public class MessageConsumerService {
/**
* 消费文本消息 - Queue模式
*/
@JmsListener(destination = "text.queue")
public void receiveTextMessage(String message) {
try {
System.out.println("📨 接收到文本消息: " + message);
// 模拟业务处理
processTextMessage(message);
System.out.println("✅ 文本消息处理完成");
} catch (Exception e) {
System.err.println("❌ 文本消息处理失败: " + e.getMessage());
throw new RuntimeException("消息处理失败", e);
}
}
/**
* 消费用户消息对象 - Queue模式
*/
@JmsListener(destination = "user.queue")
public void receiveUserMessage(UserMessage userMessage) {
try {
System.out.println("📨 接收到用户消息: " + userMessage);
// 根据用户行为进行不同处理
switch (userMessage.getAction().toLowerCase()) {
case "register":
handleUserRegistration(userMessage);
break;
case "login":
handleUserLogin(userMessage);
break;
case "logout":
handleUserLogout(userMessage);
break;
case "update":
handleUserUpdate(userMessage);
break;
default:
System.out.println("🤔 未知的用户行为: " + userMessage.getAction());
}
System.out.println("✅ 用户消息处理完成");
} catch (Exception e) {
System.err.println("❌ 用户消息处理失败: " + e.getMessage());
throw new RuntimeException("用户消息处理失败", e);
}
}
/**
* 消费订单消息 - Queue模式
*/
@JmsListener(destination = "order.queue")
public void receiveOrderMessage(OrderMessage orderMessage) {
try {
System.out.println("📨 接收到订单消息: " + orderMessage);
// 根据订单状态进行处理
switch (orderMessage.getStatus().toLowerCase()) {
case "created":
handleOrderCreated(orderMessage);
break;
case "paid":
handleOrderPaid(orderMessage);
break;
case "shipped":
handleOrderShipped(orderMessage);
break;
case "completed":
handleOrderCompleted(orderMessage);
break;
case "cancelled":
handleOrderCancelled(orderMessage);
break;
default:
System.out.println("🤔 未知的订单状态: " + orderMessage.getStatus());
}
System.out.println("✅ 订单消息处理完成");
} catch (Exception e) {
System.err.println("❌ 订单消息处理失败: " + e.getMessage());
throw new RuntimeException("订单消息处理失败", e);
}
}
/**
* 消费Topic消息 - 发布订阅模式
* 多个消费者可以同时接收到同一条消息
*/
@JmsListener(destination = "news.topic", containerFactory = "topicListenerContainerFactory")
public void receiveNewsFromTopic(String news) {
try {
System.out.println("📡 [新闻订阅者1] 接收到新闻: " + news);
// 处理新闻消息
processNews(news, "订阅者1");
System.out.println("✅ [新闻订阅者1] 新闻处理完成");
} catch (Exception e) {
System.err.println("❌ [新闻订阅者1] 新闻处理失败: " + e.getMessage());
}
}
/**
* 另一个Topic消费者
*/
@JmsListener(destination = "news.topic", containerFactory = "topicListenerContainerFactory")
public void receiveNewsFromTopic2(String news) {
try {
System.out.println("📡 [新闻订阅者2] 接收到新闻: " + news);
// 处理新闻消息
processNews(news, "订阅者2");
System.out.println("✅ [新闻订阅者2] 新闻处理完成");
} catch (Exception e) {
System.err.println("❌ [新闻订阅者2] 新闻处理失败: " + e.getMessage());
}
}
/**
* 消费原始JMS消息(可以获取更多消息属性)
*/
@JmsListener(destination = "raw.message.queue")
public void receiveRawMessage(Message message) {
try {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
String content = textMessage.getText();
// 获取消息属性
String messageId = message.getJMSMessageID();
int priority = message.getJMSPriority();
long timestamp = message.getJMSTimestamp();
System.out.println("