# 云原生 AI 平台搭建与智能调度系统设计

## 一、为什么 AI 平台需要云原生架构

当企业从单一大模型调用转向生产级 AI 应用时，基础设施的挑战才真正显现。模型服务需要 GPU 资源、推理请求存在波峰波谷、不同的模型对资源需求各异——这些特性与传统 Web 服务有本质区别。

云原生架构的优势恰好契合 AI 平台的需求：容器化让模型镜像快速部署，Kubernetes 提供弹性伸缩能力，Operator 模式实现自定义资源管理，Service Mesh 简化服务间通信。将 AI 平台构建在云原生基础设施之上，可以获得更高的资源利用率、更强的容错能力、更便捷的运维体验。

本文从云原生架构出发，探讨 AI 平台的设计与实现，包括模型服务化、资源调度、弹性伸缩、可观测性等核心环节。

## 二、整体架构设计

### 2.1 AI 平台架构总览

```mermaid
flowchart TD
    subgraph 用户层
        A[API Gateway]
        B[模型市场]
    end
    subgraph 调度层
        C[模型调度器]
        D[GPU Scheduler]
        E[队列管理]
    end
    subgraph 推理层
        F[模型服务 Pod]
        G[模型服务 Pod]
        H[模型服务 Pod]
    end
    subgraph 基础设施层
        I[Kubernetes Cluster]
        J[GPU Node Pool]
        K[共享存储]
    end
    A --> C
    C --> D
    D --> E
    E -->|分配资源| F
    E -->|分配资源| G
    E -->|分配资源| H
    F --> K
    G --> K
    H --> K
    style C fill:#ffcccc
    style F fill:#ccffcc
    style K fill:#ffffcc
```

### 2.2 核心组件职责

```yaml
# AI Platform Kubernetes 架构
# 1. 模型服务部署 - Deployment 配置
apiVersion: apps/v1
kind: Deployment
metadata:
  name: llm-inference-server
  namespace: ai-platform
spec:
  replicas: 3
  selector:
    matchLabels:
      app: llm-inference
  template:
    spec:
      containers:
        - name: inference
          image: model-server:v1.2.0
          resources:
            limits:
              nvidia.com/gpu: 1
              memory: 32Gi
              cpu: 8
            requests:
              nvidia.com/gpu: 1
              memory: 16Gi
              cpu: 4
          env:
            - name: MODEL_NAME
              value: llama-2-7b-chat
            - name: MAX_CONCURRENT_REQUESTS
              value: 32
```

## 三、模型服务化实现

### 3.1 模型服务框架

```python
# model_server/server.py
import asyncio
from typing import AsyncGenerator
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import torch
from vllm import LLM, SamplingParams

app = FastAPI(title="Model Inference Server")


class InferenceConfig(BaseModel):
    prompt: str
    max_tokens: int = 256
    temperature: float = 0.7
    top_p: float = 0.95


class ModelServer:
    def __init__(self, model_path: str):
        self.llm = LLM(
            model=model_path,
            tensor_parallel_size=torch.cuda.device_count(),
            gpu_memory_utilization=0.9,
            max_model_len=4096,
        )
        self.sampling_params = SamplingParams(
            temperature=0.7,
            top_p=0.95,
            max_tokens=256,
        )

    async def generate_stream(
        self, config: InferenceConfig
    ) -> AsyncGenerator[str, None]:
        """流式推理"""
        sampling_params = SamplingParams(
            temperature=config.temperature,
            top_p=config.top_p,
            max_tokens=config.max_tokens,
        )

        # 异步生成
        loop = asyncio.get_event_loop()
        results = await loop.run_in_executor(
            None,
            self.llm.generate,
            [config.prompt],
            sampling_params,
        )

        # 流式输出 token
        for output in results[0].outputs:
            yield f"data: {output.text}\n\n"

        yield "data: [DONE]\n\n"


# Kubernetes Probe 配置示例
LIVENESS_PROBE = """
livenessProbe:
  httpGet:
    path: /health
    port: 8000
  initialDelaySeconds: 30
  periodSeconds: 10
readinessProbe:
  httpGet:
    path: /ready
    port: 8000
  initialDelaySeconds: 10
  periodSeconds: 5
"""
```

### 3.2 模型调度器实现

