【OpenGauss源码学习 —— (VecSortAgg)】

发布于:2024-10-17 ⋅ 阅读:(8) ⋅ 点赞:(0)

声明:本文的部分内容参考了他人的文章。在编写过程中,我们尊重他人的知识产权和学术成果,力求遵循合理使用原则,并在适用的情况下注明引用来源。
本文主要参考了 OpenGauss5.1.0 的开源代码和《OpenGauss数据库源码解析》一书

概述

  在 openGauss (OG) 中,VecSortAgg 是一种基于矢量化的排序聚合操作,它用于在执行 SQL 查询时高效地对数据进行分组和聚合。与传统的逐行处理不同,VecSortAgg 通过批量处理数据来提高计算效率,特别适用于大规模数据集的聚合操作。VecSortAgg 支持诸如 GROUP BYROLLUP复杂的分组和多阶段聚合操作。其实现中包含了排序、去重、聚合函数应用等步骤,且通过内存管理机制(如批处理缓存和排序器)优化了内存使用,确保计算的高效性和稳定性。

SortAggRunner::SortAggRunner 函数

  SortAggRunner::SortAggRunnerSortAggRunner 类的构造函数用于初始化排序聚合Sort Aggregation)操作的相关成员变量和资源SortAggRunner 继承自 BaseAggRunner,用于处理向量化的排序聚合逻辑,特别是在处理复杂聚合查询(如涉及 GROUP BYOLAP 函数)时。

/*
 * @Description: Sort agg constructed function.
 * @描述: 构造SortAggRunner函数,用于处理聚合操作的排序。
 */
SortAggRunner::SortAggRunner(VecAggState* runtime)
    : BaseAggRunner(runtime, false),  // 调用基类BaseAggRunner的构造函数,传入runtime和false(表示不使用streaming aggregation)。
      m_FreeMem(false),  // 初始化m_FreeMem为false,表示是否释放内存。
      m_ApFun(false),  // 初始化m_ApFun为false,表示是否使用OLAP函数。
      m_noData(true),  // 初始化m_noData为true,表示当前没有数据。
      m_batchSortIn(NULL),  // 初始化m_batchSortIn为NULL,表示输入批处理为空。
      m_batchSortOut(NULL),  // 初始化m_batchSortOut为NULL,表示输出批处理为空。
      m_SortBatch(NULL),  // 初始化m_SortBatch为NULL,表示排序批处理对象为空。
      m_sortSource(NULL)  // 初始化m_sortSource为NULL,表示排序源为空。
{
    VecAgg* node = NULL;  // 声明一个VecAgg指针node并初始化为NULL。
    errno_t rc;

    node = (VecAgg*)(runtime->ss.ps.plan);  // 从runtime的计划状态中获取VecAgg节点。

    init_phase();  // 调用init_phase()函数初始化聚合的阶段信息。

    /* OLAP function */
    if (node->groupingSets) {  // 如果节点包含分组集(grouping sets),说明需要特殊处理。
        m_ApFun = true;  // 将m_ApFun标记为true,表示当前使用的是OLAP函数。
        init_indexForApFun();  // 初始化与OLAP函数相关的索引。

        if (m_runtime->numphases > 1) {  // 如果存在多个阶段(numphases > 1),需要保存每个阶段的返回结果。
            /* Store return results of phase sort. */
            m_SortBatch = New(CurrentMemoryContext)
                VectorBatch(CurrentMemoryContext, m_runtime->ss.ss_ScanTupleSlot->tts_tupleDescriptor);  // 分配内存并初始化m_SortBatch,用于保存阶段排序的结果批处理。
        }

        build_batch();  // 构建批处理,准备进行数据聚合操作。

        m_cellSize = offsetof(hashCell, m_val) + m_cols * sizeof(hashVal);  // 计算哈希单元的大小,用于存储聚合键值对。
    }

    m_groupState = GET_MATCH_KEY;  // 设置分组状态为GET_MATCH_KEY,表示正在寻找匹配的键值。

    /* Init sort state for distinct operate. */
    int64 workmem = SET_NODEMEM(node->plan.operatorMemKB[0], node->plan.dop);  // 计算用于排序操作的工作内存。
    int64 maxmem = (node->plan.operatorMaxMem > 0) ? SET_NODEMEM(node->plan.operatorMaxMem, node->plan.dop) : 0;  // 如果有最大内存限制,则设置最大内存,否则为0。
    initialize_sortstate(workmem, maxmem, node->plan.plan_node_id, SET_DOP(node->plan.dop));  // 初始化排序状态,传入工作内存、最大内存、节点ID和并行度(dop)。

    int numset = Max(m_runtime->maxsets, 1);  // 计算最大集合数,至少为1。

    m_sortGrps = (GroupintAtomContainer*)palloc0(numset * sizeof(GroupintAtomContainer));  // 分配并初始化用于存储分组信息的结构体数组。
    for (int i = 0; i < numset; i++) {  // 遍历每个集合进行初始化。
        rc = memset_s(
            m_sortGrps[i].sortGrp, 2 * BatchMaxSize * sizeof(hashCell*), 0, 2 * BatchMaxSize * sizeof(hashCell*));  // 将排序分组数组清零,确保没有旧数据残留。
        securec_check(rc, "\0", "\0");  // 检查memset操作是否成功。

        m_sortGrps[i].sortGrpIdx = 0;  // 初始化分组索引为0。
        m_sortGrps[i].sortCurrentBuf = New(CurrentMemoryContext) VarBuf(CurrentMemoryContext);  // 为当前排序分组分配新的缓冲区。
        m_sortGrps[i].sortCurrentBuf->Init();  // 初始化当前分组的缓冲区。
        m_sortGrps[i].sortBckBuf = New(CurrentMemoryContext) VarBuf(CurrentMemoryContext);  // 为备份缓冲区分配新的内存。
        m_sortGrps[i].sortBckBuf->Init();  // 初始化备份缓冲区。
    }

    m_runState = AGG_FETCH;  // 设置运行状态为AGG_FETCH,表示当前阶段为获取数据。
    m_prepareState = GET_SOURCE;  // 设置准备状态为GET_SOURCE,表示准备从数据源获取数据。
    BindingFp();  // 绑定所需的函数指针或回调函数。
}

示例

  假设我们有一个SQL查询,涉及到对大数据集的分组聚合,并且需要对聚合结果进行排序。这个查询可能涉及某种 OLAP (Online Analytical Processing) 函数,如GROUPING SETS,或对某些列执行DISTINCT操作。这时,SortAggRunner类的构造函数会负责初始化排序聚合的执行环境。

示例SQL查询:

SELECT department, COUNT(employee_id)
FROM employees
GROUP BY department
GROUPING SETS ((department), ())
ORDER BY department;

  在上述查询中,要求根据department进行聚合计算员工数量,并对结果进行排序。为了实现这个功能,数据库引擎需要在聚合阶段(GROUP BYGROUPING SETS)中进行排序。这时候,SortAggRunner类的构造函数将会被调用,确保为后续的排序聚合以及分组操作分配和初始化内存。

  1. 首先,构造函数会确定是否使用OLAP函数(通过GROUPING SETS)。
  2. 接着,会初始化用于存储中间结果的批处理(m_SortBatch),并分配内存。
  3. 它还会初始化不同的状态,如当前处理阶段的状态m_runState)以及排序状态initialize_sortstate())。
  4. 分配与分组和排序相关的内存结构m_sortGrps),并为每个集合(numset)准备独立的缓冲区,用来存储中间分组结果

SortAggRunner::init_phase 函数

  SortAggRunner::init_phase 函数的主要功能是初始化排序和阶段信息,为下一阶段的分组聚合操作做好准备。具体来说,如果当前阶段不是最后一个阶段,那么它将为下一阶段进行必要的排序初始化。这个函数确保在多阶段聚合操作中,数据可以按照正确的顺序和内存使用策略进行处理。它会设置排序所需的内存,并根据外部计划节点的元组描述符配置排序参数。

  1. 阶段切换: 如果当前阶段不是最后阶段,会为下一阶段的排序操作配置排序节点。
  2. 排序初始化: 根据指定的列、操作符等参数,使用堆排序算法创建排序器。
  3. 内存管理: 设置排序的工作内存和最大内存限制,确保后续操作在内存限制内执行。
/*
 * @Description: 初始化排序和阶段信息,用于下一阶段的分组操作
 */
