news 2026/5/1 10:10:28

minio分片上传

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
minio分片上传

minio分片上传

    • 前言
    • 分片上传的技术选择
    • 实现分片上传
      • 第一步自定义客户端
      • 初始化分片上传
        • 获取一个上传分片的url共前端使用
      • 确认每一个分片的上传
      • 合并分片文件

前言

为什么要选择将一个大文件拆分成许多小文件来上传?

  • 对于许多服务器和应用框架来说,单次HTTP请求的大小有严格的限制。
  • 上传一个10GB的大文件可能需要数个小时,单一的http请求连接不稳定,造成上传失败。
  • 降低服务器的压力
  • 其次分片上传可以实现断点续传、秒上传等功能

分片上传的技术选择

分片上传分为两种,一种是经过后端服务器来上传,另外一种是前端直传。

  • 第一种是由后端接受分片文件流,在调用minio的api上传
  • 第二种就是有后端生成预签名url返回给前端,前端上传到minio。
    一般我们选择后者,第二种利用客户端宽带,减轻了服务器的压力

实现分片上传

流程图

第一步自定义客户端

packagecom.petlife.base.config;importcom.google.common.collect.Multimap;importio.minio.CreateMultipartUploadResponse;importio.minio.ListPartsResponse;importio.minio.MinioClient;importio.minio.ObjectWriteResponse;importio.minio.errors.*;importio.minio.messages.Part;importjava.io.IOException;importjava.security.InvalidKeyException;importjava.security.NoSuchAlgorithmException;publicclassBigFileClientextendsMinioClient{publicBigFileClient(MinioClientclient){super(client);}/** * 创建分片上传任务 - 在MinIO服务器上初始化一个分片上传会话 * 获取uploadId - 返回一个唯一的uploadId,用于标识这次分片上传会话 * @param bucket 存储桶名称 * @param region 区域 (null 表示默认区域) * @param object 对象名称 (文件路径) * @param headers 额外的请求头 * @param extraQueryParams 额外的查询参数 * @return 上传ID */publicStringinitMultiPartUpload(Stringbucket,Stringregion,Stringobject,Multimap<String,String>headers,Multimap<String,String>extraQueryParams){try{CreateMultipartUploadResponseresponse=this.createMultipartUpload(bucket,region,object,headers,extraQueryParams);returnresponse.result().uploadId();}catch(IOException|InvalidKeyException|NoSuchAlgorithmException|InsufficientDataException|ServerException|InternalException|XmlParserException|InvalidResponseException|ErrorResponseExceptione){thrownewRuntimeException("初始化分片上传失败",e);}}/** * 合并分片上传 - 将之前初始化的分片上传任务合并为一个完整的对象 * @param bucketName 存储桶名称 * @param region 区域 (null 表示默认区域) * @param objectName 对象名称 (文件路径) * @param uploadId 上传ID (由initMultiPartUpload返回) * @param parts 分片信息数组 * @param extraHeaders 额外的请求头 * @param extraQueryParams 额外的查询参数 * @return 合并后的对象写入响应 */publicObjectWriteResponsemergeMultipartUpload(StringbucketName,Stringregion,StringobjectName,StringuploadId,Part[]parts,Multimap<String,String>extraHeaders,Multimap<String,String>extraQueryParams){try{returnthis.completeMultipartUpload(bucketName,region,objectName,uploadId,parts,extraHeaders,extraQueryParams);}catch(IOException|InvalidKeyException|NoSuchAlgorithmException|InsufficientDataException|ServerException|InternalException|XmlParserException|InvalidResponseException|ErrorResponseExceptione){thrownewRuntimeException("合并分片上传失败",e);}}/** * 列出分片上传 - 获取已上传的分片信息 * @param bucketName 存储桶名称 * @param region 区域 (null 表示默认区域) * @param objectName 对象名称 (文件路径) * @param maxParts 最大返回分片数量 (null 表示默认值) * @param partNumberMarker 分片编号标记 (null 表示从第一个分片开始) * @param uploadId 上传ID (由initMultiPartUpload返回) * @param extraHeaders 额外的请求头 * @param extraQueryParams 额外的查询参数 * @return ListPartsResponse 包含的重要信息是:List<Part> 分片信息列表 可以通过part.partNumber()获取分片编号,part.size()获取分片大小,part.etag()获取分片的ETag * 可以通过listPartsResponse.parts()获取分片信息列表 */publicListPartsResponselistMultipart(StringbucketName,Stringregion,StringobjectName,IntegermaxParts,IntegerpartNumberMarker,StringuploadId,Multimap<String,String>extraHeaders,Multimap<String,String>extraQueryParams){try{returnthis.listParts(bucketName,region,objectName,maxParts,partNumberMarker,uploadId,extraHeaders,extraQueryParams);}catch(IOException|InvalidKeyException|NoSuchAlgorithmException|InsufficientDataException|ServerException|InternalException|XmlParserException|InvalidResponseException|ErrorResponseExceptione){thrownewRuntimeException("列出分片上传失败",e);}}/** * 简化版初始化分片上传 * @param bucketName 存储桶名称 * @param objectName 对象名称 * @return 上传ID */publicStringinitMultiPartUpload(StringbucketName,StringobjectName){returninitMultiPartUpload(bucketName,null,objectName,null,null);}/** * 简化版合并分片上传 * @param bucketName 存储桶名称 * @param objectName 对象名称 * @param uploadId 上传ID * @param parts 分片数组 * @return 合并结果 */publicObjectWriteResponsemergeMultipartUpload(StringbucketName,StringobjectName,StringuploadId,Part[]parts){returnmergeMultipartUpload(bucketName,null,objectName,uploadId,parts,null,null);}publicbooleancancelMultiPartUpload(StringbucketName,StringobjectName,StringuploadId){try{this.abortMultipartUpload(bucketName,null,objectName,uploadId,null,null);returntrue;}catch(Exceptione){returnfalse;}}}

