news 2026/5/1 6:07:20

AI 流式响应实战:从同步等待到实时推送

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
AI 流式响应实战:从同步等待到实时推送

AI 流式响应实战:从同步等待到实时推送

在 IM 系统中集成 AI 时,流式响应能显著提升性能。本文介绍 AQChat 如何实现 AI 流式响应,从同步等待到实时推送。

一、为什么需要流式响应?

同步等待的问题

传统同步方式的问题

// ❌ 同步方式:用户需要等待AI完整响应StringaiResponse=aiService.getAnswer(userMessage);// 如果AI响应需要10秒,用户就要等待10秒sendMessage(aiResponse);

问题:

  1. 等待时间长:AI 生成可能需要 5-10 秒,用户长时间等待
  2. 体验差:无法看到生成过程,感觉卡顿
  3. 资源占用:长时间占用连线和线程

流式响应的优势

  1. 实时反馈:逐字显示,用户可立即看到内容
  2. 体验更好:类似 ChatGPT 的打字机效果
  3. 资源利用:边生成边推送,不阻塞

对比

方式首字延迟完整响应时间用户体验
同步等待10秒10秒
流式响应1-2秒10秒

回调函数模式的设计

统一接口设计

定义统一的 AI 服务接口

publicinterfaceIAiService{/** * 流式调用AI服务 * @param userMsg 用户消息 * @param consumer 回调函数,处理每个数据块 */voidstreamCallWithMessage(StringuserMsg,Consumer<AIResult>consumer);/** * 多轮对话 */defaultvoidchat(Stringmessage,List<MessageRecord>messages,Consumer<AIResult>consumer){}}

关键点

  • 使用Consumer<AIResult>作为回调
  • 每个数据块通过回调处理
  • 支持多轮对话

AIResult 设计

