Flink CDC Pipeline: MySQL to Doris

Published: 2025-04-05

Version compatibility

Flink and Flink CDC version compatibility
[Image: Flink / Flink CDC version compatibility table]

Flink and Doris version compatibility
[Image: Flink / Doris version compatibility table]

Running the sync job

The test finally ran successfully with flink-1.20.1 and flink-cdc-3.1.1.

Configure the YAML file

[root@chb1 flink-cdc-3.1.1]# cat mysql2doris.yaml 
################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:
  type: mysql
  hostname: chb1
  port: 3306
  username: root
  password: 123456
  tables: test.\.*
  server-id: 5400-5404

sink:
  type: doris
  fenodes: chb1:8030
  username: root
  password: ""
  table.create.properties.light_schema_change: true
  table.create.properties.replication_num: 1

pipeline:
  name: Sync MySQL Database to Doris
  parallelism: 2
[root@chb1 flink-cdc-3.1.1]# 
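
Each type value in the YAML above (mysql for the source, doris for the sink) needs a matching pipeline connector jar in the Flink CDC lib directory before the job is submitted. The following is a minimal sketch of a pre-flight check, not part of Flink CDC itself: the function name check_connectors is hypothetical, and it assumes the jars follow the flink-cdc-pipeline-connector-<type>-<version>.jar naming used by the official releases.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not shipped with Flink CDC): for every indented
# "type:" key in the pipeline YAML, check that a jar named
# flink-cdc-pipeline-connector-<type>-*.jar exists in the given lib directory.
check_connectors() {
  local yaml="$1" lib_dir="$2" missing=0 type
  # Extract the values of the indented "type:" keys (source and sink).
  for type in $(sed -n 's/^[[:space:]]\{1,\}type:[[:space:]]*\([A-Za-z0-9_-]\{1,\}\).*$/\1/p' "$yaml"); do
    if ls "$lib_dir"/flink-cdc-pipeline-connector-"$type"-*.jar >/dev/null 2>&1; then
      echo "ok: $type"
    else
      echo "missing: $type"
      missing=1
    fi
  done
  # Non-zero exit status when at least one connector jar is absent.
  return "$missing"
}
```

Assuming the layout used in this post, it could be called from the flink-cdc-3.1.1 directory as: check_connectors mysql2doris.yaml ./lib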

Submit the job

[root@chb1 flink-cdc-3.1.1]# ./bin/flink-cdc.sh mysql2doris.yaml 
Pipeline has been submitted to cluster.
Job ID: 4a71588006d5b5cf25f10101d613cb8b
Job Description: Sync MySQL Database to Doris
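
The submission output above includes the job ID, which is handy for follow-up checks. As a small sketch, assuming the CLI keeps printing a line of the form "Job ID: <32 hex characters>", the ID can be captured like this (extract_job_id is a hypothetical helper name):

```shell
#!/usr/bin/env bash
# Hypothetical helper: read the flink-cdc.sh output on stdin and print the
# job ID from the "Job ID: ..." line, if one is present.
extract_job_id() {
  sed -n 's/^Job ID:[[:space:]]*\([0-9a-f]\{32\}\)[[:space:]]*$/\1/p'
}
```

For example, job_id=$(./bin/flink-cdc.sh mysql2doris.yaml | extract_job_id) stores the ID in a variable; running bin/flink list from the Flink directory should then show the same ID among the running jobs.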

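The problems below were all caused by version incompatibility. None of the various fixes I tried had any effect; in the end, switching from flink-1.19.1 to flink-1.20.1 resolved them.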

1. Error: NoSuchMethodError: io.debezium.config.Field.withType(Lorg/apache/kafka/common/config/ConfigDef$Type;)Lio/debezium/config/Field;
Exception in thread "main" java.lang.NoSuchMethodError: io.debezium.config.Field.withType(Lorg/apache/kafka/common/config/ConfigDef$Type;)Lio/debezium/config/Field;
        at io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig.<clinit>(HistorizedRelationalDatabaseConnectorConfig.java:48)
        at org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig.<init>(MySqlSourceConfig.java:120)
        at org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory.createConfig(MySqlSourceConfigFactory.java:356)
        at org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory.createConfig(MySqlSourceConfigFactory.java:296)
        at org.apache.flink.cdc.connectors.mysql.factory.MySqlDataSourceFactory.createDataSource(MySqlDataSourceFactory.java:170)
        at org.apache.flink.cdc.composer.flink.translator.DataSourceTranslator.createDataSource(DataSourceTranslator.java:91)
        at org.apache.flink.cdc.composer.flink.translator.DataSourceTranslator.translate(DataSourceTranslator.java:48)
        at org.apache.flink.cdc.composer.flink.FlinkPipelineComposer.compose(FlinkPipelineComposer.java:103)
        at org.apache.flink.cdc.cli.CliExecutor.run(CliExecutor.java:89)
        at org.apache.flink.cdc.cli.CliFrontend.main(CliFrontend.java:69)

Add the following jars to the Flink lib directory:

debezium-connector-mysql-2.5.0.Final.jar
debezium-core-2.5.0.Final.jar

2. Error: Exception in thread "main" java.lang.NoClassDefFoundError: io/debezium/spi/topic/TopicNamingStrategy

Add debezium-api-2.5.0.Final.jar to the Flink lib directory.

3. Error: Exception in thread "main" java.lang.NoSuchFieldError: DATABASE_HISTORY
Exception in thread "main" java.lang.NoSuchFieldError: DATABASE_HISTORY
        at io.debezium.connector.mysql.MySqlConnection$MySqlConnectionConfiguration.lambda$new$0(MySqlConnection.java:590)
        at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:176)
        at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
        at java.base/java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1603)
        at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
        at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
        at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
        at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
        at io.debezium.config.Configuration$6.keys(Configuration.java:1659)
        at io.debezium.config.Configuration.asProperties(Configuration.java:1824)
        at io.debezium.config.Configuration.asProperties(Configuration.java:1812)
        at io.debezium.config.Configuration.copy(Configuration.java:755)
        at io.debezium.config.Configuration.edit(Configuration.java:988)
        at io.debezium.connector.mysql.MySqlConnection$MySqlConnectionConfiguration.<init>(MySqlConnection.java:596)
        at org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils.createMySqlConnection(DebeziumUtils.java:91)
        at org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils.createMySqlConnection(DebeziumUtils.java:84)
        at org.apache.flink.cdc.connectors.mysql.utils.MySqlSchemaUtils.listTables(MySqlSchemaUtils.java:60)
        at org.apache.flink.cdc.connectors.mysql.factory.MySqlDataSourceFactory.getTableList(MySqlDataSourceFactory.java:276)
        at org.apache.flink.cdc.connectors.mysql.factory.MySqlDataSourceFactory.createDataSource(MySqlDataSourceFactory.java:170)
        at org.apache.flink.cdc.composer.flink.translator.DataSourceTranslator.createDataSource(DataSourceTranslator.java:91)
        at org.apache.flink.cdc.composer.flink.translator.DataSourceTranslator.translate(DataSourceTranslator.java:48)
        at org.apache.flink.cdc.composer.flink.FlinkPipelineComposer.compose(FlinkPipelineComposer.java:103)
        at org.apache.flink.cdc.cli.CliExecutor.run(CliExecutor.java:89)
        at org.apache.flink.cdc.cli.CliFrontend.main(CliFrontend.java:69)
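
NoSuchMethodError and NoSuchFieldError generally mean that a class was compiled against one version of a library but loaded from another at run time. One plausible source, though not confirmed for this setup, is the same artifact sitting in a lib directory in two versions, for example after copying jars in by hand. The sketch below lists such artifacts; find_duplicate_artifacts is a hypothetical helper, and it only compares file names, so it cannot see classes bundled inside fat jars.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print artifact names that occur more than once among
# the jars in a directory, i.e. the same artifact present in several versions.
# Only a trailing "-<digit>..." segment is treated as the version, so names
# such as foo-1.0-SNAPSHOT.jar are compared without stripping anything.
find_duplicate_artifacts() {
  local lib_dir="$1" jar
  for jar in "$lib_dir"/*.jar; do
    [ -e "$jar" ] || continue          # directory without jars: glob stays literal
    basename "$jar" .jar
  done | sed 's/-[0-9][^-]*$//' | sort | uniq -d
}
```

It could be run against both lib directories, for instance find_duplicate_artifacts "$FLINK_HOME/lib", where FLINK_HOME is assumed to point at the Flink installation. No output means no artifact name occurs twice.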

Reference: mysql-cdc sql-client