【微服务】springboot3 集成 Flink CDC 1.17 实现mysql数据同步

发布于:2024-10-13 ⋅ 阅读:(13) ⋅ 点赞:(0)

目录

一、前言

二、常用的数据同步解决方案

2.1 为什么需要数据同步

2.2 常用的数据同步方案

2.2.1 Debezium

2.2.2 DataX

2.2.3 Canal

2.2.4 Sqoop

2.2.5 Kettle

2.2.6 Flink CDC

三、Flink CDC介绍

3.1 Flink CDC 概述

3.1.1 Flink CDC 工作原理

3.2  Flink CDC数据同步优势

3.3 Flink CDC 适用范围

四、Java集成Flink CDC同步mysql数据

4.1 组件版本选择

4.2 数据准备

4.3 导入组件依赖

4.4 Java代码实现过程

4.4.1 自定义反序列化器

4.4.2 自定义Sink输出

4.4.3 启动任务类

4.4.4 效果测试

五、与springboot整合过程

5.1 自定义监听类

5.2 效果测试

六、写在文末


一、前言

在微服务开发模式下,系统的数据源往往不是一个单一的来源,在实际项目中,往往是多种异构数据源的组合,比如核心业务数据在mysql,日志分析数据在hbase,clickhouse,es等,但是不同的数据源需要配合完成某一类业务的时候,就涉及到数据的整合或数据同步问题,尤其是数据同步的场景可以说是非常常见的,那么有哪些解决方案呢?

二、常用的数据同步解决方案

2.1 为什么需要数据同步

当系统发展到一定阶段,尤其是系统的规模越来越大,业务体量也不断扩大的时候,一个系统可能会用到多种数据存储中间件,而不再是单纯的mysql,pgsql等,甚至一个系统中一个或多个微服务无法再满足业务的需求,而需要单独做数据存储的服务,在类似的场景下,很难避免新的服务需要从原有的微服务中抽取数据,或定期做数据的同步处理,诸如此类的场景还有很多。

2.2 常用的数据同步方案

下图列举了几种主流的用于解决数据同步场景的方案

关于图中几种技术,做如下简单的介绍,便于做技术选型作为对比

2.2.1 Debezium

Debezium是国外⽤户常⽤的CDC组件,单机对于分布式来说,在数据读取能力的拓展上,没有分布式的更具有优势,在大数据众多的分布式框架中(Hive、Hudi等)Flink CDC 的架构能够很好地接入这些框架。

2.2.2 DataX

DataX无法支持增量同步。如果一张Mysql表每天增量的数据是不同天的数据,并且没有办法确定它的产生时间,那么如何将数据同步到数仓是一个值得考虑的问题。DataX支持全表同步,也支持sql查询的方式导入导出,全量同步一定是不可取的,sql查询的方式没有可以确定增量数据的字段的话也不是一个好的增量数据同步方案。

2.2.3 Canal

Canal是用java开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。Canal主要支持了MySQL的Binlog解析,将增量数据写入中间件中(例如kafka,Rocket MQ等),但是无法同步历史数据,因为无法获取到binlog的变更。

2.2.4 Sqoop

Sqoop主要用于在Hadoop(Hive)与传统的数据库(mysql、postgresql...)间进行数据的传递。Sqoop将导入或导出命令翻译成mapreduce程序来实现,这样的弊端就是Sqoop只能做批量导入,遵循事务的一致性,Mapreduce任务成功则同步成功,失败则全部同步失败。

2.2.5 Kettle

Kettle是一款开源的数据集成工具,主要用于数据抽取、转换和加载(ETL)。它提供了图形化的界面,使得用户可以通过拖拽组件的方式来设计和执行数据集成任务。

  • Kettle 被广泛应用于数据仓库构建、数据迁移、数据清洗等多种场景。
  • 虽然 Kettle 在处理中小型数据集时表现良好,但在处理大规模数据集时可能会遇到性能瓶颈,尤其是在没有进行优化的情况下。
  • Kettle 在运行时可能会消耗较多的内存和 CPU 资源,特别是当处理复杂的转换任务时。

虽然 Kettle 的基本操作相对简单,但对于高级功能和复杂任务的设计,用户仍需投入一定的时间和精力来学习和掌握。

2.2.6 Flink CDC

Flink CDC 基本都弥补了以上框架的不足,将数据库的全量和增量数据一体化地同步到消息队列和数据仓库中;也可以用于实时数据集成,将数据库数据实时入湖入仓;无需像其他的CDC工具一样需要在服务器上进行部署,减少了维护成本,链路更少;完美套接Flink程序,CDC获取到的数据流直接对接Flink进行数据加工处理,一套代码即可完成对数据的抽取转换和写出,既可以使用flink的DataStream API完成编码,也可以使用较为上层的FlinkSQL API进行操作。

