flink学习(11)——state

发布于:2024-11-29 ⋅ 阅读:(15) ⋅ 点赞:(0)

state

————保存历史数据

有状态计算和无状态计算

- 无状态计算:
  - 不需要考虑历史数据, 相同的输入,得到相同的输出!
  - 如:map, 将每个单词记为1, 进来一个hello, 得到(hello,1),再进来一个hello,得到的还是(hello,1)
- 有状态计算:
  - 需要考虑历史数据, 相同的输入,可能会得到不同的输出!
  - 如:sum/reduce/maxBy, 对单词按照key分组聚合,进来一个(hello,1),得到(hello,1), 再进来一个(hello,1), 得到的结果为(hello,2)
Flink有两种基本类型的状态:托管状态(Managed State)和原生状态(Raw State)。


具体区别:

从状态管理的方式上来说
Managed State是由Flink管理的,Flink帮忙存储、恢复和优化
Raw State是开发者自己管理的,需要自己序列化

从状态的数据结构上来说
Managed State支持了一系列常见的数据结构,如ValueState、ListState、MapState等。
Raw State只支持字节,任何上层数据结构需要序列化为字节数组。

从具体使用场景来说
绝大多数的算子都可以通过继承Rich函数类或其他提供好的接口类,在里面使用Managed State
Raw State是在已有算子和Managed State不够用时,用户自定义算子时使用。


对Managed State继续细分,它又有两种类型:Keyed State和Operator State。 以下的对Managed State的概述

Flink状态 
 - 托管状态
   - KeyedState ( 在keyBy之后可以使用状态 )
      - ValueState  (存储一个值)
      - ListState   (存储多个值)
      - MapState    (存储key-value) 
   - OperatorState ( 没有keyBy的情况下也可以使用 ) [不用]
 - 原生状态 (不用)

Keyed State(键控状态)

——用于分组后处理的数据,每个 key 都有自己的独立状态

- KeyedState ( 在keyBy之后可以使用状态 )
    - ValueState  (存储一个值)
    - ListState   (存储多个值)
    - MapState    (存储key-value) 
    - AggregatingState:用于存储经过 AggregatingState 计算后的结果,使用 add(IN) 添加元素。
    - ReducingState:用于存储经过 ReduceFunction 计算后的结果,使用 add(T) 增加元素。
1、Flink 为每个键值维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中
2、当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的key

案例1:求最大值

1、使用KeyedState中的ValueState获取数据中的最大值(获取每个key的最大值)(实际中直接使用maxBy即可)
2、我们自己使用KeyState中的ValueState来模拟实现maxBy
代码实现:
package com.bigdata.day05;


/**
 * 1、键控状态现在只用于map flatmap 因为这两个是不能记录数据状态的,需要键控状态的帮助
 * 2、使用open进行初始化
 * 3、ValueState   只能存储一个值,用于存放最大值或者最小值
 * 4、keyBy+map 可以当作 maxBy
 **/
public class _04_stateDemo {

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);

        //2. source-加载数据
        DataStream<Tuple2<String, Long>> tupleDS = env.fromElements(
                Tuple2.of("北京", 1L),
                Tuple2.of("上海", 2L),
                Tuple2.of("北京", 6L),
                Tuple2.of("上海", 8L),
                Tuple2.of("北京", 3L),
                Tuple2.of("上海", 4L),
                Tuple2.of("北京", 7L)
        );
        //3. transformation-数据处理转换
        tupleDS.keyBy(new KeySelector<Tuple2<String, Long>, String>() {
            @Override
            public String getKey(Tuple2<String, Long> value) throws Exception {
                return value.f0;
            }
        }).map(new RichMapFunction<Tuple2<String, Long>, Tuple2<String, Long>>() {
        
            ValueState<Long> state = null;
            @Override
            public void open(Configuration parameters) throws Exception {
                ValueStateDescriptor<Long> stateDescriptor = new ValueStateDescriptor<Long>("valueState",Long.class);
                state = getRuntimeContext().getState(stateDescriptor);
            }

            @Override
            public Tuple2<String, Long> map(Tuple2<String, Long> value) throws Exception {
                if (state.value() == null){
                    state.update(value.f1);
                }else {
                    if (state.value() < value.f1){
                        state.update(value.f1);
                    }
                }
                return Tuple2.of(value.f0,state.value());
            }
        }).print();
                //.maxBy(1).print();

        //4. sink-数据输出


        //5. execute-执行
        env.execute();
    }
}

解决方式二:

package com.bigdata.day05;


