day08_Doris和实时类标签
一、Doris概述(了解)
1、Doris简介
1. 简单来说
Apache Doris
是一个开源的实时分析型数据库,基于MPP
(大规模并行处理)架构,专为高效处理大规模数据的实时查询和分析而设计。它能够提供亚秒级的查询响应时间,广泛应用于报表分析、用户行为分析、日志检索等场景。2. 具体
Apache Doris
最初由百度开发,名为Palo
,后于 2018 年捐赠给 Apache 基金会并更名为Doris
。它的核心特点包括:
- 高性能:通过列式存储、多级索引和智能压缩技术,实现高效的数据存储和查询。
- 易用性:支持标准 SQL 语法,兼容 MySQL 协议,用户可以通过熟悉的工具和语言进行操作。
- 实时性:支持实时数据导入和查询,能够在数据写入后立即进行分析,满足实时业务需求。
- 可扩展性:采用分布式架构,支持水平扩展,能够处理 10PB 以上的超大规模数据集。
3. 实际生产场景
- 实时报表分析:在电商、金融等领域,用于生成实时更新的报表和仪表盘,支持业务决策。
- 用户行为分析:分析用户参与、留存、转化等行为,支持人群洞察和精准营销。
- 日志检索与分析:对分布式系统中的日志数据进行实时或批量分析,帮助定位问题和优化性能。
- 数据湖联邦查询:通过外表功能,支持对 Hive、Iceberg 等数据湖的联邦查询,避免数据拷贝。
4. 总之
Apache Doris
是一个功能强大且易于使用的实时分析型数据库,适用于需要处理大规模数据并快速获得查询结果的场景。它的高性能、易用性和可扩展性使其成为企业数据分析的理想选择。
Apache Doris是一个现代化的基于MPP(Massively Parallel Processing大规模并行处理)技术的分析型数据库产品 简单来说,MPP是将任务并行的分散到多个服务器和节点上,在每个节点上计算完成后,将各自部分的结果汇总在一起得到最终的结果(与Hadoop相似)。仅需亚秒级响应时间即可获得查询结果,有效地支持实时数据分析。
Apache Doris 是2008年诞生于百度广告报表业务的 Palo 项目,2017 年正式对外开源,2018 年 7 月由百度捐赠给 Apache 基金会进行孵化。
2、Doris优点
特点:
简单易用:部署只需两个进程,不依赖其他系统;在线集群扩缩容,自动副本修复;
高性能:PB级别数据毫秒/秒级响应
统一数仓:可以同时支持实时数据服务、交互数据分析和离线数据处理场景
联邦查询:支持对 Hive、Iceberg、Hudi 等数据湖和 MySQL、Elasticsearch 等数据库的联邦查询分析
多种导入:支持从 HDFS/S3 等批量拉取导入和 MySQL Binlog/Kafka 等流式拉取导入;支持通过HTTP接口进行微批量推送写入和 JDBC 中使用 Insert 实时推送写入
生态丰富:Spark 利用 Spark Doris Connector 读取和写入 Doris;Flink Doris Connector 配合 Flink CDC 实现数据 Exactly Once 写入 Doris;利用 DBT Doris Adapter,可以很容易的在 Doris 中完成数据转化
常见OLAP引擎对比:
Doris和ClickHouse(简称CK)对比,Doris的优点:
数据压缩率 Clickhouse 好
ClickHouse 单表查询性能优势巨大
Join 查询两者各有优劣,数据量小情况下 Clickhouse 好,数据量大 Doris 好
Doris 对 SQL支持情况要好
二、Doris原理(熟悉)
1、名词解释
名词 | 说明 |
---|---|
FE | Frontend,即 Doris 的前端节点(类似酒店前台)。以 Java 语言为主,主要负责接收和返回客户端请求、元数据以及集群管理、查询计划生成等工作。 |
BE | Backend,即 Doris 的后端节点。以 C++ 语言为主,主要负责数据存储与管理、查询计划执行等工作 |
Tablet | Tablet是一张表实际的物理存储单元,一张表按照分区和分桶后在BE构成分布式存储层中以Tablet为单位进行存储,每个Tablet包括元信息及若干个连续的RowSet |
Rowset | Rowset是Tablet中一次数据变更的数据集合,数据变更包括了数据导入、删除、更新等。Rowset按版本信息进行记录。每次变更会生成一个版本 |
Version | 由Start、End两个属性构成,维护数据变更的记录信息。通常用来表示Rowset的版本范围,在一次新导入后生成一个Start,End相等的Rowset,在Compaction后生成一个带范围的Rowset版本 |
Segment | 表示Rowset中的数据分段。多个Segment构成一个Rowset |
Compaction | 连续版本的Rowset合并的过程成称为Compaction,合并过程中会对数据进行压缩操作 |
2、整体架构
Doris主要整合了Google Mesa(数据模型),Apache Impala(MPP Query Engine)和Apache ORCFile (存储格式,编码和压缩) 的技术。
Doris的系统架构如下,Doris主要分为FE和BE两个组件
Doris的架构很简洁,使用MySQL协议,用户可以使用任何MySQL ODBC/JDBC和MySQL客户端直接访问Doris,只设FE(Frontend)、BE(Backend)两种角色、两个进程,不依赖于外部组件,方便部署和运维。
Frontend(FE):主要接收客户端发送过来的请求(代码/SQL),并且负责元数据的管理、节点管理相关工作
- FE主要有两个角色,一个是follower,另一个是observer。多个follower组成选举组,会选出一个leader,leader是follower的一个特例,leader跟follower,主要是用来达到元数据的高可用,保证单节点宕机的情况下,元数据能够实时地在线恢复,而不影响整个服务。
- Observer 节点仅从 leader 节点进行元数据同步,不参与选举。可以横向扩展以提供元数据的读服务的扩展性。
Backend(BE):主要负责执行查询计划,存储数据
- 数据的可靠性由 BE 保证,BE 会对整个数据存储多副本或者是三副本。副本数可根据需求动态调整。
3、元数据结构
Doris 采用 “Paxos 协议以及 Memory+ Checkpoint + Journal” 的机制来确保元数据的高性能及高可靠。元数据的每次更新,都会遵照以下几步:
1- 首先写入到磁盘的日志文件中
2- 然后再写到内存中
3- 最后定期 checkpoint 到本地磁盘上
4、数据分发
数据主要都是存储在BE里面,BE节点上物理数据的可靠性通过多副本来实现,默认是3副本,副本数可配置且可随时动态调整,满足不同可用性级别的业务需求。FE调度BE上副本的分布与补齐。
如果说用户对可用性要求不高,而对资源的消耗比较敏感的话,可以在建表的时候选择建两副本或者一副本。比如在百度云上给用户建表的时候,有些用户对它的整个资源消耗比较敏感,因为他要付费,所以他可能会建两副本。但是一般不太建议用户建一副本,因为一副本的情况下可能一旦机器出问题了,数据直接就丢了,很难再恢复。一般是默认建三副本,这样基本可以保证一台机器单机节点宕机的情况下不会影响整个服务的正常运作。
三、Doris使用(操作)
Doris 采用 MySQL 协议进行通信,用户可通过 MySQL client 或者 MySQL JDBC连接到 Doris 集群。选择 MySQL client 版本时建议采用5.1 之后的版本,因为 5.1 之前不能支持长度超过 16 个字符的用户名。
0、Doris服务操作命令
- 启动Doris
/export/server/doris/fe/bin/start_fe.sh --daemon
/export/server/doris/be/bin/start_be.sh --daemon
/export/server/doris/apache_hdfs_broker/bin/start_broker.sh --daemon
备注: Broker是用于访问外部数据源(如 hdfs)的进程。通常,在每台机器上部署一个 broker 实例即可。
- 停止Doris
/export/server/doris/fe/bin/stop_fe.sh
/export/server/doris/be/bin/stop_be.sh
/export/server/doris/apache_hdfs_broker/bin/stop_broker.sh
- 查看状态:
mysql -uroot -h up01 -P 9030 -p123456
注意: 如下的命令需要在MySQL命令行下执行
查看FE
SHOW PROC '/frontends'\G;
查看BE
SHOW PROC '/backends'\G;
查看broker
SHOW PROC '/brokers';
----------------------------------------------------------------------------
注意: 如果上面的命令使用失败,可以用下面的方式来检查进程是否启动成功
----------------------------------------------------------------------------
通过前端界面访问FE:
http://192.168.88.166:8130/System?path=//frontends
通过前端界面访问BE:
http://192.168.88.166:8060
- PyCharm连接Doris:直接使用MySQL的驱动包即可
1、创建新用户(不用操作)
首先进入MySQL的命令行下
mysql -uroot -h up01 -P 9030 -p
输入密码
123456
设置Doris的root用户的密码
SET PASSWORD FOR 'root' = PASSWORD('123456');
注意: 新创建的普通用户默认没有任何权限,需要使用管理员账号进行权限的赋予。
2、建库
创建
CREATE DATABASE if not exists test_db;
备注: 权限赋予中最后的test是用户名称
GRANT ALL ON test_db TO test;
含义: 是将test_db的所有操作权限赋予test用户
3、建表
-- 创建数据库
CREATE DATABASE if not exists test_db;
-- 创建数据表
CREATE TABLE if not exists test_db.test_table
(
event_day DATE,
siteid INT DEFAULT '10',
citycode SMALLINT,
username VARCHAR(32) DEFAULT '',
pv BIGINT SUM DEFAULT '0'
)
AGGREGATE KEY(event_day, siteid, citycode, username)
PARTITION BY RANGE(event_day)
(
PARTITION p201706 VALUES LESS THAN ('2017-07-01'), -- <=2017-07-01
PARTITION p201707 VALUES LESS THAN ('2017-08-01'), -- <=2017-08-01
PARTITION p201708 VALUES LESS THAN ('2017-09-01') -- <=2017-09-01
)
DISTRIBUTED BY HASH(siteid) BUCKETS 10
sql("replication_num" = "1");
注意: Doris 的建表是一个同步命令,命令返回成功,即表示建表成功
数据类型:
更多数据类型见 https://doris.apache.org/zh-CN/docs/dev/sql-manual/sql-data-types/data-type-overview。
表结构变更:
使用alter table 命令, 可进行
增加列
删除列
修改列类型
改变列顺序
对上面的table1 添加一列
ALTER TABLE test_db.test_table ADD COLUMN uv BIGINT SUM DEFAULT '0' after pv;
查看执行进度
use test_db;
show alter table column;
如果想取消掉【正在执行】的alter, 则使用。无法取消成功的
CANCEL ALTER TABLE COLUMN FROM test_db.test_table;
4、数据模型(面试)
在 Doris 中,数据以表(Table)的形式进行逻辑上的描述。一张表包括行(Row)和列(Column)。Row即用户的一行数据。Column 用于描述一行数据中不同的字段。
Column可以分为两大类:Key(维度列)和Value(指标列)
Doris 的数据模型主要分为3类:Aggregate、Unique、Duplicate
4.1 Aggregate模型
AGGREGATEKEY模型可以提前聚合数据,适合报表和多维度业务。
目前有以下四种聚合方式:
- SUM:求和,多行的 Value 进行累加。
- REPLACE:替代,下一批数据中的 Value 会替换之前导入过的行中的 Value。
- MAX:保留最大值。
- MIN:保留最小值。
为什么不支持avg的聚合方式?
48个人,6个组,每个组人数不统一
统计全部同学的考试平均分
方案一:sum(48个同学的考试成绩)/48
方案二(统计平均值的错误方案):分别计算6个小组的平均分,再sum(6个小组的平均分)/6
案例:
- 创建Doris表
CREATE TABLE IF NOT EXISTS test_db.example_site_visit
(
`user_id` LARGEINT NOT NULL COMMENT "用户id",
`date` DATE NOT NULL COMMENT "数据灌入日期时间",
`city` VARCHAR(20) COMMENT "用户所在城市",
`age` SMALLINT COMMENT "用户年龄",
`sex` TINYINT COMMENT "用户性别",
`last_visit_date` DATETIME REPLACE DEFAULT "1970-01-01 00:00:00" COMMENT "用户最后一次访问时间",
`cost` BIGINT SUM DEFAULT "0" COMMENT "用户总消费",
`max_dwell_time` INT MAX DEFAULT "0" COMMENT "用户最大停留时间",
`min_dwell_time` INT MIN DEFAULT "99999" COMMENT "用户最小停留时间"
)
AGGREGATE KEY(`user_id`, `date`, `city`, `age`, `sex`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 10
sql("replication_num" = "1");
- 插入数据
insert into test_db.example_site_visit values(10000,'2017-10-01','北京',20,0,'2017-10-01 06:00:00',20,10,10);
insert into test_db.example_site_visit values(10000,'2017-10-01','北京',20,0,'2017-10-01 07:00:00',15,2,2);
-- 数据表看到的数据insert into test_db.example_site_visit values(10000,'2017-10-01','北京',20,0,'2017-10-01 07:00:00',35,10,2);
insert into test_db.example_site_visit values(10001,'2017-10-01','北京',30,1,'2017-10-01 17:05:45',2,22,22);
insert into test_db.example_site_visit values(10002,'2017-10-02','上海',20,1,'2017-10-02 12:59:12',200,5,5);
insert into test_db.example_site_visit values(10003,'2017-10-02','广州',32,0,'2017-10-02 11:20:00',30,11,11);
insert into test_db.example_site_visit values(10004,'2017-10-01','深圳',35,0,'2017-10-01 10:00:15',100,3,3);
insert into test_db.example_site_visit values(10004,'2017-10-03','深圳',35,0,'2017-10-03 10:20:22',11,6,6);
- 验证结果
select * from test_db.example_site_visit;
4.2 Unique模型
Unique模型是一个特殊的Aggregate模型,使用的是replace聚合方式。
在某些多维分析场景下,用户更关注的是如何保证 Key(维度) 的唯一性,即如何获得 Primary Key 唯一性约束。
案例演示:这是一个典型的用户基础信息表。这类数据没有聚合需求,只需保证主键唯一性。(这里的主键为 user_id + username,在MySQL中称之为联合主键)
- 建表语句
CREATE TABLE IF NOT EXISTS test_db.user
(
`user_id` LARGEINT NOT NULL COMMENT "用户id",
`username` VARCHAR(50) NOT NULL COMMENT "用户昵称",
`city` VARCHAR(20) COMMENT "用户所在城市",
`age` SMALLINT COMMENT "用户年龄",
`sex` TINYINT COMMENT "用户性别",
`phone` LARGEINT COMMENT "用户电话",
`address` VARCHAR(500) COMMENT "用户地址",
`register_time` DATETIME COMMENT "用户注册时间"
)
UNIQUE KEY(`user_id`, `username`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 10
sql("replication_num" = "1");
- 插入数据
insert into test_db.user values(10000,'zhangsan','北京',20,0,13112345312,'北京西城区','2020-10-01 07:00:00');
insert into test_db.user values(10000,'zhangsan','北京',20,0,13112345312,'北京海淀区','2020-11-15 06:10:20');
注意: 后续新插入的新数据会替换掉旧数据
- 验证结果
select * from test_db.user order by user_id;
- 查看表结构
desc test_db.user;
4.3 Duplicate模型
在某些多维分析场景下,数据既没有主键,也没有聚合需求。因此,Doris引入 Duplicate 数据模型来满足这类需求。Duplicate Key 的模型支持一个用户导入之后把这个数据全部放在数据库里面,不再做提前的聚合,也不单独保证唯一性,只做一个排序。
案例:
- 创建Doris表
CREATE TABLE IF NOT EXISTS test_db.example_log
(
`timestamp` DATETIME NOT NULL COMMENT "日志时间",
`type` INT NOT NULL COMMENT "日志类型",
`error_code` INT COMMENT "错误码",
`error_msg` VARCHAR(1024) COMMENT "错误详细信息",
`op_id` BIGINT COMMENT "负责人id",
`op_time` DATETIME COMMENT "处理时间"
)
DUPLICATE KEY(`timestamp`, `type`)
DISTRIBUTED BY HASH(`timestamp`) BUCKETS 10
sql("replication_num" = "1");
注意:
1- 对数据按照DUPLICATE KEY中的字段进行局部排序,类似Hive中的sort by
2- DUPLICATE KEY中字段的顺序不同,排序效果不同,因此需要根据具体业务决定顺序
- 插入数据
insert into test_db.example_log values('2020-10-01 08:00:05',1,404,'not found page', 101, '2020-10-01 08:00:05');
insert into test_db.example_log values('2020-10-01 08:00:05',1,404,'not found page', 101, '2020-10-01 08:00:05');
insert into test_db.example_log values('2020-10-01 08:00:05',2,404,'not found page', 101, '2020-10-01 08:00:06');
insert into test_db.example_log values('2020-10-01 08:00:06',2,404,'not found page', 101, '2020-10-01 08:00:07');
insert into test_db.example_log values('2020-10-01 08:00:05',0,404,'not found page', 101, '2020-10-01 08:00:07');
- 验证结果
select * from test_db.example_log;
4.4 数据模型总结
注意:数据模型在建表后无法修改。
5、数据划分
基本概念:
一张表包括行(Row)和列(Column)
Row 即用户的一行数据
Column 用于描述一行数据中不同的字段。Column 可以分为两大类:Key 和 Value。从业务角度看,Key 和 Value 可以分别对应维度列和指标列
在 Doris 的存储引擎中,用户数据被水平划分为若干个数据分片(Tablet,也称作数据分桶)。每个 Tablet 包含若干数据行。
多个 Tablet 在逻辑上归属于不同的分区(Partition)。一个 Tablet 只属于一个 Partition。而一个 Partition 包含若干个 Tablet
Tablet 是数据移动、复制等操作的最小物理存储单元
5.1 分区
1- Partition 列可以指定一列或多列,分区列必须为 KEY 列。PARTITION BY中的字段必须属于AGGREGATE KEY
2- 当不使用 Partition 建表时,系统会自动生成一个和表名同名的,全值范围的 Partition。该Partition对用户不可见,并且不可删改。
3- 分区数量理论上没有上限(默认:动态分区的时候,以日为分区字段,不能超过500个日期分区)
5.1.1 Range分区案例
适合分区值连续的数据进行分区。
- 建表语句
CREATE TABLE IF NOT EXISTS test_db.example_range_tb1
(
`user_id` LARGEINT NOT NULL COMMENT "用户id",
`date` DATE NOT NULL COMMENT "数据灌入日期时间",
`timestamp` DATETIME NOT NULL COMMENT "数据灌入的时间戳",
`city` VARCHAR(20) COMMENT "用户所在城市",
`age` SMALLINT COMMENT "用户年龄",
`sex` TINYINT COMMENT "用户性别",
`last_visit_date` DATETIME REPLACE DEFAULT "1970-01-01 00:00:00"
COMMENT "用户最后一次访问时间",
`cost` BIGINT SUM DEFAULT "0" COMMENT "用户总消费",
`max_dwell_time` INT MAX DEFAULT "0" COMMENT "用户最大停留时间",
`min_dwell_time` INT MIN DEFAULT "99999" COMMENT "用户最小停留时间"
)
ENGINE=olap
AGGREGATE KEY(`user_id`, `date`, `timestamp`, `city`, `age`, `sex`)
PARTITION BY RANGE(`date`)
(
PARTITION `p202001` VALUES LESS THAN ("2020-02-01"),
PARTITION `p202002` VALUES LESS THAN ("2020-03-01"),
PARTITION `p202003` VALUES LESS THAN ("2020-04-01")
)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 16
sql
(
"replication_num" = "1"
);
- 查询分区
show partitions from test_db.example_range_tb1;
- 增加分区
ALTER TABLE test_db.example_range_tb1 ADD PARTITION IF NOT EXISTS `p202005` VALUES LESS THAN ("2020-06-01");
- 删除分区
ALTER TABLE test_db.example_range_tb1 DROP PARTITION IF EXISTS p202003;
- 注意事项
Range分区一定注意,分区范围不允许出现重叠
5.1.2 List分区案例
适合分区值离散(不连续)的数据进行分区。
- 建表语句
CREATE TABLE IF NOT EXISTS test_db.example_list_tb1
(
`user_id` LARGEINT NOT NULL COMMENT "用户id",
`date` DATE NOT NULL COMMENT "数据灌入日期时间",
`timestamp` DATETIME NOT NULL COMMENT "数据灌入的时间戳",
`city` VARCHAR(20) NOT NULL COMMENT "用户所在城市",
`age` SMALLINT COMMENT "用户年龄",
`sex` TINYINT COMMENT "用户性别",
`last_visit_date` DATETIME REPLACE DEFAULT "1970-01-01 00:00:00"
COMMENT "用户最后一次访问时间",
`cost` BIGINT SUM DEFAULT "0" COMMENT "用户总消费",
`max_dwell_time` INT MAX DEFAULT "0" COMMENT "用户最大停留时间",
`min_dwell_time` INT MIN DEFAULT "99999" COMMENT "用户最小停留时间"
)
ENGINE=olap
AGGREGATE KEY(`user_id`, `date`, `timestamp`, `city`, `age`, `sex`)
PARTITION BY LIST(`city`)
(
PARTITION `p_cn` VALUES IN ("Beijing", "Shanghai", "Hong Kong"),
PARTITION `p_usa` VALUES IN ("New York", "San Francisco"),
PARTITION `p_jp` VALUES IN ("Tokyo")
)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 16
sql
(
"replication_num" = "1"
);
- 查询分区
show partitions from test_db.example_list_tb1;
- 增加分区
ALTER TABLE test_db.example_list_tb1 ADD PARTITION IF NOT EXISTS p_uk VALUES IN ("London");
- 删除分区
ALTER TABLE test_db.example_list_tb1 DROP PARTITION IF EXISTS p_jp;
5.2 分桶
**一个 Partition 的 Bucket 数量一旦指定,不可更改。**所以在确定 Bucket 数量时,需要预先考虑集群扩容的情况。比如当前只有 3 台 host,每台 host 有 1 块盘。如果 Bucket 的数量只设置为 3 或更小,那么后期即使再增加机器,也不能提高并发度。
Doris 支持两层的数据划分。第一层是 Partition,支持 Range 和 List 的划分方式。第二层是 Bucket(Tablet),仅支持 Hash 的划分方式。也可以仅使用一层分区,即使用单分区。使用一层分区时,只支持Bucket 划分。
-- 能否进行单纯的分区?
-- 不允许只进行分区。下面的建表语句会报错
CREATE TABLE IF NOT EXISTS test_db.example_range_tb_test
(
`user_id` LARGEINT NOT NULL COMMENT "用户id",
`date` DATE NOT NULL COMMENT "数据灌入日期时间",
`timestamp` DATETIME NOT NULL COMMENT "数据灌入的时间戳",
`city` VARCHAR(20) COMMENT "用户所在城市",
`age` SMALLINT COMMENT "用户年龄",
`sex` TINYINT COMMENT "用户性别",
`last_visit_date` DATETIME REPLACE DEFAULT "1970-01-01 00:00:00"
COMMENT "用户最后一次访问时间",
`cost` BIGINT SUM DEFAULT "0" COMMENT "用户总消费",
`max_dwell_time` INT MAX DEFAULT "0" COMMENT "用户最大停留时间",
`min_dwell_time` INT MIN DEFAULT "99999" COMMENT "用户最小停留时间"
)
ENGINE=olap
AGGREGATE KEY(`user_id`, `date`, `timestamp`, `city`, `age`, `sex`)
PARTITION BY RANGE(`date`)
(
PARTITION `p202001` VALUES LESS THAN ("2020-02-01"),
PARTITION `p202002` VALUES LESS THAN ("2020-03-01"),
PARTITION `p202003` VALUES LESS THAN ("2020-04-01")
)
sql
(
"replication_num" = "1"
);
5.2.1 一层分桶
- 建表语句
CREATE TABLE test_db.table1
(
siteid INT DEFAULT '10',
citycode SMALLINT,
username VARCHAR(32) DEFAULT '',
pv BIGINT SUM DEFAULT '0'
)
AGGREGATE KEY(siteid, citycode, username)
DISTRIBUTED BY HASH(siteid) BUCKETS 10
sql("replication_num" = "1");
- 导入数据
mkdir -p /export/data/doris/
cd /export/data/doris
vim table1_data
在/export/data/doris/下创建table1_data文件,写入如下内容:
10,101,jim,2
11,101,grace,2
12,102,tom,2
13,102,bush,3
14,103,helen,3
cd /export/data/doris
curl --location-trusted -u root:123456 -H "label:table1_20220714" -H "column_separator:," -T table1_data http://up01:8130/api/test_db/table1/_stream_load
- 检查数据
select * from test_db.table1;
5.2.2 两层分桶
- 建表语句
CREATE TABLE test_db.table2
(
event_day DATE,
siteid INT DEFAULT '10',
citycode SMALLINT,
username VARCHAR(32) DEFAULT '',
pv BIGINT SUM DEFAULT '0'
)
AGGREGATE KEY(event_day, siteid, citycode, username)
PARTITION BY RANGE(event_day)
(
PARTITION p202006 VALUES LESS THAN ('2020-07-01'),
PARTITION p202007 VALUES LESS THAN ('2020-08-01'),
PARTITION p202008 VALUES LESS THAN ('2020-09-01')
)
DISTRIBUTED BY HASH(siteid) BUCKETS 10
sql("replication_num" = "1");
- 导入数据
mkdir -p /export/data/doris/
在/export/data/doris/下创建table2_data文件,写入如下内容:
2020-07-03|11|1|jim|2
2020-07-05|12|1|grace|2
2020-07-12|13|2|tom|2
2020-07-15|14|3|bush|3
2020-07-12|15|3|helen|3
cd /export/data/doris
curl --location-trusted -u root:123456 -H "label:table2_20220715" -H "column_separator:|" -T table2_data http://up01:8130/api/test_db/table2/_stream_load
- 检查数据
select * from table2;
- 推荐使用两层分桶的场景
- 有时间维度或类似带有有序值的维度,可以以这类维度列作为分区列。分区粒度可以根据导入频次、分区数据量等进行评估。
- 历史数据删除需求:如有删除历史数据的需求(比如仅保留最近N 天的数据)。使用复合分区,可以通过删除历史分区来达到目的。也可以通过在指定分区内发送 DELETE 语句进行数据删除。
- 解决数据倾斜问题:每个分区可以单独指定分桶数量。如按天分区,当每天的数据量差异很大时,可以通过指定分区的分桶数,合理划分不同分区的数据,分桶列建议选择区分度大的列。
6、删除数据
Doris 目前可以通过两种方式删除数据:DELETE FROM 语句和 ALTER TABLE DROP PARTITION 语句
条件删除:
语法
DELETE FROM table_name [PARTITION partition_name | PARTITIONS (p1, p2)]
WHERE
column_name1 op { value | value_list } [ AND column_name2 op { value | value_list } ...];
说明
1) op 的可选类型包括:=, >, <, >=, <=, !=, in, not in
2) 使用聚合类的表模型(AGGREGATE、UNIQUE)只能指定 key 列上的条件。
3) 当选定的 key 列不存在于某个 rollup 中时,无法进行 delete。
4) 条件之间只能是“与”的关系。若希望达成“或”的关系,需要将条件分写在两个 DELETE 语句中。
5) 如果为分区表,需要指定分区,如果不指定,doris 会从条件中推断出分区。两种情况下,doris 无法从条件中推断出分区: 条件中不包含分区列;分区列的 op 为 not in。当分区表未指定分区,或者无法从条件中推断分区的时候,需要设置会话变量 delete_without_partition 为 true,此时 delete 会应用到所有分区。
6) delete 是一个同步命令,命令返回即表示执行成功
举例: 删除 test_db.table2 partition p202007 中 siteid 列值为 11 的数据行
DELETE FROM test_db.table2 PARTITION p202007 WHERE siteid = 11;
删除分区:
该命令可以直接删除指定的分区。因为 Partition 是逻辑上最小的数据管理单元,所以使用 DROP PARTITION 命令可以很轻量的完成数据删除工作。并且该命令不受 load 以及任何其他操作的限制,同时不会影响查询效率。是比较推荐的一种数据删除方式。
该命令是同步命令,执行成功即生效。而后台数据真正删除的时间可能会延迟10分钟左右。
语法
ALTER TABLE example_db.my_table DROP PARTITION p1;
7、动态分区
目前实现了动态添加分区及动态删除分区的功能。动态分区只支持 Range 分区。
通过动态分区功能,用户可以在建表时设定动态分区的规则。FE 会启动一个后台线程,根据用户指定的规则创建或删除分区。用户也可以在运行时对现有规则进行变更。
动态分区的规则可以在建表时指定,或者在运行时进行修改。当前仅支持对单分区列(分区字段只能有一个)的分区表设定动态分区规则。
- 建表时指定
CREATE TABLE tbl1
(...)
sql
(
"dynamic_partition.prop1" = "value1",
"dynamic_partition.prop2" = "value2",
...
)
常用参数:
0- dynamic_partition.enable
1- dynamic_partition.start
2- dynamic_partition.end
3- dynamic_partition.prefix
4- dynamic_partition.buckets
5- dynamic_partition.replication_num
- 运行时修改
ALTER TABLE tbl1 SET
(
"dynamic_partition.prop1" = "value1",
"dynamic_partition.prop2" = "value2",
...
)
8、多源数据目录(掌握)
多源数据目录(Multi-Catalog)是 Doris 1.2.0 版本中推出的功能,旨在能够更方便对接外部数据目录,以增强Doris的数据湖分析和联邦数据查询能力。
新版Doris中元数据有 Catalog -> Database -> Table 三层。其中,Catalog 可以直接对应到外部数据目录。目前支持的外部数据目录包括:
Hive
Iceberg
Hudi
Elasticsearch
JDBC
Catalog分为如下两类:
Internal Catalog:Doris 原有的 Database 和 Table 都将归属于 Internal Catalog。Internal Catalog 是内置的默认 Catalog,用户不可修改或删除。
External Catalog:可以通过 CREATE CATALOG 命令创建一个 External Catalog。创建后,可以通过 SHOW CATALOGS 命令查看已创建的 Catalog。
Catalog操作演示:
- 查看Catalog
SHOW CATALOGS;
Internal Catalog 是内置的默认 Catalog,Doris 原有的 Database 和 Table 都将归属于这个 Catalog。
- 创建 Catalog:这里以 Hive 为例
CREATE CATALOG hive sql (
'type'='hms',
'hive.metastore.uris' = 'thrift://192.168.88.166:9083',
'hadoop.username' = 'root',
'hive.version' = '3.1.2'
);
对应的官网链接: https://doris.apache.org/zh-CN/docs/2.0/lakehouse/datalake-analytics/hive?_highlight=catalog
type 和 hive.metastore.uris 两个必须是参数,其他参数根据情况而定。在默认情况下,Doris 会以 Hive 2.3 版本的兼容接口访问 Hive Metastore。这里使用的hive 3.1.2,所以在创建 Catalog 时指定了 hive 的版本
- 查看自定义catalog下表的数据可能遇到的错误
解决办法: 表的限定要加上catalog的名称。也就是将上面的SQL语句改为如下
select * from hive.ads.ads_mem_new_old_user_i;
- 切换 Catolog
SWITCH hive;
- 删除 Catalog
DROP CATALOG hive;
External Catalog 中的 Database 和 Table 都是只读的。但是可以删除 Catalog(Internal Catalog无法删除)。可以通过 DROP CATALOG 命令删除一个 External Catalog。
该操作仅会删除 Doris 中该 Catalog 的映射信息,并不会修改或变更任何外部数据目录的内容。
- Resource
Resource 是一组配置的集合。在 1.2.1 版本之后,用户可以通过 CREATE RESOURCE 命令创建一个 Resource。之后可以在创建 Catalog 时使用这个 Resource。
一个 Resource 可以被多个 Catalog 使用,以复用其中的配置。
CREATE RESOURCE hms_resource sql (
'type'='hms',
'hive.metastore.uris' = 'thrift://192.168.88.166:9083',
'hadoop.username' = 'root',
'hive.version' = '3.1.2'
);
CREATE CATALOG hive WITH RESOURCE hms_resource;
9、SQL函数
查看函数名
show builtin functions in test_db;
Doris 符合sql标准,大多数用法与mysql一致,具体函数用法可以参照
函数汇总:https://doris.apache.org/zh-CN/docs/2.0/sql-manual/sql-functions/array-functions/array
四、实时数据分析概览(复习)
1、需求介绍
实时数据分析主要分为两部分,一部分是分析日志,包括Nginx(恩几可使)日志和用户行为日志,一部分是分析业务库数据。
- 通过实时分析Nginx日志,可以分析系统网站的用户量、网站性能情况、异常用户情况等
- 通过实时收集和分析用户的点击、浏览、搜索、购买等行为数据,可以了解用户偏好、优化推荐算法、提升用户体验、增加销售转化率
- 通过实时分析用户相关的业务库数据,可以了解用户近期的购物行为和购物偏好等信息,进而针对性地制定营销策略
2、实时计算架构图
DB: 专门指的是实时业务数据,使用SeaTunnel实时采集到Kafka,后续使用结构化流进行指标计算和标签计算
日志: 分为两类日志数据,Nginx日志和用户行为日志,使用Flume实时采集到Kafka,后续使用结构化流进行ETL和指标计算
Kafka: 存储采集进来的原始业务数据和日志数据,里面对数据进行分层存储,也就是不同层以不同的Topic主题进行数据存储
数据处理: 使用Spark的结构化流进行数据实时消费、ETL、指标计算和打标签的开发
结果数据: 用户画像的标签结果数据存储在ElasticSearch,指标结果数据存储在Doris。后续还可以使用Doris集成ElasticSearch
五、日志数据实时采集(理解)
1、日志模拟
模拟日志数据的实时产生效果。按照如下的步骤进行配置:
- 把log_generate项目的内容复制放到当前的用户画像标签项目里面:注意log_generate的文件夹一定要放在当前项目的根目录下
- 代码作用说明
其中,通过修改resource/config.ini文件,可以修改kafka和mysql的配置信息。修改utils/ConfigLoader.py文件,可以修改config.ini的位置信息。
NginxLogSimulationData.py: 执行这个文件就会在 linux上 项目根目录/datacollection/source_data 这个目录下生成模拟的Nginx日志文件
EventSimulationJsonData.py: 执行这个文件就会在 linux上 项目根目录/datacollection/source_data 这个目录下生成模拟的用户行为日志的json文件
SimulationNginxData.py: 执行这个文件,会将已经生成的 Nginx日志发送到Kafka的 xtzg_nginx_log topic。
SimulationEventData.py: 执行这个文件,会将已经生成的 用户行为日志发送到Kafka的 xtzg_user_event topic。
注意: SimulationNginxData.py和SimulationEventData.py是为了方便开发,直接将日志信息导入到了Kafka,使用时需要提前开启Kafka。真正项目中,需要使用Flume进行日志采集,发送到Kafka中。
2、Flume简介
1. 简单来说
Flume
是一个开源的分布式日志收集系统,专门用于高效地收集、聚合和传输大规模日志数据,就像是一个“数据搬运工”,能够将分散的日志数据集中到指定的存储或分析系统中。2. 具体
Flume
是 Apache 基金会的一个顶级项目,主要用于处理流式数据(如日志、事件数据)。它的核心架构包括三个主要组件:
- Source:数据来源,负责从外部系统(如日志文件、Kafka、HTTP 请求)收集数据。
- Channel:数据通道,用于临时存储数据,确保数据在传输过程中不会丢失。
- Sink:数据目的地,负责将数据传输到目标系统(如 HDFS、HBase、Kafka)。
Flume
的特点包括:
- 高可靠性:通过事务机制和持久化存储,确保数据不丢失。
- 高扩展性:支持自定义 Source、Channel 和 Sink,适应不同的数据源和目标系统。
- 分布式架构:支持多节点部署,能够处理海量数据。
3. 实际生产场景
- 日志收集:在分布式系统中,用于将多台服务器的日志数据集中存储到 HDFS 或 Elasticsearch 中,便于后续分析。
- 实时数据传输:将日志数据实时传输到 Kafka,供流处理系统(如 Flink、Spark Streaming)进行实时分析。
- 数据备份:将重要日志数据备份到多个存储系统,确保数据安全。
4. 总之
Flume
是一个功能强大且可靠的日志收集工具,适用于需要处理大规模日志数据的场景。它的高可靠性和扩展性使其成为企业日志管理和数据分析的重要工具。
面试题: 为什么实时数据采集部分需要使用Flume而不使用DataX、sqoop、Kettle?
答: 我们实时数据处理部分的需要是数据源要能够实时、快速的从多个地方采集汇聚到Kafka中,后续通过实时计算框架,例如:结构化流进行数据的实时ETL和指标统计分析。因此需要快速的采集数据,同时要能够分布式采集。所以选择使用Flume。而DataX只能进行单机操作,Sqoop虽然能够进行分布式但是底层运行是MapReduce程序速度很慢,Kettle虽然能够进行分布式但是采集速度没有Flume快,海量数据处理不够优秀,同时部署也相对麻烦。
2.1 Flume的基本介绍(了解)
Flume是Apache旗下一款顶级开源免费, 专门用于进行数据采集的工具, 最早来源于cloudera公司, 后期将其贡献给Apache, 成为Apache顶级开源项目 Flume 采用Java 编写
官方网站: https://flume.apache.org/
Flume为了能够满足大部分采集场景, 提供各种针对不同的采集数据源和目的地的组件 , 以满足各种采集的场景
Flume在早期的版本中, 称为 Flume OG(1.X版本以前), 在新版本中称为Flume NG(1.x以后)
运行一个Flume的实例, 就是启动了一个Agent实例对象, 一般Agent由三个部分组成的
1- Source: 数据源。用来采集数据源中的数据,支持多种不同的数据源
2- Channel: 管道。用来对Source采集到的数据进行缓存。将数据由Source端传输到Sink端。最常用的是内存Channel
3- Sink: 下沉池(目的地)。将采集到的数据存储到其他介质中,例如:Kafka、MySQL等
数据从Source 到 Sink, 都是通过 event对象进行数据传输的, 一条数据就是一个event对象, 在event对象中, 除了可以放置本身数据以外, 还支持放置一些其他的描述信息数据(元数据), 默认只有数据本身
Flume一般是安装在数据采集的节点上。也就是说,数据在什么地方产生的,Flume就部署在什么地方。使用Flume 主要是配置Flume的采集配置文件,一般不需要写代码即可完成数据采集
Flume的使用帮助文档: https://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html
常用的相关组件:
source:
channel组件
sink组件
Channel选择总结:
1- 内存Memory Channel: 常用。如果数据量不是太多,并且对性能要求高,可以选择
2- Kafka Channel: 常用。如果最终需要将数据存储到Kafka,可以直接用这个Channel,那么可以免去sink
3- 文件File Channel: 比较常用。如果数据量不是太多,并且对性能要求不高,数据的可靠性要求高,可以选择
2.2 Flume的入门案例(重点)
需求: 监听 up01节点上 55555 端口号, 当这个端口号中有数据, 请通过Flume立即采集过来, 并打印控制台
- 1- 编写Flume的采集文件:工作中一般习惯将Flume配置文件就放在flume安装目录的conf目录下
在/export/server/flume/conf目录下创建一个flume_demo.conf文件
cd /export/server/flume/conf
vim flume_demo.conf
文件内容如下:
# 1- 给三大组件取名字
# 给source组件取名字
a1.sources = r1
# 给channel组件取名字
a1.channels = c1
# 给sink组件取名字
a1.sinks = k1
# 2- 分别配置三大组件
# Source组件配置
# 指定source组件的类型
a1.sources.r1.type = netcat
# 采集哪台服务器的数据
a1.sources.r1.bind = 192.168.88.166
# 采集哪台服务器具体端口的数据
a1.sources.r1.port = 55555
# Channel组件配置
# 指定Channel组件类型
a1.channels.c1.type = memory
# 设置内存Channel同一时刻最大允许存放的数据条数
a1.channels.c1.capacity = 100
# 设置内存Channel同一时刻一个事务处理的数据条数
a1.channels.c1.transactionCapacity = 100
# Sink组件配置
# 指定sink组件的类型
a1.sinks.k1.type = logger
# 3- 串联三大组件
# 将source与channel串联起来
a1.sources.r1.channels = c1
# 将channel和sink串联起来
a1.sinks.k1.channel = c1
- 2- 启动Flume 进行数据采集操作
cd /export/server/flume/bin
./flume-ng agent -n a1 -c ../conf -f /export/server/flume/conf/flume_demo.conf -Dflume.root.logger=INFO,console
参数解释:
1: flume-ng运行的脚本名称
2: agent -n设置agent的名称
3: a1配置文件中设置的agent名称
4: -c ../conf固定写法,指定Flume工具自己的配置文件路径(可以是相对路径也可以是绝对路径)
5: -f /export/server/flume/conf/flume_demo.conf设置我们自己编写的Flume运行脚本所在的文件路径,一般习惯性放在Flume安装的conf目录下
6: -Dflume.root.logger=INFO,console固定写法,设置Flume工具运行时候的日志基本以及输出的地方
- 3- 向 55555 端口号发送数据
telnet 192.168.88.166 55555
允许结果截图:
可能遇到的错误一:
原因: 没有telnet软件
解决办法: yum install -y telnet
可能遇到的错误二:
原因: telnet是需要连接到对应端口号上,然后发送数据
解决办法: 需要先启动Flume
可能遇到的错误三:
原因: 给Flume发送数据的时候不能使用nc命令。因为nc自己就会去占用那个端口
解决办法: 使用telnet 192.168.88.166 55555发送数据