企业知识库与流程自动化 Agent 项目的完整实践项目NexusAgent技术栈FastAPI、DeepSeek、LangGraph、PostgreSQL、原生前端1. 项目背景与最终目标NexusAgent 的出发点是把企业里分散的资料、问答需求和流程动作连接起来。很多知识库产品只解决“搜资料”的问题很多聊天机器人只解决“自然语言回答”的问题但真实业务里常常还需要执行动作查询数据、计算结果、生成图表、保留来源、记录过程并且让不同用户的历史对话互相隔离。知识库能力上传 TXT、Markdown、PDF 后自动解析、分块并进入检索。问答能力基于来源片段生成答案命中不足时明确拒答。Agent 能力根据用户意图自动规划工具并组合知识库结果。2. 整体架构与技术选型项目采用前后端分离但部署上保持轻量的结构。后端使用 FastAPI 统一提供认证、文档、知识库、对话和 Agent 接口前端使用原生 HTML、CSS、JavaScript 构建工作台大模型能力由 DeepSeek API 提供任务编排由 LangGraph 承担PostgreSQL 保存用户与历史对话文档和知识片段在当前版本中仍使用本地 JSON 文件保存。FastAPI 入口与接口组织后端入口集中在 Backend/App.py。它既挂载前端静态资源也暴露核心 API。这样的结构让本地启动和 Render 部署都比较简单一个 Uvicorn 服务即可同时承载页面和接口。源码摘录Backend/App.pyApp FastAPI(titleNexusAgent, version0.2.0) App.add_middleware( CORSMiddleware, allow_origins[*], allow_credentialsTrue, allow_methods[*], allow_headers[*], ) App.mount(/Frontend, StaticFiles(directoryFrontendDirectory), nameFrontend) GeneratedChartsDirectory.mkdir(parentsTrue, exist_okTrue) App.mount(/GeneratedCharts, StaticFiles(directoryGeneratedChartsDirectory), nameGeneratedCharts) App.post(/Chat/AgentAsk, response_modelChatResponse) async def AskWithAgent(Request: ChatRequest, CurrentUserDepends(RequireCurrentUser)): return await ExecuteAgentAsk(Request, CurrentUser)3. 知识库与 RAG 问答实现知识库的第一步是让资料进入系统。NexusAgent 支持 TXT、Markdown 和 PDF。上传后DocumentService 会根据文件类型抽取文本、做基础清洗然后按段落进行分块。分块结果会写入 KnowledgeItem每个片段都保留 SourceId、标题、正文、文档 ID 和 ChunkIndex方便后续引用。源码摘录Backend/DocumentService.py - 文档上传与分块async def ExtractUpload(self, FileData: UploadFile): FileName FileData.filename or UploadedDocument Suffix Path(FileName).suffix.lower() if Suffix not in self.SupportedSuffixes: raise HTTPException(status_code400, detail仅支持上传 TXT、Markdown 或 PDF 文档。) RawContent await FileData.read() if not RawContent: raise HTTPException(status_code400, detail上传文件为空。) if Suffix .pdf: Content self.ExtractPdfText(RawContent) else: Content self.DecodeText(RawContent) Content self.NormalizeText(Content) if not Content: raise HTTPException(status_code400, detail未能从文件中解析出文本内容。) return FileName, Content def BuildChunks(self, Content, ChunkSize1200, Overlap120): Paragraphs [Paragraph.strip() for Paragraph in re.split(r\n\s*\n, Content) if Paragraph.strip()] Chunks [] Current for Paragraph in Paragraphs: Candidate f{Current}\n\n{Paragraph}.strip() if Current else Paragraph if len(Candidate) ChunkSize: Current Candidate continue if Current: Chunks.append(Current) Current Paragraph while len(Current) ChunkSize: Chunks.append(Current[:ChunkSize]) Current Current[ChunkSize - Overlap :] if Current: Chunks.append(Current) return Chunks or [Content[:ChunkSize]]在检索实现上项目没有一开始接入复杂向量数据库而是先实现了一个本地稀疏检索对中文做 2 到 6 字的 n-gram 扩展对英文、数字、符号关键词直接保留再根据标题命中、正文命中、完整查询命中等规则累积分数。这种方案不完美但非常适合 MVP 阶段快速验证 RAG 链路。源码摘录Backend/RuntimeStore.py - 本地检索def ExtractSearchTerms(self, Query): CleanQuery Query.lower() Terms re.findall(r[a-zA-Z0-9_#.%/-]|[\u4e00-\u9fff], CleanQuery) ExpandedTerms [] for Term in Terms: if re.fullmatch(r[\u4e00-\u9fff], Term): ExpandedTerms.append(Term) for Size in (2, 3, 4, 5, 6): ExpandedTerms.extend(Term[Index : Index Size] for Index in range(0, len(Term) - Size 1)) else: ExpandedTerms.append(Term) StopTerms { 我们, 公司, 这个, 那个, # ... 省略非关键实现 ... 进行, 当前, } return [Term for Term in ExpandedTerms if Term and Term not in StopTerms] def SearchKnowledge(self, Query, Limit4): QueryTerms self.ExtractSearchTerms(Query) Results [] for Item in self.KnowledgeItems: SearchText f{Item.Title} {Item.Content}.lower() Score 0 if Query.lower() in SearchText: Score 24 for Term in QueryTerms: Count SearchText.count(Term) if Count 0: continue if Term in Item.Title.lower(): Score Count * 5 elif len(Term) 3: Score Count * 2 else: Score Count if Term in Item.VectorTerms: Score 1 if Score 0: Snippet Item.Content[:220] (... if len(Item.Content) 220 else ) # ... 省略非关键实现 ... ) ) Results.sort(keylambda Item: Item.Score, reverseTrue) return Results[:Limit]RAG 问答流程的关键不是“让模型回答”而是让模型基于可追溯的来源回答。当没有命中 Sources 时服务会直接提示需要先上传相关文档避免模型编造不存在的依据。DeepSeek API 调用封装模型调用集中封装在 DeepSeekService 中。GenerateAnswer 会组装 system prompt、最近对话上下文和本轮用户 prompt然后请求 DeepSeek 的 /chat/completions 接口。这里也处理了缺少 API Key、请求超时、HTTP 错误和网络错误等兜底逻辑。源码摘录Backend/DeepSeekService.py - 调用 DeepSeek APIasync def GenerateAnswer(self, SystemPrompt, ConversationMessages, UserPrompt, Sources, ToolCalls): if not self.Settings.DeepSeekApiKey: return self.GenerateLocalFallbackAnswer(Sources, ToolCalls, 当前未配置 DEEPSEEK_API_KEY。) Messages [{role: system, content: SystemPrompt}] Messages.extend(ConversationMessages[-8:]) Messages.append({role: user, content: UserPrompt}) Payload { model: self.Settings.DeepSeekModel, messages: Messages, stream: False, temperature: 0.2, } Headers { Authorization: fBearer {self.Settings.DeepSeekApiKey}, Content-Type: application/json, } try: async with httpx.AsyncClient(timeoutself.Settings.RequestTimeoutSeconds) as Client: Response await Client.post( f{self.Settings.DeepSeekBaseUrl}/chat/completions, headersHeaders, jsonPayload, ) Response.raise_for_status() Data Response.json() except httpx.TimeoutException: return self.GenerateLocalFallbackAnswer(Sources, ToolCalls, DeepSeek 请求超时已返回本地执行摘要。) except httpx.HTTPStatusError as Error: return self.GenerateLocalFallbackAnswer(Sources, ToolCalls, fDeepSeek API 返回 {Error.response.status_code}已返回本地执行摘要。) except httpx.RequestError as Error: Detail str(Error) or Error.__class__.__name__ return self.GenerateLocalFallbackAnswer(Sources, ToolCalls, fDeepSeek 网络请求失败{Detail}。已返回本地执行摘要。) return Data[choices][0][message][content].strip()4. Agent 工作流与工具调用当项目从 RAG 问答升级到 Agent 自动化时最大的变化是系统不再只是检索资料并生成回答而是需要判断用户意图、决定是否调用工具、执行工具并把结果整合进最终答案。NexusAgent 使用 LangGraph 把这个过程拆成固定节点既方便调试也方便前端展示执行轨迹。源码摘录Backend/AgentWorkflow.py - LangGraph 编排class AgentWorkflowState(TypedDict, totalFalse): Message: str ConversationMessages: List[Dict[str, str]] Intent: str Sources: List[SourceItem] ToolPlans: List[Dict[str, Any]] ToolCalls: List[ToolCallItem] WorkflowSteps: List[WorkflowStepItem] Answer: str class AgentWorkflow: def __init__(self): Graph StateGraph(AgentWorkflowState) Graph.add_node(IdentifyIntent, self.IdentifyIntent) Graph.add_node(RetrieveKnowledge, self.RetrieveKnowledge) Graph.add_node(PlanTools, self.PlanTools) Graph.add_node(ExecuteTools, self.ExecuteTools) Graph.add_node(GenerateAnswer, self.GenerateAnswer) Graph.set_entry_point(IdentifyIntent) Graph.add_edge(IdentifyIntent, RetrieveKnowledge) Graph.add_edge(RetrieveKnowledge, PlanTools) Graph.add_edge(PlanTools, ExecuteTools) Graph.add_edge(ExecuteTools, GenerateAnswer) Graph.add_edge(GenerateAnswer, END) self.CompiledGraph Graph.compile() async def Run(self, Message, ConversationMessages): InitialState { Message: Message, ConversationMessages: ConversationMessages, WorkflowSteps: [], # ... 省略非关键实现 ... def AppendStep(self, State, StepName, Status, Detail): Steps list(State.get(WorkflowSteps, [])) Steps.append(WorkflowStepItem(StepNameStepName, StatusStatus, DetailDetail)) return Steps工具规划采用关键词触发的轻量方案。比如用户提到“时间”“几点”会触发当前时间工具提到“工时”“考勤”“月报”会触发工时统计提到“销售”“奖金”会触发销售数据与奖金计算提到“图表”“折线图”则会触发图表生成工具。源码摘录Backend/AgentTools.py - 工具规划def PlanTools(self, Message): LowerMessage Message.lower() Plans [] WantsChart any(Keyword in Message for Keyword in (画图, 图表, 折线图, 柱形图, 柱状图, 趋势图, 可视化)) if any(Keyword in Message for Keyword in (时间, 几点, 日期, 今天)): Plans.append({ToolName: GetCurrentTime, Input: {Timezone: Asia/Shanghai}}) if any(Keyword in Message for Keyword in (工时, 打卡, 考勤, 月报, 出勤, 日报)): Plans.append({ToolName: QueryWorkHours, Input: {Month: self.ExtractMonth(Message)}}) if any(Keyword in Message for Keyword in (销售, 奖金, 营收, 业绩, 华东, 华南, 华北)): Region self.ExtractRegion(Message) Plans.append({ToolName: QuerySalesData, Input: {Region: Region}}) if 奖金 in Message: Plans.append({ToolName: CalculateSalesBonus, Input: {Region: Region}}) Expression self.ExtractMathExpression(Message) if Expression: Plans.append({ToolName: SafeCalculate, Input: {Expression: Expression}}) if calculate in LowerMessage and not Expression: Plans.append({ToolName: SafeCalculate, Input: {Expression: Message}}) if WantsChart: Plans.append( { ToolName: RenderDocumentChart, Input: { ChartType: self.ExtractChartType(Message), DataKind: self.ExtractChartDataKind(Message), Query: Message, Month: self.ExtractMonth(Message), }, } ) return Plans为了避免工具变成不可控的黑盒项目把每次工具调用都包装成 ToolCallItem其中包含工具名、输入、输出、状态和错误信息。这样前端可以把工具执行结果展示给用户后端也可以把失败原因记录进运行日志。源码摘录Backend/AgentTools.py - 安全计算与图表生成def SafeCalculate(self, Expression): if not Expression or len(Expression) 120: raise ValueError(数学表达式为空或过长。) ParsedExpression ast.parse(Expression, modeeval) return {Expression: Expression, Result: self.EvaluateAst(ParsedExpression.body)} def EvaluateAst(self, Node): if isinstance(Node, ast.Constant) and isinstance(Node.value, (int, float)): return Node.value if isinstance(Node, ast.BinOp) and type(Node.op) in self.AllowedOperators: Left self.EvaluateAst(Node.left) Right self.EvaluateAst(Node.right) if isinstance(Node.op, ast.Pow) and abs(Right) 8: raise ValueError(指数过大已拒绝执行。) return self.AllowedOperators[type(Node.op)](Left, Right) if isinstance(Node, ast.UnaryOp) and type(Node.op) in self.AllowedOperators: return self.AllowedOperators[type(Node.op)](self.EvaluateAst(Node.operand)) raise ValueError(表达式包含不允许的内容。) def RenderDocumentChart(self, ChartType, DataKind, Query, Month): Series self.BuildChartSeries(ChartType, DataKind, Query, Month) if len(Series[Points]) 2: raise ValueError(未在已上传文档中找到足够的数值数据至少需要两个数据点才能画图。) FileName fChart-{uuid4().hex[:12]}.svg GeneratedChartsDirectory.mkdir(parentsTrue, exist_okTrue) ChartPath GeneratedChartsDirectory / FileName ChartPath.write_text( self.BuildChartSvg( Series[Title], Series[Points], Series[ChartType], Series[XLabel], Series[YLabel], ), encodingutf-8, ) # ... 省略非关键实现 ... ChartUrl: ChartUrl, ChartMarkdown: f![{Series[Title]}]({ChartUrl}), Source: Series[Source], }5. 用户系统、历史对话与运行日志企业级 Agent 和个人 Demo 最大的区别之一是需要区分用户、保留上下文并支持审计。NexusAgent 增加了注册、登录和 Token 会话机制并把用户、对话和消息写入 PostgreSQL。这样同一个部署实例可以服务多个用户每个用户只能访问自己的历史对话。源码摘录Backend/AuthService.py - 登录凭证与密码校验def HashPassword(self, Password, Salt): Digest hashlib.pbkdf2_hmac(sha256, Password.encode(utf-8), Salt.encode(utf-8), 120000) return Digest.hex() def BuildAuthResponse(self, User): Token secrets.token_urlsafe(32) self.Sessions[Token] { UserName: User[UserName], DisplayName: User[DisplayName], CreatedAt: self.GetCurrentTimeStamp(), } return { Token: Token, UserName: User[UserName], DisplayName: User[DisplayName], Status: Authenticated, } def Login(self, Request): User self.FindUser(Request.UserName) if not User: raise HTTPException(status_code401, detail用户名或密码错误。) ExpectedHash User.get(PasswordHash, ) ActualHash self.HashPassword(Request.Password, User.get(PasswordSalt, )) if not hmac.compare_digest(ExpectedHash, ActualHash): raise HTTPException(status_code401, detail用户名或密码错误。) return self.BuildAuthResponse(User)每次问答还会生成 RunId记录用户问题、答案预览、来源数量、状态、耗时、Sources、ToolCalls、WorkflowSteps 和错误信息。这个设计让“模型为什么这样回答”不再只能靠猜而是可以从一次运行记录回溯出来。源码摘录Backend/RuntimeStore.py - 对话消息与运行日志def AppendConversationMessage(self, ConversationId, UserName, Role, Content): Conversation self.EnsureConversation(ConversationId, UserName) Timestamp self.GetCurrentTimeStamp() with self.GetConnection() as Connection: SequenceIndex Connection.execute( SELECT COALESCE(MAX(SequenceIndex), 0) 1 AS NextSequenceIndex FROM ConversationMessages WHERE ConversationId %s AND UserName %s , (ConversationId, UserName), ).fetchone()[NextSequenceIndex] Connection.execute( INSERT INTO ConversationMessages (ConversationId, UserName, Role, Content, SequenceIndex, CreatedAt) VALUES (%s, %s, %s, %s, %s, %s) , (ConversationId, UserName, Role, Content, SequenceIndex, Timestamp), ) Connection.execute( UPDATE Conversations SET UpdatedAt %s WHERE ConversationId %s AND UserName %s , (Timestamp, ConversationId, UserName), # ... 省略非关键实现 ... WHERE ConversationId %s AND UserName %s , (Title, Timestamp, ConversationId, UserName), ) def AddRunLog( self, RunId, ConversationId, UserMessage, Answer, SourceCount, Status, LatencyMs, SourcesNone, ToolCallsNone, WorkflowStepsNone, ErrorMessageNone, ): RunLog RunLogItem( RunIdRunId, ConversationIdConversationId, UserMessageUserMessage, AnswerPreviewAnswer[:160] (... if len(Answer) 160 else ), SourceCountSourceCount, StatusStatus, CreatedAtself.GetCurrentTimeStamp(), LatencyMsLatencyMs, SourcesSources or [], # ... 省略非关键实现 ... self.RunLogs.insert(0, RunLog) self.RunLogs self.RunLogs[:50] return RunLog后续优化方向代码由codex生成结构上还未优化。目前工具数量较少离真正应用还差得远只是一次技术学习与测试。目前工具的调用依赖关键词后续最好是能够实现根据需求动态生成代码并执行。