news 2026/5/20 1:58:09

【langgraph+postgres】用于生产环境的langgraph短期记忆的存取(postgreSQL替代InMemorySaver)

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
【langgraph+postgres】用于生产环境的langgraph短期记忆的存取(postgreSQL替代InMemorySaver)

LangGraph 生产环境持久化学习总结

核心概念

1. 三种持久化方式对比

方式说明适用场景持久化
InMemorySaver内存存储测试环境❌ 重启丢失
PostgresSaverPostgreSQL 存储生产环境推荐✅ 持久化
RedisSaverRedis 存储高并发场景✅ 持久化

2. State(状态)- 工作流中共享的数据结构

classcustomstate(AgentState):user_name:str# 用户姓名conversation_count:int=0# 对话次数

3. Checkpointer(检查点)- 短期记忆

作用:保存对话历史和工作流状态

  • 记录每次对话的状态
  • 支持多轮对话
  • 提供 Time Travel 功能(状态回溯)

4. Store(存储)- 长期记忆

作用:保存用户画像、知识库等跨会话数据

  • 用户信息(姓名、偏好等)
  • 知识库元数据
  • 跨会话的业务数据

一、PostgreSQL Checkpointer 使用

1.1 数据库准备

-- 创建数据库(如果还没有)CREATEDATABASEdengmingfang OWNER dengmingfang;-- 赋予权限GRANTALLPRIVILEGESONDATABASEdengmingfangTOdengmingfang;

1.2 代码实现

fromlanggraph.checkpoint.postgresimportPostgresSaver# 数据库连接字符串DB_URL="postgresql://dengmingfang:123456@localhost:5432/dengmingfang"# 使用上下文管理器withPostgresSaver.from_conn_string(DB_URL)ascheckpointer:# 第一次使用必须调用 setup() 初始化数据库表checkpointer.setup()# 创建 graph,绑定 checkpointergraph=create_react_agent(model=llm,tools=[username,greet_user],state_schema=customstate,checkpointer=checkpointer)# 配置参数config={"configurable":{"thread_id":"user_123",# 会话 ID"username":"张三"}}# 多轮对话(状态自动持久化)result1=graph.invoke({"messages":[{"role":"user","content":"第一次对话"}]},config)result2=graph.invoke({"messages":[{"role":"user","content":"第二次对话"}]},config# 相同 thread_id,状态会累加)

1.3 查看状态

# 查看当前状态state_snapshot=graph.get_state(config)print(f"next:{state_snapshot.next}")print(f"values:{state_snapshot.values}")# 查看历史记录forsnapshotingraph.get_state_history(config):print(f"checkpoint_id:{snapshot.config['configurable']['checkpoint_id']}")

二、Time Travel(时间回溯)

2.1 获取历史状态

# 获取所有历史记录history=list(graph.get_state_history(config))# 倒数第二条记录snapshot=history[-2]

2.2 回溯并修改状态

# 从历史记录恢复new_config=graph.update_state(snapshot.config,{"messages":["这是人工修改的消息"]})# 从修改后的状态继续result=graph.invoke({"messages":[{"role":"user","content":"继续执行"}]},new_config)

2.3 应用场景

  • 调试:回溯到某个节点重新执行
  • 错误回滚:撤销错误的工具调用
  • 流程优化:对比不同执行路径的结果

三、Store(长期记忆)

3.1 保存用户信息

fromlanggraph.store.postgresimportPostgresStorewithPostgresStore.from_conn_string(DB_URL)asstore:# 初始化表store.setup()# 保存用户画像store.put(namespace=("profile",),# 命名空间key="user_123",value={"name":"邓","age":26,"preferences":["技术","阅读"]})

3.2 读取用户信息

# 读取用户数据user_profile=store.get(("profile",),"user_123")ifuser_profile:print(user_profile.value)# 输出:{'name': '邓茗芳', 'age': 27, 'preferences': ['技术', '阅读']}

3.3 更新用户信息

# 更新(会覆盖原值)store.put(("profile",),"user_123",{**user_profile.value,"last_login":"2026-01-29","login_count":user_profile.value.get("login_count",0)+1})

四、完整生产环境架构

4.1 架构图

