Flink CDC 3.0: YAML-configured data synchronization
Setting up the environment with Docker Compose
Initialization
mkdir -p /data/flink/flink-cdc/flink-cdc-release-3.3.0
mkdir -p /data/flink/flink-cdc/flink-cdc-3.3.0-bin
# Directory for our own CDC scripts
mkdir -p /data/flink/flink-cdc/flink-cdc-3.3.0-bin/cdc-script
# Third-party jars required at runtime
mkdir -p /data/flink/flink-cdc/flink-cdc-3.3.0-bin/expand-lib
flink-cdc-bin
Move the extracted flink-cdc binary distribution into /data/flink/flink-cdc/flink-cdc-3.3.0-bin.
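A minimal sketch of fetching and extracting the release (the Apache mirror URL and the extracted directory name are assumptions; adjust for your mirror):
# Download the Flink CDC 3.3.0 binary release (URL assumed)
wget -P /data/flink/flink-cdc/flink-cdc-release-3.3.0 https://dlcdn.apache.org/flink/flink-cdc-3.3.0/flink-cdc-3.3.0-bin.tar.gz
# Extract, then move the contents into the bin directory created above
tar -zxvf /data/flink/flink-cdc/flink-cdc-release-3.3.0/flink-cdc-3.3.0-bin.tar.gz -C /data/flink/flink-cdc/flink-cdc-release-3.3.0
mv /data/flink/flink-cdc/flink-cdc-release-3.3.0/flink-cdc-3.3.0/* /data/flink/flink-cdc/flink-cdc-3.3.0-bin/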
Place the MySQL driver jar below in the Flink lib directory, or pass it to the Flink CDC CLI via the --jar parameter, because the CDC connectors no longer bundle these driver packages.
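The driver jar can be fetched straight from Maven Central into the third-party directory created earlier:
# Download the MySQL JDBC driver into expand-lib
wget -P /data/flink/flink-cdc/flink-cdc-3.3.0-bin/expand-lib https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.27/mysql-connector-java-8.0.27.jar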
After the containers are up, copy the dependency jar into them:
# Copy into the master node (jobmanager)
docker cp /data/flink/flink-cdc/flink-cdc-3.3.0-bin/expand-lib/mysql-connector-java-8.0.27.jar flink-jobmanager-1:/opt/flink/lib
# Copy into the worker node (taskmanager)
docker cp /data/flink/flink-cdc/flink-cdc-3.3.0-bin/expand-lib/mysql-connector-java-8.0.27.jar flink-taskmanager-1:/opt/flink/lib
Pipeline connectors in flink-cdc
Download apache/flink-cdc and extract it.
Go into the flink-cdc-connect/flink-cdc-pipeline-connectors folder and copy the pipeline connectors you need into /data/flink/flink-cdc/flink-cdc-3.3.0-bin/lib (here mysql and kafka, since we will later stream MySQL data into Kafka).
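If you would rather not build the connectors from source, the same pipeline connector jars are also published to Maven Central; a sketch assuming the org.apache.flink coordinates and version 3.3.0:
# Fetch the MySQL and Kafka pipeline connector jars (coordinates assumed; verify against Maven Central)
wget -P /data/flink/flink-cdc/flink-cdc-3.3.0-bin/lib https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-mysql/3.3.0/flink-cdc-pipeline-connector-mysql-3.3.0.jar
wget -P /data/flink/flink-cdc/flink-cdc-3.3.0-bin/lib https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-kafka/3.3.0/flink-cdc-pipeline-connector-kafka-3.3.0.jar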
Kafka environment
services:
  zookeeper:
    image: zookeeper:3.7.1
    ports:
      - 2181
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: bitnami/kafka:2.8.1
    ports:
      - 9092:9092
      - 9093:9093
    environment:
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_LISTENERS=PLAINTEXT://:9092
      # Address advertised to clients outside the container
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.132.10:9092
      # ZooKeeper connection address
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
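Save the file above as docker-compose.yml (file name assumed) in its own directory and bring the Kafka stack up:
docker compose up -d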
Flink environment
version: "3.8"
services:
  # Master node
  jobmanager:
    image: flink:1.19.1-scala_2.12
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
    volumes:
      # Mount the flink-cdc distribution (including its bin scripts) into the container
      - /data/flink/flink-cdc/flink-cdc-3.3.0-bin:/opt/flink-cdc-3.3.0-bin
  # Worker node
  taskmanager:
    image: flink:1.19.1-scala_2.12
    depends_on:
      - jobmanager
    command: taskmanager
    scale: 1
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2
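Likewise, save this file as docker-compose.yml in a separate directory and start it:
docker compose up -d
# Confirm both the jobmanager and taskmanager containers are running
docker compose ps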
Visit http://192.168.132.10:8081 to view the Flink web UI.
Script preparation
MySQL script
# Create the table
CREATE TABLE `student` (
  `id` int(20) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  `age` int(20) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4;
# Insert data
INSERT INTO `whitebrocade`.`student`(`id`, `name`, `age`) VALUES (1, '小牛马', 123);
INSERT INTO `whitebrocade`.`student`(`id`, `name`, `age`) VALUES (2, '中牛马', 11111);
Writing the CDC script
vim /data/flink/flink-cdc/flink-cdc-3.3.0-bin/cdc-script/mysql-to-kafka.yaml
# Data source
source:
  type: mysql
  hostname: 192.168.132.10
  port: 3306
  username: root
  password: 12345678
  # Tables to sync (regular expressions are supported); here the student table under whitebrocade
  tables: whitebrocade.student
  server-id: 5400-5404
  server-time-zone: Asia/Shanghai
# Sink operator
sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: 192.168.132.10:9092
  topic: yaml-mysql-kafka
# Pipeline
pipeline:
  name: MySQL to Kafka Pipeline
  parallelism: 1
Enter the master node (jobmanager) container:
docker exec -it flink-jobmanager-1 /bin/bash
Submit the CDC script:
bash /opt/flink-cdc-3.3.0-bin/bin/flink-cdc.sh /opt/flink-cdc-3.3.0-bin/cdc-script/mysql-to-kafka.yaml
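If the MySQL driver jar was not copied into /opt/flink/lib earlier, it can instead be passed at submit time via the --jar parameter mentioned above; a sketch using this setup's expand-lib path:
bash /opt/flink-cdc-3.3.0-bin/bin/flink-cdc.sh /opt/flink-cdc-3.3.0-bin/cdc-script/mysql-to-kafka.yaml --jar /opt/flink-cdc-3.3.0-bin/expand-lib/mysql-connector-java-8.0.27.jar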
The message printed after submission only means the job was submitted successfully; it does not mean the job is actually running.
Check the job in the Flink Web UI.
Testing
Insert new data:
INSERT INTO `whitebrocade`.`student`(`id`, `name`, `age`) VALUES (3, '大牛马', 33333);
The topic was indeed created.
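One way to verify, using Kafka's bundled tooling to list topics:
docker exec -it kafka-kafka-1 kafka-topics.sh --bootstrap-server 192.168.132.10:9092 --list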
Inspect the topic with Kafka's built-in console consumer; the messages are in debezium-json format:
docker exec -it kafka-kafka-1 kafka-console-consumer.sh --bootstrap-server 192.168.132.10:9092 --topic yaml-mysql-kafka --from-beginning
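For reference, an insert event in debezium-json looks roughly like this (abbreviated and illustrative; the exact fields depend on the sink configuration):
{"before":null,"after":{"id":3,"name":"大牛马","age":33333},"op":"c"}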
References
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成 (MySQL to Kafka streaming data integration based on Flink CDC YAML)