1.概念
doris并不擅长高频、小量数据的导入;
因为doris每一次数据导入都会在be节点上生成数据文件;如果高频导入小量数据,就会在存储层产生大量的小文件(必然会影响到后续的查询效率,也会对系统产生更多的compaction操作压力)
而flink是实时不断地往doris中插入数据,所以很容易出现上述问题;
怎么办?有两个办法:
- 在flink中先做一些按时间开窗后的轻度聚合,降低写入的数据量(在先flink端处理,后续的数据量变少了)
- 可以适当调大checkpoint间隔(10分钟),降低插入频率(原因是flink在做完checkpoint才往下游写数据)
方案1:开窗轻度聚合
1.例子
例子:
-- 分钟级聚合
CREATE TABLE doris_sink (
window_start TIMESTAMP(3),
total_count BIGINT,
sum_value DECIMAL(16,2)
) WITH (
'connector' = 'doris',
'table.identifier' = 'db.table',
'sink.batch.size' = '5000',
'sink.batch.interval' = '60s'
);
INSERT INTO doris_sink
SELECT
TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS window_start,
COUNT(*) AS total_count,
SUM(value) AS sum_value
FROM source_table
GROUP BY TUMBLE(event_time, INTERVAL '1' MINUTE);
优化效果(示例):
时间窗口 | 原始数据量 | 聚合后数据量 | 写入压缩比 |
---|---|---|---|
1秒 | 10000条/s | 10000条/s | 1:1 |
10秒 | 10000条/s | 1000条/10s | 10:1 |
1分钟 | 10000条/s | 167条/min | 60:1 |
在flink端部分聚合,再写入doris,数据量变小了,效率自然提高
2.合适的使用场景:
场景特征 | 适用性 | 技术实现要点 | 收益 |
---|---|---|---|
高并发写入(>1万条/秒) | ✅ | 滚动窗口聚合 + 计数窗口降频 | 减少 90% 小文件,避免 -235 错误 1 |
亚秒级查询需求 | ✅ | 预计算指标 + 结果表写入 | 查询延迟从秒级降至毫秒级 3 |
多源数据关联 | ✅ | 窗口内多流 Join + 聚合 | 避免 Doris 复杂查询,节省 30% CPU 5 |
精确统计需求 | ❌ | 需写入原始明细数据 | - |
(1)高并发写入场景
当上游数据源(如 Kafka)的写入并发量极高(例如每秒 10 万条以上)时,直接写入 Doris 可能导致以下问题:
- 小文件过多:频繁写入会产生大量小文件,触发 Doris 的版本合并(Compaction)压力,可能引发错误。
- 资源消耗大:高频写入导致 Doris BE 节点的 CPU 和 I/O 资源被 Compaction 任务占用,影响查询性能。
解决方案:
在 Flink 中通过 滚动窗口(如 5 秒窗口) 或 计数窗口(如每 1000 条) 对数据进行预聚合,将多条数据合并为一条统计结果后再写入 Doris。例如:
DataStream<Event> stream = ...;
stream.keyBy(Event::getKey)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5))) // 5秒滚动窗口
.aggregate(new AvgAggregator()) // 聚合逻辑(如计算均值)
.addSink(new DorisSink());
对应到sql 直接开窗5s
此方式可将写入频率降低 10 倍以上,减少 Doris 的写入压力
(2)低延迟查询需求场景
当业务需要亚秒级查询响应(如实时大屏、风控决策)时,直接写入原始数据可能导致:
- 查询性能下降:原始数据量大,Doris 需实时聚合计算,增加查询耗时;
- 存储成本高:原始明细数据占用大量存储空间。
解决方案:
在 Flink 中按时间窗口(如 1 分钟)预计算关键指标(如 PV、UV、GMV),仅将聚合结果写入 Doris。例如:
- 原始数据:用户点击事件(每秒 10 万条) → 聚合后:每分钟 PV 统计值(每秒 1 条)。
此方式可提升 Doris 查询效率,同时节省存储资源
(3)数据预处理与清洗场景
当原始数据存在以下特征时,适合在 Flink 端聚合:
- 冗余数据多:如重复日志、无效埋点;
- 关联计算需求:需跨数据源关联(如用户行为数据与订单数据)。
解决方案:
通过 Flink 窗口函数实现:
- 去重:使用
WindowFunction
过滤重复数据; - 关联计算:在窗口内完成多流 Join,输出关联结果。
例如,在 10 秒窗口内关联用户点击与加购行为,输出转化率指标,避免 Doris 中复杂的多表关联查询
(4)资源受限场景
当 Doris 集群资源(CPU、内存、磁盘)有限时,可通过以下方式优化:
- 降低写入量:聚合后数据量减少 50%~90%,降低 Doris 存储和 Compaction 压力;
- 延长 Compaction 周期:通过减少小文件数量,允许 Doris 合并任务更高效调度。
参数调优建议:
- Flink Checkpoint 间隔:从 5 秒调整为 30 秒~1 分钟,减少事务提交频率;
- Doris Compaction 参数:调低
cumulative_size_based_promotion_min_size_mbytes
(默认 64MB → 8MB),加速小文件合并;
方案2:调大 Checkpoint 间隔
生产环境测试数据:
Checkpoint间隔 | 吞吐量(events/s) | 写入延迟(ms) | CPU利用率 |
---|---|---|---|
1分钟 | 12,000 | 50-100 | 75% |
5分钟 | 28,000 | 30-80 | 65% |
10分钟 | 35,000 | 20-60 | 58% |
考一个对checkpoint的理解:flink是在做完checkpoint才往下游写数据?,比如说checkpoint的时间是1分钟,岂不是延迟就是一分钟?
结论:数据处理和状态快照是解耦的。调整 Checkpoint 间隔只会影响故障恢复时可能丢失的数据量(Recovery Time Objective),不会增加数据处理的固有延迟;
具体例子(以第 N 分钟为例):
0:00.000
- 用户A点击商品X → Kafka 生产事件
- Flink 立即消费并处理,PV计数器+1 → 实时写入 Doris
0:00.500
- CheckpointCoordinator 触发新一轮 Checkpoint
- Source 算子注入 Barrier 到数据流(特殊标记,不影响正常数据处理)
0:00.501-0:02.000
- Barrier 随数据流向下游传播
- PV统计算子 边处理新事件 边接收 Barrier:
- Doris 持续收到
PV=100 → 101 → 102...
的写入请求
0:03.000
- 所有算子完成状态快照(耗时约2秒)
- 快照存储到 HDFS(异步执行,不阻塞主线程)
0:06.000
- Checkpoint 确认完成,JM 记录元数据;
正常情况:
- 用户点击后 500ms 内即可在 Doris 查询到最新 PV(实际延迟仅网络+计算耗时)
- Checkpoint 过程持续 6秒,但期间 Doris 收到 60次 数据写入(每秒10次);
故障要恢复情况:
假设在 0:50 发生故障:
- 从最近 Checkpoint(0:00 开始,0:06 完成)恢复
- 状态回滚到 PV=100(Checkpoint 时的快照值)
- 但 Doris 实际已写入 PV=150
- Flink 通过事务机制保证最终 PV=150 + 恢复后新数据 的精确一次语义
这时候聪明的你又发现:
Doris 实际已写入 PV=150,相当于以及写入到下游的doris,是怎么让数据回滚的???
原因:Flink 在故障恢复时保证 Doris 已写入的 PV=150 数据不会导致重复计算,核心是通过两阶段提交(2PC)机制与事务性写入实现的,所以可以回滚数据
Checkpoint 与事务的阶段性控制:
Flink 的 Checkpoint 过程与 Sink 的事务提交严格绑定,整个过程分为 预提交(pre-commit) 和 正式提交(commit) 两个阶段
预提交阶段(Checkpoint 进行中)
- Flink Sink 将计算结果(如 PV=100→150 的增量)写入 Doris 的临时存储位置(如临时表或事务日志),但未对外可见。
- 此时 Doris 的 PV=150 仅处于预提交状态,未实际生效。
正式提交阶段(Checkpoint 确认完成)
- 当 JobManager 收到所有算子的 Checkpoint 完成确认后,才会通知 Sink 提交事务。
- Doris 将临时数据原子性替换为正式数据(如重命名临时文件或更新可见标志)
故障恢复时的回滚逻辑
假设故障发生在 0:50(Checkpoint 未完成):
未完成的 Checkpoint 事务回滚
- Flink 从最近成功的 Checkpoint(PV=100)恢复状态。
- 同时,Doris 中处于预提交状态的 PV=150 会被自动清理(如删除临时表或撤销事务日志)。
数据重放与幂等性保障
- Flink 会从 Source 端(如 Kafka)重放 Checkpoint 后的数据(0:06→0:50 的数据)。
- Doris Sink 在写入时通过事务 ID 或唯一键实现幂等性,确保相同数据多次写入不会重复累加;
疑问:针对数据回滚的场景,doris能查询到 PV=150的数据吗
Doris 默认的隔离级别保证查询只能看到已提交的数据,所以查看不到PV=150的数据
其他调优手段:
1、开启 MiniBatch 聚合
table.exec.mini-batch.enabled = true
table.exec.mini-batch.size = 5000
2、配置 Doris 批量写入
sink.batch.size = 5000
sink.max-retries = 5 --最大可重试5次
3、异步 Compaction 调优
ALTER TABLE doris_table SET ("compaction_policy" = "time_series");