登录
注册
开源
企业版
高校版
搜索
帮助中心
使用条款
关于我们
开源
企业版
高校版
私有云
模力方舟
AI 队友
登录
注册
轻量养虾,开箱即用!低 Token + 稳定算力,Gitee & 模力方舟联合出品的 PocketClaw 正式开售!点击了解详情
代码拉取完成,页面将自动刷新
仓库状态说明
开源项目
>
人工智能
>
AI-人工智能
&&
捐赠
捐赠前请先登录
取消
前往登录
扫描微信二维码支付
取消
支付完成
支付提示
将跳转至支付宝完成支付
确定
取消
Watch
不关注
关注所有动态
仅关注版本发行动态
关注但不提醒动态
39
Star
92
Fork
621
Ascend
/
torchair
暂停
代码
Issues
77
Pull Requests
168
Wiki
统计
流水线
服务
质量分析
Jenkins for Gitee
腾讯云托管
腾讯云 Serverless
悬镜安全
阿里云 SAE
Codeblitz
SBOM
开发画像分析
我知道了,不再自动展开
更新失败,请稍后重试!
移除标识
内容风险标识
本任务被
标识为内容中包含有代码安全 Bug 、隐私泄露等敏感信息,仓库外成员不可访问
Torchair提供FX图多流表达与执行能力
TODO
#ICW9JJ
需求
薛鹏
成员
创建于
2025-09-04 09:16
# 1 需求场景&价值 ## 1.1 目标 torchair提供FX图上表达多Stream执行的能力。 torchair提供Stream间Kernel执行时序控制能力。 torchair提供编译过程中调用外部FX Pass的能力。 ## 1.2 需求描述 使用torch.compile时,单算子执行时的stream信息无法被捕获,为此torchair对外提供了可以被dynamo捕获的stream标记接口torchair.scope.npu_stream_switch,用于手动控制算子并发,使用示例如下: ``` def test_fn(x): mm = torch.mm(x, x) # 在默认流上执行 with torchair.scope.npu_stream_switch('1'): abs = torch.abs(x) # 在新的stream "1"上执行 return torch.add(mm, abs) ``` 多个Stream上的Task并发执行,而细粒度的时序控制,则是指通过例如Event的record和wait能力,来编排不同Stream上Task间的执行时序,通常用于避免资源竞争,或通过保证顺序,来实现更为激进的内存复用策略(例如FSDP等训练框架通过Event消除record stream)。  在vllm等推理框架中,针对模型的此类优化具有高度相似性和稳定性。借助torch.compile捕获得到的FX图,自动分析或识别出需要并发执行的部分,并分配Stream,可以达成与脚本修改一致的效果,消除脚本上的修改动作。 # 2 需求建议实现的规格 ## 2.1 FX图上的多流表达【已支持】 计划将npu_stream_switch对应的FX图表达能力开放。 当前在FX上,npu_stream_switch最终通过torchair提供的FX 图Scope表达能力实现多Stream执行。Scope表示一段具有相同特征的区域,stream是一种Scope类型。 Scope在FX图上表现为以torch.ops.air.scope_enter.default开始,以torch.ops.air.scope_exit.default结束的区域。 torch.ops.air.scope_enter表示Scope区域的开始,其原型定义如下: ``` func: scope_enter(str[] keys, str[] values, bool need_excute=False) -> None ``` > keys表示Scope的类型,可以同时具有多个Scope。 > values表示对应Scope类型的取值,与keys一一对应。 > need_excute忽略,为内部代码复用引入的参数,对功能没有影响。 - torch.ops.air.scope_exit表示Scope区域的结束,其原型定义如下: ``` func: scope_exit(bool need_excute=False) -> None ``` > need_excute忽略,为内部代码复用引入的参数,对功能没有影响。 以一个具体的脚本为例: ``` def test_fn(x): mm = torch.mm(x, x) abs = torch.abs(x) return torch.add(mm, abs) ``` 上述脚本在Stream标记前的FX图如下所示: ``` opcode name target args kwargs ------------- ------ ---------------- ---------------- -------- placeholder arg0_1 arg0_1 () {} call_function mm aten.mm.default (arg0_1, arg0_1) {} call_function abs_1 aten.abs.default (arg0_1,) {} call_function sub aten.sub.Tensor (abs_1, arg0_1) {} call_function add aten.add.Tensor (mm, sub) {} output output output ((add,),) {} ``` 如果希望abs在单独的Stream上执行,从而能与mm计算达成并行效果,可以对FX图进行如下修改,在需要单独分流的区域(这个例子中只有abs一个节点)前添加scope_enter,区域后添加scope_exit。 ``` opcode name target args kwargs ------------- ------------------- ----------------------- ------------------------------------- -------- placeholder arg0_1 arg0_1 () {} call_function mm aten.mm.default (arg0_1, arg0_1) {} call_function scope_enter_default air.scope_enter.default (['_user_stream_label'], ['stream1']) {} call_function abs_1 aten.abs.default (arg0_1,) {} call_function sub aten.sub.Tensor (abs_1, arg0_1) {} call_function scope_exit_default air.scope_exit.default () {} call_function add aten.add.Tensor (mm, sub) {} output output output ((add,),) {} ```  ## 2.2 FX图上表达Stream间Kernel执行时序【待开发】 计划通过新增torchair.ops.record/torchair.ops.wait表达stream间的控制关系。 torch.ops.air.wait表示等待输入的Tensor计算完成,其原型定义如下: ``` func: wait(Tensor[] x) -> () ``` > 表示阻塞,等待x中的Tensor都计算完成。 在FX图上的表达如下: ``` opcode name target args kwargs ------------- ------------------- ----------------------- ------------------------------------- -------- placeholder arg0_1 arg0_1 () {} call_function mm aten.mm.default (arg0_1, arg0_1) {} call_function scope_enter_default air.scope_enter.default (['_user_stream_label'], ['stream1']) {} call_function abs_1 aten.abs.default (arg0_1,) {} call_function wait air.ops.wait.default ([mm]) {} call_function sub aten.sub.Tensor (abs_1, arg0_1) {} call_function scope_exit_default air.scope_exit.default () {} call_function add aten.add.Tensor (mm, sub) {} output output output ((add,),) {} ```  torch.ops.air.record用于显式地在当前Stream上下发一个任务,其返回值可以被wait等待。在依赖的节点没有输出时,可以使用record,即使依赖的节点有输出,仍然可以使用record让FX图看起来更清晰。其原型定义如下: ``` func: record(*, Device? device=None) -> Tensor ``` > device入参忽略,对功能没有影响,由于record需要返回Tensor,因此torch约束必须带有Device入参。 如下的FX图结构,与上面只使用air.ops.wait节点的效果一致。 ``` opcode name target args kwargs ------------- ------------------- ----------------------- ------------------------------------- -------- placeholder arg0_1 arg0_1 () {} call_function mm aten.mm.default (arg0_1, arg0_1) call_function record air.ops.record.default () {} call_function scope_enter_default air.scope_enter.default (['_user_stream_label'], ['stream1']) {} call_function abs_1 aten.abs.default (arg0_1,) {} call_function wait air.ops.wait.default ([record]) {} call_function sub aten.sub.Tensor (abs_1, arg0_1) {} call_function scope_exit_default air.scope_exit.default () {} call_function add aten.add.Tensor (mm, sub) {} output output output ((add,),) {} ``` ## 2.3 FX Pass实现与调用【待开发】 ### 2.3.1 FX Pass函数签名 Torchair约定FX Pass的函数签名如下: ``` def _(gm, example_inputs, config: torchair.CompilerConfig) -> None ``` > gm为aot后的GraphModule类对象,gm.graph为其FX图。 > example inputs为aot后的GraphModule对象的FakeTensor类型输入,通常不需要使用。 > config为torchair编译后端创建时设置的torchair.CompilerConfig对象,用于pass感知完整编译选项。 FX Pass原地修改gm对象,任何返回值会被忽略。对于无法处理的异常情况,应当抛出异常。需要确保不抛出异常时,处理后的FX图是正确的:即其执行结果与修改前的FX完全一致。 ### 2.3.2 FX Pass实现示例 上一章节示例中的处理,对应的FX Pass实现示例: FX Pass中调用的工具方法都为FX Graph提供的工具方法。 ``` import torchair import torchair.scope._scope def parallel_abs_sub(gm, example_inputs, config: torchair.CompilerConfig): # Simple example to show how to use scope to do stream planning in torch.compile fx_graph = gm.graph for node in fx_graph.nodes: if node.op == "call_function" and node.target == torch.ops.aten.abs.default: with fx_graph.inserting_before(node): fx_graph.call_function(torch.ops.air.scope_enter.default, args=( ["_user_stream_label"], ["stream1"])) if node.op == "call_function" and node.target == torch.ops.aten.sub.Tensor: with fx_graph.inserting_after(node): fx_graph.call_function(torch.ops.air.scope_exit.default, args=()) ``` ### 2.3.3 自定义FX Pass使能 编译过程中涉及的主要图优化阶段如下图所示,当期计划开放post_grad_custom_pre_pass和post_grad_custom_post_pass两个阶段。 注意,同一执行阶段的外部pass只允许设置一个。如果需要进行多种外部处理,需要封装为单个pass接口后设置到config上。  当前torchair的编译选项都通过torchair.CompilerConfig类传递,需求开放post_grad_custom_pre_pass和post_grad_custom_post_pass两个阶段的自定义pass注册。开关开启方式如下: ``` config = torchair.CompilerConfig() config.post_grad_custom_pre_pass = parallel_abs_sub # parallel_abs_sub将在torchair原生fx图优化前执行 # 或者 config.post_grad_custom_post_pass = parallel_abs_sub # parallel_abs_sub将在torchair原生fx图优化后执行 ``` > 相比于上一版本实现,去除了依次添加pass的行为,变为直接向特定阶段注册pass。每个阶段的自定义pass唯一,如果涉及多个pass处理,需要用户封装为单个pass。 > 变更的动机:上一版本实现的缺陷(1)顺序添加pass的执行时序不够明确。(2)pass执行位置不够明确(3)对pass名称或函数名的唯一性要求有时会影响编码实现(4)与GPU实现方案差异较大,不利于知识共享。 ## 3 附录 ### 3.1 计划提供的IR定义 ``` import torch from torch.library import Library from torch.fx.node import has_side_effect m = Library("air", "FRAGMENT") m.define("record(*, Device? device=None) -> Tensor") m.define("wait(Tensor[] x) -> ()") has_side_effect(torch.ops.air.record.default) has_side_effect(torch.ops.air.wait.default) @torch.library.impl(m, "record", "Meta") def record_meta(*, device=None): return torch.empty(0, device=device) @torch.library.impl(m, "wait", "Meta") def record_meta(x): pass @torch.library.impl(m, "record", "CompositeExplicitAutograd") def record_impl(*args, **kwargs): return torch.empty(0) @torch.library.impl(m, "wait", "CompositeExplicitAutograd") def wait_impl(*args, **kwargs): pass ``` ### 3.2 Inductor开放的自定义Pass阶段 
# 1 需求场景&价值 ## 1.1 目标 torchair提供FX图上表达多Stream执行的能力。 torchair提供Stream间Kernel执行时序控制能力。 torchair提供编译过程中调用外部FX Pass的能力。 ## 1.2 需求描述 使用torch.compile时,单算子执行时的stream信息无法被捕获,为此torchair对外提供了可以被dynamo捕获的stream标记接口torchair.scope.npu_stream_switch,用于手动控制算子并发,使用示例如下: ``` def test_fn(x): mm = torch.mm(x, x) # 在默认流上执行 with torchair.scope.npu_stream_switch('1'): abs = torch.abs(x) # 在新的stream "1"上执行 return torch.add(mm, abs) ``` 多个Stream上的Task并发执行,而细粒度的时序控制,则是指通过例如Event的record和wait能力,来编排不同Stream上Task间的执行时序,通常用于避免资源竞争,或通过保证顺序,来实现更为激进的内存复用策略(例如FSDP等训练框架通过Event消除record stream)。  在vllm等推理框架中,针对模型的此类优化具有高度相似性和稳定性。借助torch.compile捕获得到的FX图,自动分析或识别出需要并发执行的部分,并分配Stream,可以达成与脚本修改一致的效果,消除脚本上的修改动作。 # 2 需求建议实现的规格 ## 2.1 FX图上的多流表达【已支持】 计划将npu_stream_switch对应的FX图表达能力开放。 当前在FX上,npu_stream_switch最终通过torchair提供的FX 图Scope表达能力实现多Stream执行。Scope表示一段具有相同特征的区域,stream是一种Scope类型。 Scope在FX图上表现为以torch.ops.air.scope_enter.default开始,以torch.ops.air.scope_exit.default结束的区域。 torch.ops.air.scope_enter表示Scope区域的开始,其原型定义如下: ``` func: scope_enter(str[] keys, str[] values, bool need_excute=False) -> None ``` > keys表示Scope的类型,可以同时具有多个Scope。 > values表示对应Scope类型的取值,与keys一一对应。 > need_excute忽略,为内部代码复用引入的参数,对功能没有影响。 - torch.ops.air.scope_exit表示Scope区域的结束,其原型定义如下: ``` func: scope_exit(bool need_excute=False) -> None ``` > need_excute忽略,为内部代码复用引入的参数,对功能没有影响。 以一个具体的脚本为例: ``` def test_fn(x): mm = torch.mm(x, x) abs = torch.abs(x) return torch.add(mm, abs) ``` 上述脚本在Stream标记前的FX图如下所示: ``` opcode name target args kwargs ------------- ------ ---------------- ---------------- -------- placeholder arg0_1 arg0_1 () {} call_function mm aten.mm.default (arg0_1, arg0_1) {} call_function abs_1 aten.abs.default (arg0_1,) {} call_function sub aten.sub.Tensor (abs_1, arg0_1) {} call_function add aten.add.Tensor (mm, sub) {} output output output ((add,),) {} ``` 如果希望abs在单独的Stream上执行,从而能与mm计算达成并行效果,可以对FX图进行如下修改,在需要单独分流的区域(这个例子中只有abs一个节点)前添加scope_enter,区域后添加scope_exit。 ``` opcode name target args kwargs ------------- ------------------- ----------------------- ------------------------------------- -------- placeholder arg0_1 arg0_1 () {} call_function mm aten.mm.default (arg0_1, arg0_1) {} call_function scope_enter_default air.scope_enter.default (['_user_stream_label'], ['stream1']) {} call_function abs_1 aten.abs.default (arg0_1,) {} call_function sub aten.sub.Tensor (abs_1, arg0_1) {} call_function scope_exit_default air.scope_exit.default () {} call_function add aten.add.Tensor (mm, sub) {} output output output ((add,),) {} ```  ## 2.2 FX图上表达Stream间Kernel执行时序【待开发】 计划通过新增torchair.ops.record/torchair.ops.wait表达stream间的控制关系。 torch.ops.air.wait表示等待输入的Tensor计算完成,其原型定义如下: ``` func: wait(Tensor[] x) -> () ``` > 表示阻塞,等待x中的Tensor都计算完成。 在FX图上的表达如下: ``` opcode name target args kwargs ------------- ------------------- ----------------------- ------------------------------------- -------- placeholder arg0_1 arg0_1 () {} call_function mm aten.mm.default (arg0_1, arg0_1) {} call_function scope_enter_default air.scope_enter.default (['_user_stream_label'], ['stream1']) {} call_function abs_1 aten.abs.default (arg0_1,) {} call_function wait air.ops.wait.default ([mm]) {} call_function sub aten.sub.Tensor (abs_1, arg0_1) {} call_function scope_exit_default air.scope_exit.default () {} call_function add aten.add.Tensor (mm, sub) {} output output output ((add,),) {} ```  torch.ops.air.record用于显式地在当前Stream上下发一个任务,其返回值可以被wait等待。在依赖的节点没有输出时,可以使用record,即使依赖的节点有输出,仍然可以使用record让FX图看起来更清晰。其原型定义如下: ``` func: record(*, Device? device=None) -> Tensor ``` > device入参忽略,对功能没有影响,由于record需要返回Tensor,因此torch约束必须带有Device入参。 如下的FX图结构,与上面只使用air.ops.wait节点的效果一致。 ``` opcode name target args kwargs ------------- ------------------- ----------------------- ------------------------------------- -------- placeholder arg0_1 arg0_1 () {} call_function mm aten.mm.default (arg0_1, arg0_1) call_function record air.ops.record.default () {} call_function scope_enter_default air.scope_enter.default (['_user_stream_label'], ['stream1']) {} call_function abs_1 aten.abs.default (arg0_1,) {} call_function wait air.ops.wait.default ([record]) {} call_function sub aten.sub.Tensor (abs_1, arg0_1) {} call_function scope_exit_default air.scope_exit.default () {} call_function add aten.add.Tensor (mm, sub) {} output output output ((add,),) {} ``` ## 2.3 FX Pass实现与调用【待开发】 ### 2.3.1 FX Pass函数签名 Torchair约定FX Pass的函数签名如下: ``` def _(gm, example_inputs, config: torchair.CompilerConfig) -> None ``` > gm为aot后的GraphModule类对象,gm.graph为其FX图。 > example inputs为aot后的GraphModule对象的FakeTensor类型输入,通常不需要使用。 > config为torchair编译后端创建时设置的torchair.CompilerConfig对象,用于pass感知完整编译选项。 FX Pass原地修改gm对象,任何返回值会被忽略。对于无法处理的异常情况,应当抛出异常。需要确保不抛出异常时,处理后的FX图是正确的:即其执行结果与修改前的FX完全一致。 ### 2.3.2 FX Pass实现示例 上一章节示例中的处理,对应的FX Pass实现示例: FX Pass中调用的工具方法都为FX Graph提供的工具方法。 ``` import torchair import torchair.scope._scope def parallel_abs_sub(gm, example_inputs, config: torchair.CompilerConfig): # Simple example to show how to use scope to do stream planning in torch.compile fx_graph = gm.graph for node in fx_graph.nodes: if node.op == "call_function" and node.target == torch.ops.aten.abs.default: with fx_graph.inserting_before(node): fx_graph.call_function(torch.ops.air.scope_enter.default, args=( ["_user_stream_label"], ["stream1"])) if node.op == "call_function" and node.target == torch.ops.aten.sub.Tensor: with fx_graph.inserting_after(node): fx_graph.call_function(torch.ops.air.scope_exit.default, args=()) ``` ### 2.3.3 自定义FX Pass使能 编译过程中涉及的主要图优化阶段如下图所示,当期计划开放post_grad_custom_pre_pass和post_grad_custom_post_pass两个阶段。 注意,同一执行阶段的外部pass只允许设置一个。如果需要进行多种外部处理,需要封装为单个pass接口后设置到config上。  当前torchair的编译选项都通过torchair.CompilerConfig类传递,需求开放post_grad_custom_pre_pass和post_grad_custom_post_pass两个阶段的自定义pass注册。开关开启方式如下: ``` config = torchair.CompilerConfig() config.post_grad_custom_pre_pass = parallel_abs_sub # parallel_abs_sub将在torchair原生fx图优化前执行 # 或者 config.post_grad_custom_post_pass = parallel_abs_sub # parallel_abs_sub将在torchair原生fx图优化后执行 ``` > 相比于上一版本实现,去除了依次添加pass的行为,变为直接向特定阶段注册pass。每个阶段的自定义pass唯一,如果涉及多个pass处理,需要用户封装为单个pass。 > 变更的动机:上一版本实现的缺陷(1)顺序添加pass的执行时序不够明确。(2)pass执行位置不够明确(3)对pass名称或函数名的唯一性要求有时会影响编码实现(4)与GPU实现方案差异较大,不利于知识共享。 ## 3 附录 ### 3.1 计划提供的IR定义 ``` import torch from torch.library import Library from torch.fx.node import has_side_effect m = Library("air", "FRAGMENT") m.define("record(*, Device? device=None) -> Tensor") m.define("wait(Tensor[] x) -> ()") has_side_effect(torch.ops.air.record.default) has_side_effect(torch.ops.air.wait.default) @torch.library.impl(m, "record", "Meta") def record_meta(*, device=None): return torch.empty(0, device=device) @torch.library.impl(m, "wait", "Meta") def record_meta(x): pass @torch.library.impl(m, "record", "CompositeExplicitAutograd") def record_impl(*args, **kwargs): return torch.empty(0) @torch.library.impl(m, "wait", "CompositeExplicitAutograd") def wait_impl(*args, **kwargs): pass ``` ### 3.2 Inductor开放的自定义Pass阶段 
评论 (
0
)
登录
后才可以发表评论
状态
TODO
TODO
WIP
DONE
CLOSED
REJECTED
负责人
未设置
标签
未设置
项目
未立项任务
未立项任务
里程碑
未关联里程碑
未关联里程碑
Pull Requests
未关联
未关联
关联的 Pull Requests 被合并后可能会关闭此 issue
分支
未关联
分支 (
-
)
标签 (
-
)
开始日期   -   截止日期
-
置顶选项
不置顶
置顶等级:高
置顶等级:中
置顶等级:低
优先级
不指定
严重
主要
次要
不重要
预计工期
(小时)
参与者(1)
Python
1
https://gitee.com/ascend/torchair.git
git@gitee.com:ascend/torchair.git
ascend
torchair
torchair
点此查找更多帮助
搜索帮助
Git 命令在线学习
如何在 Gitee 导入 GitHub 仓库
Git 仓库基础操作
企业版和社区版功能对比
SSH 公钥设置
如何处理代码冲突
仓库体积过大,如何减小?
如何找回被删除的仓库数据
Gitee 产品配额说明
GitHub仓库快速导入Gitee及同步更新
什么是 Release(发行版)
将 PHP 项目自动发布到 packagist.org
评论
仓库举报
回到顶部
登录提示
该操作需登录 Gitee 帐号,请先登录后再操作。
立即登录
没有帐号,去注册