生产级 AI Agent 从零搭建:从 Prompt、Memory、Tool、RAG 到 Runtime 的全链路工程实践

关键词:Agent Runtime、Function Calling、Memory、RAG、Skill、Workflow、可观测性、多租户、安全治理
阅读目标:不是做一个“能对话”的 Demo,而是搭建一个能接入业务、能承压、能审计、能演进的生产级 Agent 系统


一、前言:为什么很多 Agent Demo 一上生产就失效

过去一年,很多团队都做过 Agent Demo:

  • • 接一个大模型 API

  • • 拼一个 system prompt

  • • 注册几个 function calling 工具

  • • 再接一个向量库做知识库问答

本地跑起来并不难,难的是一旦进入真实业务,系统会立刻暴露出一系列工程问题:

业务诉求 Demo 常见做法 生产环境真正的问题
多轮对话 直接把历史消息拼进 prompt Token 爆炸、上下文污染、历史失真
工具调用 模型说调哪个工具就调哪个工具 权限越权、重复调用、副作用失控
知识库问答 只做向量检索 topK 召回噪声高、事实冲突、租户隔离不足
高并发接入 单实例同步请求 大模型 RT 抖动、连接池打满、尾延迟恶化
生产审计 仅保留最终回答 无法回放推理链路,无法定位事故根因
成本控制 默认使用一个大模型 简单请求成本过高,缺乏路由与降级策略

真正的 Agent 系统,本质上不是一个 Prompt 技巧集合,而是一个新的应用运行时:

  • • 它要负责把用户意图翻译成可执行计划

  • • 它要协调模型、工具、知识、记忆与业务系统

  • • 它要在不确定的推理行为外面,套上可控的工程边界

换句话说,生产级 Agent 的核心不是“让模型更聪明”,而是“让系统在模型不稳定的前提下仍然可控”。

本文会从零拆解一套生产级 Agent Runtime 的设计思路,并给出一版可直接落地改造的代码骨架。


二、先讲结论:生产级 Agent 到底在“生产”什么

很多文章把 Agent 的重点放在 Prompt、Tool 或 RAG 本身,但工程落地时,真正决定系统上线质量的,通常是下面六件事:

2.1 可控性

模型是概率系统,业务系统需要确定性。

所以 Agent Runtime 必须回答这些问题:

  • • 哪些工具允许被调用

  • • 什么时候允许调用

  • • 一个请求最多允许调用几次

  • • 工具失败后是否允许重试

  • • 带副作用的操作如何保证幂等

2.2 可观测性

如果线上出现“回答胡说”“一直转圈调工具”“延迟突然升高”“成本超预算”,你必须能快速定位:

  • • 是 Prompt 设计问题

  • • 是 RAG 召回问题

  • • 是工具执行问题

  • • 是模型路由问题

  • • 还是基础设施问题

2.3 可扩展性

当场景从“一个客服 Agent”扩展到“客服、销售、运营、风控、运维多个 Agent”时,系统不能因为能力增加就整体重写。

因此架构必须天然支持:

  • • Skill 插件化

  • • 工具注册中心

  • • 模型路由

  • • 知识源扩展

  • • 多租户隔离

2.4 高并发与高可用

很多团队一开始忽略了一个事实:真正的瓶颈经常不在 CPU,而在外部依赖。

  • • 模型 API 有 RT 抖动

  • • 向量检索有尾延迟

  • • Redis 会出现热点 key

  • • 下游工具服务会限流或超时

Agent Runtime 需要具备完整的超时、隔离、熔断、降级与异步化能力。

2.5 安全与治理

Agent 能调用工具,就意味着它拥有“行为能力”。
一旦接上内部订单、库存、支付、工单、运维系统,安全问题就不再是可选项。

至少要做:

  • • 工具级鉴权

  • • 参数级校验

  • • 输出级脱敏

  • • Prompt 注入防护

  • • 租户数据隔离

  • • 审计日志留痕

2.6 演进能力

生产级架构不是第一次设计就完美,而是要支持从 MVP 演进到平台化:

  • • 单体服务

  • • 模块化单体

  • • 微服务化

  • • 事件驱动

  • • 多 Agent 协同

如果你的第一版系统从结构上就不允许演进,后面几乎必然推倒重来。


三、生产级 Agent 的整体架构:不要只看“模型”,要看“运行时”

3.1 架构全景

从工程角度看,一套 Agent 平台至少应拆成五层:

┌──────────────────────────────────────────────────────────────┐
│                        接入与治理层                           │
│ API Gateway / Auth / Rate Limit / Tenant Isolation / Audit  │
└──────────────────────────────┬───────────────────────────────┘

┌──────────────────────────────▼───────────────────────────────┐
│                         Agent Runtime                         │
│ Session / Planner / Tool Executor / State Machine / Stream  │
└───────┬──────────────────────┬──────────────────────┬────────┘
        │                      │                      │
┌───────▼────────┐   ┌─────────▼────────┐   ┌────────▼─────────┐
│ Prompt & Policy │   │ Memory & Context │   │ Knowledge & RAG  │
│ Role/Rules/Budget│  │ Window/Summary/Profile│ Recall/Rerank/Cite│
└───────┬────────┘   └─────────┬────────┘   └────────┬─────────┘
        │                      │                      │
┌───────▼──────────────────────▼──────────────────────▼────────┐
│                       Tool & Workflow Plane                   │
│ Registry / Permission / Idempotency / Compensation / Saga   │
└───────┬──────────────────────────────────────────────────────┘

┌───────▼──────────────────────────────────────────────────────┐
│                        Infrastructure                         │
│ Redis / PostgreSQL / Kafka / VectorDB / Object Store / OTel │
└──────────────────────────────────────────────────────────────┘

3.2 为什么要把 Runtime 放在中心

很多团队最开始的实现方式,是在 Web 接口里直接做这些事:

    1. 拼 prompt
    1. 调模型
    1. 如果返回 tool call,就调工具
    1. 再把工具结果送回模型
    1. 最后返回答案

这套流程在单场景下能跑,但很快会遇到三个问题:

    1. 业务逻辑和推理逻辑耦合在一起,难维护。
    1. 无法统一治理工具权限、重试、熔断、审计。
    1. 当后续要接流式输出、异步任务、人工审批、长任务恢复时,代码会迅速失控。

因此 Runtime 必须成为独立的一层,负责:

  • • 驱动状态机

  • • 管理上下文预算

  • • 调度工具

  • • 记录事件

  • • 输出流式结果

  • • 管控生命周期

从这个角度讲,Agent Runtime 更像一个“面向 LLM 的应用操作系统”。


四、请求生命周期:一条用户消息在系统里到底经历了什么

一条用户请求从进入系统到返回结果,理想情况下应该经过如下阶段:

