第29章 视频生成服务
29.1 概述
视频生成服务是剪映小助手的核心功能之一,负责将编辑完成的草稿导出为最终的视频文件。该服务采用异步处理模式,通过任务管理系统实现视频渲染的提交、处理和状态查询。服务支持云端渲染,能够处理复杂的视频合成任务,并提供实时的任务状态反馈。
29.2 核心实现
29.2.1 视频生成任务提交
视频生成服务的核心实现位于src/service/gen_video.py文件中,包含两个主要功能:
asyncdefgen_video(request:GenVideoRequest)->GenVideoResponse:"""提交视频生成任务"""logger.info(f"提交视频生成任务:{request.draft_url}")# 参数验证ifnotrequest.draft_url:raiseValueError("草稿URL不能为空")# 获取草稿缓存draft=awaitget_draft_cache(request.draft_url)ifnotdraft:raiseINVALID_DRAFT_URL# 生成任务IDtask_id=f"video_gen_{uuid.uuid4().hex}"# 创建任务数据task_data={'draft_url':request.draft_url,'draft_data':draft.to_dict(),'create_time':time.time(),'status':'pending','progress':0}# 提交任务到任务管理器try:task_manager.submit_task(task_id,'video_generation',task_data)logger.info(f"视频生成任务提交成功:{task_id}")returnGenVideoResponse(message=f"视频生成任务已提交,任务ID:{task_id}")exceptExceptionase:logger.error(f"视频生成任务提交失败:{str(e)}")raiseVIDEO_GENERATION_SUBMIT_FAILED29.2.2 任务状态查询
查询视频生成任务的状态:
asyncdefgen_video_status(task_id:str)->dict:"""查询视频生成任务状态"""logger.info(f"查询视频生成任务状态:{task_id}")# 参数验证ifnottask_id:raiseValueError("任务ID不能为空")# 从任务管理器获取任务状态try:task_info=task_manager.get_task_status(task_id)ifnottask_info:raiseVIDEO_TASK_NOT_FOUND logger.info(f"任务状态查询成功:{task_info.get('status')}")return{'task_id':task_id,'status':task_info.get('status'),'progress':task_info.get('progress',0),'message':task_info.get('message',''),'result_url':task_info.get('result_url'),'create_time':task_info.get('create_time'),'update_time':task_info.get('update_time'),'error':task_info.get('error')}exceptHTTPException:raiseexceptExceptionase:logger.error(f"任务状态查询失败:{str(e)}")raiseVIDEO_STATUS_QUERY_FAILED29.3 云端渲染与异步处理
29.3.1 任务管理系统
视频生成采用任务管理系统进行异步处理:
classTaskManager:"""任务管理器"""def__init__(self):self.tasks={}# 任务存储self.workers={}# 工作进程self.task_queue=asyncio.Queue()# 任务队列defsubmit_task(self,task_id:str,task_type:str,task_data:dict):"""提交任务"""task_info={'task_id':task_id,'task_type':task_type,'task_data':task_data,'status':'pending','progress':0,'create_time':time.time(),'update_time':time.time()}# 存储任务信息self.tasks[task_id]=task_info# 添加到处理队列asyncio.create_task(self.process_task(task_id))returntask_idasyncdefprocess_task(self,task_id:str):"""处理任务"""task_info=self.tasks.get(task_id)ifnottask_info:returntry:# 更新状态为处理中task_info['status']='processing'task_info['progress']=0task_info['update_time']=time.time()# 执行视频生成iftask_info['task_type']=='video_generation':awaitself.generate_video(task_id,task_info)exceptExceptionase:# 处理失败task_info['status']='failed'task_info['error']=str(e)task_info['update_time']=time.time()logger.error(f"任务处理失败{task_id}:{str(e)}")29.3.2 视频渲染流程
视频渲染的具体实现流程:
asyncdefgenerate_video(self,task_id:str,task_info:dict):"""生成视频"""task_data=task_info['task_data']draft_data=task_data['draft_data']try:# 步骤1: 准备渲染环境logger.info(f"开始准备渲染环境:{task_id}")task_info['progress']=10task_info['message']='准备渲染环境'# 创建临时工作目录work_dir=f"/tmp/video_gen_{task_id}"os.makedirs(work_dir,exist_ok=True)# 步骤2: 解析草稿数据logger.info(f"解析草稿数据:{task_id}")task_info['progress']=20task_info['message']='解析草稿数据'draft=Draft.from_dict(draft_data)# 步骤3: 准备素材文件logger.info(f"准备素材文件:{task_id}")task_info['progress']=30task_info['message']='准备素材文件'awaitself.prepare_materials(draft,work_dir)# 步骤4: 视频合成logger.info(f"开始视频合成:{task_id}")task_info['progress']=50task_info['message']='视频合成中'output_file=awaitself.compose_video(draft,work_dir)# 步骤5: 上传结果文件logger.info(f"上传结果文件:{task_id}")task_info['progress']=80task_info['message']='上传结果文件'result_url=awaitself.upload_result(output_file)# 步骤6: 完成任务logger.info(f"视频生成完成:{task_id}")task_info['status']='completed'task_info['progress']=100task_info['message']='视频生成完成'task_info['result_url']=result_url task_info['update_time']=time.time()# 清理临时文件shutil.rmtree(work_dir,ignore_errors=True)exceptExceptionase:logger.error(f"视频生成失败{task_id}:{str(e)}")task_info['status']='failed'task_info['error']=str(e)task_info['update_time']=time.time()# 清理临时文件ifos.path.exists(work_dir):shutil.rmtree(work_dir,ignore_errors=True)29.3.3 视频合成实现
视频合成的具体实现:
asyncdefcompose_video(self,draft:Draft,work_dir:str)->str:"""合成视频"""# 输出文件路径output_file=os.path.join(work_dir,'output.mp4')# 构建FFmpeg命令cmd=['ffmpeg','-y',# 覆盖输出文件'-f','rawvideo','-vcodec','rawvideo','-s',f'{draft.width}x{draft.height}','-pix_fmt','rgb24','-r',str(draft.fps),'-i','-',# 从标准输入读取'-f','mp4','-vcodec','libx264','-pix_fmt','yuv420p',output_file]# 执行视频合成process=awaitasyncio.create_subprocess_exec(*cmd,stdin=asyncio.subprocess.PIPE,stdout=asyncio.subprocess.PIPE,stderr=asyncio.subprocess.PIPE)# 生成视频帧数据frame_data=awaitself.generate_frames(draft)# 写入帧数据stdout,stderr=awaitprocess.communicate(input=frame_data)ifprocess.returncode!=0:raiseException(f"视频合成失败:{stderr.decode()}")returnoutput_file29.4 任务状态管理
29.4.1 任务状态定义
视频生成任务的状态定义:
classTaskStatus:PENDING="pending"# 待处理PROCESSING="processing"# 处理中COMPLETED="completed"# 已完成FAILED="failed"# 已失败CANCELLED="cancelled"# 已取消TIMEOUT="timeout"# 超时29.4.2 任务进度更新
实时更新任务进度:
defupdate_task_progress(self,task_id:str,progress:int,message:str):"""更新任务进度"""task_info=self.tasks.get(task_id)iftask_info:task_info['progress']=progress task_info['message']=message task_info['update_time']=time.time()# 发送进度更新事件self.send_progress_event(task_id,progress,message)29.4.3 任务超时处理
处理任务超时情况:
asyncdefcheck_task_timeout(self):"""检查任务超时"""current_time=time.time()timeout_seconds=3600# 1小时超时fortask_id,task_infoinself.tasks.items():iftask_info['status']=='processing':ifcurrent_time-task_info['update_time']>timeout_seconds:# 任务超时task_info['status']='timeout'task_info['error']='任务处理超时'task_info['update_time']=current_time logger.warning(f"任务超时:{task_id}")29.5 数据结构定义
29.5.1 请求参数模型
视频生成服务的请求参数定义在src/schemas/gen_video.py中:
classGenVideoRequest(BaseModel):"""根据草稿导出视频"""draft_url:str=Field(default="",description="草稿URL")29.5.2 响应参数模型
classGenVideoResponse(BaseModel):"""生成视频响应参数"""message:str=Field(...,description="响应消息")29.6 异常处理
视频生成服务定义了完善的异常处理机制:
# 无效的草稿URLINVALID_DRAFT_URL=HTTPException(status_code=400,detail="无效的草稿URL")# 视频生成任务提交失败VIDEO_GENERATION_SUBMIT_FAILED=HTTPException(status_code=500,detail="视频生成任务提交失败")# 视频任务未找到VIDEO_TASK_NOT_FOUND=HTTPException(status_code=404,detail="视频生成任务未找到")# 视频状态查询失败VIDEO_STATUS_QUERY_FAILED=HTTPException(status_code=500,detail="视频生成任务状态查询失败")29.7 API接口定义
29.7.1 视频生成接口
@router.post("/genVideo",response_model=GenVideoResponse)asyncdefgen_video_endpoint(request:GenVideoRequest):"""生成视频"""try:returnawaitgen_video(request)exceptExceptionase:logger.error(f"生成视频失败:{str(e)}")raiseVIDEO_GENERATION_SUBMIT_FAILED29.7.2 任务状态查询接口
@router.get("/genVideoStatus/{task_id}")asyncdefgen_video_status_endpoint(task_id:str):"""查询视频生成任务状态"""try:returnawaitgen_video_status(task_id)exceptExceptionase:logger.error(f"查询视频状态失败:{str(e)}")raiseVIDEO_STATUS_QUERY_FAILED29.8 使用示例
29.8.1 视频生成请求示例
{"draft_url":"capcut://draft/123456789"}29.8.2 视频生成响应示例
{"message":"视频生成任务已提交,任务ID: video_gen_abc123def456"}29.8.3 任务状态查询响应示例
{"task_id":"video_gen_abc123def456","status":"processing","progress":65,"message":"视频合成中","result_url":null,"create_time":1640995200,"update_time":1640995265,"error":null}29.9 性能优化
视频生成服务采用了多种性能优化策略:
29.9.1 异步处理优化
# 使用连接池管理HTTP连接connector=aiohttp.TCPConnector(limit=100,# 连接池大小limit_per_host=30,# 每个主机的连接数ttl_dns_cache=300,# DNS缓存时间use_dns_cache=True,# 启用DNS缓存)# 创建会话session=aiohttp.ClientSession(connector=connector,timeout=aiohttp.ClientTimeout(total=30))29.9.2 内存管理优化
# 使用内存映射文件处理大文件importmmapdefprocess_large_file(file_path):withopen(file_path,'rb')asf:# 创建内存映射withmmap.mmap(f.fileno(),0,access=mmap.ACCESS_READ)asmm:# 处理文件内容returnprocess_data(mm)29.9.3 缓存优化
# 使用LRU缓存fromfunctoolsimportlru_cache@lru_cache(maxsize=128)defget_video_template(template_id):"""获取视频模板(带缓存)"""returnload_template_from_db(template_id)29.10 扩展性设计
视频生成服务具有良好的扩展性:
- 渲染引擎扩展:支持接入不同的视频渲染引擎
- 输出格式扩展:支持多种视频输出格式
- 云服务商扩展:支持多个云渲染服务商
- 任务类型扩展:支持添加新的任务类型
附录
代码仓库地址:
- GitHub:
https://github.com/Hommy-master/capcut-mate - Gitee:
https://gitee.com/taohongmin-gitee/capcut-mate
接口文档地址:
- API文档地址:
https://docs.jcaigc.cn