Optimizer
PointLookupOptimizer
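Purpose: rewrites qualifying OR expressions into IN.
The parameter hive.optimize.point.lookup enables or disables PointLookupOptimizer; the default is true.
The parameter hive.optimize.point.lookup.min sets the minimum number of OR branches required before the rewrite is applied; the default is 31.
In the SQL below, for example, the OR has 3 children: web_site_sk = 1, web_site_sk = 2, and web_site_sk = 3. Every child must be an = comparison, and all of them must test the same column, web_site_sk. web_site_sk = 1 or web_site_sk = 2 or web_site_sk = 3
can be rewritten as `web_site_sk in (1,2,3)`.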
set hive.optimize.point.lookup.min=2;
select web_county,count(1) cnt
from web_site
where web_site_sk = 1 or web_site_sk = 2 or web_site_sk =3
group by web_county;
Writing the comparison web_site_sk = 2 as 2 = web_site_sk also works.
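The rewrite rule described above can be sketched in Python. This is only an illustrative model, not Hive's implementation: the function name `or_to_in` and the tuple representation of each `=` comparison are assumptions made for this example.

```python
def or_to_in(branches, min_or=31):
    """Model of the OR -> IN rewrite (hypothetical helper, not Hive code).

    branches: list of (left, right) operands, one pair per '=' comparison
              joined by OR. A str operand stands for a column name, any
              other value stands for a constant.
    min_or:   mirrors hive.optimize.point.lookup.min.
    Returns (column, [constants]) when the rewrite applies, else None.
    """
    if not branches or len(branches) < min_or:
        return None
    column = None
    constants = []
    for left, right in branches:
        # Accept both `col = const` and `const = col`.
        if isinstance(left, str) and not isinstance(right, str):
            col, const = left, right
        elif isinstance(right, str) and not isinstance(left, str):
            col, const = right, left
        else:
            return None
        if column is None:
            column = col
        elif col != column:
            # Every branch must test the same column.
            return None
        constants.append(const)
    return column, constants


branches = [("web_site_sk", 1), (2, "web_site_sk"), ("web_site_sk", 3)]
print(or_to_in(branches, min_or=2))
```

With the default threshold of 31 the same three branches are left untouched, which is why the SQL examples in this section lower hive.optimize.point.lookup.min to 2.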
- OR branches that each compare several columns, as below, can also be converted.
set hive.optimize.point.lookup.min=2;
explain select web_county,count(1) cnt
from web_site
where (web_site_sk = 1 and web_open_date_sk=1) or
(web_site_sk = 2 and web_open_date_sk=2) or
(web_site_sk = 3 and web_open_date_sk=3)
group by web_county;
The resulting execution plan:
Explain
Vertex dependency in root stage
Reducer 2 <- Map 1 (SIMPLE_EDGE)
Stage-0
Fetch Operator
limit:-1
Stage-1
Reducer 2
File Output Operator [FS_7]
Group By Operator [GBY_5] (rows=1 width=109)
Output:["_col0","_col1"],aggregations:["count(VALUE._col0)"],keys:KEY._col0
<-Map 1 [SIMPLE_EDGE]
SHUFFLE [RS_4]
PartitionCols:_col0
Group By Operator [GBY_3] (rows=1 width=109)
Output:["_col0","_col1"],aggregations:["count(1)"],keys:web_county
Select Operator [SEL_2] (rows=1 width=117)
Output:["web_county"]
Filter Operator [FIL_8] (rows=1 width=117)
predicate:(struct(web_open_date_sk,web_site_sk)) IN (const struct(1L,1L), const struct(2L,2L), const struct(3L,3L))
TableScan [TS_0] (rows=32 width=117)
tpcds_hdfs_orc_3@web_site,web_site,Tbl:COMPLETE,Col:COMPLETE,Output:["web_site_sk","web_open_date_sk","web_county"]
- Predicates on struct fields cannot be converted to IN.
drop table if exists complex_table;
create table complex_table(
c_int int,
c_struct struct<name:string,age:int>
)
row format delimited
FIELDS TERMINATED BY '\t'
collection items terminated by ','
map keys terminated by ':'
lines terminated by '\n'
stored as textfile;
insert into complex_table values(
1,
named_struct("name","Alice", "age",18)
);
select distinct c_int from complex_table where
(c_struct.name="Alice" and c_struct.age=18) or
(c_struct.name="Alice" and c_struct.age=19) or
(c_struct.name="Alice" and c_struct.age=20);
PartitionColumnsSeparator
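Purpose: extracts the partition columns from an IN predicate. Take STRUCT(T1.a, T1.b, T2.b, T2.c) IN (STRUCT(1, 2, 3, 4) , STRUCT(2, 3, 4, 5)), where T1.a, T1.b, and T2.c are partition columns and T2.b is not. The optimizer adds the following extra predicates: STRUCT(T1.a, T1.b) IN (STRUCT(1, 2), STRUCT(2, 3))
AND
STRUCT(T2.c) IN (STRUCT(4), STRUCT(5))
These extra predicates are used for partition pruning. Once pruning is done, the partition conditions are removed from the execution plan.
For example, the plan for the following statement lists only 3 partitions, because ws_sold_date_sk is a partition column.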
explain extended select distinct ws_item_sk from web_sales
where (ws_sold_date_sk=2452642 and ws_sold_time_sk=1)
or (ws_sold_date_sk=2452641 and ws_sold_time_sk=2)
or (ws_sold_date_sk=2452640 and ws_sold_time_sk=3);
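The extraction step can be modelled with a short Python sketch. It is an assumption-laden illustration, not Hive's code: `extract_partition_predicates` is a hypothetical name, and columns are represented as `(table, column)` pairs.

```python
def extract_partition_predicates(columns, rows, partition_cols):
    """Derive one extra IN predicate per table from a STRUCT IN predicate.

    columns:        list of (table, column) pairs, the STRUCT on the left.
    rows:           list of constant tuples, the STRUCTs on the right.
    partition_cols: set of (table, column) pairs that are partition columns.
    Returns {table: (column_names, value_tuples)}.
    """
    # Positions of the partition columns, grouped by table.
    positions = {}
    for idx, (table, col) in enumerate(columns):
        if (table, col) in partition_cols:
            positions.setdefault(table, []).append(idx)

    extra = {}
    for table, idxs in positions.items():
        names = [columns[i][1] for i in idxs]
        values = [tuple(row[i] for i in idxs) for row in rows]
        extra[table] = (names, values)
    return extra


columns = [("T1", "a"), ("T1", "b"), ("T2", "b"), ("T2", "c")]
rows = [(1, 2, 3, 4), (2, 3, 4, 5)]
partition_cols = {("T1", "a"), ("T1", "b"), ("T2", "c")}
print(extract_partition_predicates(columns, rows, partition_cols))
```

Applied to the STRUCT example from this section, the sketch yields one predicate on T1 (a, b) and one on T2 (c); the non-partition column T2.b is left out.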
PredicateTransitivePropagate (no triggering SQL found)
The following statement reaches this optimizer's logic but does not change the execution plan.
select sum(ws_net_paid) from web_sales join(select * from web_site where web_site_sk < 10) web_site where ws_sold_date_sk=2452640 and web_site_sk = 1;
ConstantPropagate
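Traverses the DAG from root to child and, for each conditional expression, does the following:
- Folds the expression if it is a UDF and all of its arguments are constants.
- Removes expressions that are always true from a Filter.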
explain select count(1) from web_site where 1=1 and web_site_sk<10;
Here 1=1 is always true, so it can be removed.
Filter Operator [FIL_8] (rows=10 width=8)
predicate:(web_site_sk < 10L)
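- Short-circuits expressions in a Filter whose outcome is fixed by a constant: false AND expr is false, and true OR expr is true, as in the following statement.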
explain select count(1) from web_site where 1=1 or web_site_sk<10;
Here 1=1 is always true, so none of the other expressions needs to be evaluated.
- A NULL condition is treated as false.
Both of the following statements return 0.
select count(1) from web_site where null;
select count(1) from web_site where false;
- Expression propagation
In the following statement, web_site_sk=5 can be propagated upward and combined with web_site_sk < 10.
explain select * from (select * from web_site where web_site_sk < 10) t where web_site_sk=5;
The final Filter:
Filter Operator [FIL_6]
predicate:((web_site_sk < 10L) and (web_site_sk = 5L))
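The boolean simplifications in the list above (dropping always-true terms and short-circuiting on constants) can be sketched as a small recursive fold. This is a simplified model under assumed conventions: predicates are nested tuples `(op, left, right)`, leaf predicates are plain strings, and `fold` is a hypothetical name. UDF folding and expression propagation are not modelled.

```python
def fold(expr):
    """Simplify AND/OR trees that contain the constants True or False."""
    if not isinstance(expr, tuple):
        return expr  # a leaf: a constant or an opaque predicate string
    op, left, right = expr
    left, right = fold(left), fold(right)
    if op == "and":
        if left is False or right is False:
            return False          # false AND x  ->  false
        if left is True:
            return right          # true AND x   ->  x
        if right is True:
            return left
    elif op == "or":
        if left is True or right is True:
            return True           # true OR x    ->  true
        if left is False:
            return right          # false OR x   ->  x
        if right is False:
            return left
    return (op, left, right)


# where 1=1 and web_site_sk<10  (1=1 already folded to True)
print(fold(("and", True, "web_site_sk < 10")))
# where 1=1 or web_site_sk<10
print(fold(("or", True, "web_site_sk < 10")))
```

The first call leaves only the web_site_sk < 10 predicate, matching the Filter Operator shown earlier in this section; the second collapses to a constant, so nothing else needs to be evaluated.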
SyntheticJoinPredicate
explain logical
select sum(ws_net_paid) net_paid
from web_sales
where ws_web_site_sk in(
select web_site_sk
from web_site
where web_site_sk < 10 )
and ws_sold_date_sk=2452642;
Output:
The Filter after the web_site TableScan gains (web_site_sk) IN (RS[7]):
predicate: ((web_site_sk < 10L) and (web_site_sk) IN (RS[7])) (type: boolean)
The following Filter is added after web_sales:
alias: web_sales
Filter Operator (FIL_20)
predicate: (ws_web_site_sk is not null and (ws_web_site_sk) IN (RS[9])) (type: boolean)
explain logical
select sum(ws_net_paid) net_paid
from web_sales join web_site on ws_web_site_sk=web_site_sk
where
ws_sold_date_sk=2452642;
The resulting execution plan:
Explain
LOGICAL PLAN:
web_sales
TableScan (TS_0)
alias: web_sales
Filter Operator (FIL_16)
predicate: (ws_web_site_sk is not null and (ws_web_site_sk) IN (RS[5])) (type: boolean)
// ...
web_site
TableScan (TS_1)
alias: web_site
Filter Operator (FIL_17)
predicate: (web_site_sk is not null and (web_site_sk) IN (RS[3])) (type: boolean)
// ...
SortedDynPartitionOptimizer
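Sort optimization for dynamic partitions. When enabled, records are sorted by the partition columns or bucket columns before they are inserted, which guarantees that each reducer has only one writer open and reduces memory pressure on the reducers.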
set hive.stats.autogather=false;
create table profile(c1 string) stored as textfile;
load data local inpath '/etc/profile' overwrite into table profile;
create table p_profile(c1 string) partitioned by (len int);
- Without the sort optimization
set hive.optimize.sort.dynamic.partition=false;
explain logical insert overwrite table p_profile select c1,length(c1) from profile;
Output:
LOGICAL PLAN:
profile
TableScan (TS_0)
alias: profile
Select Operator (SEL_1)
expressions: c1 (type: string), length(c1) (type: int)
outputColumnNames: _col0, _col1
File Output Operator (FS_2)
compressed: false
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: test.p_profile
- With the sort optimization
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.optimize.sort.dynamic.partition=true;
explain logical insert overwrite table p_profile select c1,length(c1) from profile;
A sort step (Reduce Output Operator) is added:
Explain
LOGICAL PLAN:
profile
TableScan (TS_0)
alias: profile
Select Operator (SEL_1)
expressions: c1 (type: string), length(c1) (type: int)
outputColumnNames: _col0, _col1
Reduce Output Operator (RS_3)
key expressions: _col1 (type: int)
sort order: +
Map-reduce partition columns: _col1 (type: int)
value expressions: _col0 (type: string)
Select Operator (SEL_4)
expressions: VALUE._col0 (type: string), KEY._col1 (type: int)
outputColumnNames: _col0, _col1
File Output Operator (FS_2)
compressed: false
Dp Sort State: PARTITION_SORTED
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: test.p_profile
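Why sorting by the partition key relieves memory pressure can be seen in a small simulation. It is a sketch under simplifying assumptions: `peak_open_writers` is a hypothetical helper, each distinct partition key needs one writer, and a writer can only be closed once no more rows for its partition can arrive.

```python
def peak_open_writers(partition_keys, presorted):
    """Return the largest number of writers open at the same time."""
    keys = sorted(partition_keys) if presorted else list(partition_keys)
    open_writers = set()
    peak = 0
    for key in keys:
        if presorted and open_writers and key not in open_writers:
            # Sorted input: a new key means the previous partition is
            # complete, so its writer can be closed.
            open_writers.clear()
        open_writers.add(key)
        peak = max(peak, len(open_writers))
    return peak


# Partition key = length(c1), as in the p_profile example.
lengths = [3, 1, 2, 1, 3]
print(peak_open_writers(lengths, presorted=False))
print(peak_open_writers(lengths, presorted=True))
```

Without sorting, the writer count grows with the number of distinct partition values seen by the task; with sorting it stays at one, at the cost of the extra shuffle visible in the second plan.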
SortedDynPartitionTimeGranularityOptimizer
Applies only to FileSink operators whose output format is org.apache.hadoop.hive.druid.io.DruidOutputFormat.
PartitionPruner (PPR) & PartitionConditionRemover (PCR)
select sum(ws_net_paid) sum_ws_net_paid
from web_sales
where ws_sold_date_sk >= 2452640
and ws_sold_date_sk <=2452642
and ws_net_paid>1.0 ;
PartitionPruner puts the predicate information from the Filter that follows the TableScan into opToPartList.
HashMap<TableScanOperator, PrunedPartitionList> opToPartList;
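PartitionConditionRemover takes the entry for the ts from opToPartList, runs PartitionPruner.prune to obtain partsList, and calls opToPartList.put(ts, partsList);
If the Filter contains only conditions on partition columns, the Filter after the TableScan is removed. Otherwise, only the partition-column conditions are removed from the Filter.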
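The combined effect of the two steps can be sketched as follows. This is a simplified model rather than Hive's implementation: `prune_and_strip` is a hypothetical helper, the Filter is treated as a list of ANDed `(column, predicate)` pairs, and the partition values are made up for the example.

```python
def prune_and_strip(partitions, conditions, partition_col):
    """Model of partition pruning followed by partition-condition removal.

    partitions:    list of partition values of partition_col.
    conditions:    list of (column, predicate_fn) pairs, ANDed together.
    Returns (kept_partitions, remaining_conditions).
    """
    # PPR: evaluate only the partition-column conditions per partition.
    part_preds = [fn for col, fn in conditions if col == partition_col]
    kept = [p for p in partitions if all(fn(p) for fn in part_preds)]
    # PCR: conditions already enforced by pruning leave the Filter.
    remaining = [(col, fn) for col, fn in conditions if col != partition_col]
    return kept, remaining


conditions = [
    ("ws_sold_date_sk", lambda v: v >= 2452640),
    ("ws_sold_date_sk", lambda v: v <= 2452642),
    ("ws_net_paid", lambda v: v > 1.0),
]
kept, remaining = prune_and_strip(range(2452638, 2452645), conditions, "ws_sold_date_sk")
print(kept, [col for col, _ in remaining])
```

If `remaining` comes back empty, the whole Filter can be dropped, which corresponds to the first case described above.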
ListBucketingPruner
drop table if exists list_bucket_test;
CREATE TABLE list_bucket_test (key int, value int) partitioned by (dt string)
SKEWED BY (key) ON (1,2) STORED AS DIRECTORIES;
insert overwrite table list_bucket_test
partition(dt=20250519)
values(1,1),(1,2),(2,3),(2,4),(3,5);
The list_bucket_test table directory contains 3 directories: key=1, key=2, and HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME.
drwxr-xr-x 4 houzhizhen staff 128 5 19 16:32 HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME
drwxr-xr-x 4 houzhizhen staff 128 5 19 16:32 key=1
drwxr-xr-x 4 houzhizhen staff 128 5 19 16:32 key=2
set hive.optimize.listbucketing=true;
select sum(value) from list_bucket_test where dt=20250519 and key=1;
ListBucketingPruner stores the mapping TableScanOperator->(Partition-> "GenericUDFOPEqual(Column[key], Const int 1)") in the following field of ParseContext.
Map<TableScanOperator, Map<String, ExprNodeDesc>> opToPartToSkewedPruner;
The value is dt=20250519 -> "GenericUDFOPEqual(Column[key], Const int 1)"
GenMapRedUtils then uses the opToPartToSkewedPruner information to generate the paths that need to be processed for the Partition.
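The path selection can be illustrated with a minimal sketch. It assumes a single skewed column with an equality predicate, as in the example above; `dirs_to_scan` is a hypothetical helper and does not reproduce the logic in GenMapRedUtils.

```python
DEFAULT_DIR = "HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME"


def dirs_to_scan(skewed_values, key_equals, column="key"):
    """Pick the sub-directory of a partition that can contain key_equals.

    skewed_values: values listed in SKEWED BY ... ON (...).
    key_equals:    the constant from the predicate `key = <constant>`.
    """
    if key_equals in skewed_values:
        # Skewed values each live in their own directory.
        return [f"{column}={key_equals}"]
    # Every other value is stored in the default directory.
    return [DEFAULT_DIR]


print(dirs_to_scan({1, 2}, 1))
print(dirs_to_scan({1, 2}, 3))
```

For the query with key=1, only the key=1 directory has to be read; the other two directories of the dt=20250519 partition are skipped.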
GroupByOptimizer
Group by optimization: if the group keys include all of the bucketing keys and sort keys, in the same order, the group by can be performed in the map.
drop table if exists test ;
CREATE TABLE test (bkey int,skey int, value int) partitioned by(pkey int)
clustered by(bkey) sorted by (bkey,skey asc) into 8 buckets;
insert overwrite table test partition(pkey=5)
values(1,1,1),(1,2,3),(2,3,3),(2,3,4);
-- The data set is small; do not convert to a fetch task
set hive.fetch.task.conversion=none;
explain select bkey,skey, sum(value) from test where pkey=5 group by bkey,skey ;
The output is below. The group by completes on the map side with no reduce, which may reduce parallelism.
Explain
Stage-0
Fetch Operator
limit:-1
Stage-1
Map 1
File Output Operator [FS_7]
Group By Operator [GBY_3] (rows=4 width=16)
Output:["_col0","_col1","_col2"],aggregations:["sum(value)"],keys:bkey, skey
Select Operator [SEL_2] (rows=4 width=12)
Output:["bkey","skey","value"]
TableScan [TS_0] (rows=4 width=12)
tpcds_hdfs_orc_3@test,test,Tbl:COMPLETE,Col:COMPLETE,Output:["bkey","skey","value"]
ColumnPruner
Traverses all operators in post-order and keeps only the columns that child operators use.
explain logical
select sum(ws_net_paid) net_paid
from web_sales
where ws_sold_date_sk=2452642;
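Without the optimization, the TableScan outputs every column, and so does the Select after it. After the optimization, each operator carries only the columns its child operators need.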
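The backward walk can be sketched on a linear operator chain. This is an illustrative model only: `required_scan_columns` is a hypothetical name, and each operator is reduced to the set of columns it reads and the set of new columns it produces.

```python
def required_scan_columns(chain):
    """Compute the columns the TableScan must output.

    chain: operators in plan order after the TableScan, each given as
           (reads, produces): columns the operator reads from its input
           and new columns it creates.
    """
    needed = set()
    # Visit the last operator first, so each operator already knows
    # what the operators after it need.
    for reads, produces in reversed(chain):
        needed = set(reads) | (needed - set(produces))
    return needed


chain = [
    ({"ws_sold_date_sk"}, set()),       # Filter on the date column
    ({"ws_net_paid"}, {"net_paid"}),    # Group By computing sum(ws_net_paid)
]
print(sorted(required_scan_columns(chain)))
```

Only two of the many web_sales columns survive in this model. In the real plan, the partition condition may additionally be removed by partition pruning.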
CountDistinctRewriteProc
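This optimization takes effect only on the Tez engine.
count(distinct) may take only one column, and the query may contain only one count distinct.
Example where it takes effect: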
explain
select count(distinct ws_web_site_sk)
from web_sales;
The resulting execution plan:
Vertex dependency in root stage
Reducer 2 <- Map 1 (SIMPLE_EDGE)
Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)
Stage-0
Fetch Operator
limit:-1
Stage-1
Reducer 3 vectorized
File Output Operator [FS_19]
Group By Operator [GBY_18] (rows=1 width=8)
Output:["_col0"],aggregations:["count(VALUE._col0)"]
<-Reducer 2 [CUSTOM_SIMPLE_EDGE] vectorized
PARTITION_ONLY_SHUFFLE [RS_17]
Group By Operator [GBY_16] (rows=1 width=8)
Output:["_col0"],aggregations:["count(_col0)"]
Group By Operator [GBY_15] (rows=360000188 width=934)
Output:["_col0"],keys:KEY._col0
<-Map 1 [SIMPLE_EDGE] vectorized
SHUFFLE [RS_14]
PartitionCols:_col0
Group By Operator [GBY_13] (rows=720000376 width=934)
Output:["_col0"],keys:ws_web_site_sk
Select Operator [SEL_12] (rows=720000376 width=934)
Output:["ws_web_site_sk"]
TableScan [TS_0] (rows=720000376 width=934)
tpcds_bos_parquet_1000@web_sales,web_sales,Tbl:COMPLETE,Col:NONE,Output:["ws_web_site_sk"]
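How Count Distinct executes:
On the MR engine:
- Multiple map tasks compute distinct ws_web_site_sk and emit the de-duplicated ws_web_site_sk values.
- A single reducer then pulls the data from all map tasks and performs the final count(distinct ws_web_site_sk). If there are many distinct ws_web_site_sk values, it may run out of memory (OOM).
On the Tez engine:
- Multiple map tasks compute distinct ws_web_site_sk and distribute the data across multiple reducers, using ws_web_site_sk as the partition key.
- The reducers each compute count(distinct ws_web_site_sk) and emit the number.
- A single reducer sums the numbers from step 2.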
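The three Tez steps can be simulated in a few lines. This is a sketch of the idea only: `count_distinct_two_stage` is a hypothetical name, Python's built-in `hash` stands in for the shuffle partitioner, and NULL handling is not modelled.

```python
def count_distinct_two_stage(map_inputs, num_reducers):
    """Simulate the rewritten count(distinct col) plan.

    map_inputs:   one list of column values per map task.
    num_reducers: number of reducers in the middle stage.
    """
    # Steps 1 and shuffle: each map task de-duplicates its own rows and
    # sends every key to the reducer chosen by the partition key.
    partitions = [set() for _ in range(num_reducers)]
    for rows in map_inputs:
        for key in set(rows):
            partitions[hash(key) % num_reducers].add(key)
    # Step 2: each reducer counts the distinct keys it received. A key
    # always lands on the same reducer, so no key is counted twice.
    partial_counts = [len(p) for p in partitions]
    # Step 3: a single reducer adds up the partial counts.
    return sum(partial_counts)


map_inputs = [[1, 2, 2, 3], [3, 4, 1], [5, 5]]
print(count_distinct_two_stage(map_inputs, num_reducers=4))
```

Because the final reducer only receives one number per middle-stage reducer, it avoids holding all distinct values in memory, which is the OOM risk described for the MR plan.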
Example 1 where it does not take effect:
explain
select count(distinct ws_web_site_sk),
count(distinct ws_sold_time_sk)
from web_sales;
Vertex dependency in root stage
Reducer 2 <- Map 1 (SIMPLE_EDGE)
Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)
Stage-0
Fetch Operator
limit:-1
Stage-1
Reducer 3 vectorized
File Output Operator [FS_21]
Group By Operator [GBY_20] (rows=1 width=16)
Output:["_col0","_col1"],aggregations:["count(VALUE._col0)","count(VALUE._col1)"]
<-Reducer 2 [CUSTOM_SIMPLE_EDGE] vectorized
PARTITION_ONLY_SHUFFLE [RS_19]
Group By Operator [GBY_18] (rows=1 width=16)
Output:["_col0","_col1"],aggregations:["count(_col0)","count(_col1)"]
Select Operator [SEL_17] (rows=720000376 width=934)
Output:["_col0","_col1"]
Group By Operator [GBY_16] (rows=720000376 width=934)
Output:["_col0","_col1","_col2"],keys:KEY._col0, KEY._col1, KEY._col2
<-Map 1 [SIMPLE_EDGE] vectorized
SHUFFLE [RS_15]
PartitionCols:_col0, _col1, _col2
Group By Operator [GBY_14] (rows=1440000752 width=934)
Output:["_col0","_col1","_col2"],keys:_col0, _col1, 0L
Select Operator [SEL_13] (rows=720000376 width=934)
Output:["_col0","_col1"]
TableScan [TS_0] (rows=720000376 width=934)
tpcds_bos_parquet_1000@web_sales,web_sales,Tbl:COMPLETE,Col:NONE,Output:["ws_web_site_sk","ws_sold_time_sk"]
Example 2 where it does not take effect:
explain
select count(distinct ws_web_site_sk, ws_sold_time_sk)
from web_sales;
The resulting execution plan:
Vertex dependency in root stage
Reducer 2 <- Map 1 (SIMPLE_EDGE)
Stage-0
Fetch Operator
limit:-1
Stage-1
Reducer 2
File Output Operator [FS_6]
Group By Operator [GBY_4] (rows=1 width=16)
Output:["_col0"],aggregations:["count(DISTINCT KEY._col0:0._col0, KEY._col0:0._col1)"]
<-Map 1 [SIMPLE_EDGE]
SHUFFLE [RS_3]
Group By Operator [GBY_2] (rows=720000376 width=934)
Output:["_col0","_col1","_col2"],aggregations:["count(DISTINCT ws_web_site_sk, ws_sold_time_sk)"],keys:ws_web_site_sk, ws_sold_time_sk
Select Operator [SEL_1] (rows=720000376 width=934)
Output:["ws_sold_time_sk","ws_web_site_sk"]
TableScan [TS_0] (rows=720000376 width=934)
tpcds_bos_parquet_1000@web_sales,web_sales,Tbl:COMPLETE,Col:NONE,Output:["ws_sold_time_sk","ws_web_site_sk"]
SkewJoinOptimizer
Takes effect only on MR.
drop table if exists skew_web_sales;
CREATE TABLE `skew_web_sales`(
`ws_sold_time_sk` bigint,
`ws_ship_date_sk` bigint,
`ws_item_sk` bigint,
`ws_bill_customer_sk` bigint,
`ws_bill_cdemo_sk` bigint,
`ws_bill_hdemo_sk` bigint,
`ws_bill_addr_sk` bigint,
`ws_ship_customer_sk` bigint,
`ws_ship_cdemo_sk` bigint,
`ws_ship_hdemo_sk` bigint,
`ws_ship_addr_sk` bigint,
`ws_web_page_sk` bigint,
`ws_web_site_sk` bigint,
`ws_ship_mode_sk` bigint,
`ws_warehouse_sk` bigint,
`ws_promo_sk` bigint,
`ws_order_number` bigint,
`ws_quantity` int,
`ws_wholesale_cost` decimal(7,2),
`ws_list_price` decimal(7,2),
`ws_sales_price` decimal(7,2),
`ws_ext_discount_amt` decimal(7,2),
`ws_ext_sales_price` decimal(7,2),
`ws_ext_wholesale_cost` decimal(7,2),
`ws_ext_list_price` decimal(7,2),
`ws_ext_tax` decimal(7,2),
`ws_coupon_amt` decimal(7,2),
`ws_ext_ship_cost` decimal(7,2),
`ws_net_paid` decimal(7,2),
`ws_net_paid_inc_tax` decimal(7,2),
`ws_net_paid_inc_ship` decimal(7,2),
`ws_net_paid_inc_ship_tax` decimal(7,2),
`ws_net_profit` decimal(7,2),
`ws_sold_date_sk` bigint)
skewed by (ws_web_site_sk) on (1);
insert overwrite table skew_web_sales select * from web_sales where ws_web_site_sk=1;
insert into table skew_web_sales select * from web_sales where ws_sold_date_sk=2452642 and ws_web_site_sk=3 limit 1;
set hive.optimize.skewjoin.compiletime=true;
set hive.execution.engine=mr;
explain
select ws_web_site_sk,web_class,sum(ws_net_paid)
from web_site join skew_web_sales
on skew_web_sales.ws_web_site_sk=web_site.web_site_sk
group by ws_web_site_sk,web_class;
The resulting execution plan:
Explain
LOGICAL PLAN:
skew_web_sales
TableScan (TS_1)
alias: skew_web_sales
Statistics: Num rows: 126827 Data size: 15217112 Basic stats: COMPLETE Column stats: COMPLETE
Filter Operator (FIL_24)
predicate: (ws_web_site_sk is not null and (ws_web_site_sk = 1L)) (type: boolean)
Statistics: Num rows: 63414 Data size: 7608672 Basic stats: COMPLETE Column stats: COMPLETE
Reduce Output Operator (RS_5)
key expressions: ws_web_site_sk (type: bigint)
sort order: +
Map-reduce partition columns: ws_web_site_sk (type: bigint)
Statistics: Num rows: 63414 Data size: 7608672 Basic stats: COMPLETE Column stats: COMPLETE
value expressions: ws_net_paid (type: decimal(7,2))
Join Operator (JOIN_6)
condition map:
Inner Join 0 to 1
keys:
0 web_site_sk (type: bigint)
1 ws_web_site_sk (type: bigint)
outputColumnNames: _col7, _col42, _col58
Statistics: Num rows: 63414 Data size: 13379346 Basic stats: COMPLETE Column stats: COMPLETE
Union (UNION_27)
Statistics: Num rows: 94130 Data size: 19859414 Basic stats: COMPLETE Column stats: COMPLETE
Group By Operator (GBY_8)
aggregations: sum(_col58)
keys: _col42 (type: bigint), _col7 (type: varchar(50))
mode: hash
outputColumnNames: _col0, _col1, _col2
Statistics: Num rows: 2 Data size: 422 Basic stats: COMPLETE Column stats: COMPLETE
Reduce Output Operator (RS_9)
key expressions: _col0 (type: bigint), _col1 (type: varchar(50))
sort order: ++
Map-reduce partition columns: _col0 (type: bigint), _col1 (type: varchar(50))
Statistics: Num rows: 2 Data size: 422 Basic stats: COMPLETE Column stats: COMPLETE
value expressions: _col2 (type: decimal(17,2))
Group By Operator (GBY_10)
aggregations: sum(VALUE._col0)
keys: KEY._col0 (type: bigint), KEY._col1 (type: varchar(50))
mode: mergepartial
outputColumnNames: _col0, _col1, _col2
Statistics: Num rows: 2 Data size: 422 Basic stats: COMPLETE Column stats: COMPLETE
File Output Operator (FS_12)
compressed: false
Statistics: Num rows: 2 Data size: 422 Basic stats: COMPLETE Column stats: COMPLETE
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
subquery1:skew_web_sales
TableScan (TS_18)
alias: skew_web_sales
Statistics: Num rows: 126827 Data size: 15217112 Basic stats: COMPLETE Column stats: COMPLETE
Filter Operator (FIL_26)
predicate: (ws_web_site_sk is not null and (not (ws_web_site_sk = 1L))) (type: boolean)
Statistics: Num rows: 63413 Data size: 7608552 Basic stats: COMPLETE Column stats: COMPLETE
Reduce Output Operator (RS_20)
key expressions: ws_web_site_sk (type: bigint)
sort order: +
Map-reduce partition columns: ws_web_site_sk (type: bigint)
Statistics: Num rows: 63413 Data size: 7608552 Basic stats: COMPLETE Column stats: COMPLETE
value expressions: ws_net_paid (type: decimal(7,2))
Join Operator (JOIN_21)
condition map:
Inner Join 0 to 1
keys:
0 web_site_sk (type: bigint)
1 ws_web_site_sk (type: bigint)
outputColumnNames: _col7, _col42, _col58
Statistics: Num rows: 30716 Data size: 6480068 Basic stats: COMPLETE Column stats: COMPLETE
Union (UNION_27)
Statistics: Num rows: 94130 Data size: 19859414 Basic stats: COMPLETE Column stats: COMPLETE
subquery1:web_site
TableScan (TS_15)
alias: web_site
Statistics: Num rows: 32 Data size: 3168 Basic stats: COMPLETE Column stats: COMPLETE
Filter Operator (FIL_25)
predicate: (web_site_sk is not null and (not (ws_web_site_sk = 1L))) (type: boolean)
Statistics: Num rows: 1 Data size: 99 Basic stats: COMPLETE Column stats: COMPLETE
Reduce Output Operator (RS_17)
key expressions: web_site_sk (type: bigint)
sort order: +
Map-reduce partition columns: web_site_sk (type: bigint)
Statistics: Num rows: 1 Data size: 99 Basic stats: COMPLETE Column stats: COMPLETE
value expressions: web_class (type: varchar(50))
Join Operator (JOIN_21)
condition map:
Inner Join 0 to 1
keys:
0 web_site_sk (type: bigint)
1 ws_web_site_sk (type: bigint)
outputColumnNames: _col7, _col42, _col58
Statistics: Num rows: 30716 Data size: 6480068 Basic stats: COMPLETE Column stats: COMPLETE
web_site
TableScan (TS_0)
alias: web_site
Statistics: Num rows: 32 Data size: 3168 Basic stats: COMPLETE Column stats: COMPLETE
Filter Operator (FIL_23)
predicate: (web_site_sk is not null and (ws_web_site_sk = 1L)) (type: boolean)
Statistics: Num rows: 32 Data size: 3168 Basic stats: COMPLETE Column stats: COMPLETE
Reduce Output Operator (RS_3)
key expressions: web_site_sk (type: bigint)
sort order: +
Map-reduce partition columns: web_site_sk (type: bigint)
Statistics: Num rows: 32 Data size: 3168 Basic stats: COMPLETE Column stats: COMPLETE
value expressions: web_class (type: varchar(50))
Join Operator (JOIN_6)
condition map:
Inner Join 0 to 1
keys:
0 web_site_sk (type: bigint)
1 ws_web_site_sk (type: bigint)
outputColumnNames: _col7, _col42, _col58
Statistics: Num rows: 63414 Data size: 13379346 Basic stats: COMPLETE Column stats: COMPLETE
SamplePruner
Example:
drop table if exists t1 ;
create table t1(c1 string) stored as textfile;
load data local inpath '/etc/profile' overwrite into table t1;
drop table if exists t2;
create external table t2(c1 string) CLUSTERED BY (c1) into 8 buckets;
insert overwrite table t2 select * from t1;
set hive.execution.engine=mr;
set hive.fetch.task.conversion=none;
explain select * from t2 TABLESAMPLE(BUCKET 3 OUT OF 8 ON c1) s;
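Conditions for SamplePruner to take effect:
- t2 is an external table.
- The CLUSTERED BY column is the same as the ON column of TABLESAMPLE in the select. ON rand does not qualify.
- The number in into 8 buckets at table creation must equal the number in OUT OF 8 in the query.
- The 3 in (BUCKET 3 OUT OF 8 ON c1) in the query refers to the 3rd file.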
MapJoinProcessor
Handles the hint with which a user requests a MapJoin. This approach is no longer used; joins are now converted to mapjoins automatically based on statistics.
set hive.ignore.mapjoin.hint=false;
explain select /*+mapjoin(web_site) */ sum(ws_net_paid) from web_sales join web_site on web_site_sk=ws_web_site_sk
where ws_sold_date_sk=2452640 ;
BucketMapJoinOptimizer
Conditions for use:
- MR engine.
- The mapjoin hint in the SQL takes effect.
- hive.optimize.bucketmapjoin is true (the default is false).
BucketMapJoinOptimizer, SortedMergeBucketMapJoinOptimizer
Take effect on the MR engine and are disabled by default.
BucketingSortingReduceSinkOptimizer
For insert overwrite table T2 select * from T1;
if T1 and T2 have the same bucket and sort keys and the same number of buckets, no reducer task is needed.
UnionProcessor
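If both subqueries of a UNION are map-only, that information is stored in unionDesc/UnionOperator. If one of the subqueries is a map-reduce job, an FS is inserted before the UNION.
Example where both UNION subqueries are map-only.
The difference between union and union all:
union has a group by and a reduce sink after the union operator, because duplicates must be removed.
union all has no group by or reduce sink after the union operator, because no de-duplication is needed.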
explain logical select ws_web_site_sk from web_sales where ws_sold_date_sk=2452640
union
select ws_web_site_sk from web_sales where ws_sold_date_sk=2452641;
Example where one of the subqueries is a map-reduce job:
explain logical select ws_web_site_sk from web_sales where ws_sold_date_sk=2452640
union
select distinct ws_web_site_sk from web_sales where ws_sold_date_sk=2452641;
Used in MapReduceCompiler.
BucketVersionPopulator
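Depending on the hash algorithm, the assignment of data to N buckets differs.
Hive uses bucketingVersion to decide which hash algorithm to use.
After each Reduce Sink, a different bucketing version may be used, because a full shuffle can redistribute the data with the new hash algorithm.
Hive writes data using the table's bucket version.
When reading from a table, the table's bucketing_version field is taken into account.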
ReduceSinkDeDuplication
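If two adjacent reduce sinks share the same partition/sort columns, in the same order, they can be merged into one reduce.
For example, the two 'partition by' clauses in the following SQL use different columns.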
explain select
avg(sum(web_tax_percentage)) over
(partition by web_city)
avg_monthly_sales,
rank() over
(partition by web_county, web_city
order by web_county) rn
from web_site
group by web_county, web_city;
The resulting vertex dependencies:
Explain
Vertex dependency in root stage
Reducer 2 <- Map 1 (SIMPLE_EDGE)
Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
Reducer 4 <- Reducer 3 (SIMPLE_EDGE)
In the following SQL, both 'partition by' clauses use the same columns:
explain select
avg(sum(web_tax_percentage)) over
(partition by web_county, web_city)
avg_monthly_sales,
rank() over
(partition by web_county, web_city
order by web_county) rn
from web_site
group by web_county, web_city;
The resulting vertices are below, with one reduce fewer than the previous SQL because the partition by columns are the same.
Explain
Vertex dependency in root stage
Reducer 2 <- Map 1 (SIMPLE_EDGE)
Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
NonBlockingOpDeDupProc
Merges two adjacent projection (SEL) operators into one projection (SEL) operator, and two adjacent filter (FIL) operators into one filter (FIL) operator.
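A minimal sketch of the merge, under strong simplifying assumptions: operators form a linear chain of `(kind, payload)` pairs, FIL payloads are lists of ANDed predicates, and SEL operators only forward columns by name, so the later projection fully determines the output. `merge_adjacent` is a hypothetical name.

```python
def merge_adjacent(ops):
    """Collapse neighbouring FIL-FIL and SEL-SEL pairs in an operator chain."""
    merged = []
    for kind, payload in ops:
        if merged and merged[-1][0] == kind == "FIL":
            # Two filters in a row: AND their predicates together.
            merged[-1] = ("FIL", merged[-1][1] + payload)
        elif merged and merged[-1][0] == kind == "SEL":
            # Two forwarding projections in a row: the later one wins.
            merged[-1] = ("SEL", payload)
        else:
            merged.append((kind, payload))
    return merged


ops = [
    ("TS", ["a", "b", "c"]),
    ("FIL", ["a > 1"]),
    ("FIL", ["b < 5"]),
    ("SEL", ["a", "b"]),
    ("SEL", ["a"]),
]
print(merge_adjacent(ops))
```

A projection that computes expressions would need its expressions substituted into the following SEL instead of simply being replaced; that case is left out of this sketch.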
IdentityProjectRemover
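Removes unnecessary projection (SEL) operators. A SEL can be removed if it only forwards columns, that is, it performs no computation (unlike select x+1) and does not rename any column.
For example: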
explain select web_city
from web_site
where web_city > '1'
group by web_city;
The resulting plan contains no Select Operator:
Explain
Vertex dependency in root stage
Reducer 2 <- Map 1 (SIMPLE_EDGE)
Stage-0
Fetch Operator
limit:-1
Stage-1
Reducer 2
File Output Operator [FS_7]
Group By Operator [GBY_5] (rows=1 width=91)
Output:["_col0"],keys:KEY._col0
<-Map 1 [SIMPLE_EDGE]
SHUFFLE [RS_4]
PartitionCols:_col0
Group By Operator [GBY_3] (rows=1 width=91)
Output:["_col0"],keys:web_city
Filter Operator [FIL_8] (rows=10 width=91)
predicate:(web_city > '1')
TableScan [TS_0] (rows=32 width=91)
tpcds_hdfs_orc_3@web_site,web_site,Tbl:COMPLETE,Col:COMPLETE,Output:["web_city"]
Without a filter:
explain select web_city
from web_site
group by web_city;
there is a Select Operator after the TS.
Explain
Vertex dependency in root stage
Reducer 2 <- Map 1 (SIMPLE_EDGE)
Stage-0
Fetch Operator
limit:-1
Stage-1
Reducer 2
File Output Operator [FS_6]
Group By Operator [GBY_4] (rows=2 width=91)
Output:["_col0"],keys:KEY._col0
<-Map 1 [SIMPLE_EDGE]
SHUFFLE [RS_3]
PartitionCols:_col0
Group By Operator [GBY_2] (rows=2 width=91)
Output:["_col0"],keys:web_city
Select Operator [SEL_1] (rows=32 width=91)
Output:["web_city"]
TableScan [TS_0] (rows=32 width=91)
tpcds_hdfs_orc_3@web_site,web_site,Tbl:COMPLETE,Col:COMPLETE,Output:["web_city"]
For example:
explain select upper(web_city) d_web_city
from web_site
where web_city > '1'
group by web_city;
An extra Select Operator appears before the File Output Operator:
Explain
Vertex dependency in root stage
Reducer 2 <- Map 1 (SIMPLE_EDGE)
Stage-0
Fetch Operator
limit:-1
Stage-1
Reducer 2
File Output Operator [FS_7]
Select Operator [SEL_6] (rows=1 width=144)
Output:["_col0"]
Group By Operator [GBY_5] (rows=1 width=91)
Output:["_col0"],keys:KEY._col0
<-Map 1 [SIMPLE_EDGE]
SHUFFLE [RS_4]
PartitionCols:_col0
Group By Operator [GBY_3] (rows=1 width=91)
Output:["_col0"],keys:web_city
Filter Operator [FIL_8] (rows=10 width=91)
predicate:(web_city > '1')
TableScan [TS_0] (rows=32 width=91)
tpcds_hdfs_orc_3@web_site,web_site,Tbl:COMPLETE,Col:COMPLETE,Output:["web_city"]
GlobalLimitOptimizer
set hive.limit.optimize.enable=true;
select * from web_sales limit 10;
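If web_sales has many partitions with many files each, a query for 10 rows does not need to open every file. The first file may already hold 10 records, so the amount of input read can be reduced.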
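The saving can be illustrated with a small sketch that stops opening files once the limit is reached. It is only a model of the idea: `read_with_limit` is a hypothetical helper and files are represented as in-memory lists of rows.

```python
def read_with_limit(files, limit):
    """Read rows file by file and stop as soon as `limit` rows are collected.

    Returns (rows, number_of_files_opened).
    """
    rows = []
    opened = 0
    for file_rows in files:
        if len(rows) >= limit:
            break  # enough rows: the remaining files are never opened
        opened += 1
        rows.extend(file_rows[: limit - len(rows)])
    return rows, opened


files = [list(range(20)), list(range(20, 40)), list(range(40, 60))]
rows, opened = read_with_limit(files, limit=10)
print(len(rows), opened)
```

Here the first file already satisfies the limit, so the other two files are never touched.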
CorrelationOptimizer
Takes effect only on the MR engine. Reference paper: YSmart: Yet Another SQL-to-MapReduce Translator (Rubao Lee, Tian Luo, Yin Huai, Fusheng Wang, Yongqiang He, and Xiaodong Zhang)
LimitPushdownOptimizer
explain extended select sum(web_tax_percentage)
from web_site
group by web_city limit 1;
order by: the limit is pushed down to the final reduce sink.
explain select ws_web_site_sk
from web_sales
order by ws_web_site_sk limit 1;
StatsOptimizer
The results of some queries can be obtained directly from the stats.
First, generate the table's statistics.
analyze table web_site compute statistics for columns;
Run the SQL:
explain select min(web_site_sk) from web_site;
When the result comes from the statistics, the execution plan contains only a Fetch:
Explain
Stage-0
Fetch Operator
limit:1
- Example where the result cannot be obtained from the statistics
explain select min(web_site_sk) from web_site where web_site_sk <> 2;
The execution plan for this SQL:
Explain
Vertex dependency in root stage
Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE)
Stage-0
Fetch Operator
limit:-1
Stage-1
Reducer 2
File Output Operator [FS_7]
Group By Operator [GBY_5] (rows=1 width=8)
Output:["_col0"],aggregations:["min(VALUE._col0)"]
<-Map 1 [CUSTOM_SIMPLE_EDGE]
PARTITION_ONLY_SHUFFLE [RS_4]
Group By Operator [GBY_3] (rows=1 width=8)
Output:["_col0"],aggregations:["min(web_site_sk)"]
Filter Operator [FIL_8] (rows=32 width=8)
predicate:(web_site_sk <> 2L)
TableScan [TS_0] (rows=32 width=8)
tpcds_hdfs_orc_3@web_site,web_site,Tbl:COMPLETE,Col:COMPLETE,Output:["web_site_sk"]
AnnotateWithStatistics, AnnotateWithOpTraits
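For explain on the MR engine, these run during logical plan optimization (Optimizer).
On Tez, they run during physical plan optimization.
AnnotateWithStatistics sets stats (statistics) on each Operator.
AnnotateWithOpTraits sets opTraits, which applies to bucketed tables, for example when two bucketed tables are joined.
OpTraits is defined as follows: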
public OpTraits(List<List<String>> bucketColNames, int numBuckets,
List<List<String>> sortColNames, int numReduceSinks) {
this.bucketColNames = bucketColNames;
this.numBuckets = numBuckets;
this.sortColNames = sortColNames;
this.numReduceSinks = numReduceSinks;
}
explain
select web_site.web_city, sum(ws_ext_tax) ws_ext_tax_sum
from web_site join web_sales on web_site.web_site_sk = web_sales.ws_web_site_sk
where ws_sold_date_sk=2452642 and ws_web_site_sk =1
group by web_site.web_city;
SimpleFetchOptimizer
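For simple single-table queries (no group by, no distinct), no distributed job is launched. The fetch task reads the table directly and returns the rows, which speeds up execution.
Values of hive.fetch.task.conversion:
none: disabled
minimal: supports SELECT *, filters on partition columns, and LIMIT
more: SELECT, FILTER, LIMIT only (supports TABLESAMPLE and virtual columns)
hive.fetch.task.conversion.threshold: caps the data size to which this optimization applies; the default is 1 GB.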
TablePropertyEnrichmentOptimizer
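Disabled by default. In addition to the table's own properties, it fetches the properties of the table's SerDe and stores them all in the table's properties.
The SerDes are controlled by the parameter hive.optimize.update.table.properties.from.serde.list, which by default contains only AvroSerDe.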
HiveOpConverterPostProc
Runs only when all 3 of the following conditions hold. returnPathEnabled defaults to false, so by default it does not run.
final boolean cboEnabled = HiveConf.getBoolVar(pctx.getConf(), HiveConf.ConfVars.HIVE_CBO_ENABLED);
final boolean returnPathEnabled = HiveConf.getBoolVar(pctx.getConf(), HiveConf.ConfVars.HIVE_CBO_RETPATH_HIVEOP);
final boolean cboSucceeded = pctx.getContext().isCboSucceeded();