User Request
  -> 鉴权与租户识别
  -> Session 装载
  -> Memory 加载
  -> Query 分类与 Skill 路由
  -> RAG 检索与上下文构建
  -> Prompt 编译
  -> LLM 推理
  -> Tool Call 决策
  -> 工具执行 / 工作流执行
  -> 结果验证与后处理
  -> 流式返回 / 最终回答
  -> 事件落库与异步记忆写回

4.1 这里最容易被忽略的三个关键点

第一,Session 状态不等于聊天记录

Agent 处理请求时,需要的不只是“用户说过什么”,还包括:

  • • 当前租户

  • • 当前场景

  • • 当前权限

  • • 正在进行中的任务

  • • 已调用过的工具

  • • 已产生的中间结果

  • • 当前剩余预算

所以 Session 应该是一份结构化状态,而不是简单消息数组。

第二,工具调用是状态迁移,不只是函数调用

例如用户说:“帮我把订单改到明天下午配送,并短信通知客户。”

这不是一个工具调用,而是一个小型工作流:

    1. 校验订单是否存在
    1. 校验当前状态是否允许改约
    1. 修改配送时间
    1. 发送通知
    1. 记录审计事件
    1. 任一步失败时做补偿或人工兜底

所以在生产环境里,Tool 更像“可控操作单元”,Workflow 才是业务编排单元。

第三,Memory 写回应尽量异步

如果每轮响应结束后,同步做摘要、画像提取、长期记忆更新,模型 RT 会明显恶化。

更合理的方式是:

  • • 主链路只写短期窗口

  • • 摘要生成通过异步任务触发

  • • 用户画像抽取按阈值批量执行

  • • 长期事实更新走事件队列


五、Prompt 不是一段字符串,而是一套“策略编译结果”

5.1 为什么生产环境不能只写一个 system prompt

单一 system prompt 最大的问题不是不够长,而是不够可管理。

在真实系统里,Prompt 至少同时承载五类信息:

  • • 角色定义

  • • 行为规则

  • • 业务约束

  • • 工具说明

  • • 当前上下文

把这些内容全部硬编码在一段长文本里,会导致:

  • • 版本难以管理

  • • 不同业务场景难以复用

  • • 修改规则容易引入隐性回归

  • • 无法做 A/B 测试与灰度发布

5.2 推荐做法:Prompt 分层编译

from dataclasses import dataclass
from enum import Enum
from typing import List
 
class PromptLayerType(str, Enum):
    ROLE = "role"
    POLICY = "policy"
    TOOL = "tool"
    CONTEXT = "context"
    MEMORY = "memory"
    TASK = "task"
 
@dataclass
class PromptLayer:
    layer_type: PromptLayerType
    content: str
    priority: int
    token_budget: int
    required: bool = True
 
class PromptCompiler:
    def __init__(self, token_counter):
        self.token_counter = token_counter
 
    def compile(self, layers: List[PromptLayer], total_budget: int) -> str:
        ordered = sorted(layers, key=lambda x: x.priority)
        compiled: list[str] = []
        used_tokens = 0
 
        for layer in ordered:
            if not layer.content:
                continue
 
            current_tokens = self.token_counter.count(layer.content)
            if current_tokens > layer.token_budget:
                layer_content = self.token_counter.truncate(
                    layer.content, layer.token_budget
                )
                current_tokens = self.token_counter.count(layer_content)
            else:
                layer_content = layer.content
 
            if used_tokens + current_tokens > total_budget:
                if layer.required:
                    raise ValueError(f"Prompt budget exceeded at {layer.layer_type}")
                continue
 
            compiled.append(layer_content)
            used_tokens += current_tokens
 
        return "\n\n".join(compiled)

5.3 一个实用的 Prompt 分层模型

Layer 1: Role
你是谁,负责什么,不负责什么
 
Layer 2: Policy
合规规则、禁止行为、答复约束、升级策略
 
Layer 3: Tool Contract
可以使用哪些工具,工具的输入输出约束是什么
 
Layer 4: Memory Context
用户近期上下文、用户画像、当前任务状态
 
Layer 5: Knowledge Context
RAG 检索结果、来源与置信度
 
Layer 6: Task Instruction
本轮用户请求与输出格式要求

5.4 生产实践:把规则写成“可执行约束”

很多 Prompt 失败,不是因为模型不聪明,而是因为规则写得太抽象。

例如下面这类规则约束弱:

请谨慎调用工具,必要时才调用。

更好的写法是:

仅当回答需要外部实时信息、用户授权后的业务操作、或必须引用内部知识时才允许调用工具。
如果用户请求只是解释概念、总结信息或润色文本,不要调用任何工具。
同一工具在一轮会话中最多调用 2 次;若两次结果仍不足以回答,应明确说明信息不足并请求人工协助。

Prompt 最终服务的是 Runtime 的可控性,而不是文采。


六、Memory 设计:真正难的不是“记住”,而是“记什么、忘什么、何时更新”

6.1 为什么不能把全部历史直接塞进上下文

这种做法在 Demo 阶段最省事,但生产上会带来三类问题:

  1. 1. 成本问题:上下文越来越长,推理成本持续增加。

  2. 2. 质量问题:旧上下文噪声进入本轮推理,导致注意力偏移。

  3. 3. 安全问题:历史中混入不再适用的敏感信息或跨任务信息。

所以 Memory 的关键不是“无限记忆”,而是“分层管理”。

6.2 推荐的四层记忆模型

L1: Working Memory
- 当前任务内的短期状态
- 生命周期:本次会话 / 当前任务
- 存储:Redis / In-Memory
 
L2: Conversation Memory
- 近期对话窗口
- 生命周期:分钟到小时
- 存储:Redis List / Stream
 
L3: Summary Memory
- 历史摘要、关键决策、阶段性结论
- 生命周期:天级
- 存储:Redis + PostgreSQL
 
L4: Profile / Fact Memory
- 用户画像、长期偏好、稳定业务事实
- 生命周期:周到月
- 存储:PostgreSQL / KV / Graph

6.3 生产可用的 Memory 数据结构

from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Optional
import time
 
class MemoryType(str, Enum):
    MESSAGE = "message"
    SUMMARY = "summary"
    FACT = "fact"
    TASK_STATE = "task_state"
 
@dataclass
class MemoryRecord:
    tenant_id: str
    user_id: str
    session_id: str
    memory_type: MemoryType
    content: str
    metadata: dict[str, Any] = field(default_factory=dict)
    created_at: float = field(default_factory=lambda: time.time())
    expire_at: Optional[float] = None
 
