一、手动实现AsyncRLock
import asyncio from typing import Optional class AsyncRLock: def __init__(self): self._lock = asyncio.Lock() # 底层互斥锁 self._owner: Optional[asyncio.Task] = None # 当前持有锁的协程(Task) self._count = 0 # 可重入计数 async def acquire(self) -> bool: current_task = asyncio.current_task() if self._owner is current_task: # 同一协程重入,计数+1 self._count += 1 return True else: # 不同协程,需获取底层锁 await self._lock.acquire() self._owner = current_task self._count = 1 return True def release(self) -> None: current_task = asyncio.current_task() if self._owner is not current_task: raise RuntimeError("Cannot release un-acquired lock") self._count -= 1 if self._count == 0: self._owner = None self._lock.release() def locked(self) -> bool: return self._owner is not None def __enter__(self): raise RuntimeError( "Use 'async with' instead of 'with' for AsyncRLock" ) def __exit__(self, *args): pass async def __aenter__(self): await self.acquire() return self async def __aexit__(self, exc_type, exc_val, exc_tb): self.release()✅ 二、使用示例(验证可重入性)
import asyncio async def inner(lock): async with lock: print("Inner acquired") return "inner" async def outer(lock): async with lock: print("Outer acquired") result = await inner(lock) print("Back in outer") return result async def other(lock): async with lock: print("Other task acquired") async def main(): rlock = AsyncRLock() # 测试可重入:outer → inner result = await outer(rlock) print("Result:", result) # 测试互斥:另一个协程必须等待 task = asyncio.create_task(other(rlock)) await asyncio.sleep(0.1) # 确保 other 尝试获取锁 print("Other task should be blocked...") await task # 此时才会执行 other asyncio.run(main())✅ 预期输出:
Outer acquired Inner acquired Back in outer Result: inner Other task should be blocked... Other task acquired⚠️ 三、关键注意事项与潜在问题
1.必须绑定到asyncio.Task
- 使用
asyncio.current_task()获取当前协程身份(而非线程 ID) - ❌ 错误做法:用
id(asyncio.current_task())或其他标识(Task 对象本身可哈希且唯一)
2.异常安全:确保release()总是被调用
- 必须通过
__aexit__自动释放(即只允许async with) - 手动调用
acquire/release容易漏掉release(尤其在异常路径)
3.不要混用with和async with
- 实现中
__enter__主动报错,防止误用同步上下文管理器
4.不支持跨事件循环
current_task()依赖当前 loop,不能在多个 loop 间共享
5.性能开销
- 每次
acquire都需检查current_task()(但开销极小) - 比
asyncio.Lock略慢,但远优于死锁
6.不能用于await表达式直接返回
- 设计为上下文管理器或显式 acquire/release,不是 awaitable
四、常见错误实现(避坑指南)
❌ 错误 1:用计数器但不绑定协程
# 危险!多个协程可能交错导致计数错误 class BadRLock: def __init__(self): self._count = 0 self._lock = asyncio.Lock() async def acquire(self): if self._count > 0: self._count += 1 # ❌ 无协程绑定,多协程会混乱 else: await self._lock.acquire() self._count = 1❌ 错误 2:忘记异常安全
async def bad_usage(lock): await lock.acquire() raise ValueError("Oops!") # release() 永远不会被调用! lock.release()✅正确:始终用async with!
✅ 六、总结
手动实现
AsyncRLock的核心是:
- 用
asyncio.Lock保证跨协程互斥- 用
current_task()绑定持有者- 用计数器支持可重入
- 通过
__aexit__保证异常安全