StarRocks Low-Cardinality Global Dictionary Optimization

Published: 2025-06-29

Background

This article is based on StarRocks 3.3.5.
This optimization has been discussed for a long time, for example in "StarRocks 技术内幕 | 基于全局字典的极速字符串查询" and "StarRocks 查询优化器深度解析"; refer to those articles for a description of the implementation. This article looks at it from the perspective of the optimizer rules.
It mainly involves two rules: AddDecodeNodeForDictStringRule and LowCardinalityRewriteRule.
The overall idea:
The rewrite works on the physical tree in a bottom-up fashion: when a string column is identified as a low-cardinality column, it is rewritten into a corresponding int column. Along the way, all operations that use the column, such as string functions and aggregate functions, are rewritten into versions that work on the encoded int column.
If at some node a subsequent operation can no longer run on the int column, the system automatically inserts a decode node, in which the int column is decoded back into the original string column.
One point worth emphasizing: the places where dictionary optimization can be applied are Scan (with comparison predicates), Filter, Agg, Join, Shuffle, and Sort.
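To make the bottom-up walk concrete, here is a toy sketch (plain Java, not StarRocks code; the operator names and the DICT_AWARE set are invented for illustration): the encoded int column keeps flowing upward through dict-aware operators, and a Decode is inserted the moment an operator cannot consume it.

    import java.util.List;
    import java.util.Set;

    public class BottomUpDictSketch {
        // operators assumed to work directly on the dict-encoded int column
        static final Set<String> DICT_AWARE = Set.of("SCAN", "FILTER", "AGG", "JOIN", "SHUFFLE", "SORT");

        public static void main(String[] args) {
            // a bottom-up physical plan; the result sink must see real strings
            List<String> plan = List.of("SCAN", "FILTER", "AGG", "RESULT_SINK");
            boolean encoded = false;
            for (String op : plan) {
                if (DICT_AWARE.contains(op)) {
                    encoded = true;                       // keep the int column flowing upward
                    System.out.println(op + "  [city -> dict code]");
                } else {
                    if (encoded) {                        // cannot stay encoded: decode first
                        System.out.println("DECODE  [dict code -> city]");
                        encoded = false;
                    }
                    System.out.println(op + "  [city]");
                }
            }
        }
    }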

Analysis

The PRs corresponding to these two rules are "Rewrite plan for low cardinality string with global dict" and "[Enhancement] support low-cardinality v2". Judging by the commit timeline, the latter is an upgraded version of the former, so the two coexist and should be read together.

AddDecodeNodeForDictStringRule

The rewrite method:

        if (!ConnectContext.get().getSessionVariable().isEnableLowCardinalityOptimize()
                || taskContext.getOptimizerContext().getSessionVariable().isUseLowCardinalityOptimizeV2()) {
            return root;
        }

        List<PhysicalOlapScanOperator> scanOperators = Utils.extractPhysicalOlapScanOperator(root);

        for (PhysicalOlapScanOperator scanOperator : scanOperators) {
            OlapTable table = (OlapTable) scanOperator.getTable();
            long version = table.getPartitions().stream().map(Partition::getVisibleVersionTime).max(Long::compareTo)
                    .orElse(0L);

            if ((table.getKeysType().equals(KeysType.PRIMARY_KEYS))) {
                continue;
            }
            if (table.hasForbiddenGlobalDict()) {
                continue;
            }
            if (table.inputHasTempPartition(scanOperator.getSelectedPartitionId())) {
                continue;
            }
            for (ColumnRefOperator column : scanOperator.getColRefToColumnMetaMap().keySet()) {
                // Condition 1:
                if (!column.getType().isVarchar()) {
                    continue;
                }

                ColumnStatistic columnStatistic =
                        GlobalStateMgr.getCurrentState().getStatisticStorage().getColumnStatistic(table, column.getName());
                // Condition 2: the varchar column is low cardinality string column
                if (!FeConstants.USE_MOCK_DICT_MANAGER && (columnStatistic.isUnknown() ||
                        columnStatistic.getDistinctValuesCount() > CacheDictManager.LOW_CARDINALITY_THRESHOLD)) {
                    LOG.debug("{} isn't low cardinality string column", column.getName());
                    continue;
                }

                // Condition 3: the varchar column has collected global dict
                Column columnObj = table.getColumn(column.getName());
                if (columnObj != null
                        && IDictManager.getInstance().hasGlobalDict(table.getId(), columnObj.getColumnId(), version)) {
                    Optional<ColumnDict> dict =
                            IDictManager.getInstance().getGlobalDict(table.getId(), columnObj.getColumnId());
                    // cache reaches capacity limit, randomly eliminate some keys
                    // then we will get an empty dictionary.
                    if (!dict.isPresent()) {
                        continue;
                    }
                    globalDictCache.put(new Pair<>(table.getId(), column.getName()), dict.get());
                    if (!tableIdToStringColumnIds.containsKey(table.getId())) {
                        Set<Integer> integers = Sets.newHashSet();
                        integers.add(column.getId());
                        tableIdToStringColumnIds.put(table.getId(), integers);
                    } else {
                        tableIdToStringColumnIds.get(table.getId()).add(column.getId());
                    }
                } else {
                    LOG.debug("{} doesn't have global dict", column.getName());
                }
            }
        }
        ...
        DecodeContext context = new DecodeContext(globalDictCache, tableIdToStringColumnIds,
                taskContext.getOptimizerContext().getColumnRefFactory());

        OptExpression rewriteExpr = root.getOp().accept(new DecodeVisitor(), root, context);
        if (context.hasEncoded) {
            return generateDecodeOExpr(context, Collections.singletonList(rewriteExpr));
        }
        return rewriteExpr;

First, the optimization proceeds only when cbo_enable_low_cardinality_optimize is true and low_cardinality_optimize_v2 is false; as the code above shows, when the v2 switch is on (both default to true), this rule returns the plan untouched and LowCardinalityRewriteRule takes over instead.
Next, all physical scan nodes over internal OLAP tables are extracted, and each scan node is checked as follows:

  • If the table is a primary key table, skip to the next scan node; otherwise continue.
  • If the global dictionary is forbidden at the table level, skip to the next scan node; otherwise continue.
  • If the scan selects a temporary partition, skip to the next scan node.
    If none of the above conditions hits, iterate over all columns of the scan and do the following:
  • If the column is of VARCHAR type, go on to the next step; otherwise move on to the next column.
    • Call getColumnStatistic to fetch the column's statistics; if the statistics are unknown or the column's cardinality exceeds LOW_CARDINALITY_THRESHOLD (255 by default), move on to the next column, otherwise continue.
    • If the column has a global dictionary for the current version, put it into the internal caches globalDictCache and tableIdToStringColumnIds.
  • Build the DecodeContext and process the tree with the visitor pattern (specifically DecodeVisitor), returning the rewritten expression; see the visitPhysicalOlapScan method for the details.
  • generateDecodeOExpr
    For operations whose output needs decoding, a PhysicalDecodeOperator is simply appended at the end. Taking the visitPhysicalOlapScan operator above: for SQL like select * from table we need the actual column values rather than the dict-encoded int values, so a decode is required; a pure string function such as upper(col), by contrast, can stay encoded, as sketched below.
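    The reason such expressions can stay encoded is that a string function can be evaluated once per dictionary entry instead of once per row, yielding a rewritten dictionary while the rows keep flowing as int codes. A minimal sketch of that idea (plain Java; the dictionary contents are invented, and this is not the engine's actual dict-mapping implementation):

        import java.util.HashMap;
        import java.util.Map;

        public class DictMappingSketch {
            public static void main(String[] args) {
                // global dict: code -> original string value
                Map<Integer, String> dict = Map.of(1, "beijing", 2, "shanghai");

                // evaluate upper() once per dict entry instead of once per row
                Map<Integer, String> upperDict = new HashMap<>();
                dict.forEach((code, v) -> upperDict.put(code, v.toUpperCase()));

                // rows stay encoded; upper(city) for a row is just a dict lookup
                int[] encodedRows = {1, 2, 2, 1};
                for (int code : encodedRows) {
                    System.out.println(upperDict.get(code)); // BEIJING, SHANGHAI, ...
                }
            }
        }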

Notes

  • Whether a column has a global dictionary is checked through the CacheDictManager.dictLoader method, which loads the dictionary asynchronously by calling queryDictSync; that method issues SQL of the form: select dict_merge(col) from table [_META_]
    For dict_merge, refer to the dict_merge documentation.
    Testing the corresponding SQL (note that the dictionary values come back base64-encoded):
MySQL [test]>   select dict_merge(col)  from test[_META_]\G;
*************************** 1. row ***************************
dict_merge(col): {"2":{"lst":["str",66,"QVJFQV9DT05ESVRJT04","QVVUSE9SSVpBVElPTl9QT1BVUA","QkFGRkxFX1NDRU5F","QkFGRkxFX1RZUEU","QklORF9DQVJEX01PREU","Q0FMQ19ZRUFSX0RBWVM","Q09PUEVSQVRJT05fTU9ERQ","Q1JFRElUX0dSQU5USU5HX1RC","Q1JFRElUX0dSQU5USU5HX1RD","Q1JFRElUX0dSQVRJTkdfVENfVkFMSURJVFlfUEVSSU9E","RElTVFJJQlVURV9UWVBF","RUFSTFlfUkVQQVlfTU9ERQ","RUFSTFlfU0VUVExFX0xJTUlU","RUFSTFlfU0VUVExFX01PREU","RVJST1JfQ09ERV9NQVBQSU5H","RkVFX0JBU0U","RkVFX0NPREU","RkVFX1JBVEVfT1BUSU9O","RkVFX1RZUEU","RkVJWVVF","RlVORF9NQU5BR0VfTU9ERQ","RlVORF9PUkdfUkVMX1RZUEU","RlVORF9UWVBF","R05UX0ZFRTE","R1JBQ0VfT0lOVF9NT0RF","R1JPVVBfUkFURV9UWVBF","R1JPVVBfVFlQRQ","R1VBUkFOVEVFX01PREU","R1VBUkFOVEVFX1RZUEU","TE9BTl9GRUVfUkFURV9TTE9U","TE9BTl9UWVBF","T1JHX05BTUVfU1RBR0VfU0hPVw","UEFZX0NIQU5ORUw","UExBTl9HRU5FUkFURV9NT0RF","UFJJQ0VfVFlQRQ","UFJPRklUX1RZUEU","UkVQQVlNRU5UX0lOX0FQUA","UklTS19QUklDRV9UWVBF","UlBZX0RBWV9UWVBF","UlBZX1RZUEU","UlVMRV9UWVBF","U0VSVklDRV9UWVBF","U0VSVklDRV9UWVBFX0ZPUl9BUFA","U1RBVEVfVFlQRQ","U1VCX1BST0RVQ1RfVFlQRQ","U1VQUE9SVF9NVUxUSVBMRV9MT0FOUw","VklQX0hPSVNUX0xJTkVT","VklQX01FTUJFUl9DQVJE","V0VFS19EQVk","YWRkaXRpb25fb3JnX2NvZGU","Y2FyZF9jYXNo","ZGVkdWN0X29yZw","ZmVlX3N1YmplY3Q","aW5jb21lX2RlZHVjdF9vcmc","aW5jb21lX2Rpc3RyaWJ1dGlvbl9zdGF0ZQ","aW5jb21lX2ZlZV9zdWJqZWN0","aW5jb21lX3R5cGU","cHJvY2Vzc190YXNrX3Jlc3VsdA","cHJvY2Vzc190YXNrX3N0YXR1cw","cHJvZml0X29yZw","cHVzaF9zeXN0ZW0","dGlja2V0X3NjZW5l","dGlja2V0X3R5cGU","dmlwX2NhcmQ","d2l0aGhvbGRfdHlwZQ","eGlhb2ZlaWRhaV9jYXNo"]},"3":{"lst":["i32",66,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66]}}
1 row in set (0.03 sec)
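    The values in the "lst" array above are base64-encoded; a quick check (plain Java, just to verify) decodes the first entry back to the original column value:

        import java.util.Base64;

        public class DecodeDictValue {
            public static void main(String[] args) {
                // first entry of the "lst" array from the dict_merge output above
                String encoded = "QVJFQV9DT05ESVRJT04";
                System.out.println(new String(Base64.getDecoder().decode(encoded)));
                // prints: AREA_CONDITION
            }
        }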
  • Looking at the StarRocks.g4 grammar file:
    relationPrimary
     ...
     AS? alias=identifier)? bracketHint?                                             #tableAtom
    
    The [_META_] hint is handled in AstBuilder.visitTableAtom, which eventually calls tableRelation.addTableHint, finally producing a PhysicalMetaScanOperator.
    This is in fact the Meta Scan operation, i.e., a data scan at the metadata level.
    For the concrete implementation, see LakeMetaReader in lake_meta_reader.cpp, where you can see that SegmentMetaCollecter is used to gather the statistics at the Segment level.
    You can also refer to "StarRocks 技术内幕 | 基于全局字典的极速字符串查询" on how the global dictionary is built.

LowCardinalityRewriteRule

This rule is a complement to, or rather an upgrade of, the previous one: it supports more kinds of expressions.

 @Override
    public OptExpression rewrite(OptExpression root, TaskContext taskContext) {
        SessionVariable session = taskContext.getOptimizerContext().getSessionVariable();
        if (!session.isEnableLowCardinalityOptimize() || !session.isUseLowCardinalityOptimizeV2()) {
            return root;
        }

        ColumnRefFactory factory = taskContext.getOptimizerContext().getColumnRefFactory();
        DecodeContext context = new DecodeContext(factory);
        {
            DecodeCollector collector = new DecodeCollector(session);
            collector.collect(root, context);
            if (!collector.isValidMatchChildren()) {
                return root;
            }
        }
        DecodeRewriter rewriter = new DecodeRewriter(factory, context);
        return rewriter.rewrite(root);
    }
  • The optimization only proceeds when both cbo_enable_low_cardinality_optimize and low_cardinality_optimize_v2 are true (both default to true).

  • Next, a DecodeContext is constructed, along with a DecodeCollector that collects the dict-encodable expressions:

      public void collect(OptExpression root, DecodeContext context) {
          collectImpl(root, null);
          initContext(context);
      }
    

    The overall idea is to inspect and collect all string columns together with the expressions that involve them, and replace those expressions with the corresponding dictionary expressions.
    In the collectImpl method:

     DecodeInfo info = optExpression.getOp().accept(this, optExpression, context);
     ... 
     collectPredicate(optExpression.getOp(), info);
     collectProjection(optExpression.getOp(), info);
    
    • Look specifically at the DecodeCollector.visitPhysicalOlapScan implementation:
      The checks are the same as in the AddDecodeNodeForDictStringRule.rewrite method above, just with a few extra conditions:
      • Support for the Array type is added, together with the new option array_low_cardinality_optimize (true by default).
      • Qualifying column information is put into a DecodeInfo.
    • collectPredicate
      Calls the dictExpressionCollector.collect method to gather the predicates that can use dictionary optimization.
    • collectProjection
      Calls the dictExpressionCollector.collect method to gather the projections that can use dictionary optimization (see the sketch right below).
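      To make the collection step concrete, here is a hypothetical sketch of what rewriting a collected predicate amounts to: an equality predicate on the string column becomes an equality on the encoded code, looked up through the global dict (names and data here are invented):

          import java.util.Map;
          import java.util.Optional;

          public class PredicateRewriteSketch {
              public static void main(String[] args) {
                  // global dict: original string value -> encoded int code
                  Map<String, Integer> dict = Map.of("beijing", 1, "shanghai", 2);

                  // the predicate "city = 'shanghai'" becomes "city_code = 2"
                  String constant = "shanghai";
                  Optional<Integer> code = Optional.ofNullable(dict.get(constant));

                  // a constant absent from the dict means no row can ever match
                  System.out.println(code.map(c -> "city_code = " + c)
                                         .orElse("FALSE  -- constant not in dict"));
              }
          }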
  • DecodeRewriter.rewrite

    • context.initRewriteExpressions()
      Replaces all string expressions with dictionary expressions and adjusts the corresponding data types.
    • insertDecodeNode
      For output columns that cannot stay encoded, inserts a decode step so that dictionary decoding is performed; a minimal sketch follows.
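      A minimal, hypothetical sketch of the insertDecodeNode idea (plain Java; column names and data are invented): at the plan boundary, only the columns that are still dict-encoded are translated back through the dict, everything else passes through untouched.

          import java.util.List;
          import java.util.Map;
          import java.util.Set;

          public class InsertDecodeSketch {
              public static void main(String[] args) {
                  Map<Integer, String> dict = Map.of(1, "beijing", 2, "shanghai");
                  // output columns of the rewritten plan; "city" is still encoded
                  List<String> outputCols = List.of("city", "cnt");
                  Set<String> stillEncoded = Set.of("city");

                  // one output row: city as a dict code, cnt as a plain long
                  Object[] row = {2, 42L};
                  for (int i = 0; i < outputCols.size(); i++) {
                      Object v = stillEncoded.contains(outputCols.get(i))
                              ? dict.get((Integer) row[i])   // decode back to the string value
                              : row[i];                      // pass through as-is
                      System.out.println(outputCols.get(i) + " = " + v);
                  }
              }
          }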

Note: starting from version 3.4, primary key tables are also supported.

