一、前言
本系列仅做个人笔记使用,内容大部分来自所引用文章,侵删。
本篇内容大部分来自 Spring Data Redis Stream的使用, 推荐阅读原文。
二、Redis Stream 简介
Redis Stream 是 Redis 5.0 引入的一种新的数据结构,它提供了一种强大的消息队列和日志记录解决方案,具有以下特点和用途:
- 消息队列:Redis Stream 可以用作高性能的消息队列。生产者可以将消息添加到流中,消费者可以从流中读取消息进行处理。它支持消费者组的概念,多个消费者可以组成一个组共同消费消息,并且可以记录每个消费者组的消费进度,确保消息不会被重复消费或遗漏。
- 有序日志记录:Stream 中的消息是有序的,每个消息都有一个唯一的 ID,这个 ID 按照添加的顺序递增。这使得它非常适合用于记录系统的操作日志、事件日志等。
- 消息持久化:Redis 本身支持数据持久化,Stream 中的消息也会被持久化到磁盘上,保证了消息不会因为 Redis 实例的重启而丢失。
数据结构丰富:每个消息可以包含多个字段和值,类似于一个小型的哈希表,这使得它可以存储更复杂的信息。
Redis Stream 为处理消息队列和日志记录等场景提供了一种高效、可靠且灵活的解决方案,在分布式系统、微服务架构等中得到了广泛的应用。
三、简单示例
1. 独立消费
独立消费指的是脱离消费组的直接消费Stream中的消息,是使用 xread方法读取流中的数据,流中的数据在读取后并不会被删除,还是存在的。
需要注意的是,XREAD 是 Redis Stream 用于从流中读取消息的命令,它单纯地负责获取消息,因此使用该种方式读取消息会失去消息队列的一些特性,如:
- 不支持消费者组的消息唯一性:XREAD读取时没有消费者组概念,所以如果多个程序同时使用xread读取,都是可以读取到消息的。
- 不支持消息的 ACK 特性 :Redis Stream 中消息确认机制主要与消费者组相关,消费者组中的消息确认操作是通过 XACK 命令来完成的。因此使用 XREAD 读取方式无法完成消息 ACK。
XREAD 方式示例代码如下:
@Slf4j
@Component
public class RedisStreamUtil {
@Resource
private RedisTemplate<String, Object> redisTemplate;
@PostConstruct
public void init() {
// 消息生产者每隔 5s 投递一个消息
Executors.newSingleThreadScheduledExecutor()
.scheduleAtFixedRate(() -> {
final ObjectRecord<String, Book> record = StreamRecords.newRecord()
.in(Cosntants.STREAM_KEY_001)
.ofObject(new Book(DateUtil.now(), "test"))
.withId(RecordId.autoGenerate());
final RecordId recordId = redisTemplate.opsForStream()
.add(record);
log.info("[Redis 消息投递][recordId = {}]", recordId);
}, 0, 10, TimeUnit.SECONDS);
// 消息消费者
StreamReadOptions streamReadOptions = StreamReadOptions.empty()
// 如果没有数据,则阻塞1s 阻塞时间需要小于`spring.redis.timeout`配置的时间
.block(Duration.ofMillis(5000))
// 一直阻塞直到获取数据,可能会报超时异常
// .block(Duration.ofMillis(0))
// 1次获取10个数据
.count(10);
StringBuilder readOffset = new StringBuilder("0-0");
Executors.newSingleThreadExecutor()
.execute(() -> {
while (true) {
List<ObjectRecord<String, Book>> reads = redisTemplate.opsForStream()
.read(Book.class, streamReadOptions,
StreamOffset.create(
Cosntants.STREAM_KEY_001, ReadOffset.from(readOffset.toString())));
if (CollectionUtils.isEmpty(reads)) {
log.info("[Redis 消息消费][readOffset = {}, 没有获取到数据]", readOffset);
}
for (ObjectRecord<String, Book> read : reads) {
log.info("[Redis 消息消费][id = {}, book = {}]", read.getId(), read.getValue());
readOffset.setLength(0);
// 设置消息偏移量,下次获取消息从该 id 之后
readOffset.append(read.getId());
}
}
});
}
}
运行日志如下:
Redis 中消费后的消息仍存在,如下图:
可以看到通过 XREAD 方式可以正确读取处消息,但是这种方式如果作为一个消息队列明显是不合适的,缺少了很多特性,因此还存在一种消费者组消费的方式,详见下文。
2. 消费者组进行消费
XREAD 的方式如果作为一个消息队列明细是不合适的,因此Redis 提供了通过消费者组的方式来对 Stream 处理。Redis Stream 中的消费者组的概念与 RocketMQ 中类似,具备如下特性:
- 消息分配与负载均衡
- 消息分配:消费者组中的多个消费者可以共同消费同一个流中的消息。当有新消息进入流时,Redis 会将消息分配给组内的某个消费者进行处理。例如,在一个订单处理系统中,多个消费者可以同时处理订单消息,提高处理效率。
- 负载均衡:Redis 会自动在消费者组的各个消费者之间实现负载均衡,将消息均匀地分发给不同的消费者,避免某个消费者负载过重。比如,有三个消费者 consumer1、consumer2 和 consumer3 在同一个消费者组中,新消息会依次轮流分配给它们。
- 消息确认机制
- 消息状态跟踪:消费者组会跟踪每条消息的状态,当消息被分配给某个消费者后,该消息会被标记为 “已交付(pending)”,直到消费者使用 XACK 命令确认消息已处理完毕。例如,消费者从流中读取到一条消息后,在完成业务逻辑处理后调用 XACK 命令,将该消息从 pending 列表中移除。
- 避免消息丢失:通过消息确认机制,即使消费者在处理消息过程中出现故障,未确认的消息仍然会保留在 pending 列表中,待消费者恢复后可以继续处理,确保消息不会丢失。
- 消费进度记录
- 独立进度:每个消费者组都有自己独立的消费进度记录,不同的消费者组可以从流的不同位置开始消费消息。例如,有两个消费者组 group1 和 group2,group1 可以从流的第一条消息开始消费,而 group2 可以从流的中间某个位置开始消费。
- 持续消费:消费者组会记录每个消费者的消费进度,当消费者重启或重新连接时,可以从上次停止的位置继续消费消息,保证消息的连续处理。
- 消息重试机制
- 失败重试:如果消费者在处理消息时失败,可以不确认该消息,该消息会一直保留在 pending 列表中,Redis 会定期将这些未确认的消息重新分配给组内的其他消费者或原消费者进行重试。例如,在一个数据同步系统中,如果某个消费者在同步数据时出现网络异常导致失败,消息会被重新分配处理。
- 重试次数控制:虽然 Redis 本身没有直接提供重试次数的控制,但可以在应用层实现重试次数的限制,避免无限重试。
- 可扩展性
- 动态添加消费者:可以随时向消费者组中动态添加新的消费者,以应对流量高峰或提高处理能力。例如,在电商大促期间,可以临时增加消费者来处理大量的订单消息。
- 支持分布式系统:消费者组非常适合分布式系统,多个消费者可以分布在不同的节点上,共同处理流中的消息,提高系统的整体性能和可靠性。
- 消息历史回溯
- 指定位置消费:消费者组可以从流的任意位置开始消费消息,包括历史消息。通过指定消息 ID 作为起始位置,消费者可以回溯到之前的某个时间点开始处理消息。例如,在进行数据修复或数据分析时,可以从特定的历史消息开始重新处理。
2.1 芋道源码
下面我们来看具体代码示例(参考自 https://github.com/YunaiV/ruoyi-vue-pro):
准备工作 :定义各种扩展接口
/** * {@link AbstractRedisMessage} 消息拦截器 * 通过拦截器,作为插件机制,实现拓展。 * 例如说,多租户场景下的 MQ 消息处理 * * @author 芋道源码 */ public interface RedisMessageInterceptor { default void sendMessageBefore(AbstractRedisMessage message) { } default void sendMessageAfter(AbstractRedisMessage message) { } default void consumeMessageBefore(AbstractRedisMessage message) { } default void consumeMessageAfter(AbstractRedisMessage message) { } } /** * Redis 消息抽象基类 * * @author 芋道源码 */ @Data public abstract class AbstractRedisMessage { /** * 头 */ private Map<String, String> headers = new HashMap<>(); public String getHeader(String key) { return headers.get(key); } public void addHeader(String key, String value) { headers.put(key, value); } } /** * Redis Channel Message 抽象类 * * @author 芋道源码 */ public abstract class AbstractRedisChannelMessage extends AbstractRedisMessage { /** * 获得 Redis Channel,默认使用类名 * * @return Channel */ @JsonIgnore // 避免序列化。原因是,Redis 发布 Channel 消息的时候,已经会指定。 public String getChannel() { return getClass().getSimpleName(); } }
创建一个 RedisTemplate 代理类,用于扩展 Redis Stream 相关功能
@AllArgsConstructor public class RedisMQTemplate { @Getter private final RedisTemplate<String, ?> redisTemplate; /** * 拦截器数组 */ @Getter private final List<RedisMessageInterceptor> interceptors = new ArrayList<>(); /** * 发送 Redis 消息,基于 Redis pub/sub 实现 * * @param message 消息 */ public <T extends AbstractRedisChannelMessage> void send(T message) { try { sendMessageBefore(message); // 发送消息 redisTemplate.convertAndSend(message.getChannel(), JSON.toJSONString(message)); } finally { sendMessageAfter(message); } } /** * 发送 Redis 消息,基于 Redis Stream 实现 * * @param message 消息 * @return 消息记录的编号对象 */ public <T extends AbstractRedisStreamMessage> RecordId send(T message) { try { sendMessageBefore(message); // 发送消息 return redisTemplate.opsForStream().add(StreamRecords.newRecord() .ofObject(JSON.toJSONString(message)) // 设置内容 .withStreamKey(message.getStreamKey())); // 设置 stream key } finally { sendMessageAfter(message); } } /** * 添加拦截器 * * @param interceptor 拦截器 */ public void addInterceptor(RedisMessageInterceptor interceptor) { interceptors.add(interceptor); } private void sendMessageBefore(AbstractRedisMessage message) { // 正序 interceptors.forEach(interceptor -> interceptor.sendMessageBefore(message)); } private void sendMessageAfter(AbstractRedisMessage message) { // 倒序 for (int i = interceptors.size() - 1; i >= 0; i--) { interceptors.get(i).sendMessageAfter(message); } } }
创建 AbstractRedisStreamMessageListener 抽象类,在其中实现了消息拦截器的逻辑,用于各个消费者实现
/** * Redis Stream 监听器抽象类,用于实现集群消费 * * @param <T> 消息类型。一定要填写噢,不然会报错 * @author 芋道源码 */ public abstract class AbstractRedisStreamMessageListener<T extends AbstractRedisStreamMessage> implements StreamListener<String, ObjectRecord<String, String>> { /** * 消息类型 */ private final Class<T> messageType; /** * Redis Channel */ @Getter private final String streamKey; /** * Redis 消费者分组,默认使用 spring.application.name 名字 */ @Value("${spring.application.name}") @Getter private String group; /** * RedisMQTemplate */ @Setter private RedisMQTemplate redisMQTemplate; @SneakyThrows protected AbstractRedisStreamMessageListener() { this.messageType = getMessageClass(); this.streamKey = messageType.getDeclaredConstructor().newInstance().getStreamKey(); } @Override public void onMessage(ObjectRecord<String, String> message) { // 消费消息 T messageObj = JSON.parseObject(message.getValue(), messageType); try { consumeMessageBefore(messageObj); // 消费消息 this.onMessage(messageObj); // ack 消息消费完成 redisMQTemplate.getRedisTemplate().opsForStream().acknowledge(group, message); // TODO 芋艿:需要额外考虑以下几个点: // 1. 处理异常的情况 // 2. 发送日志;以及事务的结合 // 3. 消费日志;以及通用的幂等性 // 4. 消费失败的重试,https://zhuanlan.zhihu.com/p/60501638 } finally { consumeMessageAfter(messageObj); } } /** * 处理消息 * * @param message 消息 */ public abstract void onMessage(T message); /** * 通过解析类上的泛型,获得消息类型 * * @return 消息类型 */ @SuppressWarnings("unchecked") private Class<T> getMessageClass() { Type type = TypeUtil.getTypeArgument(getClass(), 0); if (type == null) { throw new IllegalStateException(String.format("类型(%s) 需要设置消息类型", getClass().getName())); } return (Class<T>) type; } private void consumeMessageBefore(AbstractRedisMessage message) { assert redisMQTemplate != null; List<RedisMessageInterceptor> interceptors = redisMQTemplate.getInterceptors(); // 正序 interceptors.forEach(interceptor -> interceptor.consumeMessageBefore(message)); } private void consumeMessageAfter(AbstractRedisMessage message) { assert redisMQTemplate != null; List<RedisMessageInterceptor> interceptors = redisMQTemplate.getInterceptors(); // 倒序 for (int i = interceptors.size() - 1; i >= 0; i--) { interceptors.get(i).consumeMessageAfter(message); } } }
向容器中注入 StreamMessageListenerContainer 对象。
/** * 创建 Redis Stream 集群消费的容器 * <p> * 基础知识:<a href="https://www.geek-book.com/src/docs/redis/redis/redis.io/commands/xreadgroup.html">Redis Stream 的 xreadgroup 命令</a> */ @Bean(initMethod = "start", destroyMethod = "stop") @ConditionalOnBean(AbstractRedisStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的时候,才需要注册 Redis pubsub 监听 public StreamMessageListenerContainer<String, ObjectRecord<String, String>> redisStreamMessageListenerContainer( RedisMQTemplate redisMQTemplate, List<AbstractRedisStreamMessageListener<?>> listeners) { RedisTemplate<String, ?> redisTemplate = redisMQTemplate.getRedisTemplate(); checkRedisVersion(redisTemplate); // 第一步,创建 StreamMessageListenerContainer 容器 // 创建 options 配置 StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> containerOptions = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder() .batchSize(10) // 一次性最多拉取多少条消息 .targetType(String.class) // 目标类型。统一使用 String,通过自己封装的 AbstractStreamMessageListener 去反序列化 // .executor(executor) // 运行 Stream 的 poll task // .keySerializer(RedisSerializer.string()) // 可以理解为 Stream Key 的序列化方式 // .hashKeySerializer(RedisSerializer.string()) // 可以理解为 Stream 后方的字段的 key 的序列化方式 // .hashValueSerializer(RedisSerializer.string()) // 可以理解为 Stream 后方的字段的 value 的序列化方式 // .pollTimeout(Duration.ofSeconds(1)) // Stream 中没有消息时,阻塞多长时间,需要比 `spring.redis.timeout` 的时间小 // .objectMapper(new ObjectHashMapper()) // ObjectRecord 时,将 对象的 filed 和 value 转换成一个 Map 比如:将Book对象转换成map // .errorHandler(t -> log.error("发生了异常", t)) // 获取消息的过程或获取到消息给具体的消息者处理的过程中,发生了异常的处理 .build(); // 创建 container 对象 StreamMessageListenerContainer<String, ObjectRecord<String, String>> container = StreamMessageListenerContainer.create(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory(), containerOptions); // 第二步,注册监听器,消费对应的 Stream 主题 String consumerName = buildConsumerName(); listeners.parallelStream().forEach(listener -> { log.info("[redisStreamMessageListenerContainer][开始注册 StreamKey({}) 对应的监听器({})]", listener.getStreamKey(), listener.getClass().getName()); // 创建 listener 对应的消费者分组 try { redisTemplate.opsForStream().createGroup(listener.getStreamKey(), listener.getGroup()); } catch (Exception ignore) { } // 设置 listener 对应的 redisTemplate listener.setRedisMQTemplate(redisMQTemplate); // 创建 Consumer 对象 Consumer consumer = Consumer.from(listener.getGroup(), consumerName); // 设置 Consumer 消费进度,以最小消费进度为准 StreamOffset<String> streamOffset = StreamOffset.create(listener.getStreamKey(), ReadOffset.lastConsumed()); // 设置 Consumer 监听 StreamMessageListenerContainer.StreamReadRequestBuilder<String> builder = StreamMessageListenerContainer.StreamReadRequest .builder(streamOffset).consumer(consumer) .autoAcknowledge(false) // 不自动 ack .cancelOnError(throwable -> false); // 默认配置,发生异常就取消消费,显然不符合预期;因此,我们设置为 false container.register(builder.build(), listener); log.info("[redisStreamMessageListenerContainer][完成注册 StreamKey({}) 对应的监听器({})]", listener.getStreamKey(), listener.getClass().getName()); }); return container; } /** * 构建消费者名字,使用本地 IP + 进程编号的方式。 * 参考自 RocketMQ clientId 的实现 * * @return 消费者名字 */ private static String buildConsumerName() { return String.format("%s@%d", SystemUtil.getHostInfo().getAddress(), SystemUtil.getCurrentPID()); } /** * 校验 Redis 版本号,是否满足最低的版本号要求! */ private static void checkRedisVersion(RedisTemplate<String, ?> redisTemplate) { // 获得 Redis 版本 Properties info = redisTemplate.execute((RedisCallback<Properties>) RedisServerCommands::info); String version = MapUtil.getStr(info, "redis_version"); // 校验最低版本必须大于等于 5.0.0 int majorVersion = Integer.parseInt(StrUtil.subBefore(version, '.', false)); if (majorVersion < 5) { throw new IllegalStateException(StrUtil.format("您当前的 Redis 版本为 {},小于最低要求的 5.0.0 版本!", version)); } }
编写具体监听类, 当调用 “/redis/stream/sendMessage” 时 TestRedisStreamMessageListener 会收到 MQ 中的消息
@Slf4j @Component public class TestRedisStreamMessageListener extends AbstractRedisStreamMessageListener<TestRedisStreamMessageListener.TestMessage> { @Override public void onMessage(TestMessage message) { log.info("[收到 Redis Stream 消息内容 : {}]", message.message); } @Data public static class TestMessage extends AbstractRedisStreamMessage { private String message; public TestMessage() { } public TestMessage(String message) { this.message = message; } } } @RestController @RequestMapping("/redis/stream") public class RedisStreamController { @Resource private RedisMQTemplate redisMQTemplate; @PostMapping("sendMessage") public String sendMessage(String message) { return redisMQTemplate.send(new TestRedisStreamMessageListener.TestMessage(message)).getValue(); } }
消息接收如下图:
2.2 StreamListener 简析
Redis Stream 并不是 推
方式将消息发送给消费者,而是靠消费者主动 拉
消息,不过可以通过阻塞操作模拟出类似推(Push)的效果。那么既然本质上是 拉
消息, 自然是有一个循环或者定时任务在不停拉取消息的。
在上述代码中 spring-boot-starter-data-redis 为我们提供了一个 org.springframework.data.redis.stream.StreamListener 接口,我们只需要实现这个接口就可以完成Redis Stream 的监听。因此,循着 StreamListener 我们可以找到 org.springframework.data.redis.stream.StreamPollTask,在其中存在如下方法:
@Override
public void run() {
// 启动轮询状态,将状态设置为 starting
pollState.starting();
try {
// 标记当前线程处于事件循环中
isInEventLoop = true;
// 将轮询状态设置为 running,表示事件循环正在运行
pollState.running();
// 执行事件循环的核心逻辑
doLoop();
} finally {
// 事件循环结束,标记当前线程不在事件循环中
isInEventLoop = false;
}
}
private void doLoop() {
do {
try {
// 允许线程被中断
Thread.sleep(0);
// 从 Redis Stream 中读取消息记录
List<ByteRecord> raw = readRecords();
// 反序列化消息记录并将其发送出去进行处理
deserializeAndEmitRecords(raw);
} catch (InterruptedException e) {
// 若线程被中断,取消订阅
cancel();
// 重新设置中断状态,以便上层代码可以感知到中断
Thread.currentThread().interrupt();
} catch (RuntimeException e) {
// 根据异常情况判断是否取消订阅
if (cancelSubscriptionOnError.test(e)) {
cancel();
}
// 调用错误处理程序处理异常
errorHandler.handleError(e);
}
} while (pollState.isSubscriptionActive());
}
private List<ByteRecord> readRecords() {
// 这里的 readFunction 是从 org.springframework.data.redis.stream.DefaultStreamMessageListenerContainer#getReadFunction 方法获取的
return readFunction.apply(pollState.getCurrentReadOffset());
}
这里的 org.springframework.data.redis.stream.DefaultStreamMessageListenerContainer#getReadFunction,其实现如下
private Function<ReadOffset, List<ByteRecord>> getReadFunction(StreamReadRequest<K> streamRequest) {
// 将 StreamReadRequest 中的流键进行序列化
byte[] rawKey = ((RedisSerializer<K>) template.getKeySerializer())
.serialize(streamRequest.getStreamOffset().getKey());
// 判断请求是否为消费者组读取请求
if (streamRequest instanceof StreamMessageListenerContainer.ConsumerStreamReadRequest) {
// 将请求转换为消费者组读取请求类型
ConsumerStreamReadRequest<K> consumerStreamRequest = (ConsumerStreamReadRequest<K>) streamRequest;
// 根据是否自动确认消息,选择合适的读取选项
StreamReadOptions readOptions = consumerStreamRequest.isAutoAcknowledge() ? this.readOptions.autoAcknowledge()
: this.readOptions;
// 获取消费者信息
Consumer consumer = consumerStreamRequest.getConsumer();
// 返回一个函数,用于从消费者组中读取消息
return (offset) -> template.execute((RedisCallback<List<ByteRecord>>) connection -> connection.streamCommands()
.xReadGroup(consumer, readOptions, StreamOffset.create(rawKey, offset)));
}
// 如果不是消费者组读取请求,返回一个普通读取消息的函数
return (offset) -> template.execute((RedisCallback<List<ByteRecord>>) connection -> connection.streamCommands()
.xRead(readOptions, StreamOffset.create(rawKey, offset)));
}
总的来说:StreamListener 就是 Spring 框架帮我们封装好了消息拉取的逻辑。