在现代实时数据分析场景中,Flink 与 Doris 的组合正在成为企业实时指标平台的首选架构。本文将从业务背景出发,实战演示如何通过 Flink 接入 Kafka 实时数据,并将处理结果写入 Doris,构建一个分钟级实时更新的指标体系。
一、背景介绍:为什么选择 Flink + Doris?
实时指标系统是 BI 报表、用户行为分析、运营监控的重要基础。常见需求包括:
Kafka 中持续产生的用户行为数据
对数据进行实时聚合计算,如 PV、UV、点击率等
持久化到数据库中用于 BI 查询展示
技术组件 | 作用 |
---|---|
Flink | 实时处理引擎,支持有状态流式计算、窗口聚合 |
Doris | 实时数仓,支持高并发、多维分析与快速写入 |
两者结合,既保证了 实时性,又具备良好的 查询性能,非常适合搭建企业级实时指标平台。
二、架构图:Flink + Kafka + Doris 实时链路
lua
复制编辑
Kafka(行为日志 Topic) ↓ Flink 实时计算 +----------------------+ | 窗口聚合 / 指标统计 | +----------------------+ ↓ Flink-Doris Sink ↓ Doris OLAP ↓ BI 可视化 / 报表系统
三、实战场景介绍
假设我们要做一个“实时用户行为分析系统”,统计每分钟的用户点击数(PV)、活跃用户数(UV)等指标。
Kafka Topic: user_action_log
数据样例(JSON):
json
复制编辑
{ "uid": "10001", "action": "click", "timestamp": 1712902050000 }
四、Flink 流处理逻辑
1. 数据解析与时间提取
java
复制编辑
DataStream<UserAction> stream = env .addSource(new FlinkKafkaConsumer<>(...)) .map(json -> JSON.parseObject(json, UserAction.class)) .assignTimestampsAndWatermarks( WatermarkStrategy .<UserAction>forBoundedOutOfOrderness(Duration.ofSeconds(5)) .withTimestampAssigner((event, ts) -> event.getTimestamp()) );
2. 窗口聚合
java
复制编辑
SingleOutputStreamOperator<UserMetrics> result = stream .keyBy(UserAction::getAction) .window(TumblingEventTimeWindows.of(Time.minutes(1))) .process(new ProcessWindowFunction<...>() { public void process(...) { // 聚合 PV、UV } });
五、Doris 表结构设计
sql
复制编辑
CREATE TABLE user_metrics_minute ( window_start DATETIME, action STRING, pv BIGINT, uv BIGINT ) AGGREGATE KEY(window_start, action) DISTRIBUTED BY HASH(action) BUCKETS 10 PROPERTIES ( "replication_allocation" = "tag.location.default: 3" );
使用 聚合模型,自动合并相同时间窗口和维度的指标
可以支持自动覆盖/累加
六、Flink 写入 Doris(使用 Flink-Doris-Connector)
1. 添加依赖(适用于 Maven)
xml
复制编辑
<dependency> <groupId>org.apache.doris</groupId> <artifactId>flink-doris-connector</artifactId> <version>1.3.0</version> </dependency>
2. 配置 Doris Sink
java
复制编辑
DorisSink.Builder<UserMetrics> builder = DorisSink.<UserMetrics>builder() .setFenodes("fe1:8030,fe2:8030") .setTableIdentifier("database.user_metrics_minute") .setUsername("root") .setPassword("") .setSerializer(new SimpleStringSerializer()) // 或 JsonSerializer .setSinkLabelPrefix("flink-stream"); result.sinkTo(builder.build());
3. 写入策略优化
Sink 支持 exactly-once 语义(配合 checkpoint)
可以设置 batch size、flush interval 优化吞吐
七、常见问题与优化建议
问题 | 优化建议 |
---|---|
UV 计算性能瓶颈 | 使用 HyperLogLog 或布隆过滤器压缩存储 |
Kafka 写入延迟波动 | 调整 Flink checkpoint 频率、并发度 |
Doris 写入压力大 | 提高 Doris BE 节点数、增加分桶数 |
实时结果不准确 | 确保水位线和窗口时间对齐,避免乱序数据丢失 |
八、可视化呈现(可选)
可以使用如下 BI 工具对接 Doris:
Apache Superset
Metabase
Tableau / Power BI(通过 JDBC)
实时更新的指标可以每分钟自动刷新,满足运营需求。
九、总结
通过 Flink + Doris 的实时链路,可以非常高效地构建一个分钟级实时更新的指标体系,适合用于实时报表、行为分析、广告监控等多个场景。
如果你已经有 Kafka、Flink 环境,不妨动手试一试这个组合,感受它的低延迟高吞吐!