Flink解决广播配置流滞后数据流的问题

发布于:2022-12-31 ⋅ 阅读:(1210) ⋅ 点赞:(0)

Background

  • 在使用flink处理数据的时候,需要从mysql获取基础的配置信息去解析收到的数据;
  • 第一种方案使用缓存,程序启动时从mysql读取配置信息加载到缓存,后面定时更新缓存,解析数据时从缓存中获取配置信息,但这种方法存在很多问题,首先会增加数据库的负载,同时缓存更新及时性不佳;
  • 第二种方案使用flink-cdc加广播流,通过mysql日志文件获取数据,减少对数据库的负载,提高数据的一致性;把数据流和配置流合并,然后对数据进行解析,这种方案也会有个问题,无法保证配置信息加载完毕后再处理数据,比如说数据源已经有数据了,程序启动时,配置信息还没加载完,数据已经上来了,那这个时候解析就会有问题;
  • 这里给出第二种方案问题的解决方法,在flink官方中文邮箱列表中找到了一个合理的解决思路,在处理数据流的方法中使用ListState对数据进行“缓冲”,等到配置信息加载完毕后再对“缓冲”的数据进行处理,下面是我自己的实现。

处理思路(后补的哈)

大致说下处理思路哈:
1.程序启动,数据流元素和配置流元素开始加载,不分先后;
2.当接收到数据流元素时,首先判断配置流元素是否加载完毕(判断逻辑后面介绍),若未加载完毕,会把数据缓存起来,因为流处理是数据驱动的,数据不来,缓存的数据就不会被处理,所以为了解决这种情况,同时创建了一个定时器,默认30秒后自动把缓存的数据处理掉;缓存了一定的数据,如果当下一个数据流元素来的时候发现配置加载完毕了,然后首先把缓存的数据处理掉,同时清除缓存、定时处理标志等,接着处理收到的数据,后面就正常流转了;
3.然后说下配置加载完毕的判断逻辑,我这里是根据两个条件判断的,一是配置流元素加载的个数,即配置流元素每次加载都会累加,二是配置流元素更新的加载时间,即配置流元素每次加载都会把当前加载的时间更新下;如果加载的个数大于0,同时更新的加载时间和当前时间相差大于2秒,我即判断为加载完毕。

源码

  • 懒得赘述了,注释的已经很清楚了,不懂的评论里问,不要私信,这样大家也可以看到相同的问题,不用和每个说一遍相同的问题,记得先一键三连哈
    在这里插入图片描述
  • DataParseFunc
package com.yl.flink.processor;

import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.date.DateUtil;
import com.yl.constant.Const;
import com.yl.entity.MultiDataEntity;
import com.yl.entity.cdc.MysqlCdcEntity;
import com.yl.util.FlinkUtil;
import com.yl.util.PayloadParseUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

import java.util.List;

/**
 * @author wlf
 * @apiNote 数据解析
 * @since 2022/9/2
 */
@Slf4j
public class DataParseFunc extends KeyedBroadcastProcessFunction<String, String, MysqlCdcEntity, MultiDataEntity> {

    // 配置信息未加载完毕时先缓存数据流元素
    private transient ListState<String> payloads;

    // 未设置定时输出缓存流元素的定时器,默认未设置
    private transient ValueState<Boolean> setTimer;

    /**
     * 初始化资源配置
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        payloads = getRuntimeContext().getListState(new ListStateDescriptor<>(Const.S_PAYLOAD_STA, TypeInformation.of(new TypeHint<>() {
        })));
        setTimer = getRuntimeContext().getState(new ValueStateDescriptor<>(Const.S_TIMER_SET, TypeInformation.of(new TypeHint<>() {
        })));
    }

    /**
     * 处理数据流元素
     *
     * @param payload 数据流元素
     * @param ctx     上下文
     * @param out     流输出器
     */
    @Override
    public void processElement(String payload, KeyedBroadcastProcessFunction<String, String, MysqlCdcEntity, MultiDataEntity>.ReadOnlyContext ctx, Collector<MultiDataEntity> out) throws Exception {
        ReadOnlyBroadcastState<String, List<MysqlCdcEntity>> state = ctx.getBroadcastState(FlinkUtil.getConfigDescriptor());
        // 数据流元素来时首先判断一下配置信息是否加载完毕
        if (configReady(state)) {
            // 如果加载完毕了,判断一下是否有缓存的数据需要处理一下
            if (null != setTimer.value() && setTimer.value()) {
                for (String element : payloads.get()) {
                    output(element, state, out);
                }
                // 处理后把缓存清空
                payloads.clear();
                // 同时修改定时处理缓存的标志,说明缓存已经没数据了,不需要定时处理缓存数据了
                setTimer.update(false);
            }
            // 处理流元素
            output(payload, state, out);
        } else {
            // 如果配置信息还没有加载完毕,先把流元素缓存起来
            payloads.add(payload);
            // 只需要一个定时器就行了,到时执行一次就可以把缓存的所有数据处理掉
            if (null == setTimer.value() || !setTimer.value()) {
                // 注册定时器,如果后面没来数据,默认30秒后把缓存的数据发送到下游
                long fireTime = ctx.timerService().currentProcessingTime() + 30_000;
                ctx.timerService().registerProcessingTimeTimer(fireTime);
                setTimer.update(true);
            }
        }
    }

