From 4c8e059250a7c65dc633ef9b2309c44fced5e927 Mon Sep 17 00:00:00 2001 From: hejiahuan11 Date: Wed, 4 Sep 2024 14:28:18 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E5=9B=9E=E5=90=88=E8=87=B36.?= =?UTF-8?q?0.0RDMA=E7=9A=84worker=E7=BA=BF=E7=A8=8B=E7=9A=84=E5=B7=A5?= =?UTF-8?q?=E4=BD=9C=E6=96=B9=E5=BC=8F=E5=8F=98=E6=9B=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/cm_mes/mes_queue.c | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/src/cm_mes/mes_queue.c b/src/cm_mes/mes_queue.c index f70bb56..0368cf1 100644 --- a/src/cm_mes/mes_queue.c +++ b/src/cm_mes/mes_queue.c @@ -888,6 +888,7 @@ void mes_task_proc_inner(thread_t *thread) *tid = cm_get_current_thread_id(); bool32 need_serial = MES_GLOBAL_INST_MSG.profile.need_serial; + mes_pipe_type_t pipe_type = MES_GLOBAL_INST_MSG.profile.pipe_type; uint32 start_task_idx = task_priority->start_task_idx; uint32 task_num = task_priority->task_num; uint32 loop = 0; @@ -895,10 +896,12 @@ void mes_task_proc_inner(thread_t *thread) bool32 is_empty = CM_FALSE; uint32 queue_num = task_num > MES_PRIORITY_TASK_QUEUE_NUM ? MES_PRIORITY_TASK_QUEUE_NUM : task_num; while (!thread->closed && mes_ctx->phase == SHUTDOWN_PHASE_NOT_BEGIN) { - is_empty = mes_is_empty_queue_count(mq_ctx, priority); - if (is_empty) { - if (cm_event_timedwait(event, CM_SLEEP_1_FIXED) != CM_SUCCESS) { - continue; + if (pipe_type != MES_TYPE_RDMA) { + is_empty = mes_is_empty_queue_count(mq_ctx, priority); + if (is_empty) { + if (cm_event_timedwait(event, CM_SLEEP_1_FIXED) != CM_SUCCESS) { + continue; + } } } @@ -914,6 +917,9 @@ void mes_task_proc_inner(thread_t *thread) } if (msgitem == NULL) { + if (pipe_type == MES_TYPE_RDMA) { + cm_sleep(1); + } continue; } task_priority->pop_cursor = queue_id + 1; @@ -995,6 +1001,7 @@ status_t mes_start_task_dynamically(bool32 is_send, uint32 index) } mq_context_t *mq_ctx = is_send ? &MES_GLOBAL_INST_MSG.send_mq : &MES_GLOBAL_INST_MSG.recv_mq; + mes_pipe_type_t pipe_type = MES_GLOBAL_INST_MSG.profile.pipe_type; bool32 need_serial = MES_GLOBAL_INST_MSG.profile.need_serial; if (need_serial && !mq_ctx->work_thread_idx[index].is_start) { cm_spin_lock(&mq_ctx->work_thread_idx[index].lock, NULL); @@ -1016,7 +1023,7 @@ status_t mes_start_task_dynamically(bool32 is_send, uint32 index) } cm_spin_unlock(&mq_ctx->work_thread_idx[index].lock); } - if (mq_ctx->work_thread_idx[index].is_start) { + if (mq_ctx->work_thread_idx[index].is_start && pipe_type != MES_TYPE_RDMA) { cm_event_notify(&mq_ctx->work_thread_idx[index].event); } return CM_SUCCESS; -- Gitee