Flink cdc 使用总结

发布于:2025-07-16 ⋅ 阅读:(22) ⋅ 点赞:(0)

Flink 与 Flink CDC 版本兼容对照表

Flink 版本 支持的 Flink CDC 版本 关键说明
Flink 1.11.x Flink CDC 1.2.x 早期版本,需注意 Flink 1.11.0 的 Bug(如 Upsert 写入问题),建议使用 1.11.1 及以上。
Flink 1.12.x Flink CDC 2.0.x(MySQL 使用 flink-connector-mysql-cdc Flink 1.12.x 支持 CDC 2.0.x,MySQL 使用新版 Connector。
Flink 1.13.x Flink CDC 2.2.x, 2.3.x, 2.4.x 2.2.x 起支持 Flink 1.13.x,2.4.x 兼容性更广(支持到 Flink 1.15.x)。
Flink 1.14.x Flink CDC 2.2.x, 2.3.x, 2.4.x 同 Flink 1.13.x,需注意 2.4.x 对 1.14.x 的支持。
Flink 1.15.x Flink CDC 2.3.x, 2.4.x 2.4.x 是 Flink 1.15.x 的推荐版本,支持增量快照框架。
Flink 1.16.x Flink CDC 2.3.x, 2.4.x 2.4.x 支持 Flink 1.16.x,但需注意部分功能可能受限。
Flink 1.17.x Flink CDC 2.5.x 及以上(如 2.5.0) 官方未声明 2.4.x 支持 Flink 1.17.x,需升级 Flink CDC 至 2.5+ 或降级 Flink 至 1.15.x。
Flink 2.0.x 未明确说明(需参考最新 Flink CDC 文档) Flink 2.0 为新版本,建议关注 Flink CDC 官方文档的最新支持情况。
Flink CDC 3.x 仅支持 Flink 1.13.x 及以上(具体版本需看文档) Flink CDC 3.x 是新一代数据集成框架,需与 Flink 1.13+ 配合使用。

1.flink  cdc 的两种使用方式

source:
  type: mysql-cdc
  hostname: localhost
  port: 3306
  username: root
  password: "123456"
  database-list: app_db
  table-list: app_db.*
  scan.startup.mode: initial
  scan.incremental.snapshot.enabled: true
  scan.newly-added-table.enabled: true

sink:
  type: doris
  fenodes: 127.0.0.1:8030
  username: root
  password: ""
  table.create.properties.light_schema_change: true
  table.create.properties.replication_num: 1

pipeline:
  name: Sync MySQL to Doris
  parallelism: 2
  execution.runtime-mode: STREAMING

./bin/flink-cdc.sh run mysql-to-doris.yaml

2. flink cdc 另一种使用方式

        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-postgres-cdc</artifactId>
            <version>${flink.cdc.version}</version>
        </dependency>
        

        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>${flink.cdc.version}</version>
        </dependency>

package com.example.demo.cdc;


import com.example.demo.ConnectionConstants;
import com.example.demo.deserial.SafeStringKafkaDeserializationSchema;
import com.example.demo.domain.TableData;
import com.example.demo.dynamic.ExtractKafaRowAndTableName;
import com.example.demo.sink.DynamicJdbcSink;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.util.*;

public class FlinkKafkaSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "192.168.64.141:9092");
        SafeStringKafkaDeserializationSchema schema = new SafeStringKafkaDeserializationSchema();
        //CustomKafkaDeserializationSchema schema = new CustomKafkaDeserializationSchema();
        FlinkKafkaConsumer<ConsumerRecord<String, String>> kafkaSource = new FlinkKafkaConsumer<>(
                "part.t_part", // 匹配所有 testdb 下的表
                schema,
                kafkaProps
        );
        kafkaSource.setStartFromEarliest();
        DataStreamSource<ConsumerRecord<String, String>> ds = env.addSource(kafkaSource);
        ExtractKafaRowAndTableName  extractRowAndTableName = new ExtractKafaRowAndTableName();
        SingleOutputStreamOperator<TableData> mapStream = ds.map(extractRowAndTableName);

        JdbcExecutionOptions options = JdbcExecutionOptions.builder()
                .withBatchSize(1000)
                .withBatchIntervalMs(2000)
                .withMaxRetries(2)
                .build();
        DynamicJdbcSink dynamicJdbcSink = new DynamicJdbcSink(ConnectionConstants.PG_DRIVER_CLSSNAME,ConnectionConstants.PG_URL,ConnectionConstants.PG_USER_NAME,ConnectionConstants.PG_PASSWORD);
        mapStream.addSink(dynamicJdbcSink);
        env.enableCheckpointing(5000); // 每 5 秒做一次 checkpoint
        kafkaSource.setCommitOffsetsOnCheckpoints(true);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2000); // 最小间隔
        env.getCheckpointConfig().setCheckpointTimeout(6000); // 超时时间
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 并行数
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
        );
        env.execute("Multi-table CDC to PostgreSQL via DataStream");
    }
}

