Flink MongoDB CDC 环境配置与验证

发布于:2025-07-05 ⋅ 阅读:(12) ⋅ 点赞:(0)
一、MongoDB 数据库核心配置
1. 环境准备与集群要求

MongoDB CDC 依赖 Change Streams 特性,需满足以下条件:

  • 版本要求:MongoDB ≥ 3.6
  • 集群模式:副本集(Replica Set)或分片集群(Sharded Cluster)
  • 存储引擎:WiredTiger(默认自3.2版本起)
  • 副本集协议:pv1(MongoDB 4.0+默认)

验证集群配置

# 连接MongoDB shell
mongo --host localhost:27017

# 检查版本
db.version()

# 检查存储引擎
db.getMongo().getDBs().forEach(db => print(db + ": " + db.adminCommand('getParameter', {storageEngine: 1}).storageEngine.name))

# 检查副本集状态(若为副本集)
rs.status()
2. 启用 Change Streams 与权限配置
-- 创建自定义角色(授予Change Stream和读取权限)
use admin;
db.createRole(
    {
        role: "flinkrole",
        privileges: [
            {
                resource: { db: "", collection: "" },
                actions: [
                    "splitVector",
                    "listDatabases",
                    "listCollections",
                    "collStats",
                    "find",
                    "changeStream"
                ]
            }
        ],
        roles: [
            { role: 'read', db: 'config' }  -- 分片集群需读取config.chunks
        ]
    }
);

-- 创建用户并授权
db.createUser(
  {
      user: 'flinkuser',
      pwd: 'flinkpw',
      roles: [
         { role: 'flinkrole', db: 'admin' }
      ]
  }
);

-- 启用数据库级PreAndPostImages(MongoDB 6.0+支持完整变更日志)
db.runCommand({
  setClusterParameter: {
    changeStreamOptions: {
      preAndPostImages: {
        expireAfterSeconds: 'off'  -- 禁用过期,或设置自定义时间
      }
    }
  }
});

-- 为集合启用PreAndPostImages
db.runCommand({
  collMod: "inventory.products", 
  changeStreamPreAndPostImages: {
    enabled: true 
  } 
});
二、Flink 环境集成配置
1. 添加Maven依赖
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-sql-connector-mongodb-cdc</artifactId>
    <version>3.0.1</version>
    <scope>provided</scope>
</dependency>
2. SQL Client部署
  1. 下载JAR包:flink-sql-connector-mongodb-cdc-3.0.1.jar
  2. 将JAR包放入$FLINK_HOME/lib/目录后重启Flink集群。
三、Flink SQL 表定义与参数详解
1. 完整建表示例(含嵌套结构与元数据)
-- 配置checkpoint(可选)
SET 'execution.checkpointing.interval' = '5s';

