Redis广播缓存优化
📋 目录
1. 项目背景
1.1 业务场景
系统 : System 基于Flink的实时检测系统
功能 : 动态App过滤,根据IP地址匹配App配置信息
数据量 : Redis中存储8000条App配置数据
并行度 : Flink任务运行在300个并行度
1.2 原始架构问题
┌─────────────────────────────────────────────────────────────┐
│ 原始架构 - 问题严重 │
├─────────────────────────────────────────────────────────────┤
│ TaskManager 1 TaskManager 2 ... TaskManager N │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Slot1 Slot2 │ │ Slot1 Slot2 │ ... │ Slot1 Slot2 │ │
│ │ ┌───┐ ┌───┐ │ │ ┌───┐ ┌───┐ │ │ ┌───┐ ┌───┐ │ │
│ │ │ ❌│ │ ❌│ │ │ │ ❌│ │ ❌│ │ ... │ │ ❌│ │ ❌│ │ │
│ │ └───┘ └───┘ │ │ └───┘ └───┘ │ │ └───┘ └───┘ │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Redis (CPU 100%) │ │
│ │ 每30分钟: 300并行度 × 8000条 = 240万次查询 │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
2. 问题分析
2.1 核心问题
🔥 Redis性能瓶颈
总查询次数 = 300 个并行度 × 8000 条数据 = 2,400 ,000次查询/30分钟
平均QPS = 2,400 ,000 ÷ ( 30 × 60 ) = 1,333 QPS
峰值QPS = 远超平均值(集中查询时)
结果: Redis CPU 100 %,系统不可用
💾 内存资源浪费
单条App数据大小 ≈ 1KB
每个算子缓存 = 8000 条 × 1KB = 8MB
总内存使用 = 8MB × 300 个算子 = 2 .4GB
实际TaskManager内存 = 2 .4GB ÷ 10 个TM = 240MB/TM
内存利用率 = 极低,大量重复数据
⏰ 数据一致性问题
更新周期 = 30 分钟
数据一致性窗口 = 0 -30分钟不等
业务影响 = 新增App配置生效延迟
2.2 问题根因分析
问题类型
根本原因
影响程度
业务风险
Redis压力
每个算子独立查询全量数据
🔴 严重
系统不可用
内存浪费
300个算子重复缓存相同数据
🟡 中等
资源浪费
扩展性差
数据量增长时问题指数级恶化
🔴 严重
无法扩展
维护困难
缓存逻辑分散在各个算子中
🟡 中等
开发效率低
3. 解决方案
3.1 方案演进路径
graph TD
A[原方案: 每个算子独立缓存] --> B[方案1: Hash存储优化]
B --> C[方案2: Async I/O实时查询]
C --> D[方案3: 单点加载+广播分发]
D --> E[方案4: 模板化设计]
A --> A1[❌ Redis CPU 100%]
B --> B1[✅ 查询减少99.99%]
C --> C1[❌ 仍有网络压力]
D --> D1[✅ 内存节省96%]
E --> E1[✅ 高度可复用]
3.2 最终方案:单点加载+广播分发
3.2.1 架构设计
┌─────────────────────────────────────────────────────────────┐
│ 优化后架构 - 性能卓越 │
├─────────────────────────────────────────────────────────────┤
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ 单点数据源 (并行度=1) │ │
│ │ ┌─────────────────────────────────────────────────────┐ │ │
│ │ │ AppBroadcastSource │ │ │
│ │ │ - 5分钟刷新一次 │ │ │
│ │ │ - SCAN + Pipeline批量获取 │ │ │
│ │ │ - 8000次查询 (vs 240万次) │ │ │
│ │ └─────────────────────────────────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ 广播分发 │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ 广播状态层 │ │
│ │ 每个TaskManager存储一份数据 (80MB vs 2.4GB) │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ TaskManager 1 TaskManager 2 ... TaskManager N│ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐│ │
│ │ │ ✅共享状态 │ │ ✅共享状态 │ ... │ ✅共享状态 ││ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘│ │
│ └─────────────────────────────────────────────────────────┐ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Redis (CPU < 1%) │ │
│ │ 每5分钟: 1个数据源 × 8000条 = 8000次查询 │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
3.2.2 核心组件
1. 数据源层 - AbstractRedisBroadcastSource
public abstract class AbstractRedisBroadcastSource < T > extends RichSourceFunction < T > {
private final String keyPattern;
private final long refreshIntervalMs;
protected abstract T parseRedisData ( String key, String value) ;
protected abstract boolean isValidData ( T data) ;
protected abstract String getSourceName ( ) ;
- 使用SCAN 替代KEYS ,避免阻塞
- Pipeline 批量获取,减少网络往返
- 可中断等待,支持优雅停止
- 线程安全的队列管理
}
2. 处理器层 - AbstractBroadcastProcessFunction
public abstract class AbstractBroadcastProcessFunction < IN, BROADCAST, OUT>
extends BroadcastProcessFunction < IN, BROADCAST, OUT> {
public abstract MapStateDescriptor < String , BROADCAST> getBroadcastStateDescriptor ( ) ;
protected abstract String extractLookupKey ( IN input) ;
protected abstract boolean processMatched ( IN input, BROADCAST data, Collector < OUT> out) ;
protected abstract String extractStorageKey ( BROADCAST data) ;
- 自动状态管理
- 冷启动保护
- 异常恢复机制
- 调试日志支持
}
3. 模板层 - BroadcastTemplate
public class BroadcastTemplate < IN, BROADCAST, OUT> {
public static < IN, BROADCAST, OUT> Builder < IN, BROADCAST, OUT> builder ( ) {
return new Builder < > ( ) ;
}
public DataStream < OUT> applyTo ( DataStream < IN> mainStream) {
}
}
4. 方案对比
4.1 性能指标对比
指标
原方案
Hash优化
Async I/O
广播方案
提升幅度
Redis QPS
1,333
0.17
8,000
0.17
↓99.99%
内存使用
2.4GB
2.4GB
可控
80MB
↓96.7%
网络连接
3,000个
300个
3,000个
10个
↓99.7%
启动时间
5-10分钟
5-10分钟
30秒
30秒
↓90%
数据一致性
30分钟
30分钟
实时
5分钟
↑83%
扩展性
线性恶化
线性恶化
线性增长
常数级
质的飞跃
4.2 资源使用对比
Redis压力对比
每30分钟查询: 300 × 8000 = 2,400 ,000次
平均QPS: 1,333
峰值QPS: 5,000 + ( 集中查询)
CPU使用率: 100 %
状态: 🔴 系统不可用
每5分钟查询: 1 × 8000 = 8,000 次
平均QPS: 0.17
峰值QPS: 50 ( Pipeline批量)
CPU使用率: < 1 %
状态: ✅ 系统正常
内存使用对比
算子数量: 300 个
每算子内存: 8MB
总内存: 2 .4GB
分布: 分散在各TaskManager
利用率: 极低 ( 重复数据)
TaskManager数量: 10 个
每TM内存: 8MB
总内存: 80MB
分布: TaskManager级别共享
利用率: 高 ( 无重复数据)
4.3 可靠性对比
可靠性指标
原方案
广播方案
改进说明
故障恢复
慢 (5-10分钟)
快 (30秒)
无需预加载全量数据
Redis故障影响
严重 (系统不可用)
轻微 (继续使用缓存)
降级机制
内存溢出风险
高
低
内存使用可控
网络故障影响
严重
轻微
减少网络依赖
数据一致性
差 (30分钟)
好 (5分钟)
更频繁的更新
5. 架构设计
5.1 整体架构流程图
Flink集群
数据源层 (并行度=1)
广播层
TaskManager集群
业务流处理
TM1
TM2
TMN
Redis存储层
主数据流 业务数据
DynamicAppFilterFunction 广播处理函数
输出流 过滤后数据
BroadcastState 8MB共享状态
算子1
算子2
算子...
BroadcastState 8MB共享状态
算子1
算子2
算子...
BroadcastState 8MB共享状态
算子1
算子2
算子...
BroadcastStream 广播状态分发
AppBroadcastSource - SCAN keys - Pipeline批量获取 - 5分钟刷新
system:app:info:192.168.1.1
system:app:info:192.168.1.2
system:app:info:...
system:app:info:192.168.1.8000
5.2 数据流转流程
Redis
AppBroadcastSource
广播流
TaskManager
处理函数
主数据流
输出流
系统启动阶段
SCAN system:app:info:*
返回8000个keys
Pipeline批量GET
返回8000条数据
发送App数据
广播到所有TM
存储到BroadcastState
数据处理阶段
业务数据 (含IP)
查询BroadcastState
返回App配置
匹配language与pid_tree
输出增强后的数据
丢弃数据
alt
[匹配成功]
[匹配失败]
定期刷新阶段 (每5分钟)
重新SCAN和GET
返回最新数据
发送更新数据
更新BroadcastState
Redis
AppBroadcastSource
广播流
TaskManager
处理函数
主数据流
输出流
5.3 原方案 vs 新方案流程对比
原方案流程
原方案 - 每个算子独立缓存
查询Redis 8000次
算子1启动
查询Redis 8000次
算子2启动
查询Redis 8000次
算子...
查询Redis 8000次
算子300启动
本地缓存8000条
本地缓存8000条
本地缓存8000条
本地缓存8000条
30分钟后重新查询
30分钟后重新查询
30分钟后重新查询
30分钟后重新查询
新方案流程
新方案 - 单点加载广播分发
查询Redis 8000次
单点数据源启动
广播到所有TaskManager
TM1共享状态
TM2共享状态
TM...共享状态
TM10共享状态
算子1-30共享访问
算子31-60共享访问
算子...共享访问
算子271-300共享访问
5分钟定时器
6. 性能提升
6.1 Redis性能提升
查询次数优化
每次更新: 300 个算子 × 8000 条数据 = 2,400 ,000次查询
更新频率: 每30分钟
日查询量: 2,400 ,000 × 48 = 115,200 ,000次/天
Redis状态: CPU 100 %,不可用
每次更新: 1 个数据源 × 8000 条数据 = 8,000 次查询
更新频率: 每5分钟
日查询量: 8,000 × 288 = 2,304 ,000次/天
Redis状态: CPU < 1 %,正常运行
查询减少: 99.99 %
CPU使用: 从100% → < 1 %
系统可用性: 从不可用 → 高可用
网络优化
for key in keys:
value = redis.get( key)
pipeline = redis.pipeline( )
for key in keys:
pipeline.get( key)
results = pipeline.execute( )
6.2 内存使用优化
内存分布对比
原方案内存分布:
┌─────────────────────────────────────────────────────────┐
│ TaskManager 1 (240MB) TaskManager 2 (240MB) ... │
│ ┌─────┬─────┬─────┐ ┌─────┬─────┬─────┐ │
│ │ 8MB │ 8MB │ 8MB │ │ 8MB │ 8MB │ 8MB │ ... │
│ │算子1│算子2│算子3│ │算子1│算子2│算子3│ │
│ └─────┴─────┴─────┘ └─────┴─────┴─────┘ │
│ 重复数据 ❌ 重复数据 ❌ │
└─────────────────────────────────────────────────────────┘
总内存: 2.4GB (大量重复)
新方案内存分布:
┌─────────────────────────────────────────────────────────┐
│ TaskManager 1 (8MB) TaskManager 2 (8MB) ... │
│ ┌─────────────────────┐ ┌─────────────────────┐ │
│ │ 共享广播状态 │ │ 共享广播状态 │ ... │
│ │ 8MB │ │ 8MB │ │
│ │ ┌───┬───┬───┐ │ │ ┌───┬───┬───┐ │ │
│ │ │算1│算2│算3│ │ │ │算1│算2│算3│ │ ... │
│ │ └───┴───┴───┘ │ │ └───┴───┴───┘ │ │
│ └─────────────────────┘ └─────────────────────┘ │
│ 共享访问 ✅ 共享访问 ✅ │
└─────────────────────────────────────────────────────────┘
总内存: 80MB (无重复)
6.3 启动性能优化
启动阶段
原方案
新方案
提升
数据加载
300个算子并发加载
1个数据源加载
启动冲突消除
Redis压力
240万次查询峰值
8000次查询
压力减少99.67%
加载时间
5-10分钟
30秒
时间减少90%
失败率
高 (Redis超载)
低 (压力可控)
可靠性大幅提升
7. 模板化设计
7.1 设计思想
7.1.1 抽象层次设计
┌─────────────────────────────────────────────────────────┐
│ 模板化架构 │
├─────────────────────────────────────────────────────────┤
│ 应用层 │ App广播 │ 规则广播 │ 配置广播 │ ... │
│ │ 方案 │ 方案 │ 方案 │ │
├─────────────────────────────────────────────────────────┤
│ 模板层 │ BroadcastTemplate │
│ │ (一站式方案模板) │
├─────────────────────────────────────────────────────────┤
│ 处理器层 │ AbstractBroadcastProcessFunction │
│ │ (抽象广播处理器) │
├─────────────────────────────────────────────────────────┤
│ 数据源层 │ AbstractRedisBroadcastSource │
│ │ (抽象Redis数据源) │
├─────────────────────────────────────────────────────────┤
│ 基础层 │ Flink BroadcastProcessFunction │
│ │ RichSourceFunction │
└─────────────────────────────────────────────────────────┘
7.1.2 模板参数化设计
AbstractRedisBroadcastSource < T > (
String keyPattern,
int refreshIntervalMinutes,
)
AbstractBroadcastProcessFunction < IN, BROADCAST, OUT> {
extractLookupKey ( IN input)
processMatched ( IN , BROADCAST , OUT )
extractStorageKey ( BROADCAST data)
isValidBroadcastData ( BROADCAST data)
}
BroadcastTemplate . < IN, BROADCAST, OUT> builder ( )
. source ( dataSource)
. processor ( processor)
. sourceName ( name)
. processorName ( name)
. build ( )
7.2 模板使用流程
是
否
业务需求
需要Redis广播缓存?
确定三个参数
使用其他方案
Redis Key模式
更新周期
数据结构
继承AbstractRedisBroadcastSource
实现3个抽象方法
parseRedisData - 数据解析
isValidData - 数据验证
getSourceName - 源名称
继承AbstractBroadcastProcessFunction
实现4个抽象方法
extractLookupKey - 查询键提取
processMatched - 匹配处理
extractStorageKey - 存储键提取
isValidBroadcastData - 数据验证
使用BroadcastTemplate组装
一行代码应用到数据流
完成 - 获得高性能广播方案
7.3 模板复用性分析
7.3.1 代码复用率
原始开发: 200 -300行 ( 数据源 + 处理器 + 连接逻辑)
模板开发: 30 -50行 ( 继承 + 实现抽象方法)
代码减少: 85 %
原始开发: 2 -3天 ( 设计 + 开发 + 测试)
模板开发: 2 -4小时 ( 实现 + 测试)
时间减少: 90 %
7.3.2 适用场景
场景类型
适用性
开发复杂度
示例
配置广播
✅ 完美适用
极低
App配置、规则配置
字典广播
✅ 完美适用
低
IP黑名单、用户配置
实时更新
✅ 适用
中
动态规则、实时配置
复杂处理
⚠️ 需定制
中高
多表关联、复杂计算
7.4 模板扩展性设计
用户配置实现
IP黑名单实现
规则实现
App实现
核心模板
UserConfigSource
UserConfigProcessor
用户配置完整方案
IpBlacklistSource
IpBlacklistProcessor
IP黑名单完整方案
RuleBroadcastSource
RuleProcessFunction
规则完整方案
AppBroadcastSource
DynamicAppFilterFunction
App完整方案
AbstractRedisBroadcastSource
AbstractBroadcastProcessFunction
BroadcastTemplate
8. 实施指南
8.1 迁移步骤
阶段1: 准备阶段 (1天)
- 部署新的模板代码
- 保留原有代码作为回滚备份
- 配置监控和日志
- 验证Redis连接池配置
- 检查Flink集群资源
- 准备监控Dashboard
阶段2: 灰度测试 (2-3天)
- 选择1个TaskManager进行测试
- 监控Redis压力变化
- 验证数据正确性
- 对比内存使用情况
- 监控启动时间
- 检查数据一致性
阶段3: 全量部署 (1天)
- 更新Flink任务配置
- 重启Flink集群
- 实时监控系统状态
- Redis CPU使用率 < 1 %
- 内存使用减少 > 95 %
- 启动时间 < 1 分钟
8.2 监控指标
8.2.1 Redis监控
- CPU使用率: 目标 < 5 %
- QPS: 目标 < 100
- 连接数: 目标 < 50
- 内存使用: 监控增长趋势
- CPU > 50 %: 警告
- CPU > 80 %: 严重
- QPS > 500 : 警告
- 连接数 > 100 : 警告
8.2.2 Flink监控
- TaskManager内存: 监控广播状态大小
- 启动时间: 目标 < 2 分钟
- 数据延迟: 监控处理延迟
- 错误率: 目标 < 0.1 %
- 广播状态 > 50MB: 警告
- 启动时间 > 5 分钟: 警告
- 数据延迟 > 10 秒: 警告
- 错误率 > 1 %: 严重
8.3 风险控制
8.3.1 回滚方案
1 . 停止新版本Flink任务
2 . 启动备份的原版本任务
3 . 验证系统恢复正常
4 . 分析问题原因
- Redis CPU > 80 %持续5分钟
- Flink任务启动失败
- 数据正确性问题
- 业务指标异常
8.3.2 应急预案
1 . 立即启用Redis读写分离
2 . 增加Redis实例
3 . 临时调整刷新频率
1 . 检查资源配置
2 . 调整并行度
3 . 重启TaskManager
1 . 对比新旧数据
2 . 检查解析逻辑
3 . 验证Redis数据
8.4 最佳实践
8.4.1 配置优化
taskmanager.memory.managed.fraction : 0.6
state.backend.incremental : true
state.checkpoints.num-retained : 5
maxmemory-policy : allkeys- lru
timeout : 300
tcp-keepalive : 60
8.4.2 开发规范
- 继承AbstractRedisBroadcastSource
- 实现所有抽象方法
- 添加详细的日志和异常处理
- 编写单元测试
- 继承AbstractBroadcastProcessFunction
- 验证输入数据的有效性
- 处理边界情况和异常
- 添加性能监控点
- 使用构建器模式创建模板
- 设置合适的名称便于监控
- 配置合理的刷新间隔
- 添加业务监控指标
9. 总结
9.1 核心成果
🚀 性能提升显著
Redis查询减少99.99%
内存使用减少96.7%
启动时间减少90%
🛡️ 系统可靠性提升
Redis CPU从100% → <1%
系统从不可用 → 高可用
数据一致性从30分钟 → 5分钟
🔧 开发效率提升
模板化设计,代码复用率85%
开发时间从2-3天 → 2-4小时
维护成本大幅降低
9.2 技术价值
架构创新 : 单点加载+广播分发模式
模板化设计 : 高度抽象和可复用的框架
性能优化 : 多维度的系统性能提升
工程实践 : 完整的实施和监控方案
9.3 业务价值
成本节约 : 减少Redis资源需求,降低基础设施成本
稳定性提升 : 系统高可用,减少故障和维护成本
扩展性增强 : 支持业务快速增长,无需担心性能瓶颈
开发加速 : 新功能快速上线,提升业务响应速度
10. 附录
10.1 相关代码文件
📦 核心文件列表
├── AbstractRedisBroadcastSource.java # 抽象Redis数据源
├── AppBroadcastSource.java # App数据源实现
├── AbstractBroadcastProcessFunction.java # 抽象广播处理器
├── DynamicAppFilterFunction.java # App处理器实现
├── BroadcastTemplate.java # 广播方案模板
└── SystemDynamicConsume.java # 业务应用代码
10.2 性能测试数据
测试场景
原方案
新方案
提升比例
Redis QPS峰值
5000+
50
99%
内存使用峰值
2.4GB
80MB
96.7%
启动时间
8分钟
45秒
90.6%
CPU使用率
100%
<1%
99%
结果
优化前
优化后
参考资料