From 7a3acf69519e670b26e25864dae4e162343409ba Mon Sep 17 00:00:00 2001 From: qinyudongfang Date: Fri, 6 Jun 2025 17:33:30 +0800 Subject: [PATCH] fix(mes):release inter message buff --- pkg/src/cluster/dtc_btree.c | 2 ++ pkg/src/cluster/dtc_dc.c | 9 +++++++++ pkg/src/cluster/dtc_dcs.c | 4 ++++ pkg/src/cluster/dtc_dls.c | 2 ++ pkg/src/mec/mes_func.c | 9 +++++---- pkg/src/mec/mes_func.h | 6 +++--- pkg/src/mec/mes_uc.c | 12 ++++++++++-- 7 files changed, 35 insertions(+), 9 deletions(-) diff --git a/pkg/src/cluster/dtc_btree.c b/pkg/src/cluster/dtc_btree.c index 47b9f8bd..486c0329 100644 --- a/pkg/src/cluster/dtc_btree.c +++ b/pkg/src/cluster/dtc_btree.c @@ -120,6 +120,7 @@ void dtc_btree_process_root_page(void *sess, mes_message_t *msg) { if (sizeof(msg_btree_broadcast_t) + DEFAULT_PAGE_SIZE(sess) != msg->head->size) { CT_LOG_RUN_ERR("btree process root page msg size is invalid, msg size %u.", msg->head->size); + mes_release_message_buf(msg->buffer); return; } msg_btree_broadcast_t *bcast = (msg_btree_broadcast_t *)msg->buffer; @@ -134,6 +135,7 @@ void dtc_btree_process_root_page(void *sess, mes_message_t *msg) "[DTC] process btree root page[%u-%u], part-subpart[%u-%u], failed to check root page," "table-uid-index[%u-%u-%u]", page_id.file, page_id.page, bcast->part_loc.part_no, bcast->part_loc.subpart_no, bcast->table_id, bcast->uid, bcast->index_id); + mes_release_message_buf(msg->buffer); return; } if (!DC_IS_READY(session)) { diff --git a/pkg/src/cluster/dtc_dc.c b/pkg/src/cluster/dtc_dc.c index 14018223..c8fe23fe 100644 --- a/pkg/src/cluster/dtc_dc.c +++ b/pkg/src/cluster/dtc_dc.c @@ -852,6 +852,7 @@ void dtc_process_broadcast_data(void *sess, mes_message_t * msg) case BTREE_SPLITTING: if (sizeof(msg_broadcast_data_t) + sizeof(msg_broadcast_btree_data_t) != msg->head->size) { CT_LOG_RUN_ERR("[DTC] btree splitting, msg size is invalid, size=%u", msg->head->size); + mes_release_message_buf(msg->buffer); return; } ret = dtc_process_btree_splitting(session, (char*)bcast + sizeof(msg_broadcast_data_t), @@ -860,6 +861,7 @@ void dtc_process_broadcast_data(void *sess, mes_message_t * msg) case BTREE_SPLIT_STATUS: if (sizeof(msg_broadcast_data_t) + sizeof(msg_broadcast_btree_data_t) != msg->head->size) { CT_LOG_RUN_ERR("[DTC] btree split status, msg size is invalid, size=%u", msg->head->size); + mes_release_message_buf(msg->buffer); return; } dtc_process_btree_split_status(session, msg, (char*)bcast + sizeof(msg_broadcast_data_t)); @@ -867,6 +869,7 @@ void dtc_process_broadcast_data(void *sess, mes_message_t * msg) case HEAP_EXTEND: if (sizeof(msg_broadcast_data_t) + sizeof(msg_broadcast_heap_data_t) != msg->head->size) { CT_LOG_RUN_ERR("[DTC] heap extend, msg size is invalid, size=%u", msg->head->size); + mes_release_message_buf(msg->buffer); return; } ret = dtc_process_heap_extend(session, (char*)bcast + sizeof(msg_broadcast_data_t), bcast->head.src_inst); @@ -874,6 +877,7 @@ void dtc_process_broadcast_data(void *sess, mes_message_t * msg) case HEAP_EXTEND_STATUS: if (sizeof(msg_broadcast_data_t) + sizeof(msg_broadcast_heap_data_t) != msg->head->size) { CT_LOG_RUN_ERR("[DTC] heap extend status, msg size is invalid, size=%u", msg->head->size); + mes_release_message_buf(msg->buffer); return; } dtc_process_heap_extend_status(session, msg, (char*)bcast + sizeof(msg_broadcast_data_t)); @@ -881,6 +885,7 @@ void dtc_process_broadcast_data(void *sess, mes_message_t * msg) case USER_STATUS: if (sizeof(msg_broadcast_data_t) + sizeof(msg_broadcast_user_data_t) != msg->head->size) { CT_LOG_RUN_ERR("[DTC] user status, msg size is invalid, size=%u", msg->head->size); + mes_release_message_buf(msg->buffer); return; } dtc_process_user_status(session, (char*)bcast + sizeof(msg_broadcast_data_t)); @@ -888,6 +893,7 @@ void dtc_process_broadcast_data(void *sess, mes_message_t * msg) case INVALIDATE_DC: if (sizeof(msg_broadcast_data_t) + sizeof(msg_broadcast_invalidate_dc_t) != msg->head->size) { CT_LOG_RUN_ERR("[DTC] invalidate dc, msg size is invalid, size=%u", msg->head->size); + mes_release_message_buf(msg->buffer); return; } dtc_process_invalidate_dc(session, (char*)bcast + sizeof(msg_broadcast_data_t)); @@ -895,6 +901,7 @@ void dtc_process_broadcast_data(void *sess, mes_message_t * msg) case USER_LOCK_STATUS: if (sizeof(msg_broadcast_data_t) + sizeof(msg_broadcast_user_data_t) != msg->head->size) { CT_LOG_RUN_ERR("[DTC] btree splitting, msg size is invalid, size=%u", msg->head->size); + mes_release_message_buf(msg->buffer); return; } dtc_process_get_user_lock_status(session, msg, (char*)bcast + sizeof(msg_broadcast_data_t)); @@ -902,12 +909,14 @@ void dtc_process_broadcast_data(void *sess, mes_message_t * msg) case REMOVE_DF_WATCH: if (sizeof(msg_broadcast_data_t) + sizeof(uint32) != msg->head->size) { CT_LOG_RUN_ERR("[DTC] remove datafile device watch, msg size is invalid, size=%u", msg->head->size); + mes_release_message_buf(msg->buffer); return; } ret = dtc_process_remove_df_watch(session, (char*)bcast + sizeof(msg_broadcast_data_t)); break; default: CT_LOG_RUN_ERR("[DTC] process broadcast data, type is invalid, type=%d", bcast->type); + mes_release_message_buf(msg->buffer); return; } diff --git a/pkg/src/cluster/dtc_dcs.c b/pkg/src/cluster/dtc_dcs.c index e83b8d41..35628fcd 100644 --- a/pkg/src/cluster/dtc_dcs.c +++ b/pkg/src/cluster/dtc_dcs.c @@ -3349,14 +3349,17 @@ void dcs_process_arch_set_request(void *sess, mes_message_t *msg) item = cm_get_config_item(GET_CONFIG, &name, CT_TRUE); if (item == NULL) { CT_THROW_ERROR(ERR_INVALID_PARAMETER_NAME, arch_set_param); + mes_release_message_buf(msg->buffer); return; } if (req->scope != CONFIG_SCOPE_DISK) { if (item->notify && item->notify((knl_handle_t)session, (void *)item, req->value)) { + mes_release_message_buf(msg->buffer); return; } } else { if (item->notify_pfile && item->notify_pfile((knl_handle_t)session, (void *)item, req->value)) { + mes_release_message_buf(msg->buffer); return; } } @@ -3369,6 +3372,7 @@ void dcs_process_arch_set_request(void *sess, mes_message_t *msg) #endif } if (cm_alter_config(session->kernel->attr.config, arch_set_param, req->value, req->scope, force) != CT_SUCCESS) { + mes_release_message_buf(msg->buffer); return; } diff --git a/pkg/src/cluster/dtc_dls.c b/pkg/src/cluster/dtc_dls.c index bf956823..133470d1 100644 --- a/pkg/src/cluster/dtc_dls.c +++ b/pkg/src/cluster/dtc_dls.c @@ -1952,6 +1952,7 @@ void dls_process_txn_wait(knl_session_t *session, mes_message_t * receive_msg) send_msg = (uint8*)cm_push(session->stack, mes_size); if (send_msg == NULL) { CT_LOG_RUN_ERR("msg failed to malloc memory"); + mes_release_message_buf(receive_msg->buffer); return; } head = (mes_message_head_t*)send_msg; @@ -2000,6 +2001,7 @@ void dls_process_txn_msg(void *sess, mes_message_t * receive_msg) dls_process_txn_awake(session, receive_msg); } else { CT_LOG_RUN_ERR("[DLS] invalid cmd %u, not process", receive_msg->head->cmd); + mes_release_message_buf(receive_msg->buffer); } return; } diff --git a/pkg/src/mec/mes_func.c b/pkg/src/mec/mes_func.c index 5cd1ffd0..553bf89b 100644 --- a/pkg/src/mec/mes_func.c +++ b/pkg/src/mec/mes_func.c @@ -1101,13 +1101,13 @@ status_t mes_send_data3(mes_message_head_t *head, uint32 head_size, const void * return CT_SUCCESS; } -static inline void mes_protect_when_timeout(mes_waiting_room_t *room) +static inline void mes_protect_when_recv_abnormal(mes_waiting_room_t *room) { cm_spin_lock(&room->lock, NULL); (void)cm_atomic32_inc((atomic32_t *)(&room->rsn)); if (pthread_mutex_trylock(&room->mutex) == 0) { // trylock to avoid mutex has been unlocked. mes_release_message_buf(room->msg_buf); - DTC_MES_LOG_INF("[mes]%s: mutex has unlock, rsn=%u, room rsn=%u.", (char *)__func__, + CT_LOG_RUN_WAR("[mes]mes protect when recv abnormal, rsn=%u, room rsn=%u.", ((mes_message_head_t *)room->msg_buf)->rsn, room->rsn); } cm_spin_unlock(&room->lock); @@ -1141,7 +1141,8 @@ status_t mes_recv_impl(uint32 sid, mes_message_t *msg, bool32 check_rsn, uint32 for (;;) { if (mes_check_connect_ready() != CT_SUCCESS) { - MES_LOGGING_WAR(MES_LOGGING_UNMATCH_MSG, "[mes]%s:Network connection interrupted.", (char *)__func__); + CT_LOG_RUN_ERR("[mes]Network connection interrupted."); + mes_protect_when_recv_abnormal(room); return CT_ERROR; } @@ -1149,7 +1150,7 @@ status_t mes_recv_impl(uint32 sid, mes_message_t *msg, bool32 check_rsn, uint32 timeout_time = (uint32)cm_atomic_get(&room->timeout); if ((timeout_time == 0) || ((quick_stop_check == CT_TRUE) && (mes_message_need_timeout()))) { // when timeout the ack msg may reach, so need do some check and protect. - mes_protect_when_timeout(room); + mes_protect_when_recv_abnormal(room); CT_THROW_ERROR_EX(ERR_TCP_TIMEOUT, "sid(%u) recv timeout, rsn=%u, expect_rsn=%u, timeou=%u.", sid, room->rsn, expect_rsn, timeout); return CT_ERROR; diff --git a/pkg/src/mec/mes_func.h b/pkg/src/mec/mes_func.h index a572c9c0..5279213d 100644 --- a/pkg/src/mec/mes_func.h +++ b/pkg/src/mec/mes_func.h @@ -57,9 +57,9 @@ extern "C" { #define MES_MESSAGE_BUFFER_SIZE \ (uint32)(SIZE_K(32) + MES_MESSAGE_TINY_SIZE) /* biggest: pcr page ack: head + ack + page */ #define MES_512K_MESSAGE_BUFFER_SIZE (uint32)SIZE_K(512) -#define MES_LOGGING_INTERVAL (60000) // ms -#define MES_CMD_LOGGING_INTERVAL (60000) // ms -#define MES_GROUP_LOGGING_INTERVAL (60000) // ms +#define MES_LOGGING_INTERVAL (500) // ms +#define MES_CMD_LOGGING_INTERVAL (500) // ms +#define MES_GROUP_LOGGING_INTERVAL (500) // ms #define MES_WAIT_TIMEOUT (5) // ms #define MES_WAIT_MAX_TIME (0xFFFFFFFF) // ms #define MES_MSG_RETRY_TIME (100) // ms diff --git a/pkg/src/mec/mes_uc.c b/pkg/src/mec/mes_uc.c index 475f43f0..c50cb2fa 100644 --- a/pkg/src/mec/mes_uc.c +++ b/pkg/src/mec/mes_uc.c @@ -339,7 +339,7 @@ status_t mes_uc_add_buf_list_to_msg(dpuc_msg *mes_uc_msg, mes_message_head_t *he page_num = (head->size + (MES_UC_BYTE_PER_PAGE_PI - 1)) / MES_UC_BYTE_PER_PAGE_PI; MES_UC_ALLOC_PAGES_SYNC(page_num, &sgl); if (sgl == NULL) { - MES_LOGGING(MES_LOGGING_SEND, "mes alloc sgl failed, page num %u", page_num); + CT_LOG_RUN_ERR("mes alloc sgl failed, page num %u", page_num); return CT_ERROR; } @@ -357,7 +357,7 @@ status_t mes_uc_add_buf_list_to_msg(dpuc_msg *mes_uc_msg, mes_message_head_t *he mes_modify_last_entry_len(sgl, (head->size - (page_num - 1) * MES_UC_BYTE_PER_PAGE_PI)); // 按圆整后的长度发送 if (mes_global_handle()->dpuc_sgl_addr_set(mes_uc_msg, sgl, head->size, __FUNCTION__) != DP_OK) { - MES_LOGGING(MES_LOGGING_SEND, "mes set sgl to uc failed, page num %u", page_num); + CT_LOG_RUN_ERR("mes set sgl to uc failed, page num %u", page_num); MES_UC_FREE_PAGES(sgl); return CT_ERROR; } @@ -486,6 +486,7 @@ status_t mes_uc_send_data(const void *msg_data) if (pContext == NULL) { MES_LOGGING(MES_LOGGING_SEND, "mes set uc send context failed, src_eid 0x%lx, dst_eid 0x%lx, cmd %u", g_mes_uc_config.eid, g_mes_uc_config.dst_eid[dst_inst], head->cmd); + mes_uc_free_mem(pContext, mes_uc_msg); return CT_ERROR; } pContext->cmd = head->cmd; @@ -565,6 +566,7 @@ status_t mes_uc_send_bufflist(mes_bufflist_t *buff_list) if (pContext == NULL) { MES_LOGGING(MES_LOGGING_SEND, "mes set uc send context failed, src_eid 0x%lx, dst_eid 0x%lx, cmd %u", g_mes_uc_config.eid, g_mes_uc_config.dst_eid[dst_inst], head->cmd); + mes_uc_free_mem(pContext, mes_uc_msg); return CT_ERROR; } pContext->cmd = head->cmd; @@ -593,6 +595,7 @@ status_t mes_uc_get_mes_msg_from_uc_head(dpuc_msg *uc_msg, mes_message_t *mes_ms user_data = (char*)mes_global_handle()->dpuc_data_addr_get(uc_msg, __FUNCTION__); if (user_data == NULL) { MES_LOGGING(MES_LOGGING_SEND, "mes recv uc msg head failed"); + mes_uc_free_uc_msg_sgl(uc_msg); return CT_ERROR; } @@ -601,16 +604,19 @@ status_t mes_uc_get_mes_msg_from_uc_head(dpuc_msg *uc_msg, mes_message_t *mes_ms CT_THROW_ERROR_EX(ERR_MES_ILEGAL_MESSAGE, "mes message length=%u, cmd=%u, rsn=%u, src_inst=%u, dst_inst=%u, " "src_sid=%u, dst_sid=%u, thead id=%d", head->size, head->cmd, head->rsn, head->src_inst, head->dst_inst, head->src_sid, head->dst_sid, g_thread_queue_id); + mes_uc_free_uc_msg_sgl(uc_msg); return CT_ERROR; } mes_get_message_buf(mes_msg, head); if ((mes_msg->buffer == NULL) || (mes_msg->head == NULL)) { MES_LOGGING(MES_LOGGING_SEND, "mes get msg buf failed"); + mes_uc_free_uc_msg_sgl(uc_msg); return CT_ERROR; } err = memcpy_s(mes_msg->buffer, head->size, head, head->size); MEMS_RETURN_IFERR(err); + mes_uc_free_uc_msg_sgl(uc_msg); return CT_SUCCESS; } @@ -706,6 +712,7 @@ int32_t mes_uc_msg_recv_func(dpuc_msg *uc_msg, dpuc_msg_mem_free_mode_e *freeMod if (g_mes_uc_channel_status[mes_msg.head->src_inst].is_allow_msg_transfer != CT_TRUE) { MES_LOGGING(MES_LOGGING_RECV, "mes not allow msg transfer, src_inst=%u.", mes_msg.head->src_inst); + mes_free_buf_item(mes_msg.buffer); cm_thread_unlock(&g_mes_uc_recv_thead[g_thread_queue_id].lock); return RETURN_ERROR; } @@ -716,6 +723,7 @@ int32_t mes_uc_msg_recv_func(dpuc_msg *uc_msg, dpuc_msg_mem_free_mode_e *freeMod cm_thread_unlock(&g_mes_uc_recv_thead[g_thread_queue_id].lock); CT_LOG_RUN_ERR("[mes] check cks failed, cmd=%u, rsn=%u, src_inst=%u, dst_inst=%u", mes_msg.head->cmd, mes_msg.head->rsn, mes_msg.head->src_inst, mes_msg.head->dst_inst); + mes_free_buf_item(mes_msg.buffer); return RETURN_ERROR; } } -- Gitee