news 2026/5/9 3:07:24

LangChain之聊天模型--流式传输

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
LangChain之聊天模型--流式传输

4. 聊天模型--流式传输

流式处理对于使基于 LLM 的应用程序能够响应最终用户至关重要。其通过逐步显示输出,甚至在完整的响应准备就绪之前,流式传输可以显着改善用户体验。

之前直接使用invoke 的调用方式属于非流式传输,看到的现象是聊天模型直接返回全量内容,若
模型思考时间较长,则我们等待的时间就越长

我们使用的deepseek客户端就是属于流式输出。可以看到输出时间快了很多

对于 LangChain 的聊天模型来说,它同样支持流式返回。

4.1 stream() 同步传输

LangChain 聊天模型中的.stream() 方法,来同步生成流式响应的效果,.stream() 方法返回一个迭代器,该迭代器在生成输出时同步产生输出 消息块 。可以 使用 for 循环实时处理每个块。

from langchain_deepseek import ChatDeepSeek model=ChatDeepSeek(model="deepseek-v4-flash") #流式输出,返回一个迭代器,产生的消息块 chunks=[] for chunk in model.stream("写一段关于龟兔赛跑的故事,20字左右"): chunks.append(chunk) #chunks:AIMessage print(chunk.content,end="|",flush=True) #我们得到了一个叫做 AIMessageChunk 的东西,它代表 AIMessage 的一部分,也就是消息块。 #消息块还可以直接相加 print("\n") temp_chunks=chunks[0]+chunks[1]+chunks[2]+chunks[3]+chunks[4]+chunks[5] print(temp_chunks)

4.2 astream() 异步传输

想象一个场景:你需要煮一壶水,同时还要给朋友发一条短信。我们分别用同步(传统)和异步两种 方式来完成,以此对比并引入 协程 和 事件循环 的概念

• 同步(阻塞)方式:这就像是一个“死心眼”的人,做事必须一件一件来:

import time #同步io def boil_water(): print("开始烧水...") time.sleep(5)#模拟烧水5s,cpu完全空闲 print("烧水完成...") def send_message(): print("开始发消息...") time.sleep(2) print("发消息完成...") def main(): boil_water() send_message() #耗时七秒 main()

问题: 在 boil_water 函数等待的5秒里,CPU 完全空闲,但却不能去做 send_message 任务, 效率低下。

• 异步方式:我们请出 asyncio 、 协程 和 事件循环 。

什么是协程?

• 多进程通常利用的是多核 CPU 的优势,同时执行多个计算任务。每个进程有自己独立的内存管 理,所以不同进程之间要进行数据通信比较麻烦。

• 多线程是在一个 cpu 上创建多个子任务,当某一个子任务休息的时候其他任务接着执行。多线程 的控制是由 python 自己控制的。线程存在数据同步问题,所以要有锁机制。

• 协程的实现是在一个线程内实现的,相当于流水线作业。由于线程切换的消耗比较大,所以对于 并发编程,可以优先使用协程。

协程,作为一种轻量级的并发编程模型,可以被视为用户态的“轻量级线程”。

其核心优势在于其调度完全由用户空间掌控,避免了操作系统内核的频繁介入,从而显著降低了上 下文切换的开销。

在诸如网络数据刷新、资源加载、用户界面更新、以及 I/O 读写等场景下,如果并 发任务的计算量相对较小、对系统资源占用较低,则不必动用操作系统级别的线程。

协程的切换则由程序员和编程语言控制,程序员决定在何时暂停或恢复协程。协程是一个特殊的函 数,它可以在执行过程中暂停,并在稍后恢复执行。它用async def定义,并在需要暂停的地方使 用await

#异步IO import asyncio #协程 async def boil_water(): print("开始烧水...") await asyncio.sleep(5) #await表示等待这个操作完成,但期间可以做别的事 print("烧水完成...") #协程 async def send_message(): print("开始发消息...") await asyncio.sleep(2) print("发消息完毕...") #协程调度 #事件循环 async def main(): #创建两个任务,并交给事件循环去调度 #1、烧水,任务1 task1=asyncio.create_task(boil_water()) #2、发消息。任务2 task2=asyncio.create_task(send_message()) #等待两个任务完成 await task1 await task2 #run会创建一个事件循环 #5s asyncio.run(main())