class MemoryManager:
    def __init__(self, redis_client, repository, summary_queue):
        self.redis = redis_client
        self.repository = repository
        self.summary_queue = summary_queue
 
    async def append_message(self, record: MemoryRecord, window_size: int = 20) -> None:
        key = f"agent:mem:{record.tenant_id}:{record.session_id}:window"
        payload = self._serialize(record)
 
        pipe = self.redis.pipeline()
        pipe.rpush(key, payload)
        pipe.ltrim(key, -window_size, -1)
        pipe.expire(key, 3600)
        await pipe.execute()
 
    async def load_context(
        self,
        tenant_id: str,
        session_id: str,
        max_messages: int = 10,
    ) -> list[dict[str, Any]]:
        key = f"agent:mem:{tenant_id}:{session_id}:window"
        values = await self.redis.lrange(key, -max_messages, -1)
        return [self._deserialize(item) for item in values]
 
    async def trigger_summary_if_needed(self, tenant_id: str, session_id: str) -> None:
        key = f"agent:mem:{tenant_id}:{session_id}:window"
        size = await self.redis.llen(key)
        if size >= 18:
            await self.summary_queue.publish(
                {
                    "tenant_id": tenant_id,
                    "session_id": session_id,
                    "event": "memory.summary.generate",
                }
            )
 
    def _serialize(self, record: MemoryRecord) -> str:
        import json
        return json.dumps(record.__dict__, ensure_ascii=False)
 
    def _deserialize(self, payload: str) -> dict[str, Any]:
        import json
        return json.loads(payload)

6.4 摘要记忆的关键,不是“压缩”,而是“保真”

很多系统做摘要时只关心把内容压短,但生产场景里更重要的是保留对后续决策真正有价值的信息:

  • • 用户身份与角色

  • • 当前任务目标

  • • 已完成动作

  • • 未完成动作

  • • 明确约束

  • • 时间承诺

  • • 风险或冲突点

一个更稳妥的摘要模板通常应包含:

1. 当前用户目标
2. 已确认事实
3. 已执行动作
4. 待执行动作
5. 禁止或限制条件
6. 需要人工接管的风险点

6.5 分布式场景下的 Memory 难点

难点一:多副本并发写入

同一 session 可能被多个副本处理,例如:

  • • WebSocket 重连

  • • 请求重试

  • • 多通道输入

这时必须保证:

  • • 事件顺序可恢复

  • • 重复写入可识别

  • • 任务状态可覆盖但不混乱

推荐做法:

  • • 每条消息带 event_id

  • • 工作流状态带 version

  • • 对重要状态更新采用 CAS 或乐观锁

难点二:不同记忆层的刷新频率不同

短期窗口适合同步写入,长期画像不适合同步更新。
如果用户每发一句话都做画像提取,成本会非常差。

更合理的策略是:

  • • 关键字段变化才触发画像更新

  • • 达到轮次阈值才触发摘要

  • • 长期事实采用异步抽取

难点三:记忆污染

不是所有对话内容都应进入长期记忆。

例如:

  • • 用户一时的情绪表达

  • • 当前促销活动的临时信息

  • • 模型自己生成但未被确认的推断

这类内容如果写进长期记忆,会造成后续推理偏差。
所以长期记忆必须有“确认门槛”。


七、Tool 体系:Function Calling 只是入口,真正关键是执行治理

7.1 工具系统的本质

Tool 系统不是“把 Python 函数暴露给模型”,而是把业务能力包装成安全、可治理、可审计的执行单元。

生产级工具系统至少要包含:

  • • Schema 校验

  • • 权限校验

  • • 超时控制

  • • 重试策略

  • • 幂等控制

  • • 审计埋点

  • • 熔断降级

  • • 结果标准化

7.2 工具注册中心设计

import asyncio
import json
import time
from dataclasses import dataclass, field
from typing import Any, Callable, Optional
 
from pydantic import BaseModel, ValidationError
 
class ToolRequest(BaseModel):
    tenant_id: str
    user_id: str
    session_id: str
    trace_id: str
    idempotency_key: str
    arguments: dict[str, Any]
 
@dataclass
class ToolDefinition:
    name: str
    description: str
    schema_model: type[BaseModel]
    handler: Callable[..., Any]
    timeout_seconds: float = 5.0
    max_retries: int = 0
    need_approval: bool = False
    side_effect: bool = False
    allowed_roles: list[str] = field(default_factory=list)
 
    def to_openai_tool(self) -> dict[str, Any]:
        return {
            "type": "function",
            "function": {
                "name": self.name,
                "description": self.description,
                "parameters": self.schema_model.model_json_schema(),
            },
        }
 
class ToolRegistry:
    def __init__(self):
        self._tools: dict[str, ToolDefinition] = {}
 
    def register(self, tool: ToolDefinition) -> None:
        if tool.name in self._tools:
            raise ValueError(f"tool already exists: {tool.name}")
        self._tools[tool.name] = tool
 
    def get(self, name: str) -> Optional[ToolDefinition]:
        return self._tools.get(name)
 
    def list_openai_tools(self) -> list[dict[str, Any]]:
        return [tool.to_openai_tool() for tool in self._tools.values()]
 
class ToolExecutor:
    def __init__(self, permission_service, idempotency_store, audit_logger):
        self.permission_service = permission_service
        self.idempotency_store = idempotency_store
        self.audit_logger = audit_logger
 
    async def execute(self, tool: ToolDefinition, request: ToolRequest) -> dict[str, Any]:
        start = time.time()
 
        try:
            validated_args = tool.schema_model(**request.arguments)
        except ValidationError as exc:
            return {"status": "invalid_argument", "error": exc.errors()}
 
        allowed = await self.permission_service.check(
            tenant_id=request.tenant_id,
            user_id=request.user_id,
            tool_name=tool.name,
            roles=tool.allowed_roles,
        )
        if not allowed:
            return {"status": "forbidden", "error": "permission denied"}
 
        if tool.side_effect:
            existed = await self.idempotency_store.try_acquire(
                request.idempotency_key, ttl_seconds=600
            )
            if not existed:
                return {"status": "duplicate", "error": "duplicated request"}
 
        try:
            result = await asyncio.wait_for(
                tool.handler(validated_args, request),
                timeout=tool.timeout_seconds,
            )
            return {"status": "ok", "result": result}
        except asyncio.TimeoutError:
            return {"status": "timeout", "error": f"{tool.name} timeout"}
        except Exception as exc:
            return {"status": "error", "error": str(exc)}
        finally:
            await self.audit_logger.log(
                trace_id=request.trace_id,
                tool_name=tool.name,
                cost_ms=int((time.time() - start) * 1000),
                side_effect=tool.side_effect,
            )

7.3 为什么幂等性是 Agent 工具系统的必选项

传统 API 调用重复一次可能只是多打一条日志。
Agent 重复调用一次,可能意味着:

  • • 订单被重复取消

  • • 优惠券被重复发放

  • • 工单被重复创建

  • • 消息被重复发送

