Integrating Spring Boot with Flink to sync a DM (Dameng) database to ES

Published: 2025-03-16

Preface

       What I'm sharing today is really an interview machine-test exercise: watch a DM (Dameng) database for data changes and sync them to Elasticsearch, implemented with Flink. It's a standard recipe, nothing much to it. If anything is worth remarking on, it's that the domestic "national team" databases still don't get much ecosystem support: Flink has Alibaba behind it, yet DM isn't invited to play. Maybe DM doesn't cooperate; then again, if a big internet company really did embrace it, government agencies probably wouldn't dare use it anymore, ha.


1. Connecting to the DM database

       I connect to the DM database with DBeaver, using a driver I set up myself; I've shared how to do that before, so I won't repeat it here.
       I also check the results with DBeaver connected to ES. ES normally wants certificates and such, so there too I connect through a custom driver, same as with DM.
Custom-driver connections for DM and ES
Certificate-free ES connection driver download
Configuring the driver for the DM database connection
(screenshot: DBeaver driver configuration)

2. Implementation approach

  1. Change detection is handled inside the DM database itself, via a trigger
  2. Flink watches the change-log table that step 1 writes to; there is no binlog-style connector for DM
  3. Flink polls that change-log table on a fixed interval
  4. Insert, update, and delete events are then applied to ES

3. The code

1. Create the trigger

CREATE OR REPLACE TRIGGER YOUR_SCHEMA.RC_PASS_RECORD_CHANGE_LOG_TRIGGER
AFTER INSERT OR UPDATE OR DELETE ON YOUR_SCHEMA.RC_PASS_RECORD
FOR EACH ROW
BEGIN
    IF INSERTING THEN
        -- handle insert
        INSERT INTO RC_PASS_RECORD_CHANGE_LOG (CHANGE_TYPE, CHANGE_ID) VALUES ('INSERT', :new.ID);
    ELSIF UPDATING THEN
        -- handle update
        INSERT INTO RC_PASS_RECORD_CHANGE_LOG (CHANGE_TYPE, CHANGE_ID) VALUES ('UPDATE', :new.ID);
    ELSIF DELETING THEN
        -- handle delete
        INSERT INTO RC_PASS_RECORD_CHANGE_LOG (CHANGE_TYPE, CHANGE_ID) VALUES ('DELETE', :old.ID);
    END IF;
END;
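
The trigger assumes the change-log table already exists; its DDL isn't shown in this post. A minimal sketch, with columns inferred from the trigger above and from the Flink source further down (which reads ID, CHANGE_TYPE, CHANGE_ID and CREATE_TIME), might look like this:

CREATE TABLE RC_PASS_RECORD_CHANGE_LOG (
    ID          BIGINT IDENTITY(1, 1) PRIMARY KEY,   -- monotonically increasing, used as the polling cursor
    CHANGE_TYPE VARCHAR(16) NOT NULL,                -- 'INSERT' / 'UPDATE' / 'DELETE'
    CHANGE_ID   VARCHAR(64) NOT NULL,                -- primary key of the changed RC_PASS_RECORD row
    CREATE_TIME TIMESTAMP DEFAULT CURRENT_TIMESTAMP  -- when the change was recorded
);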

2. Flink code

This integrates with Spring Boot, so first create a fresh Spring Boot project and pull in the Flink dependencies plus the DM JDBC and ES client dependencies.
pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.4.3</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.zw</groupId>
    <artifactId>olap-zw</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>olap-zw</name>
    <description>olap-zw</description>
    <url/>
    <licenses>
        <license/>
    </licenses>
    <developers>
        <developer/>
    </developers>
    <scm>
        <connection/>
        <developerConnection/>
        <tag/>
        <url/>
    </scm>
    <properties>
        <java.version>17</java.version>
        <skipTests>true</skipTests>

        <flink.version>1.20.1</flink.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
        </dependency>
        <!--<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jdbc</artifactId>
        </dependency>-->

       <!-- <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.5.6</version>
        </dependency>-->

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>


        <!--<dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.2.18</version>
        </dependency>-->

        <dependency>
            <groupId>com.dameng</groupId>
            <artifactId>DmJdbcDriver18</artifactId>
            <version>8.1.3.62</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>co.elastic.clients</groupId>
            <artifactId>elasticsearch-java</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch7</artifactId>
            <version>3.0.1-1.17</version>
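            <!-- Note (added): the "-1.17" suffix means this connector build targets Flink 1.17;
                 it is not guaranteed to match flink.version 1.20.1, so verify compatibility. -->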
        </dependency>

        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.8.25</version>
        </dependency>



    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <annotationProcessorPaths>
                        <path>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </path>
                    </annotationProcessorPaths>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>

            <!-- Flink packaging: option 1 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.3.0</version>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>com.zw.olapzw.OlapZwApplication</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

Package structure and files (screenshot omitted)
Configuration file (application.properties)

spring.application.name=olap-zw

spring.datasource.url=jdbc:dm://192.168.1.22:5236?schema=YOUR_SCHEMA&useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&serverTimezone=Asia/Shanghai&useSSL=false
spring.datasource.username=YOUR_USERNAME
spring.datasource.password=YOUR_PASSWORD
spring.datasource.driver-class-name=dm.jdbc.driver.DmDriver
#spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
#spring.datasource.druid.initial-size=5
#spring.datasource.druid.max-active=10
#spring.datasource.druid.min-idle=5
#spring.datasource.druid.max-wait=60000
#spring.datasource.druid.validation-query=SELECT 1
#spring.datasource.druid.test-while-idle=true
#spring.datasource.druid.time-between-eviction-runs-millis=30000
#spring.datasource.druid.min-evictable-idle-time-millis=60000


es.host=192.168.1.21
es.port=9200

logging.level.root=error
logging.level.com.zw.olapzw.sink=error
logging.level.com.zw.olapzw.source=error

entity -> SourceDataEntity

package com.zw.olapzw.entity;

import lombok.Data;

import java.util.Date;

/**
 * @author zwmac
 */
@Data
public class SourceDataEntity {

    private String id;

    private Long deviceId;

    private Long gateId;

    private String authObjCode;

    /**
     * Change type: the marker written by the trigger
     */
    private String changeType;

    /**
     * Sync (storage) time
     */
    private Date storageTime;
}

sink -> ResultSinkDataEntitySink

package com.zw.olapzw.sink;

import cn.hutool.core.date.DateUtil;
import cn.hutool.core.map.MapUtil;
import cn.hutool.extra.spring.SpringUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.zw.olapzw.entity.SourceDataEntity;
import com.zw.olapzw.util.ConnUtil;
import com.zw.olapzw.util.ESClientUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * @author zwmac
 */
@Slf4j
public class ResultSinkDataEntitySink extends RichSinkFunction<SourceDataEntity> {

    // Note: Spring's @Resource injection does not work here, because Flink
    // serializes and re-instantiates this function outside the Spring context;
    // the ConnUtil bean is therefore fetched via SpringUtil.getBean() in invoke().

    @Override
    public void invoke(SourceDataEntity record, Context context) throws Exception {
        super.invoke(record, context);
        // the record received from upstream
        log.info("Start sink data entity");
        log.info("Sink data: {}", record);


        // set the storage (sync) time
        record.setStorageTime(DateUtil.date());

        // serialize the record to JSON (used for logging here)
        JSONObject dataJson = JSONUtil.parseObj(record);

        log.info("-- 同步数据dataJson: {}", dataJson);

        // connect to Elasticsearch
        ConnUtil connUtil = SpringUtil.getBean(ConnUtil.class);
        String es_host = connUtil.getEsHost();
        String es_port = connUtil.getEsPort();
        RestHighLevelClient restHighLevelClient = ESClientUtil.getClient(es_host, Integer.parseInt(es_port));

        // write to the index via the RestHighLevelClient

        // dispatch on the change type
        String changeType = record.getChangeType();
        switch (changeType) {
            case "INSERT":
                // index the new document
                sinkInsert(record, dataJson, restHighLevelClient);
                break;
            case "UPDATE":
                sinkUpdate(record, dataJson, restHighLevelClient);
                break;
            case "DELETE":
                sinkDel(record, dataJson, restHighLevelClient);
                break;
        }

        // close the client (opened per record for simplicity; a long-lived sink would reuse it in open()/close())
        restHighLevelClient.close();


        log.info("end sink data entity");

    }

    /**
     * Delete the document from ES
     * @param record
     * @param dataJson
     * @param restHighLevelClient
     * @throws IOException
     */
    private void sinkDel(SourceDataEntity record, JSONObject dataJson, RestHighLevelClient restHighLevelClient) throws IOException {
        // delete the document via the client
        DeleteRequest deleteRequest = new DeleteRequest("test_index", record.getId());

        DeleteResponse deleteResponse = restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
        log.error("--- ES使用DeleteRequest删除返回deleteResponse: {}", deleteResponse);
    }

    /**
     * Update the document in ES
     * @param record
     * @param dataJson
     * @param restHighLevelClient
     * @throws IOException
     */
    private void sinkUpdate(SourceDataEntity record, JSONObject dataJson, RestHighLevelClient restHighLevelClient) throws IOException {
        // update the document via the client
        UpdateRequest updateRequest = new UpdateRequest("test_index", record.getId());
        //String jsonStr = JSONUtil.toJsonStr(dataJson);
        //updateRequest.doc("data", jsonStr);

        Map<String, Object> updateFields = new HashMap<>();
        updateFields.put("deviceId", record.getDeviceId());
        updateFields.put("id", record.getId());
        updateFields.put("authObjCode", record.getAuthObjCode());
        updateFields.put("gateId", record.getGateId());

        if (MapUtil.isNotEmpty(updateFields)){
            updateRequest.doc(updateFields);

            UpdateResponse updateResponse = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
            log.error("---- ES使用UpdateRequest更新返回: {}", updateResponse);

        }else {
            log.error("------- es更新时 updateMap is empty");
        }
    }

    /**
     * Index a new document into ES
     * @param record
     * @param dataJson
     * @param restHighLevelClient
     * @throws IOException
     */
    private void sinkInsert(SourceDataEntity record, JSONObject dataJson, RestHighLevelClient restHighLevelClient) throws IOException {
        // store the document via the client
        IndexRequest indexRequest = new IndexRequest("test_index");
        indexRequest.id(record.getId());

        Map<String, Object> insertFields = new HashMap<>();
        insertFields.put("deviceId", record.getDeviceId());
        insertFields.put("id", record.getId());
        insertFields.put("authObjCode", record.getAuthObjCode());
        insertFields.put("gateId", record.getGateId());

        indexRequest.source(insertFields);
        IndexResponse indexResponse = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
        log.error("------ ES使用IndexRequest插入返回indexResponse: {}", indexResponse);
    }
}
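
A quick way to sanity-check the sink without DBeaver is to fetch a document by id through the same client. A minimal sketch (this class is mine, not part of the original project; host, port and index name are the ones used above):

package com.zw.olapzw.util;

import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;

public class EsCheck {
    public static void main(String[] args) throws Exception {
        // same host/port as in application.properties
        RestHighLevelClient client = ESClientUtil.getClient("192.168.1.21", 9200);
        // "1" stands in for the id of a row you just changed
        GetRequest get = new GetRequest("test_index", "1");
        GetResponse resp = client.get(get, RequestOptions.DEFAULT);
        System.out.println(resp.isExists() ? resp.getSourceAsString() : "document not found");
        client.close();
    }
}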

source -> ResultSourceDataEntitySource

package com.zw.olapzw.source;

import cn.hutool.extra.spring.SpringUtil;
import com.zw.olapzw.entity.SourceDataEntity;
import com.zw.olapzw.util.ConnUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.springframework.stereotype.Component;

import java.sql.*;

/**
 * @author zwmac
 */
@Component
@Slf4j
public class ResultSourceDataEntitySource extends RichSourceFunction<SourceDataEntity> {
    // Note: as in the sink, @Resource injection does not survive Flink's
    // serialization; the ConnUtil bean is fetched via SpringUtil.getBean() in run().


    String changeLogSql = "SELECT * FROM LAMP_ZF.RC_PASS_RECORD_CHANGE_LOG RPRCL";

    String recordSql = "SELECT * FROM LAMP_ZF.RC_PASS_RECORD RPR";

    Long startId = 0L;

    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<SourceDataEntity> sourceContext) throws Exception {
        // TODO: plain JDBC here; once wired into Spring Boot you could use an ORM and the rest of the project plumbing
        log.info("Start source data entity");
        try {
            ConnUtil connUtil = SpringUtil.getBean(ConnUtil.class);
            String jdbcUrl = connUtil.getJdbcUrl();
            String jdbcUsername = connUtil.getJdbcUsername();
            String jdbcPassword = connUtil.getJdbcPassword();
            Connection connection = DriverManager.getConnection(jdbcUrl, jdbcUsername, jdbcPassword);

            Statement statement = connection.createStatement();

            // poll the change-log table
            while (isRunning) {
                StringBuffer sbf = new StringBuffer();
                sbf.append(changeLogSql);
                sbf.append(" WHERE RPRCL.ID > ");
                sbf.append(startId);
                sbf.append(" ORDER BY RPRCL.ID ASC");
                String querySql = sbf.toString();
                //log.info(querySql);

                log.error("---查询sql:" + querySql);

                ResultSet resultSet = statement.executeQuery(querySql);

                // walk the result set
                while (resultSet.next()) {
                    // read each row and advance the cursor
                    Long id = resultSet.getLong("ID");
                    startId = id;

                    //CHANGE_TYPE
                    String changeType = resultSet.getString("CHANGE_TYPE");
                    //CHANGE_ID
                    String changeId = resultSet.getString("CHANGE_ID");
                    //CREATE_TIME
                    Date changeDate = resultSet.getDate("CREATE_TIME");

                    SourceDataEntity sourceDataEntity = new SourceDataEntity();
                    sourceDataEntity.setChangeType(changeType);

                    // for deletes there is no row left to look up
                    if("DELETE".equals(changeType)) {
                        sourceDataEntity.setId(changeId);
                    }else {

                        // fetch the current row for inserts/updates;
                        // a PreparedStatement avoids concatenating the id into the SQL
                        String queryRecordSql = recordSql + " WHERE RPR.ID = ?";
                        log.error("-- record sql: " + queryRecordSql);

                        PreparedStatement recordPs = connection.prepareStatement(queryRecordSql);
                        recordPs.setString(1, changeId);
                        ResultSet recordRs = recordPs.executeQuery();

                        while (recordRs.next()) {
                            // map the row onto the entity
                            sourceDataEntity.setId(recordRs.getString("ID"));
                            sourceDataEntity.setDeviceId(recordRs.getLong("DEVICE_ID"));
                            sourceDataEntity.setGateId(recordRs.getLong("GATE_ID"));
                            sourceDataEntity.setAuthObjCode(recordRs.getString("AUTH_OBJ_CODE"));
                        }
                        // close per-iteration resources to avoid leaking statements
                        recordRs.close();
                        recordPs.close();
                    }
                    if (sourceDataEntity.getId() != null){
                        // guards against rows already physically deleted (id stays null)
                        sourceDataEntity.setStorageTime(changeDate);
                        sourceContext.collect(sourceDataEntity);
                    }

                }
                // polling pace used for debugging; tune or drop the sleep in production
                Thread.sleep(10000L);
            }


        } catch (SQLException e) {
            log.error(e.getMessage());
        }

    }


    @Override
    public void cancel() {
        isRunning = false;
    }
}
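
One caveat: startId lives only in memory, so every restart replays the change-log table from the beginning. A sketch of how the cursor could be checkpointed with Flink's CheckpointedFunction follows; this is not in the original code, and the class and state names are made up. The members would be merged into ResultSourceDataEntitySource, which would additionally implement CheckpointedFunction:

package com.zw.olapzw.source;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

import java.util.Collections;

/**
 * Sketch: persist the polling cursor so a restart resumes where it left off.
 */
public abstract class CheckpointedCursorSketch implements CheckpointedFunction {

    private transient ListState<Long> startIdState; // checkpointed copy of the cursor
    protected Long startId = 0L;                    // the same field the source already uses

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        startIdState = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("startId", Long.class));
        // on restore, resume from the last checkpointed cursor
        for (Long restored : startIdState.get()) {
            startId = restored;
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        startIdState.update(Collections.singletonList(startId));
    }
}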

util -> ConnUtil

package com.zw.olapzw.util;

import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
@Data
public class ConnUtil {
    @Value("${spring.datasource.url}")
    private String jdbcUrl;

    @Value("${spring.datasource.username}")
    private String jdbcUsername;

    @Value("${spring.datasource.password}")
    private String jdbcPassword;

    @Value("${es.host}")
    private String esHost;

    @Value("${es.port}")
    private String esPort;


}

util -> ESClientUtil

package com.zw.olapzw.util;

import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback;
import org.apache.http.HttpHost;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;

import java.io.IOException;

/**
 * @author zwmac
 */
public class ESClientUtil {


    private static RestHighLevelClient restHighLevelClient;

    public static RestHighLevelClient getClient(String host, int port) {
        RestClientBuilder builder = RestClient.builder(new HttpHost(host, port, "http"))
            .setHttpClientConfigCallback(new HttpClientConfigCallback() {
                @Override
                public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                    return httpClientBuilder;
                }
            });
        
        restHighLevelClient = new RestHighLevelClient(builder);
        return restHighLevelClient;
    }

    public static void closeClient() {
        try {
            if (restHighLevelClient != null) {
                restHighLevelClient.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
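
This post connects to ES without certificates. If your cluster does require basic auth, the HttpClientConfigCallback above is the place to hook it in. A sketch under that assumption (the class name and credentials are mine, not from the original):

package com.zw.olapzw.util;

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;

public class ESSecureClientUtil {

    /** Variant of ESClientUtil.getClient() for clusters that require basic auth. */
    public static RestHighLevelClient getClient(String host, int port, String user, String password) {
        // supply the username/password to every request made by the low-level client
        CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY,
                new UsernamePasswordCredentials(user, password));

        RestClientBuilder builder = RestClient.builder(new HttpHost(host, port, "http"))
                .setHttpClientConfigCallback(httpClientBuilder ->
                        httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));

        return new RestHighLevelClient(builder);
    }
}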

OlapZwApplication

package com.zw.olapzw;

import com.zw.olapzw.entity.SourceDataEntity;
import com.zw.olapzw.sink.ResultSinkDataEntitySink;
import com.zw.olapzw.source.ResultSourceDataEntitySource;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;

/**
 * @author zwmac
 */
@SpringBootApplication
public class OlapZwApplication {

    public static void main(String[] args) {

        SpringApplication.run(OlapZwApplication.class, args);
        System.out.println("OlapZwApplication started");
    }


    @Bean
    public CommandLineRunner commandLineRunner(ApplicationContext ctx) {
        return args -> {
            // obtain the Flink execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // parallelism 1 preserves the change-log ordering
            env.setParallelism(1);

            // enable checkpointing, one checkpoint every 5 seconds
            env.enableCheckpointing(5000L);

            // checkpoint consistency semantics
            CheckpointConfig checkpointConfig = env.getCheckpointConfig();
            checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

            // retain the last checkpoint when the job is cancelled
            checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

            // restart strategy: 3 attempts, 10 s between attempts (the delay is in milliseconds)
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10_000L));

            // the source
            SourceFunction<SourceDataEntity> source = new ResultSourceDataEntitySource();

            // register the source with the environment
            DataStreamSource<SourceDataEntity> streamSource = env.addSource(source, "ResultSourceDataEntitySource");

            // downstream sink
            streamSource.addSink(new ResultSinkDataEntitySink());

            try {
                // execute() blocks until the job terminates
                env.execute("DM change-data sync");
            } catch (Exception e) {
                System.out.println("DM change-data sync failed: " + e.getMessage());
                throw new RuntimeException(e);
            }

            // only reached once the job has terminated, because execute() blocks
            System.out.println("flink CDC finished");
        };
    }

}
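
Because env.execute() blocks, the CommandLineRunner holds Spring Boot's startup thread for the life of the job. If the application should finish starting while the job runs in the background, executeAsync() is the non-blocking alternative; a sketch of what would replace the try block above:

            // submit the job without blocking the runner thread
            org.apache.flink.core.execution.JobClient jobClient =
                    env.executeAsync("DM change-data sync");
            System.out.println("Flink job submitted: " + jobClient.getJobID());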


Summary

  • Not much more to add, honestly; I'll share the complete code later on CSDN GitCode.