生产级 AI Agent 从零搭建:从 Prompt、Memory、Tool、RAG 到 Runtime 的全链路工程实践
关键词:Agent Runtime、Function Calling、Memory、RAG、Skill、Workflow、可观测性、多租户、安全治理
阅读目标:不是做一个“能对话”的 Demo,而是搭建一个能接入业务、能承压、能审计、能演进的生产级 Agent 系统
一、前言:为什么很多 Agent Demo 一上生产就失效
过去一年,很多团队都做过 Agent Demo:
-
• 接一个大模型 API
-
• 拼一个 system prompt
-
• 注册几个 function calling 工具
-
• 再接一个向量库做知识库问答
本地跑起来并不难,难的是一旦进入真实业务,系统会立刻暴露出一系列工程问题:
| 业务诉求 | Demo 常见做法 | 生产环境真正的问题 |
|---|---|---|
| 多轮对话 | 直接把历史消息拼进 prompt | Token 爆炸、上下文污染、历史失真 |
| 工具调用 | 模型说调哪个工具就调哪个工具 | 权限越权、重复调用、副作用失控 |
| 知识库问答 | 只做向量检索 topK | 召回噪声高、事实冲突、租户隔离不足 |
| 高并发接入 | 单实例同步请求 | 大模型 RT 抖动、连接池打满、尾延迟恶化 |
| 生产审计 | 仅保留最终回答 | 无法回放推理链路,无法定位事故根因 |
| 成本控制 | 默认使用一个大模型 | 简单请求成本过高,缺乏路由与降级策略 |
真正的 Agent 系统,本质上不是一个 Prompt 技巧集合,而是一个新的应用运行时:
-
• 它要负责把用户意图翻译成可执行计划
-
• 它要协调模型、工具、知识、记忆与业务系统
-
• 它要在不确定的推理行为外面,套上可控的工程边界
换句话说,生产级 Agent 的核心不是“让模型更聪明”,而是“让系统在模型不稳定的前提下仍然可控”。
本文会从零拆解一套生产级 Agent Runtime 的设计思路,并给出一版可直接落地改造的代码骨架。
二、先讲结论:生产级 Agent 到底在“生产”什么
很多文章把 Agent 的重点放在 Prompt、Tool 或 RAG 本身,但工程落地时,真正决定系统上线质量的,通常是下面六件事:
2.1 可控性
模型是概率系统,业务系统需要确定性。
所以 Agent Runtime 必须回答这些问题:
-
• 哪些工具允许被调用
-
• 什么时候允许调用
-
• 一个请求最多允许调用几次
-
• 工具失败后是否允许重试
-
• 带副作用的操作如何保证幂等
2.2 可观测性
如果线上出现“回答胡说”“一直转圈调工具”“延迟突然升高”“成本超预算”,你必须能快速定位:
-
• 是 Prompt 设计问题
-
• 是 RAG 召回问题
-
• 是工具执行问题
-
• 是模型路由问题
-
• 还是基础设施问题
2.3 可扩展性
当场景从“一个客服 Agent”扩展到“客服、销售、运营、风控、运维多个 Agent”时,系统不能因为能力增加就整体重写。
因此架构必须天然支持:
-
• Skill 插件化
-
• 工具注册中心
-
• 模型路由
-
• 知识源扩展
-
• 多租户隔离
2.4 高并发与高可用
很多团队一开始忽略了一个事实:真正的瓶颈经常不在 CPU,而在外部依赖。
-
• 模型 API 有 RT 抖动
-
• 向量检索有尾延迟
-
• Redis 会出现热点 key
-
• 下游工具服务会限流或超时
Agent Runtime 需要具备完整的超时、隔离、熔断、降级与异步化能力。
2.5 安全与治理
Agent 能调用工具,就意味着它拥有“行为能力”。
一旦接上内部订单、库存、支付、工单、运维系统,安全问题就不再是可选项。
至少要做:
-
• 工具级鉴权
-
• 参数级校验
-
• 输出级脱敏
-
• Prompt 注入防护
-
• 租户数据隔离
-
• 审计日志留痕
2.6 演进能力
生产级架构不是第一次设计就完美,而是要支持从 MVP 演进到平台化:
-
• 单体服务
-
• 模块化单体
-
• 微服务化
-
• 事件驱动
-
• 多 Agent 协同
如果你的第一版系统从结构上就不允许演进,后面几乎必然推倒重来。
三、生产级 Agent 的整体架构:不要只看“模型”,要看“运行时”
3.1 架构全景
从工程角度看,一套 Agent 平台至少应拆成五层:
┌──────────────────────────────────────────────────────────────┐
│ 接入与治理层 │
│ API Gateway / Auth / Rate Limit / Tenant Isolation / Audit │
└──────────────────────────────┬───────────────────────────────┘
│
┌──────────────────────────────▼───────────────────────────────┐
│ Agent Runtime │
│ Session / Planner / Tool Executor / State Machine / Stream │
└───────┬──────────────────────┬──────────────────────┬────────┘
│ │ │
┌───────▼────────┐ ┌─────────▼────────┐ ┌────────▼─────────┐
│ Prompt & Policy │ │ Memory & Context │ │ Knowledge & RAG │
│ Role/Rules/Budget│ │ Window/Summary/Profile│ Recall/Rerank/Cite│
└───────┬────────┘ └─────────┬────────┘ └────────┬─────────┘
│ │ │
┌───────▼──────────────────────▼──────────────────────▼────────┐
│ Tool & Workflow Plane │
│ Registry / Permission / Idempotency / Compensation / Saga │
└───────┬──────────────────────────────────────────────────────┘
│
┌───────▼──────────────────────────────────────────────────────┐
│ Infrastructure │
│ Redis / PostgreSQL / Kafka / VectorDB / Object Store / OTel │
└──────────────────────────────────────────────────────────────┘3.2 为什么要把 Runtime 放在中心
很多团队最开始的实现方式,是在 Web 接口里直接做这些事:
-
- 拼 prompt
-
- 调模型
-
- 如果返回 tool call,就调工具
-
- 再把工具结果送回模型
-
- 最后返回答案
这套流程在单场景下能跑,但很快会遇到三个问题:
-
- 业务逻辑和推理逻辑耦合在一起,难维护。
-
- 无法统一治理工具权限、重试、熔断、审计。
-
- 当后续要接流式输出、异步任务、人工审批、长任务恢复时,代码会迅速失控。
因此 Runtime 必须成为独立的一层,负责:
-
• 驱动状态机
-
• 管理上下文预算
-
• 调度工具
-
• 记录事件
-
• 输出流式结果
-
• 管控生命周期
从这个角度讲,Agent Runtime 更像一个“面向 LLM 的应用操作系统”。
四、请求生命周期:一条用户消息在系统里到底经历了什么
一条用户请求从进入系统到返回结果,理想情况下应该经过如下阶段:
User Request
-> 鉴权与租户识别
-> Session 装载
-> Memory 加载
-> Query 分类与 Skill 路由
-> RAG 检索与上下文构建
-> Prompt 编译
-> LLM 推理
-> Tool Call 决策
-> 工具执行 / 工作流执行
-> 结果验证与后处理
-> 流式返回 / 最终回答
-> 事件落库与异步记忆写回4.1 这里最容易被忽略的三个关键点
第一,Session 状态不等于聊天记录
Agent 处理请求时,需要的不只是“用户说过什么”,还包括:
-
• 当前租户
-
• 当前场景
-
• 当前权限
-
• 正在进行中的任务
-
• 已调用过的工具
-
• 已产生的中间结果
-
• 当前剩余预算
所以 Session 应该是一份结构化状态,而不是简单消息数组。
第二,工具调用是状态迁移,不只是函数调用
例如用户说:“帮我把订单改到明天下午配送,并短信通知客户。”
这不是一个工具调用,而是一个小型工作流:
-
- 校验订单是否存在
-
- 校验当前状态是否允许改约
-
- 修改配送时间
-
- 发送通知
-
- 记录审计事件
-
- 任一步失败时做补偿或人工兜底
所以在生产环境里,Tool 更像“可控操作单元”,Workflow 才是业务编排单元。
第三,Memory 写回应尽量异步
如果每轮响应结束后,同步做摘要、画像提取、长期记忆更新,模型 RT 会明显恶化。
更合理的方式是:
-
• 主链路只写短期窗口
-
• 摘要生成通过异步任务触发
-
• 用户画像抽取按阈值批量执行
-
• 长期事实更新走事件队列
五、Prompt 不是一段字符串,而是一套“策略编译结果”
5.1 为什么生产环境不能只写一个 system prompt
单一 system prompt 最大的问题不是不够长,而是不够可管理。
在真实系统里,Prompt 至少同时承载五类信息:
-
• 角色定义
-
• 行为规则
-
• 业务约束
-
• 工具说明
-
• 当前上下文
把这些内容全部硬编码在一段长文本里,会导致:
-
• 版本难以管理
-
• 不同业务场景难以复用
-
• 修改规则容易引入隐性回归
-
• 无法做 A/B 测试与灰度发布
5.2 推荐做法:Prompt 分层编译
from dataclasses import dataclass
from enum import Enum
from typing import List
class PromptLayerType(str, Enum):
ROLE = "role"
POLICY = "policy"
TOOL = "tool"
CONTEXT = "context"
MEMORY = "memory"
TASK = "task"
@dataclass
class PromptLayer:
layer_type: PromptLayerType
content: str
priority: int
token_budget: int
required: bool = True
class PromptCompiler:
def __init__(self, token_counter):
self.token_counter = token_counter
def compile(self, layers: List[PromptLayer], total_budget: int) -> str:
ordered = sorted(layers, key=lambda x: x.priority)
compiled: list[str] = []
used_tokens = 0
for layer in ordered:
if not layer.content:
continue
current_tokens = self.token_counter.count(layer.content)
if current_tokens > layer.token_budget:
layer_content = self.token_counter.truncate(
layer.content, layer.token_budget
)
current_tokens = self.token_counter.count(layer_content)
else:
layer_content = layer.content
if used_tokens + current_tokens > total_budget:
if layer.required:
raise ValueError(f"Prompt budget exceeded at {layer.layer_type}")
continue
compiled.append(layer_content)
used_tokens += current_tokens
return "\n\n".join(compiled)5.3 一个实用的 Prompt 分层模型
Layer 1: Role
你是谁,负责什么,不负责什么
Layer 2: Policy
合规规则、禁止行为、答复约束、升级策略
Layer 3: Tool Contract
可以使用哪些工具,工具的输入输出约束是什么
Layer 4: Memory Context
用户近期上下文、用户画像、当前任务状态
Layer 5: Knowledge Context
RAG 检索结果、来源与置信度
Layer 6: Task Instruction
本轮用户请求与输出格式要求5.4 生产实践:把规则写成“可执行约束”
很多 Prompt 失败,不是因为模型不聪明,而是因为规则写得太抽象。
例如下面这类规则约束弱:
请谨慎调用工具,必要时才调用。更好的写法是:
仅当回答需要外部实时信息、用户授权后的业务操作、或必须引用内部知识时才允许调用工具。
如果用户请求只是解释概念、总结信息或润色文本,不要调用任何工具。
同一工具在一轮会话中最多调用 2 次;若两次结果仍不足以回答,应明确说明信息不足并请求人工协助。Prompt 最终服务的是 Runtime 的可控性,而不是文采。
六、Memory 设计:真正难的不是“记住”,而是“记什么、忘什么、何时更新”
6.1 为什么不能把全部历史直接塞进上下文
这种做法在 Demo 阶段最省事,但生产上会带来三类问题:
-
1. 成本问题:上下文越来越长,推理成本持续增加。
-
2. 质量问题:旧上下文噪声进入本轮推理,导致注意力偏移。
-
3. 安全问题:历史中混入不再适用的敏感信息或跨任务信息。
所以 Memory 的关键不是“无限记忆”,而是“分层管理”。
6.2 推荐的四层记忆模型
L1: Working Memory
- 当前任务内的短期状态
- 生命周期:本次会话 / 当前任务
- 存储:Redis / In-Memory
L2: Conversation Memory
- 近期对话窗口
- 生命周期:分钟到小时
- 存储:Redis List / Stream
L3: Summary Memory
- 历史摘要、关键决策、阶段性结论
- 生命周期:天级
- 存储:Redis + PostgreSQL
L4: Profile / Fact Memory
- 用户画像、长期偏好、稳定业务事实
- 生命周期:周到月
- 存储:PostgreSQL / KV / Graph6.3 生产可用的 Memory 数据结构
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Optional
import time
class MemoryType(str, Enum):
MESSAGE = "message"
SUMMARY = "summary"
FACT = "fact"
TASK_STATE = "task_state"
@dataclass
class MemoryRecord:
tenant_id: str
user_id: str
session_id: str
memory_type: MemoryType
content: str
metadata: dict[str, Any] = field(default_factory=dict)
created_at: float = field(default_factory=lambda: time.time())
expire_at: Optional[float] = None
class MemoryManager:
def __init__(self, redis_client, repository, summary_queue):
self.redis = redis_client
self.repository = repository
self.summary_queue = summary_queue
async def append_message(self, record: MemoryRecord, window_size: int = 20) -> None:
key = f"agent:mem:{record.tenant_id}:{record.session_id}:window"
payload = self._serialize(record)
pipe = self.redis.pipeline()
pipe.rpush(key, payload)
pipe.ltrim(key, -window_size, -1)
pipe.expire(key, 3600)
await pipe.execute()
async def load_context(
self,
tenant_id: str,
session_id: str,
max_messages: int = 10,
) -> list[dict[str, Any]]:
key = f"agent:mem:{tenant_id}:{session_id}:window"
values = await self.redis.lrange(key, -max_messages, -1)
return [self._deserialize(item) for item in values]
async def trigger_summary_if_needed(self, tenant_id: str, session_id: str) -> None:
key = f"agent:mem:{tenant_id}:{session_id}:window"
size = await self.redis.llen(key)
if size >= 18:
await self.summary_queue.publish(
{
"tenant_id": tenant_id,
"session_id": session_id,
"event": "memory.summary.generate",
}
)
def _serialize(self, record: MemoryRecord) -> str:
import json
return json.dumps(record.__dict__, ensure_ascii=False)
def _deserialize(self, payload: str) -> dict[str, Any]:
import json
return json.loads(payload)6.4 摘要记忆的关键,不是“压缩”,而是“保真”
很多系统做摘要时只关心把内容压短,但生产场景里更重要的是保留对后续决策真正有价值的信息:
-
• 用户身份与角色
-
• 当前任务目标
-
• 已完成动作
-
• 未完成动作
-
• 明确约束
-
• 时间承诺
-
• 风险或冲突点
一个更稳妥的摘要模板通常应包含:
1. 当前用户目标
2. 已确认事实
3. 已执行动作
4. 待执行动作
5. 禁止或限制条件
6. 需要人工接管的风险点6.5 分布式场景下的 Memory 难点
难点一:多副本并发写入
同一 session 可能被多个副本处理,例如:
-
• WebSocket 重连
-
• 请求重试
-
• 多通道输入
这时必须保证:
-
• 事件顺序可恢复
-
• 重复写入可识别
-
• 任务状态可覆盖但不混乱
推荐做法:
-
• 每条消息带
event_id -
• 工作流状态带
version -
• 对重要状态更新采用 CAS 或乐观锁
难点二:不同记忆层的刷新频率不同
短期窗口适合同步写入,长期画像不适合同步更新。
如果用户每发一句话都做画像提取,成本会非常差。
更合理的策略是:
-
• 关键字段变化才触发画像更新
-
• 达到轮次阈值才触发摘要
-
• 长期事实采用异步抽取
难点三:记忆污染
不是所有对话内容都应进入长期记忆。
例如:
-
• 用户一时的情绪表达
-
• 当前促销活动的临时信息
-
• 模型自己生成但未被确认的推断
这类内容如果写进长期记忆,会造成后续推理偏差。
所以长期记忆必须有“确认门槛”。
七、Tool 体系:Function Calling 只是入口,真正关键是执行治理
7.1 工具系统的本质
Tool 系统不是“把 Python 函数暴露给模型”,而是把业务能力包装成安全、可治理、可审计的执行单元。
生产级工具系统至少要包含:
-
• Schema 校验
-
• 权限校验
-
• 超时控制
-
• 重试策略
-
• 幂等控制
-
• 审计埋点
-
• 熔断降级
-
• 结果标准化
7.2 工具注册中心设计
import asyncio
import json
import time
from dataclasses import dataclass, field
from typing import Any, Callable, Optional
from pydantic import BaseModel, ValidationError
class ToolRequest(BaseModel):
tenant_id: str
user_id: str
session_id: str
trace_id: str
idempotency_key: str
arguments: dict[str, Any]
@dataclass
class ToolDefinition:
name: str
description: str
schema_model: type[BaseModel]
handler: Callable[..., Any]
timeout_seconds: float = 5.0
max_retries: int = 0
need_approval: bool = False
side_effect: bool = False
allowed_roles: list[str] = field(default_factory=list)
def to_openai_tool(self) -> dict[str, Any]:
return {
"type": "function",
"function": {
"name": self.name,
"description": self.description,
"parameters": self.schema_model.model_json_schema(),
},
}
class ToolRegistry:
def __init__(self):
self._tools: dict[str, ToolDefinition] = {}
def register(self, tool: ToolDefinition) -> None:
if tool.name in self._tools:
raise ValueError(f"tool already exists: {tool.name}")
self._tools[tool.name] = tool
def get(self, name: str) -> Optional[ToolDefinition]:
return self._tools.get(name)
def list_openai_tools(self) -> list[dict[str, Any]]:
return [tool.to_openai_tool() for tool in self._tools.values()]
class ToolExecutor:
def __init__(self, permission_service, idempotency_store, audit_logger):
self.permission_service = permission_service
self.idempotency_store = idempotency_store
self.audit_logger = audit_logger
async def execute(self, tool: ToolDefinition, request: ToolRequest) -> dict[str, Any]:
start = time.time()
try:
validated_args = tool.schema_model(**request.arguments)
except ValidationError as exc:
return {"status": "invalid_argument", "error": exc.errors()}
allowed = await self.permission_service.check(
tenant_id=request.tenant_id,
user_id=request.user_id,
tool_name=tool.name,
roles=tool.allowed_roles,
)
if not allowed:
return {"status": "forbidden", "error": "permission denied"}
if tool.side_effect:
existed = await self.idempotency_store.try_acquire(
request.idempotency_key, ttl_seconds=600
)
if not existed:
return {"status": "duplicate", "error": "duplicated request"}
try:
result = await asyncio.wait_for(
tool.handler(validated_args, request),
timeout=tool.timeout_seconds,
)
return {"status": "ok", "result": result}
except asyncio.TimeoutError:
return {"status": "timeout", "error": f"{tool.name} timeout"}
except Exception as exc:
return {"status": "error", "error": str(exc)}
finally:
await self.audit_logger.log(
trace_id=request.trace_id,
tool_name=tool.name,
cost_ms=int((time.time() - start) * 1000),
side_effect=tool.side_effect,
)7.3 为什么幂等性是 Agent 工具系统的必选项
传统 API 调用重复一次可能只是多打一条日志。
Agent 重复调用一次,可能意味着:
-
• 订单被重复取消
-
• 优惠券被重复发放
-
• 工单被重复创建
-
• 消息被重复发送
大模型并不天然知道“这个工具不应该再调一次”。
所以必须在 Runtime 外层做幂等治理,而不是把希望寄托在 Prompt 上。
7.4 把工具分成两类管理
推荐按副作用分类:
第一类:读工具
例如:
-
• 查订单
-
• 查库存
-
• 查物流
-
• 查知识库
特点:
-
• 可重试
-
• 可并发
-
• 风险较低
第二类:写工具
例如:
-
• 修改地址
-
• 发优惠券
-
• 创建工单
-
• 退款申请
特点:
-
• 必须鉴权
-
• 必须幂等
-
• 必须审计
-
• 需要更严格的速率控制
生产经验上,Agent 系统第一次出事故,往往都发生在写工具上。
7.5 工具调用循环的安全边界
一个稳健的 Agent 循环,至少要限制:
-
• 单轮最大推理次数
-
• 单工具最大调用次数
-
• 总 token 预算
-
• 总耗时预算
-
• 是否允许并行调用
一个更安全的执行器示例如下:
class AgentLoopGuard:
def __init__(
self,
max_iterations: int = 8,
max_tool_calls: int = 6,
max_elapsed_ms: int = 15000,
):
self.max_iterations = max_iterations
self.max_tool_calls = max_tool_calls
self.max_elapsed_ms = max_elapsed_ms
class SafeAgentExecutor:
def __init__(self, llm_client, tool_registry: ToolRegistry, tool_executor: ToolExecutor):
self.llm_client = llm_client
self.tool_registry = tool_registry
self.tool_executor = tool_executor
async def run(self, state, messages, guard: AgentLoopGuard):
started_at = time.time()
tool_call_count = 0
for iteration in range(guard.max_iterations):
elapsed_ms = int((time.time() - started_at) * 1000)
if elapsed_ms > guard.max_elapsed_ms:
return {"status": "timeout", "answer": "请求处理超时,请稍后重试。"}
response = await self.llm_client.chat(messages, state.available_tools)
assistant_message = response["message"]
if not assistant_message.get("tool_calls"):
return {"status": "ok", "answer": assistant_message.get("content", "")}
for tool_call in assistant_message["tool_calls"]:
tool_call_count += 1
if tool_call_count > guard.max_tool_calls:
return {"status": "overflow", "answer": "工具调用次数超限,已停止执行。"}
tool = self.tool_registry.get(tool_call["name"])
if not tool:
messages.append(
{
"role": "tool",
"content": json.dumps({"status": "unknown_tool"}, ensure_ascii=False),
}
)
continue
result = await self.tool_executor.execute(
tool=tool,
request=state.build_tool_request(tool_call["arguments"]),
)
messages.append(
{
"role": "tool",
"name": tool.name,
"content": json.dumps(result, ensure_ascii=False),
}
)
return {"status": "exhausted", "answer": "达到最大推理次数,建议转人工处理。"}八、RAG 设计:不是“向量检索”四个字就够了
8.1 生产级 RAG 的核心目标
RAG 的目标不是“把知识喂给模型”,而是尽可能让模型基于可信上下文回答,并明确回答边界。
真正有工程价值的 RAG,需要同时解决四件事:
-
- 召回正确的内容
-
- 排除错误或过期内容
-
- 控制上下文长度
-
- 让回答可引用、可追踪
8.2 一条完整的 RAG 查询链路
用户问题
-> Query Rewrite
-> 召回策略选择
-> Dense Recall
-> Sparse Recall
-> RRF 融合
-> Rerank
-> 权限过滤
-> 上下文压缩
-> 回答生成
-> 引用来源输出8.3 为什么只做向量召回通常不够
纯向量检索在这些场景中容易失效:
-
• 产品型号、订单号、错误码等精确词匹配
-
• 法规条款、版本变更、配置项对比
-
• 中文短问句歧义大
-
• 企业文档格式混乱
因此生产上更推荐混合检索:
-
• Dense Recall 负责语义相关性
-
• Sparse Recall 负责关键词精确命中
-
• Rerank 负责最终排序
8.4 生产可用的 RAG Pipeline 骨架
from dataclasses import dataclass
from typing import Any
@dataclass
class RetrievedDocument:
doc_id: str
content: str
score: float
source: str
tenant_id: str
metadata: dict[str, Any]
class RAGPipeline:
def __init__(self, dense_store, sparse_store, reranker, permission_filter):
self.dense_store = dense_store
self.sparse_store = sparse_store
self.reranker = reranker
self.permission_filter = permission_filter
async def query(
self,
tenant_id: str,
user_id: str,
query: str,
top_k: int = 8,
) -> tuple[str, list[RetrievedDocument]]:
rewritten = await self._rewrite_query(query)
dense = await self.dense_store.search(tenant_id=tenant_id, query=rewritten, top_k=20)
sparse = await self.sparse_store.search(tenant_id=tenant_id, query=rewritten, top_k=20)
fused = self._rrf_merge(dense, sparse)
permitted = await self.permission_filter.filter(
tenant_id=tenant_id,
user_id=user_id,
documents=fused,
)
ranked = await self.reranker.rank(query=rewritten, documents=permitted[:12])
selected = ranked[:top_k]
context = self._build_context(selected)
return context, selected
async def _rewrite_query(self, query: str) -> str:
return query
def _rrf_merge(self, dense: list[RetrievedDocument], sparse: list[RetrievedDocument]):
scores: dict[str, float] = {}
doc_map: dict[str, RetrievedDocument] = {}
for rank, doc in enumerate(dense):
scores[doc.doc_id] = scores.get(doc.doc_id, 0.0) + 1 / (60 + rank + 1)
doc_map[doc.doc_id] = doc
for rank, doc in enumerate(sparse):
scores[doc.doc_id] = scores.get(doc.doc_id, 0.0) + 1 / (60 + rank + 1)
doc_map[doc.doc_id] = doc
sorted_ids = sorted(scores.keys(), key=lambda x: scores[x], reverse=True)
return [doc_map[doc_id] for doc_id in sorted_ids]
def _build_context(self, documents: list[RetrievedDocument]) -> str:
blocks = []
for index, doc in enumerate(documents, start=1):
blocks.append(
f"[来源{index}] source={doc.source} score={doc.score:.4f}\n{doc.content}"
)
return "\n\n---\n\n".join(blocks)8.5 文档入库比查询更重要
很多团队把时间都花在检索阶段调参,但实际效果经常由“文档预处理质量”决定。
入库阶段至少要做好:
-
• 去重
-
• 标题层级保留
-
• 表格转结构化文本
-
• 代码块单独分块
-
• 版本信息保留
-
• 来源标识保留
-
• 更新时间保留
-
• 文档权限标签保留
8.6 多租户 RAG 最容易踩的坑
坑一:只在应用层做租户过滤
如果向量库检索后才过滤租户,可能导致:
-
• 无关租户文档进入候选池
-
• 召回结果被污染
-
• 排序被错误影响
正确做法是尽量在检索阶段就带租户过滤条件。
坑二:把未确认答案写回知识库
有些系统会把模型回答自动反哺知识库。
如果没有审核链路,很容易把幻觉内容变成“正式知识”。
建议:
-
• 用户问答日志和正式知识库分开
-
• 只允许人工审核后的内容进入正式库
坑三:没有过期治理
制度、价格、活动、流程经常变化。
过期知识如果不做时间权重或版本治理,会让模型越来越不可信。
九、Skill 与 Workflow:不要把所有问题都交给“自由规划”
9.1 Skill 的作用是什么
Skill 适合承载一个相对稳定的业务意图域,例如:
-
• 订单查询
-
• 售后处理
-
• 发票开具
-
• 商品推荐
-
• 运维巡检
每个 Skill 可以有自己专属的:
-
• Prompt 模板
-
• 工具集
-
• RAG 数据域
-
• 风险策略
-
• 输出格式
这比让一个通用 Agent 自由决定一切更稳。
9.2 Workflow 和 Skill 的边界
一个经验法则:
-
• 如果任务主要是“理解并回答”,优先 Skill
-
• 如果任务包含多个可验证步骤,优先 Workflow
例如“解释某个制度”适合 Skill。
“查询订单并发起退款再通知用户”更适合 Workflow。
9.3 Skill Router 的基本设计
from abc import ABC, abstractmethod
class BaseSkill(ABC):
@property
@abstractmethod
def name(self) -> str:
...
@abstractmethod
async def match(self, request: dict) -> float:
...
@abstractmethod
async def execute(self, request: dict, runtime_context: dict) -> dict:
...
class SkillRouter:
def __init__(self, skills: list[BaseSkill], intent_classifier):
self.skills = skills
self.intent_classifier = intent_classifier
async def route(self, request: dict) -> BaseSkill:
scored = []
for skill in self.skills:
score = await skill.match(request)
if score >= 0.85:
return skill
if score >= 0.30:
scored.append((score, skill))
if not scored:
raise RuntimeError("no skill matched")
top_candidates = [skill for _, skill in sorted(scored, key=lambda x: x[0], reverse=True)[:3]]
selected_name = await self.intent_classifier.select(
text=request["message"],
candidates=[skill.name for skill in top_candidates],
)
for skill in top_candidates:
if skill.name == selected_name:
return skill
return top_candidates[0]9.4 什么时候不要让模型自由规划
以下场景不建议完全交给 autonomous planning:
-
• 涉及扣款、退款、发券等资金操作
-
• 涉及主数据修改
-
• 涉及外部系统联动
-
• 涉及高成本工具调用
-
• 涉及审批流
这些场景更适合:
-
• 固定流程模板
-
• 人工确认节点
-
• Saga 补偿设计
-
• 严格的参数白名单
生产经验里,越关键的业务,越不应该把控制权完全交给自由推理。
十、Runtime 调度:Agent 真正的难点,是“把不确定推理放进确定执行框架”
10.1 Agent Runtime 至少要管五类预算
生产环境里,预算不是只有 token。
一个请求至少同时受以下预算约束:
-
• Token Budget
-
• Latency Budget
-
• Tool Budget
-
• Cost Budget
-
• Retry Budget
例如一个用户请求整体 SLA 是 8 秒,那么可以拆成:
| 阶段 | 预算 |
|---|---|
| 鉴权与会话装载 | 100ms |
| RAG 检索 | 500ms |
| 首轮模型推理 | 2500ms |
| 工具执行 | 2000ms |
| 二轮推理 | 2000ms |
| 后处理与返回 | 300ms |
如果没有预算拆分,就没有稳定 SLA。
10.2 高并发下最实用的三个优化点
第一,读工具并发,写工具串行
例如订单查询、库存查询、物流查询可以并发。
但涉及状态变更的写操作,应串行或通过工作流协调。
第二,主链路只保留必要步骤
不影响当前回答质量的动作,尽量异步化:
-
• 记忆摘要
-
• 画像抽取
-
• 审计扩展字段补全
-
• 运营埋点
-
• 训练样本沉淀
第三,SSE 早返回,降低用户感知时延
即使总耗时没有降低,用户只要更早看到“系统开始处理”,体验就会明显更好。
10.3 Runtime 状态机设计
from enum import Enum
class RuntimeState(str, Enum):
RECEIVED = "received"
SESSION_READY = "session_ready"
CONTEXT_READY = "context_ready"
MODEL_THINKING = "model_thinking"
TOOL_RUNNING = "tool_running"
POST_PROCESSING = "post_processing"
COMPLETED = "completed"
FAILED = "failed"
HUMAN_REVIEW = "human_review"状态机的价值在于:
-
• 便于可观测性埋点
-
• 便于中断恢复
-
• 便于失败分类
-
• 便于引入人工审批节点
10.4 长任务如何处理
有些任务天然不适合同步接口,例如:
-
• 生成长报告
-
• 批量工单处理
-
• 大量知识清洗
-
• 多步骤外部系统协作
这类任务建议:
-
• 请求入口只负责创建任务
-
• Runtime 将任务投递到消息队列
-
• 前端通过 WebSocket / 轮询获取进度
-
• 中间步骤按事件流落盘
这时 Agent Runtime 更接近“编排引擎”,而不只是问答引擎。
十一、生产级代码骨架:一套更接近真实项目的实现方式
下面给出一版简化但工程化思路正确的代码结构。
11.1 配置中心
# app/config.py
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
model_config = SettingsConfigDict(env_file=".env", extra="ignore")
app_name: str = "agent-platform"
env: str = "dev"
host: str = "0.0.0.0"
port: int = 8080
openai_api_key: str = ""
openai_base_url: str = "https://api.openai.com/v1"
llm_primary_model: str = "gpt-4.1"
llm_fast_model: str = "gpt-4.1-mini"
redis_url: str = "redis://localhost:6379/0"
postgres_dsn: str = ""
vector_backend: str = "pgvector"
agent_max_iterations: int = 8
agent_max_tool_calls: int = 6
agent_request_timeout_ms: int = 15000
agent_context_budget: int = 24000
rag_top_k: int = 6
rag_candidate_k: int = 20
enable_trace_log: bool = True
enable_sensitive_masking: bool = True
settings = Settings()11.2 统一请求上下文
# app/runtime/context.py
from dataclasses import dataclass, field
from typing import Any
import uuid
@dataclass
class RequestContext:
tenant_id: str
user_id: str
session_id: str
message: str
trace_id: str = field(default_factory=lambda: uuid.uuid4().hex)
metadata: dict[str, Any] = field(default_factory=dict)
@dataclass
class RuntimeContext:
request: RequestContext
available_tools: list[dict[str, Any]]
prompt_budget: int
state: dict[str, Any] = field(default_factory=dict)
citations: list[dict[str, Any]] = field(default_factory=list)11.3 模型路由器
# app/llm/router.py
class LLMRouter:
def __init__(self, primary_client, fast_client):
self.primary_client = primary_client
self.fast_client = fast_client
async def select_model(self, scene: str, complexity: str) -> str:
if scene in {"routing", "query_rewrite", "summary"}:
return "fast"
if complexity == "low":
return "fast"
return "primary"
async def chat(self, scene: str, complexity: str, **kwargs):
model_key = await self.select_model(scene=scene, complexity=complexity)
client = self.fast_client if model_key == "fast" else self.primary_client
return await client.chat.completions.create(**kwargs)11.4 Agent Runtime 入口
# app/runtime/agent_runtime.py
import json
import time
class AgentRuntime:
def __init__(
self,
prompt_service,
memory_service,
rag_service,
skill_router,
tool_registry,
tool_executor,
llm_router,
event_store,
):
self.prompt_service = prompt_service
self.memory_service = memory_service
self.rag_service = rag_service
self.skill_router = skill_router
self.tool_registry = tool_registry
self.tool_executor = tool_executor
self.llm_router = llm_router
self.event_store = event_store
async def handle(self, request_context):
started_at = time.time()
recent_history = await self.memory_service.load_recent(
tenant_id=request_context.tenant_id,
session_id=request_context.session_id,
)
profile = await self.memory_service.load_profile(
tenant_id=request_context.tenant_id,
user_id=request_context.user_id,
)
skill = await self.skill_router.route(
{
"tenant_id": request_context.tenant_id,
"user_id": request_context.user_id,
"message": request_context.message,
}
)
rag_context, sources = await self.rag_service.query(
tenant_id=request_context.tenant_id,
user_id=request_context.user_id,
query=request_context.message,
)
prompt = await self.prompt_service.build(
skill=skill.name,
message=request_context.message,
history=recent_history,
profile=profile,
rag_context=rag_context,
)
messages = [
{"role": "system", "content": prompt},
{"role": "user", "content": request_context.message},
]
tool_schemas = self.tool_registry.list_openai_tools()
llm_response = await self.llm_router.chat(
scene="agent_main",
complexity="medium",
model="gpt-4.1",
messages=messages,
tools=tool_schemas,
tool_choice="auto",
temperature=0.2,
)
final_answer = await self._continue_if_needed(
request_context=request_context,
messages=messages,
response=llm_response,
)
await self.memory_service.write_back(
tenant_id=request_context.tenant_id,
user_id=request_context.user_id,
session_id=request_context.session_id,
user_message=request_context.message,
assistant_message=final_answer,
)
await self.event_store.append(
trace_id=request_context.trace_id,
event_type="agent.completed",
payload={
"skill": skill.name,
"elapsed_ms": int((time.time() - started_at) * 1000),
"sources": sources,
},
)
return {"answer": final_answer, "sources": sources}
async def _continue_if_needed(self, request_context, messages, response):
choice = response.choices[0].message
if not choice.tool_calls:
return choice.content or ""
messages.append(
{
"role": "assistant",
"content": choice.content or "",
"tool_calls": [
{
"id": tc.id,
"type": "function",
"function": {
"name": tc.function.name,
"arguments": tc.function.arguments,
},
}
for tc in choice.tool_calls
],
}
)
for tc in choice.tool_calls:
tool = self.tool_registry.get(tc.function.name)
if not tool:
tool_result = {"status": "unknown_tool"}
else:
tool_result = await self.tool_executor.execute(
tool=tool,
request=request_context.to_tool_request(
arguments=json.loads(tc.function.arguments or "{}")
),
)
messages.append(
{
"role": "tool",
"tool_call_id": tc.id,
"content": json.dumps(tool_result, ensure_ascii=False),
}
)
second_round = await self.llm_router.chat(
scene="agent_main",
complexity="medium",
model="gpt-4.1",
messages=messages,
temperature=0.2,
)
return second_round.choices[0].message.content or ""11.5 一个带副作用工具的生产写法
# app/tools/reschedule_delivery.py
from pydantic import BaseModel, Field
class RescheduleDeliveryArgs(BaseModel):
order_id: str = Field(min_length=8, max_length=64)
expected_date: str
time_slot: str
reason: str = Field(default="user_request", max_length=128)
async def reschedule_delivery_handler(args: RescheduleDeliveryArgs, request):
order = await order_gateway.get_order(
tenant_id=request.tenant_id,
order_id=args.order_id,
)
if not order:
return {"status": "not_found", "message": "订单不存在"}
if order["status"] not in {"PAID", "READY_TO_SHIP", "DELIVERING"}:
return {"status": "rejected", "message": "当前订单状态不允许改约"}
if not await risk_service.allow_reschedule(
tenant_id=request.tenant_id,
user_id=request.user_id,
order_id=args.order_id,
):
return {"status": "review_required", "message": "需要人工审核"}
updated = await delivery_gateway.reschedule(
tenant_id=request.tenant_id,
order_id=args.order_id,
expected_date=args.expected_date,
time_slot=args.time_slot,
operator=request.user_id,
idempotency_key=request.idempotency_key,
)
await audit_service.record(
trace_id=request.trace_id,
action="delivery.reschedule",
order_id=args.order_id,
payload=args.model_dump(),
)
return {"status": "ok", "result": updated}11.6 FastAPI 接入层
# app/main.py
from fastapi import FastAPI
from pydantic import BaseModel
import uuid
app = FastAPI(title="Production Agent Platform")
class ChatRequest(BaseModel):
tenant_id: str
user_id: str
session_id: str | None = None
message: str
class ChatResponse(BaseModel):
trace_id: str
session_id: str
answer: str
sources: list[dict]
@app.post("/api/v1/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
session_id = request.session_id or uuid.uuid4().hex
request_context = RequestContext(
tenant_id=request.tenant_id,
user_id=request.user_id,
session_id=session_id,
message=request.message,
)
result = await agent_runtime.handle(request_context)
return ChatResponse(
trace_id=request_context.trace_id,
session_id=session_id,
answer=result["answer"],
sources=result["sources"],
)这套骨架的重点不在代码长短,而在职责分离:
-
• 接入层只做协议处理
-
• Runtime 负责 orchestration
-
• Prompt / Memory / RAG / Tool 分别独立治理
-
• 事件与审计单独沉淀
十二、一个实际业务场景:电商客服 Agent 如何落地
为了避免文章停留在抽象层,我们用一个真实且常见的场景串起来。
12.1 场景目标
用户发来一句话:
订单昨天还没发货,帮我查一下原因,如果今天也发不了就改到明天下午配送。这个请求至少涉及三种能力:
-
- 订单状态查询
-
- 发货原因解释
-
- 条件满足时执行配送改约
12.2 推荐的处理方式
第一步,不要让模型直接决定“能不能改约”。
这属于强业务规则,应由工具或工作流校验。
第二步,把任务拆成两个阶段:
-
• 阶段 A:查询与解释
-
• 阶段 B:条件满足时执行改约
第三步,对阶段 B 增加写操作保护:
-
• 用户身份校验
-
• 订单归属校验
-
• 状态机校验
-
• 风险规则校验
-
• 幂等校验
12.3 典型链路
用户请求
-> Skill Router 命中 order_service
-> 调 query_order 工具
-> 调 query_shipment_reason 工具
-> 模型判断是否需要进一步动作
-> 若满足规则,则调 reschedule_delivery 工具
-> 输出结果并附带操作结果说明
-> 异步写审计与会话摘要12.4 这里最值得强调的一点
改约动作不是模型“想做就做”,而是 Runtime 允许后才能做。
这就是生产级 Agent 与 Demo Agent 的本质差别:
-
• Demo 中模型主导一切
-
• 生产中 Runtime 决定边界,模型只在边界内推理
十三、工程化增强:高并发、可扩展、可恢复
13.1 高并发下的瓶颈通常在哪里
很多人直觉上会先担心应用服务 CPU。
但 Agent 系统高并发场景下更常见的瓶颈其实是:
-
• 模型 API RT 抖动
-
• 向量检索尾延迟
-
• Redis 热点 key
-
• 外部工具服务慢调用
-
• SSE 长连接占用
所以优化重点通常应放在依赖治理,而不是单纯扩机器。
13.2 三类隔离策略
线程池 / 协程池隔离
避免一个慢工具拖垮整个事件循环。
下游连接池隔离
不同工具服务、不同向量库、不同模型 provider 使用独立连接池。
资源配额隔离
大客户、高优先级请求、后台批任务分开治理,避免互相抢占。
13.3 Cache 应该加在哪
Agent 系统的缓存不是只加一层 Redis 就行。
通常可做四层缓存:
-
- Prompt 片段缓存
-
- Embedding 缓存
-
- RAG 召回缓存
-
- 语义回答缓存
但要注意:
-
• 带实时性要求的问题不适合回答缓存
-
• 带租户权限的知识上下文必须带权限维度缓存 key
-
• 写工具绝不能因为缓存影响幂等判断
13.4 降级策略要提前定义
如果主模型超时,你的系统是:
-
• 直接失败
-
• 切备用模型
-
• 改成只做知识检索回答
-
• 改成建议转人工
这些策略应该在上线前定义,而不是线上故障时临时决定。
一个常见降级矩阵如下:
| 故障点 | 降级策略 |
|---|---|
| 主模型超时 | 切轻量模型或返回简化答复 |
| RAG 超时 | 仅用会话上下文回答,并明确说明未引用知识库 |
| 读工具超时 | 提示当前系统繁忙,建议稍后重试 |
| 写工具失败 | 不自动重试副作用操作,转人工或进入审批 |
| 摘要服务异常 | 不影响主链路,延后重试 |
13.5 长链路恢复能力
只要系统中有多步骤工作流,就必须考虑恢复:
-
• 服务重启后如何知道执行到哪一步
-
• 工具已执行成功但回包丢失怎么办
-
• 用户重复提交怎么办
推荐做法:
-
• 每一步写事件日志
-
• 每个副作用工具都有幂等键
-
• 工作流状态持久化
-
• 支持基于状态恢复或补偿
十四、可观测性:没有可观测性,Agent 系统几乎无法运维
14.1 至少要采集哪些指标
请求级指标
-
• QPS
-
• 成功率
-
• P50 / P95 / P99
-
• 每请求平均 token
-
• 每请求平均成本
模型级指标
-
• 模型调用次数
-
• 首 token 延迟
-
• 总完成时延
-
• 工具触发率
-
• 无工具直接回答率
RAG 指标
-
• 检索时延
-
• topK 命中率
-
• rerank 时延
-
• 来源覆盖率
-
• 无结果比例
工具级指标
-
• 调用次数
-
• 成功率
-
• 超时率
-
• 幂等冲突率
-
• 审批触发率
14.2 事件日志比普通日志更重要
Agent 排障时,仅靠字符串日志通常不够。
更好的做法是定义统一事件模型,例如:
agent.request.received
agent.session.loaded
agent.skill.selected
agent.rag.recalled
agent.prompt.compiled
agent.llm.completed
agent.tool.called
agent.tool.completed
agent.response.returned
agent.memory.persisted这样你就可以完整回放一次请求的生命周期。
14.3 OpenTelemetry 是值得投入的
如果系统已经进入平台化阶段,建议把:
-
• HTTP 请求
-
• 模型调用
-
• 工具调用
-
• 向量检索
-
• Redis / DB 操作
全部纳入统一 Trace。
Agent 是典型的长链路系统,没有 Trace 基本看不清瓶颈。
十五、安全与治理:这是很多 Agent 项目最晚补、却最先出问题的部分
15.1 Prompt 注入不是理论问题
只要系统接收外部文本、网页、文档、工单内容,就可能被注入。
典型风险包括:
-
• 文档中藏有“忽略此前所有规则”
-
• 用户输入诱导模型泄露内部规则
-
• 外部网页内容污染 RAG 上下文
推荐防线:
-
• Prompt 中明确区分“系统规则”和“外部内容”
-
• 检索内容以引用块形式注入,而不是与系统规则混排
-
• 工具权限不由模型决定,必须二次校验
15.2 多租户隔离必须前置设计
常见错误做法是先做单租户,后面再补租户字段。
一旦系统接入知识库、记忆、工具审计,再补租户隔离代价会非常大。
至少要保证:
-
• Memory key 带 tenant_id
-
• RAG 检索带 tenant filter
-
• Tool 鉴权带 tenant context
-
• 审计日志带 tenant_id
15.3 输出安全
Agent 输出前建议增加后处理层,负责:
-
• 敏感信息脱敏
-
• 高风险回答拦截
-
• 不确定结论加免责声明
-
• 引用来源格式化
尤其在金融、医疗、政务场景,后处理几乎是必选项。
十六、从单体到平台:建议的演进路线
Phase 1:MVP 验证
适用阶段:
-
• 单一场景
-
• 单租户或小范围试点
-
• 重点验证业务价值
推荐形态:
-
• FastAPI / Spring Boot 单体
-
• Redis 做会话记忆
-
• PostgreSQL 或 pgvector 做知识库
-
• 读工具优先,少量写工具
Phase 2:模块化单体
适用阶段:
-
• 场景开始增多
-
• 工具数达到 20 以上
-
• 团队开始多人协作
推荐重点:
-
• 抽离 Runtime、RAG、Memory、Tool 模块
-
• 引入统一事件模型
-
• 增加审计与配置中心
Phase 3:平台化服务
适用阶段:
-
• 多团队复用
-
• 多租户
-
• 有成本与 SLA 压力
推荐重点:
-
• 模型路由中心
-
• 独立 RAG 服务
-
• 工具注册与审批平台
-
• 统一 Trace 与监控
Phase 4:事件驱动与多 Agent 协同
适用阶段:
-
• 长任务
-
• 多系统联动
-
• 多 Agent 分工协作
推荐重点:
-
• 任务总线
-
• 工作流引擎
-
• 人工审批节点
-
• 事件溯源与恢复机制
不要一开始就奔着多 Agent 去做。
绝大多数团队的第一优先级仍然是把单 Agent 做稳、做透、做可控。
十七、常见误区与生产排坑
误区一:以为模型效果不好,实际上是上下文管理失控
很多“模型不稳定”问题,根因并不是模型,而是:
-
• 历史上下文太乱
-
• RAG 召回不准
-
• 工具结果结构不统一
-
• Prompt 规则相互冲突
误区二:把所有复杂度都交给 Prompt
Prompt 可以描述规则,但不应该承担:
-
• 权限控制
-
• 幂等保证
-
• 风险决策
-
• 事务一致性
这些问题必须由 Runtime 和业务系统解决。
误区三:过早引入复杂 Planner
很多团队还没有把单轮工具调用做好,就急着做多步规划、多 Agent 协同。
结果通常是架构复杂度大幅上升,业务价值却没有同步增长。
更现实的路径是:
-
- 单场景单 Agent 跑通
-
- 工具治理做稳
-
- RAG 和 Memory 做准
-
- 再考虑复杂规划
误区四:没有事实来源就让模型直接回答
如果业务要求可追溯、可审计,就应尽可能输出:
-
• 来源文档
-
• 工具结果依据
-
• 当前回答的不确定边界
不是所有问题都必须“像人一样流畅”,很多业务场景更需要“像系统一样可靠”。
十八、生产上线前检查清单
架构层
-
• 是否明确区分接入层、Runtime、Tool、RAG、Memory 职责
-
• 是否支持请求级 trace_id
-
• 是否定义了请求状态机
Prompt 层
-
• 是否有分层 Prompt 设计
-
• 是否有 token 预算控制
-
• 是否有场景化模板与版本管理
Tool 层
-
• 是否做参数校验
-
• 是否做权限校验
-
• 是否做幂等控制
-
• 是否有超时与重试边界
-
• 是否对写工具做审计
RAG 层
-
• 是否保留来源信息
-
• 是否有租户隔离
-
• 是否处理过期知识
-
• 是否验证召回质量而不是只看技术链路
Memory 层
-
• 是否区分短期记忆、摘要记忆、长期事实
-
• 是否避免未确认内容写入长期记忆
-
• 是否将摘要生成异步化
运行层
-
• 是否有超时、熔断、降级策略
-
• 是否采集 P95 / P99
-
• 是否支持异常链路回放
安全层
-
• 是否防范 Prompt 注入
-
• 是否有敏感信息脱敏
-
• 是否有多租户隔离
-
• 是否保留审计日志
十九、总结:生产级 Agent 的核心,不是更“像人”,而是更“像系统”
如果只想做一个演示,Agent 的重点可能是 Prompt、模型和交互体验。
但如果目标是进入真实业务,工程重心会迅速转向 Runtime。
本文真正想表达的核心只有三点:
1. Agent 的本质是一个运行时,而不是一个 Prompt 模板
它需要统一管理:
-
• 推理
-
• 状态
-
• 工具
-
• 知识
-
• 安全
-
• 观测
2. 生产落地的关键不是“模型能力上限”,而是“系统失控下限”
优秀的生产系统,不是永远最聪明,而是在模型失误、依赖抖动、知识不完整时,仍然:
-
• 不越权
-
• 不重复执行
-
• 不无限循环
-
• 不把错误悄悄放大
3. 最优路线通常不是一步到位,而是可控演进
建议顺序通常是:
-
- 单场景跑通业务闭环
-
- 做好 Tool 治理与 RAG 准确性
-
- 做好可观测性与成本控制
-
- 再扩展 Skill、Workflow、多 Agent
当你把这些基础打稳之后,Agent 才真正从“会聊天的模型接口”,升级成“可进入核心业务链路的智能运行时”。
二十、延伸阅读
-
• OpenAI Function Calling Guide
-
• OpenAI Responses API Guide
-
• Anthropic Tool Use
-
• RAG Survey: Retrieval-Augmented Generation for Large Language Models
-
• Reciprocal Rank Fusion
-
• OpenTelemetry Documentation
如果你正在做企业内部 Agent 平台,下一步最值得补齐的通常不是更多 Prompt 技巧,而是下面四项:
-
• 工具治理与幂等体系
-
• RAG 的租户隔离与知识新鲜度治理
-
• Runtime 的状态机与预算控制
-
• 统一可观测性与审计回放
把这四件事做扎实,系统才具备真正的生产属性。