大模型并不天然知道“这个工具不应该再调一次”。
所以必须在 Runtime 外层做幂等治理,而不是把希望寄托在 Prompt 上。

7.4 把工具分成两类管理

推荐按副作用分类:

第一类:读工具

例如:

  • • 查订单

  • • 查库存

  • • 查物流

  • • 查知识库

特点:

  • • 可重试

  • • 可并发

  • • 风险较低

第二类:写工具

例如:

  • • 修改地址

  • • 发优惠券

  • • 创建工单

  • • 退款申请

特点:

  • • 必须鉴权

  • • 必须幂等

  • • 必须审计

  • • 需要更严格的速率控制

生产经验上,Agent 系统第一次出事故,往往都发生在写工具上。

7.5 工具调用循环的安全边界

一个稳健的 Agent 循环,至少要限制:

  • • 单轮最大推理次数

  • • 单工具最大调用次数

  • • 总 token 预算

  • • 总耗时预算

  • • 是否允许并行调用

一个更安全的执行器示例如下:

class AgentLoopGuard:
    def __init__(
        self,
        max_iterations: int = 8,
        max_tool_calls: int = 6,
        max_elapsed_ms: int = 15000,
    ):
        self.max_iterations = max_iterations
        self.max_tool_calls = max_tool_calls
        self.max_elapsed_ms = max_elapsed_ms
 
class SafeAgentExecutor:
    def __init__(self, llm_client, tool_registry: ToolRegistry, tool_executor: ToolExecutor):
        self.llm_client = llm_client
        self.tool_registry = tool_registry
        self.tool_executor = tool_executor
 
    async def run(self, state, messages, guard: AgentLoopGuard):
        started_at = time.time()
        tool_call_count = 0
 
        for iteration in range(guard.max_iterations):
            elapsed_ms = int((time.time() - started_at) * 1000)
            if elapsed_ms > guard.max_elapsed_ms:
                return {"status": "timeout", "answer": "请求处理超时,请稍后重试。"}
 
            response = await self.llm_client.chat(messages, state.available_tools)
            assistant_message = response["message"]
 
            if not assistant_message.get("tool_calls"):
                return {"status": "ok", "answer": assistant_message.get("content", "")}
 
            for tool_call in assistant_message["tool_calls"]:
                tool_call_count += 1
                if tool_call_count > guard.max_tool_calls:
                    return {"status": "overflow", "answer": "工具调用次数超限,已停止执行。"}
 
                tool = self.tool_registry.get(tool_call["name"])
                if not tool:
                    messages.append(
                        {
                            "role": "tool",
                            "content": json.dumps({"status": "unknown_tool"}, ensure_ascii=False),
                        }
                    )
                    continue
 
                result = await self.tool_executor.execute(
                    tool=tool,
                    request=state.build_tool_request(tool_call["arguments"]),
                )
                messages.append(
                    {
                        "role": "tool",
                        "name": tool.name,
                        "content": json.dumps(result, ensure_ascii=False),
                    }
                )
 
        return {"status": "exhausted", "answer": "达到最大推理次数,建议转人工处理。"}

八、RAG 设计:不是“向量检索”四个字就够了

8.1 生产级 RAG 的核心目标

RAG 的目标不是“把知识喂给模型”,而是尽可能让模型基于可信上下文回答,并明确回答边界。

真正有工程价值的 RAG,需要同时解决四件事:

    1. 召回正确的内容
    1. 排除错误或过期内容
    1. 控制上下文长度
    1. 让回答可引用、可追踪

8.2 一条完整的 RAG 查询链路

用户问题
  -> Query Rewrite
  -> 召回策略选择
  -> Dense Recall
  -> Sparse Recall
  -> RRF 融合
  -> Rerank
  -> 权限过滤
  -> 上下文压缩
  -> 回答生成
  -> 引用来源输出

8.3 为什么只做向量召回通常不够

纯向量检索在这些场景中容易失效:

  • • 产品型号、订单号、错误码等精确词匹配

  • • 法规条款、版本变更、配置项对比

  • • 中文短问句歧义大

  • • 企业文档格式混乱

因此生产上更推荐混合检索:

  • • Dense Recall 负责语义相关性

  • • Sparse Recall 负责关键词精确命中

  • • Rerank 负责最终排序

8.4 生产可用的 RAG Pipeline 骨架

from dataclasses import dataclass
from typing import Any
 
@dataclass
class RetrievedDocument:
    doc_id: str
    content: str
    score: float
    source: str
    tenant_id: str
    metadata: dict[str, Any]
 
class RAGPipeline:
    def __init__(self, dense_store, sparse_store, reranker, permission_filter):
        self.dense_store = dense_store
        self.sparse_store = sparse_store
        self.reranker = reranker
        self.permission_filter = permission_filter
 
    async def query(
        self,
        tenant_id: str,
        user_id: str,
        query: str,
        top_k: int = 8,
    ) -> tuple[str, list[RetrievedDocument]]:
        rewritten = await self._rewrite_query(query)
        dense = await self.dense_store.search(tenant_id=tenant_id, query=rewritten, top_k=20)
        sparse = await self.sparse_store.search(tenant_id=tenant_id, query=rewritten, top_k=20)
 
        fused = self._rrf_merge(dense, sparse)
        permitted = await self.permission_filter.filter(
            tenant_id=tenant_id,
            user_id=user_id,
            documents=fused,
        )
 
        ranked = await self.reranker.rank(query=rewritten, documents=permitted[:12])
        selected = ranked[:top_k]
        context = self._build_context(selected)
        return context, selected
 
    async def _rewrite_query(self, query: str) -> str:
        return query
 
    def _rrf_merge(self, dense: list[RetrievedDocument], sparse: list[RetrievedDocument]):
        scores: dict[str, float] = {}
        doc_map: dict[str, RetrievedDocument] = {}
 
        for rank, doc in enumerate(dense):
            scores[doc.doc_id] = scores.get(doc.doc_id, 0.0) + 1 / (60 + rank + 1)
            doc_map[doc.doc_id] = doc
 
        for rank, doc in enumerate(sparse):
            scores[doc.doc_id] = scores.get(doc.doc_id, 0.0) + 1 / (60 + rank + 1)
            doc_map[doc.doc_id] = doc
 
        sorted_ids = sorted(scores.keys(), key=lambda x: scores[x], reverse=True)
        return [doc_map[doc_id] for doc_id in sorted_ids]
 
    def _build_context(self, documents: list[RetrievedDocument]) -> str:
        blocks = []
        for index, doc in enumerate(documents, start=1):
            blocks.append(
                f"[来源{index}] source={doc.source} score={doc.score:.4f}\n{doc.content}"
            )
        return "\n\n---\n\n".join(blocks)

