# FlashAttentionScoreRegister

**Repository Path**: guopeian/flash-attention-score-register

## Basic Information

- **Project Name**: FlashAttentionScoreRegister
- **Description**: Registers the FlashAttention operator with TensorFlow in an Ascend environment, enabling TensorFlow to call the underlying FlashAttention operator inside CANN. Not a formal method for commercial use; intended only for quick proof-of-concept verification.
- **Primary Language**: Unknown
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No

## Statistics

- **Stars**: 0
- **Forks**: 2
- **Created**: 2024-11-18
- **Last Updated**: 2024-11-20

## Categories & Tags

**Categories**: Uncategorized
**Tags**: None

## README

# FlashAttentionScoreRegister

#### Calling an existing CANN operator from TensorFlow

1. Run `sh build.sh`. This builds the stub operator, builds the TensorFlow operator-registration `.so`, and then removes the stub FlashAttention implementation so that the underlying CANN FlashAttention operator is linked instead.
2. `cd test`
3. Run `python3 test.py`, or profile the run with:

   ```
   /usr/local/Ascend/ascend-toolkit/latest/toolkit/tools/profiler/bin/msprof --application="python3 test.py" --output=./profiling_data
   ```

4. To change the operator arguments, edit the arguments passed to `npu_flash_attention` in `./test/test.py`. The required arguments of `npu_flash_attention` are `query`, `key`, `value`, `head_num`, and `input_layout`; the rest are optional and may be omitted.

#### Calling a custom operator from TensorFlow

Environment: CANN + TensorFlow + CMake + GCC

1. Build the TensorFlow operator-registration `.so` (full example: `tf_ops/tf_ops.cpp`).

   1. Write an empty `OpKernel`:

      ```
      namespace {
      class CustOps : public OpKernel {
      public:
          explicit CustOps(OpKernelConstructionPtr context) : OpKernel(context) {}

          void Compute(OpKernelContextPtr context) override
          {
              std::cout << "Cust Ops not installed!!" << std::endl;
          }

          ~CustOps() override = default;
      };
      }
      ```

   2. Describe the operator's inputs, outputs, and shape-inference function with the `REGISTER_OP()` macro:

      ```
      namespace tensorflow {
      REGISTER_OP("XxxxXxxx")
          .Input("xx: float16")
          .Output("xx_out: float16")
          .SetIsStateful()
          .SetShapeFn([](::tensorflow::shape_inference::InferenceContext* c) {
              ShapeHandle dataShape;
              TF_RETURN_IF_ERROR(c->WithRank(c->input(0), 3, &dataShape));
              tensorflow::shape_inference::DimensionHandle batchSize = c->Dim(dataShape, 0);
              int64_t shape0 = c->Value(batchSize);
              c->set_output(0, c->MakeShape({shape0}));
              return Status::OK();
          });
      }
      ```

   3. Finally, bind the kernel with the `REGISTER_KERNEL_BUILDER()` macro:

      ```
      REGISTER_KERNEL_BUILDER(Name("XxxxXxxx").Device(DEVICE_CPU), CustOps)
      ```
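The `SetShapeFn` above implements a simple rule: the input must be rank 3, and the output shape is just dimension 0 of the input. The following is a plain-Python sketch of that rule (illustrative only; it is not TensorFlow API and not part of this repository):

```python
def infer_output_shape(input_shape):
    # Mirrors c->WithRank(c->input(0), 3, ...): reject anything that is not rank 3.
    if len(input_shape) != 3:
        raise ValueError(f"expected a rank-3 input, got rank {len(input_shape)}")
    # Mirrors c->Dim(dataShape, 0) followed by c->MakeShape({shape0}):
    # the output is a 1-D shape holding the input's first dimension.
    return (input_shape[0],)

print(infer_output_shape((8, 128, 64)))  # (8,)
```

The real shape function runs at graph-construction time, so getting it right here is what lets TensorFlow propagate static shapes through the custom op.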
2. Build the NPU operator. Example shell script:

   ```
   set -e

   # Locate msopgen and add its directory to PATH
   msopgen_path=$(find /usr/local/Ascend/ -name msopgen | grep bin)
   parent_dir=$(dirname "$msopgen_path")
   export PATH=$parent_dir:$PATH

   # Use msopgen to generate a buildable project
   rm -rf ./xxx_xxx
   /usr/local/Ascend/ascend-toolkit/latest/python/site-packages/bin/msopgen gen -i xxx_xxx.json -f tf -c ai_core-Ascend910B2 -lan cpp -out ./xxx_xxx -m 0 -op XxxXxx
   rm -rf xxx_xxx/op_host
   rm -rf xxx_xxx/op_kernel

   # Copy in your own operator host and kernel implementations
   cp -r op_host ./xxx_xxx
   cp -r op_kernel ./xxx_xxx

   cd xxx_xxx

   # Check that CMakePresets.json exists in the current directory
   if [ ! -f "CMakePresets.json" ]; then
       echo "ERROR: CMakePresets.json file does not exist."
       exit 1
   fi

   # Disable CRC checksum generation
   sed -i 's/--nomd5/--nomd5 --nocrc/g' ./cmake/makeself.cmake

   # Fix the CANN installation path
   sed -i 's:"/usr/local/Ascend/latest":"/usr/local/Ascend/ascend-toolkit/latest":g' CMakePresets.json

   # Change vendor_name so you do not overwrite previously installed operators whose vendor_name is "customize".
   # vendor_name must stay in sync with the CUST_PKG_PATH value in the aclnn CMakeLists.txt;
   # if they differ, aclnn calls will fail.
   # vendor_name must not contain the substring "customize"; otherwise, in multi-operator
   # deployment scenarios, the contents of the config.ini file under CANN's vendors
   # directory are truncated incorrectly.
   sed -i 's:"customize":"xxx_xxx":g' CMakePresets.json

   bash build.sh

   # Install the successfully built operator package
   bash ./build_out/custom_opp*.run
   ```

   1. The key steps are `cp -r op_host ./xxx_xxx` and `cp -r op_kernel ./xxx_xxx`, which place your own operator implementation into the generated build project.
   2. Following the format of the `xxx_xxx.json` file, describe your custom operator's inputs, outputs, and other information in JSON.
   3. Replace `xxx_xxx` and `XxxXxx` in the script above with your own operator name, then run the script.
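The `vendor_name` constraints noted in the script's comments can be captured in a small helper. The sketch below is purely illustrative (the function and its names are not part of this repository or of CANN); it only encodes the two rules stated above:

```python
def validate_vendor_name(vendor_name: str, cust_pkg_path_name: str) -> None:
    # Rule 1: vendor_name must not contain "customize", or config.ini parsing
    # under CANN's vendors directory can be truncated in multi-operator deployments.
    if "customize" in vendor_name:
        raise ValueError('vendor_name must not contain "customize"')
    # Rule 2: vendor_name must match CUST_PKG_PATH in the aclnn CMakeLists.txt,
    # otherwise aclnn calls will fail.
    if vendor_name != cust_pkg_path_name:
        raise ValueError("vendor_name must match CUST_PKG_PATH in the aclnn CMakeLists.txt")

validate_vendor_name("xxx_xxx", "xxx_xxx")  # passes silently
```

Running a check like this before invoking `build.sh` fails fast instead of producing an operator package that installs but cannot be called through aclnn.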
3. Write Python test code that loads the operator into TensorFlow and triggers NPU execution. Example:

   ```
   import os
   import logging

   import numpy as np
   from tensorflow.python.framework import ops
   import tensorflow as tf
   import npu_device
   from npu_device.compat.v1.npu_init import *

   logging.getLogger().setLevel(logging.INFO)
   os.environ["DEVICE_ID"] = str(0)
   os.environ["ASCEND_DEVICE_ID"] = str(0)
   os.environ["JOB_ID"] = "10086"

   tf.compat.v1.disable_eager_execution()
   tfOpLib = tf.load_op_library("../build/tf_ops/libxxx_xxx.so")
   npu_device.compat.enable_v1()
   npu_init = npu_ops.initialize_system()
   npu_shutdown = npu_ops.shutdown_system()

   config = tf.compat.v1.ConfigProto()
   custom_op = config.graph_options.rewrite_options.custom_optimizers.add()
   custom_op.name = "NpuOptimizer"
   config.graph_options.rewrite_options.remapping = RewriterConfig.OFF
   config.graph_options.rewrite_options.memory_optimization = RewriterConfig.OFF

   # x, y, z are the operator inputs; construct them to match your operator's signature
   output = tfOpLib.xxx_xxx(x, y, z)

   with tf.compat.v1.Session(config=config) as sess:
       sess.run(npu_init)
       sess.run(tf.compat.v1.global_variables_initializer())
       xxx_xxx_out = sess.run(output)
       sess.run(npu_shutdown)
   ```

#### Registering an operator with TensorFlow through tfadapter (requires modifying the Ascend/tensorflow repository sources and rebuilding the package)

tfadapter repository: https://gitee.com/ascend/tensorflow

1. In the `tf_adapter/ops` directory, pick one of the `.cc` files (for example `npu_ops.cc`) or add a new one, and register the operator IR with TensorFlow via the `REGISTER_OP` macro, including its input, output, attribute, and infershape information. Example:

   ```
   REGISTER_OP("BasicLSTMCellCStateGrad")
       .Input("c: T")
       .Input("dht: T")
       .Input("dct: T")
       .Input("it: T")
       .Input("jt: T")
       .Input("ft: T")
       .Input("ot: T")
       .Input("tanhct: T")
       .Output("dgate: T")
       .Output("dct_1: T")
       .Attr("T: {float16, float32}")
       .Attr("forget_bias: float = 1.0")
       .Attr("activation: string = 'tanh'")
       .SetIsStateful()
       .SetShapeFn([](shape_inference::InferenceContext *c) {
           auto input_it_shape = c->input(4);
           auto hidden_size = c->Dim(input_it_shape, 1);
           auto batch_size = c->Dim(input_it_shape, 0);
           DimensionHandle output_size;
           TF_RETURN_IF_ERROR(c->Multiply(hidden_size, 4, &output_size));
           auto output_shape = c->MakeShape({batch_size, output_size});
           c->set_output(0, output_shape);
           c->set_output(1, c->input(2));
           return Status::OK();
       });
   ```
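The shape function above derives `dgate` as `(batch_size, 4 * hidden_size)` from one of the rank-2 gate inputs, and forwards the shape of `dct` unchanged to `dct_1`. As a plain-Python sketch of the same arithmetic (illustrative names only, no TensorFlow):

```python
def lstm_cstate_grad_shapes(gate_shape, dct_shape):
    # gate_shape is (batch_size, hidden_size); dgate stacks the four gate
    # gradients, mirroring c->Multiply(hidden_size, 4, &output_size).
    batch_size, hidden_size = gate_shape
    # dct_1 takes dct's shape unchanged, mirroring c->set_output(1, c->input(2)).
    return (batch_size, 4 * hidden_size), dct_shape

print(lstm_cstate_grad_shapes((32, 256), (32, 256)))  # ((32, 1024), (32, 256))
```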
2. In the `tf_adapter/kernels` directory, pick a `.cc` file or add a new one, and write an operator class that inherits from TensorFlow's `OpKernel`. The class needs a constructor, a destructor, and an empty `Compute` function; finally, invoke the `REGISTER_KERNEL_BUILDER` macro. Example:

   ```
   namespace tensorflow {
   namespace {
   class NpuOnnxGraphOp : public OpKernel {
   public:
       explicit NpuOnnxGraphOp(OpKernelConstruction *context) : OpKernel(context) {}
       ~NpuOnnxGraphOp() override = default;
       void Compute(OpKernelContext *context) override
       {
           (void) context;
           return;
       }
       bool IsExpensive() override { return false; }
   };
   REGISTER_KERNEL_BUILDER(Name("NpuOnnxGraphOp").Device(DEVICE_CPU), NpuOnnxGraphOp);
   }
   }
   ```

3. In the `tf_adapter/python/npu_bridge/npu_cpu` directory, pick a `.py` file or add a new one, and write a Python function that calls the custom operator's name on `gen_npu_cpu_ops`, passing in the operator's inputs and attributes and returning the result. Example:

   ```
   def dense_image_warp(image, flow, name=None):
       """ Dense image warp. """
       result = gen_npu_cpu_ops.dense_image_warp(
           image=image,
           flow=flow,
           name=name
       )
       return result
   ```
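The `npu_bridge` wrapper in step 3 is just a thin forwarding layer over the generated bindings. The sketch below imitates that pattern with a stand-in for `gen_npu_cpu_ops` (purely illustrative; the real module is generated during the tfadapter build from the `REGISTER_OP` definitions, and the real call builds a graph node rather than returning a dict):

```python
class _GenNpuCpuOpsStub:
    """Stand-in for the generated gen_npu_cpu_ops module (illustrative only)."""
    @staticmethod
    def dense_image_warp(image, flow, name=None):
        # The real binding would create a DenseImageWarp node in the TF graph;
        # here we just echo the call so the forwarding pattern is visible.
        return {"op": "DenseImageWarp", "image": image, "flow": flow, "name": name}

gen_npu_cpu_ops = _GenNpuCpuOpsStub()

def dense_image_warp(image, flow, name=None):
    """Thin wrapper: forward inputs and attributes to the generated op binding."""
    return gen_npu_cpu_ops.dense_image_warp(image=image, flow=flow, name=name)

print(dense_image_warp("img", "flow", name="warp")["op"])  # DenseImageWarp
```

The wrapper adds nothing but a stable, documented Python entry point; keeping it free of logic means the generated binding remains the single source of truth for the op's signature.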