初始化分片上传

对于分片上传任务的数据库设计,当前的版本比较粗糙。主要是对于已上传文件和分片任务的记录。

createtableifnotexists`pet-life`.upload_task(idintauto_incrementcomment'主键id'primarykey,chunk_indexintnullcomment'分片索引',chunk_etagintnullcomment'分片标签',statusintdefault0notnullcomment'上传状态(1:已经上传 0 : 未上传)',srcvarchar(50)nullcomment'上传路径',expired_timedatetimenullcomment'过期时间')comment'文件上传任务';createtableifnotexists`pet-life`.file_record(idbigintauto_incrementprimarykey,file_namevarchar(100)nullcomment'文件名',file_hashvarchar(50)nullcomment'文件唯一标识',srcvarchar(50)nullcomment'文件在minio中的存储路径',file_sizeintnullcomment'文件大小',is_mergedtinyintnullcomment'是否合并 0 否 1 是',upload_idvarchar(30)nullcomment'上传id',expired_timedatetimenullcomment'过期时间')comment'文件上传记录';
获取一个上传分片的url共前端使用
/** * 获取分一个分片的上传的url * @param uploadId 上传ID * @param chunkIndex 分片索引 * @return 上传URL */privateStringgenerateUploadFileURL(StringuploadId,intchunkIndex){try{Map<String,String>extraQueryParams=newHashMap<>();extraQueryParams.put("uploadId",uploadId);extraQueryParams.put("partNumber",String.valueOf(chunkIndex+1));returnminioClient.getPresignedObjectUrl(GetPresignedObjectUrlArgs.builder().method(Method.PUT).bucket(bucketName).object(uploadId+"/"+chunkIndex).extraQueryParams(extraQueryParams).expiry(1,TimeUnit.DAYS).build());}catch(Exceptione){thrownewRuntimeException("生成上传URL失败:"+e.getMessage());}}

初始化分上传任务逻辑:
要考虑该文件是否已经上传过了和断点续传的情况。

  • 根据文件唯一标识通过数据库查询,如果已经上传过了,则直接返回访问路径
  • 如果是断电续传的情况则返回完成上传的分片
  • 如果从未上传过,则直接创建一个新的分片上传任务。
