From eadbb5d8342aed7914b28a945a77d50fe3fb7ca2 Mon Sep 17 00:00:00 2001 From: weidandan 00687068 Date: Mon, 20 Feb 2023 17:26:00 +0800 Subject: [PATCH] =?UTF-8?q?=E6=95=B0=E6=8D=AE=E9=A2=84=E5=A4=84=E7=90=86?= =?UTF-8?q?=E5=8A=A8=E6=80=81=E6=8E=A7=E5=88=B6device=E5=86=85=E5=AD=98?= =?UTF-8?q?=E5=8D=A0=E7=94=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../kernels/aicpu/host_queue_dataset_op.cc | 92 +++++++++++++++++-- .../depends/ascendcl/src/ascendcl_stub.cc | 4 + 2 files changed, 86 insertions(+), 10 deletions(-) diff --git a/tf_adapter/kernels/aicpu/host_queue_dataset_op.cc b/tf_adapter/kernels/aicpu/host_queue_dataset_op.cc index 4defcaba2..22d2bc52a 100644 --- a/tf_adapter/kernels/aicpu/host_queue_dataset_op.cc +++ b/tf_adapter/kernels/aicpu/host_queue_dataset_op.cc @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +#include #include #include #include @@ -45,6 +46,7 @@ #include "tf_adapter/util/host_allocator.h" #include "tf_adapter/kernels/aicpu/data_item_deliver.h" #include "tf_adapter/kernels/aicpu/npu_tensor.h" +#include namespace tensorflow { namespace data { @@ -61,11 +63,24 @@ const int64_t kUnknownShapeDepth = 3LL; const uint64_t PARALLEL_MEMORY_TRHESHOLD = 10 * 1024 * 1024ULL; const uint32_t MAX_THREAD_NUM = 4U; std::atomic tdt_release(false); + // total memory usage controlled below 2G const uint64_t kTotalBytes = 8 * 1024 * 1024 * 1024LL; const int64_t kMaxBytes = 2 * 1024 * 1024 * 1024LL; enum class ChannelType { TDT = 0, ACL_QUEUE = 1, HOST_QUEUE = 2 }; /* ACL_QUEUE indicates mbuf */ enum class ThreadType : size_t { RECV = 0, SEND = 1, BUTT }; +std::atomic is_hold_type(false); + +// total memory usage controlled below 2G +const uint64_t kTotalBytes = 8 * 1024 * 1024 * 1024LL; +const int64_t kMaxBytes = 2 * 1024 * 1024 * 1024LL; +// int64_t MbufMaxBytes = atoi(getenv("MbufMaxBytes")); + +enum class ChannelType { + TDT = 0, + ACL_QUEUE = 1, /* mbuf */ + HOST_QUEUE = 2 +}; class HostQueueDatasetOp : public DatasetOpKernel { public: @@ -192,6 +207,7 @@ class HostQueueDatasetOp : public DatasetOpKernel { for (size_t i = 0UL; i < output_shape_size; i++) { DataType tensor_data_type = output_types_.at(i); if (tensor_data_type == DT_STRING) { + is_hold_type.store(true); ADP_LOG(INFO) << "Current tensor type is DT_STRING."; return kStringTypeDepth; } @@ -548,6 +564,7 @@ class HostQueueDatasetOp : public DatasetOpKernel { args_tensor_size += tensor.TotalBytes(); total_bytes_ += tensor.TotalBytes(); } + ADP_LOG(INFO) << "get data in buffer, args_tensor_size = "<channel_type_ == ChannelType::ACL_QUEUE)) { @@ -596,22 +613,70 @@ class HostQueueDatasetOp : public DatasetOpKernel { ADP_LOG(INFO) << "Slave SendDataThread exit."; } - Status SendDataByAclQueue(const vector &args, const acltdtTensorType &data_type) { + void RecordMbufQueueBytes(uint64_t args_total_bytes) { + mbuf_queue_rear_ = (mbuf_queue_rear_ + 1) % kStringTypeDepth; + mbuf_queue_bytes[mbuf_queue_rear_] = args_total_bytes; + } + + bool IsHoldDataTrans() { + clock_t start; + clock_t finish; + double duration; + start = clock(); + if (!is_hold_type) { return false; } + size_t mbuf_size; + aclError status = acltdtQueryChannelSize(acl_handle_, &mbuf_size); + if (status != ACL_SUCCESS) { + ADP_LOG(ERROR) << "Failed to get the mbuf size, status = " << status; + return false; + } + ADP_LOG(ERROR) << "mbuf_size = "<= static_cast(kMaxBytes)); + } + + Status SendDataByAclQueue(const vector &args, const acltdtTensorType &data_type, uint64_t args_total_bytes) { Status status; bool is_need_resend = false; + bool is_hold_data_trans = false; do { { mutex_lock lck(mu_); if (finish_send_) { break; } } - auto start = std::chrono::steady_clock::now(); - status = SendTensorsByAcl(acl_handle_, data_type, args, is_need_resend); - auto end = std::chrono::steady_clock::now(); - if (status.ok() && !is_need_resend) { - auto elapsed_time = std::chrono::duration(end - start).count(); - RefreshDataThreadPerf(ThreadType::SEND, elapsed_time, args); + if (is_need_resend || !IsHoldDataTrans()) { + ADP_LOG(ERROR) << "resend data or not control data trans, is_need_resend : " << is_need_resend << "IsHoldDataTrans : "< args; acltdtTensorType data_type = ACL_TENSOR_DATA_TENSOR; + uint64_t args_total_bytes = 0ULL; { mutex_lock lck(mu_); while ((!finish_send_) && buffer_.empty()) { @@ -709,15 +775,16 @@ class HostQueueDatasetOp : public DatasetOpKernel { args = buffer_.front().value; buffer_.pop_front(); for (auto &tensor : args) { - total_bytes_ -= tensor.TotalBytes(); + args_total_bytes += tensor.TotalBytes(); } + total_bytes_ -= args_total_bytes; } ADP_LOG(INFO) << "Host queue " << dataset()->channel_name_ << ", buffer_size: " << buffer_.size() << ", data_type:" << data_type; } Status status; if (dataset()->channel_type_ == ChannelType::ACL_QUEUE) { - status = SendDataByAclQueue(args, data_type); + status = SendDataByAclQueue(args, data_type, args_total_bytes); } else { status = SendDataByHostQueue(args, data_type); } @@ -996,10 +1063,15 @@ class HostQueueDatasetOp : public DatasetOpKernel { acltdtChannelHandle *acl_handle_; uint32_t queue_id_; int active_thread_num = 0; +<<<<<<< HEAD struct DataThreadPerf { double elapsed_time = 0; uint64_t total_bytes = 0; } data_thread_perf_stat_[static_cast(ThreadType::BUTT)]; +======= + uint64_t mbuf_queue_bytes[kStringTypeDepth]; + size_t mbuf_queue_rear_ = 0; +>>>>>>> 8fe9c497 (数据预处理动态控制device内存占用) }; const std::vector inputs_; std::string channel_name_; diff --git a/tf_adapter/tests/depends/ascendcl/src/ascendcl_stub.cc b/tf_adapter/tests/depends/ascendcl/src/ascendcl_stub.cc index d16284249..2d2994c0b 100644 --- a/tf_adapter/tests/depends/ascendcl/src/ascendcl_stub.cc +++ b/tf_adapter/tests/depends/ascendcl/src/ascendcl_stub.cc @@ -103,6 +103,10 @@ aclError aclrtResetDevice(int32_t deviceId) { return ACL_SUCCESS; } +aclError acltdtQueryChannelSize(const acltdtChannelHandle *handle, size_t *size) { + return ACL_SUCCESS; +} + acltdtChannelHandle *acltdtCreateChannelWithCapacity(uint32_t deviceId, const char *name, size_t capacity) { -- Gitee