pandas多维聚合实战:滚动计算、自定义函数与生产级稳定性
1. 项目概述为什么多维聚合不是“加个groupby”就能搞定的事我在银行数据平台组干了八年从最早用SQL写几十行嵌套子查询做客户分层到后来带团队搭实时风险计算引擎踩过的坑比写的代码还多。今天聊的这个主题——“多维聚合中的数据操作”听起来像教科书里的一个章节标题但实际在生产环境里它直接决定着风控模型能不能按时上线、月度经营分析报告能不能准时发出、甚至监管报送数据有没有逻辑性错误。我见过太多人把df.groupby().agg()当成万能胶水结果在测试环境跑得飞起一上生产就崩内存爆掉、结果错位、时间窗口对不上、多级索引展不开……最后发现问题根本不在数据量而在于没真正吃透pandas聚合背后的执行逻辑和业务语义。核心关键词是多维聚合、滚动计算、自定义聚合函数、unstack重塑结构、生产级稳定性。这几个词串起来就是金融、电商、SaaS类企业每天真实面对的数据分析场景不是算一个平均值而是要同时看“华东区高净值客户在教育品类的30天滚动均值 vs 全量均值”还要叠加“该客户近三个月交易金额标准差是否突破阈值”最后把结果喂进BI看板或风控规则引擎。这种需求靠sum()、mean()两个内置函数打天下根本不可能。它需要你理解pandas如何调度内存、如何处理缺失值边界、如何让自定义函数不拖垮整个pipeline、以及最关键的一点——什么时候该用agg什么时候该切回apply什么时候必须手动循环。这不是炫技是保命。比如我们去年上线的反欺诈实时评分模块就因为没处理好滚动窗口的初始NaN填充策略导致前两天所有新客的评分都是空值差点触发监管问询。所以这篇内容我不讲概念只讲我在真实项目里验证过、压测过、上线后稳定运行超过18个月的实操路径。适合三类人刚转行做数据分析想避开初级坑的新手、正在搭建数仓或报表系统的工程师、还有被老板催着“明天就要看到区域-产品-渠道三维交叉分析”的业务分析师。你不需要是pandas源码贡献者但得知道agg和apply底层调用栈的区别得明白unstack()为什么有时返回DataFrame有时报错得清楚rolling(window7).mean()在非等距时间序列里会出什么鬼。2. 多维聚合的核心设计逻辑为什么不能只靠groupby链式调用2.1 业务问题倒推技术选型从“要什么”到“怎么要”先说个血泪教训。去年Q3风控部提了个需求“请输出过去90天内每个地市分行下信用卡分期业务中‘教育’和‘医疗’两类商户的逾期率逾期笔数/总笔数、平均分期期数、首期还款金额中位数并按逾期率降序排列。”表面看就是个三字段groupby但实际落地时我们卡在三个地方第一逾期率计算必须排除当月未满90天的新开户数据这要求先按开户日期做过滤再聚合第二“首期还款金额中位数”在pandas里没有内置函数且median()对空值敏感而分期数据里大量首期金额为0系统默认值第三最终要导出Excel给监管检查表格必须是“地市分行”为行、“商户类型”为列、“指标”为值的矩阵而不是默认的MultiIndex Series。这三个问题任何一个解决不好交付物就不可用。这就逼着我们必须放弃“先groupby再逐个agg”的线性思维转向问题驱动的分层建模。我把整个流程拆成四层数据准备层不是简单读CSV而是构建带业务语义的过滤器。比如针对开户时间我们封装了一个filter_by_cohort_window(df, cohort_colopen_date, window_days90)函数内部用pd.Timestamp.now() - pd.Timedelta(days90)动态计算截止日避免硬编码日期。聚合逻辑层区分“可向量化操作”和“需逐组迭代操作”。像逾期率这种分子分母都要计数的用agg({overdue_count: sum, total_count: sum})再计算比率比用apply(lambda x: x[overdue_count].sum()/x[total_count].sum())快3倍以上因为前者走的是Cython优化路径。结构重塑层unstack()不是万能的它要求索引层级严格匹配。我们遇到过因groupby([city, merchant_type])后某城市缺失某商户类型导致unstack()报ValueError: Index contains duplicate entries。解决方案是强制补全result.reindex(pd.MultiIndex.from_product([cities, merchant_types], names[city, merchant_type]), fill_value0)。交付适配层BI工具或Excel不认MultiIndex。我们固定用result.reset_index().rename(columns{level_0: city, level_1: merchant_type})再用pd.ExcelWriter指定sheet名和格式。提示永远先问自己——这个结果要喂给谁如果是Python下游模块保留MultiIndex更高效如果是人工看报表reset_index()rename()是刚需。别为了“代码简洁”牺牲交付确定性。2.2 多列不同聚合函数的底层机制为什么输出是Hierarchical Columns看原文第一个例子df.groupby(merchant_category).agg({transaction_amount: [mean,median], processing_fee: [min,max]})输出列名是transaction_amount和processing_fee两层外层下面再挂mean、median等。很多人觉得这是pandas的“特色”其实这是明确的工程设计它强制你意识到——不同列的聚合逻辑是独立的不能混为一谈。比如transaction_amount.mean()和processing_fee.min()的计算上下文完全不同强行压平成amount_mean、fee_min会丢失语义关联。我在生产环境里坚持一个原则Hierarchical Columns是优势不是负担。比如我们做客户价值分群时要同时计算“近30天交易频次均值”、“单笔金额中位数”、“最大单笔金额”、“首笔交易距今天数”这些指标天然属于不同业务维度行为频次、金额分布、生命周期用agg({freq: mean, amount: [median, max], first_txn_days: min})生成的MultiIndex列一眼就能看出哪些是金额类指标amount下挂的哪些是时间类指标first_txn_days。后续做特征工程时用result[amount][median]取值比用result[amount_median]更安全因为后者可能被其他同事误删或重命名。但麻烦也在这里下游系统常不支持MultiIndex。我们的解法是封装一个flatten_columns(df)函数def flatten_columns(df): 将MultiIndex列名展平为col_func格式自动去重 if isinstance(df.columns, pd.MultiIndex): # 避免amount_mean和amount_median冲突用下划线连接 df.columns [_.join(col).strip() for col in df.columns.values] # 去除末尾可能的空格和重复下划线 df.columns [re.sub(r_, _, col).strip(_) for col in df.columns] return df这个函数在ETL流水线里是标配所有agg结果必过一遍。它比df.columns.map(_.join)更鲁棒能处理(amount, mean)和(amount, )这种异常情况。2.3 性能陷阱预警agg字典 vs apply的临界点在哪里原文提到“用字典agg比分开groupby再merge更高效”这没错但有个隐藏前提数据量在内存可承受范围内且聚合函数是向量化的。我做过压测当分组数超过50万且聚合函数含lambda x: x.max()-x.min()这类操作时agg反而比apply慢40%。原因在于agg会对每个分组重复解析lambda表达式而apply是一次编译多次调用。真实案例我们处理某省农信社的POS流水数据日增800万条要做“每台POS机的当日交易金额极差max-min”。最初用df.groupby(pos_id)[amount].agg(lambda x: x.max()-x.min())单日任务耗时22分钟。改成# 预先计算每组的max和min temp df.groupby(pos_id)[amount].agg([max, min]) result temp[max] - temp[min]耗时降到6分钟。因为agg([max, min])走的是pandas高度优化的C路径而lambda触发的是Python解释器。所以我的经验法则分组数 10万优先用agg字典代码清晰分组数 10万~100万拆成多个agg([func1, func2])再组合分组数 100万改用apply 预聚合或直接上Dask涉及复杂条件逻辑如“近7天交易中剔除周末且金额1000的订单”必须用applyagg无法表达这种跨行依赖。注意永远用df.memory_usage(deepTrue).sum()监控内存。我见过有人在agg里传入lambda x: x.to_list()结果把百万级分组的列表全加载进内存直接OOM。3. 自定义聚合函数的实战要点从lambda到可审计的业务逻辑3.1 Lambda的适用边界为什么它只该出现在原型阶段原文用lambda x: x.max() - x.min()演示范围计算这很直观但我在生产代码里严禁出现任何lambda用于聚合。原因有三第一不可调试。当结果异常时你没法在lambda里加print()或断点只能靠猜第二不可复用。同一个“交易极差”逻辑在客户分群、风控评分、运营报表里都要用lambda意味着三处重复代码第三不可审计。合规检查时风控部要确认“极差计算是否剔除了异常值”lambda里藏了x x[x 0]你根本看不到。所以我的标准动作是所有业务逻辑必须封装为具名函数并附docstring说明业务依据。比如上面的极差计算def transaction_range(series, exclude_outliersTrue): 计算交易金额范围最大值-最小值 业务依据根据《XX银行反欺诈操作指引》第3.2条 极差计算需剔除单笔金额5000元的异常交易占总量0.1% 避免大额营销活动干扰正常波动阈值。 Parameters ---------- series : pd.Series 交易金额序列 exclude_outliers : bool, default True 是否剔除5000元的异常值 Returns ------- float 范围值若series为空返回np.nan if len(series) 0: return np.nan if exclude_outliers: series series[series 5000] if len(series) 0: return np.nan return series.max() - series.min()这个函数在git history里有完整修改记录每次调整阈值都有commit message说明原因如“2024-03-15 因新增跨境支付场景将异常值阈值从3000上调至5000”。这才是生产级代码该有的样子。3.2 加权平均的实现细节为什么np.average比手动循环更可靠原文的weighted_average函数用np.linspace(0.5,1.5,len(series))生成权重这在教学场景没问题但实际业务中权重往往来自外部配置。比如我们给客户经理的绩效计算权重由“客户资产等级”决定A类客户权重1.2B类1.0C类0.8。这时就不能在函数里硬编码而要通过**kwargs注入def weighted_avg_by_asset_class(series, asset_weightsNone): 按客户资产等级加权的平均交易额 Parameters ---------- series : pd.Series 交易金额序列索引需与asset_weights对齐 asset_weights : dict, optional 资产等级到权重的映射如{A: 1.2, B: 1.0, C: 0.8} 若为None则使用默认权重{A: 1.2, B: 1.0, C: 0.8} if asset_weights is None: asset_weights {A: 1.2, B: 1.0, C: 0.8} # 确保series索引有asset_class列通常来自groupby的原始df if not hasattr(series, name) or series.name ! amount: raise ValueError(Series must have name amount and index with asset_class) # 从原始df获取asset_class映射这里简化实际从groupby对象传入 # weights series.index.get_level_values(asset_class).map(asset_weights) # 为演示假设weights已存在 weights np.array([asset_weights.get(A, 1.0)] * len(series)) # 简化版 return np.average(series, weightsweights)关键点在于np.average比sum(x_i * w_i)/sum(w_i)更可靠因为它内部做了数值稳定性处理避免大数相乘溢出。我们曾在线上遇到过sum(weights)因浮点误差变成0.999999999导致除零警告而np.average自动处理了归一化。3.3 复杂条件聚合如何用apply实现“分组内条件统计”原文Analysis 7的risk_metrics函数用apply返回pd.Series这是正确做法。但要注意apply在分组聚合中是“最后手段”因为它是Python层循环性能差。我们只在两种情况下用跨行逻辑如“计算该客户近3笔交易中金额平均值的笔数”多指标强耦合如“同时返回高价值交易占比、常规交易均值、最大单笔金额”这些指标计算共享同一遍数据扫描比分开agg三次快得多。实操技巧用apply时务必用result_typeexpand参数否则返回的是object类型Series后续处理麻烦# 正确返回DataFrame列名自动继承 risk_analysis df_transactions.groupby(customer_id)[amount].apply( risk_metrics, result_typeexpand # 关键否则返回Series of Series ) # 错误返回object列需额外处理 # risk_analysis df_transactions.groupby(customer_id)[amount].apply(risk_metrics)另外risk_metrics函数里series[series high_value_threshold].mean()这行有隐患如果所有交易都300series[...]返回空Seriesmean()报RuntimeWarning: Mean of empty slice。生产代码必须兜底regular_part series[series high_value_threshold] regular_avg regular_part.mean() if len(regular_part) 0 else 0.04. 时间窗口计算的深度实践滚动与扩展窗口的业务语义4.1 滚动窗口的三大陷阱时间对齐、缺失值、窗口大小原文用rolling(window3).mean()演示但真实业务中window参数绝不是拍脑袋定的。比如“30天滚动均值”在金融场景有严格定义必须是自然日历的30天不是交易日且包含节假日。我们曾因用window30按行数而非window30D按时间导致春节假期期间滚动均值失真——那7天没交易window30只取前30行全是节前数据而window30D会自动向前追溯到节前30天中间空缺用NaN填充。所以我的黄金法则时间序列聚合永远用字符串窗口如7D、30D不用整数窗口。代码必须显式声明频率# 正确按日历天数滚动 df_ts[rolling_30d_avg] df_ts.groupby(category)[daily_revenue].rolling(30D).mean() # 错误按行数滚动忽略时间间隔 # df_ts[rolling_30d_avg] df_ts.groupby(category)[daily_revenue].rolling(30).mean()第二个陷阱是缺失值处理策略。原文说“前两行NaN是预期行为”但在风控场景NaN意味着“无数据”而“无数据”和“0交易”意义完全不同。我们的标准是对趋势分析如判断是否突破阈值用min_periods1确保首日就有值对绝对值计算如计算日均交易额用min_periods7保证至少7天数据才输出有效值对报警触发用fillna(methodffill)但必须加注释“前向填充仅用于可视化报警逻辑以原始NaN为准”。第三个陷阱是非等距时间序列。POS流水数据常有缺失设备离线rolling(30D)会把缺失日算作“0”扭曲均值。解决方案是先用asfreq(D, fill_value0)补齐再滚动# 补齐每日数据缺失日填0业务允许 df_filled df_ts.asfreq(D, fill_value0) df_filled[rolling_30d_avg] df_filled.groupby(category)[daily_revenue].rolling(30D).mean()4.2 扩展窗口的业务场景为什么cumsum比SQL更稳原文用expanding().sum()做累计和这在财务系统里是刚需。但要注意expanding默认从第一行开始而业务常要求“按时间排序后从首日开始”。我们吃过亏——某次数据导入顺序错乱expanding.sum()按原始行序累加导致YTD报表全错。所以必须显式排序# 关键先按时间排序再expanding df_sorted df_ts.sort_values(date).set_index(date) df_sorted[ytd_revenue] df_sorted.groupby(category)[daily_revenue].expanding().sum()更关键的是expanding支持任意聚合函数不只是sum()。比如质量管理部门要“累计标准差”用expanding().std()比SQL里写递归CTE简单十倍# 计算累计标准差样本标准差 df_sorted[cum_std] df_sorted.groupby(category)[daily_revenue].expanding().std(ddof1)ddof1是关键它表示“样本标准差”符合统计学惯例。如果漏掉std()默认ddof0总体标准差结果偏差可达15%。4.3 混合窗口策略滚动扩展的组合拳最复杂的场景是“滚动窗口内的扩展统计”。比如风控部要求“对每个客户计算其近90天内每笔交易相对于该客户历史均值的偏离度Z-score”。这需要两层嵌套外层按客户分组内层对每个客户的交易序列用rolling(90D)计算滚动均值和标准差再算Z-score。代码实现def calculate_rolling_zscore(group): 计算分组内滚动Z-score # 按日期排序确保时间连续 group group.sort_values(date).set_index(date) # 计算90天滚动均值和标准差 rolling_mean group[amount].rolling(90D, min_periods5).mean() rolling_std group[amount].rolling(90D, min_periods5).std(ddof1) # Z-score (x - mean) / std避免除零 zscore (group[amount] - rolling_mean) / rolling_std.replace(0, np.nan) return zscore.reset_index(dropTrue) # 应用 df_transactions[zscore_90d] df_transactions.groupby(customer_id).apply(calculate_rolling_zscore).values这里min_periods5是底线确保至少5天数据才计算避免初期波动过大。replace(0, np.nan)防止标准差为0时除零。5. 多级分组与unstack的工程化落地从MultiIndex到业务看板5.1 unstack的失败场景与修复方案原文df_sales.groupby([region,product])[revenue].mean().unstack()输出完美矩阵但现实远比这残酷。常见失败有三类场景一缺失组合导致unstack报错如某地区无“Gadget”销售groupby结果里就没有(North, Gadget)这一行unstack()时因索引不全报错。修复方案是强制补全# 获取所有可能的组合 all_regions df_sales[region].unique() all_products df_sales[product].unique() full_index pd.MultiIndex.from_product([all_regions, all_products], names[region, product]) # reindex补全缺失值填0 result df_sales.groupby([region,product])[revenue].mean().reindex(full_index, fill_value0).unstack()场景二列名冲突当unstack()后列名与其他列重名如已有Gadget列pandas会自动加后缀_0导致后续代码失效。解决方案是预处理列名result result.unstack().rename(columnslambda x: frev_{x} if isinstance(x, str) else x)场景三多值unstack原文只agg一个revenue但实际要同时看revenue和profit_marginunstack()会返回三层列。此时必须用unstack(level1)指定展开哪一层# groupby两级agg两个指标 result df_sales.groupby([region,product]).agg({revenue: sum, profit_margin: mean}) # 展开product层region保持为行索引 result_unstacked result.unstack(levelproduct) # 或 result.unstack(level1)5.2 从unstack到BI看板自动化列名映射表业务方要的从来不是Gadget而是“小工具销售额”。我们维护一个column_mapping.yaml文件Gadget: 小工具 Widget: 标准件 North: 华北区 South: 华南区 revenue_sum: 销售额 profit_margin_mean: 毛利率然后在ETL脚本里自动应用import yaml def apply_column_mapping(df, mapping_filecolumn_mapping.yaml): with open(mapping_file) as f: mapping yaml.safe_load(f) # 列名映射支持MultiIndex if isinstance(df.columns, pd.MultiIndex): new_columns [] for col in df.columns: if isinstance(col, tuple): # 对元组中每个元素映射 mapped tuple(mapping.get(str(c), c) for c in col) new_columns.append(mapped) else: new_columns.append(mapping.get(str(col), col)) df.columns pd.MultiIndex.from_tuples(new_columns, namesdf.columns.names) else: df.columns [mapping.get(str(col), col) for col in df.columns] return df # 使用 result_cn apply_column_mapping(result_unstacked)这样开发写Gadget业务看小工具双方零沟通成本。5.3 性能优化unstack前的必要瘦身unstack()是内存大户。当分组数达百万级unstack()可能吃光16G内存。我们的优化三步法提前过滤unstack()前用query()或loc筛掉低频组合。如“只看TOP 100地区和TOP 20产品”降精度对金额类字段astype(float32)比float64省内存50%分块unstack对超大结果按主索引分块处理def chunked_unstack(series, chunk_size10000): 分块unstack避免内存爆炸 index_chunks [series.index[i:ichunk_size] for i in range(0, len(series), chunk_size)] chunks [] for idx_chunk in index_chunks: chunk_series series.loc[idx_chunk] chunk_unstacked chunk_series.unstack(fill_value0) chunks.append(chunk_unstacked) return pd.concat(chunks, axis0) # 使用 result_chunked chunked_unstack(result_series)6. 端到端实战零售银行信用卡分析流水线详解6.1 数据生成的真实性校验原文用np.random.uniform(20,500,60)生成交易额这太理想化。真实信用卡数据有强分布特征金额服从对数正态分布小额高频大额低频时间有明显周期性周五、月末交易高峰客户间差异巨大长尾分布20%客户贡献80%交易额。我们用scipy.stats.lognorm模拟from scipy.stats import lognorm # 生成符合真实分布的交易额 # s1.2是形状参数scale100是尺度参数对应均值约200 amounts lognorm.rvs(s1.2, scale100, size60).round(2) # 强制加入极端值0.5%交易额2000大额消费 extreme_mask np.random.random(60) 0.005 amounts[extreme_mask] np.random.uniform(2000, 5000, extreme_mask.sum()).round(2)这样生成的数据describe()出来的分位数才接近生产环境。6.2 七层分析的执行顺序与依赖关系原文Analysis 1到7是并列展示但真实流水线是强依赖的。比如Analysis 3滚动均值必须在Analysis 1基础分组之后因为滚动计算需要先按客户和日期排序。我们的标准流水线顺序是分析编号依赖项业务目的输出形态Analysis 1无基础统计验证数据质量MultiIndex DataFrameAnalysis 2Analysis 1识别高波动品类设风控阈值Flat DataFrameAnalysis 3Analysis 1 时间排序发现消费趋势变化Time-indexed SeriesAnalysis 4Analysis 1计算客户生命周期价值Customer-indexed SeriesAnalysis 5Analysis 1生成交叉销售矩阵Unstacked DataFrameAnalysis 6Analysis 1,4经营摘要供管理层决策Flat DataFrame with metricsAnalysis 7Analysis 1,2高价值客户识别精准营销Customer-indexed DataFrame关键点Analysis 6的summary必须基于Analysis 4的cumulative_spend因为“总消费额”在Analysis 1里是静态聚合而Analysis 4是动态累计后者才能反映客户成长轨迹。6.3 生产环境的健壮性加固所有分析代码上线前必须加三层防护第一层输入校验def validate_transaction_data(df): 交易数据基础校验 assert date in df.columns, 缺少date列 assert customer_id in df.columns, 缺少customer_id列 assert amount in df.columns, 缺少amount列 assert df[amount].min() 0, 存在负交易额 assert df[date].dtype datetime64[ns], date列非时间类型 return True第二层空值熔断# 在每个agg前检查 if result.isnull().values.any(): logger.warning(fAnalysis {i} 结果含空值数量: {result.isnull().sum().sum()}) # 根据业务决定填充、丢弃、或告警 result result.fillna(0) # 金额类填0第三层结果一致性校验# 比如Analysis 6的total_spend应等于Analysis 4的最终累计值 assert np.allclose( summary[total_spend], result_cumulative.groupby(customer_id)[cumulative_spend].last(), atol0.01 ), 总消费额校验失败6.4 监控与告警让聚合流水线自己说话我们给每个分析步骤加监控埋点import time from datetime import datetime def monitored_agg(func, *args, **kwargs): start_time time.time() try: result func(*args, **kwargs) duration time.time() - start_time # 上报到监控系统 report_metric( metric_namefagg.{func.__name__}.duration, valueduration, tags{env: prod} ) return result except Exception as e: report_alert(fagg.{func.__name__} failed: {str(e)}) raise # 使用 multi_agg monitored_agg( df_transactions.groupby([customer_id,category]).agg, {amount: [mean,median,count], fee: [min,max]} )这样当某个agg耗时突增300%监控系统自动告警我们立刻知道是数据分布变了如某客户突然刷了10万笔而不是代码bug。7. 常见问题与排查手册我在生产环境踩过的27个坑7.1 agg字典键名错误KeyError还是Silent Fail现象df.groupby(col).agg({non_exist_col: sum})不报错返回空DataFrame。原因pandas默认忽略不存在的列不抛异常。解决方案启用严格模式在agg前加# 检查列是否存在 missing_cols set([non_exist_col]) - set(df.columns) if missing_cols: raise KeyError(f列不存在: {missing_cols})7.2 unstack后列名丢失为什么columns变成RangeIndex现象unstack()后result.columns是RangeIndex(start0, stopN, step1)不是预期的[Gadget,Widget]。原因unstack()时原Series的name为空pandas无法推断列名。修复设置Series nameresult df_sales.groupby([region,product])[revenue].mean() result.name revenue # 关键 result_unstacked result.unstack() # 此时columns才是[Gadget,Widget]7.3 rolling计算结果错位时间索引没对齐现象rolling(7D)结果中某日的值对应的是未来日期。原因set_index(date)后未排序时间索引乱序。修复set_index后立即sort_index()df_ts df_ts.set_index(date).sort_index() df_ts[rolling_7d] df_ts.groupby(category)[daily_revenue].rolling(7D).mean()7.4 apply返回None函数没return现象groupby().apply(func)返回全NaN。原因函数末尾没写return或条件分支中某些路径没return。排查在函数开头加print(fProcessing group with {len(series)} rows)确认是否执行。7.5 内存爆炸agg时触发隐式copy现象df.groupby(id).agg({col: mean})内存暴涨。原因pandas 1.3版本中agg对大型DataFrame会触发隐式copy。解决方案升级到pandas 2.0或改用df.groupby(id)[col].mean()更省内存。7.6 时间窗口不生效freq参数被忽略现象rolling(30D)结果和rolling(30)一样。原因索引不是DatetimeIndex或freq未设置。修复df_ts df_ts.set_index(date).asfreq(D)确保索引有频率。7.7 多级索引重置失败reset_index()后列名混乱现象result.reset_index()后原索引列名变成level_0、level_1。原因未指定names参数。修复result.reset_index(names[region, product])。7.8 自定义函数性能差用apply代替agg现象groupby().apply(custom_func)比groupby().agg()慢10倍。原因apply是Python循环agg是C优化。优化将custom_func拆成多个向量化agg如agg({col1: max, col2: min})。7.9 NaN传播失控agg中NaN未处理现象agg