Flink Db2 CDC 环境配置与验证

发布于:2025-07-05 ⋅ 阅读:(17) ⋅ 点赞:(0)
一、DB2 数据库核心配置
1. 启用数据库日志记录与CDC支持
-- Connect to the database as the DB2 administrative user.
CONNECT TO mydb USER db2inst1 USING password;

-- Enable archive logging (required by CDC log-based capture).
UPDATE DATABASE CONFIGURATION USING LOGARCHMETH1 DISK:/db2log/archive;
QUIESCE DATABASE IMMEDIATE FORCE CONNECTIONS;
-- A full backup is mandatory after switching to archive logging,
-- otherwise the database remains in BACKUP PENDING state.
BACKUP DATABASE mydb;
UNQUIESCE DATABASE;

-- Verify the log mode from within the SQL session.
-- (The original "GET DATABASE CONFIGURATION FOR mydb | grep ..." form
-- mixes a CLP command with a shell pipe and is not executable as SQL;
-- query the SYSIBMADM.DBCFG administrative view instead.)
SELECT NAME, VALUE
FROM SYSIBMADM.DBCFG
WHERE NAME = 'logarchmeth1';
-- Expected: logarchmeth1 = DISK:/db2log/archive

-- Create the capture schema and its bookkeeping objects.
CREATE SCHEMA cdc;
SET SCHEMA cdc;

-- Control table used to track capture-process progress.
-- PRIMARY KEY implies NOT NULL on id in DB2; stated explicitly for clarity.
CREATE TABLE cdc.control (
    id INTEGER NOT NULL PRIMARY KEY,
    last_commit_time TIMESTAMP NOT NULL
);
-- Explicit column list: resilient to future schema changes
-- (INSERT without a column list breaks silently when columns are added).
INSERT INTO cdc.control (id, last_commit_time)
VALUES (1, CURRENT_TIMESTAMP);
2. 为捕获表启用变更数据捕获
-- Enable CDC for the target table (example: products).
SET SCHEMA myschema;

-- Capture buffer table (adjust columns to the real source table).
-- It MUST be created BEFORE the trigger below: the original document
-- created it afterwards, so the CREATE TRIGGER statement would fail
-- with an undefined-object error.
CREATE TABLE cdc.products_cdc_buffer (
    operation CHAR(1) NOT NULL,  -- 'I' = insert, 'U' = update, 'D' = delete
    op_ts TIMESTAMP NOT NULL,    -- capture timestamp of the change
    id INT,
    name VARCHAR(100),
    description VARCHAR(255),
    weight DECIMAL(10,3)
);

-- Row-level trigger that copies every change into the buffer table.
-- NOTE(review): both the before- and after-image of an UPDATE are
-- written with operation 'U'; they cannot be told apart by this column
-- alone — confirm downstream consumers rely on insertion order.
CREATE TRIGGER products_cdc_trg 
AFTER INSERT OR UPDATE OR DELETE ON products
REFERENCING NEW AS n OLD AS o
FOR EACH ROW MODE DB2SQL
BEGIN ATOMIC
    IF INSERTING THEN
        INSERT INTO cdc.products_cdc_buffer (operation, op_ts, id, name, description, weight)
        VALUES ('I', CURRENT_TIMESTAMP, n.id, n.name, n.description, n.weight);
    ELSEIF UPDATING THEN
        -- before-image of the updated row
        INSERT INTO cdc.products_cdc_buffer (operation, op_ts, id, name, description, weight)
        VALUES ('U', CURRENT_TIMESTAMP, o.id, o.name, o.description, o.weight);
        -- after-image of the updated row
        INSERT INTO cdc.products_cdc_buffer (operation, op_ts, id, name, description, weight)
        VALUES ('U', CURRENT_TIMESTAMP, n.id, n.name, n.description, n.weight);
    ELSEIF DELETING THEN
        INSERT INTO cdc.products_cdc_buffer (operation, op_ts, id, name, description, weight)
        VALUES ('D', CURRENT_TIMESTAMP, o.id, o.name, o.description, o.weight);
    END IF;
END;
二、Flink 环境集成配置
1. 添加Maven依赖
<!-- Flink DB2 CDC SQL connector (shaded fat JAR).
     Scope "provided": the JAR is deployed to $FLINK_HOME/lib on the
     cluster rather than bundled into the job artifact. -->
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-sql-connector-db2-cdc</artifactId>
    <version>3.0.1</version>
    <scope>provided</scope>
</dependency>

<!-- IBM DB2 JDBC driver (jcc) -->
<dependency>
    <groupId>com.ibm.db2</groupId>
    <artifactId>jcc</artifactId>
    <version>11.5.0.0</version>
</dependency>
2. SQL Client部署
  1. 下载 flink-sql-connector-db2-cdc-3.0.1.jar(可从 Maven 中央仓库获取,版本需与 Flink 版本兼容);
  2. 将JAR包放入$FLINK_HOME/lib/目录后重启Flink集群。
三、Flink SQL 表定义与参数详解
1. 完整建表示例(含元数据列)
-- Enable checkpointing (optional; lets the CDC source persist and
-- recover its reading position).
SET 'execution.checkpointing.interval' = '5s';

