【flink】 flink 读取debezium-json数据获取数据操作类型op/rowkind方法

发布于:2025-06-22 ⋅ 阅读:(14) ⋅ 点赞:(0)

flink 读取debezium-json数据获取数据操作类型op/rowkind方法。
op类型有c(create),u(update),d(delete)
参考官网案例:此处的"op": "u",就是操作类型。

{
  "before": {
    "id": 111,
    "name": "scooter",
    "description": "Big 2-wheel scooter",
    "weight": 5.18
  },
  "after": {
    "id": 111,
    "name": "scooter",
    "description": "Big 2-wheel scooter",
    "weight": 5.15
  },
  "source": {...},
  "op": "u",
  "ts_ms": 1589362330904,
  "transaction": null
}

思路:添加自定义metareader
具体修改方法:去github flink 下载flink源码,选择tags,后按照自己需要的版本下载。
至idea中。修改org.apache.flink.formats.json.debezium.DebeziumJsonDecodingFormat

enum ReadableMetadata枚举类中添加枚举对象:
注意:operation.type可以按需更改,opudf也能按需更改,但不能是op,否则原有的冲突。
添加如下代码:

        OPERATION_TYPE(
                "operation.type",
                DataTypes.STRING().nullable(),
                true,
                DataTypes.FIELD("opudf", DataTypes.STRING()),
                new MetadataConverter() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Object convert(GenericRowData row, int pos) {
                        return row.getString(2);
                    }
                })

编译项目:

# install
mvn clean package -DskipTests "-Dmaven.test.skip=true" "-Pfast" -pl flink-formats/flink-json
# maven install 到本地
mvn install:install-file "-DgroupId=org.apache.flink" "-DartifactId=flink-json-udf" "-Dversion=1.13.6" -Dpackaging=jar "-Dfile=D:\projects\flink-release-1.13.6\flink-formats\flink-json\target\flink-json-1.13.6.jar"

# gradle 不能使用 implement(files("libs/flink-json-1.13.6.jar")) 的方式。需要直接加载到仓库。或者user/.m2目录下的临时仓库。
# 如下路径不能直接复制,可供参考,自己找下。
copy D:\projects\flink-release-1.13.6\flink-formats\flink-json\target\flink-json-1.13.6.jar D:\env\Gradle_Repository\caches\modules-2\files-2.1\org.apache.flink\flink-json\1.13.6\cb4daaf018e2
10faa54e76488cbb11e179502728


使用方法:
添加列定义:
opt_type STRING METADATA FROM 'value.operation.type' VIRTUAL,

create table test.some_debezium
(
    event_time      TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
    origin_database STRING METADATA FROM 'value.source.database' VIRTUAL,
    origin_table    STRING METADATA FROM 'value.source.table' VIRTUAL,
    opt_type        STRING METADATA FROM 'value.operation.type' VIRTUAL,
    id            BIGINT,
    name          STRING,

) WITH(
    'connector'='kafka',
    'topic'='your_topic',
    'properties.bootstrap.servers'='host1:6667,host2:6667,host3:6667',
    'properties.group.id'='your_group_id',
    'properties.security.protocol'='SASL_PLAINTEXT',
    'properties.sasl.kerberos.service.name'='kafka',
    'format'='debezium-json',
    'debezium-json.timestamp-format.standard'='ISO-8601',
    'debezium-json.schema-include'='true',
    'debezium-json.ignore-parse-errors'='true'
);

网站公告

今日签到

点亮在社区的每一天
去签到