Spring Boot整合Redis实现发布/订阅(含ACK机制)全流程
一、整体架构
二、实现步骤
步骤1:添加Maven依赖
<dependencies>
    <!-- Web layer: provides the REST endpoint used to publish messages -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Spring Data Redis (Lettuce client): Redis Stream operations and listener container -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <!-- Needed for the spring.redis.lettuce.pool.* settings: Lettuce connection pooling
         requires commons-pool2 on the classpath (version is managed by Spring Boot) -->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-pool2</artifactId>
    </dependency>
    <!-- Lombok is a compile-time tool only, so keep it out of the packaged artifact -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>
步骤2:配置Redis连接
# application.yml
# NOTE(review): on Spring Boot 3.x the connection keys live under spring.data.redis.* - confirm the Boot version in use.
spring:
  redis:
    host: localhost
    port: 6379
    lettuce:
      pool:
        max-active: 16
        max-idle: 8

# Redis Stream settings
app:
  redis:
    stream: app-events
    group: app-group
    # NOTE(review): ${random.*} is evaluated every time the placeholder is resolved, so separate
    # @Value("${app.redis.consumer}") injections may receive different names - verify, or derive
    # the name deterministically (e.g. from host name and port).
    consumer: consumer-${random.int(1000)}
步骤3:创建消费者组
@Configuration
public class RedisConfig {
@Value("${app.redis.stream}")
private String streamKey;
@Value("${app.redis.group}")
private String groupName;
@Bean
public void createConsumerGroup(StringRedisTemplate redisTemplate) {
try {
redisTemplate.opsForStream().createGroup(streamKey, groupName);
} catch (Exception e) {
System.out.println("消费者组已存在: " + groupName);
}
}
}
步骤4:配置消息监听容器
/**
 * Wires the thread pool and the listener container used to consume the Redis Stream.
 */
@Configuration
public class RedisConfig {

    /**
     * Thread pool handed to the stream listener container.
     *
     * @return an executor with a core size of 4 whose threads are named {@code redis-stream-*}
     */
    @Bean(name = "redisStreamTaskExecutor")
    public ThreadPoolTaskExecutor redisStreamTaskExecutor() {
        ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
        pool.setThreadNamePrefix("redis-stream-");
        pool.setCorePoolSize(4);
        return pool;
    }

    /**
     * Listener container for the stream; Spring calls {@code start} on initialization and
     * {@code stop} on shutdown.
     *
     * @param factory  Redis connection factory the container reads through
     * @param executor the {@code redisStreamTaskExecutor} bean
     * @return the configured container
     */
    @Bean(initMethod = "start", destroyMethod = "stop")
    public StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamContainer(
            RedisConnectionFactory factory,
            @Qualifier("redisStreamTaskExecutor") ThreadPoolTaskExecutor executor) {
        return StreamMessageListenerContainer.create(factory, containerOptions(executor));
    }

    /** Container options: 1 second poll timeout, batches of 10, running on the given executor. */
    private static StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> containerOptions(
            ThreadPoolTaskExecutor executor) {
        return StreamMessageListenerContainerOptions
                .builder()
                .batchSize(10)
                .executor(executor)
                .pollTimeout(Duration.ofSeconds(1))
                .build();
    }
}
步骤5:注册消息监听器
/**
 * Registers the message processor with the stream listener container at startup.
 *
 * <p>Collaborators are constructor-injected and the {@code @PostConstruct} callback takes no
 * arguments, since lifecycle callbacks must be no-arg methods.
 */
@Component
public class StreamListenerRegistrar {

    @Value("${app.redis.stream}")
    private String streamKey;

    @Value("${app.redis.group}")
    private String groupName;

    @Value("${app.redis.consumer}")
    private String consumerName;

    private final StreamMessageListenerContainer<String, MapRecord<String, String, String>> container;
    private final RedisMessageProcessor processor;

    public StreamListenerRegistrar(
            StreamMessageListenerContainer<String, MapRecord<String, String, String>> container,
            RedisMessageProcessor processor) {
        this.container = container;
        this.processor = processor;
    }

