一、SQL Server 数据库核心配置
1. 启用 CDC 功能(Change Data Capture)
SQL Server CDC 依赖数据库级别的 CDC 功能及表级别的捕获配置,需按以下步骤启用:
启用数据库 CDC
-- 以管理员身份连接数据库
USE master;
GO
-- 检查数据库是否已启用CDC
IF NOT EXISTS (SELECT 1 FROM sys.databases WHERE name = 'MyDB' AND is_cdc_enabled = 1)
BEGIN
EXEC sys.sp_cdc_enable_db;
PRINT 'CDC已启用';
END
ELSE
PRINT 'CDC已启用';
GO
启用表级 CDC(以dbo.Orders表为例)
USE MyDB;
GO
-- 确保SQL Agent服务已启动(CDC依赖Agent作业)
EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo', -- 表所属模式
@source_name = N'Orders', -- 表名
@role_name = N'cdc_reader', -- 授权角色(可设为NULL使用默认权限)
@filegroup_name = N'MyDB_CT', -- 存储变更表的文件组(需提前创建)
@supports_net_changes = 0; -- 是否支持净变更(0为不支持)
GO
-- 验证CDC配置
EXEC sys.sp_cdc_help_change_data_capture;
GO
创建文件组(若不存在)
USE MyDB;
GO
IF NOT EXISTS (SELECT 1 FROM sys.filegroups WHERE name = N'MyDB_CT')
BEGIN
ALTER DATABASE MyDB ADD FILEGROUP MyDB_CT;
ALTER DATABASE MyDB ADD FILE (NAME = N'MyDB_CT', FILENAME = N'C:\Data\MyDB_CT.ndf') TO FILEGROUP MyDB_CT;
END
GO
2. 创建专用用户并授权
-- 创建用户
CREATE LOGIN flinkuser WITH PASSWORD = 'Flink@123';
CREATE USER flinkuser FOR LOGIN flinkuser;
-- 授予数据库访问权限
ALTER ROLE db_owner ADD MEMBER flinkuser; -- 生产环境建议细化权限
GRANT SELECT ON ALL TABLES IN SCHEMA dbo TO flinkuser;
-- 授予CDC相关权限
GRANT VIEW SERVER STATE TO flinkuser;
GRANT SELECT ON sys.change_tables TO flinkuser;
GO
二、Flink 环境集成配置
1. 添加Maven依赖
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-sqlserver-cdc</artifactId>
<version>3.0.1</version>
<scope>provided</scope>
</dependency>
2. SQL Client部署
- 下载JAR包:flink-sql-connector-sqlserver-cdc-3.0.1.jar
- 将JAR包放入
$FLINK_HOME/lib/
目录后重启Flink集群。
三、Flink SQL 表定义与参数详解
1. 完整建表示例(含元数据列)
-- 配置checkpoint(可选)
SET 'execution.checkpointing.interval' = '5s';
-- 创建SQL Server CDC表
CREATE TABLE sqlserver_orders (
id INT,
order_date DATE,
purchaser INT,
quantity INT,
product_id INT,
-- 元数据列:捕获变更信息
db_name STRING METADATA FROM 'database_name' VIRTUAL,
schema_name STRING METADATA FROM 'schema_name' VIRTUAL,
table_name STRING METADATA FROM 'table_name' VIRTUAL,
op_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'sqlserver-cdc',
'hostname' = '192.168.1.100',
'port' = '1433',
'username' = 'flinkuser',
'password' = 'Flink@123',
'database-name' = 'MyDB',
'table-name' = 'dbo.orders',
'server-time-zone' = 'Asia/Shanghai',
'scan.incremental.snapshot.enabled' = 'true'
);
2. 核心参数详解
参数名 | 必选 | 默认值 | 类型 | 说明 |
---|---|---|---|---|
connector |
是 | 无 | String | 固定为sqlserver-cdc |
hostname |
是 | 无 | String | SQL Server服务器IP或域名 |
username |
是 | 无 | String | 连接数据库的用户名(需具备CDC读取权限) |
password |
是 | 无 | String | 连接数据库的密码 |
database-name |
是 | 无 | String | 数据库名称(如MyDB ) |
table-name |
是 | 无 | String | 表名(格式:schema.table ,如dbo.orders ) |
port |
否 | 1433 | Integer | 数据库端口号 |
server-time-zone |
否 | UTC | String | 数据库时区(如Asia/Shanghai ),影响TIMESTAMP转换 |
scan.incremental.snapshot.enabled |
否 | true | Boolean | 启用增量快照(并行读取,需主键),默认开启 |
debezium.snapshot.mode |
否 | initial |
String | 快照模式:initial (结构+数据)、initial-only (仅快照)、latest-offset (仅结构) |
四、环境验证与测试
1. 准备测试数据
-- 创建测试表(已启用CDC)
USE MyDB;
GO
CREATE TABLE dbo.orders (
id INT PRIMARY KEY,
order_date DATE,
purchaser INT,
quantity INT,
product_id INT,
update_time DATETIME
);
-- 插入测试数据
INSERT INTO dbo.orders VALUES
(1, '2023-01-01', 101, 5, 1001, GETDATE()),
(2, '2023-01-02', 102, 3, 1002, GETDATE());
GO
2. Flink SQL 验证
-- 查询CDC表(首次触发快照读取)
SELECT * FROM sqlserver_orders;
-- 在SQL Server中更新数据
UPDATE dbo.orders SET quantity = 10 WHERE id = 1;
GO
-- 观察Flink输出:应显示变更记录,op_ts为变更时间
3. DataStream API 验证(增量模式)
import org.apache.flink.cdc.connectors.sqlserver.source.SqlServerSourceBuilder;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class SqlServerCdcExample {
public static void main(String[] args) throws Exception {
// 配置SQL Server Source(增量快照模式)
SqlServerSourceBuilder.SqlServerIncrementalSource<String> sourceBuilder =
SqlServerSourceBuilder.sqlserverIncrementalSource()
.hostname("192.168.1.100")
.port(1433)
.databaseList("MyDB")
.tableList("dbo.orders")
.username("flinkuser")
.password("Flink@123")
.deserializer(new JsonDebeziumDeserializationSchema())
.startupOptions(StartupOptions.initial())
.splitSize(1000) // 快照分片大小
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);
env.fromSource(
sourceBuilder,
WatermarkStrategy.noWatermarks(),
"SQL Server CDC Source")
.setParallelism(4) // 设置4并行度
.print();
env.execute("SQL Server CDC Test");
}
}
五、常见问题与解决方案
SQL Agent未运行
ERROR: CDC作业无法启动,SQL Agent服务未运行
- 解决方案:启动SQL Server Agent服务(可通过SQL Server配置管理器或命令行启动)。
权限不足
ERROR: 用户无权访问CDC表
- 解决方案:确认用户属于
db_owner
角色,或手动授予SELECT
权限至sys.change_tables
。
- 解决方案:确认用户属于
增量快照失败(无主键表)
ERROR: 表缺少主键,无法进行增量快照
- 解决方案:为表添加主键,或手动指定分片键:
'scan.incremental.snapshot.chunk.key-column' = 'id'
- 解决方案:为表添加主键,或手动指定分片键:
时区转换异常
- 解决方案:显式设置
server-time-zone
参数:'server-time-zone' = 'Asia/Shanghai'
- 解决方案:显式设置
六、生产环境优化建议
CDC清理策略
- 配置CDC清理作业(定期删除旧变更数据):
USE MyDB; GO EXEC sys.sp_cdc_cleanup_change_data; -- 清理旧变更记录
- 配置CDC清理作业(定期删除旧变更数据):
作业高可用
- 使用SQL Server Always On Availability Groups时,Flink作业需连接主副本,并确保CDC配置在主库。
性能调优
- 调整
scan.incremental.snapshot.chunk.size
(如设为10000)以平衡并行度和内存占用; - 对于大表,启用
debezium.snapshot.fetch.size
(如设为2048)优化快照读取性能。
- 调整
通过以上步骤,可完成Flink SQL Server CDC的全流程配置与验证。生产环境中需特别注意SQL Agent的运行状态、CDC数据清理策略及增量快照的并行参数调优,以确保数据一致性和系统稳定性。