flink实现复杂kafka数据读取

发布于:2024-12-23 ⋅ 阅读:(13) ⋅ 点赞:(0)

接上文:一文说清flink从编码到部署上线
环境说明:MySQL:5.7;flink:1.14.0;hadoop:3.0.0;操作系统:CentOS 7.6;JDK:1.8.0_401。

常见的文章中,kafka数据结构相对简单,本文根据实际项目数据,说明怎样读取解析复杂kafka数据。并将解析的数据输出到控制台。

1.模拟数据

1.1 模拟数据

{
    "reportFormat": "2",
    "reportVersion": 1,
    "reports": [
        {
            "filename": "1733277155032RvReport",
            "c": {
                "objStationInfo": {
                    "sStationName": "LLP入口",
                    "ucStationDir": 1,
                    "sStationID": 500001
                },
                "objVehicle": {
                    "sUUID": "fdabd178-a169-11eb-9483-b95959072a9d",
                    "w64Timestamp": "1733881971628",
                    "objRfidInfo": {
                        "sReaderID": "10",
                        "objTagData": {
                            "sTID": "1234567891",
                            "sEPC": "1234567890"
                        }
                    },
                    "ucReportType": "8",
                    "ucVehicleType": "1"
                }
            }
        }
    ]
}

1.2 添加到kafka

使用kafka工具,kafkatool2,具体操作如下:
连接到kafka:
在这里插入图片描述
连接成功:
在这里插入图片描述
添加数据:
在这里插入图片描述
在这里插入图片描述
添加成功:
在这里插入图片描述

2.代码实现

2.1 EnvUtil实现

EnvUtil用于创建flink的运行环境。

package com.zl.utils;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;
import java.time.ZoneOffset;
import java.util.concurrent.TimeUnit;

/**
 * EnvUtil
 * @description:
 */
public class EnvUtil {
    /**
     * 设置flink执行环境
     * @param parallelism 并行度
     */
    public static StreamExecutionEnvironment setFlinkEnv(int parallelism) {
        // System.setProperty("HADOOP_USER_NAME", "用户名") 对应的是 hdfs文件系统目录下的路径:/user/用户名的文件夹名,本文为root
        System.setProperty("HADOOP_USER_NAME", "root");
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 1000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        if (parallelism >0 ){
            //设置并行度
            env.setParallelism(parallelism);
        } else {
            env.setParallelism(1);// 默认1
        }

        // 添加重启机制
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(50, Time.minutes(6)));
        // 启动checkpoint,设置模式为精确一次 (这是默认值),10*60*1000=60000
        env.enableCheckpointing(600000, CheckpointingMode.EXACTLY_ONCE);
        //rocksdb状态后端,启用增量checkpoint
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
        //设置checkpoint路径
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();

        // 同一时间只允许一个 checkpoint 进行(默认)
        checkpointConfig.setMaxConcurrentCheckpoints(1);
        //最小间隔,10*60*1000=60000
        checkpointConfig.setMinPauseBetweenCheckpoints(60000);
        // 取消任务后,checkpoint仍然保存
        checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        //checkpoint容忍失败的次数
        checkpointConfig.setTolerableCheckpointFailureNumber(5);
        //checkpoint超时时间 默认10分钟
        checkpointConfig.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(10));
        //禁用operator chain(方便排查反压)
        env.disableOperatorChaining();
        return env;
    }

    public static StreamTableEnvironment getFlinkTenv(StreamExecutionEnvironment env) {
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
        //设置时区 东八
        tenv.getConfig().setLocalTimeZone(ZoneOffset.ofHours(8));
        Configuration configuration = tenv.getConfig().getConfiguration();
        // 开启miniBatch
        configuration.setString("table.exec.mini-batch.enabled", "true");
        // 批量输出的间隔时间
        configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
        // 防止OOM设置每个批次最多缓存数据的条数,可以设为2万条
        configuration.setString("table.exec.mini-batch.size", "20000");
        // 开启LocalGlobal
        configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
        //设置TTL API指定
        tenv.getConfig().setIdleStateRetention(Duration.ofHours(25));

        return tenv;
    }

}

2.2 FlinkSourceUtil实现

FlinkSourceUtil用于连接kafka。

