From 32c03baa6fff3f4061e5306c77355f0f61a01947 Mon Sep 17 00:00:00 2001 From: guopeian Date: Wed, 22 May 2024 10:51:22 +0800 Subject: [PATCH 1/3] geop --- tf_adapter/kernels/geop_npu.cc | 762 +++++++------- tf_adapter/kernels/geop_npu.h | 48 +- .../ge_runner/src/callback_executor.cc | 86 ++ .../depends/ge_runner/src/callback_executor.h | 54 + .../depends/ge_runner/src/ge_runner_stub.cc | 48 +- .../tests/depends/ge_runner/src/ge_stub.h | 1 + .../st/kernels/testcase/geop_npu_test.cc | 16 +- .../kernels/pbtxt/geop_jit_compile_auto.pbtxt | 995 ++++++++++++++++++ .../ut/kernels/testcase/geop_npu_test.cc | 150 ++- 9 files changed, 1762 insertions(+), 398 deletions(-) create mode 100644 tf_adapter/tests/depends/ge_runner/src/callback_executor.cc create mode 100644 tf_adapter/tests/depends/ge_runner/src/callback_executor.h create mode 100644 tf_adapter/tests/ut/kernels/pbtxt/geop_jit_compile_auto.pbtxt diff --git a/tf_adapter/kernels/geop_npu.cc b/tf_adapter/kernels/geop_npu.cc index 537994554..a7107b79d 100644 --- a/tf_adapter/kernels/geop_npu.cc +++ b/tf_adapter/kernels/geop_npu.cc @@ -104,7 +104,8 @@ const float kMaxStepRatio = 0.9; const float kDefaultLossRatio = 1.05; const float kMinLossRatio = 1.01; const float kMaxLossRatio = 1.5; - +thread_local uint64_t add_graph_flag = 0UL; +const int32_t kMaxAddNum = 8; const std::map fast_value_string_2_eunm = {{"fast", GeOp::FastValue::kfast}, {"fast1", GeOp::FastValue::kfast1}}; @@ -333,6 +334,14 @@ void SetReuseOptions(const std::string &key, int32_t num, const std::mapsecond; } } +class ExitCallbackGuarder { + public: + explicit ExitCallbackGuarder(std::function done) : done_(done) {} + ~ExitCallbackGuarder() { done_(); } + + private: + std::function done_; +}; } // namespace std::string CurrentTimeInStr() { @@ -354,10 +363,10 @@ const int kFatalSleepTime = 3000; const std::string kAllReduce = "HcomAllReduce"; GeOp::GeOp(OpKernelConstruction *ctx) - : AsyncOpKernel(ctx), init_flag_(false), build_flag_(false), add_graph_flag_(false), sess_init_flag_(false), - compute_graph_empty_(false), is_input_convert_(false), data_format_(""), graph_id_(0), - is_initialized_graph_(false), need_iteration_(false), tf_session_(""), ge_session_(nullptr), job_type_(""), - is_host_graph_(false), handle_(nullptr), need_compile_graph_first_(false), tuned_flag_(ATOMIC_FLAG_INIT), + : AsyncOpKernel(ctx), init_flag_(false), sess_init_flag_(false), + is_input_convert_(false), data_format_(""), graph_id_(0), + is_initialized_graph_(false), is_empty_graph_(false), need_iteration_(false), tf_session_(""), ge_session_(nullptr), job_type_(""), + is_host_graph_(false), handle_(nullptr), tuned_flag_(ATOMIC_FLAG_INIT), jit_compile_("2"), is_dynamic_input_(false), session_id_(0), aoe_initialize_(nullptr), aoe_finalize_(nullptr), aoe_create_session_(nullptr), aoe_destroy_session_(nullptr), aoe_set_gesession_(nullptr), aoe_set_dependgraphs_(nullptr), aoe_set_tuninggraph_(nullptr), aoe_tuning_graph_(nullptr), @@ -508,18 +517,13 @@ void GeOp::Finalize() { // global environment finalize, invoke once for each process { mutex_lock lock{mu_}; - uint32_t graph_id = -1; if (sess_init_flag_ || !tf_session_.empty()) { - bool ret = DecrementGraphIdCount(tf_session_, graph_id); + bool ret = DecrementGraphIdCount(); if (!ret) { ADP_LOG(ERROR) << "tf session " << tf_session_ << " sub graph id failed."; LOG(ERROR) << "tf session " << tf_session_ << " sub graph id failed."; return; } - if (graph_id == kInvalidGraphId) { - SessionManager::GetInstance().DestroyGeSession(tf_session_); - ClearGraphIdCount(); - } } if (!SessionManager::GetInstance().IsGeSessionExist()) { @@ -808,42 +812,54 @@ bool GeOp::IsGraphNeedRebuild(const uint32_t cache_graph_id) { return ((need_recover_precision_mode_) || (ge_session_->IsGraphNeedRebuild(cache_graph_id))); } -int32_t GeOp::InitRebuildFlag(uint32_t cache_graph_id) { - if (!build_flag_) { - ADP_LOG(INFO) << "[GEOP] tf session " << tf_session_ << ", graph id: " << cache_graph_id - << " does not build yet, no need to check rebuild"; - return 0; - } - if (compute_graph_empty_) { - ADP_LOG(INFO) << "[GEOP] tf session " << tf_session_ << ", graph id: " << cache_graph_id - << " is empty, no need to check rebuild"; - return 0; - } - if (ge_session_ == nullptr) { - ADP_LOG(ERROR) << "[GEOP] GE session is nullptr"; - LOG(ERROR) << "[GEOP] GE session is nullptr"; - return -1; - } - if (!IsGraphNeedRebuild(cache_graph_id)) { - ADP_LOG(INFO) << "[GEOP] tf session " << tf_session_ << ", graph id: " << cache_graph_id << " no need to rebuild"; - return 0; +Status GeOp::RemoveGraph(const uint32_t &graph_id) { + if (graph_handler_.status == Init) { + return Status::OK(); } - ADP_LOG(INFO) << "[GEOP] The graph need rebuild, graph id " << cache_graph_id << " ,need_change_precision_mode: " - << need_recover_precision_mode_; - - // The graph need to rebuild, remove it from GE first. - ADP_LOG(INFO) << "[GEOP] tf session: " << tf_session_ << ", graph id: " << cache_graph_id; - auto ret = ge_session_->RemoveGraph(cache_graph_id); + auto ret = ge_session_->RemoveGraph(graph_id); if (ret != ge::SUCCESS) { - ADP_LOG(ERROR) << "[GEOP] Failed to remove graph " << cache_graph_id << " from ge, error code " << ret; - LOG(ERROR) << "[GEOP] Failed to remove graph " << cache_graph_id << " from ge, error code " << ret << std::endl - << "Error Message is : " << std::endl << ge::GEGetErrorMsgV2().GetString(); - return -1; + return errors::Internal("[GEOP] Failed to remove graph ", + graph_id, "from ge, error code ", ret, + "Error Message is : ", ge::GEGetErrorMsgV2().GetString()); } + ADP_LOG(INFO) << "[GEOP] tf session: " << tf_session_ << ", graph id: " << graph_id << "Removed graph"; + ADP_LOG(INFO) << "Set graph_status to Init" << std::endl; + return Status::OK(); +} - build_flag_ = false; - compute_graph_empty_ = false; - return 0; +Status GeOp::CheckAndRemoveGraph(OpKernelContext *ctx, const uint32_t &graph_id) { + mutex_lock lock{graph_handler_.graph_mu}; + // Init状态不需要做check + if (graph_handler_.status == Init) { + return Status::OK(); + } + // To be compatible with old versions, we should check dynamic_input_ and dynamic_config + bool shape_changed = false; + bool is_set_dynamic_config = IsDynamicConfig(); + if ((!is_dynamic_input_) && (!is_set_dynamic_config)) { + shape_changed = MaybeUpdateShape(ctx); + } + if (shape_changed || IsGraphNeedRebuild(graph_id)) { + ADP_LOG(INFO) << "[GEOP] The graph need rebuild, graph id " + << graph_id << " ,need_change_precision_mode: " + << need_recover_precision_mode_; + // 让进入需要Remove状态时,其他线程需要等待他remove完 + graph_handler_.status = Removing; + while (graph_handler_.graph_run_num > 0) { + ADP_LOG(INFO) << "Remove wait, run_num: " << graph_handler_.graph_run_num + << ", graph_status: " << graph_handler_.status; + graph_handler_.cv.wait(lock); + } + auto ret = RemoveGraph(graph_id); + graph_handler_.status = Init; + // 当remove模型时,所有的线程需要重新做加载,mask右移一位,重置flag + graph_handler_.add_graph_mask = graph_handler_.add_graph_mask << 1UL; + // 重置addGraph的个数 + graph_handler_.add_total_num = 0; + graph_handler_.cv.notify_all(); + return ret; + } + return Status::OK(); } bool GeOp::IncrementGraphIdCount(uint32_t &graph_id) { @@ -854,16 +870,24 @@ bool GeOp::IncrementGraphIdCount(uint32_t &graph_id) { } auto it = session_and_graph_id_map_.find(tf_session_); if (it != session_and_graph_id_map_.end()) { - it->second = it->second + kMaxCacheNum; - graph_id = it->second; - return true; + auto iter_graph_id = it->second.find(geop_name_); + if (iter_graph_id != it->second.end()) { + graph_id = iter_graph_id->second; + } else { + graph_id = current_size_ * kMaxCacheNum + 1U; + it->second.insert(std::make_pair(geop_name_, graph_id)); + current_size_++; + } + } else { + graph_id = current_size_ * kMaxCacheNum + 1U; + std::unordered_map graph_id_map = {{geop_name_, graph_id}}; + session_and_graph_id_map_.insert(std::make_pair(tf_session_, graph_id_map)); + current_size_++; } - graph_id = 1; - session_and_graph_id_map_.insert(std::make_pair(tf_session_, graph_id)); return true; } -bool GeOp::DecrementGraphIdCount(const std::string &tf_session, uint32_t &graph_id) { +bool GeOp::DecrementGraphIdCount() { if (tf_session_.empty()) { ADP_LOG(ERROR) << "[GEOP] Sub graph id failed, tf session is empty."; LOG(ERROR) << "[GEOP] Sub graph id failed, tf session is empty."; @@ -872,17 +896,24 @@ bool GeOp::DecrementGraphIdCount(const std::string &tf_session, uint32_t &graph_ auto it = session_and_graph_id_map_.find(tf_session_); if (it != session_and_graph_id_map_.end()) { - if (it->second == 1) { - it->second = it->second - 1; - graph_id = it->second; - return true; + auto graph_name_iter = it->second.find(geop_name_); + if (graph_name_iter != it->second.end()) { + it->second.erase(graph_name_iter); + } else { + ADP_LOG(ERROR) << "[GEOP] Sub graph id failed, can not find geop name " << geop_name_; + LOG(ERROR) << "[GEOP] Sub graph id failed, can not find geop name " << geop_name_; + return false; + } + if (it->second.empty()) { + session_and_graph_id_map_.erase(it); + sess_init_flag_ = false; + SessionManager::GetInstance().DestroyGeSession(tf_session_); + ClearGraphIdCount(); } - it->second = it->second - kMaxCacheNum; - graph_id = it->second; return true; } - ADP_LOG(ERROR) << "[GEOP] Sub graph id failed, can not find tf session " << tf_session; - LOG(ERROR) << "[GEOP] Sub graph id failed, can not find tf session " << tf_session; + ADP_LOG(ERROR) << "[GEOP] Sub graph id failed, can not find tf session " << tf_session_; + LOG(ERROR) << "[GEOP] Sub graph id failed, can not find tf session " << tf_session_; return false; } @@ -894,6 +925,7 @@ void GeOp::ClearGraphIdCount() { } void GeOp::GetExecGraphId(uint32_t &cache_graph_id, std::vector input_shapes) { + mutex_lock lock{graph_handler_.graph_mu}; size_t num = cache_graphs_.size(); if (cache_graphs_.find(input_shapes) != cache_graphs_.end()) { auto iter = std::find_if(graph_counts_.begin(), graph_counts_.end(), @@ -904,7 +936,9 @@ void GeOp::GetExecGraphId(uint32_t &cache_graph_id, std::vector inp iter->second += 1; } cache_graph_id = cache_graphs_[input_shapes]; - build_flag_ = true; + ADP_LOG(INFO) << "Set graph_status to CompileDone when get exec graphid, graph_id: " << cache_graph_id; + graph_handler_.status = CompileDone; + graph_handler_.cv.notify_all(); } else { ADP_LOG(INFO) << "[GEOP] This is a dynamic shape neural network, we recommend setting jit_compile to false"; if (num >= kMaxCacheNum) { @@ -922,8 +956,9 @@ void GeOp::GetExecGraphId(uint32_t &cache_graph_id, std::vector inp } else { cache_graph_id = graph_id_ + num; } - build_flag_ = false; - compute_graph_empty_ = false; + ADP_LOG(INFO) << "Set graph_status to Init when has no cache graph, graph_id: " << cache_graph_id; + graph_handler_.status = Init; + graph_handler_.cv.notify_all(); } } @@ -954,8 +989,8 @@ PartialTensorShape GeOp::MakeCompatShape(const PartialTensorShape &a, const Part return MakeUnknownShape(b.dims()); } -bool GeOp::MaybeUpdateShape(OpKernelContext *const ctx) { - bool updated = false; +void GeOp::InitGraphShape(OpKernelContext *const ctx) { + mutex_lock lock{graph_handler_.graph_mu}; for (size_t i = 0UL; i < static_cast(ctx->num_inputs()); i++) { auto &shape = input_shapes_vec_[i]; auto &value_shape = ctx->input(static_cast(i)).shape(); @@ -966,12 +1001,15 @@ bool GeOp::MaybeUpdateShape(OpKernelContext *const ctx) { } else { shape = value_shape; } - updated = true; ADP_LOG(INFO) << "Init input " << i << " shape to " << shape.value().DebugString(); - continue; } + } +} +bool GeOp::MaybeUpdateShape(OpKernelContext *const ctx) { + for (size_t i = 0UL; i < static_cast(ctx->num_inputs()); i++) { + auto &shape = input_shapes_vec_[i]; + auto &value_shape = ctx->input(static_cast(i)).shape(); if (!shape.value().IsCompatibleWith(value_shape)) { - updated = true; ADP_LOG(INFO) << "Compat input " << i << " shape " << shape.value().DebugString() << " vs. " << value_shape.DebugString(); if ((jit_compile_ == "1") && (compile_dynamic_mode_ != "1")) { @@ -981,9 +1019,10 @@ bool GeOp::MaybeUpdateShape(OpKernelContext *const ctx) { shape = MakeCompatShape(shape.value(), value_shape); } ADP_LOG(INFO) << "Refresh input " << i << " shape to " << shape.value().DebugString(); + return true; } } - return updated; + return false; } Status GeOp::CreateGeSession() { @@ -1004,12 +1043,9 @@ Status GeOp::CreateGeSession() { ADP_LOG(INFO) << "[GePlugin] Initialize ge success."; first = false; } - if (!sess_init_flag_) { - mutex_lock lock{mu_}; - if (!SessionManager::GetInstance().GetOrCreateGeSession(tf_session_, ge_session_, sess_options_) || - tf_session_.empty() || ge_session_ == nullptr) { - return errors::Internal("Get ge session failed."); - } + if (!SessionManager::GetInstance().GetOrCreateGeSession(tf_session_, ge_session_, sess_options_) || + tf_session_.empty() || ge_session_ == nullptr) { + return errors::Internal("Get ge session failed."); } sess_init_flag_ = true; ADP_LOG(INFO) << "[GEOP] tf session: " << tf_session_ << " get ge session success."; @@ -1055,70 +1091,278 @@ PartialTensorShape GeOp::MakeUnknownShape(const int32_t &size) const { return status.ok() ? out_shape : kUnknownRankShape; } +Status GeOp::ParserGraph(OpKernelContext *ctx, std::vector &input_vec) { + // Get Graph + if (graph_handler_.status == CompileDone) { + return Status::OK(); + } + auto func_lib = ctx->function_library(); + if (func_lib == nullptr) { + return errors::Internal("function library is nullptr"); + } + FunctionLibraryDefinition *flib_def = + const_cast(func_lib->GetFunctionLibraryDefinition()); + if (flib_def == nullptr) { + return errors::Internal("flib_def is nullptr"); + } + // Build GraphDef from FunctionDef + GraphDef ori_graph_def; + bool is_allreduce = false; + auto ret = BuildGraphDef(*flib_def, input_vec, ori_graph_def, is_initialized_graph_, is_allreduce); + if (!ret.ok()) { + return ret; + } + if (kDumpGraph) { + const std::string pbtxt_path = GetDumpPath() + "TF_" + geop_name_.c_str() + ".pbtxt"; + (void)WriteTextProto(Env::Default(), pbtxt_path, ori_graph_def); + } + ADP_LOG(INFO) << "[GEOP] TFadpter process graph success, GE parser begin, kernel_name: " << geop_name_ + << " , tf session: " << tf_session_; + const std::string compute_graph_name = "ge_default_" + CurrentTimeInStr(); + graph_handler_.graph = std::make_shared(compute_graph_name.c_str()); + if (graph_handler_.graph == nullptr) { + return errors::Internal("compute graph is nullptr"); + } + // parser, tensorflow graph to ge graph + ret = DoGraphParser(graph_handler_.graph, flib_def, ori_graph_def); + if (!ret.ok()) { + return ret; + } + ADP_LOG(INFO) << "[GEOP] Tensorflow graph parse to ge graph success, kernel_name: " << geop_name_ + << ", tf session: " << tf_session_ + << ", iteration_per_loop: " << iteration_per_loop_ << + ", need iteration: " << need_iteration_; + return SetGraphOptions(); +} + +Status GeOp::AddGraph(OpKernelContext *ctx, const uint32_t &graph_id) { + // 当此线程未add过图,且总大小小于maxNum,需要去做add + if (((add_graph_flag & graph_handler_.add_graph_mask) == graph_handler_.add_graph_mask) || + (graph_handler_.add_total_num >= kMaxAddNum)) { + return Status::OK(); + } + // call ge session addGraph api + auto graph_options = graph_options_; + if (is_aoe_) { + graph_options["ge.buildMode"] = "normal"; + } + if ((is_dynamic_getnext_ != "1") && (iteration_per_loop_ <= 1)) { + SetReuseOptions("ge.exec.inputReuseMemIndexes", ctx->num_inputs(), + sess_options_, init_options_, graph_options); + } + SetReuseOptions("ge.exec.outputReuseMemIndexes", + ctx->num_outputs(), sess_options_, init_options_, graph_options); + ADP_LOG(EVENT) << "[GEOP] call ge session add graph jit_compile: " << jit_compile_; + graph_options["ge.exec.graphIOMemAllocMode"] = "ByGE"; + const auto graph_option_ascend_string = ChangeStringToAscendString(graph_options); + ADP_LOG(INFO) << "Graph options: "; + NpuAttrs::LogOptions(graph_options); + ge::Graph ge_graph = ge::GraphUtilsEx::CreateGraphFromComputeGraph(graph_handler_.graph); + if (iteration_per_loop_ > 1) { + ge_graph.SetNeedIteration(need_iteration_); + } + + auto status = ge_session_->AddGraph(graph_id, ge_graph, graph_option_ascend_string); + std::stringstream ss; + if (status != ge::SUCCESS) { + std::this_thread::sleep_for(std::chrono::milliseconds(kFatalSleepTime)); + ss << "[GEOP] call ge session add graph failed, kernel: " << geop_name_ << ", tf session: " << tf_session_ + << ", graph id: " << graph_id << std::endl + << "Error Message is : " << std::endl << ge::GEGetErrorMsgV2().GetString(); + return errors::Internal(ss.str()); + } + add_graph_flag = graph_handler_.add_graph_mask; + graph_handler_.add_total_num++; + ADP_LOG(INFO) << "[GEOP] Add graph to ge session success, kernel_name: " << geop_name_ + << ", tf session: " << tf_session_ << ", graph id: " << graph_id + << ", add_num: " << graph_handler_.add_total_num; + return Status::OK(); +} + +Status GeOp::BuildGraph(const uint32_t &graph_id, const std::vector &inputs) { + if (graph_handler_.status == CompileDone) { + return Status::OK(); + } + ge::Status build_graph_status = ge_session_->BuildGraph(graph_id, inputs); + std::stringstream ss; + if (build_graph_status != ge::SUCCESS) { + ss << "[GEOP] GE session build graph failed, domi_ret : " << build_graph_status << std::endl + << "Error Message is : " << std::endl << ge::GEGetErrorMsgV2().GetString(); + return errors::Internal(ss.str()); + } + ADP_LOG(INFO) << "Set graph_status to CompileDone"; + graph_handler_.status = CompileDone; + graph_handler_.cv.notify_all(); + LOG(INFO) << "The model has been compiled on the Ascend AI processor, current graph id is: " << graph_id; + return Status::OK(); +} + +Status GeOp::RunGraph(OpKernelContext *ctx, const uint32_t &graph_id, + const std::vector &inputs, + ge::RunAsyncCallback callback) { + // call ge session runGraphAsync api + mutex_lock lock(graph_handler_.graph_mu); + while (graph_handler_.status == Init) { + ADP_LOG(INFO) << "RunGraph wait, graph_status: " << graph_handler_.status; + graph_handler_.cv.wait(lock); + } + ADP_LOG(INFO) << "[GEOP] Call ge session RunGraphAsync, kernel_name: " + << geop_name_ << ", tf session: " << tf_session_ + << ", graph id: " << graph_id; + ge::Status run_graph_status = ge_session_->RunGraphAsync(graph_id, inputs, callback); + std::stringstream ss; + if (run_graph_status != ge::SUCCESS) { + std::this_thread::sleep_for(std::chrono::milliseconds(kFatalSleepTime)); + ss << "[GEOP] call ge session RunGraphAsync Failed, kernel:" << geop_name_ << ", tf session: " << tf_session_ + << ", graph id: " << graph_id << std::endl + << "Error Message is : " << std::endl << ge::GEGetErrorMsgV2().GetString(); + return errors::Internal(ss.str()); + } + ADP_LOG(INFO) << "End RunGraph run_num: " << graph_handler_.graph_run_num; + graph_handler_.graph_run_num++; + graph_handler_.cv.notify_all(); + return Status::OK(); +} + +Status GeOp::SetGraphOptions() { + // convert to ge::graph + if (graph_options_.count("input_format") != 0) { + ADP_LOG(INFO) << "graph_options_[\"input_format\"] = " << graph_options_["input_format"]; + } + + if (iteration_per_loop_ > 1) { + graph_options_["iterations_per_loop"] = std::to_string(iteration_per_loop_); + } + + const auto cahce_option_iter = sess_options_.find("ge.graph_compiler_cache_dir"); + if (cahce_option_iter != sess_options_.cend() && !cahce_option_iter->second.empty()) { + graph_options_["ge.graph_key"] = geop_name_; + } + + if (is_host_graph_) { + ADP_LOG(INFO) << "[GEOP] set graph option."; + graph_options_["ge.exec.placement"] = "HOST"; + } + graph_options_["ge.shape_generalized_build_mode"] = "shape_precise"; + if (!recompute_mode_.empty()) { + graph_options_["ge.recompute"] = recompute_mode_; + } + if (!max_key_num_.empty()) { + graph_options_["ge.max_key_num"] = max_key_num_; + } + if (!embedding_dim_.empty()) { + graph_options_["ge.embedding_dim"] = embedding_dim_; + } + if (!use_counter_filter_.empty()) { + graph_options_["ge.use_counter_filter"] = use_counter_filter_; + } + if (!padding_key_.empty()) { + graph_options_["ge.padding_key"] = padding_key_; + } + if (!embedding_flags_.empty()) { + graph_options_["ge.embedding_flags"] = embedding_flags_; + } + SetDynamicInput(); + graph_options_["ge.exec.isVarInitGraph"] = is_var_init_graph_; + graph_options_["ge.jit_compile"] = jit_compile_; + graph_options_["ge.exec.overflow"] = "1"; + graph_options_["ge.graphLevelSat"] = (mix_compile_mode_ == "0") ? "1" : "0"; + return DoAccelerateTrain(); +} + +Status GeOp::CompileGraph(OpKernelContext *ctx, std::vector &input_vec, + const uint32_t &graph_id, const std::vector &inputs, + std::vector input_shapes) { + mutex_lock lock{graph_handler_.graph_mu}; + while (graph_handler_.status == Removing) { + ADP_LOG(INFO) << "Compile graph wait, status: " << graph_handler_.status; + graph_handler_.cv.wait(lock); + } + auto ret = ParserGraph(ctx, input_vec); + if (!ret.ok()) { + return ret; + } + /* if graph is init verify graph, return */ + if (is_initialized_graph_) { + Tensor initialized_tensor(ctx->expected_output_dtype(0), TensorShape({0})); + ctx->set_output(0, initialized_tensor); + ADP_LOG(INFO) << "[GEOP] End GeOp::ComputeAsync, compute_graph is initialize, kernel_name:" + << geop_name_ << ", ret_status:" << ToString(ge::SUCCESS) + << " , tf session: " << tf_session_ << " ,graph id: " << graph_id; + return Status::OK(); + } + if (graph_handler_.graph->GetAllNodesSize() == 0UL) { + ADP_LOG(INFO) << "[GEOP] End GeOp::ComputeAsync, compute_graph is empty, kernel_name:" << geop_name_ + << ", ret_status:" << ToString(ge::SUCCESS) << " , tf session: " << tf_session_ + << " ,graph id: " << graph_id; + is_empty_graph_ = true; + return Status::OK(); + } + ret = AddGraph(ctx, graph_id); + if (!ret.ok()) { + return ret; + } + const bool is_set_dynamic_config = IsDynamicConfig(); + const bool is_lazy_recompile_mode = IsLazyCompile(); + if (!is_set_dynamic_config && is_lazy_recompile_mode) { + cache_graphs_.insert(std::make_pair(input_shapes, graph_id)); + graph_counts_.push_back(std::make_pair(input_shapes, 1)); + } + ret = BuildGraph(graph_id, inputs); + if (!ret.ok()) { + return ret; + } + return Status::OK(); +} + +bool GeOp::IsLazyCompile() { + return ((dynamic_input_ == "1") && (dynamic_graph_execute_mode_ == "lazy_recompile")); +} + void GeOp::ComputeAsync(OpKernelContext *ctx, DoneCallback done) { - // ctx is not nullptr OP_REQUIRES_ASYNC(ctx, init_flag_, errors::InvalidArgument("GeOp not Initialize success."), done); - if (!sess_init_flag_) { - if (job_type_ != "localhost") { // in ps mode : ctx->session_handle() is empty - tf_session_ = "ps_worker_session"; - ADP_LOG(INFO) << "[GEOP] get tf session " << tf_session_ << " when in ps mode."; - } - if (tf_session_.empty()) { - tf_session_ = ctx->session_handle(); - ADP_LOG(INFO) << "[GEOP] get tf session " << tf_session_ << " from session handle."; - } - OP_REQUIRES_ASYNC(ctx, IncrementGraphIdCount(graph_id_), errors::Internal("Get ge session failed."), done); - - ADP_LOG(INFO) << "[GEOP] Node name: " << ctx->op_kernel().name() << " , tf session: " << tf_session_; - if (!init_options_["ge.jobType"].empty() && !init_options_["ge.tuningPath"].empty()) { - uint32_t device_id = 0; - OP_REQUIRES_OK_ASYNC(ctx, GetEnvDeviceID(device_id), done); - ADP_LOG(INFO) << "[GEOP] in tuning func, aoe_mode:" << init_options_["ge.jobType"] - << ", work_path:" << init_options_["ge.tuningPath"] - << ", distribute_config:" << init_options_["distribute_config"]; - tune_options_.insert(init_options_.cbegin(), init_options_.cend()); - tune_options_.insert({"devices", std::to_string(device_id)}); - tune_options_.insert(sess_options_.cbegin(), sess_options_.cend()); - tune_options_.insert({"work_path", init_options_["ge.tuningPath"]}); - tune_options_.insert({"job_type", init_options_["ge.jobType"]}); - // aoe ini - if (!tuned_initialize_flag_) { - std::map global_options; - global_options.insert( - {ge::AscendString("work_path"), ge::AscendString(init_options_["ge.tuningPath"].c_str())}); - global_options.insert({ge::AscendString("job_type"), ge::AscendString(init_options_["ge.jobType"].c_str())}); - global_options.insert({ge::AscendString("ge.resourceConfigPath"), - ge::AscendString(sess_options_["ge.resourceConfigPath"].c_str())}); - AoeStatus init_ret = (*aoe_initialize_)(global_options); - OP_REQUIRES_ASYNC(ctx, init_ret == Aoe::AOE_SUCCESS, - errors::Internal("[GEOP] exec aoe initialize func failed[", init_ret, "]."), done); - tuned_initialize_flag_ = true; + { + mutex_lock lock{mu_}; + if (!sess_init_flag_) { + if (job_type_ != "localhost") { // in ps mode : ctx->session_handle() is empty + tf_session_ = "ps_worker_session"; + ADP_LOG(INFO) << "[GEOP] get tf session " << tf_session_ << " when in ps mode."; + } + if (tf_session_.empty()) { + tf_session_ = ctx->session_handle(); + ADP_LOG(INFO) << "[GEOP] get tf session " << tf_session_ << " from session handle."; } + geop_name_ = ctx->op_kernel().name(); + OP_REQUIRES_ASYNC(ctx, IncrementGraphIdCount(graph_id_), errors::Internal("Get ge session failed."), done); + OP_REQUIRES_OK_ASYNC(ctx, CreateGeSession(), done); + ADP_LOG(INFO) << "[GEOP] Node name: " << geop_name_ << " , tf session: " << tf_session_; + } + } + if (is_aoe_) { + ADP_LOG(INFO) << "[GEOP] in tuning func, aoe_mode:" << init_options_["ge.jobType"] + << ", work_path:" << init_options_["ge.tuningPath"] + << ", distribute_config:" << init_options_["distribute_config"]; + // aoe ini + mutex_lock lock{mu_}; + if (!tuned_initialize_flag_) { + std::map global_options; + global_options.insert( + {ge::AscendString("work_path"), ge::AscendString(init_options_["ge.tuningPath"].c_str())}); + global_options.insert({ge::AscendString("job_type"), ge::AscendString(init_options_["ge.jobType"].c_str())}); + global_options.insert({ge::AscendString("ge.resourceConfigPath"), + ge::AscendString(sess_options_["ge.resourceConfigPath"].c_str())}); + AoeStatus init_ret = (*aoe_initialize_)(global_options); + OP_REQUIRES_ASYNC(ctx, init_ret == Aoe::AOE_SUCCESS, + errors::Internal("[GEOP] exec aoe initialize func failed[", init_ret, "]."), done); + tuned_initialize_flag_ = true; } } - // convert input to const OP_REQUIRES_OK_ASYNC(ctx, GraphInputConvertToConst(ctx), done); - std::string geop_name = ctx->op_kernel().name(); uint32_t num_inputs = static_cast(ctx->num_inputs()); ADP_LOG(INFO) << "[GEOP] Begin GeOp::ComputeAsync" - << ", kernel_name:" << geop_name << ", num_inputs:" << num_inputs + << ", kernel_name:" << geop_name_ << ", num_inputs:" << num_inputs << ", num_outputs:" << ctx->num_outputs(); - int64 startTime = InferShapeUtil::GetCurrentTimestap(); - int64 endTime = 0; - - // To be compatible with old versions, we should check dynamic_input_ and dynamic_config - bool is_set_dynamic_config = IsDynamicConfig(); - if (dynamic_input_ != "1" && !is_set_dynamic_config) { - bool shape_changed = MaybeUpdateShape(ctx); - if (build_flag_ && shape_changed) { - ge::Status status = ge_session_->RemoveGraph(graph_id_); - if (status != ge::SUCCESS) { - ADP_LOG(WARNING) << "[GEOP] GE remove graph failed, ret : " << ToString(status) << ", graph_id: " << graph_id_; - } - build_flag_ = false; - } - } std::vector input_vec; std::vector input_shapes; @@ -1127,251 +1371,72 @@ void GeOp::ComputeAsync(OpKernelContext *ctx, DoneCallback done) { // if input shapes changed, cache graphs uint32_t cache_graph_id = graph_id_; - bool is_lazy_recompile_mode = (dynamic_input_ == "1") && (dynamic_graph_execute_mode_ == "lazy_recompile"); - ADP_LOG(INFO) << "is_set_dynamic_config: " << is_set_dynamic_config - << " is_aoe_: " << is_aoe_ + bool is_lazy_recompile_mode = IsLazyCompile(); + ADP_LOG(INFO) << " is_aoe_: " << is_aoe_ << " is_lazy_recompile_mode: " << is_lazy_recompile_mode; + InitGraphShape(ctx); if (is_aoe_) { - if (is_set_dynamic_config) { - ADP_LOG(ERROR) << "dynamic input config can not use with mstuning."; - OP_REQUIRES_ASYNC(ctx, false, errors::Internal("dynamic input config can not use with mstuning."), done); - return; - } + bool is_set_dynamic_config = IsDynamicConfig(); + OP_REQUIRES_ASYNC(ctx, !is_set_dynamic_config, + errors::Internal("dynamic input config can not use with mstuning."), done); auto input_vec_aoe = input_vec; - if (RunTuning(input_vec_aoe, inputs, ctx) != 0) { - ADP_LOG(ERROR) << "RunTuning fail."; - std::stringstream ss; - ss << std::endl << ge::GEGetErrorMsgV2().GetString(); - OP_REQUIRES_ASYNC(ctx, false, errors::Internal(ss.str()), done); - } - if (InitRebuildFlag(cache_graph_id) != 0) { - OP_REQUIRES_ASYNC(ctx, false, errors::Internal("Failed to check rebuild flag"), done); - return; - } - ADP_LOG(INFO) << geop_name << " RunTuning finish."; - } else if (is_set_dynamic_config) { - if (InitRebuildFlag(cache_graph_id) != 0) { - OP_REQUIRES_ASYNC(ctx, false, errors::Internal("Failed to check rebuild flag"), done); - return; - } - } else { - // in dynamic input mode, cache graphs. - if (is_lazy_recompile_mode) { - GetExecGraphId(cache_graph_id, input_shapes); - } - if (InitRebuildFlag(cache_graph_id) != 0) { - OP_REQUIRES_ASYNC(ctx, false, errors::Internal("Failed to check rebuild flag"), done); - return; - } + OP_REQUIRES_ASYNC(ctx, RunTuning(input_vec_aoe, inputs, ctx) == 0, + errors::Internal("RunTuning fail.\n", ge::GEGetErrorMsgV2().GetString()), done); + ADP_LOG(INFO) << geop_name_ << " RunTuning finish."; } - if (!build_flag_) { - // Get Graph - OP_REQUIRES_ASYNC(ctx, ctx->function_library() != nullptr, errors::Internal("function library is nullptr"), done); - FunctionLibraryDefinition *flib_def = - const_cast(ctx->function_library()->GetFunctionLibraryDefinition()); - OP_REQUIRES_ASYNC(ctx, flib_def != nullptr, errors::Internal("flib_def is nullptr"), done); - - // Build GraphDef from FunctionDef - GraphDef ori_graph_def; - bool is_allreduce = false; - OP_REQUIRES_OK_ASYNC(ctx, BuildGraphDef(*flib_def, input_vec, ori_graph_def, is_initialized_graph_, is_allreduce), - done); - - /* if graph is init verify graph, return */ - if (this->is_initialized_graph_) { - Tensor initialized_tensor(ctx->expected_output_dtype(0), TensorShape({0})); - ctx->set_output(0, initialized_tensor); - done(); - return; - } - if (kDumpGraph) { - const std::string pbtxt_path = GetDumpPath() + "TF_" + geop_name.c_str() + ".pbtxt"; - (void) WriteTextProto(Env::Default(), pbtxt_path, ori_graph_def); - } - endTime = InferShapeUtil::GetCurrentTimestap(); - ADP_LOG(EVENT) << "[GEOP] In GEOP computeAsync, kernel_name: " << geop_name << " ,TFadapter cost time: [" - << ((endTime - startTime) / kMicrosToMillis) << " ms]."; - ADP_LOG(INFO) << "[GEOP] TFadpter process graph success, GE parser begin, kernel_name: " << geop_name - << " , tf session: " << tf_session_ << " , graph id: " << cache_graph_id; - ge::ComputeGraphPtr compute_graph = nullptr; - try { - const std::string compute_graph_name = "ge_default_" + CurrentTimeInStr(); - compute_graph = std::make_shared(compute_graph_name.c_str()); - } catch (...) { - OP_REQUIRES_ASYNC(ctx, false, errors::Internal("make shared failed"), done); - } - OP_REQUIRES_ASYNC(ctx, compute_graph != nullptr, errors::InvalidArgument("create ComputeGraph failed"), done); - // parser, tensorflow graph to ge graph - OP_REQUIRES_OK_ASYNC(ctx, DoGraphParser(compute_graph, flib_def, ori_graph_def), done); - ADP_LOG(INFO) << "[GEOP] Tensorflow graph parse to ge graph success, kernel_name: " << geop_name - << ", tf session: " << tf_session_ << " , graph id: " << cache_graph_id - << ", iteration_per_loop: " << iteration_per_loop_ << ", need iteration: " << this->need_iteration_; - size_t nodes = compute_graph->GetAllNodesSize(); - if (nodes == 0) { - build_flag_ = true; - compute_graph_empty_ = true; - endTime = InferShapeUtil::GetCurrentTimestap(); - ADP_LOG(INFO) << "[GEOP] End GeOp::ComputeAsync, compute_graph is empty, kernel_name:" << geop_name - << ", ret_status:" << ToString(ge::SUCCESS) << " , tf session: " << tf_session_ - << " ,graph id: " << cache_graph_id << " [" << ((endTime - startTime) / kMicrosToMillis) << " ms]"; - done(); - return; - } - // convert to ge::graph - if (graph_options_.count("input_format") != 0) { - ADP_LOG(INFO) << "graph_options_[\"input_format\"] = " << graph_options_["input_format"]; - } - ge::Graph ge_graph = ge::GraphUtilsEx::CreateGraphFromComputeGraph(compute_graph); - if (iteration_per_loop_ > 1) { - ge_graph.SetNeedIteration(this->need_iteration_); - graph_options_["iterations_per_loop"] = std::to_string(iteration_per_loop_); - } + if (is_lazy_recompile_mode) { + // in dynamic input mode, cache graphs. + GetExecGraphId(cache_graph_id, input_shapes); + } - const auto cahce_option_iter = sess_options_.find("ge.graph_compiler_cache_dir"); - if (cahce_option_iter != sess_options_.cend() && !cahce_option_iter->second.empty()) { - graph_options_["ge.graph_key"] = geop_name; - } + OP_REQUIRES_OK_ASYNC(ctx, CheckAndRemoveGraph(ctx, cache_graph_id), done); - if (is_host_graph_) { - ADP_LOG(INFO) << "[GEOP] set graph option."; - graph_options_["ge.exec.placement"] = "HOST"; - } - graph_options_["ge.shape_generalized_build_mode"] = "shape_precise"; - if (!recompute_mode_.empty()) { - graph_options_["ge.recompute"] = recompute_mode_; - } - if (!max_key_num_.empty()) { - graph_options_["ge.max_key_num"] = max_key_num_; - } - if (!embedding_dim_.empty()) { - graph_options_["ge.embedding_dim"] = embedding_dim_; - } - if (!use_counter_filter_.empty()) { - graph_options_["ge.use_counter_filter"] = use_counter_filter_; - } - if (!padding_key_.empty()) { - graph_options_["ge.padding_key"] = padding_key_; - } - if (!embedding_flags_.empty()) { - graph_options_["ge.embedding_flags"] = embedding_flags_; - } - SetDynamicInput(); - graph_options_["ge.exec.isVarInitGraph"] = is_var_init_graph_; - graph_options_["ge.jit_compile"] = jit_compile_; - graph_options_["ge.exec.overflow"] = "1"; - graph_options_["ge.graphLevelSat"] = (mix_compile_mode_ == "0") ? "1" : "0"; - OP_REQUIRES_OK_ASYNC(ctx, DoAccelerateTrain(), done); - // call ge session addGraph api - auto graph_options = graph_options_; - if (is_aoe_) { - graph_options["ge.buildMode"] = "normal"; - } - if ((is_dynamic_getnext_ != "1") && (iteration_per_loop_ <= 1)) { - SetReuseOptions("ge.exec.inputReuseMemIndexes", ctx->num_inputs(), sess_options_, init_options_, graph_options); - } - SetReuseOptions("ge.exec.outputReuseMemIndexes", ctx->num_outputs(), sess_options_, init_options_, graph_options); - ADP_LOG(EVENT) << "[GEOP] call ge session add graph jit_compile: " << jit_compile_; - graph_options["ge.exec.graphIOMemAllocMode"] = "ByGE"; - OP_REQUIRES_OK_ASYNC(ctx, CreateGeSession(), done); - auto const graph_option_ascend_string = ChangeStringToAscendString(graph_options); - ADP_LOG(INFO) << "Graph options: "; - NpuAttrs::LogOptions(graph_options); - auto status = ge_session_->AddGraph(cache_graph_id, ge_graph, graph_option_ascend_string); - std::stringstream ss; - if (status != ge::SUCCESS) { - std::this_thread::sleep_for(std::chrono::milliseconds(kFatalSleepTime)); - ADP_LOG(FATAL) << "[GEOP] call ge session add graph failed, kernel: " << geop_name << " ,tf session: " - << tf_session_ << ", graph id: " << cache_graph_id; + OP_REQUIRES_OK_ASYNC(ctx, CompileGraph(ctx, input_vec, cache_graph_id, inputs, input_shapes), done); - ss << "[GEOP] call ge session add graph failed, kernel: " << geop_name << ", tf session: " << tf_session_ - << ", graph id: " << cache_graph_id << std::endl - << "Error Message is : " << std::endl << ge::GEGetErrorMsgV2().GetString(); - } - OP_REQUIRES_ASYNC(ctx, status == ge::SUCCESS, errors::Internal(ss.str()), done); - add_graph_flag_ = true; - ADP_LOG(INFO) << "[GEOP] Add graph to ge session success, kernel_name: " << geop_name - << ", tf session: " << tf_session_ << ", graph id: " << cache_graph_id; - - build_flag_ = true; - if (!is_set_dynamic_config && is_lazy_recompile_mode) { - cache_graphs_.insert(std::make_pair(input_shapes, cache_graph_id)); - graph_counts_.push_back(std::make_pair(input_shapes, 1)); - } - if (need_compile_graph_first_) { - ge::Status build_graph_status = ge_session_->BuildGraph(cache_graph_id, inputs); - std::stringstream ss; - if (build_graph_status != ge::SUCCESS) { - ss << "[GEOP] GE session build graph failed, domi_ret : " << build_graph_status << std::endl - << "Error Message is : " << std::endl << ge::GEGetErrorMsgV2().GetString(); - } - OP_REQUIRES_ASYNC(ctx, build_graph_status == ge::SUCCESS, errors::Internal(ss.str()), done); - ADP_LOG(INFO) << "[GEOP] Build graph success."; - done(); - return; - } - LOG(INFO) << "The model has been compiled on the Ascend AI processor, current graph id is: " << cache_graph_id; - } else { - if (compute_graph_empty_) { - endTime = InferShapeUtil::GetCurrentTimestap(); - ADP_LOG(INFO) << "[GEOP] End GeOp::ComputeAsync, compute_graph is empty, kernel_name:" << geop_name - << ", ret_status:" << ToString(ge::SUCCESS) << " , tf session: " << tf_session_ - << " ,graph id: " << cache_graph_id << " [" << ((endTime - startTime) / kMicrosToMillis) << " ms]"; - done(); - return; - } + if (is_initialized_graph_ || is_empty_graph_) { + done(); + return; } - int64 run_start_time = InferShapeUtil::GetCurrentTimestap(); - auto callback = [done, ctx, run_start_time](ge::Status ge_status, std::vector &outputs) { + auto callback = [done, ctx, run_start_time, this](ge::Status ge_status, std::vector &outputs) { + ExitCallbackGuarder guarder([ctx, this] () { + mutex_lock lock(graph_handler_.graph_mu); + ADP_LOG(INFO) << "Callback end, run_num: " << graph_handler_.graph_run_num; + graph_handler_.graph_run_num--; + graph_handler_.cv.notify_all(); + }); if (ge_status == ge::SUCCESS) { if (BuildOutputTensorInfo(ctx, outputs) != Status::OK()) { - ADP_LOG(FATAL) << ctx->op_kernel().name() << " GEOP::DoRunAsync get output failed."; + ADP_LOG(ERROR) << ctx->op_kernel().name() << " GEOP::DoRunAsync get output failed."; std::stringstream ss; ss << ctx->op_kernel().name() << "GEOP::DoRunAsync get output failed." << std::endl << "Error Message is : " << std::endl << ge::GEGetErrorMsgV2().GetString(); OP_REQUIRES_ASYNC(ctx, false, errors::Internal(ss.str()), done); - return; } } else if (ge_status == ge::END_OF_SEQUENCE) { ctx->SetStatus(errors::OutOfRange("End of sequence")); ADP_LOG(WARNING) << "[GEOP] Out of range: End of sequence."; LOG(WARNING) << "[GEOP] Out of range: End of sequence."; } else if (ge_status != ge::SUCCESS) { - std::this_thread::sleep_for(std::chrono::milliseconds(kFatalSleepTime)); - ADP_LOG(FATAL) << ctx->op_kernel().name() << "GEOP::::DoRunAsync Failed"; + ADP_LOG(ERROR) << ctx->op_kernel().name() << "GEOP::::DoRunAsync Failed"; std::stringstream ss; ss << ctx->op_kernel().name() << "GEOP::::DoRunAsync Failed" << std::endl << "Error Message is : " << std::endl << ge::GEGetErrorMsgV2().GetString(); OP_REQUIRES_ASYNC(ctx, false, errors::Internal(ss.str()), done); - return; } int64 run_end_time = InferShapeUtil::GetCurrentTimestap(); ADP_LOG(INFO) << "[GEOP] RunGraphAsync callback, status:" << ge_status << ", kernel_name:" << ctx->op_kernel().name() << "[ " << (run_end_time - run_start_time) << "us]"; done(); }; - - // call ge session runGraphAsync api - ADP_LOG(INFO) << "[GEOP] Call ge session RunGraphAsync, kernel_name: " << geop_name << ", tf session: " << tf_session_ - << ", graph id: " << cache_graph_id; - ge::Status run_graph_status = ge_session_->RunGraphAsync(cache_graph_id, inputs, callback); - std::stringstream ss; - if (run_graph_status != ge::SUCCESS) { - std::this_thread::sleep_for(std::chrono::milliseconds(kFatalSleepTime)); - ADP_LOG(FATAL) << "[GEOP] call ge session RunGraphAsync Failed, kernel:" << geop_name << " ,tf session: " - << tf_session_ << " ,graph id: " << cache_graph_id; - ss << "[GEOP] call ge session RunGraphAsync Failed, kernel:" << geop_name << ", tf session: " << tf_session_ - << ", graph id: " << cache_graph_id << std::endl - << "Error Message is : " << std::endl << ge::GEGetErrorMsgV2().GetString(); - } - OP_REQUIRES_ASYNC(ctx, run_graph_status == ge::SUCCESS, errors::Internal(ss.str()), done); - - endTime = InferShapeUtil::GetCurrentTimestap(); - ADP_LOG(INFO) << "[GEOP] End GeOp::ComputeAsync, kernel_name: " << geop_name - << ", ret_status: " << ToString(run_graph_status) << ", tf session : " << tf_session_ - << ", graph id: " << cache_graph_id << ", cost [" << ((endTime - startTime) / kMicrosToMillis) << "ms]"; + OP_REQUIRES_OK_ASYNC(ctx, RunGraph(ctx, cache_graph_id, inputs, callback), done); + int64_t end_time = InferShapeUtil::GetCurrentTimestap(); + ADP_LOG(INFO) << "[GEOP] End GeOp::ComputeAsync, kernel_name: " << geop_name_ + << ", tf session : " << tf_session_ << ", graph id: " + << cache_graph_id << ", cost [" + << ((end_time - run_start_time) / kMicrosToMillis) << "ms]"; return; } @@ -1443,12 +1508,6 @@ void GeOp::AddNodeAttrs(Node *node, bool &is_initialize) { is_host_graph_ = true; ADP_LOG(INFO) << "[GEOP] variable subgraph is initialized in host."; } - if (!need_compile_graph_first_) { - if (node->name().find("NpuCompile") != std::string::npos) { - need_compile_graph_first_ = true; - ADP_LOG(INFO) << "[GEOP] set subgraph compile first."; - } - } // clear device info && attr node_def.set_device(""); if (node_def.op() == "Const") { @@ -1589,7 +1648,7 @@ void GeOp::HandleDpOpAndGetNextNodes(Graph &graph) { remove_nodes.push_back(iterator_node); } } - if (dynamic_input_ == "1" && dynamic_graph_execute_mode_ == "lazy_recompile") { + if (IsLazyCompile()) { graph_options_["ge.exec.enableCopyOutputAddr"] = "1"; } } @@ -1951,6 +2010,7 @@ void GeOp::SetShapesToOutputDesc(const std::vector &input_shapes, c } int GeOp::RunTuning(std::vector &input_vec, std::vector &inputs, const OpKernelContext *const ctx) { + mutex_lock lock{graph_handler_.graph_mu}; if (tuned_flag_.test_and_set()) { ADP_LOG(INFO) << ctx->op_kernel().name() << " has tuned."; return 0; @@ -2040,11 +2100,6 @@ int GeOp::RunTuning(std::vector &input_vec, std::vector &inp } { GE_MAKE_GUARD(destroy, callback); - const auto &ge_status = CreateGeSession(); - if (!ge_status.ok()) { - ADP_LOG(ERROR) << "get ge session failed[" << ge_status.error_message() << "]."; - return -1; - } // share ge_session to aoe AoeStatus set_ret = (*aoe_set_gesession_)(session_id_, ge_session_); if (set_ret != Aoe::AOE_SUCCESS) { @@ -2190,7 +2245,7 @@ Status GeOp::AnalyzeStringInput(ge::Tensor &input, uint64_t count, const std::st } Status GeOp::GraphInputConvertToConst(OpKernelContext *ctx) { - mutex_lock lock{mu_}; + mutex_lock lock{graph_handler_.graph_mu}; if (is_input_convert_) { return Status::OK(); } @@ -2300,9 +2355,9 @@ Status GeOp::GraphCheckInputEqualConstOp(Tensor &tensor, int32_t index, bool &is Status GeOp::BuildInputTensorInfo(OpKernelContext *const ctx, std::vector &input_vec, std::vector &input_shapes, std::vector &inputs) { // ctx is not nullptr + mutex_lock lock{graph_handler_.graph_mu}; int num_inputs = ctx->num_inputs(); std::string cur_input_shapes; - // populate inputs for (int i = 0; i < num_inputs; i++) { Tensor tensor(ctx->input(i)); @@ -2559,7 +2614,8 @@ const std::string GeOp::SERIALIZE_FORMAT = "serialize_format"; const std::string GeOp::SERIALIZE_DATATYPE = "serialize_datatype"; const std::string GeOp::SERIALIZE_SHAPE = "serialize_shape"; const std::string GeOp::SubGraph = "SubGraph"; -std::unordered_map GeOp::session_and_graph_id_map_; +std::unordered_map> GeOp::session_and_graph_id_map_; +uint32_t GeOp::current_size_ = 0U; REGISTER_KERNEL_BUILDER(Name("GeOp").Device(DEVICE_CPU), GeOp); } // namespace tensorflow diff --git a/tf_adapter/kernels/geop_npu.h b/tf_adapter/kernels/geop_npu.h index 975846463..59880a10b 100644 --- a/tf_adapter/kernels/geop_npu.h +++ b/tf_adapter/kernels/geop_npu.h @@ -19,6 +19,7 @@ #include #include +#include #include "tensorflow/core/common_runtime/function.h" #include "tensorflow/core/framework/op_kernel.h" @@ -46,6 +47,22 @@ using AoeSetTuningGraphInputFunc = AoeStatus (*)(SessionId, const std::vector &); +enum GraphStatus { + Init, + CompileDone, + Removing +}; + +struct GraphHandler { + GraphStatus status = Init; + mutex graph_mu; + condition_variable cv; + int32_t graph_run_num = 0; + uint64_t add_graph_mask = 1UL; + int32_t add_total_num = 0; + ge::ComputeGraphPtr graph; +}; + class GeOp : public AsyncOpKernel { public: explicit GeOp(OpKernelConstruction *ctx); @@ -94,6 +111,17 @@ public: // prepare output tensor Status BuildOutTensorInfo(OpKernelContext *ctx); + Status ParserGraph(OpKernelContext *ctx, std::vector &input_vec); + Status AddGraph(OpKernelContext *ctx, const uint32_t &graph_id); + Status BuildGraph(const uint32_t &graph_id, + const std::vector &inputs); + Status RunGraph(OpKernelContext *ctx, const uint32_t &graph_id, + const std::vector &inputs, + ge::RunAsyncCallback callback); + Status CompileGraph(OpKernelContext *ctx, std::vector &input_vec, + const uint32_t &graph_id, const std::vector &inputs, + std::vector input_shapes); + bool IsLazyCompile(); // create input and output desc for NodeDef Status GenerateDesc(Node *&node); @@ -108,7 +136,7 @@ public: void AddNodeAttrs(Node *node, bool &is_initialize); - int InitRebuildFlag(uint32_t cache_graph_id); + Status CheckAndRemoveGraph(OpKernelContext *ctx, const uint32_t &graph_id); bool IsGraphNeedRebuild(const uint32_t cache_graph_id); Status DoAccelerateTrain(); Status NeedRecompileWhenAccelerateTrainOn(bool &need_recompile); @@ -120,7 +148,7 @@ public: Status RecoverPrecisionMode(); bool IncrementGraphIdCount(uint32_t &graph_id); - bool DecrementGraphIdCount(const std::string &tf_session, uint32_t &graph_id); + bool DecrementGraphIdCount(); void ClearGraphIdCount(); @@ -138,13 +166,13 @@ public: void AnalyzeInputDesc(void *tensor_ptr, ge::Tensor &input, ge::DataType type, std::vector &input_shapes) const; - + Status RemoveGraph(const uint32_t &graph_id); int RunTuning(std::vector &input_vec, std::vector &inputs, const OpKernelContext *const ctx); std::string BuildSubGraph(FunctionLibraryDefinition *flib_def, const std::string &graph); void SetDynamicInput(); - + Status SetGraphOptions(); void ProcessDpOpFuncDef(const Node &node) const; void BuildQueueDataAndGetNextFromQueue(Graph &graph, const Node &getnext_node, @@ -160,6 +188,7 @@ public: PartialTensorShape MakeCompatShape(const PartialTensorShape &a, const PartialTensorShape &b) const; + void InitGraphShape(OpKernelContext *const ctx); bool MaybeUpdateShape(OpKernelContext *const ctx); PartialTensorShape MakeUnknownShape(const int32_t &size) const; Status ProcessForDiffNodeTypes(Graph &graph, bool &is_initialize, bool &is_allreduce); @@ -184,10 +213,7 @@ public: static bool tuned_initialize_flag_; bool init_flag_; - bool build_flag_; - bool add_graph_flag_; bool sess_init_flag_; - bool compute_graph_empty_; bool is_input_convert_; std::string input_shapes_; @@ -195,6 +221,7 @@ public: std::string data_format_; uint32_t graph_id_; bool is_initialized_graph_; + bool is_empty_graph_; bool need_iteration_; std::string tf_session_; ge::Session *ge_session_; @@ -205,7 +232,7 @@ public: std::vector, uint32_t>> graph_counts_; std::map sess_options_; std::map init_options_; - static std::unordered_map session_and_graph_id_map_; + static std::unordered_map> session_and_graph_id_map_; uint32_t iteration_per_loop_; bool is_host_graph_; std::map graph_options_; @@ -218,8 +245,6 @@ public: std::string dynamic_graph_execute_mode_; std::string data_inputs_shape_range_; std::string getnext_inputs_shape_range_; - bool need_compile_graph_first_; - std::map tune_options_; std::string is_dynamic_getnext_; std::string placeholder_index_; std::atomic_flag tuned_flag_; @@ -250,6 +275,9 @@ public: AoeSetTuningGraphInputFunc aoe_set_tuning_graph_input_; // accelerate train AccelerateInfo accelerate_info_; + GraphHandler graph_handler_; + std::string geop_name_; + static uint32_t current_size_; }; } // namespace tensorflow #endif // TENSORFLOW_KERNELS_GEOP_NPU_H_ diff --git a/tf_adapter/tests/depends/ge_runner/src/callback_executor.cc b/tf_adapter/tests/depends/ge_runner/src/callback_executor.cc new file mode 100644 index 000000000..7dfc3e91f --- /dev/null +++ b/tf_adapter/tests/depends/ge_runner/src/callback_executor.cc @@ -0,0 +1,86 @@ +/* + * Copyright (c) Huawei Technologies Co., Ltd. 2022. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "callback_executor.h" +#include +#include "acl/acl_rt.h" + +namespace tensorflow { + CallbackExecutor &CallbackExecutor::GetInstance() { + static CallbackExecutor instance; + return instance; + } + void CallbackExecutor::Init() { + std::cout << "Start callback thread pool." << std::endl; + copy_thread_pool_.resize(thread_num_); + for (size_t idx = 0UL; idx < copy_thread_pool_.size(); idx++) { + if (copy_thread_pool_[idx] == nullptr) { + std::string thread_name = "thread_pool" + std::to_string(idx); + copy_thread_pool_[idx].reset(new std::thread(std::bind(&CallbackExecutor::CallbackHandler, this))); + } + } + thread_stop_flag_.store(false); + } + + void CallbackExecutor::CallbackHandler() { + std::cout << "Start callback thread." << std::endl; + CallbackPack closure; + while (!thread_stop_flag_.load()) { + { + std::unique_lock lck(queue_lock_); + queue_var_.wait(lck, [this]() { return ((!task_queue_.empty()) || (thread_stop_flag_.load())); }); + if (thread_stop_flag_.load()) { + queue_var_.notify_all(); + break; + } + closure = task_queue_.front(); + task_queue_.pop(); + std::cout << "Run callback" << std::endl; + } + closure.callback(closure.ge_status, closure.outputs); + std::unique_lock lck(queue_lock_); + run_num_--; + } + std::cout << "Callback thread is finished." << std::endl; + } + + void CallbackExecutor::PushTask(const CallbackPack &closure) { + std::unique_lock lck(queue_lock_); + std::cout << "Push closure" << std::endl; + task_queue_.push(closure); + run_num_++; + queue_var_.notify_all(); + } + + void CallbackExecutor::StopThreadPool() { + { + std::unique_lock lck(queue_lock_); + queue_var_.wait(lck, [this]() { return run_num_ <= 0; }); + std::cout << "Stop callback thread." << std::endl; + thread_stop_flag_.store(true); + queue_var_.notify_all(); + } + for (size_t i = 0UL; i < copy_thread_pool_.size(); i++) { + if (copy_thread_pool_[i]->joinable()) { + copy_thread_pool_[i]->join(); + } + } + } + int32_t CallbackExecutor::GetRunNum() { + std::unique_lock lck(queue_lock_); + return run_num_; + } +} \ No newline at end of file diff --git a/tf_adapter/tests/depends/ge_runner/src/callback_executor.h b/tf_adapter/tests/depends/ge_runner/src/callback_executor.h new file mode 100644 index 000000000..ca8743372 --- /dev/null +++ b/tf_adapter/tests/depends/ge_runner/src/callback_executor.h @@ -0,0 +1,54 @@ +/* + * Copyright (c) Huawei Technologies Co., Ltd. 2022. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef TESTS_DEPENDS_GE_RUNNER_SRC_HOST_THREAD_POOL_H_ +#define TESTS_DEPENDS_GE_RUNNER_SRC_HOST_THREAD_POOL_H_ + +#include +#include +#include +#include +#include +#include +#include +#include "ge/ge_api_types.h" +#include "graph/tensor.h" + +namespace tensorflow { +struct CallbackPack { + ge::RunAsyncCallback callback; + ge::Status ge_status; + std::vector outputs; +}; +class CallbackExecutor { + public: + static CallbackExecutor &GetInstance(); + void Init(); + void PushTask(const CallbackPack &closure); + void StopThreadPool(); + int32_t GetRunNum(); + private: + void CallbackHandler(); + std::mutex queue_lock_; + std::condition_variable queue_var_; + std::vector> copy_thread_pool_; + std::queue task_queue_; + std::atomic thread_stop_flag_{false}; + uint32_t thread_num_ = 1U; + int32_t run_num_ = 0; +}; +} +#endif // TESTS_DEPENDS_GE_RUNNER_SRC_HOST_THREAD_POOL_H_ \ No newline at end of file diff --git a/tf_adapter/tests/depends/ge_runner/src/ge_runner_stub.cc b/tf_adapter/tests/depends/ge_runner/src/ge_runner_stub.cc index 35c7c9a23..deb474151 100644 --- a/tf_adapter/tests/depends/ge_runner/src/ge_runner_stub.cc +++ b/tf_adapter/tests/depends/ge_runner/src/ge_runner_stub.cc @@ -38,6 +38,7 @@ #include "ascendcl_stub.h" #include "ge_stub.h" #include "tf_adapter/common/adapter_logger.h" +#include "callback_executor.h" namespace ge { namespace { @@ -254,21 +255,21 @@ Status Session::RemoveGraph(uint32_t graphId) { graphs_map.erase(ret); return ge::SUCCESS; } - return ge::FAILED; + return ge::SUCCESS; } bool Session::IsGraphNeedRebuild(uint32_t graphId) { auto ret = graphs_map.find(graphId); if (ret != graphs_map.end()) { - return false; + return true; } - return true; + return false; } Status Session::AddGraph(uint32_t graphId, const Graph &graph, const std::map &options) { auto ret = graphs_map.find(graphId); if (ret != graphs_map.end()) { - return ge::FAILED; + return ge::SUCCESS; } graphs_map[graphId] = graph; return ge::SUCCESS; @@ -284,10 +285,6 @@ Status Session::AddGraphWithCopy(uint32_t graphId, const Graph &graph, const std } Status Session::BuildGraph(uint32_t graphId, const std::vector &inputs) { - auto ret = graphs_map.find(graphId); - if (ret == graphs_map.end()) { - return ge::FAILED; - } return ge::SUCCESS; } @@ -296,21 +293,36 @@ void RegRunGraphAsyncStub(RunGraphAsyncStub stub) { g_RunGraphAsyncStub = stub; } +void ClearRegRunGraphAsyncStub() { + g_RunGraphAsyncStub = nullptr; +} + Status Session::RunGraphAsync(uint32_t graphId, const std::vector &inputs, RunAsyncCallback callback) { if (g_RunGraphAsyncStub != nullptr) { return g_RunGraphAsyncStub(graphId, inputs, callback); } - ge::Status ret; std::vector outputs; - outputs.push_back(ge::Tensor()); - auto res = graphs_map.find(graphId); - if (res == graphs_map.end()) { - ret = ge::FAILED; - } else { - ret = ge::SUCCESS; - } - callback(ret, outputs); - return ret; + size_t total_size = sizeof(int32_t); + ge::TensorDesc tensor_desc(ge::Shape({1}), ge::Format::FORMAT_ND, ge::DT_INT32); + tensor_desc.SetPlacement(ge::kPlacementHost); + ge::Tensor tensor(tensor_desc); + std::shared_ptr base_addr(new uint8_t[67]); + const size_t offset = 63U; + uint8_t *aligned_addr = ge::PtrToPtr( + ge::ValueToPtr((ge::PtrToValue(ge::PtrToPtr(base_addr.get())) + offset) & ~offset)); + std::cout << "aligned_addr: " << reinterpret_cast(aligned_addr) << "origin addr: " + << reinterpret_cast(base_addr.get()) << std::endl; + tensor.SetData(aligned_addr, total_size, [](uint8_t *ptr) { + (void)ptr; + ptr = nullptr; + }); + outputs.emplace_back(tensor); + tensorflow::CallbackPack pack; + pack.callback = callback; + pack.ge_status = ge::SUCCESS; + pack.outputs = outputs; + tensorflow::CallbackExecutor::GetInstance().PushTask(pack); + return ge::SUCCESS; } Status Session::BuildGraph(uint32_t graphId, const std::vector &inputs) { diff --git a/tf_adapter/tests/depends/ge_runner/src/ge_stub.h b/tf_adapter/tests/depends/ge_runner/src/ge_stub.h index 025107d84..9a0b2be1b 100644 --- a/tf_adapter/tests/depends/ge_runner/src/ge_stub.h +++ b/tf_adapter/tests/depends/ge_runner/src/ge_stub.h @@ -51,5 +51,6 @@ void RegRunGraphStub(RunGraphStub stub); using RunGraphAsyncStub = std::function&, RunAsyncCallback)>; void RegRunGraphAsyncStub(RunGraphAsyncStub stub); +void ClearRegRunGraphAsyncStub(); } // namespace ge #endif // COMMON_GRAPH_DEBUG_GE_UTIL_H_ diff --git a/tf_adapter/tests/st/kernels/testcase/geop_npu_test.cc b/tf_adapter/tests/st/kernels/testcase/geop_npu_test.cc index 2e632988c..273688fd5 100644 --- a/tf_adapter/tests/st/kernels/testcase/geop_npu_test.cc +++ b/tf_adapter/tests/st/kernels/testcase/geop_npu_test.cc @@ -8,6 +8,7 @@ #include #include "gtest/gtest.h" #include "ge_stub.h" +#include "callback_executor.h" #define private public #include "tf_adapter/kernels/geop_npu.h" #undef private @@ -144,6 +145,9 @@ Status GeOpRunGraphAsync(std::string example_path, gtl::InlinedVector(¶ms); async_op->ComputeAsync(ctx1.get(), done); } + while (CallbackExecutor::GetInstance().GetRunNum() > 0) { + usleep(100); + } } } return Status::OK(); @@ -187,6 +191,9 @@ Status GeOpRunGraphAsyncMultiStep(std::string example_path, std::vector(¶ms); async_op->ComputeAsync(ctx.get(), done); + while (CallbackExecutor::GetInstance().GetRunNum() > 0) { + usleep(100); + } } } } @@ -195,6 +202,7 @@ Status GeOpRunGraphAsyncMultiStep(std::string example_path, std::vector inputs; @@ -594,8 +602,11 @@ TEST_F(GeOpTest, BuildOutputTensorInfo) { }); std::vector outputs; outputs.emplace_back(tensor); - - callback(ge::SUCCESS, outputs); + tensorflow::CallbackPack pack; + pack.callback = callback; + pack.ge_status = ge::SUCCESS; + pack.outputs = outputs; + tensorflow::CallbackExecutor::GetInstance().PushTask(pack); return ge::SUCCESS; }); @@ -779,6 +790,7 @@ TEST_F(GeOpTest, test_Get_GeSession_Failed) { GeOp *geop_node = dynamic_cast(g_op.get()); geop_node->tf_session_ = ""; EXPECT_EQ(geop_node->CreateGeSession().ok(), false); + CallbackExecutor::GetInstance().StopThreadPool(); } } // namespace } //end tensorflow diff --git a/tf_adapter/tests/ut/kernels/pbtxt/geop_jit_compile_auto.pbtxt b/tf_adapter/tests/ut/kernels/pbtxt/geop_jit_compile_auto.pbtxt new file mode 100644 index 000000000..a98aeabbf --- /dev/null +++ b/tf_adapter/tests/ut/kernels/pbtxt/geop_jit_compile_auto.pbtxt @@ -0,0 +1,995 @@ +node { + name: "arg_arg_Placeholder_0_0" + op: "_Arg" + device: "/job:localhost/replica:0/task:0/device:CPU:0" + attr { + key: "T" + value { + type: DT_INT32 + } + } + attr { + key: "index" + value { + i: 0 + } + } +} +node { + name: "retval_Mul_0_0" + op: "_Retval" + input: "GeOp11_1" + device: "/job:localhost/replica:0/task:0/device:CPU:0" + attr { + key: "T" + value { + type: DT_INT32 + } + } + attr { + key: "index" + value { + i: 0 + } + } +} +node { + name: "GeOp11_1" + op: "GeOp" + input: "GeOp11_0" + input: "GeOp11_0:1" + input: "arg_arg_Placeholder_0_0" + device: "/job:localhost/replica:0/task:0/device:CPU:0" + attr { + key: "Tin" + value { + list { + type: DT_INT64 + type: DT_INT64 + type: DT_INT32 + } + } + } + attr { + key: "Tout" + value { + list { + type: DT_INT32 + } + } + } + attr { + key: "_NpuOptimizer" + value { + s: "NpuOptimizer" + } + } + attr { + key: "_auto_tune_mode" + value { + s: "" + } + } + attr { + key: "_buffer_optimize" + value { + s: "l2_optimize" + } + } + attr { + key: "_compress_weight_conf" + value { + s: "" + } + } + attr { + key: "_debug_dir" + value { + s: "" + } + } + attr { + key: "_distribute_config" + value { + s: "" + } + } + attr { + key: "_do_npu_optimizer" + value { + s: "1" + } + } + attr { + key: "_dump_debug_mode" + value { + s: "all" + } + } + attr { + key: "_dump_mode" + value { + s: "output" + } + } + attr { + key: "_dump_path" + value { + s: "" + } + } + attr { + key: "_dump_step" + value { + s: "" + } + } + attr { + key: "_enable_compress_weight" + value { + s: "0" + } + } + attr { + key: "_enable_data_pre_proc" + value { + s: "1" + } + } + attr { + key: "_enable_dump" + value { + s: "0" + } + } + attr { + key: "_enable_dump_debug" + value { + s: "0" + } + } + attr { + key: "_enable_exception_dump" + value { + s: "" + } + } + attr { + key: "_enable_scope_fusion_passes" + value { + s: "" + } + } + attr { + key: "_enable_small_channel" + value { + s: "0" + } + } + attr { + key: "_fusion_switch_file" + value { + s: "" + } + } + attr { + key: "_graph_run_mode" + value { + s: "1" + } + } + attr { + key: "_hcom_multi_mode" + value { + s: "" + } + } + attr { + key: "_hcom_parallel" + value { + s: "0" + } + } + attr { + key: "_in_out_pair" + value { + s: "" + } + } + attr { + key: "_in_out_pair_flag" + value { + s: "1" + } + } + attr { + key: "_input_shape" + value { + s: "" + } + } + attr { + key: "_is_dynamic_getnext" + value { + s: "1" + } + } + attr { + key: "_jit_compile" + value { + s: "2" + } + } + attr { + key: "_is_tailing_optimization" + value { + s: "0" + } + } + attr { + key: "_iterations_per_loop" + value { + s: "1" + } + } + attr { + key: "_job" + value { + s: "localhost" + } + } + attr { + key: "_local_device_list" + value { + s: "" + } + } + attr { + key: "_local_rank_id" + value { + s: "-1" + } + } + attr { + key: "_lower_functional_ops" + value { + s: "0" + } + } + attr { + key: "_mix_compile_mode" + value { + s: "0" + } + } + attr { + key: "_mstune_mode" + value { + s: "" + } + } + attr { + key: "_op_compiler_cache_dir" + value { + s: "" + } + } + attr { + key: "_op_compiler_cache_mode" + value { + s: "" + } + } + attr { + key: "_op_debug_level" + value { + s: "0" + } + } + attr { + key: "_op_select_implmode" + value { + s: "" + } + } + attr { + key: "_op_tune_mode" + value { + s: "" + } + } + attr { + key: "_optypelist_for_implmode" + value { + s: "" + } + } + attr { + key: "_placeholder_index" + value { + s: "2" + } + } + attr { + key: "_precision_mode" + value { + s: "" + } + } + attr { + key: "_profiling_mode" + value { + s: "0" + } + } + attr { + key: "_profiling_options" + value { + s: "" + } + } + attr { + key: "_session_device_id" + value { + s: "" + } + } + attr { + key: "_stream_max_parallel_num" + value { + s: "" + } + } + attr { + key: "_task_index" + value { + s: "0" + } + } + attr { + key: "_use_off_line" + value { + s: "1" + } + } + attr { + key: "_variable_format_optimize" + value { + s: "1" + } + } + attr { + key: "_work_path" + value { + s: "" + } + } + attr { + key: "data_format" + value { + s: "NHWC" + } + } + attr { + key: "function" + value { + func { + name: "GeOp11_1" + } + } + } +} +node { + name: "GeOp11_0" + op: "GeOp" + device: "/job:localhost/replica:0/task:0/device:CPU:0" + attr { + key: "Tin" + value { + list { + } + } + } + attr { + key: "Tout" + value { + list { + type: DT_INT64 + type: DT_INT64 + } + } + } + attr { + key: "_NpuOptimizer" + value { + s: "NpuOptimizer" + } + } + attr { + key: "_auto_tune_mode" + value { + s: "" + } + } + attr { + key: "_buffer_optimize" + value { + s: "l2_optimize" + } + } + attr { + key: "_compress_weight_conf" + value { + s: "" + } + } + attr { + key: "_debug_dir" + value { + s: "" + } + } + attr { + key: "_distribute_config" + value { + s: "" + } + } + attr { + key: "_do_npu_optimizer" + value { + s: "1" + } + } + attr { + key: "_dump_debug_mode" + value { + s: "all" + } + } + attr { + key: "_dump_mode" + value { + s: "output" + } + } + attr { + key: "_dump_path" + value { + s: "" + } + } + attr { + key: "_dump_step" + value { + s: "" + } + } + attr { + key: "_dynamic_dims" + value { + s: "" + } + } + attr { + key: "_dynamic_graph_execute_mode" + value { + s: "lazy_recompile" + } + } + attr { + key: "_dynamic_input" + value { + s: "1" + } + } + attr { + key: "_dynamic_node_type" + value { + s: "" + } + } + attr { + key: "_enable_compress_weight" + value { + s: "0" + } + } + attr { + key: "_enable_data_pre_proc" + value { + s: "1" + } + } + attr { + key: "_enable_dump" + value { + s: "0" + } + } + attr { + key: "_enable_dump_debug" + value { + s: "0" + } + } + attr { + key: "_enable_exception_dump" + value { + s: "" + } + } + attr { + key: "_enable_scope_fusion_passes" + value { + s: "" + } + } + attr { + key: "_enable_small_channel" + value { + s: "0" + } + } + attr { + key: "_fusion_switch_file" + value { + s: "" + } + } + attr { + key: "_graph_run_mode" + value { + s: "1" + } + } + attr { + key: "_hcom_multi_mode" + value { + s: "" + } + } + attr { + key: "_hcom_parallel" + value { + s: "0" + } + } + attr { + key: "_in_out_pair" + value { + s: "" + } + } + attr { + key: "_in_out_pair_flag" + value { + s: "1" + } + } + attr { + key: "_input_shape" + value { + s: "" + } + } + attr { + key: "_is_dynamic_getnext" + value { + s: "1" + } + } + attr { + key: "_is_tailing_optimization" + value { + s: "0" + } + } + attr { + key: "_iterations_per_loop" + value { + s: "1" + } + } + attr { + key: "_job" + value { + s: "localhost" + } + } + attr { + key: "_local_device_list" + value { + s: "" + } + } + attr { + key: "_local_rank_id" + value { + s: "-1" + } + } + attr { + key: "_lower_functional_ops" + value { + s: "0" + } + } + attr { + key: "_mix_compile_mode" + value { + s: "0" + } + } + attr { + key: "_mstune_mode" + value { + s: "" + } + } + attr { + key: "_op_compiler_cache_dir" + value { + s: "" + } + } + attr { + key: "_op_compiler_cache_mode" + value { + s: "" + } + } + attr { + key: "_op_debug_level" + value { + s: "0" + } + } + attr { + key: "_op_select_implmode" + value { + s: "" + } + } + attr { + key: "_op_tune_mode" + value { + s: "" + } + } + attr { + key: "_optypelist_for_implmode" + value { + s: "" + } + } + attr { + key: "_precision_mode" + value { + s: "" + } + } + attr { + key: "_profiling_mode" + value { + s: "0" + } + } + attr { + key: "_profiling_options" + value { + s: "" + } + } + attr { + key: "_session_device_id" + value { + s: "" + } + } + attr { + key: "_stream_max_parallel_num" + value { + s: "" + } + } + attr { + key: "_task_index" + value { + s: "0" + } + } + attr { + key: "_use_off_line" + value { + s: "1" + } + } + attr { + key: "_variable_format_optimize" + value { + s: "1" + } + } + attr { + key: "_work_path" + value { + s: "" + } + } + attr { + key: "data_format" + value { + s: "NHWC" + } + } + attr { + key: "function" + value { + func { + name: "GeOp11_0" + } + } + } +} +library { + function { + signature { + name: "GeOp11_1" + input_arg { + name: "IteratorGetNext_0_arg" + type: DT_INT64 + } + input_arg { + name: "IteratorGetNext_1_arg" + type: DT_INT64 + } + input_arg { + name: "arg_arg_Placeholder_0_0_0_arg" + type: DT_INT32 + } + output_arg { + name: "Mul_0_retval" + type: DT_INT32 + } + } + node_def { + name: "Cast" + op: "Cast" + input: "IteratorGetNext_0_arg" + device: "/job:localhost/replica:0/task:0/device:CPU:0" + attr { + key: "DstT" + value { + type: DT_INT32 + } + } + attr { + key: "SrcT" + value { + type: DT_INT64 + } + } + attr { + key: "Truncate" + value { + b: false + } + } + } + node_def { + name: "Cast_1" + op: "Cast" + input: "IteratorGetNext_1_arg" + device: "/job:localhost/replica:0/task:0/device:CPU:0" + attr { + key: "DstT" + value { + type: DT_INT32 + } + } + attr { + key: "SrcT" + value { + type: DT_INT64 + } + } + attr { + key: "Truncate" + value { + b: false + } + } + } + node_def { + name: "Add" + op: "Add" + input: "Cast:y:0" + input: "Cast_1:y:0" + device: "/job:localhost/replica:0/task:0/device:CPU:0" + attr { + key: "T" + value { + type: DT_INT32 + } + } + } + node_def { + name: "Mul" + op: "Mul" + input: "Add:z:0" + input: "arg_arg_Placeholder_0_0_0_arg" + device: "/job:localhost/replica:0/task:0/device:CPU:0" + attr { + key: "T" + value { + type: DT_INT32 + } + } + attr { + key: "_graph_dynamic_graph_execute_mode" + value { + s: "lazy_recompile" + } + } + attr { + key: "_graph_dynamic_input" + value { + b: true + } + } + attr { + key: "_graph_dynamic_inputs_shape_range" + value { + s: "" + } + } + } + ret { + key: "Mul_0_retval" + value: "Mul:z:0" + } + } + function { + signature { + name: "GeOp11_0" + output_arg { + name: "IteratorGetNext_0_retval" + type: DT_INT64 + } + output_arg { + name: "IteratorGetNext_1_retval" + type: DT_INT64 + } + } + node_def { + name: "IteratorV2" + op: "IteratorV2" + device: "/job:localhost/replica:0/task:0/device:CPU:0" + attr { + key: "_NpuOptimizer" + value { + s: "NpuOptimizer" + } + } + attr { + key: "_enable_data_pre_proc" + value { + s: "1" + } + } + attr { + key: "_iterations_per_loop" + value { + s: "1" + } + } + attr { + key: "_job" + value { + s: "localhost" + } + } + attr { + key: "_mix_compile_mode" + value { + s: "0" + } + } + attr { + key: "container" + value { + s: "" + } + } + attr { + key: "output_shapes" + value { + list { + shape { + dim { + size: -1 + } + dim { + size: -1 + } + } + shape { + dim { + size: -1 + } + dim { + size: -1 + } + } + } + } + } + attr { + key: "output_types" + value { + list { + type: DT_INT64 + type: DT_INT64 + } + } + } + attr { + key: "shared_name" + value { + s: "IteratorV2" + } + } + } + node_def { + name: "IteratorGetNext" + op: "IteratorGetNext" + input: "IteratorV2:handle:0" + device: "/job:localhost/replica:0/task:0/device:CPU:0" + attr { + key: "output_shapes" + value { + list { + shape { + dim { + size: -1 + } + dim { + size: -1 + } + } + shape { + dim { + size: -1 + } + dim { + size: -1 + } + } + } + } + } + attr { + key: "output_types" + value { + list { + type: DT_INT64 + type: DT_INT64 + } + } + } + } + ret { + key: "IteratorGetNext_0_retval" + value: "IteratorGetNext:components:0" + } + ret { + key: "IteratorGetNext_1_retval" + value: "IteratorGetNext:components:1" + } + } +} +versions { + producer: 134 + min_consumer: 12 +} diff --git a/tf_adapter/tests/ut/kernels/testcase/geop_npu_test.cc b/tf_adapter/tests/ut/kernels/testcase/geop_npu_test.cc index e319a8571..2760896f5 100644 --- a/tf_adapter/tests/ut/kernels/testcase/geop_npu_test.cc +++ b/tf_adapter/tests/ut/kernels/testcase/geop_npu_test.cc @@ -4,6 +4,7 @@ #include "tensorflow/core/public/version.h" #include "register/register_types.h" #include +#include #include "gtest/gtest.h" #include "ge_stub.h" @@ -11,6 +12,7 @@ #include "tf_adapter/util/npu_plugin.h" #include "tf_adapter/util/npu_plugin.h" #include "tf_adapter/util/util.h" +#include "callback_executor.h" #define private public #include "tf_adapter/kernels/geop_npu.h" #undef private @@ -142,9 +144,15 @@ Status GeOpRunGraphAsync(std::string example_path, gtl::InlinedVector(¶ms); AsyncOpKernel::DoneCallback done = []() { LOG(INFO) << "DONE DoneCallback"; }; async_op->ComputeAsync(ctx.get(), done); + while (CallbackExecutor::GetInstance().GetRunNum() > 0) { + usleep(100); + } if (!only_run_once) { auto ctx1 = absl::make_unique(¶ms); async_op->ComputeAsync(ctx1.get(), done); + while (CallbackExecutor::GetInstance().GetRunNum() > 0) { + usleep(100); + } } } } @@ -162,18 +170,12 @@ Status GeOpRunGraphAsyncMultiStep(std::string example_path, std::vectorname() == node_name) { geop_node_def = *node_def; - OpKernelContext::Params params; - params.record_tensor_accesses = false; - auto device = absl::make_unique(env, params.record_tensor_accesses); - params.device = device.get(); + auto device = absl::make_unique(env, false); Status status; std::unique_ptr op( - CreateOpKernel(DEVICE_CPU, params.device, cpu_allocator(), *node_def, TF_GRAPH_DEF_VERSION, &status)); - EXPECT_TRUE(status.ok()); + CreateOpKernel(DEVICE_CPU, device.get(), cpu_allocator(), *node_def, TF_GRAPH_DEF_VERSION, &status)); AsyncOpKernel* async_op = op->AsAsync(); - params.op_kernel = async_op; - params.session_handle = "session_0"; - + EXPECT_TRUE(status.ok()); // function library FunctionDefLibrary func_def_lib = graph_def.library(); std::unique_ptr lib_def( @@ -182,20 +184,93 @@ Status GeOpRunGraphAsyncMultiStep(std::string example_path, std::vector proc_flr(new ProcessFunctionLibraryRuntime( nullptr, Env::Default(), TF_GRAPH_DEF_VERSION, lib_def.get(), opts, nullptr, nullptr)); FunctionLibraryRuntime* flr = proc_flr->GetFLR(ProcessFunctionLibraryRuntime::kDefaultFLRDevice); - params.function_library = flr; AsyncOpKernel::DoneCallback done = []() { LOG(INFO) << "DONE DoneCallback"; }; for (size_t i = 0UL; i < inputs.size(); i++) { LOG(INFO) << "Step: " << i; + OpKernelContext::Params params; + params.record_tensor_accesses = false; + params.device = device.get(); + + params.op_kernel = async_op; + params.session_handle = "session_0"; + params.function_library = flr; params.inputs = &inputs[i]; auto ctx = absl::make_unique(¶ms); async_op->ComputeAsync(ctx.get(), done); + while (CallbackExecutor::GetInstance().GetRunNum() > 0) { + usleep(100); + } } } } return Status::OK(); } +Status GeOpRunGraphAsyncMultiStepMultiThread(std::string example_path, std::vector> inputs, + NodeDef& geop_node_def, + std::string node_name, + const int32_t &thread_num) { + Env* env = Env::Default(); + GraphDef graph_def; + std::string graph_def_path = example_path; + ReadTextProto(env, graph_def_path, &graph_def); + Status ret = Status::OK(); + for (int i = 0; i < graph_def.node_size(); i++) { + NodeDef* node_def = graph_def.mutable_node(i); + if (node_def->name() == node_name) { + geop_node_def = *node_def; + auto device = absl::make_unique(env, false); + Status status; + std::unique_ptr op( + CreateOpKernel(DEVICE_CPU, device.get(), cpu_allocator(), *node_def, TF_GRAPH_DEF_VERSION, &status)); + AsyncOpKernel* async_op = op->AsAsync(); + EXPECT_TRUE(status.ok()); + auto thread_func = [inputs, async_op, &device, graph_def, &ret]() { + // function library + FunctionDefLibrary func_def_lib = graph_def.library(); + std::unique_ptr lib_def( + new FunctionLibraryDefinition(OpRegistry::Global(), func_def_lib)); + OptimizerOptions opts; + std::unique_ptr proc_flr(new ProcessFunctionLibraryRuntime( + nullptr, Env::Default(), TF_GRAPH_DEF_VERSION, lib_def.get(), opts, nullptr, nullptr)); + FunctionLibraryRuntime* flr = proc_flr->GetFLR(ProcessFunctionLibraryRuntime::kDefaultFLRDevice); + AsyncOpKernel::DoneCallback done = []() { LOG(INFO) << "DONE DoneCallback"; }; + for (size_t i = 0UL; i < inputs.size(); i++) { + LOG(INFO) << "Step: " << i; + OpKernelContext::Params params; + params.record_tensor_accesses = false; + params.device = device.get(); + params.op_kernel = async_op; + params.session_handle = "session_0"; + params.function_library = flr; + params.inputs = &inputs[i]; + auto ctx = absl::make_unique(¶ms); + async_op->ComputeAsync(ctx.get(), done); + if (!ret.ok()) { + return; + } + } + }; + std::vector> tp; + tp.resize(thread_num); + for (int32_t i = 0; i < thread_num; i++) { + tp[i].reset(new std::thread(thread_func)); + } + while (CallbackExecutor::GetInstance().GetRunNum() > 0) { + usleep(100); + } + for (size_t i = 0UL; i < tp.size(); i++) { + if (tp[i]->joinable()) { + tp[i]->join(); + } + } + } + } + return ret; +} + TEST_F(GeOpTest, GeOpInitTest) { + CallbackExecutor::GetInstance().Init(); NpuClose(); PluginFinalize(); NodeDef node_def; @@ -261,11 +336,41 @@ TEST_F(GeOpTest, GeOpJitCompileFalseTest) { Tensor b(allocator2, DT_INT64, TensorShape({2, 2})); Tensor c(DT_INT32, TensorShape({1,})); gtl::InlinedVector inputs{TensorValue(&a), TensorValue(&b), TensorValue(&c)}; - EXPECT_TRUE(GeOpRunGraphAsync(graph_def_path, inputs, node_def, "GeOp11_1", false).ok()); Tensor d(DT_INT32, TensorShape({4})); gtl::InlinedVector inputs2{TensorValue(&a), TensorValue(&b), TensorValue(&d)}; EXPECT_TRUE(GeOpRunGraphAsyncMultiStep(graph_def_path, {inputs, inputs2}, node_def, "GeOp11_1").ok()); } + +/* +TEST_F(GeOpTest, GeOpTestMultiThreadCallback) { + NodeDef node_def; + std::string graph_def_path = "tf_adapter/tests/ut/kernels/pbtxt/geop_jit_compile_auto.pbtxt"; + std::vector ge_output1_dims{2, 2}; + auto getnext_output1_info = + std::unique_ptr(new NpuGetNextOutputInfo(ge::kPlacementDevice, ge_output1_dims, 32, nullptr)); + Allocator* allocator1 = NpuHostGetNextAllocator::Create(std::move(getnext_output1_info)); + Tensor a(allocator1, DT_INT64, TensorShape({2, 2})); + std::vector ge_output2_dims{2, 2}; + auto getnext_output2_info = + std::unique_ptr(new NpuGetNextOutputInfo(ge::kPlacementDevice, ge_output2_dims, 32, nullptr)); + Allocator* allocator2 = NpuHostGetNextAllocator::Create(std::move(getnext_output2_info)); + Tensor b(allocator2, DT_INT64, TensorShape({2, 2})); + std::vector ge_output3_dims{1}; + auto getnext_output3_info = + std::unique_ptr(new NpuGetNextOutputInfo(ge::kPlacementDevice, ge_output3_dims, 4, nullptr)); + Allocator* allocator3 = NpuHostGetNextAllocator::Create(std::move(getnext_output3_info)); + Tensor c(allocator3, DT_INT32, TensorShape({1})); + gtl::InlinedVector inputs{TensorValue(&a), TensorValue(&b), TensorValue(&c)}; + std::vector ge_output4_dims{1}; + auto getnext_output4_info = + std::unique_ptr(new NpuGetNextOutputInfo(ge::kPlacementDevice, ge_output4_dims, 4, nullptr)); + Allocator* allocator4 = NpuHostGetNextAllocator::Create(std::move(getnext_output4_info)); + Tensor d(allocator4, DT_INT32, TensorShape({1})); + gtl::InlinedVector inputs2{TensorValue(&a), TensorValue(&b), TensorValue(&d)}; + EXPECT_EQ(GeOpRunGraphAsyncMultiStepMultiThread(graph_def_path, {inputs, inputs2}, node_def, "GeOp11_1", 2).ok(), true); +} +*/ + TEST_F(GeOpTest, GeOpDynamicInputTest) { NodeDef node_def; std::string graph_def_path = "tf_adapter/tests/ut/kernels/pbtxt/geop_dynamic_input_lazy_recompile.pbtxt"; @@ -308,15 +413,24 @@ TEST_F(GeOpTest, GeOpGetNextStringTest) { std::string graph_def_path = "tf_adapter/tests/ut/kernels/pbtxt/geop_getnext_string.pbtxt"; std::vector ge_output1_dims{2, 2}; auto getnext_output1_info = - std::unique_ptr(new NpuGetNextOutputInfo(ge::kPlacementDevice, ge_output1_dims, 8, nullptr)); + std::unique_ptr(new NpuGetNextOutputInfo(ge::kPlacementDevice, ge_output1_dims, 32, nullptr)); Allocator* allocator1 = NpuHostGetNextAllocator::Create(std::move(getnext_output1_info)); Tensor a(allocator1, DT_INT64, TensorShape({2, 2})); - Tensor in(DT_STRING, TensorShape({1})); + std::vector ge_output2_dims{1}; + auto getnext_output2_info = + std::unique_ptr(new NpuGetNextOutputInfo(ge::kPlacementDevice, ge_output2_dims, 4, nullptr)); + Allocator* allocator2 = NpuHostGetNextAllocator::Create(std::move(getnext_output2_info)); + Tensor in(allocator2, DT_STRING, TensorShape({1})); in.scalar()() = "ABC"; - Tensor d(DT_INT32, TensorShape({2, 2})); + std::vector ge_output3_dims{2, 2}; + auto getnext_output3_info = + std::unique_ptr(new NpuGetNextOutputInfo(ge::kPlacementDevice, ge_output3_dims, 16, nullptr)); + Allocator* allocator3 = NpuHostGetNextAllocator::Create(std::move(getnext_output3_info)); + Tensor d(allocator3, DT_INT32, TensorShape({2, 2})); gtl::InlinedVector inputs{TensorValue(&a), TensorValue(&in), TensorValue(&d)}; EXPECT_TRUE(GeOpRunGraphAsync(graph_def_path, inputs, node_def, "GeOp14_0", false).ok()); } + TEST_F(GeOpTest, GeOpAoeTuningAndDynamicDimsTest) { NodeDef node_def; std::string graph_def_path = "tf_adapter/tests/ut/kernels/pbtxt/geop_aoe_tuning_and_dynamic_dims.pbtxt"; @@ -604,7 +718,11 @@ TEST_F(GeOpTest, BuildOutputTensorInfo) { std::vector outputs; outputs.emplace_back(tensor); - callback(ge::SUCCESS, outputs); + tensorflow::CallbackPack pack; + pack.callback = callback; + pack.ge_status = ge::SUCCESS; + pack.outputs = outputs; + tensorflow::CallbackExecutor::GetInstance().PushTask(pack); return ge::SUCCESS; }); @@ -615,6 +733,7 @@ TEST_F(GeOpTest, BuildOutputTensorInfo) { gtl::InlinedVector inputs{TensorValue(&in)}; EXPECT_TRUE(GeOpRunGraphAsync(graph_pbtxt_path, inputs, node_def, "GeOp1_0").ok()); } + TEST_F(GeOpTest, test_MakeCompatShape) { GeOp *geop_node; PartialTensorShape shape_a; @@ -784,6 +903,7 @@ TEST_F(GeOpTest, test_Get_GeSession_Failed) { GeOp *geop_node = dynamic_cast(g_op.get()); geop_node->tf_session_ = ""; EXPECT_EQ(geop_node->CreateGeSession().ok(), false); + CallbackExecutor::GetInstance().StopThreadPool(); } } // namespace } // namespace tensorflow -- Gitee From 4e78b8c3197bea4b804f6974643291af8c59ec42 Mon Sep 17 00:00:00 2001 From: guopeian Date: Fri, 7 Jun 2024 09:51:26 +0800 Subject: [PATCH 2/3] ut --- .../depends/ge_runner/src/ge_runner_stub.cc | 35 ++--- .../ut/kernels/testcase/geop_npu_test.cc | 146 +++--------------- 2 files changed, 35 insertions(+), 146 deletions(-) diff --git a/tf_adapter/tests/depends/ge_runner/src/ge_runner_stub.cc b/tf_adapter/tests/depends/ge_runner/src/ge_runner_stub.cc index deb474151..4b3c36050 100644 --- a/tf_adapter/tests/depends/ge_runner/src/ge_runner_stub.cc +++ b/tf_adapter/tests/depends/ge_runner/src/ge_runner_stub.cc @@ -255,15 +255,15 @@ Status Session::RemoveGraph(uint32_t graphId) { graphs_map.erase(ret); return ge::SUCCESS; } - return ge::SUCCESS; + return ge::FAILED; } bool Session::IsGraphNeedRebuild(uint32_t graphId) { auto ret = graphs_map.find(graphId); if (ret != graphs_map.end()) { - return true; + return false; } - return false; + return true; } Status Session::AddGraph(uint32_t graphId, const Graph &graph, const std::map &options) { @@ -285,6 +285,10 @@ Status Session::AddGraphWithCopy(uint32_t graphId, const Graph &graph, const std } Status Session::BuildGraph(uint32_t graphId, const std::vector &inputs) { + auto ret = graphs_map.find(graphId); + if (ret == graphs_map.end()) { + return ge::FAILED; + } return ge::SUCCESS; } @@ -301,28 +305,21 @@ Status Session::RunGraphAsync(uint32_t graphId, const std::vector &i if (g_RunGraphAsyncStub != nullptr) { return g_RunGraphAsyncStub(graphId, inputs, callback); } + ge::Status ret; std::vector outputs; - size_t total_size = sizeof(int32_t); - ge::TensorDesc tensor_desc(ge::Shape({1}), ge::Format::FORMAT_ND, ge::DT_INT32); - tensor_desc.SetPlacement(ge::kPlacementHost); - ge::Tensor tensor(tensor_desc); - std::shared_ptr base_addr(new uint8_t[67]); - const size_t offset = 63U; - uint8_t *aligned_addr = ge::PtrToPtr( - ge::ValueToPtr((ge::PtrToValue(ge::PtrToPtr(base_addr.get())) + offset) & ~offset)); - std::cout << "aligned_addr: " << reinterpret_cast(aligned_addr) << "origin addr: " - << reinterpret_cast(base_addr.get()) << std::endl; - tensor.SetData(aligned_addr, total_size, [](uint8_t *ptr) { - (void)ptr; - ptr = nullptr; - }); - outputs.emplace_back(tensor); + outputs.push_back(ge::Tensor()); + auto res = graphs_map.find(graphId); + if (res == graphs_map.end()) { + ret = ge::FAILED; + } else { + ret = ge::SUCCESS; + } tensorflow::CallbackPack pack; pack.callback = callback; pack.ge_status = ge::SUCCESS; pack.outputs = outputs; tensorflow::CallbackExecutor::GetInstance().PushTask(pack); - return ge::SUCCESS; + return ret; } Status Session::BuildGraph(uint32_t graphId, const std::vector &inputs) { diff --git a/tf_adapter/tests/ut/kernels/testcase/geop_npu_test.cc b/tf_adapter/tests/ut/kernels/testcase/geop_npu_test.cc index 2760896f5..74e0a1015 100644 --- a/tf_adapter/tests/ut/kernels/testcase/geop_npu_test.cc +++ b/tf_adapter/tests/ut/kernels/testcase/geop_npu_test.cc @@ -4,7 +4,6 @@ #include "tensorflow/core/public/version.h" #include "register/register_types.h" #include -#include #include "gtest/gtest.h" #include "ge_stub.h" @@ -144,15 +143,12 @@ Status GeOpRunGraphAsync(std::string example_path, gtl::InlinedVector(¶ms); AsyncOpKernel::DoneCallback done = []() { LOG(INFO) << "DONE DoneCallback"; }; async_op->ComputeAsync(ctx.get(), done); - while (CallbackExecutor::GetInstance().GetRunNum() > 0) { - usleep(100); - } if (!only_run_once) { auto ctx1 = absl::make_unique(¶ms); async_op->ComputeAsync(ctx1.get(), done); - while (CallbackExecutor::GetInstance().GetRunNum() > 0) { - usleep(100); - } + } + while (CallbackExecutor::GetInstance().GetRunNum() > 0) { + usleep(100); } } } @@ -170,12 +166,18 @@ Status GeOpRunGraphAsyncMultiStep(std::string example_path, std::vectorname() == node_name) { geop_node_def = *node_def; - auto device = absl::make_unique(env, false); + OpKernelContext::Params params; + params.record_tensor_accesses = false; + auto device = absl::make_unique(env, params.record_tensor_accesses); + params.device = device.get(); Status status; std::unique_ptr op( - CreateOpKernel(DEVICE_CPU, device.get(), cpu_allocator(), *node_def, TF_GRAPH_DEF_VERSION, &status)); - AsyncOpKernel* async_op = op->AsAsync(); + CreateOpKernel(DEVICE_CPU, params.device, cpu_allocator(), *node_def, TF_GRAPH_DEF_VERSION, &status)); EXPECT_TRUE(status.ok()); + AsyncOpKernel* async_op = op->AsAsync(); + params.op_kernel = async_op; + params.session_handle = "session_0"; + // function library FunctionDefLibrary func_def_lib = graph_def.library(); std::unique_ptr lib_def( @@ -184,95 +186,26 @@ Status GeOpRunGraphAsyncMultiStep(std::string example_path, std::vector proc_flr(new ProcessFunctionLibraryRuntime( nullptr, Env::Default(), TF_GRAPH_DEF_VERSION, lib_def.get(), opts, nullptr, nullptr)); FunctionLibraryRuntime* flr = proc_flr->GetFLR(ProcessFunctionLibraryRuntime::kDefaultFLRDevice); + params.function_library = flr; AsyncOpKernel::DoneCallback done = []() { LOG(INFO) << "DONE DoneCallback"; }; for (size_t i = 0UL; i < inputs.size(); i++) { LOG(INFO) << "Step: " << i; - OpKernelContext::Params params; - params.record_tensor_accesses = false; - params.device = device.get(); - - params.op_kernel = async_op; - params.session_handle = "session_0"; - params.function_library = flr; params.inputs = &inputs[i]; auto ctx = absl::make_unique(¶ms); async_op->ComputeAsync(ctx.get(), done); - while (CallbackExecutor::GetInstance().GetRunNum() > 0) { - usleep(100); - } - } - } - } - return Status::OK(); -} - -Status GeOpRunGraphAsyncMultiStepMultiThread(std::string example_path, std::vector> inputs, - NodeDef& geop_node_def, - std::string node_name, - const int32_t &thread_num) { - Env* env = Env::Default(); - GraphDef graph_def; - std::string graph_def_path = example_path; - ReadTextProto(env, graph_def_path, &graph_def); - Status ret = Status::OK(); - for (int i = 0; i < graph_def.node_size(); i++) { - NodeDef* node_def = graph_def.mutable_node(i); - if (node_def->name() == node_name) { - geop_node_def = *node_def; - auto device = absl::make_unique(env, false); - Status status; - std::unique_ptr op( - CreateOpKernel(DEVICE_CPU, device.get(), cpu_allocator(), *node_def, TF_GRAPH_DEF_VERSION, &status)); - AsyncOpKernel* async_op = op->AsAsync(); - EXPECT_TRUE(status.ok()); - auto thread_func = [inputs, async_op, &device, graph_def, &ret]() { - // function library - FunctionDefLibrary func_def_lib = graph_def.library(); - std::unique_ptr lib_def( - new FunctionLibraryDefinition(OpRegistry::Global(), func_def_lib)); - OptimizerOptions opts; - std::unique_ptr proc_flr(new ProcessFunctionLibraryRuntime( - nullptr, Env::Default(), TF_GRAPH_DEF_VERSION, lib_def.get(), opts, nullptr, nullptr)); - FunctionLibraryRuntime* flr = proc_flr->GetFLR(ProcessFunctionLibraryRuntime::kDefaultFLRDevice); - AsyncOpKernel::DoneCallback done = []() { LOG(INFO) << "DONE DoneCallback"; }; - for (size_t i = 0UL; i < inputs.size(); i++) { - LOG(INFO) << "Step: " << i; - OpKernelContext::Params params; - params.record_tensor_accesses = false; - params.device = device.get(); - params.op_kernel = async_op; - params.session_handle = "session_0"; - params.function_library = flr; - params.inputs = &inputs[i]; - auto ctx = absl::make_unique(¶ms); - async_op->ComputeAsync(ctx.get(), done); - if (!ret.ok()) { - return; - } - } - }; - std::vector> tp; - tp.resize(thread_num); - for (int32_t i = 0; i < thread_num; i++) { - tp[i].reset(new std::thread(thread_func)); } while (CallbackExecutor::GetInstance().GetRunNum() > 0) { usleep(100); } - for (size_t i = 0UL; i < tp.size(); i++) { - if (tp[i]->joinable()) { - tp[i]->join(); - } - } } } - return ret; + return Status::OK(); } TEST_F(GeOpTest, GeOpInitTest) { - CallbackExecutor::GetInstance().Init(); NpuClose(); PluginFinalize(); + CallbackExecutor::GetInstance().Init(); NodeDef node_def; std::string graph_def_path = "tf_adapter/tests/ut/kernels/pbtxt/geop.pbtxt"; gtl::InlinedVector inputs; @@ -336,41 +269,11 @@ TEST_F(GeOpTest, GeOpJitCompileFalseTest) { Tensor b(allocator2, DT_INT64, TensorShape({2, 2})); Tensor c(DT_INT32, TensorShape({1,})); gtl::InlinedVector inputs{TensorValue(&a), TensorValue(&b), TensorValue(&c)}; + EXPECT_TRUE(GeOpRunGraphAsync(graph_def_path, inputs, node_def, "GeOp11_1", false).ok()); Tensor d(DT_INT32, TensorShape({4})); gtl::InlinedVector inputs2{TensorValue(&a), TensorValue(&b), TensorValue(&d)}; EXPECT_TRUE(GeOpRunGraphAsyncMultiStep(graph_def_path, {inputs, inputs2}, node_def, "GeOp11_1").ok()); } - -/* -TEST_F(GeOpTest, GeOpTestMultiThreadCallback) { - NodeDef node_def; - std::string graph_def_path = "tf_adapter/tests/ut/kernels/pbtxt/geop_jit_compile_auto.pbtxt"; - std::vector ge_output1_dims{2, 2}; - auto getnext_output1_info = - std::unique_ptr(new NpuGetNextOutputInfo(ge::kPlacementDevice, ge_output1_dims, 32, nullptr)); - Allocator* allocator1 = NpuHostGetNextAllocator::Create(std::move(getnext_output1_info)); - Tensor a(allocator1, DT_INT64, TensorShape({2, 2})); - std::vector ge_output2_dims{2, 2}; - auto getnext_output2_info = - std::unique_ptr(new NpuGetNextOutputInfo(ge::kPlacementDevice, ge_output2_dims, 32, nullptr)); - Allocator* allocator2 = NpuHostGetNextAllocator::Create(std::move(getnext_output2_info)); - Tensor b(allocator2, DT_INT64, TensorShape({2, 2})); - std::vector ge_output3_dims{1}; - auto getnext_output3_info = - std::unique_ptr(new NpuGetNextOutputInfo(ge::kPlacementDevice, ge_output3_dims, 4, nullptr)); - Allocator* allocator3 = NpuHostGetNextAllocator::Create(std::move(getnext_output3_info)); - Tensor c(allocator3, DT_INT32, TensorShape({1})); - gtl::InlinedVector inputs{TensorValue(&a), TensorValue(&b), TensorValue(&c)}; - std::vector ge_output4_dims{1}; - auto getnext_output4_info = - std::unique_ptr(new NpuGetNextOutputInfo(ge::kPlacementDevice, ge_output4_dims, 4, nullptr)); - Allocator* allocator4 = NpuHostGetNextAllocator::Create(std::move(getnext_output4_info)); - Tensor d(allocator4, DT_INT32, TensorShape({1})); - gtl::InlinedVector inputs2{TensorValue(&a), TensorValue(&b), TensorValue(&d)}; - EXPECT_EQ(GeOpRunGraphAsyncMultiStepMultiThread(graph_def_path, {inputs, inputs2}, node_def, "GeOp11_1", 2).ok(), true); -} -*/ - TEST_F(GeOpTest, GeOpDynamicInputTest) { NodeDef node_def; std::string graph_def_path = "tf_adapter/tests/ut/kernels/pbtxt/geop_dynamic_input_lazy_recompile.pbtxt"; @@ -413,24 +316,15 @@ TEST_F(GeOpTest, GeOpGetNextStringTest) { std::string graph_def_path = "tf_adapter/tests/ut/kernels/pbtxt/geop_getnext_string.pbtxt"; std::vector ge_output1_dims{2, 2}; auto getnext_output1_info = - std::unique_ptr(new NpuGetNextOutputInfo(ge::kPlacementDevice, ge_output1_dims, 32, nullptr)); + std::unique_ptr(new NpuGetNextOutputInfo(ge::kPlacementDevice, ge_output1_dims, 8, nullptr)); Allocator* allocator1 = NpuHostGetNextAllocator::Create(std::move(getnext_output1_info)); Tensor a(allocator1, DT_INT64, TensorShape({2, 2})); - std::vector ge_output2_dims{1}; - auto getnext_output2_info = - std::unique_ptr(new NpuGetNextOutputInfo(ge::kPlacementDevice, ge_output2_dims, 4, nullptr)); - Allocator* allocator2 = NpuHostGetNextAllocator::Create(std::move(getnext_output2_info)); - Tensor in(allocator2, DT_STRING, TensorShape({1})); + Tensor in(DT_STRING, TensorShape({1})); in.scalar()() = "ABC"; - std::vector ge_output3_dims{2, 2}; - auto getnext_output3_info = - std::unique_ptr(new NpuGetNextOutputInfo(ge::kPlacementDevice, ge_output3_dims, 16, nullptr)); - Allocator* allocator3 = NpuHostGetNextAllocator::Create(std::move(getnext_output3_info)); - Tensor d(allocator3, DT_INT32, TensorShape({2, 2})); + Tensor d(DT_INT32, TensorShape({2, 2})); gtl::InlinedVector inputs{TensorValue(&a), TensorValue(&in), TensorValue(&d)}; EXPECT_TRUE(GeOpRunGraphAsync(graph_def_path, inputs, node_def, "GeOp14_0", false).ok()); } - TEST_F(GeOpTest, GeOpAoeTuningAndDynamicDimsTest) { NodeDef node_def; std::string graph_def_path = "tf_adapter/tests/ut/kernels/pbtxt/geop_aoe_tuning_and_dynamic_dims.pbtxt"; @@ -717,7 +611,6 @@ TEST_F(GeOpTest, BuildOutputTensorInfo) { }); std::vector outputs; outputs.emplace_back(tensor); - tensorflow::CallbackPack pack; pack.callback = callback; pack.ge_status = ge::SUCCESS; @@ -733,7 +626,6 @@ TEST_F(GeOpTest, BuildOutputTensorInfo) { gtl::InlinedVector inputs{TensorValue(&in)}; EXPECT_TRUE(GeOpRunGraphAsync(graph_pbtxt_path, inputs, node_def, "GeOp1_0").ok()); } - TEST_F(GeOpTest, test_MakeCompatShape) { GeOp *geop_node; PartialTensorShape shape_a; -- Gitee From d827a82876b1e7d997640474a8b98faf56055918 Mon Sep 17 00:00:00 2001 From: guopeian Date: Fri, 7 Jun 2024 10:21:30 +0800 Subject: [PATCH 3/3] log --- tf_adapter/kernels/geop_npu.cc | 13 +++++++++---- tf_adapter/kernels/geop_npu.h | 2 ++ 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/tf_adapter/kernels/geop_npu.cc b/tf_adapter/kernels/geop_npu.cc index a7107b79d..1fc652acc 100644 --- a/tf_adapter/kernels/geop_npu.cc +++ b/tf_adapter/kernels/geop_npu.cc @@ -33,7 +33,7 @@ #include #include #include - +#include #include "tf_adapter/common/adapter_logger.h" #include "tf_adapter/common/common.h" #include "tf_adapter/util/ge_plugin.h" @@ -104,7 +104,6 @@ const float kMaxStepRatio = 0.9; const float kDefaultLossRatio = 1.05; const float kMinLossRatio = 1.01; const float kMaxLossRatio = 1.5; -thread_local uint64_t add_graph_flag = 0UL; const int32_t kMaxAddNum = 8; const std::map fast_value_string_2_eunm = {{"fast", GeOp::FastValue::kfast}, {"fast1", GeOp::FastValue::kfast1}}; @@ -884,6 +883,7 @@ bool GeOp::IncrementGraphIdCount(uint32_t &graph_id) { session_and_graph_id_map_.insert(std::make_pair(tf_session_, graph_id_map)); current_size_++; } + (void)add_graph_flag_map_.insert(std::make_pair(graph_id, 0UL)); return true; } @@ -956,6 +956,7 @@ void GeOp::GetExecGraphId(uint32_t &cache_graph_id, std::vector inp } else { cache_graph_id = graph_id_ + num; } + add_graph_flag_map_.insert(std::make_pair(cache_graph_id, 0UL)); ADP_LOG(INFO) << "Set graph_status to Init when has no cache graph, graph_id: " << cache_graph_id; graph_handler_.status = Init; graph_handler_.cv.notify_all(); @@ -1137,7 +1138,7 @@ Status GeOp::ParserGraph(OpKernelContext *ctx, std::vector &input_vec) { Status GeOp::AddGraph(OpKernelContext *ctx, const uint32_t &graph_id) { // 当此线程未add过图,且总大小小于maxNum,需要去做add - if (((add_graph_flag & graph_handler_.add_graph_mask) == graph_handler_.add_graph_mask) || + if (((add_graph_flag_map_[graph_id] & graph_handler_.add_graph_mask) == graph_handler_.add_graph_mask) || (graph_handler_.add_total_num >= kMaxAddNum)) { return Status::OK(); } @@ -1171,7 +1172,7 @@ Status GeOp::AddGraph(OpKernelContext *ctx, const uint32_t &graph_id) { << "Error Message is : " << std::endl << ge::GEGetErrorMsgV2().GetString(); return errors::Internal(ss.str()); } - add_graph_flag = graph_handler_.add_graph_mask; + add_graph_flag_map_[graph_id] = graph_handler_.add_graph_mask; graph_handler_.add_total_num++; ADP_LOG(INFO) << "[GEOP] Add graph to ge session success, kernel_name: " << geop_name_ << ", tf session: " << tf_session_ << ", graph id: " << graph_id @@ -1442,12 +1443,16 @@ void GeOp::ComputeAsync(OpKernelContext *ctx, DoneCallback done) { void GeOp::ChangeChannelNameAttr(NodeDef &node_def) const { const std::string pre_channel_name = node_def.attr().at("channel_name").s(); + std::cout << "channel_name: " << pre_channel_name << std::endl; uint32_t device_id = 0; (void) GetEnvDeviceID(device_id); AttrValue channel_name = AttrValue(); channel_name.set_s(std::to_string( std::hash{}(tf_session_ + pre_channel_name + "_device_" + std::to_string(device_id)))); + std::cout << "channel_name: " << channel_name.s() << std::endl; (*node_def.mutable_attr())["channel_name"] = channel_name; + std::cout << "[GEOP] changed the value of channel_name attr of node: " << node_def.name() << " to " + << channel_name.s() << std::endl; ADP_LOG(INFO) << "[GEOP] changed the value of channel_name attr of node: " << node_def.name() << " to " << channel_name.s(); } diff --git a/tf_adapter/kernels/geop_npu.h b/tf_adapter/kernels/geop_npu.h index 59880a10b..2d6d5f862 100644 --- a/tf_adapter/kernels/geop_npu.h +++ b/tf_adapter/kernels/geop_npu.h @@ -278,6 +278,8 @@ public: GraphHandler graph_handler_; std::string geop_name_; static uint32_t current_size_; + // graphid 与 add_flag的映射 + static thread_local std::map add_graph_flag_map_; }; } // namespace tensorflow #endif // TENSORFLOW_KERNELS_GEOP_NPU_H_ -- Gitee