flink写doris时的优化

发布于:2025-04-18 ⋅ 阅读:(31) ⋅ 点赞:(0)

1.概念

doris并擅长高频小量数据的导入

因为doris每一次数据导入都会在be节点上生成数据文件;如果高频导入小量数据,就会在存储层产生大量的小文件(必然会影响到后续的查询效率,也会对系统产生更多的compaction操作压力)

而flink是实时不断地往doris中插入数据,所以很容易出现上述问题;

怎么办?有两个办法:

  1. 在flink中先做一些按时间开窗后的轻度聚合,降低写入的数据量(在先flink端处理,后续的数据量变少了)
  2. 可以适当调大checkpoint间隔(10分钟),降低插入频率(原因是flink在做完checkpoint才往下游写数据)

方案1:开窗轻度聚合

1.例子

例子:
-- 分钟级聚合
CREATE TABLE doris_sink (
    window_start TIMESTAMP(3),
    total_count BIGINT,
    sum_value DECIMAL(16,2)
) WITH (
    'connector' = 'doris',
    'table.identifier' = 'db.table',
    'sink.batch.size' = '5000', 
    'sink.batch.interval' = '60s'
);

INSERT INTO doris_sink
SELECT
    TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS window_start,
    COUNT(*) AS total_count,
    SUM(value) AS sum_value
FROM source_table
GROUP BY TUMBLE(event_time, INTERVAL '1' MINUTE);

优化效果​​(示例):

时间窗口 原始数据量 聚合后数据量 写入压缩比
1秒 10000条/s 10000条/s 1:1
10秒 10000条/s 1000条/10s 10:1
1分钟 10000条/s 167条/min 60:1

 在flink端部分聚合,再写入doris,数据量变小了,效率自然提高

2.合适的使用场景:

场景特征 适用性 技术实现要点 收益
高并发写入(>1万条/秒) 滚动窗口聚合 + 计数窗口降频 减少 90% 小文件,避免 -235 错误

1

亚秒级查询需求 预计算指标 + 结果表写入 查询延迟从秒级降至毫秒级

3

多源数据关联 窗口内多流 Join + 聚合 避免 Doris 复杂查询,节省 30% CPU

5

精确统计需求 需写入原始明细数据 -

 (1)高并发写入场景

当上游数据源(如 Kafka)的写入并发量极高(例如每秒 10 万条以上)时,直接写入 Doris 可能导致以下问题:

  1. ​小文件过多​​:频繁写入会产生大量小文件,触发 Doris 的版本合并(Compaction)压力,可能引发错误。
  2. ​资源消耗大​​:高频写入导致 Doris BE 节点的 CPU 和 I/O 资源被 Compaction 任务占用,影响查询性能。

​解决方案​​:
在 Flink 中通过 ​​滚动窗口(如 5 秒窗口)​​ 或 ​​计数窗口(如每 1000 条)​​ 对数据进行预聚合,将多条数据合并为一条统计结果后再写入 Doris。例如:

DataStream<Event> stream = ...;
stream.keyBy(Event::getKey)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) // 5秒滚动窗口
    .aggregate(new AvgAggregator()) // 聚合逻辑(如计算均值)
    .addSink(new DorisSink());

对应到sql 直接开窗5s

此方式可将写入频率降低 10 倍以上,减少 Doris 的写入压力

(2)低延迟查询需求场景

当业务需要​​亚秒级查询响应​​(如实时大屏、风控决策)时,直接写入原始数据可能导致:

  1. ​查询性能下降​​:原始数据量大,Doris 需实时聚合计算,增加查询耗时;
  2. ​存储成本高​​:原始明细数据占用大量存储空间。

解决方案​​:
在 Flink 中按时间窗口(如 1 分钟)预计算关键指标(如 PV、UV、GMV),仅将聚合结果写入 Doris。例如:

  • ​原始数据​​:用户点击事件(每秒 10 万条) → ​​聚合后​​:每分钟 PV 统计值(每秒 1 条)。
    此方式可提升 Doris 查询效率,同时节省存储资源

(3)数据预处理与清洗场景

当原始数据存在以下特征时,适合在 Flink 端聚合:

  1. ​冗余数据多​​:如重复日志、无效埋点;
  2. ​关联计算需求​​:需跨数据源关联(如用户行为数据与订单数据)。

​解决方案​​:
通过 Flink 窗口函数实现:

  • ​去重​​:使用 WindowFunction 过滤重复数据;
  • ​关联计算​​:在窗口内完成多流 Join,输出关联结果。
    例如,在 10 秒窗口内关联用户点击与加购行为,输出转化率指标,避免 Doris 中复杂的多表关联查询

(4)资源受限场景

当 Doris 集群资源(CPU、内存、磁盘)有限时,可通过以下方式优化:

  1. ​降低写入量​​:聚合后数据量减少 50%~90%,降低 Doris 存储和 Compaction 压力;
  2. ​延长 Compaction 周期​​:通过减少小文件数量,允许 Doris 合并任务更高效调度。