package com.zl.kafka.domain;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.apache.commons.lang3.StringUtils;

/**
 * @desc:
 */
@ToString
@Data
@NoArgsConstructor
@AllArgsConstructor
public class RvTable {
	private String uniqueId;//flink生成的唯一键
	private long reportTime;// 过车时间
	private String dt;                           // 分区字段
	private String dh;                           // 小时

	private String reportFormat;
	private int reportVersion;
	private String filename;
	public String sStationName;    // 采集点名称
	public String ucStationDir;     // 采集点方向编号
	public String sStationID;      // 采集点编号
	private String sUUID;
	private long w64Timestamp;     //事件时间(毫秒级别)
	private String sReaderID;//射频设备(模块)代码
	private String sTIDR;
	private String sEPCR;
	private int ucReportType;//8->视频 2->射频 138,202->视频+射频
	private int ucVehicleType;

	public void parseTableColunm() {
		this.reportTime = this.w64Timestamp;
		this.uniqueId = this.sUUID;
	}
}

2.3 RvTable实现

RvTable解析数据最后存储的model。

package com.zl.kafka.domain;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.apache.commons.lang3.StringUtils;

/**
 * @desc:
 */
@ToString
@Data
@NoArgsConstructor
@AllArgsConstructor
public class RvTable {
	private String uniqueId;//flink生成的唯一键
	private long reportTime;// 过车时间
	private String dt;                           // 分区字段
	private String dh;                           // 小时

	private String reportFormat;
	private int reportVersion;
	private String filename;
	public String sStationName;    // 采集点名称
	public String ucStationDir;     // 采集点方向编号
	public String sStationID;      // 采集点编号
	private String sUUID;
	private long w64Timestamp;     //事件时间(毫秒级别)
	private String sReaderID;//射频设备(模块)代码
	private String sTIDR;
	private String sEPCR;
	private int ucReportType;//8->视频 2->射频 138,202->视频+射频
	private int ucVehicleType;

	public void parseTableColunm() {
		this.reportTime = this.w64Timestamp;
		this.uniqueId = this.sUUID;
	}

}

2.4 核心逻辑实现

package com.zl.kafka;

import com.alibaba.fastjson.JSON;
import com.zl.kafka.domain.RvTable;
import com.zl.utils.EnvUtil;
import com.zl.utils.FlinkSourceUtil;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;
import java.util.Date;