```python
# scheduler/model_scheduler.py
from typing import Optional, List
from dataclasses import dataclass
from kubernetes import client, config
import yaml


@dataclass
class ModelDeployment:
    name: str
    model_path: str
    replicas: int
    gpu_per_replica: int
    memory_limit: str
    status: str


class ModelScheduler:
    """模型调度器：管理模型服务部署"""

    def __init__(self):
        try:
            config.load_incluster_config()
        except:
            config.load_kube_config()
        self.apps_v1 = client.AppsV1Api()
        self.core_v1 = client.CoreV1Api()

    def deploy_model(self, deployment: ModelDeployment) -> bool:
        """部署模型服务"""
        manifest = self._generate_deployment_manifest(deployment)
        try:
            self.apps_v1.create_namespaced_deployment(
                namespace="ai-platform",
                body=manifest,
            )
            return True
        except client.ApiException as e:
            if e.status == 409:  # Already exists
                self.apps_v1.replace_namespaced_deployment(
                    name=deployment.name,
                    namespace="ai-platform",
                    body=manifest,
                )
                return True
            raise

    def scale_model(self, name: str, replicas: int) -> bool:
        """弹性伸缩"""
        try:
            self.apps_v1.patch_namespaced_deployment_scale(
                name=name,
                namespace="ai-platform",
                body={"spec": {"replicas": replicas}},
            )
            return True
        except Exception as e:
            print(f"Scale failed: {e}")
            return False

    def get_available_gpu_count(self) -> int:
        """查询可用 GPU 数量"""
        try:
            nodes = self.core_v1.list_node()
            gpu_count = 0
            for node in nodes.items:
                allocatable = node.status.allocatable
                if "nvidia.com/gpu" in allocatable:
                    gpu_count += int(allocatable["nvidia.com/gpu"])
            return gpu_count
        except Exception as e:
            print(f"Get GPU count failed: {e}")
            return 0

    def _generate_deployment_manifest(self, deployment: ModelDeployment) -> client.V1Deployment:
        """生成 Deployment YAML"""
        container = client.V1Container(
            name="inference",
            image="model-server:v1.2.0",
            resources=client.V1ResourceRequirements(
                limits={
                    "nvidia.com/gpu": str(deployment.gpu_per_replica),
                    "memory": deployment.memory_limit,
                    "cpu": "8",
                },
                requests={
                    "nvidia.com/gpu": str(deployment.gpu_per_replica),
                    "memory": "16Gi",
                    "cpu": "4",
                },
            ),
            env=[
                client.V1EnvVar(name="MODEL_PATH", value=deployment.model_path),
            ],
            ports=[client.V1ContainerPort(container_port=8000)],
        )
        template = client.V1PodTemplateSpec(
            spec=client.V1PodSpec(containers=[container]),
        )
        spec = client.V1DeploymentSpec(
            replicas=deployment.replicas,
            selector=client.V1LabelSelector(
                match_labels={"app": deployment.name}
            ),
            template=template,
        )
        return client.V1Deployment(
            api_version="apps/v1",
            kind="Deployment",
            metadata=client.V1ObjectMeta(
                name=deployment.name,
                namespace="ai-platform",
            ),
            spec=spec,
        )
```

### 3.3 GPU 资源管理

```python
# scheduler/gpu_manager.py
from typing import Dict, List
from dataclasses import dataclass
import time


@dataclass
class GPUAllocation:
    node_name: str
    gpu_id: int
    allocated_to: str
    allocated_at: float


class GPUResourceManager:
    """GPU 资源管理器"""

    def __init__(self):
        self.allocations: Dict[str, GPUAllocation] = {}

    def allocate(self, deployment_name: str, gpu_count: int) -> List[GPUAllocation]:
        """分配 GPU 资源"""
        allocated = []

        # 查询集群 GPU 状态
        gpu_status = self._get_gpu_status()

        for gpu_id, status in gpu_status.items():
            if status["available"] and len(allocated) < gpu_count:
                alloc = GPUAllocation(
                    node_name=status["node"],
                    gpu_id=gpu_id,
                    allocated_to=deployment_name,
                    allocated_at=time.time(),
                )
                self.allocations[f"{status['node']}:{gpu_id}"] = alloc
                allocated.append(alloc)

        if len(allocated) < gpu_count:
            # 释放已分配的并返回失败
            for alloc in allocated:
                del self.allocations[f"{alloc.node_name}:{alloc.gpu_id}"]
            raise RuntimeError(f"Insufficient GPU: requested {gpu_count}, available {len(allocated)}")

        return allocated

    def release(self, deployment_name: str):
        """释放 GPU 资源"""
        to_release = [
            key for key, alloc in self.allocations.items()
            if alloc.allocated_to == deployment_name
        ]
        for key in to_release:
            del self.allocations[key]

    def _get_gpu_status(self) -> Dict[int, dict]:
        """获取 GPU 状态"""
        # 简化实现，实际需要查询 Kubernetes Node 状态
        return {
            0: {"node": "gpu-node-1", "available": True},
            1: {"node": "gpu-node-1", "available": True},
            2: {"node": "gpu-node-2", "available": False},
        }
```

## 四、弹性伸缩与负载均衡

### 4.1 基于队列的弹性伸缩