void SortAggRunner::init_phase()
{
    /*
     * 如果这不是最后一个阶段,我们需要为下一个阶段按适当顺序进行排序。
     * 检查当前阶段是否小于总阶段数的最后一个阶段。
     */
    if (m_runtime->current_phase < m_runtime->numphases - 1) {
        // 获取下一阶段的排序节点
        Sort* sort_node = m_runtime->phases[m_runtime->current_phase + 1].sortnode;
        // 获取外部节点的计划状态
        PlanState* outer_node = outerPlanState(m_runtime);
        // 获取外部节点的元组描述符,描述数据结构
        TupleDesc tup_desc = ExecGetResultType(outer_node);
        // 设置工作内存的大小,基于排序节点计划中的内存设置和并行度
        int64 work_mem = SET_NODEMEM(sort_node->plan.operatorMemKB[0], sort_node->plan.dop);
        // 设置最大内存的限制,基于计划中的最大内存值
        int64 max_mem = (sort_node->plan.operatorMaxMem > 0) 
                        ? SET_NODEMEM(sort_node->plan.operatorMaxMem, sort_node->plan.dop) 
                        : 0;

        // 初始化批次排序对象,采用堆排序的方式,配置相关的排序信息
        m_batchSortOut = batchsort_begin_heap(
            tup_desc,                     // 外部节点的元组描述符
            sort_node->numCols,            // 排序的列数
            sort_node->sortColIdx,         // 排序列的索引
            sort_node->sortOperators,      // 排序操作符
            sort_node->collations,         // 排序的语言区域设置
            sort_node->nullsFirst,         // 空值是否排在前面
            work_mem,                      // 分配的工作内存
            false,                         // 是否进行唯一性检查(此处为否)
            max_mem                        // 最大内存限制
        );
    }

    // 确认当前阶段小于或等于阶段数
    Assert(m_runtime->current_phase <= m_runtime->numphases);

    // 如果当前阶段小于总阶段数,指向当前的阶段信息
    if (m_runtime->current_phase < m_runtime->numphases) {
        // 将阶段指针指向当前阶段
        m_runtime->phase = &m_runtime->phases[m_runtime->current_phase];
    }
}

示例

  假设我们有一个多阶段的聚合查询,如下所示:

SELECT region, product, SUM(sales) 
FROM sales_data 
GROUP BY GROUPING SETS ((region), (product));

  在这个查询中,GROUPING SETS 表示多阶段的分组操作SortAggRunner::init_phase() 函数会在每个阶段切换时执行,用于为下一阶段(例如按 product 分组)准备排序操作:

  1. 当前阶段处理完毕:当按 region 分组的阶段完成后,该函数会初始化下一阶段的排序逻辑。
  2. 排序初始化:为下一阶段按 product 分组准备排序操作,分配所需的工作内存和最大内存,并确保批次数据按照正确的顺序传递到下一阶段。

SortAggRunner::init_indexForApFun 函数

  函数 init_indexForApFun() 的主要作用是OLAP 聚合操作(特别是 GROUPING SETS 等高级聚合功能)初始化索引和必要的列映射。具体步骤包括:

  1. 列集合的初始化: 首先从运行时上下文 m_runtime 中获取所有参与分组的列和其他需要的列,将它们放入一个 Bitmapset 集合中。
  2. 列位置映射: 为每一列在批处理中的位置分配索引,生成 m_cellBatchMap,它将批处理中的列位置映射到 cell 中。
  3. 分配分组键的内存: 通过计算最大分组列的长度,为保存当前分组键的索引和 cell 中的列位置分配内存,并调用 set_key() 设置这些键。
  4. 初始化聚合信息: 最终,调用 init_aggInfo()初始化聚合操作的信息。

  该函数在处理复杂的 OLAP 查询时,确保正确地将分组列和其他必要的列映射到批处理的正确位置,并为后续的聚合执行准备所需的索引和内存。

/*
 * @Description: 初始化Ap函数的索引,用于OLAP(Online Analytical Processing)聚合操作的索引设定。
 */
void SortAggRunner::init_indexForApFun()
{
    int i = 0;
    int index = 0;
    ListCell* lc = NULL;
    Bitmapset* all_need_cols = NULL;

    /* 获取所有参与分组的列 */
    foreach (lc, m_runtime->all_grouped_cols) {
        int var_number = lfirst_int(lc) - 1;  // 从链表中获取列号,列号减1以匹配内部存储格式
        all_need_cols = bms_add_member(all_need_cols, var_number);  // 将该列添加到需要的列的集合中
    }

    /* 获取其他未参与分组但需要的列 */
    foreach (lc, m_runtime->hash_needed) {
        int var_number = lfirst_int(lc) - 1;  // 同样减1调整列号
        all_need_cols = bms_add_member(all_need_cols, var_number);  // 添加未参与分组但需要的列
    }

    m_cellVarLen = bms_num_members(all_need_cols);  // 计算需要的列的总数
    m_cellBatchMap = (int*)palloc(sizeof(int) * m_cellVarLen);  // 分配内存,保存列在批处理中位置的映射

    /*
     * m_cellBatchMap 映射outbatch中的列位置到cell中。
     * 例如,m_cellBatchMap[0] = 3 表示:批处理的第3列存储在cell的第0列中
     */
    while ((index = bms_first_member(all_need_cols)) >= 0) {  // 获取集合中每个列的位置
        m_cellBatchMap[i] = index;  // 将位置映射到m_cellBatchMap数组
        i++;
    }

    int max_group_col_len = 0;
    for (i = 0; i < m_runtime->numphases; i++) {  // 遍历所有聚合阶段,找到最大分组列数
        int group_col = m_runtime->phases[i].aggnode->numCols;  // 获取当前阶段的分组列数
        if (group_col > max_group_col_len) {
            max_group_col_len = group_col;  // 更新最大分组列数
        }
    }

    /* 保存当前需要处理的分组列,它们的值将在执行过程中设置 */
    m_keyIdx = (int*)palloc(sizeof(int) * max_group_col_len);  // 分配内存,保存分组列的索引
    m_keyIdxInCell = (int*)palloc(sizeof(int) * max_group_col_len);  // 分配内存,保存分组列在cell中的位置

    set_key();  // 设置分组键
    init_aggInfo(m_runtime->numaggs, m_runtime->aggInfo);  // 初始化聚合信息
}

示例

  假设我们有一个 SQL 查询,涉及复杂的分组集合操作,比如 GROUPING SETS

SELECT department, team, COUNT(employee_id)
FROM employees
GROUP BY GROUPING SETS ((department, team), (department), ());

  这个查询要求数据库首先根据 departmentteam 进行分组计算,然后根据 department 单独进行分组,最后计算所有员工的总数。为了支持这种复杂的分组逻辑,数据库引擎需要对不同的分组列建立映射,并将这些列的位置与批处理中的列进行关联。
  init_indexForApFun() 函数在此场景下会负责为 departmentteam 这些分组列初始化索引和列位置映射。具体来说,它会:

  1. departmentteam 列添加到需要处理的列集合中。
  2. 生成列在批处理中的位置映射,确保每次处理 GROUPING SETS 时能够正确地定位列。
  3. 分组键聚合操作分配内存并初始化,以便后续的聚合过程能够顺利执行。

SortAggRunner::set_key 函数

  此函数的主要作用是设置分组键(key),为当前聚合阶段准备好需要的分组列及其在批次中的映射。在执行分组聚合时,需要知道如何对元组进行分组,因此该函数从 m_runtime 中提取分组信息,包括列索引比较函数,将它们映射到内部的结构中,供后续的聚合处理使用。
  具体功能分为以下几部分:

  1. 初始化分组列索引:从当前聚合阶段 (phase) 中提取需要分组的列索引 (grpColIdx),这些列是分组操作的基础。由于索引从1开始,代码减去1以适应内部的存储结构。
  2. 映射分组列:通过 m_cellBatchMap 将这些分组列的索引映射到输出批次(outbatch)中,确保在执行时可以找到对应的列。
  3. 设置比较函数:最后,设置分组键的比较函数(eqfunctions),用于后续对这些键的相等性判断。
void SortAggRunner::set_key()
{
    int i = 0;

    // 获取当前阶段的分组列数量(gset_lengths[0])作为m_key的值
    m_key = m_runtime->phase->gset_lengths[0]; 

    // 初始化 m_keyIdx:获取分组列的索引,将这些索引保存到 m_keyIdx 数组中,索引从1开始,因此需要减1
    for (i = 0; i < m_key; i++) {
        m_keyIdx[i] = m_runtime->phase->aggnode->grpColIdx[i] - 1;
    }

    // 查找这些分组列在输出批次(outbatch)中对应的位置
    for (i = 0; i < m_key; i++) {
        bool is_found = false;  // 用于标记当前列在 m_cellBatchMap 中是否找到匹配的列
        for (int k = 0; k < m_cellVarLen; k++) {
            // 如果分组列的索引在 m_cellBatchMap 中找到匹配,则记录该列的索引
            if (m_keyIdx[i] == m_cellBatchMap[k]) {
                is_found = true;
                m_keyIdxInCell[i] = k;  // 记录该分组列在 cell 中的索引位置
            }
        }
        // 确保分组列在 m_cellBatchMap 中找到对应的列
        Assert(is_found);
    }

    // 设置当前阶段的相等比较函数,m_eqfunctions 用于分组列的比较
    m_eqfunctions = m_runtime->phase->eqfunctions;
}