三、Flink CDC介绍

3.1 Flink CDC 概述

Flink CDC(Change Data Capture)是指使用 Apache Flink 流处理框架来捕获数据库中的变更记录(即数据变更日志)。这种方式允许开发者实时监控并处理数据库表中的更改事件,比如插入、更新或删除操作。通过这种方式,可以实现数据库之间或者数据库与外部系统的实时数据同步。

3.1.1 Flink CDC 工作原理

Flink CDC 主要依赖于数据库提供的日志或者事务记录,如 MySQL 的 Binlog、PostgreSQL 的 WAL(Write-Ahead Logs)等。Flink CDC 使用专门的连接器(Connector)读取这些日志文件,并将其转换成 Flink 可以处理的数据流。之后,这些数据流可以被进一步处理,如过滤、聚合、转换等,最终输出到目标系统,如另一个数据库、消息队列或其他存储系统。

3.2  Flink CDC数据同步优势

Flink CDC(Change Data Capture)是一种用于捕获数据库变更事件的技术,它能够实现实时的数据同步和流式处理。使用 Apache Flink CDC 进行数据同步具有以下几个主要优势:

  1. 实时性:Flink CDC 能够捕捉到数据库中的任何更改(如插入、更新、删除等),并立即将这些更改作为事件流发送出去,实现了真正的实时数据同步。

  2. 高性能:Flink 是一个高度可扩展的流处理框架,其内置的优化机制使得即使在处理大规模数据流时也能保持高性能。Flink CDC 利用了这些优化技术来保证数据同步的高效执行。

  3. 易用性:Flink 提供了丰富的 API 和预构建的连接器(Connectors),使得集成数据库变得简单。使用 Flink CDC 可以通过配置文件或简单的编程接口轻松设置数据捕获任务。

  4. 灵活性:除了基本的 CDC 功能之外,Flink 还支持各种复杂的流处理操作,如窗口计算、状态管理等。这意味着不仅可以捕获变更数据,还可以在数据同步过程中加入各种数据处理逻辑。

  5. 兼容性广泛:Flink CDC 支持多种数据库系统,包括 MySQL、PostgreSQL、Oracle、SQL Server 等,提供了广泛的兼容性和选择空间。

  6. 容错能力:Flink 内置了强大的状态管理和检查点机制,即使在发生故障的情况下也能保证数据的一致性和准确性。这对于需要高可靠性的数据同步场景非常重要。

  7. 无侵入性:Flink CDC 通常是无侵入性的,不需要对源数据库进行任何修改或添加额外的触发器等组件,减少了对现有系统的干扰。

  8. 支持多种部署方式:无论是部署在本地集群还是云端,Flink 都能很好地支持,这为不同规模的企业提供了灵活的选择。

  9. 社区支持:Apache Flink 拥有一个活跃的开源社区,这不仅意味着有丰富的文档和教程可供参考,也意味着遇到问题时可以快速获得帮助和支持。

Flink CDC 在数据同步方面提供了一个强大且灵活的解决方案,适合那些需要高性能、实时数据处理的应用场景。

3.3 Flink CDC 适用范围

Flink CDC(Change Data Capture)连接器是 Apache Flink 社区为 Flink 提供的一种用于捕获数据库变更事件的工具。它允许用户从关系型数据库中实时捕获表的数据变更,并将这些变更事件转化为流式数据,以便进行实时处理,比如你需要将mysql的数据同步到另一个mysql数据库,就需要使用mysql连接器,如果需要同步mongodb的数据,则需要使用mongodb的连接器。截止到Flink CDC 2.2 为止,支持的连接器:

支Flink CDC 持的Flink版本,在实际使用的时候需要根据版本的对照进行选择:

四、Java集成Flink CDC同步mysql数据

4.1 组件版本选择

网上很多关于Flink CDC的版本都是1.13左右的,这个在当前JDK比较新的版本下已经出现了较多的不兼容,本文以JDK17为基础版本进行说明

编号 组件名称 版本
1

JDK

17

2

springboot

3.2.2

3

mysql

8.0.23

4

flink cdc

1.17.0

4.2 数据准备

找一个可以用的mysql数据库,在下面创建两张表,一张tb_role,另一张tb_role_copy,仅表名不一样

