Background
This article is based on StarRocks 3.3.5.
This optimization has been written about before, for example in StarRocks 技术内幕 | 基于全局字典的极速字符串查询 and StarRocks 查询优化器深度解析; refer to those articles for a description of the implementation. This post looks at it from the perspective of the optimizer rules.
It mainly involves two rules: AddDecodeNodeForDictStringRule and LowCardinalityRewriteRule.
The overall idea is:
Work on the physical tree in a bottom-up fashion: when a string column is identified as a low-cardinality column, it is rewritten to a corresponding int column. Along the way, all string functions, aggregate functions, and other operations that use the column are rewritten to versions that work on the encoded int column.
If, at some node, a downstream operation cannot keep running on the int column, a decode node is inserted automatically; in the decode node the int column is decoded back into the original string column.
One point worth emphasizing: the places that can benefit from the dictionary are Scan (which needs to do comparisons), Filter, Agg, Join, Shuffle, and Sort.
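To make this concrete, here is a minimal self-contained Java sketch of the runtime effect (invented names, not StarRocks code): the filter constant is encoded once through the global dict, filtering and grouping run purely on int codes, and only the final output is decoded back to strings.

import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

public class DictSketch {
    public static void main(String[] args) {
        // Global dict: string value -> int code (collected ahead of time, e.g. via dict_merge)
        Map<String, Integer> dict = Map.of("beijing", 1, "shanghai", 2, "shenzhen", 3);
        // Reverse mapping, which is what a decode node applies at the end
        Map<Integer, String> reverse = dict.entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey));

        // "Scan": segments already store codes, so the plan reads ints, not strings
        int[] cityCodes = {1, 2, 2, 3, 1, 2};

        // "Filter city = 'shanghai'": the constant is encoded exactly once up front
        int target = dict.get("shanghai");

        // "Agg count(*) group by city": filtering and grouping run on int codes
        Map<Integer, Long> counts = Arrays.stream(cityCodes)
                .filter(c -> c == target)           // int comparison, no string compare
                .boxed()
                .collect(Collectors.groupingBy(c -> c, Collectors.counting()));

        // "Decode": only the final output is turned back into strings
        counts.forEach((code, cnt) -> System.out.println(reverse.get(code) + " = " + cnt));
    }
}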
Analysis
The PRs corresponding to the AddDecodeNodeForDictStringRule and LowCardinalityRewriteRule rules are Rewrite plan for low cardinality string with global dict and [Enhancement] support low-cardinality v2. Judging from the commit timeline, the latter is an upgraded version of the former, so the two should be read together.
AddDecodeNodeForDictStringRule
The rewrite method:
if (!ConnectContext.get().getSessionVariable().isEnableLowCardinalityOptimize()
        || taskContext.getOptimizerContext().getSessionVariable().isUseLowCardinalityOptimizeV2()) {
    return root;
}
List<PhysicalOlapScanOperator> scanOperators = Utils.extractPhysicalOlapScanOperator(root);
for (PhysicalOlapScanOperator scanOperator : scanOperators) {
    OlapTable table = (OlapTable) scanOperator.getTable();
    long version = table.getPartitions().stream().map(Partition::getVisibleVersionTime).max(Long::compareTo)
            .orElse(0L);
    if ((table.getKeysType().equals(KeysType.PRIMARY_KEYS))) {
        continue;
    }
    if (table.hasForbiddenGlobalDict()) {
        continue;
    }
    if (table.inputHasTempPartition(scanOperator.getSelectedPartitionId())) {
        continue;
    }
    for (ColumnRefOperator column : scanOperator.getColRefToColumnMetaMap().keySet()) {
        // Condition 1:
        if (!column.getType().isVarchar()) {
            continue;
        }
        ColumnStatistic columnStatistic =
                GlobalStateMgr.getCurrentState().getStatisticStorage().getColumnStatistic(table, column.getName());
        // Condition 2: the varchar column is low cardinality string column
        if (!FeConstants.USE_MOCK_DICT_MANAGER && (columnStatistic.isUnknown() ||
                columnStatistic.getDistinctValuesCount() > CacheDictManager.LOW_CARDINALITY_THRESHOLD)) {
            LOG.debug("{} isn't low cardinality string column", column.getName());
            continue;
        }
        // Condition 3: the varchar column has collected global dict
        Column columnObj = table.getColumn(column.getName());
        if (columnObj != null
                && IDictManager.getInstance().hasGlobalDict(table.getId(), columnObj.getColumnId(), version)) {
            Optional<ColumnDict> dict =
                    IDictManager.getInstance().getGlobalDict(table.getId(), columnObj.getColumnId());
            // cache reaches capacity limit, randomly eliminate some keys
            // then we will get an empty dictionary.
            if (!dict.isPresent()) {
                continue;
            }
            globalDictCache.put(new Pair<>(table.getId(), column.getName()), dict.get());
            if (!tableIdToStringColumnIds.containsKey(table.getId())) {
                Set<Integer> integers = Sets.newHashSet();
                integers.add(column.getId());
                tableIdToStringColumnIds.put(table.getId(), integers);
            } else {
                tableIdToStringColumnIds.get(table.getId()).add(column.getId());
            }
        } else {
            LOG.debug("{} doesn't have global dict", column.getName());
        }
    }
}
...
DecodeContext context = new DecodeContext(globalDictCache, tableIdToStringColumnIds,
        taskContext.getOptimizerContext().getColumnRefFactory());
OptExpression rewriteExpr = root.getOp().accept(new DecodeVisitor(), root, context);
if (context.hasEncoded) {
    return generateDecodeOExpr(context, Collections.singletonList(rewriteExpr));
}
return rewriteExpr;
First, this rule only proceeds when cbo_enable_low_cardinality_optimize is true and low_cardinality_optimize_v2 is false; since both default to true, on 3.3.5 this rule normally yields to the v2 rule described below.
Next it extracts all physical scan nodes over internal tables and, for each scan node, checks:
- If the table is a primary key table, skip to the next scan node; otherwise continue
- If the table has global dict forbidden at the table level, skip to the next scan node; otherwise continue
- If the scan's selected partitions include a temp partition, skip to the next scan node
If none of the above conditions hit, it iterates over all of the scan's columns:
- If the column is of VARCHAR type, proceed to the next step; otherwise move on to the next column
- Call getColumnStatistic to fetch the column's statistics; if the statistics are unknown or the column's cardinality exceeds LOW_CARDINALITY_THRESHOLD (default 255), move on to the next column, otherwise continue
- If a global dict exists for the column at that version, record it in the internal caches globalDictCache and tableIdToStringColumnIds
- With the DecodeContext built, the tree is processed via the visitor pattern (DecodeVisitor specifically), returning the rewritten expression; see the visitPhysicalOlapScan method for the details
- generateDecodeOExpr: for plans whose output still needs decoding, a PhysicalDecodeOperator is simply appended at the very end. Take the visitPhysicalOlapScan operator mentioned above with SQL like select * from table: here we need the actual column values, not the dictionary-encoded int values, so a decode is required
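For intuition, this is roughly the plan shape that results for such a query (illustrative only, with made-up column ids; the exact EXPLAIN formatting varies by version): the Decode node sits at the very top and maps the dict column ref back to the original string column ref.

1:Decode
|  <dict id 4> : <string id 2>
|
0:OlapScanNode (emits column 2 as dict-encoded ints under ref 4)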
Note:
- Whether a column has a global dict is determined via the CacheDictManager.dictLoader method, which loads the dict through an asynchronous query by calling queryDictSync; that method issues SQL along the lines of: select dict_merge(col) from table [_META_]. For dict_merge itself, see the dict_merge documentation.
Testing the corresponding SQL:
MySQL [test]> select dict_merge(col) from test[_META_]\G;
*************************** 1. row ***************************
dict_merge(col): {"2":{"lst":["str",66,"QVJFQV9DT05ESVRJT04","QVVUSE9SSVpBVElPTl9QT1BVUA","QkFGRkxFX1NDRU5F","QkFGRkxFX1RZUEU","QklORF9DQVJEX01PREU","Q0FMQ19ZRUFSX0RBWVM","Q09PUEVSQVRJT05fTU9ERQ","Q1JFRElUX0dSQU5USU5HX1RC","Q1JFRElUX0dSQU5USU5HX1RD","Q1JFRElUX0dSQVRJTkdfVENfVkFMSURJVFlfUEVSSU9E","RElTVFJJQlVURV9UWVBF","RUFSTFlfUkVQQVlfTU9ERQ","RUFSTFlfU0VUVExFX0xJTUlU","RUFSTFlfU0VUVExFX01PREU","RVJST1JfQ09ERV9NQVBQSU5H","RkVFX0JBU0U","RkVFX0NPREU","RkVFX1JBVEVfT1BUSU9O","RkVFX1RZUEU","RkVJWVVF","RlVORF9NQU5BR0VfTU9ERQ","RlVORF9PUkdfUkVMX1RZUEU","RlVORF9UWVBF","R05UX0ZFRTE","R1JBQ0VfT0lOVF9NT0RF","R1JPVVBfUkFURV9UWVBF","R1JPVVBfVFlQRQ","R1VBUkFOVEVFX01PREU","R1VBUkFOVEVFX1RZUEU","TE9BTl9GRUVfUkFURV9TTE9U","TE9BTl9UWVBF","T1JHX05BTUVfU1RBR0VfU0hPVw","UEFZX0NIQU5ORUw","UExBTl9HRU5FUkFURV9NT0RF","UFJJQ0VfVFlQRQ","UFJPRklUX1RZUEU","UkVQQVlNRU5UX0lOX0FQUA","UklTS19QUklDRV9UWVBF","UlBZX0RBWV9UWVBF","UlBZX1RZUEU","UlVMRV9UWVBF","U0VSVklDRV9UWVBF","U0VSVklDRV9UWVBFX0ZPUl9BUFA","U1RBVEVfVFlQRQ","U1VCX1BST0RVQ1RfVFlQRQ","U1VQUE9SVF9NVUxUSVBMRV9MT0FOUw","VklQX0hPSVNUX0xJTkVT","VklQX01FTUJFUl9DQVJE","V0VFS19EQVk","YWRkaXRpb25fb3JnX2NvZGU","Y2FyZF9jYXNo","ZGVkdWN0X29yZw","ZmVlX3N1YmplY3Q","aW5jb21lX2RlZHVjdF9vcmc","aW5jb21lX2Rpc3RyaWJ1dGlvbl9zdGF0ZQ","aW5jb21lX2ZlZV9zdWJqZWN0","aW5jb21lX3R5cGU","cHJvY2Vzc190YXNrX3Jlc3VsdA","cHJvY2Vzc190YXNrX3N0YXR1cw","cHJvZml0X29yZw","cHVzaF9zeXN0ZW0","dGlja2V0X3NjZW5l","dGlja2V0X3R5cGU","dmlwX2NhcmQ","d2l0aGhvbGRfdHlwZQ","eGlhb2ZlaWRhaV9jYXNo"]},"3":{"lst":["i32",66,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66]}}
1 row in set (0.03 sec)
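In this output, the lst array under "2" holds the column's distinct values encoded in base64 (for example, QVJFQV9DT05ESVRJT04 decodes to AREA_CONDITION), and the array under "3" holds the int codes 1 through 66 assigned to them. With only 66 distinct values, the column sits well below the 255 threshold and qualifies for a global dictionary.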
- Looking at the StarRocks.g4 grammar file, the [_META_] suffix matches the rule relationPrimary ... AS? alias=identifier)? bracketHint? #tableAtom. This is handled in AstBuilder.visitTableAtom, which calls tableRelation.addTableHint, and the plan ultimately produces a PhysicalMetaScanOperator.
This is the Meta Scan operation, i.e., a scan at the metadata level.
For the concrete implementation see LakeMetaReader in lake_meta_reader.cpp, where a SegmentMetaCollecter gathers the statistics at the Segment level.
You can also refer to the section on building the global dictionary in StarRocks 技术内幕 | 基于全局字典的极速字符串查询.
LowCardinalityRewriteRule
This rule is a complement to, or rather an upgrade of, the previous one, and supports more expressions.
@Override
public OptExpression rewrite(OptExpression root, TaskContext taskContext) {
    SessionVariable session = taskContext.getOptimizerContext().getSessionVariable();
    if (!session.isEnableLowCardinalityOptimize() || !session.isUseLowCardinalityOptimizeV2()) {
        return root;
    }
    ColumnRefFactory factory = taskContext.getOptimizerContext().getColumnRefFactory();
    DecodeContext context = new DecodeContext(factory);
    {
        DecodeCollector collector = new DecodeCollector(session);
        collector.collect(root, context);
        if (!collector.isValidMatchChildren()) {
            return root;
        }
    }
    DecodeRewriter rewriter = new DecodeRewriter(factory, context);
    return rewriter.rewrite(root);
}
The optimization only proceeds when both cbo_enable_low_cardinality_optimize and low_cardinality_optimize_v2 are true (both default to true). It then constructs a DecodeContext and a DecodeCollector that collects the dictionary-encodable expressions:

public void collect(OptExpression root, DecodeContext context) {
    collectImpl(root, null);
    initContext(context);
}

The overall idea is to inspect and collect all string columns plus the expressions that involve them, and to replace those expressions with the corresponding dictionary expressions.
In the collectImpl method:

DecodeInfo info = optExpression.getOp().accept(this, optExpression, context);
...
collectPredicate(optExpression.getOp(), info);
collectProjection(optExpression.getOp(), info);
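Before looking at the individual visit methods, the collector/rewriter split can be pictured with a small self-contained sketch (invented class names, not StarRocks code): one bottom-up pass decides how far a column can stay encoded and where a decode boundary must go; a second pass would then rewrite the columns below that boundary.

import java.util.ArrayList;
import java.util.List;

public class CollectRewriteSketch {
    // Toy plan node (invented): children plus whether the operator can consume dict codes
    static final class Node {
        final String name;
        final boolean supportsEncoded;
        final List<Node> children;
        Node(String name, boolean supportsEncoded, Node... children) {
            this.name = name;
            this.supportsEncoded = supportsEncoded;
            this.children = List.of(children);
        }
    }

    // "Collector" phase: walk bottom-up; return true while the column can stay encoded,
    // and record the first operator that needs real strings -- decode goes right below it
    static boolean collect(Node n, List<Node> needDecodeBelow) {
        boolean encoded = true;
        for (Node c : n.children) {
            encoded &= collect(c, needDecodeBelow);
        }
        if (n.children.isEmpty()) {
            return true; // the scan emits dict codes
        }
        if (encoded && !n.supportsEncoded) {
            needDecodeBelow.add(n); // must decode before reaching this operator
            return false;
        }
        return encoded;
    }

    public static void main(String[] args) {
        // Pretend my_udf() cannot consume encoded input, while Agg can
        Node scan = new Node("OlapScan(col)", true);
        Node agg = new Node("Agg(group by col)", true, scan);
        Node project = new Node("Project(my_udf(col))", false, agg);

        List<Node> needDecodeBelow = new ArrayList<>();
        collect(project, needDecodeBelow);
        // The "rewriter" phase would rewrite col -> int code in every node below the
        // boundary; here we only report where the Decode node would be inserted
        needDecodeBelow.forEach(n -> System.out.println("insert Decode below: " + n.name));
    }
}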
- See the DecodeCollector.visitPhysicalOlapScan implementation: the checks are the same as in the AddDecodeNodeForDictStringRule.rewrite method above, with a few extra conditions:
  - Support for the Array type is added, gated by a new option array_low_cardinality_optimize (default true)
  - Columns that qualify are recorded into the DecodeInfo
- collectPredicate calls dictExpressionCollector.collect to gather the predicates that can use the dictionary optimization (a sketch after this list shows what such a predicate buys at runtime)
- collectProjection calls dictExpressionCollector.collect to gather the projections that can use the dictionary optimization
- See DecodeRewriter.rewrite:
  - context.initRewriteExpressions() replaces all the string expressions with dictionary expressions and adjusts the corresponding data types
  - insertDecodeNode adds a decode node for output columns that cannot remain encoded, so they can be decoded back from the dictionary
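To see what collecting a dictionary-capable predicate buys at runtime, here is a minimal self-contained sketch (invented names, not StarRocks code): because the dict holds at most 255 entries, the string predicate can be evaluated once per dict entry instead of once per row, and the per-row work degenerates into an int membership test.

import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

public class DictPredicateSketch {
    public static void main(String[] args) {
        Map<String, Integer> dict = Map.of("ALPHA", 1, "BETA", 2, "GAMMA", 3);

        // Evaluate the string predicate once per dict entry (at most 255 times)...
        Predicate<String> pred = s -> s.startsWith("B") || s.equals("GAMMA");
        Set<Integer> matchingCodes = new HashSet<>();
        dict.forEach((value, code) -> { if (pred.test(value)) matchingCodes.add(code); });

        // ...then the per-row work is just an int set membership test
        int[] rows = {1, 2, 3, 2, 1};
        long hits = Arrays.stream(rows).filter(matchingCodes::contains).count();
        System.out.println(hits); // 3
    }
}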
Note: primary key tables are supported from version 3.4 onward.