​参数调优建议​​:

  • Flink Checkpoint 间隔:从 5 秒调整为 30 秒~1 分钟,减少事务提交频率;
  • Doris Compaction 参数:调低 cumulative_size_based_promotion_min_size_mbytes(默认 64MB → 8MB),加速小文件合并;

方案2:调大 Checkpoint 间隔

生产环境测试数据​​:

Checkpoint间隔 吞吐量(events/s) 写入延迟(ms) CPU利用率
1分钟 12,000 50-100 75%
5分钟 28,000 30-80 65%
10分钟 35,000 20-60 58%

考一个对checkpoint的理解:flink是在做完checkpoint才往下游写数据?,比如说checkpoint的时间是1分钟,岂不是延迟就是一分钟?

结论:​​数据处理和状态快照是解耦的​​。调整 Checkpoint 间隔只会影响故障恢复时可能丢失的数据量(Recovery Time Objective),​​不会增加数据处理的固有延迟​;

具体例子(以第 N 分钟为例):

  1. ​0:00.000​

    • 用户A点击商品X → Kafka 生产事件
    • Flink 立即消费并处理,PV计数器+1 → 实时写入 Doris

  2. ​0:00.500​

    • CheckpointCoordinator 触发新一轮 Checkpoint
    • Source 算子注入 Barrier 到数据流(特殊标记,不影响正常数据处理)

  3. ​0:00.501-0:02.000​

    • Barrier 随数据流向下游传播
    • PV统计算子 ​​边处理新事件​​ 边接收 Barrier:
    • Doris 持续收到 PV=100 → 101 → 102... 的写入请求
  4. ​0:03.000​

    • 所有算子完成状态快照(耗时约2秒)
    • 快照存储到 HDFS(异步执行,不阻塞主线程)
  5. ​0:06.000​

    • Checkpoint 确认完成,JM 记录元数据;

正常情况:

  • 用户点击后 ​​500ms​​ 内即可在 Doris 查询到最新 PV(实际延迟仅网络+计算耗时)
  • Checkpoint 过程持续 ​​6秒​​,但期间 Doris 收到 ​​60次​​ 数据写入(每秒10次);

故障要恢复情况:

假设在 ​​0:50​​ 发生故障:

  • 从最近 Checkpoint(0:00 开始,0:06 完成)恢复
  • 状态回滚到 PV=100(Checkpoint 时的快照值)
  • ​但 Doris 实际已写入 PV=150​
  • Flink 通过事务机制保证最终 PV=150 + 恢复后新数据 的精确一次语义

这时候聪明的你又发现:

Doris 实际已写入 PV=150​​,相当于以及写入到下游的doris,是怎么让数据回滚的???

原因:Flink 在故障恢复时保证 Doris 已写入的 PV=150 数据不会导致重复计算,核心是通过​​两阶段提交(2PC)机制​​与​​事务性写入​​实现的,所以可以回滚数据

Checkpoint 与事务的阶段性控制​:

Flink 的 Checkpoint 过程与 Sink 的事务提交严格绑定,整个过程分为 ​​预提交(pre-commit)​​ 和 ​​正式提交(commit)​​ 两个阶段

  1. ​预提交阶段​​(Checkpoint 进行中)

    • Flink Sink 将计算结果(如 PV=100→150 的增量)写入 Doris 的​​临时存储位置​​(如临时表或事务日志),但​​未对外可见​​。
    • 此时 Doris 的 PV=150 ​​仅处于预提交状态​​,未实际生效。
  2. ​正式提交阶段​​(Checkpoint 确认完成)

    • 当 JobManager 收到所有算子的 Checkpoint 完成确认后,才会通知 Sink ​​提交事务​​。
    • Doris 将临时数据​​原子性替换为正式数据​​(如重命名临时文件或更新可见标志)

故障恢复时的回滚逻辑​

假设故障发生在 ​​0:50​​(Checkpoint 未完成):

  1. ​未完成的 Checkpoint 事务回滚​

    • Flink 从最近成功的 Checkpoint(PV=100)恢复状态。
    • 同时,Doris 中处于预提交状态的 PV=150 ​​会被自动清理​​(如删除临时表或撤销事务日志)。
  2. ​数据重放与幂等性保障​

    • Flink 会从 Source 端(如 Kafka)​​重放 Checkpoint 后的数据​​(0:06→0:50 的数据)。
    • Doris Sink 在写入时通过​​事务 ID 或唯一键​​实现幂等性,确保相同数据多次写入不会重复累加;

疑问:针对数据回滚的场景,doris能查询到 PV=150的数据吗

Doris 默认的隔离级别保证查询只能看到已提交的数据,所以查看不到PV=150的数据

其他调优手段:

1、开启 MiniBatch 聚合

table.exec.mini-batch.enabled = true
table.exec.mini-batch.size = 5000

2、配置 Doris 批量写入

sink.batch.size = 5000
sink.max-retries = 5 --最大可重试5次

3、异步 Compaction 调优

ALTER TABLE doris_table SET ("compaction_policy" = "time_series");

 


网站公告

今日签到

点亮在社区的每一天
去签到