达梦数据库Flink CDC方案改进

发布于:2025-03-24 ⋅ 阅读:(28) ⋅ 点赞:(0)

系列文章目录

达梦数据库CDC方案改进



前言

       之前分享了 springboot集成flink实现DM数据库同步到ES ,当时是一个面试上机题,时间有限,资源有限,我自己之前也没有用过国产达梦数据库,实现的比较粗糙。今天有空,就优化下分享。


一、优化思路

当时的思路:

  1. 创建的一张记录表
  2. 利用数据库的触发器,监听表数据变更存储到记录表
  3. Flink的source实现一直查询记录表,利用时间增量查询
  4. Flink的sink负责实现数据转到ES

优化思路:

  1. 取消使用触发器 + 增量查询记录表方式
  2. 使用Debezium方式,实现多表的数据监听
  3. 数据直接就可以实现转ES里
  4. 如果要上Flink,可以将数据转到Kafka或redis,利用kafka的消息机制,redis的发布订阅

二、使用步骤

       基于上次代码的改动点。

1.引入库

<!-- Debezium Engine -->
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-embedded</artifactId>
    <version>3.0.0.Final</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-jdbc</artifactId>
    <version>3.0.0.CR1</version>
</dependency>

2.配置文件增加

# 达梦数据库debezium配置
debezium:
  database:
    hostname: "192.168.9.202"
    port: "5236"
    user: "SYSDBA"
    password: "your_password"
    dbname: "DAMENG"
    serverId: "184054"
    tableIncludeList: "DAMENG.table1,DAMENG.table2,DAMENG.table3" # 监听多个表
    history:
      kafka:
        bootstrapServers: "localhost:9092"
        topic: "dm_cdc_history"
    offsetStorage: "dm-offset.dat"

3.核心代码

就是在上次Flink集成springboot的启动里增加DebeziumEngine的启动。


import cn.hutool.json.JSONUtil;
import com.zw.olapzw.entity.SourceDataEntity;
import com.zw.olapzw.sink.ResultSinkDataEntitySink;
import com.zw.olapzw.source.ResultSourceDataEntitySource;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import io.debezium.engine.spi.OffsetCommitPolicy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;

import java.nio.file.Paths;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * springBoot的Flink启动类
 *
 * @author zwmac
 */
@SpringBootApplication
public class OlapZwApplication {


    public static void main(String[] args) {
        SpringApplication.run(OlapZwApplication.class, args);
        System.out.println("OlapZwApplication started");

    }


    @Bean
    public CommandLineRunner commandLineRunner(ApplicationContext ctx) {
        return args -> {
            //获取flink的运行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            //设置并行度
            env.setParallelism(1);

            //开启checkpoint,每隔5秒钟做一次checkpoint
            env.enableCheckpointing(5000L);

            //指定checkpoint的一致性语义
            CheckpointConfig checkpointConfig = env.getCheckpointConfig();
            checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

            //设置任务关闭的时候保留最后一次checkpoint数据
            checkpointConfig.enableUnalignedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION.deleteOnCancellation());

            //重试策略设置
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10));

            //源数据
            SourceFunction source = new ResultSourceDataEntitySource();

            //添加数据源到运行环境
            DataStreamSource<SourceDataEntity> streamSource = env.addSource(source, "ResultSourceDataEntitySource");

            //下游处理逻辑
            streamSource.addSink(new ResultSinkDataEntitySink());

            try {
                env.execute("达梦数据库变更数据同步");
            } catch (Exception e) {
                System.out.println("达梦数据库变更数据同步,原因:" + e.getMessage());
                throw new RuntimeException(e);
            }

            System.out.println("flink CDC started");

            //达梦数据库Debezium监听
            startDebeziumEngine();
        };
    }

    private void startDebeziumEngine() {
        Properties props = new Properties();
        props.setProperty("name", "dm-cdc-connector");
        props.setProperty("connector.class", "io.debezium.connector.jdbc.JdbcConnector");
        props.setProperty("database.hostname", "127.0.0.1");
        props.setProperty("database.port", "5236");
        props.setProperty("database.user", "SYSDBA");
        props.setProperty("database.password", "your_password");
        props.setProperty("database.dbname", "DAMENG");
        props.setProperty("database.server.id", "184054");
        props.setProperty("table.include.list", "SYSDBA.your_table");
        props.setProperty("database.history.kafka.bootstrap.servers", "localhost:9092");
        props.setProperty("database.history.kafka.topic", "dm_cdc_history");
        props.setProperty("offset.storage.file.filename", Paths.get("dm-offset.dat").toAbsolutePath().toString());


        DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
                .using(props)
                .notifying(record -> {
                    String recordValue = record.value();
                    if (JSONUtil.isTypeJSON(recordValue)) {
                        processCDCData(recordValue);
                    }
                })
                .using(OffsetCommitPolicy.always())
                .build();

        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(engine);
    }

    private void processCDCData(String json) {
        System.out.println("收到变更数据:" + json);
        // 这里可以将数据存入 Kafka、Elasticsearch 或 MySQL
        //或者使用redis的发布订阅,将数据推送到flink的source,source里就不再需要jdbc的方式去获取数据了,直接从redis中获取数据
        //可以参考博主之前发的redis发布订阅的文章
    }

}

总结

  • 这个方案就比触发器的省数据库的资源
  • 增加的资源就是kafka或redis
  • 另外就是触发器方式有记录,当然不用触发器方式也可有记录
           最近也在分享高软比较,才分享到基于构件的软件工程,这一系列何尝不是构件组合,反正吧关键还是要有思路,有架构思维。希望能帮到大家,有兴趣的可以关注我的个微:码技加油站。