news 2026/5/1 7:36:51

将dify接入钉钉机器人中

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
将dify接入钉钉机器人中

想要将dify接入钉钉机器人中进行问答,首先需要创建应用,添加企业机器人,创建消息卡片,开发机器人回复消息的代码,最后接入dify,使用dify的工作流进行回答。

一、机器人准备

1、创建应用:创建应用 - 钉钉开放平台

2、添加应用能力:添加应用能力 - 钉钉开放平台

3、配置企业机器人:配置企业机器人 - 钉钉开放平台

4、新建卡片模板:打字机效果流式 AI 卡片 - 钉钉开放平台

二、开发代码

1、安装官方SDK

pip install dingtalk_stream loguru

2、修改模板代码

以下为官方模板代码,根据自己的需求修改消息接收和回复逻辑

import os import logging import asyncio import argparse from loguru import logger from dingtalk_stream import AckMessage import dingtalk_stream from http import HTTPStatus from dashscope import Generation from typing import Callable def define_options(): parser = argparse.ArgumentParser() parser.add_argument( "--client_id", dest="client_id", default=os.getenv("DINGTALK_APP_CLIENT_ID"), help="app_key or suite_key from https://open-dev.digntalk.com", ) parser.add_argument( "--client_secret", dest="client_secret", default=os.getenv("DINGTALK_APP_CLIENT_SECRET"), help="app_secret or suite_secret from https://open-dev.digntalk.com", ) options = parser.parse_args() return options async def call_with_stream(request_content: str, callback: Callable[[str], None]): messages = [{"role": "user", "content": request_content}] responses = Generation.call( Generation.Models.qwen_turbo, messages=messages, result_format="message", # set the result to be "message" format. stream=True, # set stream output. incremental_output=True, # get streaming output incrementally. ) full_content = "" # with incrementally we need to merge output. length = 0 for response in responses: if response.status_code == HTTPStatus.OK: full_content += response.output.choices[0]["message"]["content"] full_content_length = len(full_content) if full_content_length - length > 20: await callback(full_content) logger.info( f"调用流式更新接口更新内容:current_length: {length}, next_length: {full_content_length}" ) length = full_content_length else: raise Exception( f"Request id: {response.request_id}, Status code: {response.status_code}, error code: {response.code}, error message: {response.message}" ) await callback(full_content) logger.info( f"Request Content: {request_content}\nFull response: {full_content}\nFull response length: {len(full_content)}" ) return full_content async def handle_reply_and_update_card(self: dingtalk_stream.ChatbotHandler, incoming_message: dingtalk_stream.ChatbotMessage): # 卡片模板 ID card_template_id = "8aebdfb9-28f4-4a98-98f5-396c3dde41a0.schema" # 该模板只用于测试使用,如需投入线上使用,请导入卡片模板 json 到自己的应用下 content_key = "content" card_data = {content_key: ""} card_instance = dingtalk_stream.AICardReplier( self.dingtalk_client, incoming_message ) # 先投放卡片: https://open.dingtalk.com/document/orgapp/create-and-deliver-cards card_instance_id = await card_instance.async_create_and_deliver_card( card_template_id, card_data ) # 再流式更新卡片: https://open.dingtalk.com/document/isvapp/api-streamingupdate async def callback(content_value: str): return await card_instance.async_streaming( card_instance_id, content_key=content_key, content_value=content_value, append=False, finished=False, failed=False, ) try: full_content_value = await call_with_stream( incoming_message.text.content, callback ) await card_instance.async_streaming( card_instance_id, content_key=content_key, content_value=full_content_value, append=False, finished=True, failed=False, ) except Exception as e: self.logger.exception(e) await card_instance.async_streaming( card_instance_id, content_key=content_key, content_value="", append=False, finished=False, failed=True, ) class CardBotHandler(dingtalk_stream.ChatbotHandler): def __init__(self, logger: logging.Logger = logger): super(dingtalk_stream.ChatbotHandler, self).__init__() if logger: self.logger = logger async def process(self, callback: dingtalk_stream.CallbackMessage): incoming_message = dingtalk_stream.ChatbotMessage.from_dict(callback.data) self.logger.info(f"收到消息:{incoming_message}") if incoming_message.message_type != "text": self.reply_text("俺只看得懂文字喔~", incoming_message) return AckMessage.STATUS_OK, "OK" asyncio.create_task(handle_reply_and_update_card(self, incoming_message)) return AckMessage.STATUS_OK, "OK" def main(): options = define_options() credential = dingtalk_stream.Credential(options.client_id, options.client_secret) client = dingtalk_stream.DingTalkStreamClient(credential) client.register_callback_handler( dingtalk_stream.ChatbotMessage.TOPIC, CardBotHandler() ) client.start_forever() if __name__ == "__main__": main()

3、接入dify

根据dify创建的工作流的API文档进行调用,如果想进行多轮对话可以保存返回的conversation_id

full_content = "" length = 0 timeout = aiohttp.ClientTimeout(total=60) async with aiohttp.ClientSession(timeout=timeout) as session: try: async with session.post(DIFY_CHAT_URL, headers=headers, json=payload) as resp: if resp.status != 200: error_text = await resp.text() raise Exception(f"Dify API error: {resp.status}, {error_text}") async for line in resp.content: line = line.decode('utf-8').strip() if not line or not line.startswith("data:"): continue data_str = line[5:].strip() # 去掉 "data:" if data_str == "[DONE]": break try: data = json.loads(data_str) if 'conversation_id' in data and not conv_id: # 首次创建对话,保存 ID update_conversation_id(user_id, data['conversation_id']) except json.JSONDecodeError: continue if 'answer' in data: chunk = data['answer'] full_content += chunk # 策略1:每次收到 chunk 都回调 # await callback(full_content) # 策略2:累积一定长度再回调 if len(full_content) - length >= 20: await callback(full_content) logger.info( f"调用流式更新接口更新内容:current_length: {length}, next_length: {len(full_content)}" ) length = len(full_content) # 最终回调(确保完整内容被处理) if full_content and len(full_content) > length: await callback(full_content) logger.info( f"Request Content: {request_content}\nFull response: {full_content}\nFull response length: {len(full_content)}" ) return full_content except Exception as e: logger.error(f"调用 Dify 流式接口出错: {e}") raise

4、拓展

继续开发,不仅可以进行文字对话,还能够进行图片问答,先调用self.get_image_download_url方法获取钉钉上传的图像,再调用v1/files/upload接口即可上传图像到dify

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/1 6:52:22

为什么生产环境都在用Stable Diffusion 3.5 FP8?深度解析其优势

为什么生产环境都在用 Stable Diffusion 3.5 FP8?深度解析其优势 在AI图像生成服务逐渐从实验走向大规模商用的今天,一个现实问题摆在所有技术团队面前:如何在有限的GPU资源下,以更低的成本、更高的效率稳定支撑成千上万用户的文…

作者头像 李华
网站建设 2026/5/1 6:49:16

手机变3D扫描仪:Meshroom让照片秒变立体模型

想象一下,用普通手机拍摄的照片就能生成逼真的3D模型,这种看似科幻电影中的技术如今已成为现实。Meshroom作为一款开源3D重建软件,通过摄影测量技术让每个人都能轻松实现从二维图像到三维模型的转换,真正将3D建模的门槛降到最低。…

作者头像 李华
网站建设 2026/4/27 23:30:39

LaTeX算法伪代码排版:展示ACE-Step生成逻辑的标准方式

LaTeX算法伪代码排版:展示ACE-Step生成逻辑的标准方式 在AI与艺术创作深度融合的今天,音乐生成模型正从实验室走向创作者的工作台。以ACE-Step为代表的新型扩散模型,由ACE Studio与阶跃星辰(StepFun)联合推出&#xff…

作者头像 李华
网站建设 2026/4/26 14:31:26

如何在本地部署GPT-OSS-20B:基于清华源加速HuggingFace镜像下载

如何在本地部署 GPT-OSS-20B:借助清华源加速 Hugging Face 模型下载 你有没有试过凌晨两点,守着一个进度条等模型权重下载?明明是 40GB 的文件,网速却卡在 50KB/s,眼睁睁看着 wget 命令像老式打印机一样一行行爬……这…

作者头像 李华
网站建设 2026/5/1 6:49:03

Source Han Serif 思源宋体:7款免费商用字体完全使用指南

Source Han Serif 思源宋体:7款免费商用字体完全使用指南 【免费下载链接】source-han-serif-ttf Source Han Serif TTF 项目地址: https://gitcode.com/gh_mirrors/so/source-han-serif-ttf 还在为寻找高质量中文字体而困扰吗?Source Han Serif …

作者头像 李华
网站建设 2026/4/14 2:30:02

归并排序与快速排序

归并排序介绍有时候我们需要把两个有序的序列进行合并从而得到有序的大序列,基于这个思想,便引出了归并排序,其思路就是把一个无序序列拆分成多个有序序列(当序列的元素为1时就是有序序列),然后对所有序列进行合并操作&#xff0c…

作者头像 李华