8.5 文档入库比查询更重要

很多团队把时间都花在检索阶段调参,但实际效果经常由“文档预处理质量”决定。

入库阶段至少要做好:

  • • 去重

  • • 标题层级保留

  • • 表格转结构化文本

  • • 代码块单独分块

  • • 版本信息保留

  • • 来源标识保留

  • • 更新时间保留

  • • 文档权限标签保留

8.6 多租户 RAG 最容易踩的坑

坑一:只在应用层做租户过滤

如果向量库检索后才过滤租户,可能导致:

  • • 无关租户文档进入候选池

  • • 召回结果被污染

  • • 排序被错误影响

正确做法是尽量在检索阶段就带租户过滤条件。

坑二:把未确认答案写回知识库

有些系统会把模型回答自动反哺知识库。
如果没有审核链路,很容易把幻觉内容变成“正式知识”。

建议:

  • • 用户问答日志和正式知识库分开

  • • 只允许人工审核后的内容进入正式库

坑三:没有过期治理

制度、价格、活动、流程经常变化。
过期知识如果不做时间权重或版本治理,会让模型越来越不可信。


九、Skill 与 Workflow:不要把所有问题都交给“自由规划”

9.1 Skill 的作用是什么

Skill 适合承载一个相对稳定的业务意图域,例如:

  • • 订单查询

  • • 售后处理

  • • 发票开具

  • • 商品推荐

  • • 运维巡检

每个 Skill 可以有自己专属的:

  • • Prompt 模板

  • • 工具集

  • • RAG 数据域

  • • 风险策略

  • • 输出格式

这比让一个通用 Agent 自由决定一切更稳。

9.2 Workflow 和 Skill 的边界

一个经验法则:

  • • 如果任务主要是“理解并回答”,优先 Skill

  • • 如果任务包含多个可验证步骤,优先 Workflow

例如“解释某个制度”适合 Skill。
“查询订单并发起退款再通知用户”更适合 Workflow。

9.3 Skill Router 的基本设计

from abc import ABC, abstractmethod
 
class BaseSkill(ABC):
    @property
    @abstractmethod
    def name(self) -> str:
        ...
 
    @abstractmethod
    async def match(self, request: dict) -> float:
        ...
 
    @abstractmethod
    async def execute(self, request: dict, runtime_context: dict) -> dict:
        ...
 
class SkillRouter:
    def __init__(self, skills: list[BaseSkill], intent_classifier):
        self.skills = skills
        self.intent_classifier = intent_classifier
 
    async def route(self, request: dict) -> BaseSkill:
        scored = []
        for skill in self.skills:
            score = await skill.match(request)
            if score >= 0.85:
                return skill
            if score >= 0.30:
                scored.append((score, skill))
 
        if not scored:
            raise RuntimeError("no skill matched")
 
        top_candidates = [skill for _, skill in sorted(scored, key=lambda x: x[0], reverse=True)[:3]]
        selected_name = await self.intent_classifier.select(
            text=request["message"],
            candidates=[skill.name for skill in top_candidates],
        )
        for skill in top_candidates:
            if skill.name == selected_name:
                return skill
        return top_candidates[0]

9.4 什么时候不要让模型自由规划

以下场景不建议完全交给 autonomous planning:

  • • 涉及扣款、退款、发券等资金操作

  • • 涉及主数据修改

  • • 涉及外部系统联动

  • • 涉及高成本工具调用

  • • 涉及审批流

这些场景更适合:

  • • 固定流程模板

  • • 人工确认节点

  • • Saga 补偿设计

  • • 严格的参数白名单

生产经验里,越关键的业务,越不应该把控制权完全交给自由推理。


十、Runtime 调度:Agent 真正的难点,是“把不确定推理放进确定执行框架”

10.1 Agent Runtime 至少要管五类预算

生产环境里,预算不是只有 token。

一个请求至少同时受以下预算约束:

  • • Token Budget

  • • Latency Budget

  • • Tool Budget

  • • Cost Budget

  • • Retry Budget

例如一个用户请求整体 SLA 是 8 秒,那么可以拆成:

阶段 预算
鉴权与会话装载 100ms
RAG 检索 500ms
首轮模型推理 2500ms
工具执行 2000ms
二轮推理 2000ms
后处理与返回 300ms

如果没有预算拆分,就没有稳定 SLA。

10.2 高并发下最实用的三个优化点

第一,读工具并发,写工具串行

例如订单查询、库存查询、物流查询可以并发。
但涉及状态变更的写操作,应串行或通过工作流协调。

第二,主链路只保留必要步骤

不影响当前回答质量的动作,尽量异步化:

  • • 记忆摘要

  • • 画像抽取

  • • 审计扩展字段补全

  • • 运营埋点

  • • 训练样本沉淀

第三,SSE 早返回,降低用户感知时延

即使总耗时没有降低,用户只要更早看到“系统开始处理”,体验就会明显更好。

10.3 Runtime 状态机设计

from enum import Enum
 
class RuntimeState(str, Enum):
    RECEIVED = "received"
    SESSION_READY = "session_ready"
    CONTEXT_READY = "context_ready"
    MODEL_THINKING = "model_thinking"
    TOOL_RUNNING = "tool_running"
    POST_PROCESSING = "post_processing"
    COMPLETED = "completed"
    FAILED = "failed"
    HUMAN_REVIEW = "human_review"

状态机的价值在于:

  • • 便于可观测性埋点

  • • 便于中断恢复

  • • 便于失败分类

  • • 便于引入人工审批节点

10.4 长任务如何处理

有些任务天然不适合同步接口,例如:

  • • 生成长报告

  • • 批量工单处理

  • • 大量知识清洗

  • • 多步骤外部系统协作

这类任务建议:

  • • 请求入口只负责创建任务

  • • Runtime 将任务投递到消息队列

  • • 前端通过 WebSocket / 轮询获取进度

  • • 中间步骤按事件流落盘

这时 Agent Runtime 更接近“编排引擎”,而不只是问答引擎。


十一、生产级代码骨架:一套更接近真实项目的实现方式

下面给出一版简化但工程化思路正确的代码结构。

11.1 配置中心

 
# app/config.py
 
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict
 
class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_file=".env", extra="ignore")
 
    app_name: str = "agent-platform"
    env: str = "dev"
    host: str = "0.0.0.0"
    port: int = 8080
 
    openai_api_key: str = ""
    openai_base_url: str = "https://api.openai.com/v1"
    llm_primary_model: str = "gpt-4.1"
    llm_fast_model: str = "gpt-4.1-mini"
 
    redis_url: str = "redis://localhost:6379/0"
    postgres_dsn: str = ""
    vector_backend: str = "pgvector"
 
    agent_max_iterations: int = 8
    agent_max_tool_calls: int = 6
    agent_request_timeout_ms: int = 15000
    agent_context_budget: int = 24000
 
    rag_top_k: int = 6
    rag_candidate_k: int = 20
 
    enable_trace_log: bool = True
    enable_sensitive_masking: bool = True
 