┌─────────────────────────────────────────────────────────┐ │ LangGraph Application │ ├─────────────────────────────────────────────────────────┤ │ │ │ ┌─────────────┐ ┌──────────────┐ │ │ │ Graph │────────▶│ Checkpointer │ │ │ │ (State) │ │ (PostgreSQL) │ │ │ └─────────────┘ └──────────────┘ │ │ │ │ │ │ ▼ ▼ │ │ ┌─────────────┐ ┌──────────────┐ │ │ │ Store │ │ Vector Store │ │ │ │ (PostgreSQL)│ │ (FAISS) │ │ │ └─────────────┘ └──────────────┘ │ │ │ │ │ │ └───────────────────────┘ │ │ │ │ │ ▼ │ │ PostgreSQL 数据库 │ └─────────────────────────────────────────────────────────┘

4.2 完整代码示例

fromlanggraph.checkpoint.postgresimportPostgresSaverfromlanggraph.store.postgresimportPostgresStorefromlangchain_community.vectorstoresimportFAISSfromlanggraph.prebuilt.chat_agent_executorimportcreate_react_agent DB_URL="postgresql://dengmingfang:123456@localhost:5432/dengmingfang"withPostgresSaver.from_conn_string(DB_URL)ascheckpointer,\ PostgresStore.from_conn_string(DB_URL)asstore:# 初始化checkpointer.setup()store.setup()# 加载向量库vector_store=FAISS.load_local("/path/to/faiss_index",embeddings=embeddings,allow_dangerous_deserialization=True)# 创建 graphgraph=create_react_agent(model=llm,tools=[username,greet_user,rag_query],state_schema=customstate,checkpointer=checkpointer,store=store)# 执行result=graph.invoke({"messages":[{"role":"user","content":"查询商品"}]},{"configurable":{"thread_id":"user_123"}})

五、数据库表结构

5.1 Checkpointer 自动创建的表

-- 检查点表CREATETABLEcheckpoints(thread_idTEXT,checkpoint_nsTEXT,checkpoint_idTEXT,parent_checkpoint_idTEXT,typeTEXT,checkpointJSONB,metadata JSONB,PRIMARYKEY(thread_id,checkpoint_ns,checkpoint_id));-- 写入表CREATETABLEcheckpoint_writes(thread_idTEXT,checkpoint_nsTEXT,checkpoint_idTEXT,task_idTEXT,idxINTEGER,channelTEXT,typeTEXT,valueJSONB,PRIMARYKEY(thread_id,checkpoint_ns,checkpoint_id,task_id,idx));

5.2 Store 自动创建的表

-- 存储表CREATETABLEstore(namespaceTEXT,keyTEXT,valueJSONB,created_atTIMESTAMP,updated_atTIMESTAMP,PRIMARYKEY(namespace,key));

六、最佳实践

6.1 配置优化

# 使用连接池importpsycopg2.pool DB_URL="postgresql://dengmingfang:123456@localhost:5432/dengmingfang?pool_size=10&max_overflow=20"# 设置超时checkpointer=PostgresSaver.from_conn_string(DB_URL,checkpointer_options={"connection_timeout":30,"command_timeout":60})

6.2 错误处理

frompsycopg2importOperationalErrortry:checkpointer=PostgresSaver.from_conn_string(DB_URL)checkpointer.setup()exceptOperationalErrorase:print(f"数据库连接失败:{e}")# 使用降级方案(如 InMemorySaver)checkpointer=InMemorySaver()

6.3 数据备份

# 备份数据库pg_dump -U dengmingfang dengmingfang>backup.sql# 恢复数据库psql -U dengmingfang dengmingfang<backup.sql

七、测试环境 vs 生产环境

7.1 测试环境(InMemorySaver)

fromlanggraph.checkpoint.memoryimportInMemorySaver# 简单、快速,但重启后丢失checkpointer=InMemorySaver()graph=create_react_agent(model=llm,tools=tools,checkpointer=checkpointer)

7.2 生产环境(PostgresSaver)

fromlanggraph.checkpoint.postgresimportPostgresSaver# 持久化、可靠、支持回溯checkpointer=PostgresSaver.from_conn_string(DB_URL)checkpointer.setup()graph=create_react_agent(model=llm,tools=tools,checkpointer=checkpointer)

八、常见问题

8.1 数据库连接失败

# 错误:psycopg2.OperationalError# 解决:检查数据库服务、用户名密码、网络连接# 验证连接importpsycopg2 conn=psycopg2.connect(DB_URL)print("连接成功")

8.2 表已存在错误

# 第二次运行时 setup() 会报错# 解决:捕获异常或先检查try:checkpointer.setup()exceptExceptionase:print(f"表可能已存在:{e}")

8.3 状态丢失

# 确保 thread_id 一致config={"configurable":{"thread_id":"same_id"}}# 每次都用同一个 configresult1=graph.invoke({"messages":[...]},config)result2=graph.invoke({"messages":[...]},config)# 状态会累加