    /** Startup hook: registers the injected processor with the injected container. */
    @PostConstruct
    public void registerOnStartup() {
        registerListener(container, processor);
    }

    /**
     * Subscribes {@code processor} to the configured stream as a member of the configured
     * consumer group, starting from the group's last consumed offset.
     *
     * @param container listener container that polls the stream
     * @param processor listener invoked for each record
     */
    public void registerListener(
            StreamMessageListenerContainer<String, MapRecord<String, String, String>> container,
            RedisMessageProcessor processor) {
        StreamReadRequest<String> readRequest =
                StreamReadRequest.builder(StreamOffset.create(streamKey, ReadOffset.lastConsumed()))
                        .consumer(Consumer.from(groupName, consumerName))
                        .autoAcknowledge(false) // manual ACK: the processor acknowledges after success
                        .build();
        container.register(readRequest, processor);
    }
}
步骤6:实现消息处理器
@Component
public class RedisMessageProcessor implements StreamListener<String, MapRecord<String, String, String>> {
@Override
public void onMessage(MapRecord<String, String, String> record) {
CompletableFuture.runAsync(() -> {
try {
// 业务处理逻辑
processBusiness(record);
// 处理成功发送ACK
redisTemplate.opsForStream().acknowledge(streamKey, groupName, record.getId());
} catch (Exception e) {
// 失败消息进入Pending List
}
});
}
private void processBusiness(MapRecord<String, String, String> record) throws Exception {
String eventType = record.getValue().get("eventType");
String payload = record.getValue().get("payload");
// 根据事件类型处理
switch (eventType) {
case "ORDER_CREATED": handleOrder(payload); break;
case "PAYMENT_PROCESSED": handlePayment(payload); break;
}
}
}
步骤7:实现Pending消息处理器
@Component
@Slf4j
public class PendingMessageProcessor {
@Value("${app.redis.stream}")
private String streamKey;
@Value("${app.redis.group}")
private String groupName;
@Value("${app.redis.consumer}")
private String consumerName;
// 每分钟处理一次Pending消息
@Scheduled(fixedRate = 60000)
public void processPendingMessages() {
// 1. 查询Pending消息
PendingMessages pending = redisTemplate.opsForStream()
.pending(streamKey, groupName, Range.unbounded(), 100);
pending.forEach(this::handlePendingMessage);
}
private void handlePendingMessage(PendingMessage pending) {
try {
// 2. 重新认领消息
List<MapRecord<String, String, String>> records = redisTemplate.opsForStream()
.claim(streamKey,
Consumer.from(groupName, consumerName),
Duration.ofSeconds(30),
pending.getId());
if (!records.isEmpty()) {
MapRecord<String, String, String> record = records.get(0);
// 3. 重试处理
messageProcessor.processBusiness(record);
// 4. 处理成功发送ACK
redisTemplate.opsForStream().acknowledge(streamKey, groupName, record.getId());
}
} catch (Exception e) {
// 5. 超过重试次数移入死信队列
if (pending.getTotalDeliveryCount() > 3) {
moveToDeadLetterQueue(pending);
}
}
}
private void moveToDeadLetterQueue(PendingMessage pending) {
// 获取消息内容
List<MapRecord<String, String, String>> records = redisTemplate.opsForStream()
.range(streamKey, Range.from(pending.getId()));
if (!records.isEmpty()) {
// 添加到死信队列
redisTemplate.opsForStream().add("dead-letter:" + streamKey, records.get(0).getValue());
// 确认原始消息
redisTemplate.opsForStream().acknowledge(streamKey, groupName, pending.getId());
}
}
}
步骤8:实现消息生产者
/**
 * Publishes events to the configured Redis Stream.
 */
@Service
public class RedisMessageProducer {

    @Value("${app.redis.stream}")
    private String streamKey;

    private final StringRedisTemplate redisTemplate;

