Python Pandas实现GROUP BY WITH CUBE和WITH ROLLUP的分类汇总功能

发布于:2025-02-22 ⋅ 阅读:(37) ⋅ 点赞:(0)
import pandas as pd
from itertools import combinations, chain

def groupby_rollup(df, groups, agg_dict):
    """
    实现类似SQL GROUP BY ROLLUP操作,支持异常处理的高性能函数
    
    参数:
    df: pd.DataFrame - 输入数据框
    groups: list - 分组列名列表(按层次结构)
    agg_dict: dict - 聚合配置字典,格式为 {聚合列: 聚合函数}
    
    返回:
    pd.DataFrame - 包含所有层级汇总结果的数据框
    """
    try:
        # 参数校验
        if not isinstance(groups, list) or len(groups) == 0:
            raise ValueError("groups必须为非空列表")
        if missing := [col for col in groups if col not in df.columns]:
            raise KeyError(f"列不存在: {missing}")
        
        # 生成所有ROLLUP层级
        subsets = []
        current_groups = groups.copy()
        while current_groups:
            subsets.append(current_groups.copy())
            current_groups.pop()
        subsets.append([])  # 添加总计层级
        
        results = []
        for subset in subsets:
            if subset:
                grouped = df.groupby(subset).agg(agg_dict).reset_index()
            else:
                grouped = df.agg(agg_dict).to_frame().T
                grouped[groups] = 'all'
            
            # 填充未被分组的列为all
            for col in groups:
                if col not in subset:
                    grouped[col] = 'all'
            
            results.append(grouped)
        
        return pd.concat(results, ignore_index=True).drop_duplicates()
    
    except Exception as e:
        print(f"错误发生: {str(e)}")
        return pd.DataFrame()

def groupby_cube(df, groups, agg_dict):
    """
    实现类似SQL GROUP BY CUBE操作,支持异常处理的高性能函数
    
    参数:
    df: pd.DataFrame - 输入数据框
    groups: list - 分组列名列表
    agg_dict: dict - 聚合配置字典,格式为 {聚合列: 聚合函数}
    
    返回:
    pd.DataFrame - 包含所有组合汇总结果的数据框
    """
    try:
        # 参数校验
        if not isinstance(groups, list) or len(groups) == 0:
            raise ValueError("groups必须为非空列表")
        if missing := [col for col in groups if col not in df.columns]:
            raise KeyError(f"列不存在: {missing}")
        
        # 生成所有非空子集组合
        subsets = []
        for i in range(1, len(groups)+1):
            for combo in combinations(groups, i):
                subsets.append(list(combo))
        subsets.append([])  # 添加总计层级
        
        results = []
        for subset in subsets:
            if subset:
                grouped = df.groupby(subset).agg(agg_dict).reset_index()
            else:
                grouped = df.agg(agg_dict).to_frame().T
                grouped[groups] = 'all'
            
            # 填充未被分组的列为all
            for col in groups:
                if col not in subset:
                    grouped[col] = 'all'
            
            results.append(grouped)
        
        return pd.concat(results, ignore_index=True).drop_duplicates()
    
    except Exception as e:
        print(f"错误发生: {str(e)}")
        return pd.DataFrame()

功能说明

  1. 分组机制

    • ROLLUP按层次结构生成汇总(如GROUP BY a,b,ca,ba→总计)
    • CUBE生成所有可能的组合(包括单列、多列组合和总计)
  2. 性能优化

    • 使用列表存储中间结果,最后一次性合并
    • 避免循环内重复操作,统一处理列填充逻辑
    • 使用原生Pandas聚合方法保持高性能
  3. 异常处理

    • 检查分组列是否存在
    • 处理空分组列表等非法输入
    • 捕获所有异常并返回空DataFrame

使用示例

# 示例数据
data = {
    'A': ['a1', 'a1', 'a2', 'a2'],
    'B': ['b1', 'b2', 'b1', 'b2'],
    'C': ['c1', 'c2', 'c3', 'c4'],
    'value': [10, 20, 30, 40]
}
df = pd.DataFrame(data)

# ROLLUP操作
print(groupby_rollup(df, ['A', 'B'], {'value': 'sum'}))

# CUBE操作
print(groupby_cube(df, ['A', 'B'], {'value': 'sum'}))

输出说明

结果数据框包含:

  • 原始分组结果
  • 所有中间层级的汇总(all标记被汇总的列)
  • 总计行(所有分组列标记为all

每个分组层级自动填充未被分组的列为all,实现与SQL一致的汇总展示逻辑。