《【Python实战】AI批量处理Excel:从入门到精通》
# Part 1 - Basics: reading and writing Excel
#
# NOTE(review): this listing was recovered from text whose `=`, quotes and
# comparison/arithmetic operators had been stripped. The operators follow
# from the syntax; the ": " / ", " separators inside printed messages are
# inferred - confirm them against the original article.

# Code 1: read an Excel file
import pandas as pd


class ExcelReader:
    """Load a sheet of an Excel workbook into a DataFrame and inspect it.

    Attributes are plain and public:
      file_path - path handed to ``pandas.read_excel``.
      df        - the last successfully loaded DataFrame, or None.
    """

    def __init__(self, file_path):
        self.file_path = file_path
        self.df = None

    def read(self, sheet_name=0, header=0):
        """Read the workbook and return the DataFrame, or None on failure.

        ``sheet_name`` and ``header`` are passed straight through to
        ``pandas.read_excel``. Any error (missing file, missing engine,
        unreadable sheet) is reported on stdout instead of being raised.
        """
        try:
            self.df = pd.read_excel(
                self.file_path,
                sheet_name=sheet_name,
                header=header,
            )
            print(f"✅ 成功读取: {self.file_path}")
            print(f"   行数: {len(self.df)}, 列数: {len(self.df.columns)}")
            return self.df
        except Exception as e:
            # Tutorial-level boundary: report the problem and signal it with
            # None rather than crashing the script.
            print(f"❌ 读取失败: {e}")
            return None

    def preview(self, n=5):
        """Return the first ``n`` rows, or None when nothing is loaded."""
        if self.df is not None:
            return self.df.head(n)
        return None

    def info(self):
        """Print shape, column names, dtypes and per-column missing counts.

        Prints nothing when no data has been loaded.
        """
        if self.df is not None:
            print("\n数据信息:")
            print(f"形状: {self.df.shape}")
            print(f"\n列名: {list(self.df.columns)}")
            print("\n数据类型:")
            print(self.df.dtypes)
            print("\n缺失值:")
            print(self.df.isnull().sum())


# Usage example
if __name__ == "__main__":
    reader = ExcelReader("data.xlsx")
    df = reader.read()
    reader.info()
    print("\n预览:")
    print(reader.preview())


# Code 2: write an Excel file
import pandas as pd
from openpyxl import Workbook
from openpyxl.styles import Font, PatternFill, Alignment


class ExcelWriter:
    """Write a DataFrame to an Excel file, plain or with header styling."""

    def __init__(self, output_path):
        self.output_path = output_path

    def write_simple(self, df, sheet_name="Sheet1"):
        """Write ``df`` to ``output_path`` without the index column."""
        df.to_excel(self.output_path, sheet_name=sheet_name, index=False)
        print(f"✅ 已保存: {self.output_path}")

    def write_formatted(self, df, sheet_name="数据"):
        """Write ``df`` with a styled header row and auto-sized columns.

        The header row gets a solid blue fill, bold white 11pt text and
        centred alignment. Each column is sized to its longest value plus
        two characters of padding, capped at 50.
        """
        with pd.ExcelWriter(self.output_path, engine="openpyxl") as writer:
            df.to_excel(writer, sheet_name=sheet_name, index=False)

            # The openpyxl worksheet pandas just created for this sheet.
            worksheet = writer.sheets[sheet_name]

            # Header style.
            header_fill = PatternFill(
                start_color="4472C4",
                end_color="4472C4",
                fill_type="solid",
            )
            header_font = Font(bold=True, color="FFFFFF", size=11)
            header_alignment = Alignment(
                horizontal="center",
                vertical="center",
            )
            for cell in worksheet[1]:
                cell.fill = header_fill
                cell.font = header_font
                cell.alignment = header_alignment

            # Auto-fit column widths. Empty cells are skipped so they do not
            # count as the 4-character string "None"; no exception handler
            # is needed because str() on a cell value does not fail.
            for column in worksheet.columns:
                column_letter = column[0].column_letter
                max_length = max(
                    (
                        len(str(cell.value))
                        for cell in column
                        if cell.value is not None
                    ),
                    default=0,
                )
                adjusted_width = min(max_length + 2, 50)
                worksheet.column_dimensions[column_letter].width = adjusted_width

        # Reported after the `with` block so the file is already on disk.
        print(f"✅ 已保存(带格式): {self.output_path}")


