state
————保存历史数据
有状态计算和无状态计算
- 无状态计算:
- 不需要考虑历史数据, 相同的输入,得到相同的输出!
- 如:map, 将每个单词记为1, 进来一个hello, 得到(hello,1),再进来一个hello,得到的还是(hello,1)
- 有状态计算:
- 需要考虑历史数据, 相同的输入,可能会得到不同的输出!
- 如:sum/reduce/maxBy, 对单词按照key分组聚合,进来一个(hello,1),得到(hello,1), 再进来一个(hello,1), 得到的结果为(hello,2)
Flink有两种基本类型的状态:托管状态(Managed State)和原生状态(Raw State)。
具体区别:
从状态管理的方式上来说
Managed State是由Flink管理的,Flink帮忙存储、恢复和优化
Raw State是开发者自己管理的,需要自己序列化
从状态的数据结构上来说
Managed State支持了一系列常见的数据结构,如ValueState、ListState、MapState等。
Raw State只支持字节,任何上层数据结构需要序列化为字节数组。
从具体使用场景来说
绝大多数的算子都可以通过继承Rich函数类或其他提供好的接口类,在里面使用Managed State
Raw State是在已有算子和Managed State不够用时,用户自定义算子时使用。
对Managed State继续细分,它又有两种类型:Keyed State和Operator State。 以下的对Managed State的概述
Flink状态
- 托管状态
- KeyedState ( 在keyBy之后可以使用状态 )
- ValueState (存储一个值)
- ListState (存储多个值)
- MapState (存储key-value)
- OperatorState ( 没有keyBy的情况下也可以使用 ) [不用]
- 原生状态 (不用)
Keyed State(键控状态)
——用于分组后处理的数据,每个 key 都有自己的独立状态
- KeyedState ( 在keyBy之后可以使用状态 )
- ValueState (存储一个值)
- ListState (存储多个值)
- MapState (存储key-value)
- AggregatingState:用于存储经过 AggregatingState 计算后的结果,使用 add(IN) 添加元素。
- ReducingState:用于存储经过 ReduceFunction 计算后的结果,使用 add(T) 增加元素。
1、Flink 为每个键值维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中
2、当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的key
案例1:求最大值
1、使用KeyedState中的ValueState获取数据中的最大值(获取每个key的最大值)(实际中直接使用maxBy即可)
2、我们自己使用KeyState中的ValueState来模拟实现maxBy
代码实现:
package com.bigdata.day05;
/**
* 1、键控状态现在只用于map flatmap 因为这两个是不能记录数据状态的,需要键控状态的帮助
* 2、使用open进行初始化
* 3、ValueState 只能存储一个值,用于存放最大值或者最小值
* 4、keyBy+map 可以当作 maxBy
**/
public class _04_stateDemo {
public static void main(String[] args) throws Exception {
//1. env-准备环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
env.setParallelism(1);
//2. source-加载数据
DataStream<Tuple2<String, Long>> tupleDS = env.fromElements(
Tuple2.of("北京", 1L),
Tuple2.of("上海", 2L),
Tuple2.of("北京", 6L),
Tuple2.of("上海", 8L),
Tuple2.of("北京", 3L),
Tuple2.of("上海", 4L),
Tuple2.of("北京", 7L)
);
//3. transformation-数据处理转换
tupleDS.keyBy(new KeySelector<Tuple2<String, Long>, String>() {
@Override
public String getKey(Tuple2<String, Long> value) throws Exception {
return value.f0;
}
}).map(new RichMapFunction<Tuple2<String, Long>, Tuple2<String, Long>>() {
ValueState<Long> state = null;
@Override
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor<Long> stateDescriptor = new ValueStateDescriptor<Long>("valueState",Long.class);
state = getRuntimeContext().getState(stateDescriptor);
}
@Override
public Tuple2<String, Long> map(Tuple2<String, Long> value) throws Exception {
if (state.value() == null){
state.update(value.f1);
}else {
if (state.value() < value.f1){
state.update(value.f1);
}
}
return Tuple2.of(value.f0,state.value());
}
}).print();
//.maxBy(1).print();
//4. sink-数据输出
//5. execute-执行
env.execute();
}
}
解决方式二:
package com.bigdata.day05;
/**
* 1、使用maxBy实现最大值
* 2、和上面的区别在于由于map对于每一条数据都会有一个输出值,所以结果会有多个,只需看最后的即可
*
*
**/
public class _04_stateDemo_maxby {
public static void main(String[] args) throws Exception {
//1. env-准备环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
env.setParallelism(1);
//2. source-加载数据
DataStream<Tuple2<String, Long>> tupleDS = env.fromElements(
Tuple2.of("北京", 1L),
Tuple2.of("上海", 2L),
Tuple2.of("北京", 6L),
Tuple2.of("上海", 8L),
Tuple2.of("北京", 3L),
Tuple2.of("上海", 4L),
Tuple2.of("北京", 7L)
);
//3. transformation-数据处理转换
tupleDS.keyBy(0).maxBy(1).print();
//.maxBy(1).print();
//4. sink-数据输出
//5. execute-执行
env.execute();
}
}
案例2
——如果一个人的体温超过阈值38度,超过3次及以上,则输出: 姓名 [温度1,温度2,温度3]
package com.bigdata.day05;
/**
* 1、使用flatMap实现键控状态
* 2、此时就没有一个合适的方法可以快速得到数据了
* 3、reduce 只能相加,process 不能获取数量 app agg均不能使用
*
**/
public class _05_stateDemo_2 {
public static void main(String[] args) throws Exception {
//1. env-准备环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
env.setParallelism(1);
DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 8889);
dataStreamSource.map(new MapFunction<String, Tuple2<String,Integer>>() {
@Override
public Tuple2<String, Integer> map(String line) throws Exception {
String[] s = line.split(" ");
return Tuple2.of(s[0],Integer.valueOf(s[1]));
}
}).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> tuple2) throws Exception {
return tuple2.f0;
}
}).flatMap(new RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, ArrayList<Integer>>>() {
ValueState<Integer> valueState = null;
ListState<Integer> listState = null;
@Override
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<Integer>("numState",Integer.class);
valueState = getRuntimeContext().getState(stateDescriptor);
ListStateDescriptor<Integer> listStateDescriptor = new ListStateDescriptor<>("listState", Integer.class);
listState = getRuntimeContext().getListState(listStateDescriptor);
}
@Override
public void flatMap(Tuple2<String, Integer> tuple2, Collector<Tuple2<String, ArrayList<Integer>>> out) throws Exception {
if (tuple2.f1>38){
valueState.update(valueState.value()==null?1:(valueState.value()+1));
listState.add(tuple2.f1);
}
ArrayList<Integer> wendus = new ArrayList<>();
if (valueState.value()!=null && valueState.value()>=3){
for (Integer wendu : listState.get()) {
wendus.add(wendu);
}
out.collect(Tuple2.of(tuple2.f0,wendus));
}
}
}).print();
env.execute();
}
}
使用map实现
package com.bigdata.day05;
/**
* 1、map的最后返回值是不能为null的必须有值
* 2、而flatMap可以
* 3、map会返回很多空
*
**/
public class _05_stateDemo_2_reduce {
public static void main(String[] args) throws Exception {
//1. env-准备环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
env.setParallelism(1);
DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 8889);
dataStreamSource.map(new MapFunction<String, Tuple2<String,Integer>>() {
@Override
public Tuple2<String, Integer> map(String line) throws Exception {
String[] s = line.split(" ");
return Tuple2.of(s[0],Integer.valueOf(s[1]));
}
}).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> value) throws Exception {
return value.f0;
}
}).map(new RichMapFunction<Tuple2<String, Integer>, Tuple2<String, ArrayList<Integer>>>() {
ValueState<Integer> valueState = null;
ListState<Integer> listState = null;
@Override
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<Integer>("numState",Integer.class);
valueState = getRuntimeContext().getState(stateDescriptor);
ListStateDescriptor<Integer> listStateDescriptor = new ListStateDescriptor<>("listState", Integer.class);
listState = getRuntimeContext().getListState(listStateDescriptor);
}
@Override
public Tuple2<String, ArrayList<Integer>> map(Tuple2<String, Integer> tuple2) throws Exception {
if (tuple2.f1>38){
valueState.update(valueState.value()==null?1:(valueState.value()+1));
listState.add(tuple2.f1);
}
ArrayList<Integer> wendus = new ArrayList<>();
if (valueState.value()!=null && valueState.value()>=3){
for (Integer wendu : listState.get()) {
wendus.add(wendu);
}
return Tuple2.of(tuple2.f0,wendus);
}
return null;
}
}).print();
env.execute();
}
}
Operator State(操作符状态)
- OperatorState ( 没有keyBy的情况下也可以使用 ) ————一般不使用