示例

  假设我们有一个名为 sales 的表,其结构如下:

CREATE TABLE sales (
    region VARCHAR,
    product VARCHAR,
    revenue INT,
    year INT
) with (orientation = column);

  现在,我们希望通过以下 SQL 查询对 sales 表的数据按 regionyear 进行分组,统计每组的总收入 (revenue):

SELECT region, year, SUM(revenue) 
FROM sales 
GROUP BY region, year;

  在执行计划中,这个查询会生成一个聚合节点(Agg),其分组列为 regionyear,并且会使用聚合函数 SUM(revenue) 计算每个分组的总收入。
  在执行这个查询时,set_key() 函数负责对聚合的分组列(regionyear)进行处理,确保它们能够在后续的聚合操作中被正确使用。下面是该函数如何处理这个 SQL 查询中的分组列的具体步骤:

  1. 确定分组列数量:

m_key = m_runtime->phase->gset_lengths[0]
对于该查询,分组列regionyear,所以 gset_lengths[0] 返回 2,表示有两列需要进行分组,即 regionyear。此时,m_key 会被设置为 2

  1. 设置分组列索引:

m_keyIdx[i] = m_runtime->phase->aggnode->grpColIdx[i] - 1
假设 grpColIdx 包含列的索引(region 的索引为 1year 的索引为 4),通过减1操作后,m_keyIdx 将设置为 [0, 3],表示我们将对第1列和第4列进行分组

  1. 映射分组列到批次中的位置:

m_keyIdxInCell[i] = k
假设 m_cellBatchMap 保存了批次中列的映射,例如批次中的列是 [0, 1, 2, 3](代表 region, product, revenue, year),m_keyIdxInCell 将被设置为 [0, 3],表示 region 对应于批次中的第0列,year 对应于批次中的第3列。

  1. 设置相等比较函数:

m_eqfunctions = m_runtime->phase->eqfunctions
这一步会为每个分组列设置相应的相等比较函数,例如对 regionyear 使用字符串和整数的相等比较函数,确保在后续的分组过程中可以正确判断分组键是否相等。

BaseAggRunner::initialize_sortstate 函数

  该函数的主要作用是在执行包含 DISTINCT 操作的聚合查询时,为每个聚合操作初始化排序状态。如果一个聚合操作需要对某些列进行去重(即 DISTINCT 操作),该函数会为每个阶段分配相关的批处理和排序状态,以便后续在执行聚合时能够正确地对结果进行去重和排序。
  函数首先遍历所有的聚合操作,检查是否有需要 DISTINCT 操作的列。如果有,则为每个聚合操作分配 SortDistinct 结构,其中包含去重所需的批处理和排序状态。

void BaseAggRunner::initialize_sortstate(int work_mem, int max_mem, int plan_id, int dop)
{
    int agg_no;
    VecAggStatePerAgg per_agg = m_runtime->pervecagg;  // 获取聚合操作的状态信息,存储了每个聚合的相关属性
    VecAggStatePerAgg per_agg_state = NULL;  // 初始化单个聚合状态指针为 NULL

    // 循环遍历每个聚合操作,检查是否有 distinct 操作
    for (agg_no = 0; agg_no < m_aggNum; agg_no++) {
        per_agg_state = &per_agg[agg_no];
        if (per_agg_state->numDistinctCols > 0) {
            m_hasDistinct = true;  // 如果当前聚合操作包含 distinct 列,则标记为 true
            break;  // 有 distinct 列,直接跳出循环
        }
    }

    // 如果有 distinct 操作,开始初始化 distinct 相关的排序状态
    if (m_hasDistinct) {
        /*
         * 我们需要为最多的阶段数分配空间,而不是仅为第一个解析阶段分配空间,
         * 否则可能导致非法内存访问。
         */
        int numset = Max(m_runtime->maxsets, 1);  // 计算最多的阶段数,确保至少有一个阶段

        m_sortDistinct = (SortDistinct*)palloc0(numset * sizeof(SortDistinct));  // 为 SortDistinct 结构分配内存

        // 遍历每个阶段,初始化 distinct 相关的批处理和排序状态
        for (int i = 0; i < numset; i++) {
            // 为每个聚合操作分配批处理和排序状态数组
            m_sortDistinct[i].aggDistinctBatch = (VectorBatch**)palloc(m_aggNum * sizeof(VectorBatch*));
            m_sortDistinct[i].batchsortstate = (Batchsortstate**)palloc0(m_aggNum * sizeof(Batchsortstate*));

            // 分配存储 distinct 最后值的数组
            m_sortDistinct[i].lastVal = (ScalarValue*)palloc0(m_aggNum * sizeof(ScalarValue));
            m_sortDistinct[i].lastValLen = (int*)palloc0(m_aggNum * sizeof(int));

            // 遍历每个聚合操作,初始化 distinct 相关的批处理和排序
            for (agg_no = 0; agg_no < m_aggNum; agg_no++) {
                per_agg_state = &per_agg[agg_no];

                // 如果当前聚合操作包含排序列,初始化相应的批处理和排序状态
                if (per_agg_state->numSortCols > 0) {
                    // 初始化用于 distinct 聚合的批处理对象
                    m_sortDistinct[i].aggDistinctBatch[agg_no] =
                        New(CurrentMemoryContext) VectorBatch(CurrentMemoryContext, per_agg_state->evaldesc);

                    // 初始化 distinct 的排序状态,使用批处理排序函数 `batchsort_begin_heap`
                    m_sortDistinct[i].batchsortstate[agg_no] = batchsort_begin_heap(per_agg_state->evaldesc,
                        per_agg_state->numSortCols,  // 排序列数
                        per_agg_state->sortColIdx,  // 排序列的索引
                        per_agg_state->sortOperators,  // 排序操作符
                        per_agg_state->sortCollations,  // 排序的排序规则
                        per_agg_state->sortNullsFirst,  // 排序时 NULL 是否排在最前
                        work_mem,  // 分配的工作内存
                        false,  // 不启用索引支持
                        max_mem,  // 分配的最大内存
                        plan_id,  // 当前执行计划的节点 ID
                        dop);  // 并行度

                    // 如果列是压缩类型,则为 lastVal 分配空间
                    if (COL_IS_ENCODE(m_sortDistinct[i].aggDistinctBatch[agg_no]->m_arr->m_desc.typeId)) {
                        m_sortDistinct[i].lastVal[agg_no] = PointerGetDatum((char*)palloc(100));  // 分配100字节存储 lastVal
                        m_sortDistinct[i].lastValLen[agg_no] = 100;  // 设置 lastVal 的长度为100字节
                    }
                } else {
                    // 如果当前聚合操作不包含排序列,则将排序状态和批处理对象设为 NULL
                    m_sortDistinct[i].batchsortstate[agg_no] = NULL;
                    m_sortDistinct[i].aggDistinctBatch[agg_no] = NULL;
                }
            }
        }
    }
}

示例

  假设有如下表格 sales

CREATE TABLE sales (
    region VARCHAR,
    product VARCHAR,
    revenue INT,
    year INT
) with (orientation = column);

  执行一个查询,对 regionproduct 进行去重,同时聚合 revenue

SELECT region, product, SUM(DISTINCT revenue) 
FROM sales 
GROUP BY region, product;

  这个查询的作用是按 regionproductsales 表进行分组,并对每组中的 revenue 进行去重后求和。函数 initialize_sortstate 处理了该查询中 SUM(DISTINCT revenue) 部分的逻辑,特别是对 DISTINCT revenue 列的去重排序

函数逻辑与查询对应关系

  1. 判断是否存在 DISTINCT 操作:

在函数开始时,遍历所有聚合操作 (m_aggNum),并通过 numDistinctCols 判断是否存在需要去重的列。这与查询中的 SUM(DISTINCT revenue) 直接对应,系统检测到 revenue 列需要去重。。

  1. 分配排序和去重的状态内存:

如果发现 DISTINCT 操作,函数为每个阶段分配内存 (palloc0),为 distinct 操作生成必要的 SortDistinct 数据结构。查询中的 SUM(DISTINCT revenue) 需要对revenue列进行排序并去重,因此会为 revenue 列分配批处理 (aggDistinctBatch) 和排序状态 (batchsortstate)。

  1. 去重操作的排序:

排序状态是通过 batchsort_begin_heap 初始化的。该排序器会按照传入的 sortColIdxsortOperators 对列进行排序。针对 SUM(DISTINCT revenue),排序器会确保在同一组 (regionproduct) 内对 revenue 列进行排序,然后去掉重复值

