Flink 通过 Chunjun Oracle LogMiner 实时读取 Oracle 变更日志并写入 Doris 的方案

发布于:2025-03-22 ⋅ 阅读:(48) ⋅ 点赞:(0)

一、 技术背景

在大数据实时同步场景中,需要将 Oracle 数据库的变更数据(CDC) 采集并写入 Apache Doris,以支持 数据分析、BI 报表、实时数据仓库 等应用。

本方案基于 Flink + Chunjun,通过 Oracle LogMiner 解析 Redo Log,实现 低延迟 写入Doris。

 

二、 关键技术

1、 Oracle LogMiner

LogMiner 是 Oracle 提供的 redo log 解析工具,用于跟踪 INSERTUPDATEDELETE 操作。

使用LogMiner需要现在Oracle中开启,具体开启操作见:Oracle配置LogMiner

 

2、 Chunjun 的 LogMiner 关键流程

Chunjun(原 FlinkX)是 Flink 生态的数据同步框架,支持多种数据源连接器(如 Oracle、MySQL、PostgreSQL、Doris)。
其中 Chunjun Oracle LogMiner Source 用于解析 Oracle Redo Log 并转换为 Flink 数据流

如下整个流程架构:

在这里插入图片描述

Flink任务启动后

  1. 通过Chunjun的oracle logMiner连接器, 建立 Oracle 连接,启动 LogMiner 解析 Redo Log。
  2. 实时监听 V$LOGMNR_CONTENTS,解析变更数据并转换为 Flink 事件流。具体地会将Oracle不同的操作日志解析为如下数据类型即重放数据操作,
  3. Flink 任务处理数据,完成转换、清洗等操作。
  4. Flink Sink 组件(Chunjun Doris Sink)将数据写入 Doris
操作类型 before(旧数据) after(新数据) Flink 处理逻辑
INSERT {新数据} 直接插入
UPDATE {旧数据} {新数据} 先删除旧数据,再插入新数据
DELETE {旧数据} 删除数据

最后如下示例flink sql:


CREATE TABLE source  
(  
    ID             int,  
    NAME          string  
) WITH (  
      'connector' = 'oraclelogminer-x'  
      ,'url' = 'jdbc:oracle:thin:@//xxx:1521/ORCL'  
      ,'username' = 'system'  
      ,'password' = 'xxx'  
      ,'cat' = 'insert,delete,update'  
      ,'table' = 'TEST.TEST_USER'  
      ,'timestamp-format.standard' = 'SQL'  
      );  
  
  
CREATE TABLE sink  
(  
     k4             int,  
     k3          string  
) WITH (  
'connector' = 'doris-x',  
'schema'='demo',  
      'password' = 'xxx',  
      'table-name' = 'mytable',  
      'url' = 'jdbc:mysql://xxx:9030',  
      'username' = 'root',  
      'sink.parallelism' = '1',  
      'lookup.error-limit' = '100',  
      'lookup.cache-type' = 'LRU',  
      'lookup.parallelism' = '1',  
      'lookup.cache.ttl' = '60000',  
      'lookup.cache.max-rows' = '10000',  
      'writeMode'='UPSERT'  
      );  
  
  
insert into sink  
select ID as k4, NAME as k3  
from source;  
  

 

3、修复 Chunjun Oracle LogMiner 问题

在实际使用中,Chunjun Oracle LogMiner 会遇到以下问题:

  1. 关于全量增量读数据的问题
//LogMinerConfig,没有全量同步的外部配置,默认是增量读取数据
private boolean enableFetchAll = true;

  1. 无法获取监听的表
//LogMinerListener 中的LogMinerConfig没有set table的地方,
//即无法获取被监听的表,改成直接获取
logMinerConfig.getListenerTables(); 

  1. PavingData和Split 不能同时开启,默认都开启,将PavingData关闭

 


网站公告

今日签到

点亮在社区的每一天
去签到