Apache Flink 中的 状态(State) 是流处理中非常重要的概念,用于在无界数据流中保存中间结果、实现容错和一致性。Flink 提供了多种类型的状态管理机制,适用于不同的业务场景。
🧩 一、Flink 状态分类
Flink 的状态可以分为以下几类:
类型
描述
Operator State
操作符级别状态,绑定到具体的算子实例上,不与 key 绑定
Keyed State
键控状态,绑定到特定的 key 上,每个 key 有自己的独立状态
Broadcast State
广播状态,将一份全局配置或规则广播给所有任务并共享使用
📌 二、各类状态详解
1. Operator State(操作符状态)
✅ 特点:
不依赖于 key
所有数据共享同一个状态
常用于 Source、Sink 等不需要按 key 区分状态的场景
🧠 示例:
public class MyOperatorStateFunction extends RichMapFunction < Integer , Integer > {
private transient ListState < Integer > state;
@Override
public void open ( Configuration parameters) {
state = getRuntimeContext ( ) . getListState ( new ListStateDescriptor < > ( "my-state" , Integer . class ) ) ;
}
@Override
public Integer map ( Integer value) throws Exception {
state. add ( value) ;
return value * 2 ;
}
}
2. Keyed State(键控状态)
✅ 特点:
每个 key 都拥有自己的独立状态
支持高效的分布式状态管理
只能在 keyBy()
后的 keyed stream 中使用
⚙️ 支持的 Keyed State 类型:
类型
描述
ValueState<T>
存储单个值,如当前最大值
ListState<T>
存储列表,如最近 N 条记录
MapState<K,V>
存储键值对
ReducingState<T>
存储归约后的结果(如 sum、avg)
AggregatingState<IN, OUT>
自定义聚合函数,输出不同类型的结果
🧠 示例:使用 ValueState
记录当前最大值
public class MaxValueStateFunction extends RichMapFunction < Integer , Integer > {
private transient ValueState < Integer > maxValueState;
@Override
public void open ( Configuration parameters) {
maxValueState = getRuntimeContext ( ) . getState (
new ValueStateDescriptor < > ( "max-value" , Integer . class )
) ;
}
@Override
public Integer map ( Integer value) throws Exception {
Integer currentMax = maxValueState. value ( ) ;
if ( currentMax == null || value > currentMax) {
maxValueState. update ( value) ;
}
return maxValueState. value ( ) ;
}
}
3. Broadcast State(广播状态)
✅ 特点:
将一个状态流广播到所有下游任务
允许所有任务访问相同的只读状态(可变但需谨慎)
常用于配置更新、黑名单、规则匹配等场景
🧠 示例:广播配置流并与主数据流连接
DataStream < Rule > ruleStream = env. fromElements ( new Rule ( "blacklist" , Arrays . asList ( "user1" , "user2" ) ) ) ;
BroadcastStream < Rule > broadcastRuleStream = ruleStream. broadcast ( ) ;
mainStream
. connect ( broadcastRuleStream)
. process ( new KeyedBroadcastProcessFunction < String , Event , Rule , FilteredEvent > ( ) {
} ) ;
🗃 三、状态存储方式
Flink 使用 状态后端(State Backend) 来决定状态如何被存储、检查点和恢复。
后端类型
描述
MemoryStateBackend
默认状态后端,将状态保存在 JVM 堆内存中
FsStateBackend
将状态快照写入文件系统(如 HDFS、本地磁盘)
RocksDBStateBackend
使用嵌入式数据库 RocksDB 存储状态,支持超大状态
EmbeddedRocksDBStateBackend
RocksDB 的默认实现,Flink 1.13+ 推荐
🧠 设置方法:
env. setStateBackend ( new FsStateBackend ( "file:///tmp/checkpoints" ) ) ;
env. setStateBackend ( new EmbeddedRocksDBStateBackend ( ) ) ;
🔐 四、状态生命周期与容错机制
功能
描述
Checkpoints
定期将状态持久化,用于故障恢复
Savepoints
手动触发的状态快照,用于升级、迁移等
State TTL(Time to Live)
设置状态存活时间,自动清理过期状态
🧠 示例:设置状态过期时间
StateTtlConfig ttlConfig = StateTtlConfig . newBuilder ( Time . hours ( 1 ) )
. setUpdateType ( StateTtlConfig. UpdateType. OnCreateAndWrite )
. build ( ) ;
ValueStateDescriptor < Integer > descriptor = new ValueStateDescriptor < > ( "myState" , Integer . class ) ;
descriptor. enableTimeToLive ( ttlConfig) ;
📊 五、状态使用建议总结
场景
推荐状态类型
存储方式
无需按 key 分组的状态
Operator State
Memory / Fs
按 key 处理的数据流
Keyed State
Fs / RocksDB
全局配置/规则共享
Broadcast State
Memory
超大状态(GB级)
Keyed State + RocksDB
RocksDB
状态需要定期清理
Keyed State + TTL
Fs / RocksDB