diff --git a/tf_adapter_2.x/npu_device/core/npu_cache_spec.h b/tf_adapter_2.x/npu_device/core/npu_cache_spec.h
index 6aa21c74204bde22806e1cd39261e72972cc34ef..92bc12b31bc7109a24398f32471cf43fae70ecd5 100644
--- a/tf_adapter_2.x/npu_device/core/npu_cache_spec.h
+++ b/tf_adapter_2.x/npu_device/core/npu_cache_spec.h
@@ -62,7 +62,75 @@ class TaskSpec {
   std::string fallback_reason_;
 };
 
+class FuncSpec : public TaskSpec {
+  using TensorDataTypes = tensorflow::gtl::InlinedVector<tensorflow::DataType, 4>;
+
+ public:
+  using PruneInputsFunc =
+    std::function<void(int, TFE_TensorHandle **, std::vector<TFE_TensorHandle *> &)>;
+  FuncSpec(const tensorflow::OpRegistrationData *op_spec, tensorflow::NodeDef ndef, uint64_t ge_graph_id,
+           std::unique_ptr<tensorflow::GraphDef> graph, PruneInputsFunc prune_func,
+           std::map<int, std::shared_ptr<IteratorResourceProvider>> dependent_host_resources, std::string reason = "")
+      : ge_graph_id_(ge_graph_id),
+        graph_(std::move(graph)),
+        prune_func_(std::move(prune_func)),
+        dependent_host_resources_(std::move(dependent_host_resources)) {
+    TensorDataTypes input_dtypes;
+    TensorDataTypes output_dtypes;
+    tensorflow::InOutTypesForNode(ndef, op_spec->op_def, &input_dtypes, &output_dtypes);
+
+    op_spec_ = op_spec;
+    ndef_ = std::move(ndef);
+    input_dtypes_ = std::move(input_dtypes);
+    output_dtypes_ = std::move(output_dtypes);
+    fallback_reason_ = std::move(reason);
+  }
+  ~FuncSpec() = default;
+  bool IsFunctionOp() const override { return true; }
+
+  uint64_t GeGraphId() const { return ge_graph_id_; }
+
+  const std::map<int, std::shared_ptr<IteratorResourceProvider>> &DependentHostResources() const {
+    return dependent_host_resources_;
+  }
+
+  const tensorflow::GraphDef *GraphDef() const { return graph_.get(); }
+
+  void SetBuilt() const { built_.store(true); }
+  bool Built() const { return built_; }
+
+  void SetNeedLoop(bool loop) const { need_loop_.store(loop); }
+  bool NeedLoop() const { return need_loop_; }
+
+  void PruneInputs(int num_inputs, TFE_TensorHandle **inputs, std::vector<TFE_TensorHandle *> &pruned) const {
+    prune_func_(num_inputs, inputs, pruned);
+  }
+  std::string DebugString() const override {
+    std::stringstream ss;
+    ss << NodeDef().DebugString() << std::endl;
+    ss << OpRegistrationData()->op_def.DebugString() << std::endl;
+    ss << "Ge graph id " << ge_graph_id_ << std::endl;
+    for (size_t i = 0; i < output_dtypes_.size(); i++) {
+      ss << "output " << i << " " << tensorflow::DataTypeString(output_dtypes_[i]) << std::endl;
+    }
+    if (ShouldFallback()) {
+      ss << "Fallback reason " << fallback_reason_;
+    }
+    return ss.str();
+  }
+
+ private:
+  uint64_t ge_graph_id_;
+  std::unique_ptr<tensorflow::GraphDef> graph_;
+  PruneInputsFunc prune_func_;
+  const std::map<int, std::shared_ptr<IteratorResourceProvider>> dependent_host_resources_;
+  std::atomic_bool mutable built_{false};
+  std::atomic_bool mutable need_loop_{false};
+};
+
 class OpSpec : public TaskSpec {
+  using HashKey = uint64_t;
+
  public:
   OpSpec(const tensorflow::OpRegistrationData *op_spec, tensorflow::NodeDef ndef, TensorShapes input_shapes,
          TensorPartialShapes output_shapes, std::string reason)
@@ -146,77 +214,40 @@ class OpSpec : public TaskSpec {
     return ss.str();
   }
 
- private:
-  bool always_infer_shape_;
-  TensorShapes output_shapes_;
-  TensorPartialShapes partial_output_shapes_;
-  tensorflow::NodeDef attached_attrs_;
-};
-
-class FuncSpec : public TaskSpec {
-  using TensorDataTypes = tensorflow::gtl::InlinedVector<tensorflow::DataType, 4>;
-
- public:
-  using PruneInputsFunc =
-    std::function<void(int, TFE_TensorHandle **, std::vector<TFE_TensorHandle *> &)>;
-  FuncSpec(const tensorflow::OpRegistrationData *op_spec, tensorflow::NodeDef ndef, uint64_t ge_graph_id,
-           std::unique_ptr<tensorflow::GraphDef> graph, PruneInputsFunc prune_func,
-           std::map<int, std::shared_ptr<IteratorResourceProvider>> dependent_host_resources, std::string reason = "")
-      : ge_graph_id_(ge_graph_id),
-        graph_(std::move(graph)),
-        prune_func_(std::move(prune_func)),
-        dependent_host_resources_(std::move(dependent_host_resources)) {
-    TensorDataTypes input_dtypes;
-    TensorDataTypes output_dtypes;
-    tensorflow::InOutTypesForNode(ndef, op_spec->op_def, &input_dtypes, &output_dtypes);
-
-    op_spec_ = op_spec;
-    ndef_ = std::move(ndef);
-    input_dtypes_ = std::move(input_dtypes);
-    output_dtypes_ = std::move(output_dtypes);
-    fallback_reason_ = std::move(reason);
+  void GetFuncSpec(const std::vector<tensorflow::ResourceHandle> &handles, std::shared_ptr<const FuncSpec> *spec) const {
+    shared_lock.lock_shared();
+    HashKey key = Hash(handles);
+    auto iter = handles_to_specs_.find(key);
+    if (iter != handles_to_specs_.end()) {
+      *spec = iter->second;
+    }
+    shared_lock.unlock_shared();
   }
-  ~FuncSpec() = default;
-  bool IsFunctionOp() const override { return true; }
-
-  uint64_t GeGraphId() const { return ge_graph_id_; }
-
-  const std::map<int, std::shared_ptr<IteratorResourceProvider>> &DependentHostResources() const {
-    return dependent_host_resources_;
+  void CacheFuncSpec(const std::vector<tensorflow::ResourceHandle> &handles, std::shared_ptr<const FuncSpec> spec) const {
+    shared_lock.lock();
+    HashKey key = Hash(handles);
+    handles_to_specs_[key] = std::move(spec);
+    shared_lock.unlock();
   }
 
-  const tensorflow::GraphDef *GraphDef() const { return graph_.get(); }
-
-  void SetBuilt() const { built_.store(true); }
-  bool Built() const { return built_; }
-
-  void SetNeedLoop(bool loop) const { need_loop_.store(loop); }
-  bool NeedLoop() const { return need_loop_; }
-
-  void PruneInputs(int num_inputs, TFE_TensorHandle **inputs, std::vector<TFE_TensorHandle *> &pruned) const {
-    prune_func_(num_inputs, inputs, pruned);
-  }
-  std::string DebugString() const override {
-    std::stringstream ss;
-    ss << NodeDef().DebugString() << std::endl;
-    ss << OpRegistrationData()->op_def.DebugString() << std::endl;
-    ss << "Ge graph id " << ge_graph_id_ << std::endl;
-    for (size_t i = 0; i < output_dtypes_.size(); i++) {
-      ss << "output " << i << " " << tensorflow::DataTypeString(output_dtypes_[i]) << std::endl;
+ private:
+  static HashKey Hash(const std::vector<tensorflow::ResourceHandle> &handles) {
+    if (handles.empty()) {
+      return 0;
     }
-    if (ShouldFallback()) {
-      ss << "Fallback reason " << fallback_reason_;
+    HashKey hash = tensorflow::Hash64(handles[0].DebugString());
+    for (size_t i = 1; i < handles.size(); i++) {
+      hash = tensorflow::Hash64Combine(hash, tensorflow::Hash64(handles[i].DebugString()));
     }
-    return ss.str();
+    return hash;
   }
-
- private:
-  uint64_t ge_graph_id_;
-  std::unique_ptr<tensorflow::GraphDef> graph_;
-  PruneInputsFunc prune_func_;
-  const std::map<int, std::shared_ptr<IteratorResourceProvider>> dependent_host_resources_;
-  std::atomic_bool mutable built_{false};
-  std::atomic_bool mutable need_loop_{false};
+  tensorflow::mutex mutable shared_lock;
+  bool always_infer_shape_;
+  TensorShapes output_shapes_;
+  TensorPartialShapes partial_output_shapes_;
+  tensorflow::NodeDef attached_attrs_;
+  std::map<HashKey, std::shared_ptr<const FuncSpec>> mutable handles_to_specs_ GUARDED_BY(shared_lock);
 };
 
 }  // namespace npu
diff --git a/tf_adapter_2.x/npu_device/core/npu_device.cpp b/tf_adapter_2.x/npu_device/core/npu_device.cpp
index bdaba6abfbb463bb4bc6e235367a63c0b4c32f60..017a7c725cbc0ea9b54a542354897ef9d599b8fd 100644
--- a/tf_adapter_2.x/npu_device/core/npu_device.cpp
+++ b/tf_adapter_2.x/npu_device/core/npu_device.cpp
@@ -327,11 +327,7 @@ tensorflow::Status NpuDevice::ValidateInput(const char *op_name, int num_inputs,
 tensorflow::Status NpuDevice::ValidateOutput(const char *op_name, const TensorDataTypes &data_types) {
   for (size_t i = 0; i < data_types.size(); i++) {
     auto data_type = data_types[i];
-    if (data_type == tensorflow::DT_RESOURCE) {
-      if (!SupportedResourceGenerator(op_name)) {
-        return tensorflow::errors::Unimplemented("Op ", op_name, " unsupported resource generator by NPU");
-      }
-    } else if (!tensorflow::DataTypeCanUseMemcpy(data_type)) {
+    if (!tensorflow::DataTypeCanUseMemcpy(data_type)) {
       return tensorflow::errors::Unimplemented("Op ", op_name, " output ", i, " unsupported type ",
                                                tensorflow::DataTypeString(data_type));
     }
@@ -396,9 +392,8 @@ tensorflow::Status NpuDevice::TransResourceInput2GraphNode(
     std::map<int, std::shared_ptr<IteratorResourceProvider>> &dependent_host_resources) {
   (void)RemoveRedundantHcomControlEdges(graph);
 
-  std::set<int> arg_is_variable;
   std::set<int> arg_is_iterator;
-
+  std::set<int> arg_is_npu_resource;
   std::map<int, tensorflow::ResourceHandle> arg_resource_handles;
 
   VecTensorDataTypes arg_handle_dtyes(num_inputs);
@@ -417,12 +412,7 @@ tensorflow::Status NpuDevice::TransResourceInput2GraphNode(
         GetMirroredIteratorShapesAndTypes(handle, arg_handle_shapes[i], arg_handle_dtyes[i]);
         arg_is_iterator.insert(i);
       } else {
-        const auto &dtypes_and_shapes = handle.dtypes_and_shapes();
-        for (auto &dtype_and_shape : dtypes_and_shapes) {
-          arg_handle_dtyes[i].push_back(dtype_and_shape.dtype);
-          arg_handle_shapes[i].push_back(dtype_and_shape.shape);
-        }
-        arg_is_variable.insert(i);
+        arg_is_npu_resource.insert(i);
       }
     }
   }
@@ -441,16 +431,15 @@ tensorflow::Status NpuDevice::TransResourceInput2GraphNode(
                           .Attr("_arg_index", int(index))
                           .Finalize(graph, &arg_substitutes[node]));
 
-      } else if (arg_is_variable.count(index)) {
-        tensorflow::Node *variable = nullptr;
-        NPU_REQUIRES_OK(tensorflow::NodeBuilder(WrapResourceName(arg_resource_handles[index].name()), "VarHandleOp")
-                          .Attr("container", arg_resource_handles[index].container())
-                          .Attr("shared_name", arg_resource_handles[index].name())
-                          .Attr("dtype", arg_handle_dtyes[index][0])
-                          .Attr("shape", arg_handle_shapes[index][0])
-                          .Attr("_arg_name", node->name())
-                          .Attr("_arg_index", int(index))
-                          .Finalize(graph, &arg_substitutes[node]));
+      } else if (arg_is_npu_resource.count(index)) {
+        std::shared_ptr<ResourceGenerator> generator = nullptr;
+        GetResourceGeneratorDef(arg_resource_handles[index], &generator);
+        NPU_REQUIRES(generator != nullptr,
+                     tensorflow::errors::Internal("Unknown npu resource ", arg_resource_handles[index].DebugString()));
+        tensorflow::Status status;
+        arg_substitutes[node] = graph->AddNode(*generator->NodeDef(), &status);
+        NPU_REQUIRES_OK(status);
+        arg_substitutes[node]->set_name(arg_resource_handles[index].name());
       }
     }
   }
@@ -1130,6 +1119,39 @@ void NpuDevice::FallbackCPU(TFE_Context *context, const char *op_name, const TFE
     (*hook)(context, this, op_name, attributes, num_inputs, inputs, *num_outputs, outputs, status);
     if (TF_GetCode(status) != TF_OK) return;
   }
+
+  if (SupportedResourceGenerator(op_name)) {
+    auto ndef = std::make_shared<tensorflow::NodeDef>();
+    ndef->set_op(op_name);
+    tensorflow::unwrap(attributes)->FillAttrValueMap(ndef->mutable_attr());
+    for (int i = 0; i < *num_outputs; ++i) {
+      const tensorflow::Tensor *cpu_tensor = nullptr;
+      NPU_CTX_REQUIRES_OK(status, npu::UnwrapTensor(outputs[i], &cpu_tensor));
+
+      if (cpu_tensor->dtype() == tensorflow::DT_RESOURCE) {
+        scope_handle_deleter.Guard(outputs[i]);
+        outputs[i] = NewDeviceResourceHandle(context, cpu_tensor->shape(), status);
+        if (TF_GetCode(status) != TF_OK) {
+          return;
+        }
+        const tensorflow::Tensor *npu_tensor = nullptr;
+        NPU_CTX_REQUIRES_OK(status, npu::UnwrapTensor(outputs[i], &npu_tensor));
+        for (int j = 0; j < npu_tensor->NumElements(); j++) {
+          auto resource_handle = cpu_tensor->flat<tensorflow::ResourceHandle>()(j);
+          const_cast<tensorflow::Tensor *>(npu_tensor)->flat<tensorflow::ResourceHandle>()(j) = resource_handle;
+          tensorflow::AttrValue container_attr;
+          tensorflow::SetAttrValue(resource_handle.container(), &container_attr);
+          tensorflow::AttrValue shared_name_attr;
+          tensorflow::SetAttrValue(resource_handle.name(), &shared_name_attr);
+          (*ndef->mutable_attr())["container"] = container_attr;
+          (*ndef->mutable_attr())["shared_name"] = shared_name_attr;
+          RecordResourceGeneratorDef(resource_handle, std::make_shared<ResourceGenerator>(ndef, j));
+          DLOG() << "Create resource " << op_name << " " << resource_handle.DebugString() << " generated by "
+                 << ndef->DebugString() << " on NPU";
+        }
+      }
+    }
+  }
 }
 
 void NpuDevice::FallbackCPU(TFE_Context *context, const npu::OpSpec *spec, int num_inputs, TFE_TensorHandle **inputs,
@@ -1170,7 +1192,7 @@ void NpuDevice::Execute(const TFE_Op *op, int *num_outputs, TFE_TensorHandle **o
   // If an op has inputs coming from more than one device, report an error directly
   bool cpu_resource = false;
   NPU_CTX_REQUIRES_OK(s, ValidateResourcePlacement(op_name, num_inputs, inputs.data(), cpu_resource));
-  // If the op has a resource input from the CPU, it must fall back to the CPU
+  // If the op has a resource input from the CPU and no NPU mirror of that resource exists, it must fall back to the CPU
   if (cpu_resource) {
     DLOG() << "NPU Executing " << op_name << " fallback[input resource from cpu]";
     FallbackCPU(context, op_name, attributes, inputs.size(), inputs.data(), num_outputs, outputs, s);
@@ -1219,6 +1241,45 @@ void NpuDevice::Run(TFE_Context *context, std::shared_ptr s
 
 void NpuDevice::RunOp(TFE_Context *context, const npu::OpSpec *spec, int num_inputs, TFE_TensorHandle **inputs,
                       int *num_outputs, TFE_TensorHandle **outputs, TF_Status *status) {
+  // A resource input can be one of two kinds here: a mirrored iterator resource, or a resource living on the NPU
+  std::vector<tensorflow::ResourceHandle> npu_resources;
+  std::vector<tensorflow::ResourceHandle> mirrored_resources;
+  std::vector<int> remain_indexes;
+  for (int i = 0; i < num_inputs; ++i) {
+    const tensorflow::Tensor *tensor = nullptr;
+    NPU_CTX_REQUIRES_OK(status, npu::UnwrapTensor(inputs[i], &tensor));
+    if (tensor->dtype() == tensorflow::DT_RESOURCE) {
+      if (IsNpuTensorHandle(npu::UnwrapHandle(inputs[i]))) {
+        npu_resources.emplace_back(tensor->flat<tensorflow::ResourceHandle>()(0));
+      } else {
+        mirrored_resources.emplace_back(tensor->flat<tensorflow::ResourceHandle>()(0));
+      }
+    } else {
+      remain_indexes.push_back(i);
+    }
+  }
+
+  // This switch disables single-op execution via ACL in function mode under extreme memory pressure, reducing peak memory usage
+  if (!kExecuteOpByAcl) {
+    if (npu_resources.empty()) {
+      DLOG() << "NPU Executing op " << spec->Op() << " fallback cpu as acl engine not enabled";
+      FallbackCPU(context, spec, num_inputs, inputs, num_outputs, outputs, status);
+      return;
+    } else {
+      DLOG() << "Op " << spec->Op() << " not fallback cpu as it has resource input from NPU";
+    }
+  }
+
+  // Mirrored resources are consumed on the host by default
+  if (!mirrored_resources.empty()) {
+    NPU_CTX_REQUIRES(
+      status, npu_resources.empty(),
+      tensorflow::errors::Unimplemented("Npu currently unsupported mix use of mirrored and npu resources"));
+    DLOG() << "NPU Executing op " << spec->Op() << " fallback cpu as mirrored resource from cpu in eager mode";
+    FallbackCPU(context, spec, num_inputs, inputs, num_outputs, outputs, status);
+    return;
+  }
+
   TensorShapes output_shapes;
   tensorflow::NodeDef parser_ndef = spec->ParserNodeDef();
   if (spec->ShouldInferShape()) {
@@ -1241,6 +1302,8 @@ void NpuDevice::RunOp(TFE_Context *context, const npu::OpSpec *spec, int num_inp
     }
     if (should_fallback) {
       DLOG() << "NPU Executing op " << spec->Op() << " fallback cpu after re-infer shape";
+      NPU_CTX_REQUIRES(status, npu_resources.empty(),
+                       tensorflow::errors::Unimplemented("Npu currently not support dynamic output shapes resources"));
       FallbackCPU(context, spec, num_inputs, inputs, num_outputs, outputs, status);
       return;
     }
@@ -1249,6 +1312,7 @@ void NpuDevice::RunOp(TFE_Context *context, const npu::OpSpec *spec, int num_inp
     output_shapes = spec->OutputShapes();
   }
 
+  // Look up a custom kernel first; custom kernels cover op behaviour the NPU cannot execute natively yet
   NpuCustomKernelFunc *custom_kernel = nullptr;
   if (CustomKernelRegistry::Instance().GetCustomKernelFunc(spec->Op(), &custom_kernel)) {
     (*custom_kernel)(context, this, spec, output_shapes, parser_ndef, num_inputs, inputs, *num_outputs, outputs,
@@ -1256,35 +1320,114 @@ void NpuDevice::RunOp(TFE_Context *context, const npu::OpSpec *spec, int num_inp
     return;
   }
 
-  if (!kExecuteOpByAcl) {
-    bool op_can_fallback = true;
-    if (SupportedResourceGenerator(spec->Op())) {  // Should never fallback npu resource generator
-      DLOG() << "Op " << spec->Op() << " not fallback cpu as it is resource generator";
-      op_can_fallback = false;
-    } else {
-      for (int i = 0; i < num_inputs; ++i) {  // Should never fallback if op has npu resource input
-        if (IsNpuTensorHandle(npu::UnwrapHandle(inputs[i])) &&
-            npu::UnwrapHandle(inputs[i])->DataType() == tensorflow::DT_RESOURCE) {
-          DLOG() << "Op " << spec->Op() << " not fallback cpu as it has resource input from NPU";
-          op_can_fallback = false;
-          break;
+  // If any input is an NPU resource, complete the graph with the resource generator nodes here and cache the result in this op's func specs
+  if (!npu_resources.empty()) {
+    std::shared_ptr<const npu::FuncSpec> func_spec = nullptr;
+    spec->GetFuncSpec(npu_resources, &func_spec);
+
+    if (func_spec == nullptr) {
+      std::unique_ptr<tensorflow::Graph> graph = std::make_unique<tensorflow::Graph>(tensorflow::OpRegistry::Global());
+
+      tensorflow::Status s;
+      tensorflow::Node *target_node = graph->AddNode(spec->NodeDef(), &s);
+      NPU_CTX_REQUIRES_OK(status, s);
+      target_node->set_name(spec->Op());
+
+      const auto &shapes = spec->InputShapes();
+      const auto &types = spec->InputTypes();
+
+      int arg_index = 0;
+      int resource_index = 0;
+      std::unordered_map<std::shared_ptr<tensorflow::NodeDef>, tensorflow::Node *> def_nodes;
+
+      for (int i = 0; i < num_inputs; i++) {
+        if (types[i] == tensorflow::DT_RESOURCE) {
+          std::shared_ptr<ResourceGenerator> generator = nullptr;
+          const auto &handle = npu_resources[resource_index++];
+          GetResourceGeneratorDef(handle, &generator);
+          NPU_CTX_REQUIRES(status, generator != nullptr,
+                           tensorflow::errors::Internal("Unknown npu resource ", handle.DebugString()));
+
+          if (def_nodes.find(generator->NodeDef()) == def_nodes.end()) {
+            tensorflow::Node *node = graph->AddNode(*generator->NodeDef(), &s);
+            NPU_CTX_REQUIRES_OK(status, s);
+            node->set_name(handle.name());
+            def_nodes[generator->NodeDef()] = node;
+          }
+          NPU_CTX_REQUIRES(
+            status, graph->AddEdge(def_nodes[generator->NodeDef()], generator->Index(), target_node, i),
+            tensorflow::errors::Internal("Failed add edge from ", def_nodes[generator->NodeDef()]->name(), " to ",
+                                         target_node->name()));
+
+        } else {
+          tensorflow::Node *node = nullptr;
+          NPU_CTX_REQUIRES_OK(status, tensorflow::NodeBuilder("arg_" + std::to_string(i), "_Arg")
+                                        .Attr("T", types[i])
+                                        .Attr("index", arg_index++)
+                                        .Attr("_handle_dtypes", types[i])
+                                        .Attr("_handle_shapes", shapes[i])
+                                        .Finalize(graph.get(), &node));
+          NPU_CTX_REQUIRES(
+            status, graph->AddEdge(node, 0, target_node, i),
+            tensorflow::errors::Internal("Failed add edge from ", node->name(), " to ", target_node->name()));
         }
       }
+
+      const auto &output_types = spec->OutputTypes();
+      for (int i = 0; i < *num_outputs; i++) {
+        tensorflow::Node *node = nullptr;
+        NPU_CTX_REQUIRES_OK(status, tensorflow::NodeBuilder("ret_" + std::to_string(i), "_Retval")
+                                      .Input(target_node, i)
+                                      .Attr("T", output_types[i])
+                                      .Attr("index", i)
+                                      .Finalize(graph.get(), &node));
+      }
+
+      npu::FuncSpec::PruneInputsFunc prune_func = [remain_indexes](int num_inputs, TFE_TensorHandle **inputs,
+                                                                   std::vector<TFE_TensorHandle *> &pruned) {
+        for (auto index : remain_indexes) {
+          pruned.push_back(inputs[index]);
+        }
+      };
+      std::vector<TFE_TensorHandle *> pruned_inputs;
+      prune_func(num_inputs, inputs, pruned_inputs);
+
+      tensorflow::FixupSourceAndSinkEdges(graph.get());
+      MarkGraphNodeInOutDesc(context, graph.get(), pruned_inputs.size(), pruned_inputs.data());
+
+      auto graph_def = std::make_unique<tensorflow::GraphDef>();
+      graph->ToGraphDef(graph_def.get());
+
+      OptimizeStageGraphDumper graph_dumper(spec->Op());
+      if (kDumpGraph && kDumpExecutionDetail) {
+        std::string suffix = npu_resources[0].name();
+        for (int i = 1; i < npu_resources.size(); i++) {
+          suffix += ".";
+          suffix += npu_resources[i].name();
+        }
+        graph_dumper.Dump(suffix, *graph_def);
+      }
+
+      uint64_t ge_graph_id = NextUUID();
+      AddGeGraph(context, ge_graph_id, spec->NodeDef().name() + "_" + std::to_string(ge_graph_id), *graph_def, status);
+      if (TF_GetCode(status) != TF_OK) return;
+      func_spec =
+        std::make_shared<npu::FuncSpec>(spec->OpRegistrationData(), spec->NodeDef(), ge_graph_id, std::move(graph_def),
+                                        prune_func, std::map<int, std::shared_ptr<IteratorResourceProvider>>{}, "");
+
+      spec->CacheFuncSpec(npu_resources, func_spec);
     }
-    if (op_can_fallback) {
-      DLOG() << "NPU Executing op " << spec->Op() << " fallback cpu as acl engine not enabled";
-      FallbackCPU(context, spec, num_inputs, inputs, num_outputs, outputs, status);
-      return;
-    }
+
+    RunGraph(context, func_spec.get(), num_inputs, inputs, num_outputs, outputs, status);
+    return;
   }
-  // If an input is on the CPU, it has to be converted to an NPU handle here
+
+  // No resource-typed inputs or outputs remain at this point; inputs on the CPU must be converted to NPU handles
   std::vector<TFE_TensorHandle *> npu_inputs(num_inputs);
   ScopeTensorHandleDeleter scope_handle_deleter;
   for (int i = 0; i < num_inputs; ++i) {
     TFE_TensorHandle *input = inputs[i];
-    // A resource reaching this point is either a CPU mirror or an NPU resource
-    if (!IsNpuTensorHandle(npu::UnwrapHandle(input)) &&
-        npu::UnwrapHandle(input)->DataType() != tensorflow::DT_RESOURCE) {
+    if (!IsNpuTensorHandle(npu::UnwrapHandle(input))) {
       tensorflow::Status s;
       auto src_name = npu::UnwrapHandle(input)->DeviceName(&s);
       NPU_CTX_REQUIRES_OK(status, s);
@@ -1300,16 +1443,9 @@ void NpuDevice::RunOp(TFE_Context *context, const npu::OpSpec *spec, int num_inp
   }
   const auto &output_types = spec->OutputTypes();
   for (size_t i = 0; i < output_types.size(); ++i) {
-    if (output_types[i] == tensorflow::DT_RESOURCE) {
-      outputs[i] = NewDeviceResourceHandle(context, output_shapes[i], status);
-      if (TF_GetCode(status) != TF_OK) {
-        return;
-      }
-    } else {
-      outputs[i] = NewDeviceTensorHandle(context, Format::FORMAT_ND, output_shapes[i], output_types[i], status);
-      if (TF_GetCode(status) != TF_OK) {
-        return;
-      }
+    outputs[i] = NewDeviceTensorHandle(context, Format::FORMAT_ND, output_shapes[i], output_types[i], status);
+    if (TF_GetCode(status) != TF_OK) {
+      return;
     }
   }
   /******************************************Simulated NPU execution Start************************************/
@@ -1327,28 +1463,21 @@ void NpuDevice::RunOp(TFE_Context *context, const npu::OpSpec *spec, int num_inp
   NPU_CTX_REQUIRES_OK(status, npu::UnwrapTensor(npu_inputs[i], &npu_tensor));
   npu::Unwrap<npu::NpuManagedBuffer>(npu_tensor);  // the return value is the NpuManagedBuffer*
   */
+  /**********CPU MOCK Start*************/
   std::vector<TFE_TensorHandle *> acl_inputs(num_inputs);
   for (int i = 0; i < num_inputs; ++i) {
     const tensorflow::Tensor *npu_tensor = nullptr;
     NPU_CTX_REQUIRES_OK(status, npu::UnwrapTensor(npu_inputs[i], &npu_tensor));
     tensorflow::Tensor cpu_tensor(npu_tensor->dtype(), npu_tensor->shape());
-    if (npu_tensor->dtype() == tensorflow::DT_RESOURCE) {
-      for (int j = 0; j < npu_tensor->NumElements(); j++) {
-        cpu_tensor.flat<tensorflow::ResourceHandle>()(j) =
-            const_cast<tensorflow::Tensor *>(npu_tensor)->flat<tensorflow::ResourceHandle>()(j);
-      }
-    } else {
-      NPU_CTX_REQUIRES_OK(status, npu::Unwrap<npu::NpuManagedBuffer>(npu_tensor)->AssembleTo(&cpu_tensor));
-    }
+    NPU_CTX_REQUIRES_OK(status, npu::Unwrap<npu::NpuManagedBuffer>(npu_tensor)->AssembleTo(&cpu_tensor));
     acl_inputs[i] = tensorflow::wrap(tensorflow::TensorHandle::CreateLocalHandle(cpu_tensor));
     scope_handle_deleter.Guard(acl_inputs[i]);
     if (TF_GetCode(status) != TF_OK) return;
   }
-  /**********Invoke CPU to simulate NPU Start*************/
   std::vector<TFE_TensorHandle *> acl_outputs(*num_outputs);
   FallbackCPU(context, spec, num_inputs, acl_inputs.data(), num_outputs, acl_outputs.data(), status);
   if (TF_GetCode(status) != TF_OK) return;
-  /**********Invoke CPU to simulate NPU End*************/
+  /**********CPU MOCK End*************/
   for (int i = 0; i < *num_outputs; ++i) {
     const tensorflow::Tensor *acl_tensor = nullptr;
     NPU_CTX_REQUIRES_OK(status, npu::UnwrapTensor(acl_outputs[i], &acl_tensor));
@@ -1953,6 +2082,19 @@ bool NpuDevice::SupportedResourceGenerator(const std::string &op) {
   return kUnsupportedOps.count(op) != 0;
 }
 
+void NpuDevice::RecordResourceGeneratorDef(const tensorflow::ResourceHandle &key,
+                                           std::shared_ptr<ResourceGenerator> src) {
+  device_resources_.emplace(key, src);
+}
+
+void NpuDevice::GetResourceGeneratorDef(const tensorflow::ResourceHandle &key,
+                                        std::shared_ptr<ResourceGenerator> *src) {
+  auto iter = device_resources_.find(key);
+  if (iter != device_resources_.end()) {
+    *src = iter->second;
+  }
+}
+
 void NpuDevice::RecordIteratorMirror(const tensorflow::ResourceHandle &src, const TensorPartialShapes &shapes,
                                      const TensorDataTypes &types) {
   iterator_mirrors_.emplace(src, std::make_pair(shapes, types));
diff --git a/tf_adapter_2.x/npu_device/core/npu_device.h b/tf_adapter_2.x/npu_device/core/npu_device.h
index bafeb9f574b1863ceea0da821cee086e7e8a3e49..3943e265b2492383fd84f098cb12cff52224e86c 100644
--- a/tf_adapter_2.x/npu_device/core/npu_device.h
+++ b/tf_adapter_2.x/npu_device/core/npu_device.h
@@ -41,6 +41,17 @@ class NpuDevice {
   using CachedFuncSpecs = std::map<std::string, std::shared_ptr<const npu::FuncSpec>>;
   using DoneCallback = std::function<void(tensorflow::Status)>;
 
+  class ResourceGenerator {
+   public:
+    ResourceGenerator(std::shared_ptr<tensorflow::NodeDef> def, int index) : def_(def), index_(index) {}
+    std::shared_ptr<tensorflow::NodeDef> NodeDef() const { return def_; }
+    int Index() const { return index_; }
+
+   private:
+    std::shared_ptr<tensorflow::NodeDef> def_;
+    int index_;
+  };
+
  public:
   static std::string CreateDevice(const char *name, int device_index,
                                   const std::map<std::string, std::string> &session_options, NpuDevice **device);
@@ -182,6 +193,10 @@ class NpuDevice {
 
   bool SupportedResourceGenerator(const std::string &op);
 
+  void RecordResourceGeneratorDef(const tensorflow::ResourceHandle &key, std::shared_ptr<ResourceGenerator> src);
+
+  void GetResourceGeneratorDef(const tensorflow::ResourceHandle &key, std::shared_ptr<ResourceGenerator> *src);
+
   void RecordIteratorMirror(const tensorflow::ResourceHandle &src, const TensorPartialShapes &shapes,
                             const TensorDataTypes &types);
 
@@ -249,6 +264,7 @@ class NpuDevice {
   std::unique_ptr<tensorflow::CancellationManager> cancellation_manager_;
   CachedOpSpecs cached_op_specs_;
   CachedFuncSpecs cached_func_specs_;
+  std::map<tensorflow::ResourceHandle, std::shared_ptr<ResourceGenerator>, ResourceCompare> device_resources_;
   std::map<tensorflow::ResourceHandle, std::pair<TensorPartialShapes, TensorDataTypes>, ResourceCompare> iterator_mirrors_;
   std::map<tensorflow::ResourceHandle, std::shared_ptr<IteratorResourceProvider>, ResourceCompare> iterator_providers_;
 
diff --git a/tf_adapter_2.x/npu_device/kernels/destroy_resource_op.cpp b/tf_adapter_2.x/npu_device/kernels/destroy_resource_op.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..c2939a317f5749caf3043faf76e744aa007f50b1
--- /dev/null
+++ b/tf_adapter_2.x/npu_device/kernels/destroy_resource_op.cpp
@@ -0,0 +1,61 @@
+/* Copyright (C) 2021. Huawei Technologies Co., Ltd. All rights reserved.
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+==============================================================================*/
+
+#include <memory>
+#include <vector>
+
+#include "tensorflow/c/c_api.h"
+#include "tensorflow/c/eager/c_api.h"
+#include "tensorflow/c/eager/c_api_experimental.h"
+#include "tensorflow/c/tf_status.h"
+#include "tensorflow/core/lib/gtl/cleanup.h"
+#include "tensorflow/core/platform/logging.h"
+
+#include "absl/algorithm/container.h"
+#include "tensorflow/c/c_api_internal.h"
+#include "tensorflow/c/eager/immediate_execution_operation.h"
+#include "tensorflow/c/eager/tfe_context_internal.h"
+#include "tensorflow/c/eager/tfe_op_internal.h"
+#include "tensorflow/c/eager/tfe_tensorhandle_internal.h"
+
+#include "npu_custom_kernel.h"
+#include "npu_utils.h"
+
+static auto kernel = [](TFE_Context *context, NpuDevice *dev, const npu::OpSpec *spec,
+                        const TensorShapes &output_shapes, const tensorflow::NodeDef &parser_ndef, int num_inputs,
+                        TFE_TensorHandle **inputs, int num_outputs, TFE_TensorHandle **outputs, TF_Status *status) {
+  NPU_CTX_REQUIRES(
+    status, num_inputs == 1,
+    tensorflow::errors::InvalidArgument("Destroy resource op has only 1 resource input, got ", num_inputs));
+  NPU_CTX_REQUIRES(status, IsNpuTensorHandle(npu::UnwrapHandle(inputs[0])),
+                   tensorflow::errors::InvalidArgument("Destroy resource op resource input must be from npu"));
+  const tensorflow::Tensor *npu_tensor = nullptr;
+  NPU_CTX_REQUIRES_OK(status, npu::UnwrapTensor(inputs[0], &npu_tensor));
+  NPU_CTX_REQUIRES(status, npu_tensor->dtype() == tensorflow::DT_RESOURCE,
+                   tensorflow::errors::InvalidArgument("Destroy resource op input must be resource, got ",
+                                                       tensorflow::DataTypeString(npu_tensor->dtype())));
+
+  tensorflow::Tensor cpu_tensor(npu_tensor->dtype(), npu_tensor->shape());
+  for (int j = 0; j < npu_tensor->NumElements(); j++) {
+    cpu_tensor.flat<tensorflow::ResourceHandle>()(j) =
+        const_cast<tensorflow::Tensor *>(npu_tensor)->flat<tensorflow::ResourceHandle>()(j);
+  }
+
+  std::vector<TFE_TensorHandle *> cpu_inputs(num_inputs);
+  cpu_inputs[0] = tensorflow::wrap(tensorflow::TensorHandle::CreateLocalHandle(cpu_tensor));
+  dev->FallbackCPU(context, spec, num_inputs, cpu_inputs.data(), &num_outputs, outputs, status);
+  TFE_DeleteTensorHandle(cpu_inputs[0]);
+};
+
+NPU_REGISTER_CUSTOM_KERNEL("DestroyResourceOp", kernel);
diff --git a/tf_adapter_2.x/npu_device/kernels/read_variable_op.cpp b/tf_adapter_2.x/npu_device/kernels/read_variable_op.cpp
index 7062079bb89665cb667143258b8e737e949d5b48..c5f832e890fbadea2a5a15d1ad5fa3bd1c72b71e 100644
--- a/tf_adapter_2.x/npu_device/kernels/read_variable_op.cpp
+++ b/tf_adapter_2.x/npu_device/kernels/read_variable_op.cpp
@@ -114,4 +114,4 @@ static auto kernel = [](TFE_Context *context, NpuDevice *dev, const npu::OpSpec
   dev->RunGeGraphPin2CpuAnonymous(context, graph_name, var_read_graph, 0, nullptr, num_outputs, outputs, status);
 };
 
-NPU_REGISTER_CUSTOM_KERNEL("ReadVariableOp", kernel);
+// NPU_REGISTER_CUSTOM_KERNEL("ReadVariableOp", kernel);
diff --git a/tf_adapter_2.x/npu_device/kernels/resource_variable_op.cpp b/tf_adapter_2.x/npu_device/kernels/resource_variable_op.cpp
index 6b70cb194471b9712793709c607d080bbfb21754..747a57fb1087a0f4b4be8d60cf0559b3fc41718f 100644
--- a/tf_adapter_2.x/npu_device/kernels/resource_variable_op.cpp
+++ b/tf_adapter_2.x/npu_device/kernels/resource_variable_op.cpp
@@ -142,6 +142,6 @@ static auto kernel_assign_sub = [](TFE_Context *context, NpuDevice *dev, const n
                                    num_outputs, outputs, status);
 };
 
-NPU_REGISTER_CUSTOM_KERNEL("AssignVariableOp", kernel_assign);
-NPU_REGISTER_CUSTOM_KERNEL("AssignAddVariableOp", kernel_assign_add);
-NPU_REGISTER_CUSTOM_KERNEL("AssignSubVariableOp", kernel_assign_sub);
+//NPU_REGISTER_CUSTOM_KERNEL("AssignVariableOp", kernel_assign);
+//NPU_REGISTER_CUSTOM_KERNEL("AssignAddVariableOp", kernel_assign_add);
+//NPU_REGISTER_CUSTOM_KERNEL("AssignSubVariableOp", kernel_assign_sub);