publicResponseDTO<UploadDTO>upload(FileFormfileForm){// 1. 校验文件是否已上传StringfileHash=fileForm.getFileHash();UploadDTOresult=newUploadDTO();FileRecordfileRecord=fileRecordService.getBaseMapper().selectOne(newLambdaQueryWrapper<FileRecord>().eq(FileRecord::getFileHash,fileHash).ge(FileRecord::getExpiredTime,LocalDateTime.now()));//1.1 文件记录不存在,开启新的分片上传任务if(fileRecord==null){// 1.1.1 文件记录不存在,开启新的分片上传任务Map<String,Object>result1=initChunkUpload02(fileForm.getFileName(),fileForm.getChunkCount());StringuploadId=(String)result1.get("uploadId");List<UploadDTO.chunkInfo>chunkList=(List<UploadDTO.chunkInfo>)result1.get("chunkList");// 1.1.2 创建上传任务记录synchronized(fileHash){CompletableFuture.runAsync(()->{for(UploadDTO.chunkInfo chunkInfo:chunkList){UploadTaskuploadTask=newUploadTask().setChunkIndex(chunkInfo.getChunkIndex()).setSrc(chunkInfo.getSrc()).setUploadId(uploadId).setStatus(0).setExpiredTime(LocalDateTime.now().plusDays(1));uploadTaskService.getBaseMapper().insert(uploadTask);}},threadPoolTaskExecutor);// 1.1.3 保存文件记录fileRecord=newFileRecord().setFileHash(fileHash).setFileName(fileForm.getFileName()).setFileSize(fileForm.getFileSize()).setSrc(null).setIsMerged(0).setUploadId(uploadId).setExpiredTime(LocalDateTime.now().plusDays(1));fileRecordService.getBaseMapper().insert(fileRecord);}//1.1.3 保存分片上传人物result.setUploadId(uploadId).setExist(false).setChunkList(chunkList).setObjectName(fileRecord.getFileName());returnResponseDTO.success(result);}if(fileRecord.getIsMerged()==1){// 1.1文件已上传,返回已上传的URLresult.setExist(true);result.setSrc(fileRecord.getSrc());returnResponseDTO.success(result);}//1.2 文件未上传 判断是否是断点上传List<UploadTask>uploadTasks=uploadTaskService.getBaseMapper().selectList(newLambdaQueryWrapper<UploadTask>().eq(UploadTask::getUploadId,fileRecord.getUploadId()).eq(UploadTask::getStatus,0));if(!uploadTasks.isEmpty()){List<UploadDTO.chunkInfo>chunkList=newArrayList<>();for(UploadTaskuploadTask:uploadTasks){UploadDTO.chunkInfo chunkInfo=newUploadDTO.chunkInfo();chunkInfo.setChunkIndex(uploadTask.getChunkIndex());chunkInfo.setSrc(uploadTask.getSrc());chunkList.add(chunkInfo);}result.setExist(true);result.setChunkList(chunkList);returnResponseDTO.success(result);}// 1.2.2 不是断点上传,返回空列表result.setExist(false);returnResponseDTO.fail("服务器数据库异常");}

确认每一个分片的上传

每一个分片完成上传,前端向后端发送一个请求,确认分片上传的状态。

publicResponseDTO<Boolean>confirmChunkUpload(ConfirmChunkFormconfirmChunkForm){UploadTaskuploadTask=newUploadTask();uploadTask.setUploadId(confirmChunkForm.getUploadId());uploadTask.setChunkEtag(confirmChunkForm.getChunkEtag());uploadTask.setChunkIndex(confirmChunkForm.getChunkIndex());if(confirmChunkForm.getSuccess()){uploadTask.setStatus(1);}else{uploadTask.setStatus(2);}intupdate=uploadTaskService.getBaseMapper().update(uploadTask,newLambdaUpdateWrapper<UploadTask>().eq(UploadTask::getUploadId,confirmChunkForm.getUploadId()).eq(UploadTask::getChunkIndex,confirmChunkForm.getChunkIndex()));returnResponseDTO.success(update>0);}

合并分片文件

可以加上对分片任务的校验和数据库的更改,这里就不加了。

publicResponseDTO<String>mergeChunk(StringuploadID,StringobjectName){try{Part[]parts=bigFileClient.listMultipart(bucketName,null,objectName,null,null,uploadID,null,null).result().partList().toArray(newPart[0]);ObjectWriteResponseobjectWriteResponse=bigFileClient.mergeMultipartUpload(bucketName,objectName,uploadID,parts);// 构建访问URLStringfinalObjectName=objectWriteResponse.object();Stringsrc=endpoint+"/"+bucketName+"/"+finalObjectName;returnResponseDTO.success(src);}catch(Exceptione){thrownewRuntimeException("合并分片失败:"+e.getMessage());}}
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/1 8:56:20

19、深入了解SMB协议:实现Linux与Windows的集成

深入了解SMB协议:实现Linux与Windows的集成 1. Windows 98与Samba的性能问题 Windows 98的资源管理器(可能还有其他程序)在向网络共享发送写入请求时,会错误地设置 “sync” 位。这会导致严重的性能下降,因为Samba会在每次写入后正确地对文件执行 fsync() 操作。再加上…

作者头像 李华
网站建设 2026/5/1 4:25:59

24、私有 IP 网络地址分配解析

私有 IP 网络地址分配解析 1. 引言 在本文语境中,企业是指自主运营使用 TCP/IP 网络,并能自主决定网络内地址规划和分配的实体。本文着重探讨私有互联网的地址分配问题。这种分配方式既能保证企业内部所有主机间的网络层连通性,也能确保不同企业的公共主机间的连通性。不过…

作者头像 李华
网站建设 2026/5/1 7:27:55

14、UNIX/Linux Shell编程实用指南

UNIX/Linux Shell编程实用指南 1. 检测并处理崩溃生成的文件 在程序崩溃时,有时会生成一个名为 core 的文件,这个文件通常很大,往往需要将其删除。下面我们将编写一个脚本,每分钟检查一次主目录中是否生成了 core 文件,如果生成了,就在终端输出警告信息并终止脚本。…

作者头像 李华
网站建设 2026/5/1 6:26:41

自然语言处理容易混淆知识点(一)c-TF-IDF和TF-IDF的区别

c-TF-IDF 和 TF-IDF 什么是 c-TF-IDF&#xff1f;传统 TF-IDFc-TF-IDF&#xff08;基于类的 TF-IDF&#xff09; c-TF-IDF 的计算公式直观理解在 BERTopic 中的工作流程代码示例&#xff1a;使用 c-TF-IDF与传统 TF-IDF 对比c-TF-IDF 的优势自定义 c-TF-IDF 参数可视化 c-TF-ID…

作者头像 李华
网站建设 2026/5/1 8:30:53

AI时代裁员潮真相:是AI夺走了工作,还是企业转型的必然?

简介 文章探讨了科技行业裁员潮中AI的真实角色。AI虽提高效率降低成本&#xff0c;但经济下行、过度扩张和市场竞争也是重要因素。企业正进行战略转型&#xff0c;将资源从传统业务转向AI领域&#xff0c;这不仅是成本削减&#xff0c;更是人才结构重构。AI带来的是劳动力转型&…

作者头像 李华
网站建设 2026/5/1 7:28:17

GEO 3小问:一文搞懂 AI 搜索时代的 “品牌曝光关键”

1. 问&#xff1a;到底什么是 GEO&#xff1f;和传统搜索优化不一样吗&#xff1f;答&#xff1a;GEO 全称 “AI 搜索优化”&#xff0c;核心是让品牌精准出现在用户用 AI 提问的答案里 —— 比如用户问 AI “北京靠谱的装修公司”“国产好口碑奶粉”&#xff0c;GEO 能让你的品…

作者头像 李华