    public RedisMessageProducer(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    /**
     * Appends a record with {@code eventType}, {@code payload} and a millisecond
     * {@code timestamp} to the stream.
     *
     * @param eventType event type stored under the {@code eventType} field; must not be null
     * @param payload   event body stored under the {@code payload} field; must not be null
     * @return the id of the appended record
     * @throws NullPointerException if {@code eventType} or {@code payload} is null
     */
    public String sendMessage(String eventType, String payload) {
        // Map.of rejects null values anyway; fail first with a message naming the argument.
        java.util.Objects.requireNonNull(eventType, "eventType must not be null");
        java.util.Objects.requireNonNull(payload, "payload must not be null");

        Map<String, String> message = Map.of(
                "eventType", eventType,
                "payload", payload,
                "timestamp", String.valueOf(System.currentTimeMillis())
        );
        return redisTemplate.opsForStream()
                .add(streamKey, message)
                .getValue();
    }
}
步骤9:创建REST接口
/**
 * REST entry point for publishing messages to the stream.
 */
@RestController
@RequestMapping("/messages")
public class MessageController {

    private final RedisMessageProducer producer;

    /** The final field needs a constructor; Spring injects the producer through it. */
    public MessageController(RedisMessageProducer producer) {
        this.producer = producer;
    }

    /**
     * Handles {@code POST /messages}: publishes the request body as a stream record.
     *
     * @param request event type and payload to publish
     * @return the value returned by the producer for the published message
     */
    @PostMapping
    public String sendMessage(@RequestBody MessageRequest request) {
        return producer.sendMessage(request.getEventType(), request.getPayload());
    }

    /** Request body; accessors are generated by Lombok's {@code @Data}. */
    @Data
    public static class MessageRequest {
        private String eventType;
        private String payload;
    }
}
三、消息生命周期流程图
1. 正常消息处理流程
2. Pending消息处理流程
3. ACK机制工作原理
四、生产环境建议
消费者命名策略
@Value("${app.redis.consumer}")
private String consumerName;

/**
 * Set at application startup: replaces the configured consumer name with
 * {@code consumer-<host>-<port>}.
 */
@PostConstruct
public void initConsumerName() {
    String hostName;
    try {
        hostName = InetAddress.getLocalHost().getHostName();
    } catch (java.net.UnknownHostException e) {
        // getLocalHost() declares this checked exception; without a host name, keep the
        // consumer name injected from configuration.
        return;
    }
    // Default to 8080 when server.port is not set, rather than producing "...-null".
    String port = environment.getProperty("server.port", "8080");
    consumerName = "consumer-" + hostName + "-" + port;
}
动态配置重试策略
app:
  pending:
    max_retry: 5
    retry_interval: 30000 # 30 seconds
死信队列监控
@Scheduled(fixedRate = 3600000) // 每小时检查一次 public void checkDeadLetterQueue() { Long size = redisTemplate.opsForStream().size("dead-letter:" + streamKey); if (size > 0) { alertService.sendAlert("死信队列有 " + size + " 条未处理消息"); } }
消息最大长度设置(MAXLEN,限制Stream长度而非设置TTL)
// 发送消息时设置最大长度 public String sendMessage(String eventType, String payload) { MapRecord<String, String, String> record = ...; return redisTemplate.opsForStream() .add(Record.of(record).withMaxLen(10000).approximate(true)); }
五、总结
本文详细介绍了Spring Boot整合Redis实现发布/订阅功能并添加ACK机制的完整方案:
事件驱动架构:使用Redis Stream监听器实现真正的发布/订阅模式
可靠ACK机制:通过手动ACK确认确保消息可靠处理
自动恢复系统:Pending消息处理器自动处理失败消息
死信队列:隔离无法处理的消息,防止系统阻塞
生产就绪:包含多实例部署、动态配置、监控告警等生产级特性
该方案适用于需要高可靠性消息传递的场景,如订单处理、支付系统、事件溯源等,在保证系统吞吐量的同时提供了消息可靠性保障。