@Service
public class MessageConsumerService {
private final StringRedisTemplate redisTemplate;
private static final String PROCESSED_MSG_KEY = "processed_msgs";
private static final long EXPIRE_TIME = 7; // 过期时间(天)
public MessageConsumerService(StringRedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
}
public void processMessage(String msgId, String payload) {
// 1. 原子性检查消息是否已处理 + 添加到已处理集合;设置过期时间,避免集合无限增长
Boolean added = redisTemplate.opsForSet().add(
PROCESSED_MSG_KEY, msgId,7, TimeUnit.DAYS
);
// 2. 如果返回 null 或 false,表示消息已存在,直接返回
if (added == null || !added) {
log.info("消息已处理,跳过: {}", msgId);
return;
}
try {
// 4. 处理实际业务逻辑
doProcess(payload);
} catch (Exception e) {
// 5. 业务处理失败时,需移除已处理标记(可选)
redisTemplate.opsForSet().remove(PROCESSED_MSG_KEY, msgId);
throw e;
}
}
private void doProcess(String payload) {
// 业务逻辑实现
log.info("处理消息: {}", payload);
}
}
幂等性校验的关键要素
- 唯一消息 ID
消息发送方必须生成全局唯一的 msgId
常见实现方式:UUID、雪花算法(Snowflake)、业务主键哈希 - 原子性操作
必须保证 检查存在性 和 标记已处理 是原子操作
推荐使用 Redis 的 Lua 脚本或 SADD 命令(返回值可判断是否新增) - 过期时间设置
避免集合无限增长,占用过多内存
过期时间应大于消息可能的最大重试时间窗口(如 7 天) - 异常处理
业务处理失败时,是否需要回滚已处理标记?
若选择回滚,需保证失败时原子性移除标记
若不回滚,可能导致消息永久不被处理(需结合死信队列)