From c3129c97aff2018a112796482ee40b12d32705b3 Mon Sep 17 00:00:00 2001 From: yaolun Date: Fri, 24 Mar 2023 16:56:03 +0800 Subject: [PATCH] =?UTF-8?q?HostQueueDataset=E5=A4=84=E7=90=86NotFound?= =?UTF-8?q?=E7=B1=BB=E5=9E=8B=E9=94=99=E8=AF=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../kernels/aicpu/host_queue_dataset_op.cc | 31 ++++++++++++++----- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/tf_adapter/kernels/aicpu/host_queue_dataset_op.cc b/tf_adapter/kernels/aicpu/host_queue_dataset_op.cc index 61519f018..4883544a6 100644 --- a/tf_adapter/kernels/aicpu/host_queue_dataset_op.cc +++ b/tf_adapter/kernels/aicpu/host_queue_dataset_op.cc @@ -56,6 +56,7 @@ constexpr int64 kSleepUs = 10; const uint32_t kMaxValue = 128U; const size_t kMaxDepth = 128UL; const int32_t kSleepTime = 1; +const int32_t kDelayTime = 1; const uint32_t kSleepDuration = 5000; const static int64_t kStringTypeDepth = 64LL; const int64_t kUnknownShapeDepth = 3LL; @@ -453,6 +454,26 @@ class HostQueueDatasetOp : public DatasetOpKernel { } } + void HandleGetNextStatus(const Status status, const bool endOfSequence) { + if (status.ok() || errors::IsCancelled(status)) { + ADP_LOG(INFO) << "Finish to get tensor data, Status:" << status.ToString() + << "; end_of_sequence: " << endOfSequence; + return; + } + auto showLog = [status](){ + ADP_LOG(ERROR) << "Failed to get tensor data, Status:" << status.ToString(); + LOG(ERROR) << "Failed to get tensor data, Status:" << status.ToString(); + }; + if (!errors::IsNotFound(status)) { + showLog(); + return; + } + mutex_lock lck(mu_); + // cond_error_.wait_for waits (up to kDelayTime seconds) for the iterator to be destructed; kDelayTime is an estimated value.
+ cond_error_.wait_for(lck, std::chrono::seconds(kDelayTime)); + if (!finish_send_) { showLog(); } + } + void GetDataThread(const std::shared_ptr &ctx) { { mutex_lock lck(mu_); @@ -502,14 +523,7 @@ class HostQueueDatasetOp : public DatasetOpKernel { buffer_element.status = input_impls_[1]->GetNext(ctx.get(), &args, &end_of_sequence); auto end = std::chrono::steady_clock::now(); if ((!buffer_element.status.ok()) || (buffer_element.status.ok() && end_of_sequence)) { - if ((!buffer_element.status.ok()) && - (!errors::IsCancelled(buffer_element.status))) { - ADP_LOG(ERROR) << "Failed to get tensor data, Status:" << buffer_element.status.ToString(); - LOG(ERROR) << "Failed to get tensor data, Status:" << buffer_element.status.ToString(); - } else { - ADP_LOG(INFO) << "Finish to get tensor data, Status:" << buffer_element.status.ToString() - << "; end_of_sequence:" << end_of_sequence; - } + HandleGetNextStatus(buffer_element.status, end_of_sequence); mutex_lock lck(mu_); buffer_element.host_thread_finished = true; buffer_.push_back(std::move(buffer_element)); @@ -1011,6 +1025,7 @@ class HostQueueDatasetOp : public DatasetOpKernel { std::vector> input_impls_ GUARDED_BY(mu_); condition_variable cond_var_; condition_variable destory_var_; + condition_variable cond_error_; std::deque buffer_ GUARDED_BY(mu_); MemoryPool mem_pool_; HostThreadPool thread_pool_; -- Gitee