事件循环

事件循环是asyncio(python标准库中的模块,用于编写异步I/O操作的代码)的核心,可以把它当作一个总调度员

工作流程:

1、他维护一个任务列表

2、不断循环检查每个任务

若任务处于“等待I/O状态”(比如等水烧开..)就暂停他,立即去执行下一个已经就绪的任务;

若任务等待时间到了或者I.O操作完成了,事件循环就恢复执行该任务

# 主程序(也是一个协程) async def main(): # 创建两个任务,并交给事件循环去调度 task1 = asyncio.create_task(boil_water_async()) task2 = asyncio.create_task(send_message_async()) # 等待两个任务都完成 await task1 await task2 # 它负责创建事件循环,并将第一个协程(主程序)放入其中运行。 asyncio.run(main())

通过asyncio,可以在单线程中同时处理多个任务。一个在单线程内调度和管理所有协程 的核心机制,就是事件循环。它不停地检查哪些协程可以执行,哪些在等待。

总结

• 协程是 asyncio 的核心概念之一。他是一个特殊函数,可以在执行过程中暂停,稍后可以恢复

协程通过async def关键字定义。通过await关键字暂停执行,等待异步操作完成

使用asyncio.run()函数运行一个协程。他会创建一个事件循环,并运行指定的协程。事件循环是 asyncio 的核心组件,负责调度和执行协程。它不断地检查是否有任务需要执 行,并在任务完成后调用相应的回调函数。

使用.astream(),异步生成流式响应

使用 .astream() 方法,来异步生成流式响应的效果,这专为非阻塞工作流程而设计。可以在 异步代码中使用它来实现相同的实时流式处理行为。

import asyncio from langchain_deepseek import ChatDeepSeek model=ChatDeepSeek(model="deepseek-v4-flash") #异步流式输出 async def async_stream(): print("====异步调用====") async for chunk in model.astream("写一段关于春天的故事。100字"): print(chunk.content,end="|",flush=True) asyncio.run(async_stream())

4.3 使用 StrOutputParser 解析模型的输出

StrOutputParser是 LangChain 中最基础的输出解析器

它的核心作用,是将聊天模型返回的复杂AIMessage对象,解析并提取为最纯净的字符串文本。

LLM 返回的原始结果是一个包含大量元数据的对象(如令牌用量、模型名称、响应ID等)。在大多数应用场景下,我们只关心回复的文本内容。StrOutputParser就是专门负责完成这个“过滤与提取”工作的组件。

from langchain_core.output_parsers import StrOutputParser from langchain_deepseek import ChatDeepSeek #组件1:聊天模型 model=ChatDeepSeek(model="deepseek-v4-flash") #2、初始化解析器 parser=StrOutputParser() chain=model | parser result=chain.invoke("简单介绍deepseek-v4") print(result)

流式输出的兼容由于构建出的链本身也是标准的Runnable对象,它完美地继承了流式处理能力。在流式场景下,解析器会自动将每一个流式消息块转换为对应的字符串片段。

from langchain_core.output_parsers import StrOutputParser from langchain_deepseek import ChatDeepSeek #组件1:聊天模型 model=ChatDeepSeek(model="deepseek-v4-flash") #2、初始化解析器 parser=StrOutputParser() chain=model | parser for chunk in chain.stream("简单介绍Claude"): print(chunk,end="")

注意:

流式传输不是聊天模型的专属特性,而是所有遵循Runnable接口标准的组件所共有的“标准能力”

可以借助“USB接口标准”这个类比来理解:

  • 聊天模型、输出解析器、处理链等,都像是按照“Runnable”(USB标准)设计的不同设备(U盘、鼠标、键盘)。

  • 流式传输.stream(),就像“即插即用”一样,是这个标准本身定义好的一项通用功能。

  • 因此,只要是Runnable的实例,原则上都可以调用.stream()方法,这是一种基于接口规范的、而不是具体类别的能力。