SortAggRunner::BindingFp 函数

  这段代码的作用是SortAggRunner 初始化相应的函数指针,根据不同的聚合查询情况(是否有 DISTINCT、是否使用简单 key)动态绑定合适的构建排序和聚合的函数。通过这种动态函数指针绑定机制,确保 SortAggRunner 在执行过程中选择正确的排序和聚合函数

  • 如果查询中包含 DISTINCT 聚合列,系统会选择带有 DISTINCT 处理逻辑的聚合函数。
  • 如果没有 DISTINCT 列,系统会选择标准的聚合函数
  • 如果有复杂的 key,例如多列分组,则会使用处理复杂 key 的函数。
  • 最后,通过判断是否有最终聚合操作,系统会选择合适的扫描批处理函数用于处理聚合后的结果。
void SortAggRunner::BindingFp()
{
    // 如果 key 是简单的(即没有复杂的 key 运算),选择不同的构建函数
    if (m_keySimple) {
        // 如果有 DISTINCT 操作,选择带有 DISTINCT 的构建函数
        if (m_hasDistinct) {
            m_buildSortFun = &SortAggRunner::buildSortAgg<true, true>; // 构建有简单 key 且有 DISTINCT 的排序聚合函数
        } else {
            m_buildSortFun = &SortAggRunner::buildSortAgg<true, false>; // 构建有简单 key 但没有 DISTINCT 的排序聚合函数
        }
    } else {
        // 如果 key 不是简单的,选择没有简单 key 的不同构建函数
        if (m_hasDistinct) {
            m_buildSortFun = &SortAggRunner::buildSortAgg<false, true>; // 构建无简单 key 但有 DISTINCT 的排序聚合函数
        } else {
            m_buildSortFun = &SortAggRunner::buildSortAgg<false, false>; // 构建无简单 key 且没有 DISTINCT 的排序聚合函数
        }
    }

    // 初始化构建扫描批处理的函数
    // 如果有最终聚合操作,则选择最终聚合的扫描批处理构建函数
    if (m_finalAggNum > 0)
        m_buildScanBatch = &BaseAggRunner::BuildScanBatchFinal; // 选择处理最终聚合结果的批处理函数
    else
        m_buildScanBatch = &BaseAggRunner::BuildScanBatchSimple; // 否则选择简单批处理函数
}

SortAggRunner::buildSortAgg 函数

  SortAggRunner::buildSortAgg 模板函数用于处理向量批次中的分组和聚合操作。该函数根据 m_groupState 的状态来执行相应的操作,主要分为以下两种状态:

  1. GET_MATCH_KEY:负责获取当前分组号,如果所有分组都已处理完毕则返回退出。对于包含 Ap 函数(高级聚合函数)的查询,可能会需要为一个批次处理多个分组
  2. SORT_MATCH:负责对当前分组进行批次匹配和聚合操作,调用 BatchMatchAndAgg<simple, hashDistinct> 函数来进行具体的操作。处理完一个分组后,递增分组索引并返回到 GET_MATCH_KEY 状态继续处理。

  该函数可以处理多种场景,如:

  1. simple 参数用于指示是否为简单分组键
  2. hashDistinct 参数用于指示是否包含 DISTINCT 聚合操作。
template <bool simple, bool hashDistinct>
void SortAggRunner::buildSortAgg(VectorBatch* batch)
{
    int current_grp = 0;  // 初始化当前分组索引

    while (true) {  // 无限循环,直到手动返回
        switch (m_groupState) {  // 根据当前的分组状态执行不同的操作
            /*
             * 获取分组列的数量。
             * 对于包含 Ap 函数的查询,可能需要为此批次处理多个分组。
             */
            case GET_MATCH_KEY:
                // 如果无法设置当前的分组号(例如分组已经处理完毕),则退出
                if (!set_group_num(current_grp)) {
                    return;  // 无法设置当前分组时,直接返回退出循环
                }
                m_groupState = SORT_MATCH;  // 设置成功后,转移到 SORT_MATCH 状态进行排序匹配
                break;
            case SORT_MATCH:
                // 执行批次的匹配和聚合操作
                BatchMatchAndAgg<simple, hashDistinct>(current_grp, batch);  // 根据当前分组,处理聚合和匹配
                current_grp++;  // 处理下一个分组
                m_groupState = GET_MATCH_KEY;  // 完成后,返回 GET_MATCH_KEY 状态准备处理下一分组
                break;
            default:
                // 如果进入了不支持的状态,则抛出错误
                ereport(ERROR, (errcode(ERRCODE_CASE_NOT_FOUND), errmsg("Unsupported state in vec sort agg")));
                break;
        }
    }
}

SortAggRunner::set_group_num 函数

  该函数的作用是根据当前的分组阶段分组次数,确定是否继续进行分组处理,并设置当前分组所使用的列数。函数会根据 m_ApFun 标志判断是否涉及复杂的多分组集情况。如果 m_ApFunfalse,表示简单分组,函数只处理一个分组集;如果 m_ApFuntrue,则根据当前阶段 phase 的设置逐个处理多个分组集
详细功能描述

  • 对于简单的 GROUP BY,只需要处理一次分组操作,函数会检查 current_grp 是否超过0,如果超过0则不再继续分组,返回 false停止分组操作
  • 对于复杂的 GROUP BY多分组集操作,函数会根据 current_grp 值来逐个处理不同的分组集。每个分组集的列数由 m_runtime->phase->gset_lengths[current_grp] 确定,并将这个列数赋给 m_key,以便后续操作使用。
/*
 * @Description: 设置分组列数量,当在同一阶段切换分组时调用该函数
 * @in current_grp - 当前阶段的分组次数
 */
bool SortAggRunner::set_group_num(int current_grp)
{
    // 简单的分组聚合,不涉及复杂的分组集(ApFun为false表示不涉及OLAP窗口函数或复杂分组)
    if (m_ApFun == false) {
        // 如果当前是第一个分组以外的分组,则不再需要处理,返回false表示结束分组处理
        if (current_grp > 0) {
            return false;
        }
    } else {
        // 对于复杂分组(ApFun为true),如果当前分组数达到了阶段的总分组数,则不再处理
        if (current_grp == m_runtime->phase->numsets) {
            return false;
        } else {
            // 设置当前分组列的长度,必须小于或等于当前阶段的分组集长度
            // 这里的gset_lengths保存了每个分组集的列数,current_grp指定了当前处理的是第几个分组集
            m_key = m_runtime->phase->gset_lengths[current_grp];

            return true; // 成功设置当前分组列的数量
        }
    }

    // 如果是简单分组,返回true表示还可以继续处理当前分组
    return true;
}

SortAggRunner::BatchMatchAndAgg 函数

  该函数用于在批处理聚合操作时,逐行处理数据,进行分组键的匹配以及聚合操作。根据是否启用了去重hashDistinct)和是否有简单分组simple),决定如何初始化和分配哈希单元,并对分组进行聚合计算。主要的操作包括:

  • 处理没有分组键的情况:如果没有分组键(m_key == 0),则直接对整个批次的数据进行聚合计算不做分组区分
  • 处理有分组键的情况:逐行检查当前行是否匹配已有分组键,如果匹配则将该行加入已有分组,如果不匹配切换到新的分组,并为该分组分配哈希单元。
  • 去重操作:如果启用了去重,则在分组键不匹配时,执行去重逻辑并对数据进行排序聚合
/*
 * @Description: 匹配并执行聚合计算
 * @in current_grp - 当前阶段的分组次数
 * @in batch - 当前处理的数据批次
 */
