[菜鸟教程] 机器学习教程第四课-机器学习项目生命周期
机器学习流程的六个核心阶段问题定义明确要解决什么问题数据收集获取相关数据数据准备清洗和预处理数据模型训练选择算法并训练模型模型评估评估模型性能模型部署将模型投入使用第一阶段问题定义明确业务问题问题定义是机器学习项目最重要的起点就像导航前需要明确目的地一样。关键问题我们要解决什么问题分类问题判断邮件是否为垃圾邮件回归问题预测房价聚类问题客户分群异常检测发现信用卡欺诈为什么这个问题重要业务价值提高效率、降低成本、增加收入用户价值改善体验、提供个性化服务成功的标准是什么量化指标准确率达到 90% 以上业务指标转化率提升 20# 数据准备示例 import pandas as pd import numpy as np from sklearn.preprocessing import StandardScaler, LabelEncoder from sklearn.model_selection import train_test_split class DataPreparer: def __init__(self, data): self.data data.copy() self.processed_data None def clean_data(self): 数据清洗 print(开始数据清洗...) # 1. 处理缺失值 print(f处理前缺失值数量{self.data.isnull().sum().sum()}) # 数值列用均值填充 numeric_columns self.data.select_dtypes(include[np.number]).columns for col in numeric_columns: if self.data[col].isnull().sum() 0: self.data[col].fillna(self.data[col].mean(), inplaceTrue) # 类别列用众数填充 categorical_columns self.data.select_dtypes(include[object]).columns for col in categorical_columns: if self.data[col].isnull().sum() 0: mode_val self.data[col].mode()[0] self.data[col].fillna(mode_val, inplaceTrue) print(f处理后缺失值数量{self.data.isnull().sum().sum()}) # 2. 处理重复值 duplicates_before self.data.duplicated().sum() self.data.drop_duplicates(inplaceTrue) duplicates_after self.data.duplicated().sum() print(f删除重复值{duplicates_before - duplicates_after} 条) # 3. 处理异常值简单方法使用 IQR for col in numeric_columns: Q1 self.data[col].quantile(0.25) Q3 self.data[col].quantile(0.75) IQR Q3 - Q1 lower_bound Q1 - 1.5 * IQR upper_bound Q3 1.5 * IQR outliers ((self.data[col] lower_bound) | (self.data[col] upper_bound)).sum() if outliers 0: # 用边界值替换异常值 self.data[col] self.data[col].clip(lower_bound, upper_bound) print(f处理 {col} 列的 {outliers} 个异常值) return self.data def feature_engineering(self): 特征工程 print(\n开始特征工程...) # 1. 创建新特征示例 if price in self.data.columns and rating in self.data.columns: # 创建性价比特征 self.data[price_per_rating] self.data[price] / self.data[rating] print(创建新特征price_per_rating) # 2. 特征选择简单示例移除低方差特征 numeric_columns self.data.select_dtypes(include[np.number]).columns low_variance_features [] for col in numeric_columns: if self.data[col].var() 0.01: # 方差阈值 low_variance_features.append(col) if low_variance_features: self.data.drop(columnslow_variance_features, inplaceTrue) print(f移除低方差特征{low_variance_features}) return self.data def transform_data(self): 数据转换 print(\n开始数据转换...) # 1. 编码类别变量 categorical_columns self.data.select_dtypes(include[object]).columns label_encoders {} for col in categorical_columns: le LabelEncoder() self.data[col] le.fit_transform(self.data[col]) label_encoders[col] le print(f编码类别变量{col}) # 2. 标准化数值变量 numeric_columns self.data.select_dtypes(include[np.number]).columns scaler StandardScaler() if len(numeric_columns) 0: self.data[numeric_columns] scaler.fit_transform(self.data[numeric_columns]) print(f标准化数值变量{list(numeric_columns)}) return self.data, label_encoders, scaler def split_data(self, target_column, test_size0.2, val_size0.2): 数据划分 print(f\n开始数据划分测试集比例{test_size}验证集比例{val_size}...) X self.data.drop(columns[target_column]) y self.data[target_column] # 首先分离出测试集 X_temp, X_test, y_temp, y_test train_test_split( X, y, test_sizetest_size, random_state42 ) # 再从剩余数据中分离出验证集 val_size_adjusted val_size / (1 - test_size) X_train, X_val, y_train, y_val train_test_split( X_temp, y_temp, test_sizeval_size_adjusted, random_state42 ) print(f训练集大小{X_train.shape[0]}) print(f验证集大小{X_val.shape[0]}) print(f测试集大小{X_test.shape[0]}) return { X_train: X_train, y_train: y_train, X_val: X_val, y_val: y_val, X_test: X_test, y_test: y_test } def prepare_pipeline(self, target_column): 完整的数据准备流水线 print( * 50) print(数据准备流水线) print( * 50) # 1. 数据清洗 self.clean_data() # 2. 特征工程 self.feature_engineering() # 3. 数据转换 processed_data, encoders, scaler self.transform_data() # 4. 数据划分 splits self.split_data(target_column) self.processed_data processed_data return splits, encoders, scaler # 创建示例数据并演示数据准备 np.random.seed(42) sample_data pd.DataFrame({ age: np.random.randint(18, 65, 1000), income: np.random.normal(50000, 15000, 1000), gender: np.random.choice([男, 女], 1000), city: np.random.choice([北京, 上海, 广州], 1000), target: np.random.choice([0, 1], 1000) }) # 添加一些缺失值和异常值 sample_data.loc[np.random.choice(1000, 50), income] np.nan sample_data.loc[np.random.choice(1000, 20), age] np.random.randint(100, 150) preparer DataPreparer(sample_data) splits, encoders, scaler preparer.prepare_pipeline(target)第二阶段数据收集数据来源数据是机器学习的燃料没有合适的数据再好的算法也无法发挥作用。常见数据来源内部数据公司业务数据、用户行为数据外部数据公开数据集、第三方数据服务网络爬虫网页数据、社交媒体数据传感器数据IoT 设备、监控系统数据收集示例# 数据收集示例模拟多种数据源 import pandas as pd import numpy as np from datetime import datetime, timedelta class DataCollector: def __init__(self): self.collected_data {} def collect_user_data(self, n_users1000): 收集用户数据 np.random.seed(42) user_data { user_id: range(1, n_users 1), age: np.random.randint(18, 65, n_users), gender: np.random.choice([男, 女], n_users), city: np.random.choice([北京, 上海, 广州, 深圳], n_users), registration_date: [ datetime.now() - timedelta(daysnp.random.randint(1, 365)) for _ in range(n_users) ] } self.collected_data[users] pd.DataFrame(user_data) print(f收集了 {len(user_data[user_id])} 条用户数据) return self.collected_data[users] def collect_behavior_data(self, n_behaviors5000): 收集用户行为数据 np.random.seed(42) user_ids np.random.choice(range(1, 1001), n_behaviors) product_ids np.random.choice(range(1, 501), n_behaviors) behavior_data { behavior_id: range(1, n_behaviors 1), user_id: user_ids, product_id: product_ids, behavior_type: np.random.choice( [浏览, 点击, 加购物车, 购买], n_behaviors, p[0.4, 0.3, 0.2, 0.1] ), timestamp: [ datetime.now() - timedelta(minutesnp.random.randint(1, 10080)) for _ in range(n_behaviors) ], duration: np.random.exponential(30, n_behaviors) # 停留时间秒 } self.collected_data[behaviors] pd.DataFrame(behavior_data) print(f收集了 {len(behavior_data[behavior_id])} 条行为数据) return self.collected_data[behaviors] def collect_product_data(self, n_products500): 收集商品数据 np.random.seed(42) categories [电子产品, 服装, 食品, 家居, 图书] product_data { product_id: range(1, n_products 1), category: np.random.choice(categories, n_products), price: np.random.uniform(10, 1000, n_products), rating: np.random.uniform(3.0, 5.0, n_products), stock: np.random.randint(0, 1000, n_products) } self.collected_data[products] pd.DataFrame(product_data) print(f收集了 {len(product_data[product_id])} 条商品数据) return self.collected_data[products] def get_data_summary(self): 获取数据摘要 print(\n数据收集摘要) for name, df in self.collected_data.items(): print(f\n{name} 数据集) print(f 形状{df.shape}) print(f 列名{list(df.columns)}) print(f 缺失值{df.isnull().sum().sum()}) print(f 示例数据) print(df.head(2)) # 使用示例 collector DataCollector() collector.collect_user_data() collector.collect_behavior_data() collector.collect_product_data() collector.get_data_summary()第三阶段数据准备数据准备的重要性数据准备占机器学习项目 60-80% 的时间就像做菜前的准备工作一样重要。数据准备的主要任务数据清洗处理缺失值、异常值、重复值特征工程创建新特征、选择重要特征数据转换标准化、归一化、编码数据划分训练集、验证集、测试集数据准备示例# 数据准备示例 import pandas as pd import numpy as np from sklearn.preprocessing import StandardScaler, LabelEncoder from sklearn.model_selection import train_test_split class DataPreparer: def __init__(self, data): self.data data.copy() self.processed_data None def clean_data(self): 数据清洗 print(开始数据清洗...) # 1. 处理缺失值 print(f处理前缺失值数量{self.data.isnull().sum().sum()}) # 数值列用均值填充 numeric_columns self.data.select_dtypes(include[np.number]).columns for col in numeric_columns: if self.data[col].isnull().sum() 0: self.data[col].fillna(self.data[col].mean(), inplaceTrue) # 类别列用众数填充 categorical_columns self.data.select_dtypes(include[object]).columns for col in categorical_columns: if self.data[col].isnull().sum() 0: mode_val self.data[col].mode()[0] self.data[col].fillna(mode_val, inplaceTrue) print(f处理后缺失值数量{self.data.isnull().sum().sum()}) # 2. 处理重复值 duplicates_before self.data.duplicated().sum() self.data.drop_duplicates(inplaceTrue) duplicates_after self.data.duplicated().sum() print(f删除重复值{duplicates_before - duplicates_after} 条) # 3. 处理异常值简单方法使用 IQR for col in numeric_columns: Q1 self.data[col].quantile(0.25) Q3 self.data[col].quantile(0.75) IQR Q3 - Q1 lower_bound Q1 - 1.5 * IQR upper_bound Q3 1.5 * IQR outliers ((self.data[col] lower_bound) | (self.data[col] upper_bound)).sum() if outliers 0: # 用边界值替换异常值 self.data[col] self.data[col].clip(lower_bound, upper_bound) print(f处理 {col} 列的 {outliers} 个异常值) return self.data def feature_engineering(self): 特征工程 print(\n开始特征工程...) # 1. 创建新特征示例 if price in self.data.columns and rating in self.data.columns: # 创建性价比特征 self.data[price_per_rating] self.data[price] / self.data[rating] print(创建新特征price_per_rating) # 2. 特征选择简单示例移除低方差特征 numeric_columns self.data.select_dtypes(include[np.number]).columns low_variance_features [] for col in numeric_columns: if self.data[col].var() 0.01: # 方差阈值 low_variance_features.append(col) if low_variance_features: self.data.drop(columnslow_variance_features, inplaceTrue) print(f移除低方差特征{low_variance_features}) return self.data def transform_data(self): 数据转换 print(\n开始数据转换...) # 1. 编码类别变量 categorical_columns self.data.select_dtypes(include[object]).columns label_encoders {} for col in categorical_columns: le LabelEncoder() self.data[col] le.fit_transform(self.data[col]) label_encoders[col] le print(f编码类别变量{col}) # 2. 标准化数值变量 numeric_columns self.data.select_dtypes(include[np.number]).columns scaler StandardScaler() if len(numeric_columns) 0: self.data[numeric_columns] scaler.fit_transform(self.data[numeric_columns]) print(f标准化数值变量{list(numeric_columns)}) return self.data, label_encoders, scaler def split_data(self, target_column, test_size0.2, val_size0.2): 数据划分 print(f\n开始数据划分测试集比例{test_size}验证集比例{val_size}...) X self.data.drop(columns[target_column]) y self.data[target_column] # 首先分离出测试集 X_temp, X_test, y_temp, y_test train_test_split( X, y, test_sizetest_size, random_state42 ) # 再从剩余数据中分离出验证集 val_size_adjusted val_size / (1 - test_size) X_train, X_val, y_train, y_val train_test_split( X_temp, y_temp, test_sizeval_size_adjusted, random_state42 ) print(f训练集大小{X_train.shape[0]}) print(f验证集大小{X_val.shape[0]}) print(f测试集大小{X_test.shape[0]}) return { X_train: X_train, y_train: y_train, X_val: X_val, y_val: y_val, X_test: X_test, y_test: y_test } def prepare_pipeline(self, target_column): 完整的数据准备流水线 print( * 50) print(数据准备流水线) print( * 50) # 1. 数据清洗 self.clean_data() # 2. 特征工程 self.feature_engineering() # 3. 数据转换 processed_data, encoders, scaler self.transform_data() # 4. 数据划分 splits self.split_data(target_column) self.processed_data processed_data return splits, encoders, scaler # 创建示例数据并演示数据准备 np.random.seed(42) sample_data pd.DataFrame({ age: np.random.randint(18, 65, 1000), income: np.random.normal(50000, 15000, 1000), gender: np.random.choice([男, 女], 1000), city: np.random.choice([北京, 上海, 广州], 1000), target: np.random.choice([0, 1], 1000) }) # 添加一些缺失值和异常值 sample_data.loc[np.random.choice(1000, 50), income] np.nan sample_data.loc[np.random.choice(1000, 20), age] np.random.randint(100, 150) preparer DataPreparer(sample_data) splits, encoders, scaler preparer.prepare_pipeline(target)处理isnull 异常数字值# 使用中位数填充对异常值更鲁棒 self.data[col].fillna(self.data[col].median(), inplaceTrue) # 或使用更高级的方法 from sklearn.impute import KNNImputer imputer KNNImputer(n_neighbors5) self.data[numeric_columns] imputer.fit_transform(self.data[numeric_columns])clip()方法的作用将所有小于lower_bound的值替换为lower_bound将所有大于upper_bound的值替换为upper_bound介于两者之间的值保持不变评价✅ IQR 方法是检测异常值的经典方法⚠️ 直接裁剪clip会丢失信息可能抹去真实的极端值⚠️ 对于偏态分布如收入数据IQR 方法可能不适合改进建议# 方法1标记异常值而非直接删除/裁剪 self.data[f{col}_outlier] ((self.data[col] lower_bound) | (self.data[col] upper_bound)).astype(int) # 方法2使用更鲁棒的方法MAD - 中位数绝对偏差 median self.data[col].median() mad np.median(np.abs(self.data[col] - median)) threshold 3 * mad第四阶段模型训练模型选择策略选择合适的模型是成功的关键就像选择合适的工具来完成工作一样。模型选择考虑因素问题类型分类、回归、聚类等数据特征数据量、特征数量、数据类型性能要求准确率、速度、可解释性资源约束计算资源、时间限制模型训练示例# 模型训练示例 from sklearn.linear_model import LogisticRegression, LinearRegression from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor from sklearn.svm import SVC, SVR from sklearn.metrics import accuracy_score, mean_squared_error, classification_report class ModelTrainer: def __init__(self): self.models {} self.trained_models {} def register_model(self, name, model, problem_type): 注册模型 self.models[name] { model: model, problem_type: problem_type } print(f注册模型{name}{problem_type}) def train_single_model(self, name, X_train, y_train): 训练单个模型 if name not in self.models: raise ValueError(f模型 {name} 未注册) model_info self.models[name] model model_info[model] print(f\n训练模型{name}) model.fit(X_train, y_train) self.trained_models[name] model print(f模型 {name} 训练完成) return model def train_all_models(self, X_train, y_train): 训练所有注册的模型 print(\n开始训练所有模型...) for name in self.models.keys(): try: self.train_single_model(name, X_train, y_train) except Exception as e: print(f训练模型 {name} 时出错{e}) return self.trained_models def evaluate_models(self, X_test, y_test): 评估所有训练好的模型 print(\n模型评估结果) print(- * 50) results {} for name, model in self.trained_models.items(): problem_type self.models[name][problem_type] # 预测 y_pred model.predict(X_test) # 根据问题类型选择评估指标 if problem_type classification: accuracy accuracy_score(y_test, y_pred) results[name] {accuracy: accuracy} print(f{name}: 准确率 {accuracy:.4f}) # 详细报告 print(classification_report(y_test, y_pred)) elif problem_type regression: mse mean_squared_error(y_test, y_pred) rmse np.sqrt(mse) results[name] {mse: mse, rmse: rmse} print(f{name}: MSE {mse:.4f}, RMSE {rmse:.4f}) print(- * 50) return results def get_best_model(self, results, metricaccuracy): 获取最佳模型 if not results: return None best_model_name max(results.keys(), keylambda x: results[x].get(metric, 0)) best_score results[best_model_name][metric] print(f\n最佳模型{best_model_name}{metric} {best_score:.4f}) return best_model_name, self.trained_models[best_model_name] # 使用示例 trainer ModelTrainer() # 注册不同类型的模型 trainer.register_model(逻辑回归, LogisticRegression(random_state42), classification) trainer.register_model(随机森林, RandomForestClassifier(n_estimators100, random_state42), classification) trainer.register_model(支持向量机, SVC(random_state42), classification) # 创建训练数据 X_train splits[X_train] y_train splits[y_train] X_test splits[X_test] y_test splits[y_test] # 训练所有模型 trained_models trainer.train_all_models(X_train, y_train) # 评估模型 results trainer.evaluate_models(X_test, y_test) # 获取最佳模型 best_name, best_model trainer.get_best_model(results)第五阶段模型评估评估指标选择选择合适的评估指标就像选择合适的尺子不同的指标适用于不同的场景。常见评估指标分类问题准确率Accuracy正确预测的比例精确率Precision预测为正的样本中真正为正的比例召回率Recall实际为正的样本中被正确预测为正的比例F1 分数精确率和召回率的调和平均回归问题均方误差MSE预测值与真实值差的平方的平均均方根误差RMSEMSE 的平方根平均绝对误差MAE预测值与真实值差的绝对值的平均R² 分数模型解释的方差比例# 模型评估示例 import matplotlib.pyplot as plt from sklearn.metrics import ( accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, roc_curve, auc ) class ModelEvaluator: def __init__(self): self.evaluation_results {} def evaluate_classification(self, y_true, y_pred, y_probNone, model_nameModel): 评估分类模型 results {} # 基本指标 results[accuracy] accuracy_score(y_true, y_pred) results[precision] precision_score(y_true, y_pred, averageweighted) results[recall] recall_score(y_true, y_pred, averageweighted) results[f1] f1_score(y_true, y_pred, averageweighted) print(f\n{model_name} 分类评估结果) print(f准确率{results[accuracy]:.4f}) print(f精确率{results[precision]:.4f}) print(f召回率{results[recall]:.4f}) print(fF1 分数{results[f1]:.4f}) # 混淆矩阵 cm confusion_matrix(y_true, y_pred) print(f\n混淆矩阵) print(cm) # ROC 曲线如果有概率预测 if y_prob is not None and len(np.unique(y_true)) 2: fpr, tpr, thresholds roc_curve(y_true, y_prob[:, 1]) roc_auc auc(fpr, tpr) results[roc_auc] roc_auc # 绘制 ROC 曲线 plt.figure(figsize(8, 6)) plt.plot(fpr, tpr, colordarkorange, lw2, labelfROC 曲线 (AUC {roc_auc:.2f})) plt.plot([0, 1], [0, 1], colornavy, lw2, linestyle--) plt.xlim([0.0, 1.0]) plt.ylim([0.0, 1.05]) plt.xlabel(假正率) plt.ylabel(真正率) plt.title(f{model_name} ROC 曲线) plt.legend(loclower right) plt.grid(True) plt.show() self.evaluation_results[model_name] results return results def evaluate_regression(self, y_true, y_pred, model_nameModel): 评估回归模型 results {} # 基本指标 mse np.mean((y_true - y_pred) ** 2) rmse np.sqrt(mse) mae np.mean(np.abs(y_true - y_pred)) # R² 分数 ss_res np.sum((y_true - y_pred) ** 2) ss_tot np.sum((y_true - np.mean(y_true)) ** 2) r2 1 - (ss_res / ss_tot) results[mse] mse results[rmse] rmse results[mae] mae results[r2] r2 print(f\n{model_name} 回归评估结果) print(f均方误差 (MSE){mse:.4f}) print(f均方根误差 (RMSE){rmse:.4f}) print(f平均绝对误差 (MAE){mae:.4f}) print(fR² 分数{r2:.4f}) # 绘制预测 vs 真实值 plt.figure(figsize(8, 6)) plt.scatter(y_true, y_pred, alpha0.6) plt.plot([y_true.min(), y_true.max()], [y_true.min(), y_true.max()], r--, lw2) plt.xlabel(真实值) plt.ylabel(预测值) plt.title(f{model_name} 预测 vs 真实值) plt.grid(True) plt.show() self.evaluation_results[model_name] results return results def compare_models(self): 比较所有评估过的模型 if not self.evaluation_results: print(没有可比较的模型评估结果) return print(\n模型比较) print(- * 50) # 创建比较表格 comparison_data [] for model_name, results in self.evaluation_results.items(): row [model_name] for metric, value in results.items(): row.append(f{value:.4f}) comparison_data.append(row) # 打印表格 headers [模型名称] list(self.evaluation_results.values())[0].keys() print(\t.join(headers)) for row in comparison_data: print(\t.join(row)) # 使用示例 evaluator ModelEvaluator() # 评估分类模型 y_pred_class best_model.predict(X_test) y_prob_class best_model.predict_proba(X_test) evaluator.evaluate_classification(y_test, y_pred_class, y_prob_class, 最佳分类模型) # 比较所有模型 evaluator.compare_models()第六阶段模型部署部署策略模型部署是将模型投入实际使用的过程就像将研发的产品推向市场一样。部署方式批量预测定期处理大量数据实时预测在线服务即时响应嵌入式部署将模型集成到现有系统边缘部署在设备端运行模型模型部署示例# 模型部署示例 import pickle import json from datetime import datetime class ModelDeployer: def __init__(self): self.deployed_models {} self.deployment_logs [] def save_model(self, model, model_name, filepathNone): 保存模型 if filepath is None: filepath f{model_name}.pkl with open(filepath, wb) as f: pickle.dump(model, f) print(f模型 {model_name} 已保存到 {filepath}) # 记录部署日志 log_entry { timestamp: datetime.now().isoformat(), action: save_model, model_name: model_name, filepath: filepath } self.deployment_logs.append(log_entry) return filepath def load_model(self, model_name, filepath): 加载模型 with open(filepath, rb) as f: model pickle.load(f) self.deployed_models[model_name] model print(f模型 {model_name} 已从 {filepath} 加载) # 记录部署日志 log_entry { timestamp: datetime.now().isoformat(), action: load_model, model_name: model_name, filepath: filepath } self.deployment_logs.append(log_entry) return model def create_prediction_service(self, model_name, encodersNone, scalerNone): 创建预测服务 if model_name not in self.deployed_models: raise ValueError(f模型 {model_name} 未部署) model self.deployed_models[model_name] def predict_service(input_data): 预测服务函数 try: # 数据预处理 if encoders: for col, encoder in encoders.items(): if col in input_data.columns: input_data[col] encoder.transform(input_data[col]) if scaler: numeric_cols input_data.select_dtypes(include[number]).columns input_data[numeric_cols] scaler.transform(input_data[numeric_cols]) # 预测 prediction model.predict(input_data) # 如果是分类模型也返回概率 if hasattr(model, predict_proba): probability model.predict_proba(input_data) return { prediction: prediction.tolist(), probability: probability.tolist(), status: success, timestamp: datetime.now().isoformat() } else: return { prediction: prediction.tolist(), status: success, timestamp: datetime.now().isoformat() } except Exception as e: return { error: str(e), status: error, timestamp: datetime.now().isoformat() } # 记录服务创建日志 log_entry { timestamp: datetime.now().isoformat(), action: create_service, model_name: model_name } self.deployment_logs.append(log_entry) return predict_service def monitor_model(self, model_name, input_data, true_labelsNone): 监控模型性能 if model_name not in self.deployed_models: raise ValueError(f模型 {model_name} 未部署) predict_service self.create_prediction_service(model_name) # 获取预测结果 result predict_service(input_data) # 监控信息 monitoring_info { timestamp: datetime.now().isoformat(), model_name: model_name, input_shape: input_data.shape, prediction_count: len(result.get(prediction, [])), status: result.get(status, unknown) } # 如果有真实标签计算性能指标 if true_labels is not None and prediction in result: predictions result[prediction] if len(predictions) len(true_labels): accuracy accuracy_score(true_labels, predictions) monitoring_info[accuracy] accuracy print(模型监控信息) for key, value in monitoring_info.items(): print(f {key}: {value}) return monitoring_info def get_deployment_logs(self): 获取部署日志 return self.deployment_logs # 使用示例 deployer ModelDeployer() # 保存最佳模型 model_path deployer.save_model(best_model, best_classification_model) # 加载模型 deployer.load_model(best_classification_model, model_path) # 创建预测服务 prediction_service deployer.create_prediction_service( best_classification_model, encoders, scaler ) # 使用预测服务 test_input X_test.head(5) prediction_result prediction_service(test_input) print(\n预测结果) print(json.dumps(prediction_result, indent2, ensure_asciiFalse)) # 监控模型 deployer.monitor_model(best_classification_model, test_input, y_test.head(5).values)完整流程示例# 完整的机器学习流程示例 class MLProjectPipeline: def __init__(self): self.data_collector DataCollector() self.data_preparer None self.model_trainer ModelTrainer() self.model_evaluator ModelEvaluator() self.model_deployer ModelDeployer() def run_complete_pipeline(self, target_column): 运行完整的机器学习流水线 print( * 60) print(机器学习项目完整流程) print( * 60) # 1. 数据收集 print(\n第1步数据收集) print(- * 30) user_data self.data_collector.collect_user_data(1000) behavior_data self.data_collector.collect_behavior_data(5000) # 合并数据简化示例 merged_data pd.merge(user_data, behavior_data, onuser_id, howinner) # 创建目标变量示例是否购买 merged_data[purchased] (merged_data[behavior_type] 购买).astype(int) # 2. 数据准备 print(\n第2步数据准备) print(- * 30) # 选择特征列 feature_columns [age, gender, city, duration] if all(col in merged_data.columns for col in feature_columns): data_for_ml merged_data[feature_columns [purchased]].copy() # 处理类别变量 data_for_ml[gender] data_for_ml[gender].map({男: 0, 女: 1}) data_for_ml[city] data_for_ml[city].map({北京: 0, 上海: 1, 广州: 2}) # 数据准备 self.data_preparer DataPreparer(data_for_ml) splits, encoders, scaler self.data_preparer.prepare_pipeline(purchased) # 3. 模型训练 print(\n第3步模型训练) print(- * 30) # 注册模型 self.model_trainer.register_model( 逻辑回归, LogisticRegression(random_state42), classification ) self.model_trainer.register_model( 随机森林, RandomForestClassifier(n_estimators100, random_state42), classification ) # 训练模型 trained_models self.model_trainer.train_all_models( splits[X_train], splits[y_train] ) # 4. 模型评估 print(\n第4步模型评估) print(- * 30) results self.model_trainer.evaluate_models( splits[X_test], splits[y_test] ) best_name, best_model self.model_trainer.get_best_model(results) # 5. 模型部署 print(\n第5步模型部署) print(- * 30) # 保存模型 model_path self.model_deployer.save_model(best_model, production_model) # 创建预测服务 prediction_service self.model_deployer.create_prediction_service( production_model ) # 测试预测服务 test_input splits[X_test].head(3) prediction_result prediction_service(test_input) print(\n预测服务测试结果) print(json.dumps(prediction_result, indent2, ensure_asciiFalse)) print(\n * 60) print(机器学习项目流程完成) print( * 60) return { data: data_for_ml, splits: splits, best_model: best_model, best_model_name: best_name, evaluation_results: results, prediction_service: prediction_service } else: print(数据列不完整无法继续流程) return None # 运行完整流程 pipeline MLProjectPipeline() project_results pipeline.run_complete_pipeline(purchased)原文地址https://www.runoob.com/ml/ml-project-lifecycle.html