用户看到的只是一行 `torch.nn.functional.softmax(x)`,背后 runtime 却要完成:分配 Stream、入队命令、调度到 AI Core、等待完成、同步结果。如果这一行的延迟是 10μs,要把调度开销控制在 5% 以内,runtime 的调度开销就必须 < 0.5μs(0.5μs / 10μs = 5%)。
runtime 的 Stream 调度引擎管理 32 个命令队列(Command Queue),每个对应一个硬件 Stream。命令入队后,Dispatcher 根据资源(Cube/Vector/L1/HBM 带宽)的可用情况做调度——不是先到先得,而是资源感知(resource-aware)调度。
Stream 架构
runtime Stream 架构 每个 Stream 独立维护命令队列,32 个 Stream 共享 64 个 AI Core Stream 0:cmd_1,cmd_2,... → AI Core 0,1 (Cube/Vector) Stream 1:cmd_1,cmd_2,... → AI Core 2,3 (Cube/Vector) ... Dispatcher 调度器:根据资源可用性决定哪个 Stream 的命令可以执行// runtime/stream/stream_engine.hclassStreamEngine{private:staticconstexprintMAX_STREAMS=32;structCommand{uint64_tid;KernelPtr kernel;void*args;intstream_id;uint64_tenqueue_time;ResourceRequirement resources;};structCommandQueue{std::queue<Command>pending;std::queue<Command>executing;intstream_id;boolis_active;};std::array<CommandQueue,MAX_STREAMS>streams_;structResourceDispatcher{intavailable_cube_units=62;intavailable_vector_units=62;intavailable_l1_kb=32*1024;intavailable_hbm_bw=900;intmax_concurrent_kernels=8;intcurrent_concurrent=0;};ResourceDispatcher dispatcher_;public:StatusEnqueueCommand(intstream_id,KernelPtr kernel,void*args){Command cmd;cmd.id=next_command_id_++;cmd.kernel=kernel;cmd.args=args;cmd.stream_id=stream_id;cmd.enqueue_time=GetTimestamp();cmd.resources=kernel->GetResourceRequirement(args);streams_[stream_id].pending.push(cmd);Schedule();returnStatus::OK;}voidSchedule(){if(dispatcher_.current_concurrent>=dispatcher_.max_concurrent_kernels){return;}// 优先级老化调度(不是简单 Round-Robin)std::vector<Command*>candidates;for(ints=0;s<MAX_STREAMS;s++){auto&stream=streams_[s];if(stream.is_active&&!stream.pending.empty()){candidates.push_back(&stream.pending.front());}}// 按等待时间排序(等待越久优先级越高)std::sort(candidates.begin(),candidates.end(),[](Command*a,Command*b){return(GetTimestamp()-a->enqueue_time)>(GetTimestamp()-b->enqueue_time);});for(auto*cmd:candidates){if(!IsResourceAvailable(cmd->resources))continue;// 
原子分配资源(避免部分分配导致死锁)ReserveResources(cmd->resources);auto&stream=streams_[cmd->stream_id];stream.pending.pop();stream.executing.push(*cmd);dispatcher_.current_concurrent++;LaunchKernelAsync(cmd->kernel,cmd->args,cmd->stream_id);}}boolIsResourceAvailable(constResourceRequirement&req){returnreq.cube_units<=dispatcher_.available_cube_units&&req.vector_units<=dispatcher_.available_vector_units&&req.l1_kb<=dispatcher_.available_l1_kb&&req.hbm_bw_gbps<=dispatcher_.available_hbm_bw;}voidReserveResources(constResourceRequirement&req){dispatcher_.available_cube_units-=req.cube_units;dispatcher_.available_vector_units-=req.vector_units;dispatcher_.available_l1_kb-=req.l1_kb;dispatcher_.available_hbm_bw-=req.hbm_bw_gbps;}voidOnKernelComplete(uint64_tcmd_id,intstream_id){auto&stream=streams_[stream_id];Command completed=stream.executing.front();stream.executing.pop();ReleaseResources(completed.resources);dispatcher_.current_concurrent--;Schedule();// 触发下一轮}};同步原语:Event
Stream 之间需要同步——例如,必须等 Stream 0 上的 AllReduce 完成后,Stream 1 才能用聚合后的梯度更新参数。
// runtime/stream/sync_primitives.cppclassStreamSynchronizer{private:structEvent{uint64_tid;intstream_id;uint64_tcmd_id;boolrecorded;boolcompleted;};std::unordered_map<uint64_t,Event>events_;std::vector<std::vector<int>>stream_wait_for_;// Stream 依赖图public:voidRecordEvent(uint64_tevent_id,intstream_id,uint64_tcmd_id){Event ev;ev.id=event_id;ev.stream_id=stream_id;ev.cmd_id=cmd_id;ev.recorded=true;ev.completed=false;events_[event_id]=ev;}StatusStreamWaitEvent(intwaiting_stream_id,uint64_tevent_id){Event&ev=events_[event_id];if(!ev.recorded)returnStatus::INVALID_EVENT;if(ev.completed)returnStatus::OK;stream_wait_for_[waiting_stream_id].push_back(ev.stream_id);streams_[waiting_stream_id].is_active=false;returnStatus::OK;}voidCompleteEvent(uint64_tevent_id){Event&ev=events_[event_id];ev.completed=true;// 唤醒等待此事件的所有 Streamfor(ints=0;s<MAX_STREAMS;s++){auto&waiters=stream_wait_for_[s];boolall_waited_complete=true;for(intwaited_stream:waiters){if(!IsStreamComplete(waited_stream)){all_waited_complete=false;break;}}if(all_waited_complete){streams_[s].is_active=true;Schedule();}}}StatusSynchronizeAll(){for(ints=0;s<MAX_STREAMS;s++){while(!streams_[s].executing.empty()){SpinWait(10);// 最后手段,平时不用全局同步}}returnStatus::OK;}};事件同步的实际用法
# Python 侧——compute-communication overlapimporttorch_npu compute_stream=torch_npu.Stream()comm_stream=torch_npu.Stream()# 计算流上跑前向withtorch_npu.stream(compute_stream):loss=model(input)# 通信流上跑 AllReduce(和前向并行)withtorch_npu.stream(comm_stream):event=torch_npu.Event()hccl.all_reduce(loss,event=event)# 只等通信流——不影响其他 Streamcompute_stream.wait_event(event)# 等到了梯度就可以更新参数withtorch_npu.stream(compute_stream):optimizer.step()踩坑一:Stream 饥饿(Starvation)
某些大 kernel 持续提交并长期占满资源 → 小 kernel 总是分配不到资源 → 永远得不到执行。
修复:优先级老化——每等待 100μs 优先级 +1,等待越久越排在前面,资源一旦释放就会被优先尝试调度。
intGetPriority()const{uint64_twait_time_us=(GetTimestamp()-enqueue_time)/1000;returnwait_time_us/100;// 每 100μs +1}踩坑二:资源死锁
Kernel A 已占用 16 个 Cube、还在等 2 个 Vector;Kernel B 已占满所有 Vector、还在等 32 个 Cube → 双方各自持有对方需要的资源并互相等待(持有并等待)→ 永远等不到 → 死锁。
修复:原子资源分配——要么全部分配,要么一个都不给(拒绝部分分配)。kernel 不会再“持有一部分、等待另一部分”,死锁所需的持有并等待条件就被打破了。
StatusAtomicReserveResources(constResourceRequirement&req){if(!IsResourceAvailable(req)){returnStatus::INSUFFICIENT_RESOURCES;// 拒绝,不部分分配}dispatcher_.available_cube_units-=req.cube_units;dispatcher_.available_vector_units-=req.vector_units;dispatcher_.available_l1_kb-=req.l1_kb;dispatcher_.available_hbm_bw-=req.hbm_bw_gbps;returnStatus::OK;}踩坑三:全局同步破坏 Overlap
`torch.npu.synchronize()` 会等待所有 Stream 完成——compute-communication overlap 全部失效。
正确做法:用 Event 做精确同步
# ❌ 全局同步——所有 Stream 都停torch.npu.synchronize()# ✅ 精确同步——只等需要的 Streamevent=torch_npu.Event()compute_stream.record_event(event)comm_stream.wait_event(event)runtime 的 Stream 引擎是 NPU 硬件的操作系统。32 个 Stream 并发执行、资源感知调度防饥饿、原子资源分配防死锁、Event 同步只等需要的。训练时的 compute-communication overlap 就靠这个引擎——缺少它,AllReduce 和 MatrixMul 就只能串行。