settings = Settings()

11.2 统一请求上下文

 
# app/runtime/context.py
 
from dataclasses import dataclass, field
from typing import Any
import uuid
 
@dataclass
class RequestContext:
    tenant_id: str
    user_id: str
    session_id: str
    message: str
    trace_id: str = field(default_factory=lambda: uuid.uuid4().hex)
    metadata: dict[str, Any] = field(default_factory=dict)
 
@dataclass
class RuntimeContext:
    request: RequestContext
    available_tools: list[dict[str, Any]]
    prompt_budget: int
    state: dict[str, Any] = field(default_factory=dict)
    citations: list[dict[str, Any]] = field(default_factory=list)

11.3 模型路由器

 
# app/llm/router.py
 
class LLMRouter:
    def __init__(self, primary_client, fast_client):
        self.primary_client = primary_client
        self.fast_client = fast_client
 
    async def select_model(self, scene: str, complexity: str) -> str:
        if scene in {"routing", "query_rewrite", "summary"}:
            return "fast"
        if complexity == "low":
            return "fast"
        return "primary"
 
    async def chat(self, scene: str, complexity: str, **kwargs):
        model_key = await self.select_model(scene=scene, complexity=complexity)
        client = self.fast_client if model_key == "fast" else self.primary_client
        return await client.chat.completions.create(**kwargs)

11.4 Agent Runtime 入口

 
# app/runtime/agent_runtime.py
 
import json
import time
 
class AgentRuntime:
    def __init__(
        self,
        prompt_service,
        memory_service,
        rag_service,
        skill_router,
        tool_registry,
        tool_executor,
        llm_router,
        event_store,
    ):
        self.prompt_service = prompt_service
        self.memory_service = memory_service
        self.rag_service = rag_service
        self.skill_router = skill_router
        self.tool_registry = tool_registry
        self.tool_executor = tool_executor
        self.llm_router = llm_router
        self.event_store = event_store
 
    async def handle(self, request_context):
        started_at = time.time()
 
        recent_history = await self.memory_service.load_recent(
            tenant_id=request_context.tenant_id,
            session_id=request_context.session_id,
        )
        profile = await self.memory_service.load_profile(
            tenant_id=request_context.tenant_id,
            user_id=request_context.user_id,
        )
 
        skill = await self.skill_router.route(
            {
                "tenant_id": request_context.tenant_id,
                "user_id": request_context.user_id,
                "message": request_context.message,
            }
        )
 
        rag_context, sources = await self.rag_service.query(
            tenant_id=request_context.tenant_id,
            user_id=request_context.user_id,
            query=request_context.message,
        )
 
        prompt = await self.prompt_service.build(
            skill=skill.name,
            message=request_context.message,
            history=recent_history,
            profile=profile,
            rag_context=rag_context,
        )
 
        messages = [
            {"role": "system", "content": prompt},
            {"role": "user", "content": request_context.message},
        ]
 
        tool_schemas = self.tool_registry.list_openai_tools()
        llm_response = await self.llm_router.chat(
            scene="agent_main",
            complexity="medium",
            model="gpt-4.1",
            messages=messages,
            tools=tool_schemas,
            tool_choice="auto",
            temperature=0.2,
        )
 
        final_answer = await self._continue_if_needed(
            request_context=request_context,
            messages=messages,
            response=llm_response,
        )
 
        await self.memory_service.write_back(
            tenant_id=request_context.tenant_id,
            user_id=request_context.user_id,
            session_id=request_context.session_id,
            user_message=request_context.message,
            assistant_message=final_answer,
        )
 
        await self.event_store.append(
            trace_id=request_context.trace_id,
            event_type="agent.completed",
            payload={
                "skill": skill.name,
                "elapsed_ms": int((time.time() - started_at) * 1000),
                "sources": sources,
            },
        )
 
        return {"answer": final_answer, "sources": sources}
 
    async def _continue_if_needed(self, request_context, messages, response):
        choice = response.choices[0].message
        if not choice.tool_calls:
            return choice.content or ""
 
        messages.append(
            {
                "role": "assistant",
                "content": choice.content or "",
                "tool_calls": [
                    {
                        "id": tc.id,
                        "type": "function",
                        "function": {
                            "name": tc.function.name,
                            "arguments": tc.function.arguments,
                        },
                    }
                    for tc in choice.tool_calls
                ],
            }
        )
 
        for tc in choice.tool_calls:
            tool = self.tool_registry.get(tc.function.name)
            if not tool:
                tool_result = {"status": "unknown_tool"}
            else:
                tool_result = await self.tool_executor.execute(
                    tool=tool,
                    request=request_context.to_tool_request(
                        arguments=json.loads(tc.function.arguments or "{}")
                    ),
                )
 
            messages.append(
                {
                    "role": "tool",
                    "tool_call_id": tc.id,
                    "content": json.dumps(tool_result, ensure_ascii=False),
                }
            )
 
        second_round = await self.llm_router.chat(
            scene="agent_main",
            complexity="medium",
            model="gpt-4.1",
            messages=messages,
            temperature=0.2,
        )
        return second_round.choices[0].message.content or ""

11.5 一个带副作用工具的生产写法

 
# app/tools/reschedule_delivery.py
 
from pydantic import BaseModel, Field
 
class RescheduleDeliveryArgs(BaseModel):
    order_id: str = Field(min_length=8, max_length=64)
    expected_date: str
    time_slot: str
    reason: str = Field(default="user_request", max_length=128)
 
async def reschedule_delivery_handler(args: RescheduleDeliveryArgs, request):
    order = await order_gateway.get_order(
        tenant_id=request.tenant_id,
        order_id=args.order_id,
    )
    if not order:
        return {"status": "not_found", "message": "订单不存在"}
 
    if order["status"] not in {"PAID", "READY_TO_SHIP", "DELIVERING"}:
        return {"status": "rejected", "message": "当前订单状态不允许改约"}
 
    if not await risk_service.allow_reschedule(
        tenant_id=request.tenant_id,
        user_id=request.user_id,
        order_id=args.order_id,
    ):
        return {"status": "review_required", "message": "需要人工审核"}
 
    updated = await delivery_gateway.reschedule(
        tenant_id=request.tenant_id,
        order_id=args.order_id,
        expected_date=args.expected_date,
        time_slot=args.time_slot,
        operator=request.user_id,
        idempotency_key=request.idempotency_key,
    )
 
    await audit_service.record(
        trace_id=request.trace_id,
        action="delivery.reschedule",
        order_id=args.order_id,
        payload=args.model_dump(),
    )
    return {"status": "ok", "result": updated}

