Flink Vitess CDC 环境配置与验证

发布于:2025-07-08 ⋅ 阅读:(16) ⋅ 点赞:(0)
一、Vitess 集群核心配置(重点:开启 CDC 支持)
1. 启用 VStream 服务(Vitess CDC 的核心)

Vitess 通过 VStream 服务提供增量数据捕获能力,需确保 VTGate 正确配置:

# 启动 VTGate 时显式启用 VStream(生产环境建议配置)
vtgate \
  --logtostderr \
  --port 15991 \                 # MySQL 协议端口
  --grpc_port 15999 \           # gRPC 端口(Flink CDC 连接此端口)
  --service_map 'grpc-vtgateservice' \  # 启用 VStream 服务
  --vstream_heartbeat_interval 5s \     # 心跳间隔(可选)
  --cells zone1 \               # 指定 cell 区域
  --tablet_types_to_wait MASTER,REPLICA \
  --mysql_server_port 3306 \    # MySQL 兼容端口
  --mysql_auth_server_impl none # 禁用认证(测试环境)
2. 验证 VStream 可用性
# 使用 vtctlclient 验证 VStream 是否正常工作
vtctlclient -server localhost:15999 VStream -pos "" -tables "mydb.orders"

预期输出

VStream started at position: <current VGTID>
...
二、Flink 环境集成配置
1. 添加 Maven 依赖(同上)
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-sql-connector-vitess-cdc</artifactId>
    <version>3.0.1</version>
    <scope>provided</scope>
</dependency>
2. SQL Client 部署(同上)
  1. 下载 JAR 包:
    flink-sql-connector-vitess-cdc-3.0.1.jar
  2. 将 JAR 包放入 $FLINK_HOME/lib/ 后重启 Flink 集群。
三、Flink SQL 表定义与参数详解(重点:CDC 参数)
1. 完整建表示例(含关键 CDC 参数)
-- 配置 checkpoint(每 3 秒)
SET 'execution.checkpointing.interval' = '3s';

-- 创建 Vitess CDC 表(仅增量同步,无快照)
CREATE TABLE vitess_orders (
    order_id INT,
    order_date TIMESTAMP(0),
    customer_name STRING,
    price DECIMAL(10, 5),
    order_status BOOLEAN,
    -- 元数据列(Vitess 特有)
    keyspace_name STRING METADATA FROM 'keyspace' VIRTUAL,
    shard_name STRING METADATA FROM 'shard' VIRTUAL,
    vgtid STRING METADATA FROM 'vgtid' VIRTUAL,
    op_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
    PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
    'connector' = 'vitess-cdc',
    'hostname' = 'localhost',            -- VTGate 主机
    'port' = '15999',                    -- VTGate gRPC 端口(非 MySQL 端口)
    'keyspace' = 'mydb',                 -- 目标 Keyspace
    'table-name' = 'orders',             -- 表名
    'tablet.type' = 'REPLICA',           -- 从 REPLICA 节点读取(减少主库压力)
    'stopOnReshard' = 'false',           -- 分片变更时不停服务
    'tombstonesOnDelete' = 'true',       -- 删除事件后发送墓碑标记
    'debezium.connector.vitess.include.schema.changes' = 'false', -- 忽略 schema 变更
    'debezium.connector.vitess.snapshot.mode' = 'never'  -- 禁用快照(仅增量)
);
2. 核心 CDC 参数详解(新增/重点参数)
参数名 必选 默认值 类型 说明
port 15999 Integer VTGate gRPC 端口(非 MySQL 端口 3306),VStream 服务使用此端口
tablet.type REPLICA String 从哪个类型的节点读取变更:MASTER(主库)、REPLICA(从库)、RDONLY(只读从库)
debezium.connector.vitess.snapshot.mode never String 快照模式:never(禁用快照,仅增量)、initial(初始快照+增量)
debezium.connector.vitess.heartbeat.interval.ms 5000 Long VStream 心跳间隔(毫秒),用于检测连接状态
debezium.connector.vitess.include.schema.changes false Boolean 是否捕获 schema 变更(如 ALTER TABLE)
四、环境验证与测试
1. 准备测试数据(Vitess)
-- 连接 VTGate(3306 端口)
mysql -h localhost -P 3306 -u root

-- 插入测试数据
USE mydb;
INSERT INTO orders VALUES 
(1, '2023-01-01 10:00:00', 'Alice', 100.50, true),
(2, '2023-01-02 11:00:00', 'Bob', 200.75, false);
COMMIT;
2. Flink SQL 验证(重点:监控 VGTID)
-- 查询 Vitess CDC 表(验证增量同步)
SELECT 
    order_id, 
    price,
    vgtid,        -- Vitess 特有的全局事务 ID
    op_ts         -- 变更时间戳
FROM vitess_orders;

-- 在 Vitess 中执行变更
UPDATE mydb.orders SET price = 150.00 WHERE order_id = 1;
COMMIT;

-- 观察 Flink 输出(vgtid 应变化,op_ts 为变更时间)
五、常见问题与解决方案(重点:CDC 故障排查)
  1. VStream 连接失败

    ERROR: Failed to connect to VStream service at localhost:15999
    
    • 解决方案:
      1. 确认 VTGate 已启动且 --service_map 包含 grpc-vtgateservice
      2. 检查防火墙是否开放 15999 端口
      3. 验证 VStream 服务:
        vtctlclient -server localhost:15999 VStream -pos ""
        
  2. 无法捕获变更

    • 解决方案:
      1. 确认表结构符合预期(无隐藏列)
      2. 检查 table-name 参数是否正确(格式:keyspace.table
      3. 手动触发变更并观察 VTGate 日志:
        docker logs vtgate | grep "VStream"
        
  3. VGTID 不更新

    • 解决方案:
      • 增加日志级别:
        SET 'execution.log-level' = 'DEBUG';
        
      • 检查 Vitess 集群状态:
        vtctlclient -server localhost:15999 ListAllTablets zone1
        
六、生产环境优化建议(重点:CDC 性能)
  1. 高可用配置

    • 配置多 VTGate 节点(负载均衡):
      'hostname' = 'vtgate1:15999,vtgate2:15999,vtgate3:15999'
      
  2. 性能调优

    • 增大 VStream 缓冲区:
      'debezium.connector.vitess.buffer.size' = '8192'  -- 变更事件缓冲区大小
      
    • 调整 Flink 并行度:
      'parallelism' = '4'  -- 根据分片数量调整
      
  3. 监控指标

    • 监控 VStream 延迟:
      # 查询 Vitess 内部指标
      curl -s http://vtgate:15001/metrics | grep vstream
      

通过以上步骤,可完成 Flink Vitess CDC 的全流程配置与验证。关键点在于确保 VTGate 正确启用 VStream 服务,并通过 Flink 正确连接到 gRPC 端口(默认 15999)。生产环境中需特别关注 VStream 服务的高可用性、变更捕获的实时性及分片变更时的处理策略。