一、MongoDB 数据库核心配置
1. 环境准备与集群要求
MongoDB CDC 依赖 Change Streams 特性,需满足以下条件:
- 版本要求:MongoDB ≥ 3.6
- 集群模式:副本集(Replica Set)或分片集群(Sharded Cluster)
- 存储引擎:WiredTiger(默认自3.2版本起)
- 副本集协议:pv1(MongoDB 4.0+默认)
验证集群配置:
# 连接MongoDB shell
mongo --host localhost:27017
# 检查版本
db.version()
# 检查存储引擎
db.getMongo().getDBs().forEach(db => print(db + ": " + db.adminCommand('getParameter', {storageEngine: 1}).storageEngine.name))
# 检查副本集状态(若为副本集)
rs.status()
2. 启用 Change Streams 与权限配置
-- 创建自定义角色(授予Change Stream和读取权限)
use admin;
db.createRole(
{
role: "flinkrole",
privileges: [
{
resource: { db: "", collection: "" },
actions: [
"splitVector",
"listDatabases",
"listCollections",
"collStats",
"find",
"changeStream"
]
}
],
roles: [
{ role: 'read', db: 'config' } -- 分片集群需读取config.chunks
]
}
);
-- 创建用户并授权
db.createUser(
{
user: 'flinkuser',
pwd: 'flinkpw',
roles: [
{ role: 'flinkrole', db: 'admin' }
]
}
);
-- 启用数据库级PreAndPostImages(MongoDB 6.0+支持完整变更日志)
db.runCommand({
setClusterParameter: {
changeStreamOptions: {
preAndPostImages: {
expireAfterSeconds: 'off' -- 禁用过期,或设置自定义时间
}
}
}
});
-- 为集合启用PreAndPostImages
db.runCommand({
collMod: "inventory.products",
changeStreamPreAndPostImages: {
enabled: true
}
});
二、Flink 环境集成配置
1. 添加Maven依赖
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-mongodb-cdc</artifactId>
<version>3.0.1</version>
<scope>provided</scope>
</dependency>
2. SQL Client部署
- 下载JAR包:flink-sql-connector-mongodb-cdc-3.0.1.jar
- 将JAR包放入
$FLINK_HOME/lib/
目录后重启Flink集群。
三、Flink SQL 表定义与参数详解
1. 完整建表示例(含嵌套结构与元数据)
-- 配置checkpoint(可选)
SET 'execution.checkpointing.interval' = '5s';
-- 创建MongoDB CDC表(支持嵌套文档与数组)
CREATE TABLE mongodb_products (
_id STRING, -- 必须声明,作为主键
name STRING,
weight DECIMAL(10, 3),
tags ARRAY<STRING>, -- 数组类型
price ROW<amount DECIMAL(10, 2), currency STRING>, -- 嵌入式文档
suppliers ARRAY<ROW<name STRING, address STRING>>, -- 嵌入式文档数组
-- 元数据列
db_name STRING METADATA FROM 'database_name' VIRTUAL,
collection_name STRING METADATA FROM 'collection_name' VIRTUAL,
op_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb-cdc',
'hosts' = 'localhost:27017,localhost:27018,localhost:27019', -- 副本集地址
'username' = 'flinkuser',
'password' = 'flinkpw',
'database' = 'inventory',
'collection' = 'products',
'scan.incremental.snapshot.enabled' = 'true',
'scan.full-changelog' = 'true' -- MongoDB 6.0+启用完整变更日志
);
2. 核心参数详解
参数名 | 必选 | 默认值 | 类型 | 说明 |
---|---|---|---|---|
connector |
是 | 无 | String | 固定为mongodb-cdc |
hosts |
是 | 无 | String | MongoDB服务器地址(副本集用逗号分隔,如localhost:27017,localhost:27018 ) |
database |
否 | 无 | String | 数据库名(支持正则匹配多个库,如^inventory.* ) |
collection |
否 | 无 | String | 集合名(支持正则匹配多个集合,如^products$ ) |
scan.startup.mode |
否 | initial |
String | 启动模式:initial (快照+变更流)、latest-offset (仅最新变更)、timestamp (指定时间戳) |
scan.incremental.snapshot.enabled |
否 | false | Boolean | 启用增量快照(MongoDB 4.0+,并行读取) |
scan.full-changelog |
否 | false | Boolean | 启用完整变更日志(MongoDB 6.0+,需配置PreAndPostImages) |
heartbeat.interval.ms |
否 | 0 | Integer | 心跳间隔(防止resumeToken过期,慢变更场景必设) |
四、环境验证与测试
1. 准备测试数据(MongoDB)
// 连接MongoDB并插入测试数据
mongo --host localhost:27017/inventory
db.products.insertMany([
{
_id: "P001",
name: "笔记本电脑",
weight: 2.5,
tags: ["电子", "电脑"],
price: { amount: 5999.00, currency: "CNY" },
suppliers: [
{ name: "联想", address: "北京" },
{ name: "戴尔", address: "上海" }
]
},
{
_id: "P002",
name: "智能手机",
weight: 0.2,
tags: ["电子", "手机"],
price: { amount: 3999.00, currency: "CNY" },
suppliers: [
{ name: "华为", address: "深圳" }
]
}
]);
2. Flink SQL 验证
-- 查询MongoDB CDC表(首次触发快照读取)
SELECT * FROM mongodb_products;
-- 在MongoDB中更新数据
db.products.updateOne(
{ _id: "P001" },
{ $set: { "price.amount": 6499.00 } }
);
-- 观察Flink输出:应显示更新后的完整文档,op_ts为变更时间
3. DataStream API 验证(增量模式)
import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSource;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class MongoDBIncrementalSourceExample {
public static void main(String[] args) throws Exception {
// 配置MongoDB Source(增量快照模式)
MongoDBSource<String> source = MongoDBSource.<String>builder()
.hosts("localhost:27017")
.databaseList("inventory")
.collectionList("inventory.products")
.username("flinkuser")
.password("flinkpw")
.deserializer(new JsonDebeziumDeserializationSchema())
.scanFullChangelog(true) // 启用完整变更日志(MongoDB 6.0+)
.startupOptions(StartupOptions.initial())
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);
env.fromSource(
source,
null,
"MongoDB CDC Source")
.setParallelism(4)
.print();
env.execute("MongoDB CDC Test");
}
}
五、常见问题与解决方案
Change Streams权限不足
ERROR: not authorized to execute command { changeStream: "products", ... }
- 解决方案:确认用户已授予
changeStream
权限,重新执行角色创建语句。
- 解决方案:确认用户已授予
集群模式不支持
ERROR: Change Streams require a replica set or sharded cluster
- 解决方案:将单机模式升级为副本集,或连接现有副本集/分片集群。
增量快照失败(无分片键)
ERROR: Incremental snapshot requires sharded cluster or _id index
- 解决方案:确保集群为分片模式,或
_id
字段存在索引(MongoDB默认创建)。
- 解决方案:确保集群为分片模式,或
resumeToken过期
- 解决方案:设置合理的
heartbeat.interval.ms
(如30000
),或手动指定scan.startup.timestamp-millis
。
- 解决方案:设置合理的
六、生产环境优化建议
Change Streams优化
- 配置
poll.await.time.ms
(如500
)减少空轮询,poll.max.batch.size
(如2048
)提高批量处理效率。
- 配置
高可用配置
- 使用副本集连接字符串(如
mongodb://host1:27017,host2:27018/inventory?replicaSet=rs0
),Flink会自动故障转移。
- 使用副本集连接字符串(如
大表快照调优
- 调整
scan.incremental.snapshot.chunk.size.mb
(如128
)平衡内存占用与并行度,scan.incremental.snapshot.chunk.samples
(如50
)优化分片采样。
- 调整
通过以上步骤,可完成Flink MongoDB CDC的全流程配置与验证。生产环境中需特别注意集群模式兼容性、Change Streams权限配置及完整变更日志的启用条件,以确保数据一致性和系统稳定性。