StarRocks数据导入

发布于:2025-03-27 ⋅ 阅读:(30) ⋅ 点赞:(0)

StarRocks数据导入

Broker Load

使用 broker load 导入 hdfs 文件,需要在 Be 和 Broker 的 conf 目录中添加 HDFS的 hdfs-site.xml、core-site.xml、hive-site.xml ,配置HDFS集群 hosts 。

-- hive建表  creation_month为分区字段
CREATE TABLE `ads_crm_sales_visit_statistics_details2`(
  `id` string COMMENT '拜访id', 
  `sales_id` bigint COMMENT '销售id', 
  ... 
  `exception_elimination` bigint COMMENT '客户统计异常拜访1表示是要排除的0表示不排除', 
  `row_id` bigint COMMENT '虚拟主键')
COMMENT '销售异常拜访-拜访临时表'
PARTITIONED BY ( 
  `creation_month` string COMMENT '创建月份')

-- 查看hive表分区
show partitions ads_crm_sales_visit_statistics_details2;
-- SR建表  creation_month为排序键和分区字段
CREATE TABLE `ads_crm_sales_visit_statistics_details2` (
  `creation_month` date ,
  `creation_date` string ,
  `id` string , 
  `sales_id` bigint , 
  ..... 
  `exception_elimination` bigint , 
  `row_id` bigint  
) ENGINE=OLAP 
DUPLICATE KEY(`creation_month`, `creation_date`)
PARTITION BY RANGE(`creation_month`)
(
START ("2022-05-01") END ("2023-02-01") EVERY (INTERVAL 1 MONTH)
)
DISTRIBUTED BY HASH(`row_id`) BUCKETS 6 
;

ETL 集群导数

使用 broker load 导入 hdfs 文件,需要在 Be 和 Broker 的 conf 目录中添加 HDFS的 hdfs-site.xml、core-site.xml、hive-site.xml ,配置HDFS集群 hosts 。

[ 开通 SR 到 HDFS NameNode 的 8020及 DateNode dfs.datanode.ipc.address、dfs.datanode.http.address、dfs.datanode.address 三个端口网络权限 ]

-- broker load
load label zxl.ads_crm_sales_visit_statistics_details2023_04_13
(
  data infile('hdfs://bigbigworld/user/hive/warehouse/ads_crm.db/ads_crm_sales_visit_statistics_details2/creation_month=202*/*')
  into table ads_crm_sales_visit_statistics_details2
  FORMAT AS 'ORC'
    (  
	  -- hive 表字段信息(不要包含分区字段,分区字段要在下面 COLUMNS FROM PATH 中指定)  
  `creation_date`  ,
  `id`  , 
  `sales_id`  , 
  ..... 
  `exception_elimination`  , 
  `row_id` 
    )
-- 将衍生列指定并转换分区字段,将原来的 yyy-mm 转换成 yyy-mm-dd
COLUMNS FROM PATH AS (creation_month)
SET(creation_month = (concat(creation_month ,'-01'))) 
)
with broker 'hdfs_broker';

-- hdfs_broker 是 broker_name
非 ETL 集群导数

以上 Load 任务是连接 conf 配置中 hdfs-site.xml 的 NameNode 信息,若使用与配置中不同的 HDFS 可使用以下 Load 语句创建 ( NameNade HA )

LOAD LABEL db1.label1
(
    DATA INFILE("hdfs://nameservice1/starRocks/file1")
    INTO TABLE tbl1
    COLUMNS TERMINATED BY ","
    (tmp_c1, tmp_c2)
    SET
    (
        id=tmp_c2,
        name=tmp_c1
    )
)
WITH BROKER 'brokername'
(
    "username" = "hdfs_username",---无密码可省略
    "password" = "hdfs_password",---无密码可省略
    "dfs.nameservices" = "nameservice1",
    "dfs.ha.namenodes.nameservice1" = "namenode01,namenode02",
    "dfs.namenode.rpc-address.nameservice1.namenode01" = "nn01:8020",
    "dfs.namenode.rpc-address.nameservice1.namenode02" = "nn02:8020",
    "dfs.client.failover.proxy.provider" =    "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
)
-- 该配置实际引用的是 hdfs-site.xml 中 dfs.nameservices、dfs.ha.namenodes.nameservice1、dfs.namenode.rpc-address.nameservice1.namenode01、dfs.namenode.rpc-address.nameservice1.namenode02、dfs.client.failover.proxy.provider
Broker Load 任务查看
use database;
show load where label = 'label ID';

show load from database where label = 'label ID';

-- 查看 Broker Load 任务状态
SHOW LOAD  FROM zxl WHERE LABEL = "ads_crm_sales_visit_statistics_details2023_04_13";   