template <bool simple, bool hashDistinct>
void SortAggRunner::BatchMatchAndAgg(int current_grp, VectorBatch* batch)
{
    hashCell* cell = NULL;                     // 用于存储当前处理的哈希单元(存放分组结果)
    hashCell** sort_grp;                       // 当前分组的哈希单元数组(用于分组排序)
    int current_sortGrpIdx;                    // 当前分组排序索引
    int diff_group_row_num = 0;                // 用于记录分组切换时的行号
    int nrows = batch->m_rows;                 // 当前批次的总行数

    current_sortGrpIdx = m_sortGrps[current_grp].sortGrpIdx;  // 获取当前分组的排序索引
    m_sortCurrentBuf = m_sortGrps[current_grp].sortCurrentBuf; // 获取当前使用的缓冲区
    sort_grp = m_sortGrps[current_grp].sortGrp;               // 获取当前分组的哈希单元数组

    // 如果没有分组键(即m_key为0),处理整个批次数据,不需要分组
    if (m_key == 0) {
        for (int i = 0; i < nrows; i++) {
            cell = sort_grp[current_sortGrpIdx];  // 获取当前哈希单元

            // 如果当前哈希单元为空,则分配一个新的哈希单元
            if (cell == NULL) {
                cell = (hashCell*)m_sortCurrentBuf->Allocate(m_cellSize); // 分配内存
                sort_grp[current_sortGrpIdx] = cell;                      // 存储新哈希单元
                initCellValue<simple, true>(batch, cell, i);              // 初始化哈希单元的值
            }

            m_Loc[i] = cell;  // 将当前行的哈希单元存储到结果位置
        }
    } else {  // 如果有分组键
        for (int i = 0; i < nrows; i++) {
            cell = sort_grp[current_sortGrpIdx];  // 获取当前哈希单元

            bool matched_keys = false;  // 用于判断是否匹配当前分组键
            if (cell != NULL) {
                // 如果启用了JIT编译,则调用已生成的代码进行分组键匹配
                if (m_runtime->jitted_SortAggMatchKey) {
                    typedef bool (*match_key_func)(SortAggRunner* tbl, VectorBatch* batch, int batchIdx, hashCell* cell);
                    matched_keys = ((match_key_func)(m_runtime->jitted_SortAggMatchKey))(this, batch, i, cell);
                } else {
                    // 否则使用默认的分组键匹配逻辑
                    matched_keys = match_key<simple>(batch, i, cell);
                }
            }

            // 如果当前哈希单元为空或分组键匹配失败
            if (cell == NULL || matched_keys == false) {
                if (cell != NULL) {
                    current_sortGrpIdx++;  // 切换到下一个分组

                    // 如果分组索引达到批次最大大小,切换缓冲区
                    if (current_sortGrpIdx == BatchMaxSize) {
                        VarBuf* tmp = NULL;
                        tmp = m_sortGrps[current_grp].sortCurrentBuf;
                        m_sortGrps[current_grp].sortCurrentBuf = m_sortGrps[current_grp].sortBckBuf;
                        m_sortGrps[current_grp].sortBckBuf = tmp;
                        m_sortCurrentBuf = m_sortGrps[current_grp].sortCurrentBuf;
                    }

                    // 如果启用了去重
                    if (hashDistinct) {
                        m_sortDistinct[current_grp].m_distinctLoc = cell;  // 保存当前去重位置
                        AppendBatchForSortAgg(batch, diff_group_row_num, i, current_grp);  // 追加批次数据
                        BatchSortAggregation(current_grp, u_sess->attr.attr_memory.work_mem, false, m_runtime->ss.ps.plan->plan_node_id, SET_DOP(m_runtime->ss.ps.plan->dop));  // 执行批次排序聚合
                        diff_group_row_num = i;  // 更新分组切换行号
                    }
                }

                // 为新分组分配哈希单元并初始化
                cell = (hashCell*)m_sortCurrentBuf->Allocate(m_cellSize);
                sort_grp[current_sortGrpIdx] = cell;
                initCellValue<simple, true>(batch, cell, i);
            }

            Assert(cell != NULL);  // 确保哈希单元已分配
            m_Loc[i] = cell;  // 将当前行的哈希单元存储到结果位置
        }
    }

    // 更新当前分组的排序索引
    m_sortGrps[current_grp].sortGrpIdx = current_sortGrpIdx;

    // 如果启用了去重,执行追加和排序聚合
    if (hashDistinct) {
        m_sortDistinct[current_grp].m_distinctLoc = cell;
        AppendBatchForSortAgg(batch, diff_group_row_num, nrows, current_grp);
        BatchNoSortAgg(batch);
    } else {
        BatchAggregation(batch);  // 否则执行常规的批次聚合
    }
}

示例

  假设有如下 SQL 查询:

SELECT region, product, SUM(sales)
FROM sales_data
GROUP BY region, product;

  在此查询中,regionproduct分组键SUM(sales)聚合操作BatchMatchAndAgg 函数将处理来自 sales_data 表的批次数据,按 regionproduct 列对数据进行分组:

  1. 没有分组键的情况:如果查询不使用 GROUP BY(例如:SELECT SUM(sales) FROM sales_data)m_key == 0,函数将直接对整个批次数据进行求和
  2. 有分组键的情况:函数会遍历批次中的每一行,检查该行是否属于当前分组regionproduct 相同),如果是则将其加入该分组的哈希单元中,并执行聚合计算(累加 sales 值)。如果分组键不匹配,则开始处理新分组,并将该行数据作为新分组的起始数据。
  3. 去重:如果查询中使用了 DISTINCT,如 SELECT region, product, SUM(DISTINCT sales)函数将会在匹配分组键时执行去重操作

SortAggRunner::Run 函数

  函数 SortAggRunner::Run() 是整个分组和聚合操作的入口函数负责控制执行流程。它的主要功能是从数据源中获取数据,执行分组和聚合操作,并返回处理结果。函数通过一个状态机实现,按顺序进行获取数据源执行排序切换阶段等操作,直到所有数据处理完毕。

  1. 获取数据源:首先尝试从数据源(可能是左子树或外部数据)中获取批次数据。如果没有更多数据,则标记为完成并返回 NULL
  2. 执行排序和聚合:如果成功获取到数据源执行排序和聚合操作,并返回处理后的批次数据
  3. 阶段切换:如果存在多个阶段(如多阶段分组聚合),则在处理完当前阶段后切换到下一阶段,重新获取数据并继续处理
/*
 * @Description: 入口函数。从这里开始执行聚合操作。
 * @return - 返回分组和聚合的结果。
 */
VectorBatch* SortAggRunner::Run()
{
    // 无限循环,直到返回结果或结束操作
    for (;;) {
        switch (m_prepareState) {
            /* 获取数据源,可能是左子树或者排序状态 */
            case GET_SOURCE: {
                // 获取排序数据源(可能来自上游节点或外部源)
                m_sortSource = GetSortSource();
                // 如果数据源为空,表示没有更多数据,标记结束
                if (m_sortSource == NULL) {
                    m_finish = true;
                    return NULL; // 返回NULL表示没有更多数据
                }
                // 如果获取到数据源,进入排序阶段
                m_prepareState = RUN_SORT;
                break;
            }
            /* 执行分组和聚合操作 */
            case RUN_SORT: {
                // 执行排序操作,获取批次数据
                VectorBatch* batch = RunSort();
                // 如果批次数据为空
                if (BatchIsNull(batch)) {
                    // 如果所有数据处理完成,返回NULL
                    if (m_finish) {
                        return NULL;
                    }
                    // 如果还没有结束,切换到下一阶段的解析
                    m_prepareState = SWITCH_PARSE;
                } else {
                    // 如果批次数据不为空,返回当前批次数据
                    return batch;
                }
                break;
            }
            /* 如果存在多个阶段,切换到下一个阶段 */
            case SWITCH_PARSE: {
                // 切换到下一个阶段
                switch_phase();
                // 重新获取数据源,准备进入下一阶段
                m_prepareState = GET_SOURCE;
                break;
            }
            default:
                // 默认分支,处理不支持的状态
                break;
        }
    }
}

示例

  假设有如下 SQL 查询:

SELECT category, COUNT(*), SUM(sales) 
FROM sales_data 
GROUP BY category;

  在这个查询中,SortAggRunner::Run() 函数会按如下方式工作:

  1. 获取数据源:从 sales_data 中获取批次数据。
  2. 执行分组和聚合:对数据按照 category 列进行分组,并计算 COUNT(*)SUM(sales)
  3. 返回结果:将每个批次的聚合结果返回给调用者,直到所有数据处理完毕。

  如果查询涉及多阶段的分组,如 GROUPING SETS,该函数还会处理不同阶段的数据切换和排序操作。

SortAggRunner::GetSortSource 函数

  SortAggRunner::GetSortSource() 函数用于获取分组和聚合操作的数据源。数据源有两种可能:

  1. 来自已排序的批次数据:如果聚合操作已进行排序,那么函数会从内部排序结果中获取数据
  2. 来自外部子计划(左子树):如果没有内部排序结果,则函数从上游节点(通常是左子树)获取数据。

  该函数首先检查当前阶段是否已完成,如果已完成则返回 NULL,表示没有数据源可以获取。否则,它根据当前是否有内部排序结果来决定获取数据的方式,并返回相应的 hashSource 对象。

/*
 * @Description: 获取数据源。数据可能来自左子树,或者是Ap函数中聚合自身排序的结果。
 */
hashSource* SortAggRunner::GetSortSource()
{
    hashSource* ps = NULL;

    // 确保当前阶段索引不超过总阶段数
    Assert(m_runtime->current_phase <= m_runtime->numphases);

    // 如果当前阶段等于总阶段数,说明已经没有数据源可获取,返回NULL
    if (m_runtime->current_phase == m_runtime->numphases) {
        return NULL;
    }
    // 如果有排序输入数据(即m_batchSortIn不为空)
    else if (m_batchSortIn) {
        // 从已排序的批次数据创建一个新的排序源
        ps = New(CurrentMemoryContext) hashSortSource(m_batchSortIn, m_SortBatch);
    }
    // 否则,从外部计划状态(通常是左子树)获取数据源
    else {
        ps = New(CurrentMemoryContext) hashOpSource(outerPlanState(m_runtime));
    }

    // 返回数据源对象
    return ps;
}

SortAggRunner::GetSortSource 函数

  SortAggRunner::RunSort分组聚合操作的核心执行函数。它从数据源获取批次数据,执行分组和聚合计算,并根据不同的状态返回结果。函数的执行逻辑大致分为以下几个步骤:

AGG_FETCH: 从数据源获取批次数据并执行分组聚合。如果没有数据,它会进入处理结束批次或返回空批次的状态。
AGG_RETURN: 返回当前批次的聚合结果。如果有数据则返回,否则继续获取下一批数据。
AGG_RETURN_LAST: 返回最后一批数据,当所有批次处理完毕时进入此状态。
AGG_RETURN_NULL: 当数据源为空且分组列为空时,返回一个空批次。

  函数通过循环和状态切换来处理批次数据,直到所有数据处理完毕。

/*
 * @Description: 从当前数据源中获取数据,执行分组和聚合操作,并返回计算结果。
 * @return - 分组和聚合的结果。
 */
VectorBatch* SortAggRunner::RunSort()
{
    VectorBatch* outer_batch = NULL;  // 用于存储从外部数据源获取的批次数据
    VectorBatch* result_batch = NULL;  // 用于存储聚合后的结果
    int numset = Max(m_runtime->phase->numsets, 1);  // 当前阶段中的组集合数,默认为1
    Plan* plan = m_runtime->ss.ps.plan;  // 获取计划节点
    int64 workmem = SET_NODEMEM(plan->operatorMemKB[0], plan->dop);  // 分配运算内存,基于计划中的运算器内存和并行度
    int64 maxmem = (plan->operatorMaxMem > 0) ? SET_NODEMEM(plan->operatorMaxMem, plan->dop) : 0;  // 如果设置了最大内存,则获取最大内存,否则为0

    if (m_finish)
        return NULL;  // 如果聚合操作已完成,直接返回NULL

    ResetExprContext(m_runtime->ss.ps.ps_ExprContext);  // 重置表达式上下文,准备执行新一轮计算

    for (;;) {
        switch (m_runState) {
            /* 获取数据,执行分组和聚合操作 */
            case AGG_FETCH:
                outer_batch = m_sortSource->getBatch();  // 从数据源获取当前批次数据
                if (unlikely(BatchIsNull(outer_batch))) {  // 如果没有数据,进入不同的状态处理
                    if (m_noData) {  // 如果没有数据标志为真,直接返回空结果
                        m_runState = AGG_RETURN_NULL;
                        m_projected_set = 0;
                        break;
                    }
                    if (m_hasDistinct) {  // 如果存在DISTINCT操作,执行最后一批数据的聚合操作
                        for (int i = 0; i < numset; i++) {
                            BatchSortAggregation(i, workmem, maxmem, plan->plan_node_id, SET_DOP(plan->dop));
                        }
                    }
                    /* 返回最后一批数据 */
                    m_projected_set = -1;
                    m_runState = AGG_RETURN_LAST;
                    break;
                }

                m_noData = false;  // 重置无数据标志
                if (m_batchSortOut) {  // 如果有批次排序输出,将批次数据放入排序缓冲区
                    m_batchSortOut->sort_putbatch(m_batchSortOut, outer_batch, 0, outer_batch->m_rows);
                }

                InvokeFp(m_buildSortFun)(outer_batch);  // 调用构建排序聚合函数处理批次数据
                m_projected_set = -1;
                m_FreeMem = false;
                m_runState = AGG_RETURN;  // 准备返回数据
                break;
            /* 当返回行数超过1000时返回数据 */
            case AGG_RETURN:
                result_batch = ReturnData();  // 获取返回的批次数据
                if (result_batch != NULL) {
                    return result_batch;  // 如果有结果批次,返回该批次
                } else {
                    Assert(m_projected_set == numset);
                    m_runState = AGG_FETCH;  // 否则继续获取数据
                }
                break;
            /* 返回最后一批数据,当 outer_batch 为 NULL 时 */
            case AGG_RETURN_LAST:
                result_batch = ReturnLastData();  // 返回最后一批数据
                if (result_batch != NULL) {
                    return result_batch;
                } else {
                    Assert(m_projected_set == numset);
                    m_runState = AGG_FETCH;  // 否则继续获取数据
                    return NULL;
                }
                break;
            /* 返回在分组列为空且左子树无数据时的结果 */
            case AGG_RETURN_NULL:
                result_batch = ReturnNullData();  // 返回空数据的批次
                if (result_batch != NULL) {
                    return result_batch;
                } else {
                    m_finish = true;  // 完成聚合操作
                    return NULL;
                }
            default:
                break;
        }
    }
}

SortAggRunner::ReturnData 函数

  SortAggRunner::ReturnData 函数 ReturnData() 的主要功能是:

  • 遍历每个聚合组返回满足行数要求(如大于1000行)的聚合结果
  • 在处理每个聚合组时,检查该组是否有足够的数据。如果有,则生成一个结果批次并返回。
  • 如果没有足够的数据,函数会继续检查下一个聚合组
  • 如果所有聚合组都没有符合条件的数据,则返回 NULL,表示没有可以返回的数据。

  该函数的逻辑主要是为了应对大数据集的批量处理,当聚合组中的数据量达到指定大小(如1000行)时,生成一个批次返回,避免一次性处理过多数据导致内存溢出

/*
 * @Description: 当行数大于1000时返回数据。
 * @return - 如果行数大于1000,返回分组和聚合的结果,否则返回null。
 */
VectorBatch* SortAggRunner::ReturnData()
{
    VectorBatch* res_batch = NULL;  // 用于存储将要返回的结果批次。
    int numset = Max(m_runtime->phase->numsets, 1);  // 获取当前阶段的聚合组数,确保至少为1。

    while (m_projected_set < numset) {  // 循环处理每个聚合组。
        if (m_projected_set != -1 && m_FreeMem) {  // 如果当前组已经处理过,并且需要释放内存。
            Assert(m_projected_set >= 0);  // 确保当前聚合组的索引有效。

            /* 释放之前返回的数据所占用的内存 */
            FreeSortGrpMem<false>(BatchMaxSize);  // 释放之前批次的内存,防止内存泄漏。
        }

        m_projected_set++;  // 递增当前要处理的聚合组。
        m_runtime->projected_set = m_projected_set;  // 更新运行时信息中的聚合组编号。

        if (m_projected_set < numset) {  // 检查是否还有未处理的聚合组。
            int total_num = m_sortGrps[m_projected_set].sortGrpIdx;  // 获取当前组的排序组索引。
            m_FreeMem = false;  // 先设置不需要释放内存。

            if (total_num >= BatchMaxSize) {  // 如果当前组的元素数大于等于BatchMaxSize(1000行)。
                hashCell** sort = m_sortGrps[m_projected_set].sortGrp;  // 获取当前聚合组中的排序单元。

                for (int j = 0; j < BatchMaxSize; j++)  // 遍历当前聚合组的所有单元。
                    InvokeFp(m_buildScanBatch)(sort[j]);  // 调用函数处理每个排序单元,生成批次数据。

                res_batch = ProducerBatch();  // 生成最终的批次数据。
                if (unlikely(BatchIsNull(res_batch))) {  // 检查是否生成了空的批次数据。
                    m_FreeMem = true;  // 如果批次为空,标记需要释放内存。
                    continue;  // 跳过当前组,处理下一个聚合组。
                } else {
                    m_FreeMem = true;  // 如果生成了有效的批次数据,也标记释放内存。
                    return res_batch;  // 返回生成的批次数据。
                }
            }
        }
    }
    return NULL;  // 如果没有符合条件的批次数据,返回NULL。
}

示例说明

  假设有一张包含销售数据的表 sales,你需要按月份对销售数据进行聚合,并返回每个月的销售总额。如果某个月的数据行数超过1000行,那么数据库系统会在达到1000行时生成一个批次并返回结果。此时,ReturnData() 函数会检查每个聚合组(例如,某个月的数据),如果数据行数达到1000行,就生成结果并返回。否则,继续检查下一个月份的数据。

SortAggRunner::FreeSortGrpMem 函数

  函数 FreeSortGrpMem 主要用于在聚合操作中释放内存。它通过两种方式来释放内存:

  1. 如果当前批次没有到达最终状态,函数会释放当前批次中部分排序组的内存并移动剩余的排序组
  2. 如果是最后一次调用,则清除整个排序组

  此操作主要是为了在处理大批量数据时,及时释放不再需要的数据,减少内存消耗,防止系统崩溃。

/*
 * @Description: 释放返回批次占用的内存。
 */