CREATE TABLE `tb_role` (
  `id` varchar(32) NOT NULL COMMENT '主键',
  `role_code` varchar(32) NOT NULL COMMENT '版本号',
  `role_name` varchar(32) DEFAULT NULL COMMENT '角色名称',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3 ROW_FORMAT=DYNAMIC COMMENT='tb角色表';

4.3 导入组件依赖

pom中添加如下依赖

<!-- Flink CDC 1.17.0版本,与springboot3.0进行整合使用的版本-->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>1.17.1</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java</artifactId>
    <version>1.17.1</version>
</dependency>

<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>2.4.2</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>1.18.0</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.12</artifactId>
    <version>1.17.1</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc</artifactId>
    <version>3.0.0-1.16</version>
</dependency>

4.4 Java代码实现过程

案例需求场景:通过Flink CDC ,监听tb_role表数据变化,写入tb_role_copy

4.4.1 自定义反序列化器

反序列化器的目的是为了解析flink cdc监听到mysql表数据变化的日志,以json的形式进行解析,方便对日志中的关键参数进行处理

package com.congge.flink.blog;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;

public class CustomerSchema implements DebeziumDeserializationSchema<String> {

    /**
     * 封装的数据格式
     * {
     * "database":"",
     * "tableName":"",
     * "before":{"id":"","tm_name":""....},
     * "after":{"id":"","tm_name":""....},
     * "type":"c u d"
     * //"ts":156456135615
     * }
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        //1.创建JSON对象用于存储最终数据
        JSONObject result = new JSONObject();
        //2.获取库名&表名
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\\.");
        String database = fields[1];
        String tableName = fields[2];

        Struct value = (Struct) sourceRecord.value();
        //3.获取"before"数据
        Struct before = value.getStruct("before");
        JSONObject beforeJson = new JSONObject();
        if (before != null) {
            Schema beforeSchema = before.schema();
            List<Field> beforeFields = beforeSchema.fields();
            for (Field field : beforeFields) {
                Object beforeValue = before.get(field);
                beforeJson.put(field.name(), beforeValue);
            }
        }

        //4.获取"after"数据
        Struct after = value.getStruct("after");
        JSONObject afterJson = new JSONObject();
        if (after != null) {
            Schema afterSchema = after.schema();
            List<Field> afterFields = afterSchema.fields();
            for (Field field : afterFields) {
                Object afterValue = after.get(field);
                afterJson.put(field.name(), afterValue);
            }
        }
        //5.获取操作类型  CREATE UPDATE DELETE
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        String type = operation.toString().toLowerCase();
        if ("create".equals(type)) {
            type = "insert";
        }
        //6.将字段写入JSON对象
        result.put("database", database);
        result.put("tableName", tableName);
        result.put("before", beforeJson);
        result.put("after", afterJson);
        result.put("type", type);
        //7.输出数据
        collector.collect(result.toJSONString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}

4.4.2 自定义Sink输出

Sink即为Flink CDC的输出连接器,即监听到源表数据变化并经过处理后最终写到哪里,以mysql为例,我们在监听到tb_role表数据变化后,同步到tb_role_copy中去

package org.dromara.sync.flink.v2;

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.dromara.common.core.utils.SpringUtils;
import org.dromara.sync.config.SyncDsTargetDbConfig;

import java.io.Serial;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/**
 * 自定义一个mysql的Sink输出器,将监听到的变化数据写到指定的数据库下面的表中
 * @author zcy
 */
@Slf4j
public class MyJdbcSink extends RichSinkFunction<String> {

    @Serial
    public static final long serialVersionUID = 3153039337754737517L;

    // 提前声明连接和预编译语句
    private Connection connection = null;
    private PreparedStatement insertStmt = null;
    private PreparedStatement updateStmt = null;
    private PreparedStatement preparedStatement = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        if(connection ==null){
            Class.forName("com.mysql.cj.jdbc.Driver");//加载数据库驱动
            connection = DriverManager.getConnection("jdbc:mysql://IP:3306/db", "root", "123");
            connection.setAutoCommit(false);//关闭自动提交
        }
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        JSONObject jsonObject = JSONObject.parseObject(value);
        log.info("监听到数据变化,准备执行sql的变更,参数 jsonObject :【{}】",jsonObject);
        String type = jsonObject.getString("type");
        String tableName = "tb_role_copy";
        String database = jsonObject.getString("database");
        if(type.equals("insert")){
            JSONObject after = (JSONObject)jsonObject.get("after");
            Integer id = after.getInteger("id");
            String roleCode = after.getString("role_code");
            String roleName = after.getString("role_name");
            String sql = String.format("insert into %s.%s values (?,?,?)", database, tableName);
            insertStmt = connection.prepareStatement(sql);
            insertStmt.setInt(1, Integer.valueOf(id));
            insertStmt.setString(2, roleCode);
            insertStmt.setString(3, roleName);
            insertStmt.execute();
            connection.commit();
        } else if(type.equals("update")){
            JSONObject after = jsonObject.getJSONObject("after");
            Integer id = after.getInteger("id");
            String roleCode = after.getString("role_code");
            String roleName = after.getString("role_name");
            String sql = String.format("update %s.%s set role_code = ?, role_name = ? where id = ?", database, tableName);
            updateStmt = connection.prepareStatement(sql);
            updateStmt.setString(1, roleCode);
            updateStmt.setString(2, roleName);
            updateStmt.setInt(3, id);
            updateStmt.execute();
            connection.commit();
        } else if(type.equals("delete")){
            JSONObject after = jsonObject.getJSONObject("before");
            Integer id = after.getInteger("id");
            String sql = String.format("delete from %s.%s where id = ?", database, tableName);
            preparedStatement = connection.prepareStatement(sql);
            preparedStatement.setInt(1, id);
            preparedStatement.execute();
            connection.commit();
        }
    }

    @Override
    public void close() throws Exception {
        if(insertStmt != null){
            insertStmt.close();
        }
        if(updateStmt != null){
            updateStmt.close();
        }
        if(preparedStatement != null){
            preparedStatement.close();
        }
        if(connection != null){
            connection.close();
        }
    }
}

4.4.3 启动任务类

本例先以main程序运行,在实际进行线上部署使用时,可以打成jar包或整合springboot进行启动即可

package org.dromara.sync.flink.mock;


import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.dromara.sync.flink.MyJdbcSchema;
import org.dromara.sync.flink.v2.MyJdbcSink;

public class FlinkCdcMainTest {

    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        MySqlSource<String> mysqlSource = MySqlSource.<String>builder()
            .hostname("IP")
            .port(3306)
            .username("root")
            .password("123456")
            .databaseList("db") // 监听的数据库列表
            .tableList("db.tb_role") // 监听的表列表
            .deserializer(new MyJdbcSchema())
            .startupOptions(StartupOptions.latest())
            .serverTimeZone("UTC")
            .build();

        DataStream<String> stream = env.fromSource(mysqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");
        // 输出到控制台
//        stream.print();
        stream.addSink(new MyJdbcSink());
        env.execute("Flink CDC Job");
    }
}

4.4.4 效果测试

运行上面的代码,通过控制台可以看到任务已经运行起来了,监听并等待数据源数据变更


测试之前,确保两张表数据是一致的

此时为tb_role表增加一条数据,很快控制台可以监听并输出相关的日志

而后,tb_role_copy表同步新增了一条数据

五、与springboot整合过程

在实际项目中,通常会结合springboot项目整合使用,参考下面的使用步骤

5.1 自定义监听类

可以直接基于启动类改造,也可以新增一个类,实现ApplicationRunner接口,重写里面的run方法

  • 不难发现,run方法里面的代码逻辑即是从上述main方法运行任务里面拷贝过来的;

package org.dromara.sync.flink.v2;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.dromara.sync.config.SyncDsSourceDbConfig;
import org.dromara.sync.flink.MyJdbcSchema;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * 启动加载,将该类作为一个监听任务的进程启动并执行
 * @author Evans
 */
@Component
public class MysqlDsListener implements ApplicationRunner {

    @Override
    public void run(ApplicationArguments args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        MySqlSource<String> mysqlSource = MySqlSource.<String>builder()
            .hostname("IP")
            .port(13306)
            .username("root")
            .password("123456")
            .databaseList("db")
            .tableList("db.tb_role")
            .deserializer(new MyJdbcSchema())
            .startupOptions(StartupOptions.latest())
            .serverTimeZone("UTC")
            .build();

        DataStream<String> stream = env.fromSource(mysqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");
        // 输出到控制台
//        stream.print();
        stream.addSink(new MyJdbcSink());
        env.execute("Flink CDC Job");
    }
}

5.2 效果测试

启动工程之后,相当于是通过flink cdc启动了一个用于监听数据变更的后台进程

然后我们再在数据库tb_role表增加一条数据,控制台可以看到输出了相关的日志

此时再检查数据表,可以发现tb_role_copy表新增了一条一样的数据

六、写在文末

本文通过较大的篇幅详细介绍了Flink CDC相关的技术,最后通过一个实际案例演示了使用Flink CDC同步mysql表数据的示例,希望对看到的同学有用,本篇到此结束感谢观看。