1、CDC简介 Change Data Capture
FlinkCDC提供一组源数据的连接器,使用变更数据捕获的方式,直接吸收来自不同数据库的变更数据。通过CDC获取源数据表的更新内容,将更新内容作为数据流下发到下游系统,可以做到mysql数据表数据的实时同步操作。
基于Flink CDC的MySQL表数据同步流程大致如下:
数据源(MySQL):首先,一个MySQL数据库作为数据源,其中包含了想要同步的表。
Flink CDC Connector:Flink CDC Connector是一个用于捕获MySQL表数据变更的组件。它连接到MySQL数据库,并持续监听数据变更(如插入、更新、删除操作)。
数据捕获:当MySQL表中的数据发生变化时,Flink CDC Connector会捕获这些变更事件,并将它们作为数据流进行处理。
Flink流处理:捕获到的数据流会进入Flink流处理引擎。在Flink中,你可以定义一系列的操作来处理这些数据,比如过滤、聚合、转换等。
目标存储:处理后的数据会被写入到目标存储系统。这可以是一个数据库、数据仓库、消息队列或其他任何数据存储系统。
监控与告警:同步过程中,你可以设置监控和告警机制,以便在出现问题时能够及时得到通知并进行处理。
错误处理与重试:在同步过程中,可能会遇到各种错误,如网络问题、目标存储故障等。你需要设计合适的错误处理机制,比如重试策略,以确保数据的可靠性和一致性。
2、CDC配置
(1)开启MySql的binlog
1,修改 mysql 的配置文件 my.cnf
追加内容:
log-bin=mysql-bin #binlog
binlog_format=ROW #选择row
server_id=1 #mysql实例id
2,重启 mysql:
service mysql restart
3,登录 mysql 客户端,查看 log_bin 开启状态
mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | ON|
+---------------+-------+
————————————————
状态为ON表示该功能已开启
(2)创建Mysql数据库表
两个库表源表和目标表:
源表:
CREATE TABLE zentao.`zt_group` (
`id` MEDIUMINT(7) UNSIGNED NOT NULL AUTO_INCREMENT,
`project` MEDIUMINT(7) UNSIGNED NOT NULL DEFAULT '0',
`vision` VARCHAR(10) NOT NULL DEFAULT 'rnd' COLLATE 'utf8_general_ci',
`name` CHAR(30) NOT NULL DEFAULT '' COLLATE 'utf8_general_ci',
`role` CHAR(30) NOT NULL DEFAULT '' COLLATE 'utf8_general_ci',
`desc` CHAR(255) NOT NULL DEFAULT '' COLLATE 'utf8_general_ci',
`acl` TEXT NULL DEFAULT NULL COLLATE 'utf8_general_ci',
`developer` ENUM('0','1') NOT NULL DEFAULT '1' COLLATE 'utf8_general_ci',
PRIMARY KEY (`id`) USING BTREE
);INSERT INTO `zt_group` (`id`, `project`, `vision`, `name`, `role`, `desc`, `acl`, `developer`) VALUES (1, 0, 'rnd', '管理员', 'admin', '系统管理员', NULL, '1');
INSERT INTO `zt_group` (`id`, `project`, `vision`, `name`, `role`, `desc`, `acl`, `developer`) VALUES (2, 0, 'rnd', '研发', 'dev', '研发人员', NULL, '1');
INSERT INTO `zt_group` (`id`, `project`, `vision`, `name`, `role`, `desc`, `acl`, `developer`) VALUES (3, 0, 'rnd', '测试', 'qa', '测试人员', NULL, '1');
INSERT INTO `zt_group` (`id`, `project`, `vision`, `name`, `role`, `desc`, `acl`, `developer`) VALUES (4, 0, 'rnd', '项目经理', 'pm', '项目经理', NULL, '1');
INSERT INTO `zt_group` (`id`, `project`, `vision`, `name`, `role`, `desc`, `acl`, `developer`) VALUES (5, 0, 'rnd', '产品经理', 'po', '产品经理', NULL, '1');
INSERT INTO `zt_group` (`id`, `project`, `vision`, `name`, `role`, `desc`, `acl`, `developer`) VALUES (6, 0, 'rnd', '研发主管', 'td', '研发主管', NULL, '1');
目标表:
CREATE TABLE wfg.`zentao_zt_group` (
`id` MEDIUMINT(7) UNSIGNED NOT NULL AUTO_INCREMENT,
`project` MEDIUMINT(7) UNSIGNED NOT NULL DEFAULT '0',
`vision` VARCHAR(10) NOT NULL DEFAULT 'rnd' COLLATE 'utf8_general_ci',
`name` CHAR(30) NOT NULL DEFAULT '' COLLATE 'utf8_general_ci',
`role` CHAR(30) NOT NULL DEFAULT '' COLLATE 'utf8_general_ci',
`desc` CHAR(255) NOT NULL DEFAULT '' COLLATE 'utf8_general_ci',
`acl` TEXT NULL DEFAULT NULL COLLATE 'utf8_general_ci',
`developer` ENUM('0','1') NOT NULL DEFAULT '1' COLLATE 'utf8_general_ci',
PRIMARY KEY (`id`) USING BTREE
);
3、SQL CDC同步数据代码
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class TenTao2wfgUserSql {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
env.enableCheckpointing(5000);
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql("CREATE DATABASE IF NOT EXISTS source_test");
tEnv.executeSql("CREATE DATABASE IF NOT EXISTS target_test");
// 动态表,此为source表
tEnv.executeSql("CREATE TABLE source_test.zentao (\n" +
" id INT,\n" +
" project INT,\n" +
" vision STRING,\n" +
" name STRING,\n" +
" role STRING,\n" +
" desc STRING,\n" +
" acl STRING,\n" +
" developer INT,\n" +
" PRIMARY KEY (id) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector' = 'mysql-cdc',\n" +
" 'hostname' = '127.0.0.1',\n" +
" 'port' = '3306',\n" +
" 'username' = 'root',\n" +
" 'password' = '123456',\n" +
" 'database-name' = 'zentao',\n" +
" 'table-name' = 'zt_group',\n" +
" 'scan.incremental.snapshot.enabled' = 'false'\n" +
")");
// 动态表,此为sink表。sink表和source表的connector不一样
tEnv.executeSql("CREATE TABLE target_test.zentao (\n" +
" id INT,\n" +
" project INT,\n" +
" vision STRING,\n" +
" name STRING,\n" +
" role STRING,\n" +
" desc STRING,\n" +
" acl STRING,\n" +
" developer INT,\n" +
" PRIMARY KEY (id) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector' = 'jdbc',\n" +
" 'url' = 'jdbc:mysql://127.0.0.1:3306/wfg',\n" +
" 'username' = 'root',\n" +
" 'password' = '123456',\n" +
" 'table-name' = 'zentao_zt_group'\n" +
")");
tEnv.executeSql("INSERT INTO target_test.zentao (id, project, vision, name, role, desc,acl,developer) \n" +
"select f.id,\n" +
" f.project,\n" +
" f.vision,\n" +
" f.name,\n" +
" f.role,\n" +
" f.desc,\n" +
" f.acl,\n" +
" f.developer \n" +
" from source_test.zentao f ");
}
}