Paimon: syncing MySQL data into a Paimon table

Published: 2025-03-14

1.1 MySQL source table

CREATE TABLE `mysql_orders` (
  `order_id` varchar(100) NOT NULL,
  `user_id` varchar(100) DEFAULT NULL,
  `amount` decimal(10,2) DEFAULT NULL,
  `update_time` timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3),
  PRIMARY KEY (`order_id`)
);

Enable the MySQL binlog and set binlog_format to ROW.
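As a quick sanity check (standard MySQL settings; note that `log_bin` itself can only be enabled in my.cnf and requires a server restart):

```sql
-- verify the binlog is on and row-based
SHOW VARIABLES LIKE 'log_bin';        -- should be ON
SHOW VARIABLES LIKE 'binlog_format';  -- should be ROW
```

If `binlog_format` is not ROW, the CDC connector cannot reconstruct full row-level change events.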

1.2 Syncing MySQL data with Flink CDC

Reference: https://blog.csdn.net/wuxintdrh/article/details/146165736

CREATE TABLE mysql_cdc_source (
    order_id STRING,
    user_id STRING,
    amount DECIMAL(10,2),
    update_time TIMESTAMP(3),
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'chb1',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'paimon_test',
    'table-name' = 'mysql_orders',
    'server-time-zone' = 'Asia/Shanghai'  -- time zone setting (avoids timestamp offsets)
);
select * from mysql_cdc_source;


1.3 Syncing into Paimon

Create the Paimon table:
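The INSERT below writes through a catalog named `paimon_catalog`; if it does not exist yet, create it first. (The warehouse path here is a placeholder, adjust it to your environment.)

```sql
-- register a Paimon catalog in the Flink SQL session
CREATE CATALOG paimon_catalog WITH (
    'type' = 'paimon',
    'warehouse' = 'hdfs:///paimon/warehouse'  -- placeholder path
);
USE CATALOG paimon_catalog;
```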

CREATE TABLE orders (
    order_id STRING,
    user_id STRING,
    amount DECIMAL(10,2),
    update_time TIMESTAMP(3),
    dt STRING,
    PRIMARY KEY (order_id, dt) NOT ENFORCED   -- the partition key must be part of the primary key
) PARTITIONED BY (dt) WITH (
    'merge-engine' = 'deduplicate',       -- default merge engine, keeps the latest record per key
    'changelog-producer' = 'input',       -- store the incoming CDC changelog as-is
    'bucket' = '4',                       -- fixed bucket count to speed up writes
    'snapshot.time-retained' = '7d'       -- retain snapshots for 7 days
);

Sync the data:

INSERT INTO paimon_catalog.`default`.orders
SELECT 
    order_id, 
    user_id, 
    amount, 
    update_time, 
    DATE_FORMAT(update_time, 'yyyy-MM-dd') AS dt  -- dynamic partition value
FROM default_catalog.default_database.mysql_cdc_source;

Query the Paimon table:

select * from paimon_catalog.`default`.orders;

This fails with:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.

End of exception on server side

Investigation showed the JobManager had sufficient resources and the TaskManager still had free slots, but the TaskManager memory was too small; after increasing it, the job ran normally.
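For reference, TaskManager memory is raised in flink-conf.yaml; the values below are illustrative, size them to your workload:

```yaml
# flink-conf.yaml -- illustrative values
taskmanager.memory.process.size: 4096m
taskmanager.numberOfTaskSlots: 4
```

A restart of the TaskManagers (or the standalone cluster) is needed for the new memory settings to take effect.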

2. Syncing data with paimon-flink-action

Reference: https://paimon.apache.org/docs/1.0/cdc-ingestion/mysql-cdc/
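Following those docs, the `mysql_sync_table` action can be invoked roughly like this; the jar path, warehouse path, and connection values below mirror the setup above and are placeholders to adjust:

```shell
<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-1.0.0.jar \
    mysql_sync_table \
    --warehouse hdfs:///paimon/warehouse \
    --database default \
    --table orders \
    --primary_keys order_id \
    --mysql_conf hostname=chb1 \
    --mysql_conf port=3306 \
    --mysql_conf username=root \
    --mysql_conf password=123456 \
    --mysql_conf database-name=paimon_test \
    --mysql_conf table-name=mysql_orders \
    --table_conf bucket=4
```

Unlike the SQL approach in section 1, the action also tracks schema changes in the MySQL table and applies them to the Paimon table.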
Error: ClassNotFoundException: org.apache.kafka.connect.errors.ConnectException; resolved by adding connect-api-3.2.1.jar.

This was followed by another error: java.lang.NoSuchMethodError: io.debezium.config.Field.withType(Lorg/apache/kafka/common/config/ConfigDef$Type;)Lio/debezium/config/Field;. A NoSuchMethodError like this usually indicates a version conflict on the classpath: the connect-api jar that was added does not match the Kafka Connect version the bundled Debezium was compiled against, so the jar versions need to be aligned.