大家好,我是jobleap.cn的小九。
concurrent.futures 是 Python 标准库中用于简化并发编程的核心模块,基于抽象的Executor类封装了ThreadPoolExecutor(线程池)和ProcessPoolExecutor(进程池),无需手动管理线程/进程的生命周期,即可高效实现并行任务执行。本教程将全面拆解其所有常用 API,通过连贯的代码示例串联核心用法,并结合实战场景说明适用场景,帮助你快速掌握并发编程的关键技巧。
一、核心组件概述
concurrent.futures 的核心是Executor抽象基类,定义了并发执行任务的通用接口,其子类适配不同的并发场景:
- ThreadPoolExecutor:基于线程的执行器,适用于IO密集型任务(如网络请求、文件读写、数据库操作),受 GIL(全局解释器锁)限制,无法真正并行执行CPU密集任务。
- ProcessPoolExecutor:基于进程的执行器,适用于CPU密集型任务(如数值计算、数据处理),通过多进程绕开 GIL,实现真正的并行。
二、常用 API 全解析(串联式讲解)
我们从「创建执行器」到「任务管理」再到「资源释放」,逐步拆解所有核心 API,并通过连贯的代码示例展示用法。
2.1 基础:创建执行器(构造方法)
构造执行器的核心参数决定了并发能力,不同执行器的参数略有差异:
| 参数 | ThreadPoolExecutor | ProcessPoolExecutor | 说明 |
|---|---|---|---|
| max_workers | 可选(默认:min(32, CPU核心数+4)) | 可选(默认:CPU核心数) | 最大工作线程/进程数 |
| thread_name_prefix | 可选 | 不支持 | 线程名前缀(便于调试) |
| mp_context | 不支持 | 可选 | 多进程上下文(如spawn/fork) |
示例(创建执行器):
importconcurrent.futuresimportosimportmultiprocessing# 1. 创建线程池thread_executor=concurrent.futures.ThreadPoolExecutor(max_workers=5,# 最大5个工作线程thread_name_prefix="demo_thread_"# 线程名前缀:demo_thread_0、demo_thread_1...)# 2. 创建进程池(Windows系统需指定spawn上下文)process_executor=concurrent.futures.ProcessPoolExecutor(max_workers=os.cpu_count(),# 按CPU核心数设置进程数mp_context=multiprocessing.get_context('spawn'))2.2 提交单个任务:submit()
submit(fn, *args, **kwargs)是提交单个异步任务的核心方法,返回Future对象(封装任务的执行状态、结果、异常)。
示例:
importtime# 定义基础任务函数(模拟IO/CPU操作)deftask(name,delay):"""简单任务:延迟指定秒数后返回结果"""print(f"任务{name}启动(延迟{delay}秒)")time.sleep(delay)# 模拟IO等待(线程池适配场景)print(f"任务{name}完成")returnf"任务{name}结果:延迟{delay}秒"# 提交任务到线程池future1=thread_executor.submit(task,"A",2)# 任务A:延迟2秒future2=thread_executor.submit(task,"B",1)# 任务B:延迟1秒# Future对象初始状态(任务未完成)print(f"任务A是否完成:{future1.done()}")# 输出:Falseprint(f"任务B是否完成:{future2.done()}")# 输出:False2.3 获取任务结果:result()
future.result(timeout=None)阻塞等待任务完成并返回结果,若指定timeout,超时则抛出concurrent.futures.TimeoutError。
示例(接上文):
# 阻塞获取结果(等待任务完成)result1=future1.result()# 等待任务A完成(约2秒)result2=future2.result(timeout=3)# 超时时间3秒(足够完成)print(f"任务A结果:{result1}")# 输出:任务 A 结果:延迟 2 秒print(f"任务B结果:{result2}")# 输出:任务 B 结果:延迟 1 秒# 任务完成后状态变为Trueprint(f"任务A是否完成:{future1.done()}")# 输出:True2.4 任务状态管理:done() & cancel()
future.done():判断任务是否完成(正常完成/异常/取消)。future.cancel():尝试取消未开始执行的任务,返回布尔值(已启动则取消失败)。
示例:
# 提交一个排队的任务(线程池满时任务会排队)future3=thread_executor.submit(task,"C",5)# 尝试取消任务C(若未启动则成功)cancel_success=future3.cancel()print(f"取消任务C是否成功:{cancel_success}")# 线程池空闲则失败,否则成功ifnotcancel_success:# 未取消则等待结果result3=future3.result()print(f"任务C结果:{result3}")else:print("任务C已取消")2.5 按完成顺序迭代任务:as_completed()
concurrent.futures.as_completed(fs, timeout=None)返回迭代器,按任务完成顺序生成Future对象(先完成的先返回),解决result()按提交顺序阻塞的问题。
示例:
# 提交多个任务(提交顺序:3秒、1秒、2秒)futures=[thread_executor.submit(task,f"Task-{i}",i)foriin[3,1,2]]# 按完成顺序处理结果(Task-1 → Task-2 → Task-3)print("\n按完成顺序获取结果:")forfutureinconcurrent.futures.as_completed(futures,timeout=10):try:result=future.result()print(result)exceptExceptionase:print(f"任务执行异常:{e}")2.6 批量执行任务:map()
executor.map(fn, *iterables, timeout=None, chunksize=1)批量提交任务,类似内置map,但异步执行;返回迭代器,按提交顺序返回结果(即使后面任务先完成,也会等待前面的)。
chunksize(仅ProcessPoolExecutor):分块处理迭代器,减少进程间通信开销(建议CPU密集任务设为1000+)。
示例:
# 批量任务参数(提交顺序:D、E、F)task_args=[("D",1),("E",2),("F",3)]# 封装任务函数(适配map的单参数输入)deftask_wrapper(args):returntask(*args)# 批量提交并按提交顺序获取结果print("\n按提交顺序获取map结果:")results=thread_executor.map(task_wrapper,task_args,timeout=10)forresultinresults:print(result)# 输出顺序:D → E → F2.7 等待任务集合:wait()
concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)等待指定任务集合完成,返回两个集合:
done:已完成(正常/异常/取消)的任务。not_done:未完成的任务。
return_when可选值:
FIRST_COMPLETED:任意一个任务完成即返回。FIRST_EXCEPTION:任意一个任务抛异常即返回(无异常则等全部完成)。ALL_COMPLETED(默认):所有任务完成才返回。
示例:
# 提交任务集合futures_wait=[thread_executor.submit(task,f"Wait-{i}",i)foriin[2,1,3]]# 等待第一个任务完成done,not_done=concurrent.futures.wait(futures_wait,timeout=5,return_when=concurrent.futures.FIRST_COMPLETED)print(f"\n完成的任务数:{len(done)}")# 输出:1print(f"未完成的任务数:{len(not_done)}")# 输出:2# 处理已完成任务forfutureindone:print(f"已完成任务结果:{future.result()}")# 等待剩余任务全部完成concurrent.futures.wait(not_done,return_when=concurrent.futures.ALL_COMPLETED)2.8 释放资源:shutdown()
executor.shutdown(wait=True, cancel_futures=False)关闭执行器,释放线程/进程资源:
wait=True(默认):阻塞等待所有已提交任务完成。cancel_futures=False(默认):若wait=False,是否取消未启动的任务(Python 3.9+ 支持)。
最佳实践:使用with语句,自动调用shutdown(),无需手动管理:
# with语句自动关闭执行器(推荐)print("\n使用with语句管理线程池:")withconcurrent.futures.ThreadPoolExecutor(max_workers=3)asexecutor:future4=executor.submit(task,"G",1)future5=executor.submit(task,"H",2)print(future4.result())print(future5.result())# 退出with块后,executor自动shutdown,释放资源三、异常处理
任务内的异常不会立即抛出,仅在调用result()/map()时触发,需通过 try-except 捕获。
示例:
# 定义抛异常的任务deferror_task():raiseValueError("任务执行出错!")# 捕获单个任务异常withconcurrent.futures.ThreadPoolExecutor(max_workers=1)asexecutor:future_error=executor.submit(error_task)try:future_error.result()exceptValueErrorase:print(f"\n捕获任务异常:{e}")# 输出:捕获任务异常:任务执行出错!# 捕获map批量任务异常deferror_map_task(x):ifx==2:raiseRuntimeError(f"x={x}执行失败")returnx*2withconcurrent.futures.ThreadPoolExecutor(max_workers=2)asexecutor:try:results=executor.map(error_map_task,[1,2,3])forresinresults:# 迭代时抛出异常print(res)exceptRuntimeErrorase:print(f"\nmap任务异常:{e}")# 输出:map任务异常:x=2 执行失败四、ThreadPoolExecutor vs ProcessPoolExecutor
| 特性 | ThreadPoolExecutor | ProcessPoolExecutor |
|---|---|---|
| 底层实现 | 线程 | 进程 |
| GIL 影响 | 受GIL限制,CPU密集任务低效 | 绕开GIL,CPU密集任务高效 |
| 内存共享 | 线程共享进程内存(易数据竞争) | 进程间内存隔离(需IPC) |
| 启动开销 | 低 | 高(进程创建/销毁成本高) |
| 适用场景 | IO密集(爬虫、文件读写) | CPU密集(计算、数据处理) |
对比示例(CPU密集任务)
# CPU密集任务:计算斐波那契数deffib(n):ifn<=2:return1returnfib(n-1)+fib(n-2)if__name__=="__main__":# Windows进程池必须在main函数内执行importtime# 测试数据(4个fib(35)任务)fib_nums=[35,35,35,35]# 1. 线程池执行(低效)start=time.time()withconcurrent.futures.ThreadPoolExecutor(max_workers=4)asexecutor:executor.map(fib,fib_nums)print(f"\n线程池执行时间:{time.time()-start:.2f}秒")# 2. 进程池执行(高效)start=time.time()withconcurrent.futures.ProcessPoolExecutor(max_workers=4)asexecutor:executor.map(fib,fib_nums)print(f"进程池执行时间:{time.time()-start:.2f}秒")结果:进程池执行时间约为线程池的 1/4(CPU核心数为4时)。
五、实战案例
案例1:线程池爬取网页(IO密集)
importrequestsfromconcurrent.futuresimportThreadPoolExecutor,as_completed# 目标网址列表URLS=['https://www.baidu.com','https://www.taobao.com','https://www.python.org','https://www.github.com']# 爬取单个网页deffetch_url(url):try:response=requests.get(url,timeout=10)return{'url':url,'status':response.status_code,'length':len(response.content)}exceptExceptionase:return{'url':url,'error':str(e)}if__name__=="__main__":# 线程池爬取(IO密集场景)withThreadPoolExecutor(max_workers=5)asexecutor:# 提交任务并映射URL与Futurefuture_to_url={executor.submit(fetch_url,url):urlforurlinURLS}# 按完成顺序输出结果forfutureinas_completed(future_to_url):url=future_to_url[future]result=future.result()if'error'inresult:print(f"爬取{url}失败:{result['error']}")else:print(f"爬取{url}成功:状态码{result['status']},内容长度{result['length']}")案例2:进程池处理数据计算(CPU密集)
importnumpyasnpfromconcurrent.futuresimportProcessPoolExecutor# CPU密集任务:计算数组均值defcompute_mean(arr):returnnp.mean(arr)if__name__=="__main__":# 生成10个大数组(每个100万元素)data=[np.random.rand(1_000_000)for_inrange(10)]# 进程池并行计算withProcessPoolExecutor(max_workers=4)asexecutor:means=list(executor.map(compute_mean,data))# 输出结果print("\n各数组均值:")fori,meaninenumerate(means):print(f"数组{i+1}:{mean:.6f}")六、注意事项
- GIL 限制:ThreadPoolExecutor 仅适用于IO密集任务,CPU密集任务必须用 ProcessPoolExecutor。
- 资源释放:务必通过
with语句或shutdown()关闭执行器,避免线程/进程泄漏。 - 进程池约束:Windows系统中,进程池任务必须放在
if __name__ == "__main__":内(避免递归创建进程)。 - 任务粒度:小任务(如微秒级)会导致调度开销大于执行开销,需合并后提交。
- 超时处理:所有阻塞方法(
result()/as_completed()/wait())建议设置timeout,避免无限阻塞。 - 数据共享:进程池无法共享内存,需通过
multiprocessing.Queue/Pipe实现进程间通信。
总结
concurrent.futures 是Python并发编程的高效工具,核心用法可总结为:
- 选执行器:IO密集用
ThreadPoolExecutor,CPU密集用ProcessPoolExecutor。 - 提任务:
submit()提交单个任务(返回Future),map()批量提交(按序返回结果)。 - 处理结果:
as_completed()按完成顺序迭代,wait()批量等待任务状态。 - 控资源:优先用
with语句自动管理执行器生命周期。
掌握这些API和实战技巧,即可快速实现Python程序的并发/并行执行,大幅提升IO/CPU密集任务的执行效率。