CANN运行时Stream模块架构
Stream 模块架构【免费下载链接】runtime本项目提供CANN运行时组件和维测功能组件。项目地址: https://gitcode.com/cann/runtime1. 模块概述功能介绍Stream 模块负责管理任务队列实现异步执行和同步机制。通过 SQSubmission Queue和 CQCompletion Queue机制与硬件交互支持多种流类型普通流、控制流、协处理器流、TSCH流、David流、XPU流。设计目标提供高效的异步任务执行机制支持多种流类型适配不同场景实现任务队列管理和同步机制支持 SQ/CQ 资源管理和复用支持流捕获功能ACL Graph支持自动切分 SQ 模式2. 使用场景与对外接口2.1 使用场景场景一创建流并提交任务aclrtStream stream; aclrtCreateStream(stream); // 创建流 aclrtLaunchKernel(stream, ...); // 提交内核任务 aclrtSynchronizeStream(stream); // 等待完成场景二多流并行执行aclrtStream stream1, stream2; aclrtCreateStream(stream1); aclrtCreateStream(stream2); // 在不同流上并行执行任务 aclrtLaunchKernel(stream1, kernel1, ...); aclrtLaunchKernel(stream2, kernel2, ...);场景三事件同步aclrtRecordEvent(event, stream1); // 在 stream1 记录事件 aclrtStreamWaitEvent(stream2, event); // stream2 等待事件场景四带配置创建流aclrtStream stream; aclrtCreateStreamWithConfig(stream, 0, ACL_STREAM_FAST_SYNC); // 创建快速同步流2.2 对外接口核心接口接口头文件位置说明aclrtCreateStream()include/external/acl/acl_rt.h创建流aclrtCreateStreamWithConfig()include/external/acl/acl_rt.h带优先级和标志创建流aclrtDestroyStream()include/external/acl/acl_rt.h销毁流aclrtDestroyStreamForce()include/external/acl/acl_rt.h强制销毁流aclrtSynchronizeStream()include/external/acl/acl_rt.h同步流aclrtStreamQuery()include/external/acl/acl_rt.h查询流状态aclrtStreamWaitEvent()include/external/acl/acl_rt.h流等待事件扩展接口接口头文件位置说明aclrtSynchronizeStreamWithTimeout()include/external/acl/acl_rt.h带超时同步流aclrtStreamGetPriority()include/external/acl/acl_rt.h获取流优先级aclrtStreamGetFlags()include/external/acl/acl_rt.h获取流标志aclrtStreamAbort()include/external/acl/acl_rt.h终止流执行aclrtStreamGetId()include/external/acl/acl_rt.h获取流 IDaclrtGetStreamAvailableNum()include/external/acl/acl_rt.h获取可用流数量aclrtStreamWaitEventWithTimeout()include/external/acl/acl_rt.h流等待事件带超时流属性接口接口头文件位置说明aclrtSetStreamAttribute()include/external/acl/acl_rt.h设置流属性aclrtGetStreamAttribute()include/external/acl/acl_rt.h获取流属性aclrtSetStreamFailureMode()include/external/acl/acl_rt.h设置流失败模式aclrtSetStreamOverflowSwitch()include/external/acl/acl_rt.h设置流溢出开关aclrtGetStreamOverflowSwitch()include/external/acl/acl_rt.h获取流溢出开关流控制接口接口头文件位置说明aclrtActiveStream()include/external/acl/acl_rt.h激活流aclrtSwitchStream()include/external/acl/acl_rt.h切换流aclrtStreamStop()include/external/acl/acl_rt.h停止流aclrtPersistentTaskClean()include/external/acl/acl_rt.h清理持久任务流捕获接口aclmdlRI接口头文件位置说明aclmdlRICaptureBegin()include/external/acl/acl_rt.h开始流捕获aclmdlRICaptureEnd()include/external/acl/acl_rt.h结束流捕获aclmdlRICaptureGetInfo()include/external/acl/acl_rt.h获取捕获信息aclmdlRICaptureTaskGrpBegin()include/external/acl/acl_rt.h开始任务组捕获aclmdlRICaptureTaskGrpEnd()include/external/acl/acl_rt.h结束任务组捕获aclmdlRICaptureTaskUpdateBegin()include/external/acl/acl_rt.h开始任务更新捕获aclmdlRICaptureTaskUpdateEnd()include/external/acl/acl_rt.h结束任务更新捕获流配置接口接口头文件位置说明aclrtCreateStreamConfigHandle()include/external/acl/acl_rt.h创建流配置句柄aclrtDestroyStreamConfigHandle()include/external/acl/acl_rt.h销毁流配置句柄aclrtSetStreamConfigOpt()include/external/acl/acl_rt.h设置流配置选项aclrtCreateStreamV2()include/external/acl/acl_rt.h带配置创建流3. 架构总览整体设计思路Stream 采用 SQ/CQ 异步机制实现任务执行任务通过 SQ 提交到硬件硬件执行完成后通过 CQ 返回结果。Stream 维护任务队列taskPosHead/taskPosTail支持任务分配、回收和同步。继承自 NoCopy 基类防止拷贝。架构分层图核心模块交互图4. 详细设计4.1 核心流程流创建流程关键代码// 文件位置src/acl/aclrt_impl/stream.cpp:33-50 aclError aclrtCreateStreamImpl(aclrtStream *stream) { rtStream_t rtStream nullptr; const rtError_t rtErr rtStreamCreate(rtStream, static_castint32_t(RT_STREAM_PRIORITY_DEFAULT)); if (rtErr ! RT_ERROR_NONE) { return ACL_GET_ERRCODE_RTS(rtErr); } *stream static_castaclrtStream(rtStream); return ACL_SUCCESS; } // 文件位置src/runtime/core/src/stream/stream.cc:606-788 rtError_t Stream::Setup() { // 设置 SQ 深度 const uint32_t rtsqDepth (((flags_ RT_STREAM_HUGE) ! 0U) (device_-GetDevProperties().maxTaskNumPerHugeStream ! 0)) ? device_-GetDevProperties().maxTaskNumPerHugeStream : device_-GetDevProperties().rtsqDepth; SetSqDepth(rtsqDepth); // 分配 streamId error device_-Driver_()-StreamIdAlloc(streamId_, device_-Id_(), device_-DevGetTsId(), priority_); // 分配 SQ/CQ error stmSqCqManage-AllocStreamSqCq(this, priority_, 0U, tmpSqId, tmpCqId); sqId_ tmpSqId; cqId_ tmpCqId; // 分配逻辑 CQ error AllocLogicCq(isDisableThread, starsFlag, stmSqCqManage); // 创建任务资源 CreateStreamTaskRes(); error CreateStreamArgRes(); return RT_ERROR_NONE; }任务提交流程关键代码// 文件位置src/runtime/core/src/stream/stream.cc:4544-4579 TaskInfo* Stream::AllocTask(TaskInfo* pTask, tsTaskType_t taskType, rtError_t errorReason, uint32_t sqeNum, UpdateTaskFlag flag) { // 任务组更新场景 if (IsTaskGroupUpdate()) { TaskInfo* updateTask nullptr; if (flag UpdateTaskFlag::SUPPORT) { errorReason UpdateTask(updateTask); return updateTask; } } // 捕获模式场景 if (GetCaptureStatus() ! RT_STREAM_CAPTURE_STATUS_NONE) { TaskInfo* captureTask pTask; const rtError_t error AllocCaptureTask(taskType, sqeNum, captureTask); if (error ! RT_ERROR_STREAM_CAPTURE_EXIT) { errorReason error; return error RT_ERROR_NONE ? captureTask : nullptr; } } // 正常分配 if (taskResMang_ nullptr) { return device_-GetTaskFactory()-Alloc(this, taskType, errorReason); } else { pTask-stream this; return pTask; } }流同步流程关键代码// 文件位置src/acl/aclrt_impl/stream.cpp:132-149 aclError aclrtSynchronizeStreamImpl(aclrtStream stream) { const rtError_t rtErr rtStreamSynchronize(static_castrtStream_t(stream)); if (rtErr ! RT_ERROR_NONE) { return ACL_GET_ERRCODE_RTS(rtErr); } return ACL_SUCCESS; } // 文件位置src/runtime/core/src/stream/stream.cc:1927-1982 rtError_t Stream::Synchronize(const bool isNeedWaitSyncCq, int32_t timeout) { // STARS 平台路径 if (device_-IsStarsPlatform()) { if (!IsSeparateSendAndRecycle() || GetBindFlag()) { error StarsWaitForTask(lastTaskId_, isNeedWaitSyncCq, timeout); } else { if (!IsSyncFinished()) { error SynchronizeImpl(lastTaskId_, latestConcernedTaskId.Value(), timeout); } } return GetSynchronizeError(error); } // 禁用线程路径 if (Runtime::Instance()-GetDisableThread()) { error WaitForTask(lastTaskId_, isNeedWaitSyncCq, timeout); return GetSynchronizeError(error); } // 正常路径通过 Event 同步 Event *event new Event(device_, RT_EVENT_DEFAULT, Context_(), true); error event-Record(this); error event-Synchronize(timeout); return error; }4.2 核心机制详解SQ/CQ 异步机制设计思想通过 SQ提交队列和 CQ完成队列实现异步任务执行任务提交到 SQ 后立即返回硬件执行完成后通过 CQ 通知。// 文件位置src/runtime/core/src/stream/stream.hpp class Stream : public NoCopy { protected: uint32_t sqId_; // SQ ID uint32_t cqId_; // CQ ID uint32_t sqTailPos_; // SQ 尾位置 uint32_t sqHeadPos_; // SQ 头位置 uint64_t sqRegVirtualAddr_; // SQ 寄存器虚拟地址 private: uint64_t sqAddr_; // SQ 基地址 (最大 2M) uint32_t sqDepth_; // SQ 深度 uint8_t* sqeBuffer_; // SQE 缓冲区指针 uint32_t sqeBufferSize_; // SQE 缓冲区大小 Atomicuint32_t taskPosHead_; // 任务位置头 (Stars) Atomicuint32_t taskPosTail_; // 任务位置尾 (Stars) };多类型流支持设计思想支持多种流类型适配不同硬件架构和使用场景。派生类特点类名继承关系特点关键标志CtrlStreamStream控制流固定SQ深度1022isCtrlStream_trueCoprocessorStreamStream协处理器流远程处理TSDRV_FLAG_REMOTE_IDTschStreamStreamTS调度流DSA支持TSDRV_FLAG_ONLY_SQCQ_IDDavidStreamStreamDavid/Stars架构流重写大量方法XpuStreamStreamXPU架构流重写核心方法DvppGrpNoCopyDVPP组管理非Stream派生提供logicCq自动切分 SQ 模式设计思想当单个流的 SQ 深度达到上限时自动创建 slave stream 扩展 SQ 容量实现主从模式。// 文件位置src/runtime/core/src/stream/stream.hpp:146-151 struct AutoSplitSqContext { Stream *masterStream{nullptr}; // slave stream 指向 master std::vectorStream * slaveStreams; // master 的 slave 列表 int32_t exposedStreamId{-1}; // 对外暴露的 streamId uint32_t curStreamSqeCount{0U}; // 当前已分配的 SQE 数量 };关键代码// 文件位置src/runtime/core/src/stream/stream.cc:1003-1048 rtError_t Stream::SetupForAutoSplit() { error InitAutoSplitBasicParams(); error AllocPosToTaskIdMap(); error CreateStreamArgRes(); error AllocStreamIdForAutoSplit(); error AllocAutoSplitContext(); // 创建 AutoSplitSqContext error AllocSqeBufferForAutoSplit(); error AllocSqCqForAutoSplitWithRetry(); SetMaxTaskId(isDisableThread); return RT_ERROR_NONE; }4.3 模块职责划分模块职责位置aclrt API对外 ACL 接口include/external/acl/acl_rt.haclrt_implACL 接口实现src/acl/aclrt_impl/stream.cpprt API内部 RT 接口src/runtime/api/api_c_stream.ccStream流管理核心类继承自 NoCopystream/stream.hppStreamFactory静态流创建工厂版本分发stream/stream_factory.hppStreamSqCqManageSQ/CQ ID 管理与复用stream/stream_sqcq_manage.hppTaskAllocator任务对象分配器task/task_allocator.hppTaskResManage任务资源管理task/task_res_manage/EngineStreamObserver流状态观察者stream/engine_stream_observer.hppDvppGrpDVPP 组管理非流类型stream/dvpp_grp.hpp4.4 核心数据结构5. 关键文件索引模块文件路径核心内容ACL 头文件include/external/acl/acl_rt.haclrt* 外部接口声明ACL 实现src/acl/aclrt_impl/stream.cppaclrt 接口实现466行RT 接口src/runtime/api/api_c_stream.ccrtStream* 内部接口Stream 核心类src/runtime/core/src/stream/stream.hppStream 类定义1700行Stream 实现src/runtime/core/src/stream/stream.ccSetup/Synchronize/AllocTask 实现6000行Stream 工厂src/runtime/core/src/stream/stream_factory.hpp静态 CreateStream 工厂方法SQ/CQ 管理src/runtime/core/src/stream/stream_sqcq_manage.hppSQ/CQ ID 分配与复用控制流src/runtime/core/src/stream/ctrl_stream.hppCtrlStream 定义协处理器流src/runtime/core/src/stream/coprocessor_stream.hppCoprocessorStreamTS调度流src/runtime/core/src/stream/tsch_stream.hppTschStreamDSA支持David流src/runtime/core/src/stream/stream_david.hppDavidStreamStars架构XPU流src/runtime/core/src/stream/stream_xpu.hppXpuStreamDVPP组src/runtime/core/inc/stream/dvpp_grp.hppDvppGrp非Stream派生流创建分发src/runtime/core/src/stream/v200/stream_creator_c.ccCreateStreamAndGet 实现6. 性能优化策略任务预分配TaskAllocator 预分配任务对象减少分配开销SQ/CQ 池化复用多 Stream 共享 SQ/CQ 资源CQ 复用机制减少资源开销原子状态管理taskPosHead/taskPosTail/pendingNum 使用原子操作减少锁开销异步回收支持异步任务回收不阻塞任务提交自动切分 SQ大任务量场景自动扩展 SQ 容量主从模式快速同步模式ACL_STREAM_FAST_SYNC 标志支持快速同步引用计数sqIdRefMap_ 管理 SQ ID 复用避免频繁分配释放版本分发工厂根据硬件版本v100/v200/v201自动选择最优 Stream 类型本模块文档基于源码分析已验证所有 ACL 接口来自include/external/acl/acl_rt.h实现来自src/acl/aclrt_impl/stream.cpp。【免费下载链接】runtime本项目提供CANN运行时组件和维测功能组件。项目地址: https://gitcode.com/cann/runtime创作声明:本文部分内容由AI辅助生成(AIGC),仅供参考