实践中需要注意的三点

  1. “流出”的数据类型各不相同
    虽然都能流式输出,但不同组件流出的“数据块”类型完全不同。例如:

    • 聊天模型直接.stream(),产出的是AIMessageChunk对象。

    • “模型 + 解析器”构成的链对流式输出的每个AIMessageChunk做了处理,再.stream(),产出的就是纯净的str字符串。

  2. 并非所有组件都支持流式
    即便实现了Runnable接口,部分组件因其工作性质,不提供流式处理(或流式无意义)。例如,检索器 (Retrievers)是一次性返回全量检索结果的,过程无法拆分,因此对它调用.stream()也只会获得一次性返回的结果,没有逐令牌生成的过程。

  3. 链式调用是常态
    在实际开发中,几乎不会单独使用模型的原生流,绝大多数情况下,都是通过构建链(Chain)的方式,利用解析器将流式消息处理为最终想要的格式(如字符串)再进行流式输出。

4.4 自定义流式输出解析器

一个思维模式的转变:从这一节开始,我们要明白,自定义流式解析器的任务,就是对流经的数据进行任意的加工和重组。它不再局限于提取内容,而是可以改变数据的形态。

StrOutputParser,它能帮我们把大模型返回的复杂消息块,变成纯净的字符串,并且在流式输出时,也是一个字一个字地往外蹦。

但这带来了一个新问题:如果我们不想看一个字一个字地蹦,而是想等模型“说”完一句话,再把这句完整的话显示出来,该怎么办?

这就引出了这一节的核心:自定义流式输出解析器。它允许我们通过写一个生成器函数,来任意地改变流式输出的“节奏”和“样式”。在链中使用 生成器函数,即可完成自定义流式输出的能力。

聊天模型的 .stream() 方法返回的是一个迭代器,该迭代器在生成输出时同步产 生输出 消息块 。那么我们的将实现的这些生成器的签名应该是Iterator[Input] -> Iterator[Output]。它规定了我们写的生成器函数,必须接收一个“流”(迭代器)作为输入,并且也返回一个“流”(迭代器)作为输出。这样就能完美地嵌入到model | parser这个管道里。或者对于异步生成器:AsyncIterator[Input] -> AsyncIterator[Output] 。

from typing import Iterator, List from langchain_core.output_parsers import StrOutputParser from langchain_deepseek import ChatDeepSeek #组件1:聊天模型 model=ChatDeepSeek(model="deepseek-v4-flash") #组件2:输出解析器(str) parser=StrOutputParser() #自定义生成器 def split_into_list(input:Iterator[str])->Iterator[List[str]]: #创建一个空字符串变量buffer buffer="" for chunk in input: buffer += chunk #遇到句号需要刷新buffer while"。" in buffer: #找到句号的位置 stop_index=buffer.index("。") #yield用于创造生成器 #buffer[:stop_index]是切片,取开头到句号位置(stop_index)之前的所有字符 #.strip() 去掉字符串前后的空白字符(空格、换行等)。 yield [buffer[:stop_index].strip()]#外面套上 [ ... ] 表示把这个字符串放进一个列表里。 #yield [buffer[:stop_index].strip()] —— 这是生成器的关键。 # yield 有点像 return,但不会结束函数,而是“返回一个值,然后暂停函数,下次调用时继续”。 # 这里返回的是一个列表,列表中只有一个元素:buffer[:stop_index].strip()。 buffer=buffer[stop_index+1:] #处理buffer最后几个字 yield [buffer.strip()] #定义链。 # 构建处理链:模型 -> 字符串解析器 -> 我们自定义的句子分割器 chain=model | parser | split_into_list #流式输出,返回一个迭代器,产生的消息块 for chunk in chain.stream("写一段关于爱情的歌词,需要5句话,每句话中文句号隔开"): #使用parser,结果就是str print(chunk,flush=True)

4.5 深度探索流式传输

我们需要从源码层面回答三个根本性问题:

  1. LangChain 请求 OpenAI 时,底层使用什么网络协议?

  2. LangChain 如何实现并支持流式传输?

  3. OpenAI 返回的原始数据块是何格式? LangChain 如何将其转换为标准的AIMessageChunk对象?

SSE 协议介绍

流式传输并非 LangChain 自身创造,其根基在于大模型服务商(如 OpenAI)提供的SSE(Server-Sent Events,服务器发送事件)支持。

可以将其理解为一种基于 HTTP 的“长连接热线”。相比传统“一问一答”即关闭的短连接,SSE 建立连接后,服务器可持续向客户端单向推送数据流,直到传输结束。这为流式输出提供了底层物理通道。