九、性能优化

9.1 批量操作

# 批量存储用户数据foruser_id,user_datainuser_list:store.put(("profile",),user_id,user_data)

9.2 定期清理

-- 清理 30 天前的历史记录DELETEFROMcheckpointsWHEREcreated_at<NOW()-INTERVAL'30 days';

9.3 索引优化

-- 为常用查询创建索引CREATEINDEXidx_checkpoints_thread_idONcheckpoints(thread_id);CREATEINDEXidx_store_namespaceONstore(namespace);

十、总结

关键要点

  1. Checkpointer:短期记忆,保存对话历史

    • 生产环境用PostgresSaver
    • 测试环境用InMemorySaver
    • 必须调用setup()初始化表
  2. Store:长期记忆,保存用户画像

    • 独立于 checkpointer
    • 支持命名空间分类存储
    • put()/get()/delete()操作
  3. Time Travel:状态回溯

    • get_state_history()获取历史
    • update_state()修改状态
    • 支持从任意历史节点恢复
  4. 生产环境

    • PostgreSQL 作为持久化存储
    • checkpointer + store 配合使用
    • 定期备份数据库
    • 配置连接池优化性能

运行完整示例

python /www/learning_langchain/langgraph_state_learining.py

输出示例:

============================================================ PostgreSQL 持久化示例 ============================================================ ✓ 数据库表初始化完成 第一次对话: 响应: 祝贺你,张三!这是第 1 次对话 第二次对话(状态已持久化): 响应: 祝贺你,张三!这是第 2 次对话 当前状态快照: - next: () - config: {'configurable': {'thread_id': 'user_123', 'username': '张三'}} - values: {...}
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/10 0:47:02

如何在没有旧手机的情况下设置新 iPhone?

如果您购买了新款 iPhone 17&#xff0c;却发现旧手机丢失、损坏或完全无法使用&#xff0c;您可能会担心如何设置新设备。好消息是&#xff0c;即使没有旧 iPhone&#xff0c;您仍然可以顺利设置新设备。本文将指导您如何在没有旧手机的情况下设置新 iPhone&#xff0c;帮助您…

作者头像 李华
网站建设 2026/5/10 6:49:24

uniapp+python基于安卓的医院在线问诊系统_yjm小程序

目录技术架构概述核心功能模块数据交互流程安卓端适配要点扩展性设计开发技术路线相关技术介绍核心代码参考示例结论源码lw获取/同行可拿货,招校园代理 &#xff1a;文章底部获取博主联系方式&#xff01;技术架构概述 uniapp作为前端框架&#xff0c;结合Python后端开发安卓平…

作者头像 李华
网站建设 2026/5/1 10:18:30

uniapp+python课堂辅助教学在线学习签到答疑系统 微信小程序

目录系统概述核心功能模块技术实现要点应用场景价值开发技术路线相关技术介绍核心代码参考示例结论源码lw获取/同行可拿货,招校园代理 &#xff1a;文章底部获取博主联系方式&#xff01;系统概述 基于UniApp和Python开发的课堂辅助教学在线学习系统&#xff0c;集成微信小程序…

作者头像 李华
网站建设 2026/5/1 3:29:02

Elasticsearch 分片迁移与重新平衡监控指南

目录标题Elasticsearch 分片迁移与重新平衡监控指南目录概述环境信息当前集群状态Kubernetes 集群节点分片重新平衡机制ES 默认平衡策略分片状态流转查看分片迁移的命令1. 查看集群健康状态2. 查看所有分片状态3. 查看迁移进度&#xff08;最详细&#xff09;4. 只看正在进行的…

作者头像 李华
网站建设 2026/5/1 9:11:21

单片机温度测量和控制系统的设计与实现

单片机温度测量和控制系统的设计与实现 一、设计背景与意义 温度测量与控制在工业生产、智能家居、医疗设备、科研实验等领域至关重要&#xff0c;传统温控系统存在测量精度低、控温响应慢、操作复杂等问题&#xff0c;难以满足高精度场景需求。现有单片机温控设计多采用单一传…

作者头像 李华
网站建设 2026/5/1 7:53:01

单片机控制的自动门系统

单片机控制的自动门系统设计与实现 一、设计背景与意义 自动门作为便捷通行与智能安防的重要设备&#xff0c;广泛应用于商业建筑、办公场所、住宅小区等场景。传统自动门系统多依赖专用控制器&#xff0c;存在成本高、功能固化、兼容性差等问题&#xff0c;且部分设计感应灵敏…

作者头像 李华