11.6 FastAPI 接入层

 
# app/main.py
 
from fastapi import FastAPI
from pydantic import BaseModel
import uuid
 
app = FastAPI(title="Production Agent Platform")
 
class ChatRequest(BaseModel):
    tenant_id: str
    user_id: str
    session_id: str | None = None
    message: str
 
class ChatResponse(BaseModel):
    trace_id: str
    session_id: str
    answer: str
    sources: list[dict]
 
@app.post("/api/v1/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    session_id = request.session_id or uuid.uuid4().hex
    request_context = RequestContext(
        tenant_id=request.tenant_id,
        user_id=request.user_id,
        session_id=session_id,
        message=request.message,
    )
    result = await agent_runtime.handle(request_context)
    return ChatResponse(
        trace_id=request_context.trace_id,
        session_id=session_id,
        answer=result["answer"],
        sources=result["sources"],
    )

这套骨架的重点不在代码长短,而在职责分离:

  • • 接入层只做协议处理

  • • Runtime 负责 orchestration

  • • Prompt / Memory / RAG / Tool 分别独立治理

  • • 事件与审计单独沉淀


十二、一个实际业务场景:电商客服 Agent 如何落地

为了避免文章停留在抽象层,我们用一个真实且常见的场景串起来。

12.1 场景目标

用户发来一句话:

订单昨天还没发货,帮我查一下原因,如果今天也发不了就改到明天下午配送。

这个请求至少涉及三种能力:

    1. 订单状态查询
    1. 发货原因解释
    1. 条件满足时执行配送改约

12.2 推荐的处理方式

第一步,不要让模型直接决定“能不能改约”。
这属于强业务规则,应由工具或工作流校验。

第二步,把任务拆成两个阶段:

  • • 阶段 A:查询与解释

  • • 阶段 B:条件满足时执行改约

第三步,对阶段 B 增加写操作保护:

  • • 用户身份校验

  • • 订单归属校验

  • • 状态机校验

  • • 风险规则校验

  • • 幂等校验

12.3 典型链路

用户请求
  -> Skill Router 命中 order_service
  -> 调 query_order 工具
  -> 调 query_shipment_reason 工具
  -> 模型判断是否需要进一步动作
  -> 若满足规则,则调 reschedule_delivery 工具
  -> 输出结果并附带操作结果说明
  -> 异步写审计与会话摘要

12.4 这里最值得强调的一点

改约动作不是模型“想做就做”,而是 Runtime 允许后才能做。

这就是生产级 Agent 与 Demo Agent 的本质差别:

  • • Demo 中模型主导一切

  • • 生产中 Runtime 决定边界,模型只在边界内推理


十三、工程化增强:高并发、可扩展、可恢复

13.1 高并发下的瓶颈通常在哪里

很多人直觉上会先担心应用服务 CPU。
但 Agent 系统高并发场景下更常见的瓶颈其实是:

  • • 模型 API RT 抖动

  • • 向量检索尾延迟

  • • Redis 热点 key

  • • 外部工具服务慢调用

  • • SSE 长连接占用

所以优化重点通常应放在依赖治理,而不是单纯扩机器。

13.2 三类隔离策略

线程池 / 协程池隔离

避免一个慢工具拖垮整个事件循环。

下游连接池隔离

不同工具服务、不同向量库、不同模型 provider 使用独立连接池。

资源配额隔离

大客户、高优先级请求、后台批任务分开治理,避免互相抢占。

13.3 Cache 应该加在哪

Agent 系统的缓存不是只加一层 Redis 就行。

通常可做四层缓存:

    1. Prompt 片段缓存
    1. Embedding 缓存
    1. RAG 召回缓存
    1. 语义回答缓存

但要注意:

  • • 带实时性要求的问题不适合回答缓存

  • • 带租户权限的知识上下文必须带权限维度缓存 key

  • • 写工具绝不能因为缓存影响幂等判断

13.4 降级策略要提前定义

如果主模型超时,你的系统是:

  • • 直接失败

  • • 切备用模型

  • • 改成只做知识检索回答

  • • 改成建议转人工

这些策略应该在上线前定义,而不是线上故障时临时决定。

一个常见降级矩阵如下:

故障点 降级策略
主模型超时 切轻量模型或返回简化答复
RAG 超时 仅用会话上下文回答,并明确说明未引用知识库
读工具超时 提示当前系统繁忙,建议稍后重试
写工具失败 不自动重试副作用操作,转人工或进入审批
摘要服务异常 不影响主链路,延后重试

13.5 长链路恢复能力

只要系统中有多步骤工作流,就必须考虑恢复:

  • • 服务重启后如何知道执行到哪一步

  • • 工具已执行成功但回包丢失怎么办

  • • 用户重复提交怎么办

推荐做法:

  • • 每一步写事件日志

  • • 每个副作用工具都有幂等键

  • • 工作流状态持久化

  • • 支持基于状态恢复或补偿


十四、可观测性:没有可观测性,Agent 系统几乎无法运维

14.1 至少要采集哪些指标

请求级指标

  • • QPS

  • • 成功率

  • • P50 / P95 / P99

  • • 每请求平均 token

  • • 每请求平均成本

模型级指标

  • • 模型调用次数

  • • 首 token 延迟

  • • 总完成时延

  • • 工具触发率

  • • 无工具直接回答率

RAG 指标

  • • 检索时延

  • • topK 命中率

  • • rerank 时延

  • • 来源覆盖率

  • • 无结果比例

工具级指标

  • • 调用次数

  • • 成功率

  • • 超时率

  • • 幂等冲突率

  • • 审批触发率

14.2 事件日志比普通日志更重要

Agent 排障时,仅靠字符串日志通常不够。
更好的做法是定义统一事件模型,例如:

agent.request.received
agent.session.loaded
agent.skill.selected
agent.rag.recalled
agent.prompt.compiled
agent.llm.completed
agent.tool.called
agent.tool.completed
agent.response.returned
agent.memory.persisted

这样你就可以完整回放一次请求的生命周期。

14.3 OpenTelemetry 是值得投入的

如果系统已经进入平台化阶段,建议把:

  • • HTTP 请求

  • • 模型调用

  • • 工具调用

  • • 向量检索

  • • Redis / DB 操作

全部纳入统一 Trace。
Agent 是典型的长链路系统,没有 Trace 基本看不清瓶颈。


十五、安全与治理:这是很多 Agent 项目最晚补、却最先出问题的部分

15.1 Prompt 注入不是理论问题

只要系统接收外部文本、网页、文档、工单内容,就可能被注入。

典型风险包括:

  • • 文档中藏有“忽略此前所有规则”

  • • 用户输入诱导模型泄露内部规则

  • • 外部网页内容污染 RAG 上下文

推荐防线:

  • • Prompt 中明确区分“系统规则”和“外部内容”

  • • 检索内容以引用块形式注入,而不是与系统规则混排

  • • 工具权限不由模型决定,必须二次校验

15.2 多租户隔离必须前置设计

常见错误做法是先做单租户,后面再补租户字段。
一旦系统接入知识库、记忆、工具审计,再补租户隔离代价会非常大。

至少要保证:

  • • Memory key 带 tenant_id

  • • RAG 检索带 tenant filter

  • • Tool 鉴权带 tenant context

  • • 审计日志带 tenant_id

15.3 输出安全

Agent 输出前建议增加后处理层,负责:

  • • 敏感信息脱敏

  • • 高风险回答拦截

  • • 不确定结论加免责声明

  • • 引用来源格式化

尤其在金融、医疗、政务场景,后处理几乎是必选项。


十六、从单体到平台:建议的演进路线

Phase 1:MVP 验证

适用阶段:

  • • 单一场景

  • • 单租户或小范围试点

  • • 重点验证业务价值

推荐形态:

  • • FastAPI / Spring Boot 单体

  • • Redis 做会话记忆

  • • PostgreSQL 或 pgvector 做知识库

  • • 读工具优先,少量写工具

Phase 2:模块化单体

适用阶段:

  • • 场景开始增多

  • • 工具数达到 20 以上

  • • 团队开始多人协作

推荐重点:

  • • 抽离 Runtime、RAG、Memory、Tool 模块

  • • 引入统一事件模型

  • • 增加审计与配置中心

Phase 3:平台化服务

适用阶段:

  • • 多团队复用

  • • 多租户

  • • 有成本与 SLA 压力

推荐重点:

  • • 模型路由中心

  • • 独立 RAG 服务

  • • 工具注册与审批平台

  • • 统一 Trace 与监控

Phase 4:事件驱动与多 Agent 协同

适用阶段:

  • • 长任务

  • • 多系统联动

  • • 多 Agent 分工协作

推荐重点:

  • • 任务总线

  • • 工作流引擎

  • • 人工审批节点

  • • 事件溯源与恢复机制

不要一开始就奔着多 Agent 去做。
绝大多数团队的第一优先级仍然是把单 Agent 做稳、做透、做可控。


十七、常见误区与生产排坑

误区一:以为模型效果不好,实际上是上下文管理失控

很多“模型不稳定”问题,根因并不是模型,而是:

  • • 历史上下文太乱

  • • RAG 召回不准

  • • 工具结果结构不统一

  • • Prompt 规则相互冲突

误区二:把所有复杂度都交给 Prompt

Prompt 可以描述规则,但不应该承担:

  • • 权限控制

  • • 幂等保证

  • • 风险决策

  • • 事务一致性

这些问题必须由 Runtime 和业务系统解决。

误区三:过早引入复杂 Planner

很多团队还没有把单轮工具调用做好,就急着做多步规划、多 Agent 协同。
结果通常是架构复杂度大幅上升,业务价值却没有同步增长。

更现实的路径是:

    1. 单场景单 Agent 跑通
    1. 工具治理做稳
    1. RAG 和 Memory 做准
    1. 再考虑复杂规划

误区四:没有事实来源就让模型直接回答

如果业务要求可追溯、可审计,就应尽可能输出:

  • • 来源文档

  • • 工具结果依据

  • • 当前回答的不确定边界

不是所有问题都必须“像人一样流畅”,很多业务场景更需要“像系统一样可靠”。


十八、生产上线前检查清单

架构层

  • • 是否明确区分接入层、Runtime、Tool、RAG、Memory 职责

  • • 是否支持请求级 trace_id

  • • 是否定义了请求状态机

Prompt 层

  • • 是否有分层 Prompt 设计

  • • 是否有 token 预算控制

  • • 是否有场景化模板与版本管理

Tool 层

  • • 是否做参数校验

  • • 是否做权限校验

  • • 是否做幂等控制

  • • 是否有超时与重试边界

  • • 是否对写工具做审计

RAG 层

  • • 是否保留来源信息

  • • 是否有租户隔离

  • • 是否处理过期知识

  • • 是否验证召回质量而不是只看技术链路

Memory 层

  • • 是否区分短期记忆、摘要记忆、长期事实

  • • 是否避免未确认内容写入长期记忆

  • • 是否将摘要生成异步化

运行层

  • • 是否有超时、熔断、降级策略

  • • 是否采集 P95 / P99

  • • 是否支持异常链路回放

安全层

  • • 是否防范 Prompt 注入

  • • 是否有敏感信息脱敏

  • • 是否有多租户隔离

  • • 是否保留审计日志


十九、总结:生产级 Agent 的核心,不是更“像人”,而是更“像系统”

如果只想做一个演示,Agent 的重点可能是 Prompt、模型和交互体验。
但如果目标是进入真实业务,工程重心会迅速转向 Runtime。

本文真正想表达的核心只有三点:

1. Agent 的本质是一个运行时,而不是一个 Prompt 模板

它需要统一管理:

  • • 推理

  • • 状态

  • • 工具

  • • 知识

  • • 安全

  • • 观测

2. 生产落地的关键不是“模型能力上限”,而是“系统失控下限”

优秀的生产系统,不是永远最聪明,而是在模型失误、依赖抖动、知识不完整时,仍然:

  • • 不越权

  • • 不重复执行

  • • 不无限循环

  • • 不把错误悄悄放大

3. 最优路线通常不是一步到位,而是可控演进

建议顺序通常是:

    1. 单场景跑通业务闭环
    1. 做好 Tool 治理与 RAG 准确性
    1. 做好可观测性与成本控制
    1. 再扩展 Skill、Workflow、多 Agent

当你把这些基础打稳之后,Agent 才真正从“会聊天的模型接口”,升级成“可进入核心业务链路的智能运行时”。


二十、延伸阅读

  • • OpenAI Function Calling Guide

  • • OpenAI Responses API Guide

  • • Anthropic Tool Use

  • • RAG Survey: Retrieval-Augmented Generation for Large Language Models

  • • Reciprocal Rank Fusion

  • • OpenTelemetry Documentation


如果你正在做企业内部 Agent 平台,下一步最值得补齐的通常不是更多 Prompt 技巧,而是下面四项:

  • • 工具治理与幂等体系

  • • RAG 的租户隔离与知识新鲜度治理

  • • Runtime 的状态机与预算控制

  • • 统一可观测性与审计回放

把这四件事做扎实,系统才具备真正的生产属性。