-- 取消 Broker Load 任务
CANCEL LOAD FROM zxl WHERE LABEL = "ads_crm_sales_visit_statistics_details2023_01_18";
BrokerLoad⼤数据量导⼊优化参数推荐配置
StarRocks SessionVariables(FE):
1. load_transmission_compression_type = none //默认值是none,当并发提⾼,cpu仍然打不满时,可
以设置为LZ4_FRAME。
v2.4及之前版本:
1. load_parallel_instance_num=8 // 可以根据BE的cpu核数设置,⼀般设置区间为[cpu核数/2 - 64]
v2.5+版本 & session variable enable_adaptive_sink_dop=true:

1. pipeline_dop=0 // ⾃动设置BE核数的⼀半作为并⾏度,可以⼿动设置⾮0值,和查询的设置⽅式
⼀致。
2. 注: 新部署的2.5集群⾃动设置enable_adaptive_sink_dop=true。从之前版本升级上来的集群会⾃动
设置enable_adaptive_sink_dop=false,此时依然使⽤load_parallel_instance_num来配置并⾏度。

StarRocks SessionVariables(BE):
1. flush_thread_num_per_store=8 // 每个盘的flush线程数,当⽤户盘⽐较少时可以设置较⼤,盘较多
时设置较⼩,⼀般情况下 flush_thread_num_per_store * store_num < be_cpu_core_num / 2
2. olap_table_sink_send_interval_ms=1  
3. load_process_max_memory_limit_percent=50
4. send_channel_buffer_limit = 67108864 // 默认值64MB,当⽤户导⼊的数据列较多、单⾏数据较⼤
时,可以适当调⼤
5. number_tablet_writer_threads = 16 // 默认值16,[16-48],⼀般设置为cpu核数的1/3左右。
BrokerLoad 排查思路

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

1. Show load查看任务状态,状态为CANCELLED的时候进⼀步跟进排查。

2. 如果URL不为空,则通过curl $URL查看具体报错信息。

3. 如果URL为空,通过fe⽇志查看load id和be ip。

4. (检查hdfs⽂件路径是否指定正确,可以指定到具体⽂件也可以指定某⽬录下的所有⽂件。

5. (hdfs导⼊请检查⼀下是否有k8s认证,并进⾏配置 :grep $JobnId fe.log

6. 去到对应的be中查看具体异常:grep $load_id be.INFO

7. 定位到具体错误,并调整broker load配置,尝试重新导⼊任务。

Insert Into

Insert into table values 不建议在线上使用,频繁使用 INSERT 语句导入小批量数据会产⽣过多的数据版本,从⽽影响查询性能。需要流式导⼊或者小批量多次导⼊数据,建议使用 Kafka 作为数据源并通过 Routine Load 方式导入。

Insert into select 可以将源表中的数据导入都标表中。源表可以是⼀张或多张内部表或者外部表。⽬标表必须是 StarRocks 的内表。

Insert Into大数据量导入优化参数
# 每个盘的flush线程数,当⽤户盘⽐较少时可以设置较⼤,盘较多时设置较⼩,⼀般情况下 flush_thread_num_per_store * store_num(磁盘数量) < be_cpu_core_num / 2
flush_thread_num_per_store=8

olap_table_sink_send_interval_ms=1  # v2.5+ 未找到该参数

load_process_max_memory_limit_percent=50 

# 默认值64MB,当⽤户导⼊的数据列较多、单⾏数据较⼤时,可以适当调⼤ 
send_channel_buffer_limit = 67108864  # v2.5+ 未找到该参数

# 默认值16,[16-48],⼀般设置为cpu核数的1/3左右
number_tablet_writer_threads = 16  # v2.5+ 未找到该参数

Stream Load

curl --location-trusted -u 'root:kye!@#2020' -H "label:label1" \
    -H "Expect:100-continue" \
    -H "timeout:100" \
    -H "column_separator:," \
    -T /home/admin/zxl.csv -XPUT \   http://10.82.194.150:8030/api/ads_ops/disk_io_sector/_stream_load

# 导入分区
curl --location-trusted -uroot -H "label:123" -H"partitions:p1,p2" -TtestData http://host:port/api/testDb/testTbl/_stream_load

-H "column_separator: <column_separator>" 列分隔符
-H "row_delimiter:<row_delimiter>" 行分隔符
StreamLoad⼤数据量导⼊优化参数推荐配置
# StreamLoad⼤数据量导⼊优化参数推荐配置

# StarRocksFE配置
stream_load_default_timeout_second = 10800

# StarRocksBE配置
streaming_load_max_mb=102400
streaming_load_max_batch_size_mb=102400
flush_thread_num_per_store=8 // 每个盘的flush线程数,当⽤户盘⽐较少时可以设置较⼤,盘较多时设置较⼩,⼀般情况下 flush_thread_num_per_store * store_num < be_cpu_core_num / 2

olap_table_sink_send_interval_ms=1 # v2.5+ 未找到该参数

load_process_max_memory_limit_percent=50

tablet_max_versions = 20000
enable_new_load_on_memory_limit_exceeded=true
Stream Load 排查思路

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

1. 查看本次导⼊的load_id和调度到的BE节点IP 
grep -w $TxnId fe.log|grep "load id" 

2. 输出样例为:2023-1-30 20:48:50,169 INFO (thrift-server-pool-4|138)  
[FrontendServiceImpl.streamLoadPut():809] receive stream load put request. db:ssb, tbl: demo_test_1, txn id:  
1580717, load id: 7a4d4384-1ad7-b798-f176-4ae9d7ea6b9d, backend: 172.26.92.155 

3. 我们可以看到对应的BE节点IP,去到该节点上查看具体原因:grep $load_id be.INFO|less 

4. 输出样例为:I0518 11:58:16.771597 4228 stream_load.cpp:202] new income streaming load  
request.id=f1481, job_id=-1, txn_id=-1, label=metrics_detail_16, db=starrocks, tbl=metrics_detail 
I0518 11:58:16.7 4176 load_channel_mgr.cpp:186] Removing finished load channel load id=f181 
I0518 11:58:16.7 4176 load_channel.cpp:40] load channel mem peak usage=1915984, info=limit:  
16113540169; label: f181; all tracker size: 3; limit trackers size: 3; parent is null: false; , load_id=f181 