-- Flink table backed by the DB2 CDC connector.
CREATE TABLE db2_products (
    id INT,
    name STRING,
    description STRING,
    weight DECIMAL(10, 3),
    -- Metadata columns: per-row change-event context exposed by the connector.
    db_name STRING METADATA FROM 'database_name' VIRTUAL,
    schema_name STRING METADATA FROM 'schema_name' VIRTUAL,
    op_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
    PRIMARY KEY(id) NOT ENFORCED
) WITH (
    'connector' = 'db2-cdc',
    'hostname' = '192.168.1.100',
    'port' = '50000',
    'username' = 'db2inst1',
    -- NOTE(review): plaintext credentials; use an external secrets
    -- mechanism in production.
    'password' = 'password',
    'database-name' = 'mydb',
    'schema-name' = 'myschema',
    'table-name' = 'products',
    -- Server time zone; affects TIMESTAMP conversion of change events.
    'server-time-zone' = 'Asia/Shanghai',
    -- 'initial' = snapshot existing rows first, then stream changes.
    'scan.startup.mode' = 'initial'
);
2. 核心参数详解
参数名 | 必选 | 默认值 | 类型 | 说明
connector | 是 | (无) | String | 固定为db2-cdc
hostname | 是 | (无) | String | DB2服务器IP或域名
username | 是 | (无) | String | 连接数据库的用户名
password | 是 | (无) | String | 连接数据库的密码
database-name | 是 | (无) | String | 数据库名称(如mydb)
schema-name | 是 | (无) | String | 模式名称(如myschema)
table-name | 是 | (无) | String | 表名(如products)
port | 否 | 50000 | Integer | 数据库端口号
scan.startup.mode | 否 | initial | String | 启动模式:initial(首次启动时执行快照)、latest-offset(仅读取最新变更)
server-time-zone | 否 | 系统时区 | String | 数据库服务器时区(如Asia/Shanghai),影响TIMESTAMP转换
四、环境验证与测试
1. 准备测试数据(DB2)
-- 创建测试表(若不存在)
CONNECT TO mydb USER db2inst1 USING password;
SET SCHEMA myschema;

CREATE TABLE products (
    id INT PRIMARY KEY,
    name VARCHAR(100),
    description VARCHAR(255),
    weight DECIMAL(10,3)
);

-- 插入测试数据
INSERT INTO products VALUES (1, '产品A', '测试产品A', 1.5);
INSERT INTO products VALUES (2, '产品B', '测试产品B', 2.3);
COMMIT;
2. Flink SQL 验证
-- Query the CDC table; the first read triggers the initial snapshot.
SELECT * FROM db2_products;

-- Apply a change on the DB2 side.
UPDATE myschema.products SET weight = 1.8 WHERE id = 1;
COMMIT;

-- Expected Flink output: the updated record, with op_ts set to the
-- change time.
3. DataStream API 验证
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.cdc.connectors.db2.Db2Source;

/**
 * Minimal DataStream-API smoke test for the DB2 CDC source: reads change
 * events from myschema.products as Debezium-style JSON strings and prints
 * them to stdout.
 */
public class Db2SourceExample {
    public static void main(String[] args) throws Exception {
        // Build the DB2 CDC source; each event is deserialized to JSON.
        SourceFunction<String> db2ChangeStream = Db2Source.<String>builder()
                .hostname("192.168.1.100")
                .port(50000)
                .database("mydb")
                .tableList("myschema.products")
                .username("db2inst1")
                .password("password")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        // Checkpoint every 3s so the source can persist its log position.
        StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        environment.enableCheckpointing(3000);

        // Parallelism 1 on the print sink keeps console output ordered.
        environment.addSource(db2ChangeStream)
                .print()
                .setParallelism(1);

        environment.execute("DB2 CDC Test");
    }
}
五、常见问题与解决方案
  1. 日志模式未启用

    ERROR: DB2 CDC requires archive logging to be enabled
    
    • 解决方案:执行UPDATE DATABASE CONFIGURATION启用归档日志,并重启数据库。
  2. 触发器权限不足

    ERROR: User does not have permission to create triggers
    
    • 解决方案:授予用户CREATE TRIGGER权限:
      GRANT CREATETAB, BINDADD, IMPLICIT_SCHEMA, CREATE_NOT_FENCED_ROUTINE TO db2inst1;
      
  3. 数据类型不支持(BOOLEAN)

    ERROR: BOOLEAN type is not supported in SQL Replication on DB2
    
    • 解决方案:将BOOLEAN列替换为SMALLINT(0/1)或CHAR(1)('Y'/'N')。
  4. 时间戳转换异常

    • 解决方案:显式设置server-time-zone参数:
      'server-time-zone' = 'Asia/Shanghai'
      
六、生产环境优化建议
  1. 性能调优

    • 调整debezium.poll.interval.ms(如500)控制轮询间隔,debezium.snapshot.fetch.size(如2048)优化快照读取。
  2. 高可用配置

    • 使用DB2 HADR(高可用性灾难恢复)集群,Flink作业连接主节点,确保日志复制正常。
  3. 监控与维护

    • 定期清理CDC缓冲区表:
      DELETE FROM cdc.products_cdc_buffer WHERE op_ts < CURRENT_TIMESTAMP - 1 DAY;
      

通过以上步骤,可完成Flink DB2 CDC的全流程配置与验证。生产环境中需特别注意DB2日志模式配置、触发器权限管理及BOOLEAN类型的兼容性问题,以确保数据一致性和系统稳定性。