系列文章目录
达梦数据库CDC方案改进
前言
之前分享了 springboot集成flink实现DM数据库同步到ES ,当时是一个面试上机题,时间有限,资源有限,我自己之前也没有用过国产达梦数据库,实现的比较粗糙。今天有空,就优化下分享。
一、优化思路
当时的思路:
- 创建的一张记录表
- 利用数据库的触发器,监听表数据变更存储到记录表
- Flink的source实现一直查询记录表,利用时间增量查询
- Flink的sink负责实现数据转到ES
优化思路:
- 取消使用触发器 + 增量查询记录表方式
- 使用Debezium方式,实现多表的数据监听
- 数据直接就可以实现转ES里
- 如果要上Flink,可以将数据转到Kafka或redis,利用kafka的消息机制,redis的发布订阅
二、使用步骤
基于上次代码的改动点。
1.引入库
<!-- Debezium Engine -->
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-embedded</artifactId>
<version>3.0.0.Final</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-jdbc</artifactId>
<version>3.0.0.CR1</version>
</dependency>
2.配置文件增加
# 达梦数据库debezium配置
debezium:
database:
hostname: "192.168.9.202"
port: "5236"
user: "SYSDBA"
password: "your_password"
dbname: "DAMENG"
serverId: "184054"
tableIncludeList: "DAMENG.table1,DAMENG.table2,DAMENG.table3" # 监听多个表
history:
kafka:
bootstrapServers: "localhost:9092"
topic: "dm_cdc_history"
offsetStorage: "dm-offset.dat"
3.核心代码
就是在上次Flink集成springboot的启动里增加DebeziumEngine的启动。
import cn.hutool.json.JSONUtil;
import com.zw.olapzw.entity.SourceDataEntity;
import com.zw.olapzw.sink.ResultSinkDataEntitySink;
import com.zw.olapzw.source.ResultSourceDataEntitySource;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import io.debezium.engine.spi.OffsetCommitPolicy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import java.nio.file.Paths;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* springBoot的Flink启动类
*
* @author zwmac
*/
@SpringBootApplication
public class OlapZwApplication {
public static void main(String[] args) {
SpringApplication.run(OlapZwApplication.class, args);
System.out.println("OlapZwApplication started");
}
@Bean
public CommandLineRunner commandLineRunner(ApplicationContext ctx) {
return args -> {
//获取flink的运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置并行度
env.setParallelism(1);
//开启checkpoint,每隔5秒钟做一次checkpoint
env.enableCheckpointing(5000L);
//指定checkpoint的一致性语义
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//设置任务关闭的时候保留最后一次checkpoint数据
checkpointConfig.enableUnalignedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION.deleteOnCancellation());
//重试策略设置
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10));
//源数据
SourceFunction source = new ResultSourceDataEntitySource();
//添加数据源到运行环境
DataStreamSource<SourceDataEntity> streamSource = env.addSource(source, "ResultSourceDataEntitySource");
//下游处理逻辑
streamSource.addSink(new ResultSinkDataEntitySink());
try {
env.execute("达梦数据库变更数据同步");
} catch (Exception e) {
System.out.println("达梦数据库变更数据同步,原因:" + e.getMessage());
throw new RuntimeException(e);
}
System.out.println("flink CDC started");
//达梦数据库Debezium监听
startDebeziumEngine();
};
}
private void startDebeziumEngine() {
Properties props = new Properties();
props.setProperty("name", "dm-cdc-connector");
props.setProperty("connector.class", "io.debezium.connector.jdbc.JdbcConnector");
props.setProperty("database.hostname", "127.0.0.1");
props.setProperty("database.port", "5236");
props.setProperty("database.user", "SYSDBA");
props.setProperty("database.password", "your_password");
props.setProperty("database.dbname", "DAMENG");
props.setProperty("database.server.id", "184054");
props.setProperty("table.include.list", "SYSDBA.your_table");
props.setProperty("database.history.kafka.bootstrap.servers", "localhost:9092");
props.setProperty("database.history.kafka.topic", "dm_cdc_history");
props.setProperty("offset.storage.file.filename", Paths.get("dm-offset.dat").toAbsolutePath().toString());
DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
.using(props)
.notifying(record -> {
String recordValue = record.value();
if (JSONUtil.isTypeJSON(recordValue)) {
processCDCData(recordValue);
}
})
.using(OffsetCommitPolicy.always())
.build();
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(engine);
}
private void processCDCData(String json) {
System.out.println("收到变更数据:" + json);
// 这里可以将数据存入 Kafka、Elasticsearch 或 MySQL
//或者使用redis的发布订阅,将数据推送到flink的source,source里就不再需要jdbc的方式去获取数据了,直接从redis中获取数据
//可以参考博主之前发的redis发布订阅的文章
}
}
总结
- 这个方案就比触发器的省数据库的资源
- 增加的资源就是kafka或redis
- 另外就是触发器方式有记录,当然不用触发器方式也可有记录
最近也在分享高软比较,才分享到基于构件的软件工程,这一系列何尝不是构件组合,反正吧关键还是要有思路,有架构思维。希望能帮到大家,有兴趣的可以关注我的个微:码技加油站。