/**
 * 1、使用maxBy实现最大值
 * 2、和上面的区别在于由于map对于每一条数据都会有一个输出值,所以结果会有多个,只需看最后的即可
 * 
 * 
 **/
public class _04_stateDemo_maxby {

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);

        //2. source-加载数据
        DataStream<Tuple2<String, Long>> tupleDS = env.fromElements(
                Tuple2.of("北京", 1L),
                Tuple2.of("上海", 2L),
                Tuple2.of("北京", 6L),
                Tuple2.of("上海", 8L),
                Tuple2.of("北京", 3L),
                Tuple2.of("上海", 4L),
                Tuple2.of("北京", 7L)
        );
        //3. transformation-数据处理转换
        tupleDS.keyBy(0).maxBy(1).print();
                //.maxBy(1).print();

        //4. sink-数据输出

        //5. execute-执行
        env.execute();
    }
}

案例2

——如果一个人的体温超过阈值38度,超过3次及以上,则输出: 姓名 [温度1,温度2,温度3]

package com.bigdata.day05;

/**
 * 1、使用flatMap实现键控状态
 * 2、此时就没有一个合适的方法可以快速得到数据了
 * 3、reduce 只能相加,process 不能获取数量 app agg均不能使用 
 * 
 **/
public class _05_stateDemo_2 {

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);
        DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 8889);
        dataStreamSource.map(new MapFunction<String, Tuple2<String,Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String line) throws Exception {
                String[] s = line.split(" ");
                return Tuple2.of(s[0],Integer.valueOf(s[1]));
            }
        }).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> tuple2) throws Exception {
                return tuple2.f0;
            }
        }).flatMap(new RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, ArrayList<Integer>>>() {

            ValueState<Integer> valueState = null;
            ListState<Integer> listState = null;
            @Override
            public void open(Configuration parameters) throws Exception {
                ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<Integer>("numState",Integer.class);
                valueState = getRuntimeContext().getState(stateDescriptor);


                ListStateDescriptor<Integer> listStateDescriptor = new ListStateDescriptor<>("listState", Integer.class);
                listState = getRuntimeContext().getListState(listStateDescriptor);
            }

            @Override
            public void flatMap(Tuple2<String, Integer> tuple2, Collector<Tuple2<String, ArrayList<Integer>>> out) throws Exception {
                if (tuple2.f1>38){
                    valueState.update(valueState.value()==null?1:(valueState.value()+1));
                    listState.add(tuple2.f1);
                }
                ArrayList<Integer> wendus = new ArrayList<>();
                if (valueState.value()!=null && valueState.value()>=3){
                    for (Integer wendu : listState.get()) {
                        wendus.add(wendu);
                    }
                    out.collect(Tuple2.of(tuple2.f0,wendus));
                }

            }
        }).print();
        env.execute();
    }
}

使用map实现

package com.bigdata.day05;


/**
 * 1、map的最后返回值是不能为null的必须有值
 * 2、而flatMap可以
 * 3、map会返回很多空
 * 
 **/
public class _05_stateDemo_2_reduce {

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);
        DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 8889);
        dataStreamSource.map(new MapFunction<String, Tuple2<String,Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String line) throws Exception {
                String[] s = line.split(" ");
                return Tuple2.of(s[0],Integer.valueOf(s[1]));
            }
        }).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        }).map(new RichMapFunction<Tuple2<String, Integer>, Tuple2<String, ArrayList<Integer>>>() {

            ValueState<Integer> valueState = null;
            ListState<Integer> listState = null;
            @Override
            public void open(Configuration parameters) throws Exception {
                ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<Integer>("numState",Integer.class);
                valueState = getRuntimeContext().getState(stateDescriptor);


                ListStateDescriptor<Integer> listStateDescriptor = new ListStateDescriptor<>("listState", Integer.class);
                listState = getRuntimeContext().getListState(listStateDescriptor);
            }

            @Override
            public Tuple2<String, ArrayList<Integer>> map(Tuple2<String, Integer> tuple2) throws Exception {
                if (tuple2.f1>38){
                    valueState.update(valueState.value()==null?1:(valueState.value()+1));
                    listState.add(tuple2.f1);
                }
                ArrayList<Integer> wendus = new ArrayList<>();
                if (valueState.value()!=null && valueState.value()>=3){
                    for (Integer wendu : listState.get()) {

                        wendus.add(wendu);
                    }

                    return Tuple2.of(tuple2.f0,wendus);
                }
                return null;
            }
        }).print();
        env.execute();
    }
}

Operator State(操作符状态) 

- OperatorState ( 没有keyBy的情况下也可以使用 ) ————一般不使用