publicinterfaceAIResult{StringgetContent();// 当前数据块的内容intgetStatus();// 状态:WAIT(0-进行中)、END(1-结束)、FAIL(2-失败)}

状态枚举

publicenumAIMessageStatusEnum{WAIT(0,"wait"),// 流式响应进行中END(1,"end"),// 流式响应结束FAIL(2,"fail");// 流式响应失败}

三、WebSocket 实时推送的实现

整体流程

用户发送消息 ↓RocketMQ异步处理 ↓ AI服务流式调用 ↓ 回调函数处理每个数据块 ↓ 封装为 STREAM_MSG_NOTIFY ↓WebSocket实时推送

代码实现

  1. RocketMQ 消费者接收消息
@ComponentpublicclassAIHelperReceiverimplementsInitializingBean{@ResourceprivateIAiServiceaiService;@ResourceprivateGlobalChannelHolderglobalChannelHolder;publicvoidinitConsumer(){defaultMQPushConsumer.setMessageListener((MessageListenerConcurrently)(messageExtList,context)->{for(MessageExtmessageExt:messageExtList){MessageDtomessageDto=JSONObject.parseObject(msgStr,MessageDto.class);// 提交到独立线程池,不阻塞MQ消费线程threadPoolUtil.submitTask(()->{StringBuilderfullContent=newStringBuilder();try{// 流式调用AI服务aiService.streamCallWithMessage(messageDto.getMessageContent(),aiResult->{// 回调函数:处理每个数据块AIMessageDtoaiMessageDto=newAIMessageDto();aiMessageDto.setMessageId(messageDto.getMessageId());aiMessageDto.setRoomId(messageDto.getRoomId());aiMessageDto.setContent(aiResult.getContent());aiMessageDto.setStatus(aiResult.getStatus());// 实时推送globalChannelHolder.sendBroadcastAIMessage(aiMessageDto,AQBusinessConstant.AI_HELPER_ID);// 累积完整内容fullContent.append(aiResult.getContent());});}catch(Exceptione){// 错误处理LOGGER.error("AI助手处理消息失败",e);AIMessageDtofailMessage=newAIMessageDto();failMessage.setStatus(AIMessageStatusEnum.FAIL.getCode());globalChannelHolder.sendBroadcastAIMessage(failMessage,AQBusinessConstant.AI_HELPER_ID);}finally{// 流式响应结束后,持久化完整消息MessageDtostoreMessage=buildStoreMessage(messageDto,fullContent);messageService.saveMessage(storeMessage);}});}returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;});}}
  1. 封装流式消息并推送
@ComponentpublicclassGlobalChannelHolder{publicvoidsendBroadcastAIMessage(AIMessageDtoaiMessageDto,StringaiId){// 1. 获取AI助手信息UserGlobalInfoDtouserInfo=userHolder.getUserInfo(aiId);// 2. 构建流式消息AQChatMsgProtocol.StreamMsgNotifystreamMsgNotify=AQChatMsgProtocol.StreamMsgNotify.newBuilder().setUser(userBuilder).setMsgId(aiMessageDto.getMessageId()).setRoomId(aiMessageDto.getRoomId()).setContent(aiMessageDto.getContent()==null?"":aiMessageDto.getContent()).setStreamType(aiMessageDto.getStatus())// 0-进行中,1-结束,2-失败.build();// 3. 广播到房间内所有用户messageBroadcaster.broadcast(aiMessageDto.getRoomId(),streamMsgNotify);}}
  1. 消息广播
@ComponentpublicclassMessageBroadcaster{privatefinalMap<String,ChannelGroup>channelGroupMap=newConcurrentHashMap<>();public<TextendsGeneratedMessageV3>voidbroadcast(StringroomId,Tmsg){ChannelGroupchannelGroup=channelGroupMap.get(roomId);if(channelGroup!=null){// 批量发送,高效channelGroup.writeAndFlush(msg);}}}

四、流式消息的封装(STREM_MSG_NOTIFY)

Protobuf 消息定义

// 流式消息通知messageStreamMsgNotify{string roomId=1;// 房间IDstring msgId=2;// 消息IDUseruser=3;// AI助手信息int32 streamType=4;// 流类型:0-进行中,1-结束,2-失败string content=5;// 当前数据块内容}

消息类型

enumMsgCommand{// ...STREAM_MSG_NOTIFY=32;// 流式消息通知// ...}

消息状态流转

用户发送消息 ↓ STREAM_MSG_NOTIFY(streamType=0,content="你")← 第一个数据块 ↓ STREAM_MSG_NOTIFY(streamType=0,content="好")← 第二个数据块 ↓ STREAM_MSG_NOTIFY(streamType=0,content=",")← 第三个数据块 ↓...↓ STREAM_MSG_NOTIFY(streamType=1,content="")← 结束标志

前端处理示例(伪代码)

websocket.onmessage=(event)=>{constmessage=JSON.parse(event.data);if(message.command==='STREAM_MSG_NOTIFY'){if(message.streamType===0){// 进行中:追加内容appendContent(message.content);}elseif(message.streamType===1){// 结束:显示完整消息showCompleteMessage();}elseif(message.streamType===2){// 失败:显示错误提示showErrorMessage();}}};

五、多 AI 平台集成的统一接口设计

问题:不同 AI 平台的 API 不同

  • 阿里百炼:使用Flowable<GenerationResult>
  • Gitee AI:使用MessageHandler<String>
  • 其他平台:可能有不同的流式接口

解决方案:统一接口 + 适配器模式

  1. 统一接口定义
publicinterfaceIAiService{/** * 流式调用,统一使用 Consumer<AIResult> 回调 */voidstreamCallWithMessage(StringuserMsg,Consumer<AIResult>consumer);}
  1. 阿里百炼实现
@Service@PrimarypublicclassQWAiServiceimplementsIAiService{@OverridepublicvoidstreamCallWithMessage(StringuserMsg,Consumer<AIResult>consumer){Generationgen=newGeneration();Messagemessage=Message.builder().role(Role.USER.getValue()).content(userMsg).build();// 调用阿里百炼流式APIFlowable<GenerationResult>result=gen.streamCall(generationParam);// 转换为统一格式result.blockingForEach(r->{Stringcontent=r.getOutput().getChoices().get(0).getMessage().getContent();StringfinishReason=r.getOutput().getChoices().get(0).getFinishReason();QWResultqwResult=newQWResult();qwResult.setContent(content);// 判断是否结束qwResult.setStatus("stop".equals(finishReason)?AIMessageStatusEnum.END.getCode():AIMessageStatusEnum.WAIT.getCode());// 调用统一回调consumer.accept(qwResult);});}}
  1. Gitee AI 实现
@ServicepublicclassGiteeAIServiceimplementsIAiService{@ResourceprivateGiteeAIClientgiteeAIClient;@OverridepublicvoidstreamCallWithMessage(StringuserMsg,Consumer<AIResult>consumer){// 调用Gitee AI流式APIgiteeAIClient.streamChat(message,messageList,data->{JSONObjectparse=JSONObject.parseObject(data);JSONArraychoices=parse.getJSONArray("choices");JSONObjectchoicesIn=choices.getJSONObject(0);StringfinishReason=choicesIn.getString("finish_reason");if(finishReason!=null&&finishReason.equals("stop")){// 结束GiteeResultgiteeResult=newGiteeResult();giteeResult.setStatus(AIMessageStatusEnum.END.getCode());consumer.accept(giteeResult);return;}// 进行中JSONObjectdelta=choicesIn.getJSONObject("delta");Stringcontent=delta.getString("content");if(content!=null&&!content.isEmpty()){GiteeResultgiteeResult=newGiteeResult();giteeResult.setContent(content);giteeResult.setStatus(AIMessageStatusEnum.WAIT.getCode());consumer.accept(giteeResult);}});}}

统一接口的优势

  1. 业务代码无需关心具体平台
  2. 易于扩展新平台
  3. 便于切换平台(通过@Prime注解)

使用示例

// 业务代码只需要调用统一接口@ResourceprivateIAiServiceaiService;// Spring会自动注入@Primary的实现aiService.streamCallWithMessage(userMsg,aiResult->{// 处理流式响应,不关心是哪个AI平台sendBroadcastAIMessage(aiResult);});

六、性能优化

  1. 独立线程池
// AI处理在独立线程池中执行,不阻塞MQ消费线程threadPoolUtil.submitTask(()->{aiService.streamCallWithMessage(userMsg,consumer);});

优势

  • 不阻塞 RocketMQ 消费线程
  • AI 处理失败不影响其他消息
  • 可控制并发数
  1. 异步处理
// 消息发送到RocketMQ,异步处理mqSendingAgent.aiHelper(messageDto);// 立即返回,不等待AI响应

优势

  • 用户发送消息后立即返回
  • AI 响应通过 WebSocket 实时推送
  • 提升响应速度

七、总结

关键点

  1. 流式响应:使用回调函数模式,实时推送每个数据块
  2. 统一接口:IAiservice统一不同 AI 平台的接口
  3. WebSocket 推送:通过STREAM_MSG_NOTIFY实时推送
  4. 异步处理:使用 RocketMQ + 独立线程池,不阻塞主流程

优化效果

指标同步流式响应提升
首字延迟10秒1-2秒5-10倍
用户体验显著提升
资源占用降低

经验总结

  1. 流式响应能显著提升性能
  2. 统一接口便于多平台集成
  3. 异步处理避免阻塞
  4. 回调函数模式适合流式场景

通过以上实现,AQChat 实现了类似 ChatGPT 的流式响应效果,提升了用户体验。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/1 1:25:25

【AI开发必备技能】:Open-AutoGLM依赖安装的8个关键步骤你掌握了吗?

第一章&#xff1a;Open-AutoGLM依赖安装概述Open-AutoGLM 是一个面向自动化自然语言任务的开源框架&#xff0c;支持模型调用、任务编排与结果解析。在使用该框架前&#xff0c;正确配置其运行环境和依赖项是确保功能正常执行的基础。本章介绍如何安装 Open-AutoGLM 所需的核心…

作者头像 李华
网站建设 2026/4/18 16:52:55

Open-AutoGLM Python依赖安装实战(从报错到零失败的完整流程)

第一章&#xff1a;Open-AutoGLM Python 依赖安装实战概述在部署 Open-AutoGLM 这一自动化语言模型推理框架时&#xff0c;正确配置 Python 环境是确保系统稳定运行的首要步骤。该框架依赖多个核心库&#xff0c;包括 PyTorch、Transformers 和 Accelerate&#xff0c;需通过包…

作者头像 李华
网站建设 2026/4/18 1:45:05

【大模型落地必看】:Open-AutoGLM离线配置9大坑,你避开了吗?

第一章&#xff1a;Open-AutoGLM离线部署全景解析Open-AutoGLM作为新一代开源自动代码生成模型&#xff0c;支持在无网络连接环境下完成本地化部署与推理&#xff0c;适用于企业级安全敏感场景。其离线部署方案兼顾性能优化与资源调度&#xff0c;能够灵活适配多种硬件平台。环…

作者头像 李华
网站建设 2026/4/28 7:05:26

【Open-AutoGLM优化全攻略】:低配置电脑流畅运行的5大核心技术揭秘

第一章&#xff1a;Open-AutoGLM低配运行的核心挑战在资源受限的设备上部署如Open-AutoGLM这类大型语言模型&#xff0c;面临多重技术瓶颈。尽管模型具备强大的自动化推理能力&#xff0c;但其原始设计通常依赖高内存、多核GPU支持&#xff0c;难以直接适配低配环境。为实现低配…

作者头像 李华
网站建设 2026/4/27 18:41:46

Open-AutoGLM模型加速秘技,让全球镜像同步不再是难题

第一章&#xff1a;Open-AutoGLM模型下载加速概述 在大规模语言模型应用日益普及的背景下&#xff0c;Open-AutoGLM 作为一款开源的自动化生成语言模型&#xff0c;其下载效率直接影响开发与部署速度。由于模型体积庞大&#xff0c;传统下载方式常受限于网络带宽、源服务器负载…

作者头像 李华
网站建设 2026/4/30 7:32:38

揭秘Open-AutoGLM离线配置难题:5步实现内网环境全流程闭环部署

第一章&#xff1a;揭秘Open-AutoGLM离线配置的核心挑战在本地环境中部署 Open-AutoGLM 模型时&#xff0c;开发者常面临一系列与环境隔离、资源调度和依赖管理相关的复杂问题。由于该模型依赖于特定版本的深度学习框架与系统级库&#xff0c;任何版本错配都可能导致推理失败或…

作者头像 李华