HTTP 协议本身设计为无状态的请求-响应模式,严格来说,是无法做到服务器主动推送消息到客户端,但通过Server-Sent Events (服务器发送事件,简称 SSE)技术可实现流式传输,允许服
务器主动向浏览器推送数据流。

SSE(Server-Sent Events)是一种基于 HTTP 的轻量级实时通信协议,浏览器可以通过内置的
EventSource API 接收并处理这些实时事件。

核心特点

• 基于 HTTP 协议:复用标准 HTTP/HTTPS 协议,无需额外端口或协议,兼容性好且易于部署。
• 单向通信机制:SSE 仅支持服务器向客户端的单向数据推送,客户端通过普通 HTTP 请求建立连接后,服务器可持续发送数据流,但客户端无法通过同一连接向服务器发送数据。
• 自动重连机制:支持断线重连,连接中断时,浏览器会自动尝试重新连接(支持 retry 字段指定重连间隔)。
• 自定义消息类型:

客户端发起请求后,服务器保持连接开放。响应头设置Content-Type: text/eventstream,标识为事件流格式,持续推送事件流。

数据格式
服务端向浏览器发送 SSE 数据,需要设置必要的 HTTP 头信息:

Content-Type: text/event-stream;charset=utf-8 Connection: keep-alive

LangChain 的流式处理流程(源码层面)

通过分析BaseChatOpenAI类中的_stream方法,可梳理出以下核心生命周期:

1. 发起请求(建立热线)

LangChain 本身并不“创造”或“规定”一个底层的网络传输协议,而是依赖于其底层的大模型供应商(如 OpenAI)的协议。因此当我们发起请求时,会在请求中设置 stream=True ( _stream() 源码中的第一步),表示OpenAI 服务器将在生成 Response 时向客户端发出数据(server-sent events,SSE)。此时 API会保持 HTTP 连接打开,并以特定格式发送数据。在构建 HTTP 请求时,LangChain 将参数stream强制设置为True。此标志告知 OpenAI 服务器:本次对话需采用 SSE 模式,持续推送生成结果。

# 源码逻辑 kwargs["stream"] = True payload = self._get_request_payload(messages, stop=stop, **kwargs)

2. 接入与接收(监听数据流)
LangChain 使用openai官方 SDK 构建的 HTTP 客户端(具体为_SyncHttpxClientWrapper类)发起调用,并持续监听来自https://api.openai.com/v1的 SSE 事件流。

3. 核心转换(数据“本土化”)
这是最关键的一步。OpenAI 通过 SSE 推送的原始数据,是一系列包含delta字段的 JSON 对象(例如"delta":{"content":"你"}),而非 LangChain 可直接使用的消息对象。

这一步的转换工作,由_convert_chunk_to_generation_chunk方法完成。它的职责是:

  • 输入:OpenAI 推送的原始 JSON 数据块(chunk)。

  • 处理:提取choices[0].delta中的内容。

  • 输出:一个标准的ChatGenerationChunk对象,其核心属性message就是我们所熟悉的AIMessageChunk实例。

至此,每一块原始的流式数据便被封装成了包含contentresponse_metadata等完整信息的AIMessageChunk对象,可供下游组件统一处理。

数据转换结构对比

阶段数据格式示例内容
OpenAI 原始 SSE 数据块JSON 对象{"choices":[{"delta":{"content":"你"}}]}
LangChain 转换后AIMessageChunkcontent='你', additional_kwargs={...}
总结
  • 技术基石:流式传输能力由模型服务商基于 SSE 协议提供,LangChain 是这一能力的使用者和封装者。

  • 核心机制:LangChain 通过在请求中开启stream=True启用 SSE,并在客户端内部实现了一套从“原始 JSON 事件流”到“AIMessageChunk对象序列”的完整转换逻辑。

  • 深入价值:掌握此流程,有助于开发者理解流式处理的全链路,便于日后进行定制化开发或排查相关问题。至此,我们从底层的网络协议,到上层的对象封装,完成了对流式传输的深度剖析。