public class KafkaExample {
    public static void main(String[] args) throws Exception {

        // 配置运行环境,并行度1
        StreamExecutionEnvironment env = EnvUtil.setFlinkEnv(1);
        // 程序间隔离,每个程序单独设置
        env.getCheckpointConfig().setCheckpointStorage("hdfs://10.86.97.191:9000/flinktest/KafkaExample");

        /// 读取kafka数据
        SingleOutputStreamOperator<String> rvSourceStream = env
                .addSource(FlinkSourceUtil.getKafkaSource(
                        "rvGroup",
                        "rv-test",
                        "10.86.97.21:9092",
                        "earliest"))// earliest/latest
                .setParallelism(1).uid("getRV").name("getRV");

        // 解析转换数据格式
        SingleOutputStreamOperator<String> rvParseStream = null;
        try {
            rvParseStream = rvSourceStream.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public void flatMap(String value, Collector<String> out) {
                    if (StringUtils.isEmpty(value)) {
                        return;
                    }
                    parseRVData(value, out);
                }
            }).setParallelism(1).uid("rvParse").name("rvParse");
        } catch (Exception e) {
            e.printStackTrace();
        }

        rvParseStream.print();

        env.execute("rvParseJob");

    }// main

    public static void parseRVData(String jsonStr, Collector<String> out) {
        try {
            if (StringUtils.isEmpty(jsonStr) || !isJSON(jsonStr)) {
                return;
            }
            JSONObject in = JSONObject.parseObject(jsonStr);
            // =====报告头信息 =====
            String reportFormat = stringDefaultIfEmpty(in.getString("reportFormat"));
            int reportVersion = intDefaultIfEmpty(in.getInteger("reportVersion"));
            JSONArray reports = in.getJSONArray("reports");

            if (reports != null) {
                for (int i = 0; i < reports.size(); i++) {
                    RvTable rvTable = new RvTable();
                    JSONObject record = reports.getJSONObject(i);
                    if (record != null) {
                        String filename = stringDefaultIfEmpty(record.getString("filename"));
                        JSONObject c = record.getJSONObject("c");
                        if (c != null) {
                            // ===== 采集点信息 =====
                            JSONObject objStationInfo = c.getJSONObject("objStationInfo");
                            if(objStationInfo != null) {
                                rvTable.setSStationID(stringDefaultIfEmpty(objStationInfo.getString("sStationID")));
                                rvTable.setSStationName(stringDefaultIfEmpty(objStationInfo.getString("sStationName")));
                                rvTable.setUcStationDir(stringDefaultIfEmpty(objStationInfo.getString("ucStationDir")));
                            }
                            JSONObject objVehicle = c.getJSONObject("objVehicle");
                            if (objVehicle != null) {
                                // ===== 车辆报告信息 =====
                                rvTable.setSUUID(stringDefaultIfEmpty(objVehicle.getString("sUUID")));
                                rvTable.setW64Timestamp(objVehicle.getLong("w64Timestamp"));
                                rvTable.setUcReportType(intDefaultIfEmpty(objVehicle.getInteger("ucReportType")));
                                rvTable.setUcVehicleType(intDefaultIfEmpty(objVehicle.getInteger("ucVehicleType")));
                                // ===== 车辆报告信息/射频车辆信息 =====
                                JSONObject objRfidInfo = objVehicle.getJSONObject("objRfidInfo");
                                if (objRfidInfo != null) {
                                    rvTable.setSReaderID(stringDefaultIfEmpty(objRfidInfo.getString("sReaderID")));
                                    JSONObject objTagData = objRfidInfo.getJSONObject("objTagData");
                                    if (objTagData != null) {
                                        rvTable.setSTIDR(stringDefaultIfEmpty(objTagData.getString("sTID")));
                                        rvTable.setSEPCR(stringDefaultIfEmpty(objTagData.getString("sEPC")));
                                    }
                                }
                                // ===== 自加特殊处理字段 =====
                                long timestamp = rvTable.getW64Timestamp();
                                SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH");
                                Date date = new Date(timestamp);
                                String[] s = simpleDateFormat.format(date).split(" ");
                                rvTable.setDt(s[0]);
                                rvTable.setDh(s[1]);
                                out.collect(JSONObject.toJSONString(rvTable));
                            }// if (objVehicle != null)
                        }// if (c != null)
                    }// if (record != null)
                }// for 循环
            }
        } catch (Exception e) {
            e.printStackTrace();
            // 此处把解析后的数据存储到数据库……
        }
    }// parseRVData

    public static boolean isJSON(String str) {
        boolean result;
        try {
            JSON.parse(str);
            result = true;
        } catch (Exception e) {
            result = false;
        }
        return result;
    }

    public static int intDefaultIfEmpty(Integer num) {
        if (num == null) {
            num = 0;
            return num;
        }
        return num;
    }

    public static String stringDefaultIfEmpty(String str) {
        return StringUtils.defaultIfEmpty(str, "ENULL");
    }

    public static Long longDefaultIfEmpty(Long num) {
        if (num == null) {
            num = 0l;
            return num;
        }
        return num;
    }

    public static Double doubleDefaultIfEmpty(Double num) {
        if (num == null) {
            num = 0.0;
            return num;
        }
        return num;
    }
}

2.5 pom.xml

注意修改此处:
在这里插入图片描述

3.运行效果

3.1 运行日志

在这里插入图片描述

3.2 web UI

访问:http://IP:1000/
在这里插入图片描述
在这里插入图片描述

4.部署

相关构建、部署,参考:一文说清flink从编码到部署上线
部署脚本:

flink run-application -t yarn-application -Dparallelism.default=1 -Denv.java.opts=" -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8" -Dtaskmanager.memory.process.size=1g -Dyarn.application.name="FlinkCdcKafka"  -Dtaskmanager.numberOfTaskSlots=1 -c com.zl.kafka.KafkaExample /home/FlickCDC-1.0-SNAPSHOT-jar-with-dependencies.jar

在这里插入图片描述

5. 完整代码

完整代码见:https://gitee.com/core815/flink-cdc-mysql


网站公告

今日签到

点亮在社区的每一天
去签到