# 
使用示例 if __name__ __main__: # 创建示例数据 data { 姓名: [张三, 李四, 王五], 销售额: [15000, 22000, 18000], 月份: [1月, 1月, 1月] } df pd.DataFrame(data) # 写入 writer ExcelWriter(output.xlsx) writer.write_formatted(df)进阶篇数据清洗代码3数据清洗工具类import pandas as pd import numpy as np class DataCleaner: 数据清洗工具 def __init__(self, df): self.df df.copy() self.clean_log [] def remove_duplicates(self, subsetNone): 删除重复行 before len(self.df) self.df self.df.drop_duplicates(subsetsubset) after len(self.df) removed before - after self.clean_log.append(f删除重复行{removed} 行) print(f✅ 删除重复行{removed} 行) return self def fill_missing(self, strategymean, columnsNone): 填充缺失值 if columns is None: columns self.df.columns for col in columns: if self.df[col].isnull().sum() 0: missing_count self.df[col].isnull().sum() if strategy mean and self.df[col].dtype in [int64, float64]: self.df[col].fillna(self.df[col].mean(), inplaceTrue) elif strategy median and self.df[col].dtype in [int64, float64]: self.df[col].fillna(self.df[col].median(), inplaceTrue) elif strategy mode: self.df[col].fillna(self.df[col].mode()[0], inplaceTrue) elif strategy constant: self.df[col].fillna(未知, inplaceTrue) self.clean_log.append(f填充 {col}{missing_count} 个缺失值) print(f✅ 填充 {col}{missing_count} 个缺失值) return self def remove_outliers(self, column, methodiqr): 删除异常值 if method iqr: Q1 self.df[column].quantile(0.25) Q3 self.df[column].quantile(0.75) IQR Q3 - Q1 lower Q1 - 1.5 * IQR upper Q3 1.5 * IQR before len(self.df) self.df self.df[(self.df[column] lower) (self.df[column] upper)] after len(self.df) removed before - after self.clean_log.append(f删除 {column} 异常值{removed} 行) print(f✅ 删除 {column} 异常值{removed} 行) return self def standardize_columns(self, column, mapping): 标准化列值 self.df[column] self.df[column].map(mapping) print(f✅ 标准化 {column}) return self def get_clean_data(self): 获取清洗后的数据 return self.df def get_report(self): 获取清洗报告 print(\n 数据清洗报告) print( * 40) for log in self.clean_log: print(f {log}) print(f\n最终数据{len(self.df)} 行 × {len(self.df.columns)} 
列) print( * 40) # 使用示例 if __name__ __main__: # 创建脏数据 data { 姓名: [张三, 李四, 张三, 王五, None], 年龄: [25, 30, 25, 100, 28], 城市: [北京, 上海, 北京, 北京, 广州] } df pd.DataFrame(data) print(原始数据) print(df) # 清洗 cleaner DataCleaner(df) cleaner.remove_duplicates() \ .fill_missing(strategyconstant) \ .remove_outliers(年龄) \ .get_report() print(\n清洗后数据) print(cleaner.get_clean_data())高级篇批量处理代码4批量处理多个文件import os import pandas as pd from glob import glob class BatchProcessor: 批量处理器 def __init__(self, input_dir, output_dir): self.input_dir input_dir self.output_dir output_dir os.makedirs(output_dir, exist_okTrue) self.results [] def get_files(self, pattern*.xlsx): 获取文件列表 files glob(os.path.join(self.input_dir, pattern)) print(f 找到 {len(files)} 个文件) return files def process_file(self, file_path, process_func): 处理单个文件 try: df pd.read_excel(file_path) processed_df process_func(df) # 保存结果 filename os.path.basename(file_path) output_path os.path.join(self.output_dir, fprocessed_{filename}) processed_df.to_excel(output_path, indexFalse) self.results.append({ file: filename, status: success, input_rows: len(df), output_rows: len(processed_df) }) print(f✅ 处理完成{filename}) return True except Exception as e: self.results.append({ file: os.path.basename(file_path), status: failed, error: str(e) }) print(f❌ 处理失败{file_path} - {e}) return False def process_all(self, process_func, pattern*.xlsx): 批量处理所有文件 files self.get_files(pattern) success_count 0 for file_path in files: if self.process_file(file_path, process_func): success_count 1 print(f\n 批量处理完成{success_count}/{len(files)} 成功) return self.results def generate_report(self): 生成处理报告 df pd.DataFrame(self.results) print(\n * 50) print( 批量处理报告) print( * 50) success len(df[df[status] success]) failed len(df[df[status] failed]) print(f成功{success} 个) print(f失败{failed} 个) if success 0: total_input df[df[status] success][input_rows].sum() total_output df[df[status] success][output_rows].sum() print(f总输入行数{total_input}) print(f总输出行数{total_output}) if failed 0: 
print(\n❌ 失败文件) for _, row in df[df[status] failed].iterrows(): print(f - {row[file]}: {row.get(error, 未知错误)}) print( * 50) # 使用示例 if __name__ __main__: # 定义处理函数 def process_sales_data(df): 处理销售数据 # 删除空行 df df.dropna(howall) # 添加计算列 if 数量 in df.columns and 单价 in df.columns: df[金额] df[数量] * df[单价] # 按日期排序 if 日期 in df.columns: df[日期] pd.to_datetime(df[日期]) df df.sort_values(日期) return df # 批量处理 processor BatchProcessor(./input, ./output) processor.process_all(process_sales_data, pattern*.xlsx) processor.generate_report()实战篇自动化报表代码5自动生成日报/月报import pandas as pd from datetime import datetime, timedelta class ReportGenerator: 报表生成器 def __init__(self, data_dir): self.data_dir data_dir self.summary_data [] def generate_daily_report(self, dateNone): 生成日报 if date is None: date datetime.now() - timedelta(days1) date_str date.strftime(%Y-%m-%d) # 读取当日数据 file_path f{self.data_dir}/sales_{date_str}.xlsx try: df pd.read_excel(file_path) # 计算指标 report { 日期: date_str, 订单数: len(df), 销售额: df[金额].sum() if 金额 in df.columns else 0, 平均客单价: df[金额].mean() if 金额 in df.columns else 0, 最大订单: df[金额].max() if 金额 in df.columns else 0, 最小订单: df[金额].min() if 金额 in df.columns else 0 } # 按类别汇总 if 类别 in df.columns and 金额 in df.columns: category_summary df.groupby(类别)[金额].sum().to_dict() report[分类销售] category_summary self.summary_data.append(report) print(f✅ 日报生成完成{date_str}) return report except Exception as e: print(f❌ 日报生成失败{e}) return None def generate_monthly_report(self, year, month): 生成月报 # 获取当月所有日报 monthly_data [d for d in self.su ...(truncated)...