LangChain流式传输的完整流程与底层协议。总结一下:
1. langchain-openai 包通过集成 OpenAI Python SDK,提供了一个 HTTP 客户端。
2. 因此,支持 LangChain 向 OpenAI 的 API 发起调用请求。
3. 若希望发起流式传输请求,则需在请求中加入 stream=True ,向 OpenAI 说明以 SSE 协议进行
流式返回。

4. LangChain 接收 OpenAI 的 SSE 格式的响应,并将其转换为 LangChain 自封装的消息格式,如
AIMessageChunk 消息。这样就可以以统一的方式处理来自不同模型提供商(OpenAI,
Anthropic等)的流式响应。

5. 使用 LangSmith 跟踪 LLM 应用

使用 LangSmith 时没有代码介入,只需要配置下环境,就可以直接监控我们的应用。

使用 LangChain 构建的许多应用程序,可能会包含多个步骤和多次的 LLM 调用。随着这些应用程序变得越来越复杂,作为开发者,我们能够检查链或代理内部到底发生了什么变得至关重要。最好的方法是使用 LangSmith。


LangSmith 与框架无关,它可以与 langchain 和 langgraph 一起使用,也可以不使用。LangSmith 是一个用于帮助我们构建生产级 LLM 应用程序的平台,它将密切监控和评估我们的应用。
LangSmith 平台地址:https://smith.langchain.com/ (新用户需要注册)


要想让 LangSmith 跟踪 LLM 应用,第一步申请 LangSmith API Key,点击 Settings,就会跳转
到"API Keys"设置页面,若没有跳转,可以在左侧 tab 栏中找到进入。

创建完成后,保存好你的 API Key。
接下来配置两个环境变量:

LANGSMITH_TRACING="true"
LANGSMITH_API_KEY="你的 LangSmith API Key"

配置完成后,我们任意执行代码,查看 LangSmith 平台,这将在 LangSmith 的默认跟踪项目中生成调用的跟踪:

跟踪会以瀑布流形式展示调用的完整步骤,以及每个步骤的详细信息和耗时。让我们能够检查内部到底发生了什么

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/9 3:06:29

L-system与硬件补偿技术在自动钢琴音乐生成中的应用

1. L-system与硬件补偿技术概述L-system(Lindenmayer系统)作为一种形式化语法,最初由生物学家Aristid Lindenmayer于1968年提出,用于模拟植物的生长过程。其核心机制是通过字符串重写规则生成具有自相似性的复杂结构。在音乐生成领…

作者头像 李华
网站建设 2026/5/9 3:04:27

电气类电网输电线异物检测任务的实现 通过yolov8训练输电线异物检测数据集 建立基于深度学习yolov8卷积神经网络的输电线异物检测

电气类电网输电线异物检测任务的实现 通过yolov8训练输电线异物检测数据集 建立基于深度学习yolov8卷积神经网络的输电线异物检测 文章目录**1. 数据准备**数据集结构**2. 格式转换****3. 数据划分****4. 环境搭建****5. 数据配置****6. 模型训练****7. 配置超参数****8. 模型推…

作者头像 李华
网站建设 2026/5/9 3:03:16

基于角色指令的AI专业化应用:从通用对话到专家级交互

1. 项目概述:一个专为AI角色扮演设计的指令库最近在折腾各种AI工具时,我一直在思考一个问题:如何让AI的输出更专业、更贴近真实场景?无论是用ChatGPT写代码,还是让DeepSeek分析商业策略,通用的“你好&#…

作者头像 李华
网站建设 2026/5/9 3:03:16

Git Worktree Manager:VS Code插件实现多分支并行开发

1. 项目概述:为什么你需要一个 Git Worktree 管理器如果你和我一样,每天都要在多个 Git 分支之间来回切换——比如同时处理一个紧急的线上 Bug 修复、一个正在开发的新功能,以及一个长期演进的技术重构分支——那你一定对git checkout的等待时…

作者头像 李华
网站建设 2026/5/9 3:01:09

AI编程助手新范式:基于神经多样性的多智能体协作系统

1. 项目概述:当AI助手拥有“神经多样性”人格如果你和我一样,每天都在和Claude Code、Cursor、GitHub Copilot这些AI编程助手打交道,那你肯定也经历过这种时刻:你让它“检查一下这段代码”,它给你一个温和但笼统的建议…

作者头像 李华