背景痛点:单体客服的“三堵墙”
过去两年,我先后接手过三套“单体 LLM 客服”项目,它们上线初期都很惊艳,一旦并发量破 500 QPS,就会集体撞墙:
- 所有请求塞进同一个 Prompt,上下文窗口线性膨胀,首字响应时间从 1.2 s 飙到 4.8 s
- 多轮对话靠 Thread-Local 字典维护状态,跨服务重启就丢上下文,用户刷新页面后“失忆”
- 新增业务域(例如退换货政策)要改全局 Prompt,回归测试全量跑,两周才能发版
一句话:单体架构把“智能”做成了“瓶颈”。
技术选型:LangGraph 为什么比 LangChain 更适合“多 Agent 协作”
LangChain 的 Agent Executor 是“单线程链式调用”,状态用AgentExecutor.intermediate_steps暂存,粒度粗、不可回滚;而 LangGraph 把每一次对话都抽象成有向无环图(DAG)——节点即 Agent,边即状态转移,天然满足“分而治之”的并发需求。
核心差异我画了一张表:
| 维度 | LangChain Agent Executor | LangGraph | |---|---|---|---| | 状态管理 | 单例变量,链尾覆盖 | 节点快照,可回溯 | | 并行调用 | 不支持 | 原生asyncio.gather| | 循环检测 | 无 | 内置CycleDetector| | 可视化 | 靠打印 | 一键graph.visualize()出 PNG |
结论:要做“多 Agent 协同”,LangGraph 是引擎,LangChain 只是零件库。
核心实现:用 DAG 建模客服工作流
1. 整体拓扑
[用户提问] ↓ Routing Agent ——意图置信度>0.85——→ Domain Agent ↓ 置信度<0.85 Fallback Agent ——兜底回复三条边全部带状态快照,任何节点失败都能回到上一快照重试。
2. 节点定义(Python 3.10)
# nodes.py from typing import TypedDict, Optional from langgraph.graph import StateGraph, END from langchain_core.messages import AIMessage class ChatState(TypedDict): user_query: str intent: Optional[str] confidence: float answer: Optional[str] retry: int = 0 async def routing_node(state: ChatState) -> ChatState: """Routing Agent:零样本意图分类""" from langchain.chains import create_tagging_chain schema = {"intent": "string", "confidence": "float"} chain = create_tagging_chain(schema, llm=gpt4_llm) result = await chain.ainvoke(state["user_query"]) state.update(result) return state async def domain_node(state: ChatState) -> ChatState: """Domain Agent:精准回答""" prompt = DOMAIN_PROMPTS[state["intent"]] ans = await prompt.ainvoke(state["user_query"]) state["answer"] = ans.content return state async def fallback_node(state: ChatState) -> ChatState: """Fallback Agent:安全兜底""" state["answer"] = "抱歉,我还在学习中,请联系人工客服。" return state3. 图组装与异常处理
# graph.py from langgraph.graph import StateGraph, START from nodes import routing_node, domain_node, fallback_node, ChatState workflow = StateGraph(ChatState) workflow.add_node("route", routing_node) workflow.add_node("domain", domain_node) workflow.add_node("fallback", fallback_node) workflow.add_edge(START, "route") def decide_edge(state: ChatState): return "domain" if state["confidence"] > 0.85 else "fallback" workflow.add_conditional_edge("route", decide_edge) workflow.add_edge("domain", END) workflow.add_edge("fallback", END) graph = workflow.compile()4. 单元测试片段
# test_graph.py import asyncio, pytest from graph import graph @pytest.mark.asyncio async def test_domain_path(): state = {"user_query": "如何办理退货?", "retry": 0} result = await graph.ainvoke(state) assert "answer" in result assert result["intent"] == "return"跑完pytest -q绿灯即 DAG 逻辑自洽。
生产考量:让图跑得快、跑得稳
1. 超时重试策略
每个节点加asyncio.timeout(3),重试次数写进状态:
async def safe_node(func, state: ChatState): for i in range(1, 4): try: return await asyncio.wait_for(func(state), timeout=3) except asyncio.TimeoutError: state["retry"] = i raise RuntimeError("Max retries exceeded")2. 对话上下文内存优化
- 只保留最近 3 轮用户消息,历史摘要用
llm.summarize()压缩成 64 token - 图快照存 Redis,key 带 TTL=15 min,内存峰值下降 42%
3. 敏感词过滤拦截层
在routing_node之前插入同步过滤器:
def sensitive_filter(state: ChatState) -> ChatState: if any(w in state["user_query"] for w in SENSITIVE_WORDS): state["answer"] = "您的提问包含敏感内容,已转人工客服。" state["intent"] = "blocked" return state该节点无 LLM 调用,P99 延迟 < 5 ms。
避坑指南:踩过的坑,写进代码注释
- Agent 间消息协议禁止用自由文本,统一 TypedDict,字段增减必须版本号管理
- 循环依赖用
graph.validate()预检,上线前跑pytest -m cyclic - 监控埋点务必打在三处:节点入口、节点出口、异常捕获,标签带上
node_name与retry_count,方便 Grafana 绘制热力图
效果验证与可观测性
灰度两周,核心指标如下:
- 首响时长 ↓ 38%(单体 4.8 s → 多 Agent 2.3 s)
- 意图识别准确率 ↑ 12%(85% → 95%)
- 单实例 QPS 从 120 提升到 310,CPU 只涨 8 核
开放讨论:如何评估不同 Agent 的贡献度权重?
目前我们按“节点调用次数”粗算成本,但 Routing 节点一次调用可能拯救整轮对话,也可能被 Fallback 抵消;Domain 节点偶尔一次精准回答就能带来五星好评。你觉得该用因果推断还是Shapley 值来量化每个 Agent 对最终满意度的真实贡献?欢迎留言交换思路。