template <bool lastCall>
void SortAggRunner::FreeSortGrpMem(int num)
{
    int i, j, idx;  // 定义循环计数器和索引变量。
    Oid type_id;  // 用于存储当前列的数据类型ID。
    hashVal temp_val;  // 用于存储临时哈希值。
    FmgrInfo* final_flinfo = NULL;  // 用于存储最终聚合函数信息。
    FmgrInfo* agg_flinfo = NULL;  // 用于存储聚合函数信息。
    void* value_loc = NULL;  // 用于存储待释放的值指针。
    errno_t rc;  // 存储错误返回码。

    /* 找到聚合列并释放 m_sortGrp 中的值 */
    for (i = 0; i < m_aggNum; i++) {  // 遍历所有聚合列。
        /*
         * 如果列是向量化的 AVG 类型,并且数据类型是 int1/int2/int4/float4/float8,
         * 则无需释放内存,因为这些内存在 ecxt_per_tuple_memory 中,它们会被自动重置。
         */
        final_flinfo = m_runtime->aggInfo[i].vec_final_function.flinfo;  // 获取列的最终聚合函数信息。
        agg_flinfo = m_runtime->aggInfo[i].vec_agg_function.flinfo;  // 获取列的聚合函数信息。
        type_id = agg_flinfo->fn_rettype;  // 获取聚合函数返回值的数据类型。

        // 如果类型是 INT8ARRAYOID 或 FLOAT8ARRAYOID,则跳过该列,继续下一个列的处理。
        if (final_flinfo != NULL && ((type_id == INT8ARRAYOID) || (type_id == FLOAT8ARRAYOID))) {
            continue;
        }

        // 如果列是编码类型,则需要释放内存。
        if (COL_IS_ENCODE(type_id)) {
            idx = m_aggIdx[i];  // 获取聚合列在索引数组中的位置。
            for (j = 0; j < num; j++) {  // 遍历批次中的每一行。
                temp_val = ((hashCell*)(m_sortGrps[m_projected_set].sortGrp[j]))->m_val[idx];  // 获取当前行的值。
                if (NOT_NULL(temp_val.flag)) {  // 如果值非空,则释放其占用的内存。
                    value_loc = DatumGetPointer(temp_val.val);  // 获取值的位置指针。
                    if (value_loc != NULL)
                        pfree_ext(value_loc);  // 释放值指向的内存。
                }
            }
        }
    }

    // 如果不是最后一次调用,则重排剩余的排序组数据。
    if (!lastCall) {
        int remain_grp = m_sortGrps[m_projected_set].sortGrpIdx - BatchMaxSize + 1;  // 计算剩余的分组数量。
        hashCell** sort = m_sortGrps[m_projected_set].sortGrp;  // 获取当前分组的排序单元。

        // 将剩余的分组元素移动到前面。
        for (i = 0; i < remain_grp; i++)
            sort[i] = sort[BatchMaxSize + i];

        const int free_len = 2 * BatchMaxSize - remain_grp;  // 计算需要清零的元素个数。

        // 将多余的排序组位置清零。
        rc = memset_s(&sort[remain_grp], free_len * sizeof(hashCell*), 0, free_len * sizeof(hashCell*));
        securec_check(rc, "\0", "\0");  // 检查内存设置操作的结果。

        /* 重置排序组的索引 */
        m_sortGrps[m_projected_set].sortGrpIdx = remain_grp - 1;

        // 重置排序组的备份缓冲区。
        m_sortGrps[m_projected_set].sortBckBuf->Reset();

        // 重置扫描批次。
        m_scanBatch->Reset();
    } else {
        // 如果是最后一次调用,清空整个排序组。
        rc = memset_s(m_sortGrps[m_projected_set].sortGrp,
            2 * BatchMaxSize * sizeof(hashCell*),
            0,
            2 * BatchMaxSize * sizeof(hashCell*));

        securec_check(rc, "\0", "\0");  // 检查内存设置操作的结果。

        // 重置排序组的索引。
        m_sortGrps[m_projected_set].sortGrpIdx = 0;

        /* 释放排序组的备份缓冲区 */
        m_sortGrps[m_projected_set].sortBckBuf->Reset();

        /* 重置扫描批次 */
        m_scanBatch->Reset();
    }
}

示例

  假设你在一个数据库中执行如下 SQL 查询:

SELECT city, SUM(sales)
FROM sales_data
GROUP BY city
ORDER BY city;

  这个查询的目的是sales_data 表中的销售数据按城市进行分组,并计算每个城市的销售总额。然后按城市名称进行排序
  当我们在执行这样的查询时,数据库会按城市对数据进行分组并逐行累加城市的销售额。同时,为了确保查询的顺利执行,数据库在处理大数据集时需要进行内存管理,以防止内存溢出。这时就需要 FreeSortGrpMem() 函数来释放已处理完成的数据占用的内存
  步骤示例:

  1. 数据库从 sales_data 表中获取每个城市的销售记录,并按城市进行分组。
  2. 在处理每个批次数据时,城市的销售额被逐步累加。
  3. 当某个批次的数据处理完后(比如某一批次中包含1000个城市的销售数据), FreeSortGrpMem() 函数会被调用,释放已经处理完的批次数据的内存。

SortAggRunner::ReturnLastData 函数

  函数 ReturnLastData() 的作用是在处理 SQL 查询中的最后一批数据时,返回最后一批的聚合和分组结果。它通过逐批处理分组数据,并在每个批次处理完后释放内存,确保内存使用不会过高。在没有数据的批次中,会跳过不处理,直到找到并返回非空的批次结果。

/*
 * @Description: 返回最后一个批次的数据。
 * @return - 返回最后一个批次的数据。
 */
VectorBatch* SortAggRunner::ReturnLastData()
{
    VectorBatch* res_batch = NULL; // 初始化返回的批次数据为NULL
    int numset = Max(m_runtime->phase->numsets, 1); // 获取聚合操作中要处理的批次数,默认为1

    // 循环处理所有的批次数据
    while (m_projected_set < numset) {
        // 如果 m_projected_set 不为-1,表示已经处理过一部分批次数据
        if (m_projected_set != -1) {
            Assert(m_projected_set >= 0); // 确保 m_projected_set 有效

            // 释放已经返回数据时使用的内存
            FreeSortGrpMem<true>(m_sortGrps[m_projected_set].sortGrpIdx);
        }

        m_projected_set++; // 处理下一个批次
        m_runtime->projected_set = m_projected_set; // 更新当前运行时状态中的 projected_set

        // 如果当前批次编号还在范围内,继续处理
        if (m_projected_set < numset) {
            int total_num = m_sortGrps[m_projected_set].sortGrpIdx; // 获取当前批次的分组索引

            // 如果当前分组为空,跳过继续处理
            if (total_num == 0 && m_sortGrps[m_projected_set].sortGrp[0] == NULL) {
                continue;
            } else {
                hashCell** sort = m_sortGrps[m_projected_set].sortGrp; // 获取当前批次的分组数据
                // 遍历当前批次的所有分组
                for (int i = 0; i <= total_num; i++) {
                    InvokeFp(m_buildScanBatch)(sort[i]); // 调用函数构建扫描批次
                }

                res_batch = ProducerBatch(); // 生成批次结果
                if (BatchIsNull(res_batch)) { // 如果批次为空,继续处理
                    continue;
                } else {
                    return res_batch; // 返回生成的批次数据
                }
            }
        }
    }
    return NULL; // 如果所有批次处理完毕,返回NULL
}

示例

  假设有一个复杂的 SQL 查询,带有聚合排序操作:

SELECT region, SUM(sales)
FROM sales_data
GROUP BY region
ORDER BY region;

  在这个 SQL 中,GROUP BYORDER BY 操作可能会涉及多次批次处理。在查询执行过程中,数据库会分批次地进行聚合(计算每个区域的销售总和)和排序,可能处理上千行数据。当所有批次的主要数据处理完后,ReturnLastData() 会负责处理和返回最后一批的数据结果

SortAggRunner::ReturnNullData 函数

  ReturnNullData 函数用于在一种特定情况下返回一行数据:==当“group by”列为空且从左子树(lefttree)获取不到数据时。==它通过检查 gset_lengths 来判断当前分组是否包含数据,如果分组长度为0,则需要返回一个批次数据,该批次只包含一行空值,确保即使没有数据也能有一个默认的返回值。

/*
 * @Description: 当“group by”列为空且左子树返回的数据为空时,我们需要返回一行数据。
 * 该函数主要用于在左子树没有数据的情况下处理聚合操作,确保在这种情况下也能返回一行结果。
 * @return - 返回处理结果。如果没有数据,返回NULL。
 */
VectorBatch* SortAggRunner::ReturnNullData()
{
    VectorBatch* result_batch = NULL; // 初始化返回批次为空
    int numset = Max(m_runtime->phase->numsets, 1); // 获取当前阶段的批次数,默认为1

    // 当 gset_lengths 数组存在时,遍历当前批次,直到所有分组数据处理完毕
    while (m_runtime->phase->gset_lengths && m_projected_set < numset) {
        // 如果当前分组长度为0,则该分组不含数据
        if (0 == m_runtime->phase->gset_lengths[m_projected_set]) {
            // 如果 m_scanBatch 没有数据,则需要构建一个包含空值的批次
            if (m_scanBatch->m_rows == 0) {
                BuildNullScanBatch(); // 构建一个包含空值的批次数据
            }

            // 更新当前批次编号
            m_runtime->projected_set = m_projected_set;
            result_batch = ProducerBatch(); // 生成当前批次的结果
            m_projected_set++; // 处理下一个批次

            // 如果生成的批次非空,则返回该批次
            if (!BatchIsNull(result_batch)) {
                return result_batch;
            } else {
                // 如果结果为空,则返回 NULL
                return NULL;
            }
        }
        // 处理下一个批次
        m_projected_set++;
    }
    return NULL; // 如果所有批次处理完毕,返回NULL
}

