Flink CDC 3.0: Data Synchronization with YAML Configuration


Setting Up the Environment with Docker Compose

Initialization

mkdir -p /data/flink/flink-cdc/flink-cdc-release-3.3.0
mkdir -p /data/flink/flink-cdc/flink-cdc-3.3.0-bin
# Holds the CDC scripts we write ourselves
mkdir -p /data/flink/flink-cdc/flink-cdc-3.3.0-bin/cdc-script
# Third-party packages required at runtime
mkdir -p /data/flink/flink-cdc/flink-cdc-3.3.0-bin/expand-lib
flink-cdc-bin
  1. Download flink-cdc-3.3.0-bin.tar.gz and extract it

  2. Move it to /data/flink/flink-cdc/flink-cdc-3.3.0-bin, as shown below

    [screenshot: directory layout under flink-cdc-3.3.0-bin]

  3. Place the MySQL driver JAR below in the Flink lib directory, or pass it to the Flink CDC CLI via the --jar argument; the CDC connectors no longer bundle these driver packages

    • After the containers are up, copy the dependency JAR into them (a quick check follows the commands)

    • # Copy to the worker node (TaskManager)
      docker cp /data/flink/flink-cdc/flink-cdc-3.3.0-bin/expand-lib/mysql-connector-java-8.0.27.jar flink-taskmanager-1:/opt/flink/lib
      
      # Copy to the master node (JobManager)
      docker cp /data/flink/flink-cdc/flink-cdc-3.3.0-bin/expand-lib/mysql-connector-java-8.0.27.jar flink-jobmanager-1:/opt/flink/lib
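      
      To confirm the driver actually landed in both containers, list each lib directory (a quick sanity check, assuming the container names above):
      
      docker exec flink-jobmanager-1 ls /opt/flink/lib | grep mysql
      docker exec flink-taskmanager-1 ls /opt/flink/lib | grep mysql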
      
Pipeline connectors in flink-cdc
  1. Download apache/flink-cdc and extract it

  2. Go into the flink-cdc-connect/flink-cdc-pipeline-connectors folder and copy the pipeline connectors you need (here mysql and kafka, since we will later read MySQL data into Kafka) to /data/flink/flink-cdc/flink-cdc-3.3.0-bin/lib, as shown below

    [screenshot: mysql and kafka pipeline connector JARs copied into lib]
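
Note that a source checkout of apache/flink-cdc contains Maven modules rather than ready-made JARs; if the connector JARs are not already present, they have to be built first. A minimal sketch, assuming Maven and a JDK are installed (module paths taken from the repository layout):

# Build just the mysql and kafka pipeline connectors plus their dependencies
mvn clean package -DskipTests -pl flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql,flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka -am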

Kafka Environment

services:
  zookeeper:
    image: zookeeper:3.7.1
    ports:
      - 2181
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: bitnami/kafka:2.8.1
    ports:
      - 9092:9092
      - 9093:9093
    environment:
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_LISTENERS=PLAINTEXT://:9092
      # Address advertised to external clients
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.132.10:9092
      # ZooKeeper connection address
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
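
Save this as docker-compose.yml in its own directory (the file name is an assumption; any name passed with -f works) and bring the stack up:

docker compose up -d
docker compose ps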

Flink Environment

version: "3.8"
services:
  # Master node (JobManager)
  jobmanager:
    image: flink:1.19.1-scala_2.12
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager    
    volumes:
      # Map the flink-cdc distribution (its bin scripts) into the container
      - /data/flink/flink-cdc/flink-cdc-3.3.0-bin:/opt/flink-cdc-3.3.0-bin
  # Worker node (TaskManager)
  taskmanager:
    image: flink:1.19.1-scala_2.12
    depends_on:
      - jobmanager
    command: taskmanager
    scale: 1
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2

Open http://192.168.132.10:8081 to view the Flink Web UI.

[screenshot: Flink Web UI overview page]
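
If you prefer the command line, Flink's REST API answers on the same port; the /overview endpoint reports registered TaskManagers and available task slots (a quick smoke test against the host above):

curl http://192.168.132.10:8081/overview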

Preparing the Scripts

MySQL Script

-- Create the table
CREATE TABLE `student` (
  `id` int(20) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  `age` int(20) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4;


-- Insert data
INSERT INTO `whitebrocade`.`student`(`id`, `name`, `age`) VALUES (1, '小牛马', 123);
INSERT INTO `whitebrocade`.`student`(`id`, `name`, `age`) VALUES (2, '中牛马', 11111);
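
The MySQL pipeline source reads the binlog, so binlog must be enabled on the server and the connecting user needs the SELECT, SHOW DATABASES, REPLICATION SLAVE, and REPLICATION CLIENT privileges (root, used below, already has them). A quick check with the mysql client, assuming the credentials that appear in the YAML next:

# Should print: log_bin  ON
mysql -h 192.168.132.10 -P 3306 -u root -p12345678 -e "SHOW VARIABLES LIKE 'log_bin';"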

Writing the CDC Script

vim /data/flink/flink-cdc/flink-cdc-3.3.0-bin/cdc-script/mysql-to-kafka.yaml
# Source
source:
  type: mysql
  hostname: 192.168.132.10
  port: 3306
  username: root
  password: 12345678
  # Sync the student table under whitebrocade (regex matching is supported)
  tables: whitebrocade.student
  # Each parallel source reader needs a unique server id, so this range must cover the parallelism
  server-id: 5400-5404
  server-time-zone: Asia/Shanghai

# Sink
sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: 192.168.132.10:9092
  topic: yaml-mysql-kafka

# Pipeline
pipeline:
  name: MySQL to Kafka Pipeline
  parallelism: 1

Enter the JobManager (master) container

docker exec -it flink-jobmanager-1 /bin/bash 

Launch the CDC script

bash /opt/flink-cdc-3.3.0-bin/bin/flink-cdc.sh /opt/flink-cdc-3.3.0-bin/cdc-script/mysql-to-kafka.yaml

After execution you will see the output below; this only means the job was submitted successfully, not that it is actually running.

[screenshot: CLI output confirming pipeline submission]
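
If the submission succeeded, the CLI prints something similar to the following (the job ID is generated per run; shown here as a placeholder):

Pipeline has been submitted to cluster.
Job ID: <job-id>
Job Description: MySQL to Kafka Pipeline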

Check the job in the Flink Web UI

[screenshot: the MySQL to Kafka Pipeline job running in the Flink Web UI]
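
You can also confirm the job state from the CLI inside the JobManager container (the flink binary ships with the official image):

docker exec flink-jobmanager-1 flink list -r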

Testing

Insert new data

INSERT INTO `whitebrocade`.`student`(`id`, `name`, `age`) VALUES (3, '大牛马', 33333);

The topic was indeed created

[screenshot: yaml-mysql-kafka topic listed in Kafka]
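
The same can be checked from the command line with Kafka's topic tool (same container as the consumer command below):

docker exec -it kafka-kafka-1 kafka-topics.sh --bootstrap-server 192.168.132.10:9092 --list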

Use Kafka's bundled console consumer to inspect the topic; the messages are in debezium-json format

docker exec -it kafka-kafka-1 kafka-console-consumer.sh --bootstrap-server 192.168.132.10:9092 --topic yaml-mysql-kafka --from-beginning

[screenshot: debezium-json messages printed by the console consumer]
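
For reference, the message produced by the insert above is roughly of this shape (debezium-json as emitted by the Kafka pipeline sink; exact source metadata may vary):

{"before": null, "after": {"id": 3, "name": "大牛马", "age": 33333}, "op": "c", "source": {"db": "whitebrocade", "table": "student"}}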

References

MySQL to Kafka streaming data integration based on Flink CDC YAML

Syncing MySQL to StarRocks

Docker | Apache Flink

apache/flink-cdc: Flink CDC is a streaming data integration tool

flink-cdc-3.3.0-bin.tar.gz