5. 如果查不到具体原因,则可以继续查看线程上下⽂进⾏进⼀步跟踪定位,⽐如上⽂的 4176线程 : 
grep -w 4176 be.INFO|less 进⼀步分析即可。

RoutineLoad

RoutineLoad 优化⽅式
# 任务调度周期
1. max_batch_interval
通过缩短任务调度周期加速数据消费。但是,更⼩的任务调度周期可能会带来更多的CPU资源消耗。
需要注意的是,任务调度周期最⼩值为5s。

# 任务级别调优
1. 降低导⼊QPS,集群总体的导⼊QPS尽量<10
计算⽅式: 集群routine_load_task_num / routine_load_task_consume_second
2. 增加单个导⼊事务的数据量,单个Routine Load Task导⼊的数据量 > 1G
需要同时调整 max_routine_load_batch_size,routine_load_task_timeout_second来实现
3. 单个BE上并发导⼊任务routine_load_thread_pool_size尽量<be_core_num / 2

# 任务并⾏度
1. max_routine_load_task_concurrent_num
2. desired_concurrent_number
(在partition数量和BE数量较多时,可以通过设置较⼤的该参数来加速任务执⾏。但是,更⼤的并⾏度
可能会带来更多的CPU资源消耗)

# 任务批量⼤⼩
1. routine_load_task_consume_second
通过增⼤单次读取持续时间加速数据消费。
2. max_routine_load_batch_size
通过增⼤单次读取的数据量加速数据消费。
Routine Load 排查思路

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

当任务状态为PAUSED或者CANCELLED的时候需要介⼊排查 

任务状态为PAUSED时: 
1. 可以先查看ReasonOfStateChanged定位下原因,例如“Offset out of range” 。 

2. 若ReasonOfStateChanged为空,查看ErrorLogUrls可查看具体的报错信息:curl ${ErrorLogUrls} 

3. 如果以上⽅法不能获取具体异常,可以执⾏以下命令查看(由于routine load是按周期调度的stream  
load任务,所以可以通过调度的任务查看任务的状态: 
show routine load task where JobName="routine_load_wikipedia"; 
查看Message字段可以看到具体异常,如果以上⽅法都不能排查到问题,可以拿到job id在be.INFO⽇志中找到txn id,然后通过txn id在be.INFO中查看上下⽂有具体的任务信息。 

任务状态为CANCELLED时:
1. 则可能为导⼊任务执⾏遇到异常(如表被删除)。 
您可以参考ReasonOfStateChanged、ErrorLogUrls报错进⾏排查和修复。但是修复后,您⽆法恢复 CANCELLED 状态的导⼊作业。

问题排查-导入失败相关

部署环境检查脚本

过txn id在be.INFO中查看上下⽂有具体的任务信息。

任务状态为CANCELLED时:

  1. 则可能为导⼊任务执⾏遇到异常(如表被删除)。
    您可以参考ReasonOfStateChanged、ErrorLogUrls报错进⾏排查和修复。但是修复后,您⽆法恢复 CANCELLED 状态的导⼊作业。



[问题排查-导入失败相关](https://forum.mirrorship.cn/t/topic/4923)

[部署环境检查脚本](https://forum.mirrorship.cn/uploads/short-url/cA3X82Tmzs6GCHqY5lbWoZfRlZS.sh)