基于 Python + LangChain + LangSmith 实现数据监控
前言在 AI 应用开发中我们往往专注于功能实现却忽略了一个关键问题我们的 AI 到底消耗了多少资源每次对话用了多少 Token产生了多少费用响应延迟如何哪些模型和接口被调用得最多本文将以一个完整的聊天应用为例从零开始搭建一套AI 使用数据监控系统涵盖后端数据采集、API 设计和前端可视化面板的完整实现。技术栈后端: Python Flask LangChain (ChatOpenAI / DashScope)数据库: MySQL前端: React 19 Ant Design 5 Zustand一、整体架构用户请求 │ ▼ Flask 后端 (server.py) │ ├─ 调用 LLM (ChatOpenAI) │ │ │ ▼ │ 提取 response_metadata (token 用量) │ │ │ ▼ ├─ log_usage() → 写入 MySQL (ai_usage_logs 表) │ ├─ /api/monitoring/summary ── 聚合统计 ├─ /api/monitoring/daily ── 每日趋势 ├─ /api/monitoring/recent ── 最近请求 │ ▼ React 前端 (MonitoringPanel.jsx) │ ├─ 5 个统计卡片 (Tokens / 费用 / 请求数 / 延迟 / 系统状态) ├─ SVG 双轴趋势图 (面积图 柱状图) ├─ 模型分布 端点分布 └─ 可筛选/排序的请求记录表格二、后端实现2.1 创建数据库表首先在 MySQL 中创建使用日志表CREATE TABLE IF NOT EXISTS ai_usage_logs ( id INT AUTO_INCREMENT PRIMARY KEY, endpoint VARCHAR(100) NOT NULL COMMENT API 端点, model VARCHAR(50) NOT NULL COMMENT 模型名称, prompt_tokens INT DEFAULT 0, completion_tokens INT DEFAULT 0, total_tokens INT DEFAULT 0, latency_ms INT DEFAULT 0 COMMENT 响应耗时(ms), cost DOUBLE DEFAULT 0 COMMENT 费用(CNY), conversation_id VARCHAR(36) DEFAULT NULL COMMENT 关联对话, request_preview VARCHAR(200) DEFAULT NULL COMMENT 用户输入预览, response_preview VARCHAR(200) DEFAULT NULL COMMENT AI回复预览, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, INDEX idx_model (model), INDEX idx_endpoint (endpoint), INDEX idx_created (created_at) ) ENGINEInnoDB DEFAULT CHARSETutf8mb4;每条记录对应一次 LLM 调用存储端点、模型、Token 用量、延迟、费用等关键指标。2.2 使用数据追踪模块创建usage_tracker.py封装数据记录和提取逻辑AI 使用数据追踪模块 import pymysql from config import DB_CONFIG # DashScope 模型定价 (CNY / 1K tokens) MODEL_PRICING { qwen-plus: {input: 0.004, output: 0.012}, qwen-vl-plus: {input: 0.008, output: 0.02}, } def calculate_cost(model, prompt_tokens, completion_tokens): 根据模型定价计算费用 pricing MODEL_PRICING.get(model, {input: 0.004, output: 0.012}) return (prompt_tokens * pricing[input] completion_tokens * pricing[output]) / 1000 def log_usage(endpoint, model, prompt_tokens0, completion_tokens0, total_tokens0, latency_ms0, conversation_idNone, request_previewNone, response_previewNone): 记录一次 AI 调用的使用数据 if total_tokens 0 and (prompt_tokens 0 or completion_tokens 0): total_tokens prompt_tokens completion_tokens cost calculate_cost(model, prompt_tokens, completion_tokens) conn pymysql.connect(**DB_CONFIG) try: cursor conn.cursor() cursor.execute( INSERT INTO ai_usage_logs (endpoint, model, prompt_tokens, completion_tokens, total_tokens, latency_ms, cost, conversation_id, request_preview, response_preview) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s), (endpoint, model, prompt_tokens, completion_tokens, total_tokens, latency_ms, round(cost, 6), conversation_id, (request_preview or )[:200], (response_preview or )[:200]) ) conn.commit() finally: conn.close() def extract_metadata(response): 从 LangChain 响应中提取 Token 使用数据 metadata {prompt_tokens: 0, completion_tokens: 0, total_tokens: 0, model_name: } if hasattr(response, response_metadata): rm response.response_metadata if isinstance(rm, dict): token_usage rm.get(token_usage, {}) metadata[prompt_tokens] token_usage.get(prompt_tokens, 0) metadata[completion_tokens] token_usage.get(completion_tokens, 0) metadata[total_tokens] token_usage.get(total_tokens, 0) metadata[model_name] rm.get(model_name, ) return metadata def estimate_tokens(text): 估算 Token 数 (中文约 1.5 token/字, 英文约 0.25/字符) if not text: return 0 chinese sum(1 for c in text if \u4e00 c \u9fff) return int(chinese * 1.5 (len(text) - chinese) * 0.25)核心设计要点extract_metadata()— 从 LangChain 的ChatOpenAI响应对象中提取精确的 Token 用量estimate_tokens()— 对于流式调用DashScope 不返回 Token 统计用字符估算替代calculate_cost()— 基于模型定价自动计算每次调用费用2.3 在 LLM 调用点注入追踪流式调用估算 Token# 流式调用 — DashScope 不返回 Token 统计使用估算 start_time time.time() full_reply for chunk in llm.stream(history): if chunk.content: full_reply chunk.content yield fdata: {json.dumps({token: chunk.content})}\n\n # 流结束后记录 latency int((time.time() - start_time) * 1000) p_est estimate_tokens(user_message) c_est estimate_tokens(full_reply) log_usage( endpoint/api/chat/stream, modelqwen-plus, prompt_tokensp_est, completion_tokensc_est, total_tokensp_est c_est, latency_mslatency, request_previewuser_message, response_previewfull_reply[:200], )关键陷阱在流式端点中如果不同路径RAG、MCP、普通对话共用log_usage务必确保引用的变量在所有路径中都存在。例如history只在普通对话路径定义RAG/MCP 路径应使用user_message进行 Token 估算。2.4 监控 API 端点提供三个聚合查询接口供前端面板调用# 监控 API app.route(/api/monitoring/summary, methods[GET]) def monitoring_summary(): 获取使用数据汇总 days int(request.args.get(days, 30)) import pymysql conn pymysql.connect(**_DB_CONFIG) try: cursor conn.cursor(pymysql.cursors.DictCursor) # 总体统计 cursor.execute( SELECT COUNT(*) as total_requests, COALESCE(SUM(prompt_tokens), 0) as prompt_tokens, COALESCE(SUM(completion_tokens), 0) as completion_tokens, COALESCE(SUM(total_tokens), 0) as total_tokens, COALESCE(SUM(cost), 0) as total_cost, COALESCE(AVG(latency_ms), 0) as avg_latency, COALESCE(MAX(latency_ms), 0) as max_latency, COALESCE(MIN(latency_ms), 0) as min_latency FROM ai_usage_logs WHERE created_at DATE_SUB(NOW(), INTERVAL %s DAY), (days,), ) overall cursor.fetchone() # P95 延迟 cursor.execute( SELECT COALESCE(latency_ms, 0) as latency FROM ai_usage_logs WHERE created_at DATE_SUB(NOW(), INTERVAL %s DAY) AND latency_ms IS NOT NULL ORDER BY latency ASC, (days,), ) latency_rows cursor.fetchall() p95_latency 0 if latency_rows: idx int(len(latency_rows) * 0.95) p95_latency latency_rows[min(idx, len(latency_rows) - 1)][latency] # 成功率 (有 response_preview 的视为成功) cursor.execute( SELECT COUNT(*) as total, SUM(CASE WHEN response_preview IS NOT NULL AND response_preview ! THEN 1 ELSE 0 END) as success FROM ai_usage_logs WHERE created_at DATE_SUB(NOW(), INTERVAL %s DAY), (days,), ) sr cursor.fetchone() success_rate round((int(sr[success]) / int(sr[total])) * 100, 1) if int(sr[total]) 0 else 100.0 # 按模型分组 cursor.execute( SELECT model, COUNT(*) as requests, COALESCE(SUM(prompt_tokens), 0) as prompt_tokens, COALESCE(SUM(completion_tokens), 0) as completion_tokens, COALESCE(SUM(total_tokens), 0) as tokens, COALESCE(SUM(cost), 0) as cost FROM ai_usage_logs WHERE created_at DATE_SUB(NOW(), INTERVAL %s DAY) GROUP BY model, (days,), ) model_rows cursor.fetchall() models {} for row in model_rows: models[row[model]] { tokens: row[tokens], promptTokens: row[prompt_tokens], completionTokens: row[completion_tokens], requests: row[requests], cost: round(float(row[cost]), 4), } # 按端点分组 cursor.execute( SELECT endpoint, COUNT(*) as requests, COALESCE(SUM(total_tokens), 0) as tokens, COALESCE(SUM(cost), 0) as cost FROM ai_usage_logs WHERE created_at DATE_SUB(NOW(), INTERVAL %s DAY) GROUP BY endpoint, (days,), ) ep_rows cursor.fetchall() endpoints {} for row in ep_rows: endpoints[row[endpoint]] { tokens: row[tokens], requests: row[requests], cost: round(float(row[cost]), 4), } # 今日统计 cursor.execute( SELECT COALESCE(SUM(total_tokens), 0) as tokens, COUNT(*) as requests, COALESCE(SUM(cost), 0) as cost FROM ai_usage_logs WHERE created_at CURDATE() ) today cursor.fetchone() # 昨日统计 cursor.execute( SELECT COALESCE(SUM(total_tokens), 0) as tokens FROM ai_usage_logs WHERE created_at DATE_SUB(CURDATE(), INTERVAL 1 DAY) AND created_at CURDATE() ) yesterday cursor.fetchone() yd_tokens yesterday[tokens] td_tokens int(today[tokens]) yd_tokens int(yesterday[tokens]) tokens_change round(((td_tokens - yd_tokens) / yd_tokens) * 100, 1) if yd_tokens 0 else (100.0 if td_tokens 0 else 0.0) # 最后请求时间 cursor.execute(SELECT MAX(created_at) as last_at FROM ai_usage_logs) last_row cursor.fetchone() last_request_at str(last_row[last_at]) if last_row[last_at] else None # 本月费用 cursor.execute( SELECT COALESCE(SUM(cost), 0) as cost FROM ai_usage_logs WHERE created_at DATE_FORMAT(NOW(), %%Y-%%m-01) ) month_cost float(cursor.fetchone()[cost]) return jsonify({ code: 200, data: { totalTokens: overall[total_tokens], promptTokens: overall[prompt_tokens], completionTokens: overall[completion_tokens], totalCost: round(float(overall[total_cost]), 4), totalRequests: overall[total_requests], avgLatencyMs: int(overall[avg_latency]), maxLatencyMs: int(overall[max_latency]), minLatencyMs: int(overall[min_latency]), p95LatencyMs: int(p95_latency), successRate: success_rate, todayTokens: td_tokens, todayRequests: today[requests], todayCost: round(float(today[cost]), 4), yesterdayTokens: yd_tokens, tokensChangeRate: tokens_change, lastRequestAt: last_request_at, monthCost: round(month_cost, 4), models: models, endpoints: endpoints, }, }) except Exception as e: return jsonify({code: 500, msg: str(e)}) finally: conn.close() app.route(/api/monitoring/daily, methods[GET]) def monitoring_daily(): 获取每日使用趋势 days int(request.args.get(days, 7)) import pymysql conn pymysql.connect(**_DB_CONFIG) try: cursor conn.cursor(pymysql.cursors.DictCursor) cursor.execute( SELECT DATE(created_at) as date, COALESCE(SUM(total_tokens), 0) as tokens, COUNT(*) as requests, COALESCE(SUM(cost), 0) as cost FROM ai_usage_logs WHERE created_at DATE_SUB(NOW(), INTERVAL %s DAY) GROUP BY DATE(created_at) ORDER BY date ASC, (days,), ) rows cursor.fetchall() data [] for row in rows: data.append({ date: str(row[date]), tokens: row[tokens], requests: row[requests], cost: round(float(row[cost]), 4), }) return jsonify({code: 200, data: data}) except Exception as e: return jsonify({code: 500, msg: str(e)}) finally: conn.close() app.route(/api/monitoring/recent, methods[GET]) def monitoring_recent(): 获取最近的 AI 调用记录 limit int(request.args.get(limit, 20)) import pymysql conn pymysql.connect(**_DB_CONFIG) try: cursor conn.cursor(pymysql.cursors.DictCursor) cursor.execute( SELECT id, endpoint, model, conversation_id, prompt_tokens, completion_tokens, total_tokens, latency_ms, cost, request_preview, response_preview, created_at FROM ai_usage_logs ORDER BY created_at DESC LIMIT %s, (limit,), ) rows cursor.fetchall() data [] for row in rows: has_response row[response_preview] is not None and row[response_preview] ! data.append({ id: row[id], endpoint: row[endpoint], model: row[model], conversationId: row[conversation_id], promptTokens: row[prompt_tokens], completionTokens: row[completion_tokens], totalTokens: row[total_tokens], latencyMs: row[latency_ms], cost: round(float(row[cost]), 4), requestPreview: row[request_preview], responsePreview: row[response_preview][:200] if row[response_preview] else None, status: success if has_response else failed, createdAt: str(row[created_at]), }) return jsonify({code: 200, data: data}) except Exception as e: return jsonify({code: 500, msg: str(e)}) finally: conn.close()三、前端实现3.1 API 服务层// monitoringService.js /** * Monitoring API Service * AI 使用数据监控 */ const BASE_URL process.env.REACT_APP_CHAT_API_URL || http://localhost:5000; export class MonitoringService { async getSummary(days 30) { const res await fetch(${BASE_URL}/api/monitoring/summary?days${days}); const data await res.json(); if (data.code 200) return data.data; throw new Error(data.msg || 获取监控汇总失败); } async getDailyUsage(days 7) { const res await fetch(${BASE_URL}/api/monitoring/daily?days${days}); const data await res.json(); if (data.code 200) return data.data; throw new Error(data.msg || 获取每日趋势失败); } async getRecentRequests(limit 20) { const res await fetch(${BASE_URL}/api/monitoring/recent?limit${limit}); const data await res.json(); if (data.code 200) return data.data; throw new Error(data.msg || 获取最近记录失败); } } export const monitoringService new MonitoringService();3.2 监控面板组件面板使用 antdModal弹窗 (960px 宽)包含以下区域1) 五个统计卡片Row gutter{[12, 12]} alignstretch {/* 总 Tokens / 总费用 / 请求次数 / 平均延迟 / 系统状态 */} {statCards.map(card ( Col flex1 key{card.key} div className{styles.statCard} style{{ background: card.gradient }} div className{styles.statLabel}{card.title}/div div className{styles.statValue}{card.value}/div div className{styles.statSub}{card.sub}/div /div /Col ))} /Row每个卡片使用渐变背景色 环比变化率标签 今日子数据系统状态卡片额外展示成功率、API 在线状态、月预算进度条。2) SVG 双轴趋势图使用纯 SVG 绘制面积图 柱状图组合左 Y 轴 Tokens平滑曲线 面积填充右 Y 轴 请求次数半透明柱状图Catmull-Rom 曲线平滑算法hover 显示详情function DualChart({ data }) { // 平滑曲线路径 (Catmull-Rom → Cubic Bezier) function smoothPath(points) { let d M${points[0].x},${points[0].y}; for (let i 0; i points.length - 1; i) { const tension 0.3; // 计算 Catmull-Rom 控制点... d C${cp1x},${cp1y} ${cp2x},${cp2y} ${p2.x},${p2.y}; } return d; } // SVG: 网格线 柱状图 面积 曲线 数据点 }3) 模型分布 端点分布左右并排展示使用 antdProgress进度条 百分比直观显示各模型和端点的使用占比。4) 最近请求表格Table dataSource{recentRequests} columns{[ { title: 时间, sorter: true }, { title: 端点, filters: [...] }, // 可筛选 { title: 模型, filters: [...] }, { title: Input/Output }, { title: 延迟, sorter: true, render: colorByLatency }, { title: 状态, render: successOrFailedTag }, { title: 操作, render: detailPopover }, ]} pagination{{ pageSize: 8 }} /支持按端点、模型筛选按时间、Token、延迟排序点击「详情」查看完整请求/响应内容。3.3 浮动按钮入口在聊天页面右下角添加浮动按钮点击打开监控弹窗div className{styles.monitoringFab} onClick{() setMonitoringOpen(true)} DashboardOutlined / /div MonitoringPanel open{monitoringOpen} onClose{() setMonitoringOpen(false)} / /div.monitoringFab { position: fixed; bottom: 28px; right: 28px; width: 48px; height: 48px; border-radius: 50%; background: linear-gradient(135deg, #1677ff, #4096ff); color: #fff; display: flex; align-items: center; justify-content: center; font-size: 20px; cursor: pointer; box-shadow: 0 4px 16px rgba(22, 119, 255, 0.4); z-index: 100; transition: all 0.25s ease; user-select: none; :hover { transform: scale(1.1); box-shadow: 0 6px 24px rgba(22, 119, 255, 0.5); } :active { transform: scale(0.95); } }五、最终效果监控面板展示以下数据区域内容统计卡片总 Tokens (含环比) / 总费用 / 请求次数 / 平均延迟 / 系统状态 预算进度趋势图每日 Token 使用趋势 (面积图) 请求次数 (柱状图)双 Y 轴分布按模型分布 按端点分布的进度条请求列表可筛选/排序的最近请求含状态、详情弹窗总结本文实现了一套轻量但完整的 AI 使用监控系统核心思路是拦截 LLM 响应— 从response_metadata提取精确数据流式调用用估算结构化存储— 每次调用记录端点、模型、Token、延迟、费用到 MySQL聚合 API— 提供多维度统计查询支持时间范围、模型/端点分组、环比计算可视化面板- 渐变卡片 SVG 图表 可操作表格集成在聊天界面中整个过程不需要 LangSmith 平台账号纯自建实现。