文章目录
StarRocks数据导入
Broker Load
使用 broker load 导入 hdfs 文件,需要在 Be 和 Broker 的 conf 目录中添加 HDFS的 hdfs-site.xml、core-site.xml、hive-site.xml ,配置HDFS集群 hosts 。
-- hive建表 creation_month为分区字段
CREATE TABLE `ads_crm_sales_visit_statistics_details2`(
`id` string COMMENT '拜访id',
`sales_id` bigint COMMENT '销售id',
...
`exception_elimination` bigint COMMENT '客户统计异常拜访1表示是要排除的0表示不排除',
`row_id` bigint COMMENT '虚拟主键')
COMMENT '销售异常拜访-拜访临时表'
PARTITIONED BY (
`creation_month` string COMMENT '创建月份')
-- 查看hive表分区
show partitions ads_crm_sales_visit_statistics_details2;
-- SR建表 creation_month为排序键和分区字段
CREATE TABLE `ads_crm_sales_visit_statistics_details2` (
`creation_month` date ,
`creation_date` string ,
`id` string ,
`sales_id` bigint ,
.....
`exception_elimination` bigint ,
`row_id` bigint
) ENGINE=OLAP
DUPLICATE KEY(`creation_month`, `creation_date`)
PARTITION BY RANGE(`creation_month`)
(
START ("2022-05-01") END ("2023-02-01") EVERY (INTERVAL 1 MONTH)
)
DISTRIBUTED BY HASH(`row_id`) BUCKETS 6
;
ETL 集群导数
使用 broker load 导入 hdfs 文件,需要在 Be 和 Broker 的 conf 目录中添加 HDFS的 hdfs-site.xml、core-site.xml、hive-site.xml ,配置HDFS集群 hosts 。
[ 开通 SR 到 HDFS NameNode 的 8020及 DateNode dfs.datanode.ipc.address、dfs.datanode.http.address、dfs.datanode.address 三个端口网络权限 ]
-- broker load
load label zxl.ads_crm_sales_visit_statistics_details2023_04_13
(
data infile('hdfs://bigbigworld/user/hive/warehouse/ads_crm.db/ads_crm_sales_visit_statistics_details2/creation_month=202*/*')
into table ads_crm_sales_visit_statistics_details2
FORMAT AS 'ORC'
(
-- hive 表字段信息(不要包含分区字段,分区字段要在下面 COLUMNS FROM PATH 中指定)
`creation_date` ,
`id` ,
`sales_id` ,
.....
`exception_elimination` ,
`row_id`
)
-- 将衍生列指定并转换分区字段,将原来的 yyy-mm 转换成 yyy-mm-dd
COLUMNS FROM PATH AS (creation_month)
SET(creation_month = (concat(creation_month ,'-01')))
)
with broker 'hdfs_broker';
-- hdfs_broker 是 broker_name
非 ETL 集群导数
以上 Load 任务是连接 conf 配置中 hdfs-site.xml 的 NameNode 信息,若使用与配置中不同的 HDFS 可使用以下 Load 语句创建 ( NameNade HA )
LOAD LABEL db1.label1
(
DATA INFILE("hdfs://nameservice1/starRocks/file1")
INTO TABLE tbl1
COLUMNS TERMINATED BY ","
(tmp_c1, tmp_c2)
SET
(
id=tmp_c2,
name=tmp_c1
)
)
WITH BROKER 'brokername'
(
"username" = "hdfs_username",---无密码可省略
"password" = "hdfs_password",---无密码可省略
"dfs.nameservices" = "nameservice1",
"dfs.ha.namenodes.nameservice1" = "namenode01,namenode02",
"dfs.namenode.rpc-address.nameservice1.namenode01" = "nn01:8020",
"dfs.namenode.rpc-address.nameservice1.namenode02" = "nn02:8020",
"dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
)
-- 该配置实际引用的是 hdfs-site.xml 中 dfs.nameservices、dfs.ha.namenodes.nameservice1、dfs.namenode.rpc-address.nameservice1.namenode01、dfs.namenode.rpc-address.nameservice1.namenode02、dfs.client.failover.proxy.provider
Broker Load 任务查看
use database;
show load where label = 'label ID';
show load from database where label = 'label ID';
-- 查看 Broker Load 任务状态
SHOW LOAD FROM zxl WHERE LABEL = "ads_crm_sales_visit_statistics_details2023_04_13";
-- 取消 Broker Load 任务
CANCEL LOAD FROM zxl WHERE LABEL = "ads_crm_sales_visit_statistics_details2023_01_18";
BrokerLoad⼤数据量导⼊优化参数推荐配置
StarRocks SessionVariables(FE):
1. load_transmission_compression_type = none //默认值是none,当并发提⾼,cpu仍然打不满时,可
以设置为LZ4_FRAME。
v2.4及之前版本:
1. load_parallel_instance_num=8 // 可以根据BE的cpu核数设置,⼀般设置区间为[cpu核数/2 - 64]
v2.5+版本 & session variable enable_adaptive_sink_dop=true:
1. pipeline_dop=0 // ⾃动设置BE核数的⼀半作为并⾏度,可以⼿动设置⾮0值,和查询的设置⽅式
⼀致。
2. 注: 新部署的2.5集群⾃动设置enable_adaptive_sink_dop=true。从之前版本升级上来的集群会⾃动
设置enable_adaptive_sink_dop=false,此时依然使⽤load_parallel_instance_num来配置并⾏度。
StarRocks SessionVariables(BE):
1. flush_thread_num_per_store=8 // 每个盘的flush线程数,当⽤户盘⽐较少时可以设置较⼤,盘较多
时设置较⼩,⼀般情况下 flush_thread_num_per_store * store_num < be_cpu_core_num / 2
2. olap_table_sink_send_interval_ms=1
3. load_process_max_memory_limit_percent=50
4. send_channel_buffer_limit = 67108864 // 默认值64MB,当⽤户导⼊的数据列较多、单⾏数据较⼤
时,可以适当调⼤
5. number_tablet_writer_threads = 16 // 默认值16,[16-48],⼀般设置为cpu核数的1/3左右。
BrokerLoad 排查思路
1. Show load查看任务状态,状态为CANCELLED的时候进⼀步跟进排查。
2. 如果URL不为空,则通过curl $URL查看具体报错信息。
3. 如果URL为空,通过fe⽇志查看load id和be ip。
4. (检查hdfs⽂件路径是否指定正确,可以指定到具体⽂件也可以指定某⽬录下的所有⽂件。
5. (hdfs导⼊请检查⼀下是否有k8s认证,并进⾏配置 :grep $JobnId fe.log
6. 去到对应的be中查看具体异常:grep $load_id be.INFO
7. 定位到具体错误,并调整broker load配置,尝试重新导⼊任务。
Insert Into
Insert into table values 不建议在线上使用,频繁使用 INSERT 语句导入小批量数据会产⽣过多的数据版本,从⽽影响查询性能。需要流式导⼊或者小批量多次导⼊数据,建议使用 Kafka 作为数据源并通过 Routine Load 方式导入。
Insert into select 可以将源表中的数据导入都标表中。源表可以是⼀张或多张内部表或者外部表。⽬标表必须是 StarRocks 的内表。
Insert Into大数据量导入优化参数
# 每个盘的flush线程数,当⽤户盘⽐较少时可以设置较⼤,盘较多时设置较⼩,⼀般情况下 flush_thread_num_per_store * store_num(磁盘数量) < be_cpu_core_num / 2
flush_thread_num_per_store=8
olap_table_sink_send_interval_ms=1 # v2.5+ 未找到该参数
load_process_max_memory_limit_percent=50
# 默认值64MB,当⽤户导⼊的数据列较多、单⾏数据较⼤时,可以适当调⼤
send_channel_buffer_limit = 67108864 # v2.5+ 未找到该参数
# 默认值16,[16-48],⼀般设置为cpu核数的1/3左右
number_tablet_writer_threads = 16 # v2.5+ 未找到该参数
Stream Load
curl --location-trusted -u 'root:kye!@#2020' -H "label:label1" \
-H "Expect:100-continue" \
-H "timeout:100" \
-H "column_separator:," \
-T /home/admin/zxl.csv -XPUT \ http://10.82.194.150:8030/api/ads_ops/disk_io_sector/_stream_load
# 导入分区
curl --location-trusted -uroot -H "label:123" -H"partitions:p1,p2" -TtestData http://host:port/api/testDb/testTbl/_stream_load
-H "column_separator: <column_separator>" 列分隔符
-H "row_delimiter:<row_delimiter>" 行分隔符
StreamLoad⼤数据量导⼊优化参数推荐配置
# StreamLoad⼤数据量导⼊优化参数推荐配置
# StarRocksFE配置
stream_load_default_timeout_second = 10800
# StarRocksBE配置
streaming_load_max_mb=102400
streaming_load_max_batch_size_mb=102400
flush_thread_num_per_store=8 // 每个盘的flush线程数,当⽤户盘⽐较少时可以设置较⼤,盘较多时设置较⼩,⼀般情况下 flush_thread_num_per_store * store_num < be_cpu_core_num / 2
olap_table_sink_send_interval_ms=1 # v2.5+ 未找到该参数
load_process_max_memory_limit_percent=50
tablet_max_versions = 20000
enable_new_load_on_memory_limit_exceeded=true
Stream Load 排查思路
1. 查看本次导⼊的load_id和调度到的BE节点IP
grep -w $TxnId fe.log|grep "load id"
2. 输出样例为:2023-1-30 20:48:50,169 INFO (thrift-server-pool-4|138)
[FrontendServiceImpl.streamLoadPut():809] receive stream load put request. db:ssb, tbl: demo_test_1, txn id:
1580717, load id: 7a4d4384-1ad7-b798-f176-4ae9d7ea6b9d, backend: 172.26.92.155
3. 我们可以看到对应的BE节点IP,去到该节点上查看具体原因:grep $load_id be.INFO|less
4. 输出样例为:I0518 11:58:16.771597 4228 stream_load.cpp:202] new income streaming load
request.id=f1481, job_id=-1, txn_id=-1, label=metrics_detail_16, db=starrocks, tbl=metrics_detail
I0518 11:58:16.7 4176 load_channel_mgr.cpp:186] Removing finished load channel load id=f181
I0518 11:58:16.7 4176 load_channel.cpp:40] load channel mem peak usage=1915984, info=limit:
16113540169; label: f181; all tracker size: 3; limit trackers size: 3; parent is null: false; , load_id=f181
5. 如果查不到具体原因,则可以继续查看线程上下⽂进⾏进⼀步跟踪定位,⽐如上⽂的 4176线程 :
grep -w 4176 be.INFO|less 进⼀步分析即可。
RoutineLoad
RoutineLoad 优化⽅式
# 任务调度周期
1. max_batch_interval
通过缩短任务调度周期加速数据消费。但是,更⼩的任务调度周期可能会带来更多的CPU资源消耗。
需要注意的是,任务调度周期最⼩值为5s。
# 任务级别调优
1. 降低导⼊QPS,集群总体的导⼊QPS尽量<10
计算⽅式: 集群routine_load_task_num / routine_load_task_consume_second
2. 增加单个导⼊事务的数据量,单个Routine Load Task导⼊的数据量 > 1G
需要同时调整 max_routine_load_batch_size,routine_load_task_timeout_second来实现
3. 单个BE上并发导⼊任务routine_load_thread_pool_size尽量<be_core_num / 2
# 任务并⾏度
1. max_routine_load_task_concurrent_num
2. desired_concurrent_number
(在partition数量和BE数量较多时,可以通过设置较⼤的该参数来加速任务执⾏。但是,更⼤的并⾏度
可能会带来更多的CPU资源消耗)
# 任务批量⼤⼩
1. routine_load_task_consume_second
通过增⼤单次读取持续时间加速数据消费。
2. max_routine_load_batch_size
通过增⼤单次读取的数据量加速数据消费。
Routine Load 排查思路
当任务状态为PAUSED或者CANCELLED的时候需要介⼊排查
任务状态为PAUSED时:
1. 可以先查看ReasonOfStateChanged定位下原因,例如“Offset out of range” 。
2. 若ReasonOfStateChanged为空,查看ErrorLogUrls可查看具体的报错信息:curl ${ErrorLogUrls}
3. 如果以上⽅法不能获取具体异常,可以执⾏以下命令查看(由于routine load是按周期调度的stream
load任务,所以可以通过调度的任务查看任务的状态:
show routine load task where JobName="routine_load_wikipedia";
查看Message字段可以看到具体异常,如果以上⽅法都不能排查到问题,可以拿到job id在be.INFO⽇志中找到txn id,然后通过txn id在be.INFO中查看上下⽂有具体的任务信息。
任务状态为CANCELLED时:
1. 则可能为导⼊任务执⾏遇到异常(如表被删除)。
您可以参考ReasonOfStateChanged、ErrorLogUrls报错进⾏排查和修复。但是修复后,您⽆法恢复 CANCELLED 状态的导⼊作业。
过txn id在be.INFO中查看上下⽂有具体的任务信息。
任务状态为CANCELLED时:
- 则可能为导⼊任务执⾏遇到异常(如表被删除)。
您可以参考ReasonOfStateChanged、ErrorLogUrls报错进⾏排查和修复。但是修复后,您⽆法恢复 CANCELLED 状态的导⼊作业。
[问题排查-导入失败相关](https://forum.mirrorship.cn/t/topic/4923)
[部署环境检查脚本](https://forum.mirrorship.cn/uploads/short-url/cA3X82Tmzs6GCHqY5lbWoZfRlZS.sh)