-- 创建MongoDB CDC表(支持嵌套文档与数组)
CREATE TABLE mongodb_products (
    _id STRING,                            -- 必须声明,作为主键
    name STRING,
    weight DECIMAL(10, 3),
    tags ARRAY<STRING>,                    -- 数组类型
    price ROW<amount DECIMAL(10, 2), currency STRING>,  -- 嵌入式文档
    suppliers ARRAY<ROW<name STRING, address STRING>>,  -- 嵌入式文档数组
    -- 元数据列
    db_name STRING METADATA FROM 'database_name' VIRTUAL,
    collection_name STRING METADATA FROM 'collection_name' VIRTUAL,
    op_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
    PRIMARY KEY(_id) NOT ENFORCED
) WITH (
    'connector' = 'mongodb-cdc',
    'hosts' = 'localhost:27017,localhost:27018,localhost:27019',  -- 副本集地址
    'username' = 'flinkuser',
    'password' = 'flinkpw',
    'database' = 'inventory',
    'collection' = 'products',
    'scan.incremental.snapshot.enabled' = 'true',
    'scan.full-changelog' = 'true'  -- MongoDB 6.0+启用完整变更日志
);
2. 核心参数详解
参数名 必选 默认值 类型 说明
connector String 固定为mongodb-cdc
hosts String MongoDB服务器地址(副本集用逗号分隔,如localhost:27017,localhost:27018
database String 数据库名(支持正则匹配多个库,如^inventory.*
collection String 集合名(支持正则匹配多个集合,如^products$
scan.startup.mode initial String 启动模式:initial(快照+变更流)、latest-offset(仅最新变更)、timestamp(指定时间戳)
scan.incremental.snapshot.enabled false Boolean 启用增量快照(MongoDB 4.0+,并行读取)
scan.full-changelog false Boolean 启用完整变更日志(MongoDB 6.0+,需配置PreAndPostImages)
heartbeat.interval.ms 0 Integer 心跳间隔(防止resumeToken过期,慢变更场景必设)
四、环境验证与测试
1. 准备测试数据(MongoDB)
// 连接MongoDB并插入测试数据
mongo --host localhost:27017/inventory

db.products.insertMany([
    {
        _id: "P001",
        name: "笔记本电脑",
        weight: 2.5,
        tags: ["电子", "电脑"],
        price: { amount: 5999.00, currency: "CNY" },
        suppliers: [
            { name: "联想", address: "北京" },
            { name: "戴尔", address: "上海" }
        ]
    },
    {
        _id: "P002",
        name: "智能手机",
        weight: 0.2,
        tags: ["电子", "手机"],
        price: { amount: 3999.00, currency: "CNY" },
        suppliers: [
            { name: "华为", address: "深圳" }
        ]
    }
]);
2. Flink SQL 验证
-- 查询MongoDB CDC表(首次触发快照读取)
SELECT * FROM mongodb_products;

-- 在MongoDB中更新数据
db.products.updateOne(
    { _id: "P001" },
    { $set: { "price.amount": 6499.00 } }
);

-- 观察Flink输出:应显示更新后的完整文档,op_ts为变更时间
3. DataStream API 验证(增量模式)
import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSource;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MongoDBIncrementalSourceExample {
    public static void main(String[] args) throws Exception {
        // 配置MongoDB Source(增量快照模式)
        MongoDBSource<String> source = MongoDBSource.<String>builder()
                .hosts("localhost:27017")
                .databaseList("inventory")
                .collectionList("inventory.products")
                .username("flinkuser")
                .password("flinkpw")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .scanFullChangelog(true)  // 启用完整变更日志(MongoDB 6.0+)
                .startupOptions(StartupOptions.initial())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);
        env.fromSource(
                source,
                null,
                "MongoDB CDC Source")
            .setParallelism(4)
            .print();
        
        env.execute("MongoDB CDC Test");
    }
}
五、常见问题与解决方案
  1. Change Streams权限不足

    ERROR: not authorized to execute command { changeStream: "products", ... }
    
    • 解决方案:确认用户已授予changeStream权限,重新执行角色创建语句。
  2. 集群模式不支持

    ERROR: Change Streams require a replica set or sharded cluster
    
    • 解决方案:将单机模式升级为副本集,或连接现有副本集/分片集群。
  3. 增量快照失败(无分片键)

    ERROR: Incremental snapshot requires sharded cluster or _id index
    
    • 解决方案:确保集群为分片模式,或_id字段存在索引(MongoDB默认创建)。
  4. resumeToken过期

    • 解决方案:设置合理的heartbeat.interval.ms(如30000),或手动指定scan.startup.timestamp-millis
六、生产环境优化建议
  1. Change Streams优化

    • 配置poll.await.time.ms(如500)减少空轮询,poll.max.batch.size(如2048)提高批量处理效率。
  2. 高可用配置

    • 使用副本集连接字符串(如mongodb://host1:27017,host2:27018/inventory?replicaSet=rs0),Flink会自动故障转移。
  3. 大表快照调优

    • 调整scan.incremental.snapshot.chunk.size.mb(如128)平衡内存占用与并行度,scan.incremental.snapshot.chunk.samples(如50)优化分片采样。

通过以上步骤,可完成Flink MongoDB CDC的全流程配置与验证。生产环境中需特别注意集群模式兼容性、Change Streams权限配置及完整变更日志的启用条件,以确保数据一致性和系统稳定性。


网站公告

今日签到

点亮在社区的每一天
去签到