I. DB2 Database Core Configuration
1. Enable Database Logging and CDC Support
-- Connect to the database as the DB2 administrator
CONNECT TO mydb USER db2inst1 USING password;
-- Enable archive logging (required for CDC)
UPDATE DATABASE CONFIGURATION USING LOGARCHMETH1 DISK:/db2log/archive;
-- Switching the log method puts the database into backup-pending state,
-- so take a full backup before allowing connections again
QUIESCE DATABASE IMMEDIATE FORCE CONNECTIONS;
BACKUP DATABASE mydb;
UNQUIESCE DATABASE;
-- Verify the log mode (run from the OS shell, since grep is a shell tool, not a CLP command)
db2 GET DATABASE CONFIGURATION FOR mydb | grep LOGARCHMETH1
-- Expected output similar to: First log archive method (LOGARCHMETH1) = DISK:/db2log/archive
-- Create the capture schema and control table
CREATE SCHEMA cdc;
SET SCHEMA cdc;
-- Control table (tracks the capture process's progress)
CREATE TABLE cdc.control (
id INTEGER PRIMARY KEY,
last_commit_time TIMESTAMP
);
INSERT INTO cdc.control VALUES (1, CURRENT_TIMESTAMP);
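The control table is plain bookkeeping: whatever process drains the capture buffers can record how far it has read. A minimal sketch of that usage, assuming the single-row convention (id = 1) established by the INSERT above:

-- Advance the capture watermark after a successful poll cycle (illustrative)
UPDATE cdc.control SET last_commit_time = CURRENT_TIMESTAMP WHERE id = 1;
-- Inspect how far the capture process has read
SELECT last_commit_time FROM cdc.control WHERE id = 1;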
2. Enable Change Data Capture on Captured Tables
-- Enable CDC for the target table (example: the products table)
SET SCHEMA myschema;
-- Create the capture buffer table first, since the trigger below references it
-- (adjust the columns to match the actual table structure)
CREATE TABLE cdc.products_cdc_buffer (
    operation CHAR(1),
    op_ts TIMESTAMP,
    id INT,
    name VARCHAR(100),
    description VARCHAR(255),
    weight DECIMAL(10,3)
);
-- Capture trigger. Note: a multi-event trigger (INSERT OR UPDATE OR DELETE with the
-- INSERTING/UPDATING/DELETING predicates) requires DB2 LUW 10.1+ and a compiled
-- compound body (BEGIN ... END, not BEGIN ATOMIC). When running it from the CLP,
-- use an alternate statement terminator (e.g. db2 -td@) because the body contains semicolons.
CREATE TRIGGER products_cdc_trg
AFTER INSERT OR UPDATE OR DELETE ON products
REFERENCING NEW AS n OLD AS o
FOR EACH ROW
BEGIN
    IF INSERTING THEN
        INSERT INTO cdc.products_cdc_buffer (operation, op_ts, id, name, description, weight)
        VALUES ('I', CURRENT_TIMESTAMP, n.id, n.name, n.description, n.weight);
    ELSEIF UPDATING THEN
        -- record both the before image and the after image of the updated row
        INSERT INTO cdc.products_cdc_buffer (operation, op_ts, id, name, description, weight)
        VALUES ('U', CURRENT_TIMESTAMP, o.id, o.name, o.description, o.weight);
        INSERT INTO cdc.products_cdc_buffer (operation, op_ts, id, name, description, weight)
        VALUES ('U', CURRENT_TIMESTAMP, n.id, n.name, n.description, n.weight);
    ELSEIF DELETING THEN
        INSERT INTO cdc.products_cdc_buffer (operation, op_ts, id, name, description, weight)
        VALUES ('D', CURRENT_TIMESTAMP, o.id, o.name, o.description, o.weight);
    END IF;
END;
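A quick sanity check of the trigger, assuming myschema.products already exists (it is created in section IV):

-- Fire the trigger once for INSERT and once for DELETE, then inspect the buffer
INSERT INTO myschema.products VALUES (99, 'trigger-test', 'temporary row', 0.100);
DELETE FROM myschema.products WHERE id = 99;
COMMIT;
SELECT operation, op_ts, id, name FROM cdc.products_cdc_buffer ORDER BY op_ts;
-- Expect one 'I' row and one 'D' row for id 99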
II. Flink Environment Integration
1. Add Maven Dependencies
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-db2-cdc</artifactId>
<version>3.0.1</version>
<scope>provided</scope>
</dependency>
<!-- DB2 JDBC driver -->
<dependency>
<groupId>com.ibm.db2</groupId>
<artifactId>jcc</artifactId>
<version>11.5.0.0</version>
</dependency>
2. SQL Client Deployment
- Download the flink-sql-connector-db2-cdc JAR (matching the version above, e.g. 3.0.1).
- Place the JAR in $FLINK_HOME/lib/ and restart the Flink cluster.
- For DataStream jobs (section IV.3), the thin flink-connector-db2-cdc artifact is typically declared as a Maven dependency instead of the SQL uber JAR.
III. Flink SQL Table Definition and Options
1. Complete Table Example (with Metadata Columns)
-- Configure checkpointing (optional)
SET 'execution.checkpointing.interval' = '5s';
-- Create the DB2 CDC source table
CREATE TABLE db2_products (
id INT,
name STRING,
description STRING,
weight DECIMAL(10, 3),
-- Metadata columns: provenance of each change
db_name STRING METADATA FROM 'database_name' VIRTUAL,
schema_name STRING METADATA FROM 'schema_name' VIRTUAL,
op_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'db2-cdc',
'hostname' = '192.168.1.100',
'port' = '50000',
'username' = 'db2inst1',
'password' = 'password',
'database-name' = 'mydb',
'schema-name' = 'myschema',
'table-name' = 'products',
'server-time-zone' = 'Asia/Shanghai',
'scan.startup.mode' = 'initial'
);
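Once the source table is defined, a quick way to watch its changelog is to pipe it into Flink's built-in print sink. A minimal sketch (the sink table name products_print is illustrative):

-- Print every change record to the TaskManager's stdout
CREATE TABLE products_print (
    id INT,
    name STRING,
    description STRING,
    weight DECIMAL(10, 3)
) WITH (
    'connector' = 'print'
);

INSERT INTO products_print SELECT id, name, description, weight FROM db2_products;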
2. Key Connector Options
Option | Required | Default | Type | Description |
---|---|---|---|---|
connector | Yes | (none) | String | Must be set to db2-cdc |
hostname | Yes | (none) | String | IP address or hostname of the DB2 server |
username | Yes | (none) | String | Username for the database connection |
password | Yes | (none) | String | Password for the database connection |
database-name | Yes | (none) | String | Database name (e.g. mydb) |
schema-name | Yes | (none) | String | Schema name (e.g. myschema) |
table-name | Yes | (none) | String | Table name (e.g. products) |
port | No | 50000 | Integer | Database port |
scan.startup.mode | No | initial | String | Startup mode: initial (take a snapshot on first start, then read changes) or latest-offset (read only the latest changes) |
server-time-zone | No | system time zone | String | Session time zone of the database server (e.g. Asia/Shanghai); affects TIMESTAMP conversion |
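In practice, initial is used for the first bootstrap, while latest-offset suits jobs that only need new changes. A sketch of the latter derived from the table above (db2_products_latest is an illustrative name; with no explicit LIKE options, Flink defaults to INCLUDING ALL OVERWRITING OPTIONS, so the single option below overrides the inherited one):

-- Same schema and connection settings as db2_products, but skip the snapshot phase
CREATE TABLE db2_products_latest WITH (
    'scan.startup.mode' = 'latest-offset'
) LIKE db2_products;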
IV. Environment Verification and Testing
1. Prepare Test Data (DB2)
-- Create the test table (if it does not already exist)
CONNECT TO mydb USER db2inst1 USING password;
SET SCHEMA myschema;
CREATE TABLE products (
id INT PRIMARY KEY,
name VARCHAR(100),
description VARCHAR(255),
weight DECIMAL(10,3)
);
-- Insert test data
INSERT INTO products VALUES (1, 'Product A', 'Test product A', 1.5);
INSERT INTO products VALUES (2, 'Product B', 'Test product B', 2.3);
COMMIT;
2. Flink SQL Verification
-- Query the DB2 CDC table (the first read triggers the snapshot)
SELECT * FROM db2_products;
-- Update a row in DB2
UPDATE myschema.products SET weight = 1.8 WHERE id = 1;
COMMIT;
-- Watch the Flink output: the updated record should appear, with op_ts set to the change time
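By default the SQL Client materializes updates in place. To see the raw changelog, where the update arrives as a retract/upsert pair, switch the result mode first:

SET 'sql-client.execution.result-mode' = 'changelog';
SELECT * FROM db2_products;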
3. DataStream API Verification
import com.ververica.cdc.connectors.db2.Db2Source;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class Db2SourceExample {
    public static void main(String[] args) throws Exception {
        // Configure the DB2 source (package names match the com.ververica 3.0.x
        // artifacts declared above; from Flink CDC 3.1 on, the coordinates move
        // to org.apache.flink.cdc.*)
        SourceFunction<String> sourceFunction = Db2Source.<String>builder()
                .hostname("192.168.1.100")
                .port(50000)
                .database("mydb")
                .tableList("myschema.products") // schema-qualified table name
                .username("db2inst1")
                .password("password")
                .deserializer(new JsonDebeziumDeserializationSchema()) // emit changes as JSON strings
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(3000); // checkpoint every 3 s so offsets are committed
        env.addSource(sourceFunction)
           .print()
           .setParallelism(1); // a single task keeps the change order readable
        env.execute("DB2 CDC Test");
    }
}
V. Common Problems and Solutions
Archive logging not enabled
ERROR: DB2 CDC requires archive logging to be enabled
- Solution: enable archive logging with UPDATE DATABASE CONFIGURATION USING LOGARCHMETH1 and take the full backup described in section I before reconnecting.
Insufficient privileges to create triggers
ERROR: User does not have permission to create triggers
- Solution: grant the required database authorities, e.g. GRANT CREATETAB, BINDADD, IMPLICIT_SCHEMA, CREATE_NOT_FENCED_ROUTINE ON DATABASE TO USER db2inst1; creating a trigger additionally requires ALTER (or CONTROL) privilege on the subject table.
Unsupported data type (BOOLEAN)
ERROR: BOOLEAN type is not supported in SQL Replication on DB2
- Solution: replace BOOLEAN columns with SMALLINT (0/1) or CHAR(1) ('Y'/'N'), as sketched below.
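A hypothetical example of the SMALLINT convention, with a check constraint so the column stays boolean-like (table and column names are illustrative):

CREATE TABLE myschema.feature_flags (
    id INT NOT NULL PRIMARY KEY,
    -- 0 = false, 1 = true; the check constraint rejects any other value
    is_active SMALLINT NOT NULL DEFAULT 0 CHECK (is_active IN (0, 1))
);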
Timestamp conversion errors
- Solution: set the server-time-zone option explicitly, e.g. 'server-time-zone' = 'Asia/Shanghai'; the register check below helps confirm the right value.
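To confirm the server's UTC offset before setting the option, DB2's CURRENT TIMEZONE special register returns the offset as a packed hhmmss number:

SELECT CURRENT TIMEZONE FROM SYSIBM.SYSDUMMY1;
-- e.g. 80000 means UTC+8, matching 'server-time-zone' = 'Asia/Shanghai'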
VI. Production Tuning Recommendations
Performance tuning
- Tune debezium.poll.interval.ms (e.g. 500) to control the change-polling interval, and debezium.snapshot.fetch.size (e.g. 2048) to control how many rows each snapshot fetch reads; see the sketch after this list.
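A sketch of passing these through in the WITH clause: the Flink CDC connectors forward options prefixed with debezium. to the embedded Debezium engine, and db2_products_tuned is an illustrative name (with no LIKE options, Flink defaults to INCLUDING ALL OVERWRITING OPTIONS):

CREATE TABLE db2_products_tuned WITH (
    'debezium.poll.interval.ms' = '500',
    'debezium.snapshot.fetch.size' = '2048'
) LIKE db2_products;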
High availability
- Use a DB2 HADR (High Availability Disaster Recovery) cluster; point the Flink job at the primary node and make sure log replication is healthy.
Monitoring and maintenance
- Periodically purge the CDC buffer tables:
DELETE FROM cdc.products_cdc_buffer WHERE op_ts < CURRENT_TIMESTAMP - 1 DAY;
With the steps above, the full Flink DB2 CDC configuration and verification workflow is complete. In production, pay particular attention to the DB2 log-mode configuration, trigger privilege management, and BOOLEAN type compatibility to ensure data consistency and system stability.