    /**
     * 处理配置流元素
     *
     * @param config 配置流元素
     * @param ctx    上下文
     * @param out    流输出器
     */
    @Override
    public void processBroadcastElement(MysqlCdcEntity config, KeyedBroadcastProcessFunction<String, String, MysqlCdcEntity, MultiDataEntity>.Context ctx, Collector<MultiDataEntity> out) throws Exception {
        // 广播状态
        BroadcastState<String, List<MysqlCdcEntity>> state = ctx.getBroadcastState(FlinkUtil.getConfigDescriptor());
        // 每来一个配置流元素都会更新配置缓存的失效时间
        updateCache(state);
        // 根据操作类型更新广播数据状态
        FlinkUtil.updateState(state, config);
    }

    /**
     * 定时输出缓存的数据
     * 配置流未加载完毕时缓存此时过来的数据流元素,缓存的数据流元素只有等下一个流元素来的时候才会触发后续的处理操作,这里的定时器可以定时输出缓存的数据
     *
     * @param timestamp 定时器触发时间
     * @param ctx       上下文
     * @param out       输出器
     */
    @Override
    public void onTimer(long timestamp, KeyedBroadcastProcessFunction<String, String, MysqlCdcEntity, MultiDataEntity>.OnTimerContext ctx, Collector<MultiDataEntity> out) throws Exception {
        ReadOnlyBroadcastState<String, List<MysqlCdcEntity>> state = ctx.getBroadcastState(FlinkUtil.getConfigDescriptor());
        for (String element : payloads.get()) {
            output(element, state, out);
        }
        // 输出后把缓存清空防止重复输出
        payloads.clear();
        // 执行完任务删除定时器
        ctx.timerService().deleteProcessingTimeTimer(timestamp);
        // 同时更新定时器的状态
        setTimer.update(false);
    }

    /**
     * 把流元素输出到下游
     */
    private void output(String payload, ReadOnlyBroadcastState<String, List<MysqlCdcEntity>> state, Collector<MultiDataEntity> out) throws Exception {
        // 解析数据,并把解析后的数据发送到下游
        PayloadParseUtil
                .parse(payload, state)
                .forEach(out::collect);
    }

    /**
     * 每来一个配置流元素进行个数统计,同时更新加载时间
     */
    private void updateCache(BroadcastState<String, List<MysqlCdcEntity>> state) throws Exception {
        MysqlCdcEntity config;
        if (state.contains(Const.S_CONFIG_STA)) {
            config = state.get(Const.S_CONFIG_STA).get(0);
            // 统计的是流元素的个数
            config.setConfigCount(config.getConfigCount() + 1);
            // 更新加载时间
            config.setConfigTs(DateUtil.date().getTime());
        } else {
            config = MysqlCdcEntity.builder()
                    .configCount(1)
                    .configTs(DateUtil.date().getTime())
                    .build();
        }
        // 动态更新广播流状态
        state.put(Const.S_CONFIG_STA, ListUtil.toList(config));
    }

    /**
     * 配置流元素是否配置完毕
     * 正常情况下配置流元素更新的时间和当前时间相差大于2秒代表配置完成
     * 配置加载完毕判断的两个必要条件:
     * 1.配置流元素加载的个数大于0;
     * 2.配置流元素加载时更新的加载时间和当前时间相差大于2秒;
     */
    private Boolean configReady(ReadOnlyBroadcastState<String, List<MysqlCdcEntity>> state) throws Exception {
        long nowTs = DateUtil.date().getTime();
        if (state.contains(Const.S_CONFIG_STA)) {
            MysqlCdcEntity config = state.get(Const.S_CONFIG_STA).get(0);
            long configTs = config.getConfigTs();
            return config.getConfigCount() > 0 && Math.abs((nowTs - configTs) / 1000) > 2;
        }
        return false;
    }

}

本文含有隐藏内容,请 开通VIP 后查看