示例

  假设我们有一个如下的 SQL 查询:

SELECT SUM(salary), department 
FROM employees 
GROUP BY department;

  在某些情况下,如果某个 department 列为空且没有数据从表中返回,那么根据 ReturnNullData 的逻辑,它将返回一行默认的空值作为聚合结果

SortAggRunner::BuildNullScanBatch 函数

  BuildNullScanBatch 函数用于生成一个包含一行空值数据的批次,并将所有列的值设置为 NULL。此外,它还会对批次中的聚合列进行初始化,确保聚合函数可以处理没有有效数据的情况。通常用于聚合查询中,当“group by”列为空且左子树没有返回任何数据时,保证查询能够返回一行结果。

/*
 * @Description: 设置一行数据,该行的数据值全为 NULL,同时初始化聚合函数的值。
 * 该函数用于在特定情况下生成一个仅包含空值的批次数据,用于应对没有有效数据的情况。
 * @return - 无返回值,直接操作批次数据。
 */
void SortAggRunner::BuildNullScanBatch()
{
    int i;
    int nrows = m_scanBatch->m_rows;  // 获取当前批次的行数
    int scan_batch_cols = m_cellVarLen + m_aggNum;  // 批次列数,等于聚合列数和非聚合列数之和
    int col_idx = 0;  // 当前列的索引
    ScalarVector* vector = NULL;  // 用于操作批次中的数据列

    // 遍历每一列,设置当前批次数据的值为 NULL
    for (i = 0; i < scan_batch_cols; i++) {
        vector = &m_scanBatch->m_arr[i];  // 获取列数据向量
        SET_NULL(vector->m_flag[nrows]);  // 将该行设置为 NULL 值
        vector->m_rows++;  // 增加列的行数
    }

    // 遍历每一个聚合列,初始化聚合值
    for (i = 0; i < m_aggNum; i++) {
        if (m_aggCount[i]) {  // 如果该聚合函数存在
            vector = &m_scanBatch->m_arr[col_idx + m_cellVarLen];  // 获取当前聚合列的向量
            vector->m_vals[nrows] = 0;  // 初始化聚合值为 0
            SET_NOTNULL(vector->m_flag[nrows]);  // 设置该行的聚合值为非 NULL
        }
        col_idx++;  // 移动到下一个列
    }

    m_scanBatch->m_rows++;  // 增加批次的总行数
}

SortAggRunner::switch_phase 函数

  switch_phase 函数的主要作用是在多阶段聚合操作中进行阶段切换。每个阶段可能涉及不同的分组列排序操作聚合函数计算。这个函数负责结束当前阶段的排序和聚合操作,并为下一阶段的操作做好准备。它确保在不同聚合阶段之间平滑过渡,同时释放不再需要的资源。

/*
 * @Description: 切换到下一个阶段。
 * 这个函数负责在多个聚合阶段之间进行切换,每个阶段对应不同的分组列和聚合操作。
 */
void SortAggRunner::switch_phase()
{
    /* 如果当前有正在进行的批次排序输入,结束当前排序并释放资源 */
    if (m_batchSortIn) {
        batchsort_end(m_batchSortIn);  // 结束当前的批次排序,释放相关资源
        m_batchSortIn = NULL;  // 将指针置为空,表示当前阶段完成
    }

    /* 如果有批次排序输出,启动新的排序阶段 */
    if (m_batchSortOut) {
        m_batchSortIn = m_batchSortOut;  // 将输出赋值给输入,准备下一阶段的排序操作
        batchsort_performsort(m_batchSortIn);  // 执行排序操作
        m_batchSortOut = NULL;  // 将输出置为空,表示当前排序已完成
    }

    m_runtime->current_phase++;  // 增加当前的聚合阶段,切换到下一阶段

    init_phase();  // 初始化新阶段的环境

    /* 如果还有更多阶段,设置下一个阶段的分组列 */
    if (m_runtime->current_phase < m_runtime->numphases) {
        set_key();  // 设置新的分组列,用于下一阶段的聚合计算
    }
}

SortAggRunner::ResetNecessary 函数

  该函数的主要作用是重置 SortAggRunner 中的相关状态和内存,以便准备新的聚合操作。它通过清理先前的排序器和排序组,将状态标记为未完成,并重新初始化运行阶段。在执行聚合操作之前,需要通过此函数重置所有状态和内存,以确保在新一轮聚合操作中数据的一致性和正确性。
  具体功能细节:

  1. 状态重置函数将 m_finish 和其他相关的状态变量重置为初始状态
  2. 内存释放释放并结束批处理排序器的资源,避免内存泄漏。
  3. 重新初始化current_phase(当前运行阶段)设为初始值,并通过 init_phaseset_key 函数重新配置运行环境
  4. 清空排序组通过 memset_s 函数清空排序组的内存重置索引和缓冲区,确保每个聚合阶段开始时有干净的环境。
  5. 返回状态函数最终返回 true,表示重置成功,可以继续进行新的聚合操作。
bool SortAggRunner::ResetNecessary(VecAggState* node)
{
    // 将聚合执行状态标记为未完成
    m_finish = false;
    // 设置准备状态为获取数据源
    m_prepareState = GET_SOURCE;
    // 将运行状态设置为聚合获取数据
    m_runState = AGG_FETCH;
    // 标记为无数据状态
    m_noData = true;
    // 初始化内存释放标志为false
    m_FreeMem = false;

    // 如果输入批处理排序器存在,则释放并结束排序
    if (m_batchSortIn) {
        batchsort_end(m_batchSortIn);
        m_batchSortIn = NULL;
    }

    // 如果输出批处理排序器存在,则释放并结束排序
    if (m_batchSortOut) {
        batchsort_end(m_batchSortOut);
        m_batchSortOut = NULL;
    }

    // 重置当前的运行阶段为初始阶段
    m_runtime->current_phase = 0;
    // 初始化当前阶段
    init_phase();
    
    // 如果聚合函数存在,设置键值用于排序
    if (m_ApFun) {
        set_key();
    }

    // 获取最大聚合集的数量,至少为1
    int numset = Max(m_runtime->maxsets, 1);
    
    // 遍历每个聚合集,重置相关的排序组
    for (int i = 0; i < numset; i++) {
        // 将当前排序组的内存清空为0
        errno_t rc = memset_s(
            m_sortGrps[i].sortGrp, 2 * BatchMaxSize * sizeof(hashCell*), 0, 2 * BatchMaxSize * sizeof(hashCell*));

        // 检查内存操作是否成功
        securec_check(rc, "\0", "\0");

        // 重置排序组索引为0
        m_sortGrps[i].sortGrpIdx = 0;
        // 重置排序组的后备缓冲区
        m_sortGrps[i].sortBckBuf->Reset();
        // 重置扫描批处理
        m_scanBatch->Reset();
    }

    // 返回true,表示重置成功
    return true;
}

SortAggRunner::endSortAgg 函数

  该函数的主要作用是结束排序聚合操作并释放所有占用的资源。它首先结束任何与去重操作相关的批处理排序器,然后释放所有分组集的缓冲区和扫描批处理,以确保内存资源被释放,避免内存泄漏。

void SortAggRunner::endSortAgg()
{
    int setno;
    // 计算分组集的数量,至少为1
    int num_grouping_sets = Max(m_runtime->maxsets, 1);

    // 如果存在用于去重的排序器
    if (m_sortDistinct) {
        // 遍历每个分组集
        for (setno = 0; setno < num_grouping_sets; setno++) {
            // 遍历每个聚合操作
            for (int i = 0; i < m_aggNum; i++) {
                // 如果当前分组集对应的批处理排序器存在
                if (m_sortDistinct[setno].batchsortstate[i]) {
                    // 结束当前的批处理排序操作,释放资源
                    batchsort_end(m_sortDistinct[setno].batchsortstate[i]);
                    // 将当前分组集的排序状态置为空
                    m_sortDistinct[setno].batchsortstate[i] = NULL;
                }
            }
        }
    }

    // 释放缓存内存
    int numset = Max(m_runtime->maxsets, 1);
    // 遍历每个分组集
    for (int i = 0; i < numset; i++) {
        // 释放每个分组集的后备缓冲区
        m_sortGrps[i].sortBckBuf->DeInit();
        // 重置扫描批处理
        m_scanBatch->Reset();
    }
}