Flink CDC 是一个基于流的数据集成工具,旨在为用户提供一套功能更加全面的编程接口(API),它基于数据库日志的 CDC(变更数据捕获)技术实现了统一的增量和全量数据读取。 该工具使得用户能够以 YAML 配置文件的形式,优雅地定义其 ETL(Extract, Transform, Load)流程,并协助用户自动化生成定制化的 Flink 算子并且提交 Flink 作业。 Flink CDC 在任务提交过程中进行了优化,并且增加了一些高级特性,如表结构变更自动同步(Schema Evolution)、数据转换(Data Transformation)、整库同步(Full Database Synchronization)以及 精确一次(Exactly-once)语义。
Flink CDC 深度集成并由 Apache Flink 驱动,提供以下核心功能:
- ✅ 端到端的数据集成框架
- ✅ 为数据集成的用户提供了易于构建作业的 API
- ✅ 支持在 Source 和 Sink 中处理多个表
- ✅ 整库同步
- ✅具备表结构变更自动同步的能力(Schema Evolution)
一、如何使用 Flink CDC
Flink CDC 提供了基于 YAML
格式的用户 API,更适合于数据集成场景。以下是一个 YAML
文件的示例,它定义了一个数据管道(Pipeline),该Pipeline从 MySQL 捕获实时变更,并将它们同步到 Apache Doris:
source:
type: mysql
hostname: localhost
port: 3306
username: root
password: 123456
tables: app_db.\.*
server-id: 5400-5404
server-time-zone: UTC
sink:
type: doris
fenodes: 127.0.0.1:8030
username: root
password: ""
table.create.properties.light_schema_change: true
table.create.properties.replication_num: 1
pipeline:
name: Sync MySQL Database to Doris
parallelism: 2
通过使用 flink-cdc.sh
提交 YAML 文件,一个 Flink 作业将会被编译并部署到指定的 Flink 集群。
二、理解核心概念
1、Data Pipeline
由于Flink CDC中的事件以管道方式从上游流向下游,因此整个ETL任务被称为数据管道。
我们可以使用下面的yaml文件来定义一个简洁的数据管道,描述将MySQL app_db数据库下的所有表同步到Doris:
source:
type: mysql
hostname: localhost
port: 3306
username: root
password: 123456
tables: app_db.\.*
sink:
type: doris
fenodes: 127.0.0.1:8030
username: root
password: ""
pipeline:
name: Sync MySQL Database to Doris
parallelism: 2
我们可以使用下面的yaml文件定义一个复杂的数据管道,描述将MySQL app_db数据库下的所有表同步到Doris,并给出特定的目标数据库名称ods_db和特定的目标表名称前缀ods_:
source:
type: mysql
hostname: localhost
port: 3306
username: root
password: 123456
tables: app_db.\.*
sink:
type: doris
fenodes: 127.0.0.1:8030
username: root
password: ""
transform:
- source-table: adb.web_order01
projection: \*, format('%S', product_name) as product_name
filter: addone(id) > 10 AND order_id > 100
description: project fields and filter
- source-table: adb.web_order02
projection: \*, format('%S', product_name) as product_name
filter: addone(id) > 20 AND order_id > 200
description: project fields and filter
route:
- source-table: app_db.orders
sink-table: ods_db.ods_orders
- source-table: app_db.shipments
sink-table: ods_db.ods_shipments
- source-table: app_db.products
sink-table: ods_db.ods_products
pipeline:
name: Sync MySQL Database to Doris
parallelism: 2
user-defined-function:
- name: addone
classpath: com.example.functions.AddOneFunctionClass
- name: format
classpath: com.example.functions.FormatFunctionClass
Pipeline 配置:
支持数据管道级别的以下配置选项:
parameter | meaning | optional/required |
---|---|---|
name | 管道的名称,将作为作业名称提交到Flink集群。 | optional |
parallelism | 管道的全局并行性。默认为1。 | optional |
local-time-zone | 本地时区定义当前会话时区id。 | optional |
2、Data Source
数据源用于访问元数据,并从外部系统读取更改的数据。数据源可以同时从多个表中读取数据。
要描述数据源,需要以下内容:
parameter | meaning | optional/required |
---|---|---|
type | 数据源的类型,如mysql。 | required |
name | 数据源的名称,由用户定义(提供默认值)。 | optional |
configurations of Data Source | 用于构建数据源的配置,例如连接配置和源表属性。 | optional |
source:
type: mysql
name: mysql-source #optional,description information
host: localhost
port: 3306
username: admin
password: pass
tables: adb.*, bdb.user_table_[0-9]+, [app|web]_order_\.*