```python
# autoscaler/queue_based_scaler.py
from kubernetes import client, config
import time


class QueueBasedAutoscaler:
    """基于队列长度的弹性伸缩"""

    def __init__(self):
        config.load_incluster_config()
        self.autoscaling_v2 = client.AutoscalingV2Api()

    def create_hpa(self, deployment_name: str, min_replicas: int, max_replicas: int):
        """创建 HPA"""
        metric = client.V2beta1MetricSpec(
            type="External",
            external=client.V2beta1ExternalMetricSource(
                metric=client.V2beta1MetricIdentifier(
                    name="queue_length",
                ),
                target=client.V2beta1MetricTarget(
                    type="AverageValue",
                    average_value="10",
                ),
            ),
        )
        hpa = client.V2beta1HorizontalPodAutoscaler(
            metadata=client.V1ObjectMeta(
                name=f"{deployment_name}-hpa",
                namespace="ai-platform",
            ),
            spec=client.V2beta1HorizontalPodAutoscalerSpec(
                scale_target_ref=client.V2beta1CrossVersionObjectReference(
                    kind="Deployment",
                    name=deployment_name,
                    api_version="apps/v1",
                ),
                min_replicas=min_replicas,
                max_replicas=max_replicas,
                metrics=[metric],
            ),
        )
        self.autoscaling_v2.create_namespaced_horizontal_pod_autoscaler(
            namespace="ai-platform",
            body=hpa,
        )
```

### 4.2 负载均衡策略

```yaml
# Service LoadBalancer 配置
apiVersion: v1
kind: Service
metadata:
  name: llm-inference-service
  namespace: ai-platform
  annotations:
    # AWS ALB 注解
    service.beta.kubernetes.io/aws-load-balancer-type: nlb
spec:
  type: LoadBalancer
  selector:
    app: llm-inference
  ports:
    - port: 80
      targetPort: 8000
  sessionAffinity: None
```

## 五、可观测性体系

### 5.1 指标采集

```yaml
# Prometheus 指标配置
apiVersion: v1
kind: ConfigMap
metadata:
  name: prometheus-config
  namespace: monitoring
data:
  prometheus.yml: |
    global:
      scrape_interval: 15s
    scrape_configs:
      - job_name: model-servers
        kubernetes_sd_configs:
          - role: pod
        relabel_configs:
          - source_labels: [__meta_kubernetes_pod_label_app]
            action: keep
            regex: llm-inference
```

```python
# metrics/prometheus_metrics.py
from prometheus_client import Counter, Histogram, Gauge

# 定义指标
REQUEST_COUNT = Counter(
    "inference_requests_total",
    "Total inference requests",
    ["model_name", "status"]
)
REQUEST_LATENCY = Histogram(
    "inference_duration_seconds",
    "Inference request latency",
    ["model_name"]
)
GPU_UTILIZATION = Gauge(
    "gpu_utilization",
    "GPU utilization percentage",
    ["gpu_id"]
)
QUEUE_LENGTH = Gauge(
    "inference_queue_length",
    "Number of requests in queue",
    ["model_name"]
)


# 使用示例
def inference_endpoint(request):
    start = time.time()
    try:
        result = model.generate(request.prompt)
        REQUEST_COUNT.labels(model_name=MODEL_NAME, status="success").inc()
        return result
    except Exception as e:
        REQUEST_COUNT.labels(model_name=MODEL_NAME, status="error").inc()
        raise
    finally:
        REQUEST_LATENCY.labels(model_name=MODEL_NAME).observe(time.time() - start)
```

### 5.2 日志与追踪

```python
# observability/distributed_tracing.py
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter

# 初始化追踪
provider = TracerProvider()
processor = BatchSpanProcessor(JaegerExporter(
    agent_host_name="jaeger-collector.monitoring",
    agent_port=6831,
))
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)
tracer = trace.get_tracer(__name__)


@tracer.span_manager
async def trace_inference(request_id: str, prompt: str):
    with tracer.start_as_current_span("inference") as span:
        span.set_attribute("request_id", request_id)
        span.set_attribute("prompt_length", len(prompt))

        # 模型调度
        with tracer.start_as_current_span("schedule") as schedule_span:
            allocation = scheduler.allocate_gpu()
            schedule_span.set_attribute("gpu_id", allocation.gpu_id)

        # 推理执行
        with tracer.start_as_current_span("generate") as gen_span:
            result = await model.generate(prompt)
            gen_span.set_attribute("tokens_generated", len(result.tokens))

        return result
```

## 六、总结

云原生 AI 平台的核心优势在于基础设施的复用和弹性能力。

架构要点：

- 模型服务化：统一推理接口，支持多种模型
- GPU 调度：Kubernetes 原生调度 + 自定义调度策略
- 弹性伸缩：基于队列长度和 GPU
利用率的 HPA
- 可观测性：Prometheus + Grafana + Jaeger

运维要点：

- 模型版本管理：支持灰度发布和快速回滚
- 资源配额：避免单一模型占用全部资源
- 容量规划：根据历史负载预测未来需求
- 成本优化：利用 Spot Instance + 弹性伸缩

生产建议：

- 分离训练和推理环境
- 建立模型镜像仓库和版本策略
- 实施 GPU 利用率监控和优化
- 定期进行故障演练