使用flink cdc 写代码的时候jar 包方式,你需要考虑不同数据库的序列化和反序列化问题, yaml 方式就是没提供的功能你无法用不够灵活。

一、判断Flink CDC同步完成的常见方法

Flink CDC的同步分为全量同步和增量同步阶段,完成标志如下:

  1. 监控 currentEmitEventTimeLag 指标

    • 这是核心判断依据。该指标表示数据从数据库产生到离开Source节点的时间延迟。
    • 全量同步完成标志:当 currentEmitEventTimeLag 从 ≤0 变为 >0 时,表示已从全量阶段进入增量(Binlog)读取阶段。
    • 原理:全量阶段该指标为0(无延迟),进入增量阶段后延迟值变为正数。
    • 实现:通过Flink的Metrics系统(如Prometheus、Grafana)实时监控该指标。
  2. 检查日志输出

    • 在日志中搜索关键词 BinlogSplitReader is created 或 全量同步结束,这通常标志全量阶段完成。
    • 全量同步完成后,日志会显示Binlog读取开始。
  3. 观察作业状态和指标

    • 作业状态:通过Flink Web UI或API检查Job状态,若为 FINISHED(仅限批处理任务),表示同步完成。
    • 其他指标
      • sourceIdleTime:源空闲时间增加,可能表示无新数据。
      • currentFetchEventTimeLag:类似 currentEmitEventTimeLag,监控数据读取延迟。
  4. 验证目标数据

    • 对比源数据库和目标存储(如数据湖)的数据一致性:
      • 全量同步后,目标数据应包含源数据库的所有记录。
      • 使用数据校验工具(如比对哈希值)确保一致性。

二、为什么不用数据条数判断?

  • 动态性:增量同步中数据持续流入,条数无法作为静态终点。
  • 准确性问题
    • 数据删除、更新可能导致条数波动。
    • 分布式系统中,分片同步可能不同步完成。
  • 替代方案:上述指标和状态监控更实时可靠。

三、实践建议

  • 实时监控:优先使用 currentEmitEventTimeLag,结合Prometheus等工具告警。
  • 自动化验证:在ETL管道中加入数据校验步骤,确保同步质量。
  • 日志审计:定期审查日志,辅助异常排查。

如果您有具体同步场景(如MySQL到数据湖),可进一步优化方案。

Flink中的滑动窗口和滚动窗口

1. 滑动窗口(Sliding Window):

  • 定义:滑动窗口有一个固定的大小,并且可以有重叠。这意味着数据项可能会被包含在一个或多个窗口中。
  • 用途:适用于需要分析一段时间内的趋势或模式的情况,例如计算过去5分钟内每1分钟的数据平均值。
  • 特点
    • 窗口大小和滑动步长可以独立配置。
    • 可能导致较高的计算成本,因为它涉及到更多的窗口操作。

2. 滚动窗口(Tumbling Window):

  • 定义:滚动窗口是滑动窗口的一种特殊情况,其中窗口之间没有重叠(即滑动步长等于窗口大小)。每个数据项只会属于一个特定的窗口。
  • 用途:适合于定期汇总数据的场景,比如每天统计一次用户活动量。
  • 特点
    • 简单易懂,实现起来相对直接。
    • 数据不会跨窗口重复处理,减少了计算负担。

限流熔断机制中的滑动窗口

3. 限流熔断机制中的滑动窗口:

  • 定义:在分布式系统或微服务架构中,为了防止某个服务过载而采取的一种保护措施。这里的滑动窗口通常用于监控请求速率,以便决定是否应该限制请求或触发熔断。
  • 用途:主要用于控制流量、保护下游服务免受突发流量的影响。
  • 特点
    • 主要关注点在于时间间隔内的请求数量或错误率。
    • 实现方式可能包括固定大小的时间桶(buckets),随着时间推移,新的请求会进入最新的时间桶,而旧的时间桶会被丢弃。
    • 目的是快速响应流量变化,提供即时反馈以调整系统的负载能力。

区别总结

  • 应用场景不同:Flink的窗口函数主要用于流处理任务中的数据分析;而限流熔断机制中的滑动窗口则用于保障系统稳定性和可用性。
  • 技术细节差异:Flink中的窗口涉及复杂的数据聚合逻辑,可能跨越多个节点进行分布式计算;相比之下,限流熔断机制中的滑动窗口更注重实时性和效率,通常在单个服务实例内部执行。
  • 目标不同:前者旨在提取有价值的信息,如统计信息、模式识别等;后者的目标是通过限制请求频率来维持系统的健康状态。

网站公告

今日签到

点亮在社区的每一天
去签到