From 0055cf14edb3c0fdcfeacbf238ec5d0d239edfcd Mon Sep 17 00:00:00 2001 From: dongning12 Date: Mon, 29 Jul 2024 21:13:00 +0800 Subject: [PATCH] =?UTF-8?q?[=E9=9C=80=E6=B1=82]msg=5Fpool=E6=94=B9?= =?UTF-8?q?=E9=80=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/cm_mes/mes_func.c | 90 +-- src/cm_mes/mes_func.h | 1 - src/cm_mes/mes_interface.h | 60 +- src/cm_mes/mes_mem_stat.c | 42 +- src/cm_mes/mes_msg_pool.c | 1388 ++++++++++++++++++++++++++++-------- src/cm_mes/mes_msg_pool.h | 103 ++- src/cm_mes/mes_queue.c | 36 +- src/cm_mes/mes_queue.h | 5 +- 8 files changed, 1287 insertions(+), 438 deletions(-) diff --git a/src/cm_mes/mes_func.c b/src/cm_mes/mes_func.c index 18797a6..fb979a4 100644 --- a/src/cm_mes/mes_func.c +++ b/src/cm_mes/mes_func.c @@ -21,6 +21,8 @@ * * ------------------------------------------------------------------------- */ +#include +#include #include "mes_func.h" #include "cm_ip.h" #include "cm_memory.h" @@ -344,44 +346,10 @@ static int mes_set_priority_task_worker_num(mes_priority_t priority, uint32 task return CM_SUCCESS; } -static int mes_set_buffer_pool(const mes_profile_t *profile) +int mes_set_msg_pool(mes_profile_t *profile) { - for (uint32 priority = 0; priority < profile->priority_cnt; priority++) { - uint32 pool_count = profile->buffer_pool_attr[priority].pool_count; - uint32 queue_count = profile->buffer_pool_attr[priority].queue_count; - - if ((pool_count == 0) || (pool_count > MES_MAX_BUFFPOOL_NUM)) { - LOG_RUN_ERR("[mes] pool_count %u is invalid, legal scope is [1, %d], priority:%u.", - pool_count, MES_MAX_BUFFPOOL_NUM, priority); - return CM_ERROR; - } - - if ((queue_count == 0) || (queue_count > MES_MAX_BUFFER_QUEUE_NUM)) { - LOG_RUN_ERR("[mes] pool_queue_count %u is invalid, legal scope is [1, %d], priority:%u.", - queue_count, MES_MAX_BUFFER_QUEUE_NUM, priority); - return CM_ERROR; - } - - MES_GLOBAL_INST_MSG.profile.buffer_pool_attr[priority].pool_count = pool_count; - MES_GLOBAL_INST_MSG.profile.buffer_pool_attr[priority].queue_count = queue_count; - - uint32 max_index = 0; - for (uint32 i = 0; i < pool_count; i++) { - MES_GLOBAL_INST_MSG.profile.buffer_pool_attr[priority].buf_attr[i].size = - profile->buffer_pool_attr[priority].buf_attr[i].size + (unsigned int)sizeof(mes_message_head_t); - MES_GLOBAL_INST_MSG.profile.buffer_pool_attr[priority].buf_attr[i].count = - profile->buffer_pool_attr[priority].buf_attr[i].count; - if (MES_GLOBAL_INST_MSG.profile.buffer_pool_attr[priority].buf_attr[max_index].size < - MES_GLOBAL_INST_MSG.profile.buffer_pool_attr[priority].buf_attr[i].size) { - max_index = i; - } - } - - // for compress reserved - MES_GLOBAL_INST_MSG.profile.buffer_pool_attr[priority].buf_attr[max_index].size += MES_BUFFER_RESV_SIZE; - } - - return CM_SUCCESS; + int ret = mes_check_msg_pool_attr(profile, &MES_GLOBAL_INST_MSG.profile, CM_TRUE, NULL); + return ret; } void mes_set_specified_priority_enable_compress(mes_priority_t priority, bool8 enable_compress) @@ -546,11 +514,6 @@ static int mes_set_profile(mes_profile_t *profile) return ret; } - ret = mes_set_buffer_pool(profile); - if (ret != CM_SUCCESS) { - LOG_RUN_ERR("[mes]: set buffer pool failed."); - return ret; - } MES_GLOBAL_INST_MSG.profile.pipe_type = profile->pipe_type; MES_GLOBAL_INST_MSG.profile.conn_created_during_init = profile->conn_created_during_init; MES_GLOBAL_INST_MSG.profile.frag_size = profile->frag_size; @@ -592,6 +555,12 @@ static int mes_set_profile(mes_profile_t *profile) return ERR_MES_MEMORY_COPY_FAIL; } + ret = mes_set_msg_pool(profile); + if (ret != CM_SUCCESS) { + LOG_RUN_ERR("[mes]: set msg pool failed."); + return ret; + } + // pipe work method and bind core MES_GLOBAL_INST_MSG.profile.rdma_rpc_use_busypoll = profile->rdma_rpc_use_busypoll; MES_GLOBAL_INST_MSG.profile.rdma_rpc_is_bind_core = profile->rdma_rpc_is_bind_core; @@ -843,32 +812,16 @@ static int mes_start_work_thread_statically(bool32 is_send) static int mes_init_mq_instance(bool32 is_send) { - LOG_RUN_INF("[mes] mes_init_mq_instance begin."); - int ret; mq_context_t *mq_ctx = is_send ? &MES_GLOBAL_INST_MSG.send_mq : &MES_GLOBAL_INST_MSG.recv_mq; + + LOG_RUN_INF("[mes] mes_init_mq_instance begin, is_send:%u.", is_send); + int ret; for (uint32 loop = 0; loop < MES_MAX_TASK_NUM; loop++) { mq_ctx->tasks[loop].choice = 0; mes_init_msgqueue(&mq_ctx->tasks[loop].queue); } mes_init_msgitem_pool(&mq_ctx->pool); - GS_INIT_SPIN_LOCK(mq_ctx->msg_pool_init_lock); - for (uint32 i = 0; i < MES_GLOBAL_INST_MSG.profile.inst_cnt; i++) { - inst_type inst_id = MES_GLOBAL_INST_MSG.profile.inst_net_addr[i].inst_id; - if (is_send && (inst_id == MES_GLOBAL_INST_MSG.profile.inst_id)) { - continue; - } - for (uint32 priority = 0; priority < MES_GLOBAL_INST_MSG.profile.priority_cnt; priority++) { - if (mes_init_message_pool(is_send, inst_id, priority) != CM_SUCCESS) { - for (uint32 k = 0; k < i; k++) { - for (uint32 priority1 = 0; priority1 < priority; priority1++) { - mes_destroy_message_pool(is_send, inst_id, priority1); - } - } - return CM_ERROR; - } - } - } mq_ctx->priority.assign_task_idx = 0; ret = mes_init_priority_task(is_send); @@ -877,12 +830,23 @@ static int mes_init_mq_instance(bool32 is_send) return ret; } + mq_ctx->enable_inst_dimension = MES_GLOBAL_INST_MSG.profile.msg_pool_attr.enable_inst_dimension; + mq_ctx->msg_pool_inited = CM_FALSE; + GS_INIT_SPIN_LOCK(mq_ctx->msg_pool_init_lock); + ret = mes_init_message_pool(is_send); + if (ret != CM_SUCCESS) { + LOG_RUN_ERR("[mes][msg pool] mes init message pool failed, is_send:%u.", + is_send); + return ret; + } + ret = mes_start_work_thread_statically(is_send); if (ret != CM_SUCCESS) { LOG_RUN_ERR("[mes] mes start work thread statically failed, is_send:%u.", is_send); return ret; } - LOG_RUN_INF("[mes] mes_init_mq_instance end."); + + LOG_RUN_INF("[mes] mes_init_mq_instance end, is_send:%u.", is_send); return CM_SUCCESS; } @@ -1490,7 +1454,7 @@ void mes_uninit(void) mes_task_threadpool_uninit(); } mes_destroy_msgitem_pool(); - mes_destroy_all_message_pool(); + mes_deinit_all_message_pool(); mes_stop_channels(); mes_destroy_resource(); mes_destroy_all_broadcast_msg(); diff --git a/src/cm_mes/mes_func.h b/src/cm_mes/mes_func.h index 76143b4..2b60d0d 100644 --- a/src/cm_mes/mes_func.h +++ b/src/cm_mes/mes_func.h @@ -464,7 +464,6 @@ void mes_close_send_pipe(mes_pipe_t *pipe); void mes_close_send_pipe_nolock(mes_pipe_t *pipe); void mes_close_recv_pipe(mes_pipe_t *pipe); void mes_close_recv_pipe_nolock(mes_pipe_t *pipe); -int64 mes_get_mem_capacity_internal(mq_context_t *mq_ctx, mes_priority_t priority); status_t mes_get_inst_net_add_index(inst_type inst_id, uint32 *index); int mes_connect_single(inst_type inst_id); mes_channel_t *mes_get_active_send_channel(uint32 dest_id, uint32 caller_tid, uint32 flags); diff --git a/src/cm_mes/mes_interface.h b/src/cm_mes/mes_interface.h index 5ea8b20..9e3bdea 100644 --- a/src/cm_mes/mes_interface.h +++ b/src/cm_mes/mes_interface.h @@ -126,16 +126,26 @@ typedef enum en_compress_algorithm { COMPRESS_CEIL = 2, } compress_algorithm_t; -typedef struct st_mes_buffer_attr { - unsigned int size; - unsigned int count; -} mes_buffer_attr_t; - -typedef struct st_mes_buffer_pool_attr { - unsigned int pool_count; - unsigned int queue_count; - mes_buffer_attr_t buf_attr[MES_MAX_BUFFPOOL_NUM]; -} mes_buffer_pool_attr_t; +typedef struct st_mes_msg_buffer_inner_pool_attr { + unsigned int queue_num; +} mes_msg_buffer_inner_pool_attr_t; + +typedef struct st_mes_msg_buffer_pool_attr { + unsigned int buf_size; + double proportion; + mes_msg_buffer_inner_pool_attr_t priority_pool_attr[MES_PRIORITY_CEIL]; + mes_msg_buffer_inner_pool_attr_t shared_pool_attr; +} mes_msg_buffer_pool_attr_t; + +typedef struct st_mes_msg_pool_attr { + unsigned long long total_size; + unsigned char enable_inst_dimension; + unsigned int buf_pool_count; + mes_msg_buffer_pool_attr_t buf_pool_attr[MES_MAX_BUFFPOOL_NUM]; + unsigned int max_buf_size[MES_PRIORITY_CEIL]; + // max buf size in priority, used for compress feature + // if no buffer belong to priority, follow anthor priority setting +} mes_msg_pool_attr_t; typedef struct st_mes_addr { inst_type inst_id; @@ -170,7 +180,7 @@ typedef struct st_mes_profile { inst_type inst_id; unsigned int inst_cnt; mes_pipe_type_t pipe_type; - mes_buffer_pool_attr_t buffer_pool_attr[MES_PRIORITY_CEIL]; + mes_msg_pool_attr_t msg_pool_attr; unsigned int channel_cnt; unsigned int priority_cnt; unsigned char mes_elapsed_switch : 1; @@ -260,6 +270,16 @@ typedef struct st_mes_mem_info_stat { double used_percentage; } mes_mem_info_stat_t; +typedef struct st_mes_msg_pool_minimum_info { + unsigned long long total_minimum_size; + unsigned long long metadata_size; + unsigned long long buf_pool_total_size; // sum of buf_pool_minimum_size + unsigned char buf_pool_count; + unsigned long long buf_pool_minimum_size[MES_MAX_BUFFPOOL_NUM]; + // buf order in buf_pool_minimum_size same as profile.msg_pool_attr.buf_pool_attr + // if want buf_pool[i] smallest, buf_pool_attr[i].proportion = buf_pool_minimum_size[i] / buf_pool_total_size +} mes_msg_pool_minimum_info_t; + typedef void (*mes_thread_init_t)(unsigned char need_startup, char **reg_data); typedef void (*mes_thread_deinit_t)(); @@ -551,13 +571,11 @@ void mes_set_compress_level(unsigned int level); int mes_is_different_endian(inst_type dst_inst); /* - * @brief get memory capacity of a specified priority. - * every instance is the same + * @brief get memory capacity of send/receive message pool * @param is_send - 1:send; 0:receive - * @param priority - priority - * @return memory capacity of a specified priority + * @return memory capacity of a send/receive message pool */ -long long mes_get_mem_capacity(unsigned char is_send, mes_priority_t priority); +long long mes_get_mem_capacity(unsigned char is_send); /* * @brief get the count of started work thread task. @@ -631,6 +649,16 @@ void mes_collect_mem_usage_stat(); */ void mes_get_mem_usage_stat_row(mes_mem_stat_t mem_id, mes_mem_info_stat_t *mes_mem_stat_row_result); +/* + * @brief get minimum message pool size + * @[in]param profile - config value + * @[in]param is_send - true:send message pool; false receive message pool + * @[out]param minimum_info - message pool minimum info + * @return whether success, if not, some parameter in profile not correct + */ +int mes_get_message_pool_minimum_info(mes_profile_t *profile, unsigned char is_send, + mes_msg_pool_minimum_info_t *minimum_info); + #ifdef __cplusplus } #endif diff --git a/src/cm_mes/mes_mem_stat.c b/src/cm_mes/mes_mem_stat.c index d16e6ee..33c5992 100644 --- a/src/cm_mes/mes_mem_stat.c +++ b/src/cm_mes/mes_mem_stat.c @@ -60,22 +60,23 @@ uint64 mes_calc_room_pool() return total_mem; } -uint64 mes_calc_buffer_pool_mem(mes_profile_t *profile) +uint64 mes_calc_buffer_pool_mem(mes_profile_t *profile, bool8 is_send) { - uint64 total_mem = 0; - for (uint32 i = 0; i < profile->priority_cnt; i++) { - total_mem += mes_calc_message_pool_size(profile, i); + if (is_send && profile->send_directly && + (profile->enable_compress_priority == 0 || profile->algorithm == COMPRESS_NONE || + profile->algorithm >= COMPRESS_CEIL)) { + return 0; } - return total_mem; + return profile->msg_pool_attr.total_size; } long long mes_calc_mem_usage(mes_profile_t *profile) { // mes send buffer pool uint64 total_mem = 0; - total_mem += mes_calc_buffer_pool_mem(profile) * (CM_MAX_INSTANCES - 1); + total_mem += mes_calc_buffer_pool_mem(profile, CM_TRUE); // mes receive buffer pool - total_mem += mes_calc_buffer_pool_mem(profile) * CM_MAX_INSTANCES; + total_mem += mes_calc_buffer_pool_mem(profile, CM_FALSE); // mes channels total_mem += mes_calc_channels_mem(profile->channel_cnt); // mes room pool @@ -90,13 +91,13 @@ static void calc_percentage(mes_mem_info_stat_t *mem_stat_row_results) mem_stat_row_results->used_percentage = used_percentage; } -uint64 mes_get_mem_remain_from_pool(mes_pool_t *pool) +uint64 mes_get_mem_remain_from_pool(mes_msg_pool_t *pool) { uint64 remain_size = 0; - for (uint32 i = 0; i < pool->count; i++) { - mes_buf_chunk_t *chunk = &pool->chunk[i]; - for (uint32 j = 0; j < chunk->queue_num; j++) { - mes_buf_queue_t *queue = &chunk->queues[j]; + for (uint8 buf_pool_no = 0; buf_pool_no < pool->buf_pool_count; buf_pool_no++) { + mes_msg_buffer_inner_pool_t *shared_pool = &pool->buf_pool[buf_pool_no]->shared_pool; + for (uint32 qn = 0; qn < shared_pool->queue_num; qn++) { + mes_buf_queue_t *queue = &shared_pool->queues[qn]; remain_size += (uint64)queue->count * queue->buf_size; } } @@ -106,17 +107,13 @@ uint64 mes_get_mem_remain_from_pool(mes_pool_t *pool) uint64 mes_calc_buffer_pool_remain(bool32 is_send) { uint64 remain_size = 0; - mes_pool_t *pool; mq_context_t *mq_ctx = is_send ? &MES_GLOBAL_INST_MSG.send_mq : &MES_GLOBAL_INST_MSG.recv_mq; - for (uint32 i = 0; i < MES_GLOBAL_INST_MSG.profile.inst_cnt; i++) { - inst_type inst_id = MES_GLOBAL_INST_MSG.profile.inst_net_addr[i].inst_id; - if (is_send && (inst_id == MES_GLOBAL_INST_MSG.profile.inst_id)) { - continue; - } - for (uint32 priority = 0; priority < MES_GLOBAL_INST_MSG.profile.priority_cnt; priority++) { - pool = mq_ctx->msg_pool[inst_id][priority]; - remain_size += mes_get_mem_remain_from_pool(pool); + if (!mq_ctx->enable_inst_dimension) { + remain_size = mes_get_mem_remain_from_pool(mq_ctx->single_pool); + } else { + for (int inst_id = 0; inst_id < mq_ctx->inst_pool_set.inst_pool_count; inst_id++) { + remain_size += mes_get_mem_remain_from_pool(mq_ctx->inst_pool_set.inst_pool[inst_id]); } } return remain_size; @@ -140,8 +137,7 @@ static uint64 get_msg_item_unused_num() void mes_collect_mem_usage_stat() { // mes receive buffer pool - g_mes_mem_info_stat[MEM_RECEIVE_BUF_POOL].total = - mes_calc_buffer_pool_mem(&MES_GLOBAL_INST_MSG.profile) * (MES_GLOBAL_INST_MSG.profile.inst_cnt); + g_mes_mem_info_stat[MEM_RECEIVE_BUF_POOL].total = mes_calc_buffer_pool_mem(&MES_GLOBAL_INST_MSG.profile, CM_FALSE); g_mes_mem_info_stat[MEM_RECEIVE_BUF_POOL].used = g_mes_mem_info_stat[MEM_RECEIVE_BUF_POOL].total - mes_calc_buffer_pool_remain(CM_FALSE); calc_percentage(&g_mes_mem_info_stat[MEM_RECEIVE_BUF_POOL]); diff --git a/src/cm_mes/mes_msg_pool.c b/src/cm_mes/mes_msg_pool.c index 83cb732..aae1066 100644 --- a/src/cm_mes/mes_msg_pool.c +++ b/src/cm_mes/mes_msg_pool.c @@ -20,88 +20,239 @@ * src/cm_mes/mes_msg_pool.c * * ------------------------------------------------------------------------- + * + * introduction of msg pool. + * when receive message, we need a buffer from receive msg pool to save message + * so that the worker thread can process this message later. + * when we enable send_directly and send message, we also need send msg pool to + * find a buffer to save message so that the sender thread can send message later. + * when we disable send_directly and send message, we do not need send msg pool + * any more. + * + * 1.architecture + * 1)use only single pool, not divived pool by instance + * single_pool |--- buf_pool(buf1) |--- shared_pool + * | |--- private_pool |--- priority 0 pool + * | |--- priority 1 pool + * | |--- ... + * | + * |--- buf_pool(buf2) |--- shared_pool + * | |--- private_pool |--- priority 0 pool + * | |--- priority 1 pool + * | |--- ... + * |--- ... + * + * 2)enable inst dimension, divived pool by instance + * inst_pool_set |---inst_pool(inst0) |--- buf_pool(buf0) |--- shared_pool + * | | |--- private_pool |--- priority 0 pool + * | | |--- priority 1 pool + * | | |--- ... + * | | + * | |--- buf_pool(buf1) ... + * | |--- ... + * |--- inst_pool(inst1) .... + * + * in order to unify the concept, we call single_pool and inst_pool msg_pool; + * they use the same struct mes_msg_pool_t. + * + * 2.design concept + * 1) priority pool has *appropriate amount* buffer, so most time we just use priority pool. + * these buffer only belong to this priority, can not used by other priority. we tag + * these buffer with *private* + * what is *appropriate amount* : 2 * worker-thread number + * the ability of process message depend of worker thread number and single message + * process time. under maximum load conditions, all the worker thread process together, + * we need at least worker-thread number buffer, consider proceess message need time + * and alloc-free competition problem, the number multiply 2 + * + * 2) receive workload become bigger, if priority pool has no buffer, then we get buffer + * from shared pool. the buffer is tagged with shared. + * after the message proccessed and free this buffer, put this buffer to the priority + * pool. although the buffer is in priority pool, but it tag:shared not changed + * + * 3) because of some priority pool take up extra buffer (belong to shared pool), when + * receive other priority messages. the priority pool has not enough message, try to get + * buffer from shared_pool, however shared pool does not has enough buffer either. + * how to solve this problem + * 1) receive thread need buffer to save message right now, so receive thread steal the + * buffer with *shared tag* from over-take priority pool. + * 2) when shared pool available capacity touch the threshold (eg. 10% capacity), tag the + * buf_pool need recycle. when worker thread free buffer, find need-recycle tag and + * buffer tag is shared, put buffer backto shared pool until shared pool reach over + * threshold + * + * 3. how to find a buf_pool + * *is_send* can distinguish send buffer_msg_pool and receive buffer_msg_pool + * *enable_inst_dimension* can distinguish single_pool and inst_pool_set + * if enable_inst_dimension true, we also need *inst_id* to tell us which inst_pool + * buf_pool_no or meessage len can find which buf_pool + * + * 4. alloc poilcy + * find the correct buf_pool + * 1) find buffer from priority_pool, if has return. + * 2) find buffer from shared_pool, if has return. + * 3) find buffer from other priority_pool, + * if buffer tagged with private, can not use this buffer, give back to priority_pool; + * if buffer tagged with shared, return + * 4) wait and retry + * + * 5. free poilcy + * 1) buffer tagged with private, give back to priority_pool. + * 2) buffer tagged with shared, buf_pool need recycle, give back to shared_pool. + * 3) buffer tagged with shared, no need recycle, put it into priority_pool. + * */ + #include "mes_msg_pool.h" #include "mes_func.h" #define RECV_MSG_POOL_FC_THRESHOLD 10 +#define MSG_POOL_TEMP_STR_LEN 1000 +#define MSG_POOL_SHORT_TEMP_STR_LEN 50 +#define MSG_BUF_POOL_THRESHOLD_RATIO 0.1 +#define MSG_PRIORITY_POOL_BUFFER_NUM_MAGNIFICATION 2 -static mes_buf_chunk_t *mes_get_buffer_chunk(uint32 len, bool32 is_send, uint32 inst_id, mes_priority_t priority) +static int cmp_by_msg_buffer_pool_buf_size(const void *a, const void *b) { - mq_context_t *mq_ctx = is_send ? &MES_GLOBAL_INST_MSG.send_mq : &MES_GLOBAL_INST_MSG.recv_mq; - mes_buf_chunk_t *chunk; + mes_msg_buffer_pool_attr_t *mpa1 = (mes_msg_buffer_pool_attr_t*)a; + mes_msg_buffer_pool_attr_t *mpa2 = (mes_msg_buffer_pool_attr_t*)b; + return mpa1->buf_size - mpa2->buf_size; +} - if (inst_id >= MES_MAX_INSTANCES || priority >= MES_PRIORITY_CEIL) { - LOG_RUN_ERR("[mes] mes_get_buffer_chunk failed, invalid inst_id[%u] or priority[%u], is_send:%u", - inst_id, priority, is_send); - return NULL; +static int mes_check_proportion_in_msg_pool_attr(mes_msg_pool_attr_t *msg_pool_attr) +{ + double actual_proportion = 0; + for (uint8 buf_pool_no = 0; buf_pool_no < msg_pool_attr->buf_pool_count; buf_pool_no++) { + if (msg_pool_attr->buf_pool_attr[buf_pool_no].proportion <= 0 || + msg_pool_attr->buf_pool_attr[buf_pool_no].proportion > 1) { + LOG_RUN_ERR("[mes][msg pool] buf_pool_no:%u proportion:%f is not legal, legal scope is (0, 1]", + buf_pool_no, msg_pool_attr->buf_pool_attr[buf_pool_no].proportion); + return CM_ERROR; + } + actual_proportion += msg_pool_attr->buf_pool_attr[buf_pool_no].proportion; } - if (mq_ctx->msg_pool[inst_id][priority] == NULL) { - cm_spin_lock(&mq_ctx->msg_pool_init_lock, NULL); - if (mq_ctx->msg_pool[inst_id][priority] == NULL) { - if (mes_init_message_pool(is_send, inst_id, priority) != CM_SUCCESS) { - cm_spin_unlock(&mq_ctx->msg_pool_init_lock); - LOG_RUN_ERR("[mes] mes_init_message_pool failed, inst_id:%u, priority:%u, is_send:%u", - inst_id, priority, is_send); - return NULL; + if (fabs(actual_proportion - 1) > DBL_EPSILON) { + LOG_RUN_ERR("[mes][msg pool] sum of proportion:%f, should be 1.", + actual_proportion); + return CM_ERROR; + } + return CM_SUCCESS; +} + +static int mes_add_compress_size_in_check_msg_pool_attr(mes_profile_t *out_profile) +{ + bool8 add_compress_size[MES_MAX_BUFFPOOL_NUM] = { CM_FALSE }; + uint32 target_buf_size = 0; + mes_msg_pool_attr_t *msg_pool_attr = &out_profile->msg_pool_attr; + for (mes_priority_t prio = 0; prio < out_profile->priority_cnt; prio++) { + if (msg_pool_attr->max_buf_size[prio] == 0) { + LOG_RUN_ERR("[mes][msg pool] msg pool attribute has something wrong, " + "priority:%u max buf size is zero.", + prio); + return CM_ERROR; + } + + target_buf_size = msg_pool_attr->max_buf_size[prio] + sizeof(mes_message_head_t); + bool8 find_target_buf = CM_FALSE; + for (uint8 buf_pool_no = 0; buf_pool_no < msg_pool_attr->buf_pool_count; buf_pool_no++) { + if (msg_pool_attr->buf_pool_attr[buf_pool_no].buf_size == target_buf_size) { + if (!add_compress_size[buf_pool_no]) { + add_compress_size[buf_pool_no] = CM_TRUE; + } + find_target_buf = CM_TRUE; } } - cm_spin_unlock(&mq_ctx->msg_pool_init_lock); + if (!find_target_buf) { + LOG_RUN_ERR("[mes][msg pool] msg pool attribute has something wrong, " + "priority:%u max buf size:%u can not find in msg pool attribute.", + prio, msg_pool_attr->max_buf_size[prio]); + return CM_ERROR; + } + msg_pool_attr->max_buf_size[prio] = target_buf_size + MES_BUFFER_RESV_SIZE; } - for (uint32 i = 0; i < mq_ctx->msg_pool[inst_id][priority]->count; i++) { - chunk = &mq_ctx->msg_pool[inst_id][priority]->chunk[i]; - if (len <= chunk->buf_size) { - return chunk; + for (uint8 buf_pool_no = 0; buf_pool_no < msg_pool_attr->buf_pool_count; buf_pool_no++) { + if (add_compress_size[buf_pool_no]) { + msg_pool_attr->buf_pool_attr[buf_pool_no].buf_size += MES_BUFFER_RESV_SIZE; } } - - LOG_RUN_ERR("[mes] There is not long enough buffer pool for %u, is_send:%u, inst_id:%u, priority:%u.", - len, is_send, inst_id, priority); - return NULL; + return CM_SUCCESS; } -static void mes_format_buf_queue_memory(mes_buf_queue_t *queue) +int mes_check_msg_pool_attr(mes_profile_t *profile, mes_profile_t *out_profile, bool8 check_proportion, + mes_msg_buffer_relation_t *buf_rel) { - mes_buffer_item_t *buf_node = NULL; - mes_buffer_item_t *buf_node_next = NULL; - uint64 buf_item_size = (uint64)sizeof(mes_buffer_item_t) + queue->buf_size; - char *temp_buffer = queue->addr; + int ret; + mes_msg_pool_attr_t *input_msg_pool_attr = &profile->msg_pool_attr; + mes_msg_pool_attr_t *msg_pool_attr = &out_profile->msg_pool_attr; + *msg_pool_attr = *input_msg_pool_attr; + if (msg_pool_attr->buf_pool_count == 0 || msg_pool_attr->buf_pool_count > MES_MAX_BUFFPOOL_NUM) { + LOG_RUN_ERR("[mes][msg pool] buf_pool_count:%u is invalid, legal scope is [1, %u].", + msg_pool_attr->buf_pool_count, MES_MAX_BUFFPOOL_NUM); + return CM_ERROR; + } - cm_panic(!queue->inited); + if (buf_rel != NULL) { + buf_rel->buf_count = msg_pool_attr->buf_pool_count; + for (int buf_pool_no = 0; buf_pool_no < msg_pool_attr->buf_pool_count; buf_pool_no++) { + buf_rel->origin_buf_size[buf_pool_no] = + msg_pool_attr->buf_pool_attr[buf_pool_no].buf_size; + } + } - buf_node = (mes_buffer_item_t *)temp_buffer; - queue->first = buf_node; - for (uint32 i = 1; i < queue->count; i++) { - temp_buffer += buf_item_size; - buf_node_next = (mes_buffer_item_t *)temp_buffer; - buf_node->chunk_info = queue->chunk_info; - buf_node->queue_no = queue->queue_no; - buf_node->next = buf_node_next; - buf_node = buf_node_next; + if (check_proportion) { + ret = mes_check_proportion_in_msg_pool_attr(msg_pool_attr); + if (ret != CM_SUCCESS) { + return ret; + } } - buf_node->chunk_info = queue->chunk_info; - buf_node->queue_no = queue->queue_no; - buf_node->next = NULL; - queue->last = buf_node; - queue->inited = CM_TRUE; -} -static mes_buf_queue_t *mes_get_buffer_queue(mes_buf_chunk_t *chunk) -{ - mes_buf_queue_t *queue = NULL; - queue = &chunk->queues[chunk->current_no % chunk->queue_num]; - chunk->current_no++; + // add sizeof(mes_message_head_t) + for (int buf_pool_no = 0; buf_pool_no < msg_pool_attr->buf_pool_count; buf_pool_no++) { + mes_msg_buffer_pool_attr_t *buf_pool_attr = &msg_pool_attr->buf_pool_attr[buf_pool_no]; + if (buf_pool_attr->buf_size == 0) { + LOG_RUN_ERR("[mes][msg pool] buf_pool_no:%u, buf_size should not be 0", + buf_pool_no); + return CM_ERROR; + } + buf_pool_attr->buf_size = buf_pool_attr->buf_size + sizeof(mes_message_head_t); + + for (mes_priority_t prio = 0; prio < profile->priority_cnt; prio++) { + uint32 queue_num = buf_pool_attr->priority_pool_attr[prio].queue_num; + if (queue_num == 0 || queue_num > MES_MAX_BUFFER_QUEUE_NUM) { + LOG_RUN_ERR("[mes][msg pool] buf_pool_no:%u, priority:%u, " + "queue_num:%u is invalid, queue num legal scope is [1, %u].", + buf_pool_no, prio, queue_num, MES_MAX_BUFFER_QUEUE_NUM); + return CM_ERROR; + } + } - if (!queue->inited) { - cm_spin_lock(&queue->init_lock, NULL); - if (!queue->inited) { - mes_format_buf_queue_memory(queue); + uint32 queue_num = buf_pool_attr->shared_pool_attr.queue_num; + if (queue_num == 0 || queue_num > MES_MAX_BUFFER_QUEUE_NUM) { + LOG_RUN_ERR("[mes][msg pool] buf_pool_no:%u, shared pool " + "queue_num:%u is invalid, queue num legal scope is [1, %u].", + buf_pool_no, queue_num, MES_MAX_BUFFER_QUEUE_NUM); + return CM_ERROR; } - cm_spin_unlock(&queue->init_lock); } - return queue; + + ret = mes_add_compress_size_in_check_msg_pool_attr(out_profile); + if (ret != CM_SUCCESS) { + return ret; + } + + if (buf_rel != NULL) { + for (int buf_pool_no = 0; buf_pool_no < msg_pool_attr->buf_pool_count; buf_pool_no++) { + buf_rel->changed_buf_size[buf_pool_no] = + msg_pool_attr->buf_pool_attr[buf_pool_no].buf_size; + } + } + + // sort + qsort(&msg_pool_attr->buf_pool_attr, msg_pool_attr->buf_pool_count, sizeof(mes_msg_buffer_pool_attr_t), + cmp_by_msg_buffer_pool_buf_size); + return CM_SUCCESS; } static void mes_init_buf_queue(mes_buf_queue_t *queue) @@ -115,302 +266,781 @@ static void mes_init_buf_queue(mes_buf_queue_t *queue) queue->inited = CM_FALSE; } -static int mes_create_buffer_queue(mes_buf_queue_t *queue, memory_chunk_t *mem_chunk, - mes_chunk_info_t chunk_info, uint8 queue_no, uint32 buf_count, uint32 buf_size) +static void mes_init_msg_buffer_pool_queues(mes_msg_buffer_pool_t *buf_pool, + bool8 is_shared, mes_priority_t priority, uint32 queue_num) { - uint64 mem_size; - uint64 buf_item_size; + mes_msg_buffer_inner_pool_t *inner_pool; + if (is_shared) { + inner_pool = &buf_pool->shared_pool; + } else { + inner_pool = &buf_pool->private_pool[priority]; + } - if (buf_count == 0) { - LOG_RUN_ERR("[mes]: mes_pool_size should greater than 0."); - return ERR_MES_PARAM_INVALID; + for (int i = 0; i < queue_num; i++) { + mes_buf_queue_t* queue = &inner_pool->queues[i]; + mes_init_buf_queue(queue); + queue->queue_no = i; + queue->buf_size = buf_pool->buf_size; + queue->count = 0; } +} - /* init queue */ - mes_init_buf_queue(queue); - queue->queue_no = queue_no; - queue->buf_size = buf_size; - queue->count = buf_count; - queue->chunk_info = chunk_info; +static void mes_init_msg_shared_pool(mes_msg_buffer_pool_t *buf_pool, memory_chunk_t *mem_chunk, + uint64 *actual_metadata_size, uint32 left_num) +{ + mes_msg_pool_attr_t *mpa = &MES_GLOBAL_INST_MSG.profile.msg_pool_attr; + mes_msg_buffer_pool_tag_t *pool_tag = &buf_pool->tag; + uint8 buf_pool_no = pool_tag->buf_pool_no; + uint32 shared_queue_num = mpa->buf_pool_attr[buf_pool_no].shared_pool_attr.queue_num; + buf_pool->shared_pool.queue_num = shared_queue_num; + mes_msg_buffer_inner_pool_t *shared_pool = &buf_pool->shared_pool; + uint64 shared_queue_size = shared_queue_num * sizeof(mes_buf_queue_t); + char* addr = cm_alloc_memory_from_chunk(mem_chunk, shared_queue_size); + *actual_metadata_size += shared_queue_size; + shared_pool->queues = (mes_buf_queue_t*)addr; + shared_pool->pop_cursor = 0; + shared_pool->push_cursor = 0; + mes_init_msg_buffer_pool_queues(buf_pool, CM_TRUE, 0, shared_queue_num); + uint32 per_queue_buf_num = left_num / shared_queue_num; + uint32 left_buf_num = left_num % shared_queue_num; + for (int qn = 0; qn < shared_queue_num; qn++) { + shared_pool->queues[qn].init_count = per_queue_buf_num; + if (qn < left_buf_num) { + shared_pool->queues[qn].init_count++; + } + } +} - /* alloc memory from memory chunk */ - buf_item_size = (uint64)(sizeof(mes_buffer_item_t) + buf_size); - mem_size = (uint64)buf_count * buf_item_size; - queue->addr = cm_alloc_memory_from_chunk(mem_chunk, mem_size); +static void mes_init_msg_private_pool(mes_msg_buffer_pool_t *buf_pool, memory_chunk_t *mem_chunk, + uint64* actual_metadata_size, uint32* alloc_buffer_num) +{ + mes_profile_t *profile = &MES_GLOBAL_INST_MSG.profile; + mes_msg_pool_attr_t *mpa = &MES_GLOBAL_INST_MSG.profile.msg_pool_attr; + *alloc_buffer_num = 0; + mes_msg_buffer_pool_tag_t *pool_tag = &buf_pool->tag; + uint8 buf_pool_no = buf_pool->tag.buf_pool_no; + for (int prio = 0; prio < buf_pool->priority_cnt; prio++) { + uint32 queue_num = mpa->buf_pool_attr[buf_pool_no].priority_pool_attr[prio].queue_num; + mes_msg_buffer_inner_pool_t *prio_pool = &buf_pool->private_pool[prio]; + prio_pool->queue_num = queue_num; + + uint64 priority_queue_size = queue_num * sizeof(mes_buf_queue_t); + char* addr = cm_alloc_memory_from_chunk(mem_chunk, priority_queue_size); + *actual_metadata_size += priority_queue_size; + prio_pool->queues = (mes_buf_queue_t*)addr; + mes_init_msg_buffer_pool_queues(buf_pool, CM_FALSE, prio, queue_num); + + prio_pool->pop_cursor = 0; + uint32 prio_buf_num = 0; + if (!pool_tag->is_send) { + prio_buf_num = profile->work_task_count[prio] * MSG_PRIORITY_POOL_BUFFER_NUM_MAGNIFICATION; + } else { + prio_buf_num = profile->send_task_count[prio] * MSG_PRIORITY_POOL_BUFFER_NUM_MAGNIFICATION; + } + uint32 per_queue_buf_num = prio_buf_num / queue_num; + uint32 left_buf_num = prio_buf_num % queue_num; + for (int qn = 0; qn < queue_num; qn++) { + prio_pool->queues[qn].init_count = per_queue_buf_num; + if (qn < left_buf_num) { + prio_pool->queues[qn].init_count++; + } + } + prio_pool->push_cursor = left_buf_num; + *alloc_buffer_num += prio_buf_num; + } +} - /* defer format memory to buffer item allocation, to speed mes_init */ - return CM_SUCCESS; +static void mes_init_msg_buffer_pool(uint8 buf_pool_no, memory_chunk_t *mem_chunk, + mes_msg_buffer_pool_t** buf_pool_ptr, uint64 available_size, + uint64 metadata_size, mes_msg_pool_tag_t *msg_pool_tag) +{ + uint64 actual_metadata_size = 0; + char* addr = cm_alloc_memory_from_chunk(mem_chunk, sizeof(mes_msg_buffer_pool_t)); + actual_metadata_size += sizeof(mes_msg_buffer_pool_t); + *buf_pool_ptr = (mes_msg_buffer_pool_t*)addr; + mes_msg_buffer_pool_t *buf_pool = (mes_msg_buffer_pool_t*)addr; + mes_msg_pool_attr_t *mpa = &MES_GLOBAL_INST_MSG.profile.msg_pool_attr; + mes_msg_buffer_pool_attr_t *buf_attr = &mpa->buf_pool_attr[buf_pool_no]; + + buf_pool->tag.is_send = msg_pool_tag->is_send; + buf_pool->tag.enable_inst_dimension = msg_pool_tag->enable_inst_dimension; + buf_pool->tag.inst_id = msg_pool_tag->inst_id; + buf_pool->tag.buf_pool_no = buf_pool_no; + buf_pool->buf_size = buf_attr->buf_size; + buf_pool->buf_num = available_size / (buf_attr->buf_size + sizeof(mes_buffer_item_t)); + buf_pool->priority_cnt = MES_GLOBAL_INST_MSG.profile.priority_cnt; + + uint32 alloc_buffer_num = 0; + mes_init_msg_private_pool(buf_pool, mem_chunk, &actual_metadata_size, &alloc_buffer_num); + if (alloc_buffer_num > buf_pool->buf_num) { + cm_panic_log(0, "[mes][msg pool] we already check pool whether enough and check pass, " + "but now pool size is not enough. something unexpected happen, already alloc_buffer_num:%u, " + "total num:%u", + alloc_buffer_num, buf_pool->buf_num); + } + uint32 all_left_num = buf_pool->buf_num - alloc_buffer_num; + mes_init_msg_shared_pool(buf_pool, mem_chunk, &actual_metadata_size, all_left_num); + + CM_ASSERT(metadata_size == actual_metadata_size); + buf_pool->mem_chunk.addr = cm_alloc_memory_from_chunk(mem_chunk, available_size); + buf_pool->mem_chunk.offset = 0; + buf_pool->mem_chunk.total_size = available_size; + GS_INIT_SPIN_LOCK(buf_pool->mem_chunk_lock); + buf_pool->pop_priority = 0; + buf_pool->need_recycle = CM_FALSE; + uint32 per_buf_count = buf_pool->shared_pool.queues[0].init_count; + buf_pool->recycle_threshold = per_buf_count * MSG_BUF_POOL_THRESHOLD_RATIO; + buf_pool->inited = CM_TRUE; + LOG_DEBUG_INF("[mes][msg pool][buf pool] buf_pool_no:%d, buf_size:%u, buf_num:%u, " + "buf_pool {metadata size:%llu, msg actual size:%llu}", + buf_pool_no, buf_pool->buf_size, buf_pool->buf_num, metadata_size, + available_size); } -static void mes_set_buffer_queue_count(mes_buf_chunk_t *chunk, uint32 queue_num, uint32 total_count) +static void mes_assemble_msg_pool_proportion_print_info_error_branch(char* buf) { - uint32 buf_count; - uint32 buf_leftover; + (void)snprintf_s(buf, MSG_POOL_TEMP_STR_LEN, MSG_POOL_TEMP_STR_LEN - 1, + "%s", "assemble proportion info occur something wrong"); +} - buf_count = total_count / queue_num; - buf_leftover = total_count % queue_num; +static void mes_assemble_msg_pool_proportion_print_info(char* buf) +{ + int ret; + mes_msg_pool_attr_t *mpa = &MES_GLOBAL_INST_MSG.profile.msg_pool_attr; + for (uint8 i = 0; i < mpa->buf_pool_count; i++) { + char temp_buf[MSG_POOL_SHORT_TEMP_STR_LEN] = { 0 }; + ret = snprintf_s(temp_buf, MSG_POOL_SHORT_TEMP_STR_LEN, MSG_POOL_SHORT_TEMP_STR_LEN - 1, + "buf_pool_no:%u, proportion:%f;", + i, mpa->buf_pool_attr[i].proportion); + if (ret < 0) { + mes_assemble_msg_pool_proportion_print_info_error_branch(buf); + return; + } + ret = strcat_s(buf, MSG_POOL_TEMP_STR_LEN, temp_buf); + if (ret != EOK) { + mes_assemble_msg_pool_proportion_print_info_error_branch(buf); + return; + } + } +} + +static int mes_get_buffer_pool_minimum_size(mes_profile_t *profile, bool8 is_send, + uint64 *buffer_pool_minimum_list) +{ + unsigned int all_task_count = 0; + unsigned int *task_count = is_send ? profile->send_task_count : profile->work_task_count; + + for (int prio = 0; prio < profile->priority_cnt; prio++) { + if (task_count[prio] > MES_MAX_TASK_NUM) { + LOG_RUN_ERR("[mes][msg pool] %s thread count:%u, legal scope is [1, %u]", + is_send ? "send" : "work", + task_count[prio], MES_MAX_TASK_NUM); + return CM_ERROR; + } - for (uint32 i = 0; i < queue_num; i++) { - chunk->queues[i].count = buf_count; + if (task_count[prio] == 0) { + all_task_count += 1; + } else { + all_task_count += task_count[prio]; + } } - for (uint32 i = 0; i < buf_leftover; i++) { - chunk->queues[i].count++; + mes_msg_pool_attr_t *mpa = &profile->msg_pool_attr; + for (int buf_pool_no = 0; buf_pool_no < mpa->buf_pool_count; buf_pool_no++) { + uint32 buf_item_size = mpa->buf_pool_attr[buf_pool_no].buf_size + sizeof(mes_buffer_item_t); + buffer_pool_minimum_list[buf_pool_no] = (uint64)buf_item_size * + all_task_count * MSG_PRIORITY_POOL_BUFFER_NUM_MAGNIFICATION; } + return CM_SUCCESS; +} - return; +static int mes_check_msg_pool_size_whether_enough(bool8 is_send, uint64 metadata_size, uint64 pool_size) +{ + mes_profile_t *profile = &MES_GLOBAL_INST_MSG.profile; + mes_msg_pool_attr_t *mpa = &MES_GLOBAL_INST_MSG.profile.msg_pool_attr; + if (pool_size < metadata_size) { + LOG_RUN_ERR("[mes][msg pool] pool size is not enough. " + "pool size is less than metadata size, pool_size:%llu, metadata_size:%llu.", + pool_size, metadata_size); + return CM_ERROR; + } + + int ret = CM_SUCCESS; + uint64 left_size = pool_size - metadata_size; + uint64 buffer_pool_minimum_list[MES_MAX_BUFFPOOL_NUM] = { 0 }; + ret = mes_get_buffer_pool_minimum_size(profile, is_send, buffer_pool_minimum_list); + if (ret != CM_SUCCESS) { + return ret; + } + + for (int buf_pool_no = 0; buf_pool_no < mpa->buf_pool_count; buf_pool_no++) { + uint64 actual_alloc = (uint64)(left_size * mpa->buf_pool_attr[buf_pool_no].proportion); + if (actual_alloc < buffer_pool_minimum_list[buf_pool_no]) { + ret = CM_ERROR; + } + } + + if (ret != CM_SUCCESS) { + char buf[MSG_POOL_TEMP_STR_LEN] = { 0 }; + mes_assemble_msg_pool_proportion_print_info(buf); + uint64 at_least_size = 0; + uint64 tmp_size = 0; + for (int buf_pool_no = 0; buf_pool_no < mpa->buf_pool_count; buf_pool_no++) { + tmp_size = (uint64)((double)buffer_pool_minimum_list[buf_pool_no] / + mpa->buf_pool_attr[buf_pool_no].proportion) + 1; + if (tmp_size > at_least_size) { + at_least_size = tmp_size; + } + } + at_least_size += metadata_size; + LOG_RUN_ERR("[mes][msg pool] pool size is not enough. size:%llu. " + "if keep the parameter unchanged(%s), msg_buffer_pool need at least size:%llu.", + pool_size, buf, at_least_size); + return ret; + } + return ret; } -static int mes_create_buffer_chunk(mes_buf_chunk_t *chunk, memory_chunk_t *mem_chunk, mes_chunk_info_t chunk_info, - uint32 queue_count, const mes_buffer_attr_t *buf_attr) +static int mes_get_buffer_pool_metadata_size(mes_profile_t *profile, uint8 buf_pool_no) { - errno_t ret; - uint64 queues_size = (uint64)(queue_count * sizeof(mes_buf_queue_t)); + mes_msg_pool_attr_t *mpa = &profile->msg_pool_attr; + mes_msg_buffer_pool_attr_t *buf_attr = &mpa->buf_pool_attr[buf_pool_no]; - if (queue_count == 0 || queue_count > MES_MAX_BUFFER_QUEUE_NUM) { - LOG_RUN_ERR("[mes]: pool_count %u is invalid, legal scope is [1, %d].", queue_count, MES_MAX_BUFFPOOL_NUM); - return ERR_MES_PARAM_INVALID; + uint64 metadata_size = sizeof(mes_msg_buffer_pool_t); + uint32 queue_num = 0; + for (int i = 0; i < profile->priority_cnt; i++) { + queue_num += buf_attr->priority_pool_attr[i].queue_num; } + queue_num += buf_attr->shared_pool_attr.queue_num; + metadata_size += queue_num * sizeof(mes_buf_queue_t); + return metadata_size; +} - chunk->queues = (mes_buf_queue_t *)cm_alloc_memory_from_chunk(mem_chunk, queue_count * sizeof(mes_buf_queue_t)); - ret = memset_sp(chunk->queues, queues_size, 0, queues_size); - if (ret != EOK) { - return ERR_MES_MEMORY_SET_FAIL; +static uint64 mes_get_msg_pool_metadata_size(mes_profile_t *profile, uint64 *buf_pool_metadata) +{ + uint64 all_metadata_size = sizeof(mes_msg_pool_t); + for (uint8 i = 0; i < profile->msg_pool_attr.buf_pool_count; i++) { + uint64 buffer_pool_metadata = mes_get_buffer_pool_metadata_size(profile, i); + buf_pool_metadata[i] = buffer_pool_metadata; + all_metadata_size += buffer_pool_metadata; } + return all_metadata_size; +} - chunk->chunk_no = (uint8)chunk_info.chunk_no; - chunk->buf_size = buf_attr->size; - chunk->queue_num = (uint8)queue_count; - chunk->current_no = 0; +int mes_init_msg_pool(mes_msg_pool_t **msg_pool_ptr, uint64 pool_size, + mes_msg_pool_tag_t *tag) +{ + mes_msg_pool_attr_t *mpa = &MES_GLOBAL_INST_MSG.profile.msg_pool_attr; - mes_set_buffer_queue_count(chunk, queue_count, buf_attr->count); + char* addr = cm_malloc_prot(pool_size); + *msg_pool_ptr = (mes_msg_pool_t*)addr; + mes_msg_pool_t *msg_pool = *msg_pool_ptr; + int ret; - for (uint32 i = 0; i < queue_count; i++) { - ret = mes_create_buffer_queue(&chunk->queues[i], mem_chunk, chunk_info, (uint8)i, - chunk->queues[i].count, buf_attr->size); - if (ret != CM_SUCCESS) { - LOG_RUN_ERR("[mes]: create buf queue failed."); - return ret; - } + msg_pool->mem_chunk.addr = addr; + msg_pool->mem_chunk.offset = sizeof(mes_msg_pool_t); + msg_pool->mem_chunk.total_size = pool_size; + msg_pool->tag = *tag; + msg_pool->size = pool_size; + msg_pool->buf_pool_count = mpa->buf_pool_count; + uint64 metadata_size[MES_MAX_BUFFPOOL_NUM] = { 0 }; + uint64 all_metadata_size = mes_get_msg_pool_metadata_size(&MES_GLOBAL_INST_MSG.profile, metadata_size); + ret = mes_check_msg_pool_size_whether_enough(tag->is_send, all_metadata_size, pool_size); + if (ret != CM_SUCCESS) { + return ret; } + uint64 all_left_size = pool_size - all_metadata_size; + uint64 left_size = all_left_size; + uint64 allowed_size = 0; + for (uint8 buf_pool_no = 0; buf_pool_no < mpa->buf_pool_count; buf_pool_no++) { + if (buf_pool_no == mpa->buf_pool_count - 1) { + allowed_size = left_size; // reduce waste + } else { + allowed_size = all_left_size * mpa->buf_pool_attr[buf_pool_no].proportion; + } + uint32 buf_item_size = mpa->buf_pool_attr[buf_pool_no].buf_size + sizeof(mes_buffer_item_t); + allowed_size = allowed_size - (allowed_size % buf_item_size); + mes_init_msg_buffer_pool(buf_pool_no, &msg_pool->mem_chunk, &msg_pool->buf_pool[buf_pool_no], + allowed_size, metadata_size[buf_pool_no], &msg_pool->tag); + left_size = left_size - allowed_size; + msg_pool->buf_pool[buf_pool_no]->msg_pool = msg_pool; + } return CM_SUCCESS; } -uint64 mes_calc_message_pool_size(mes_profile_t *profile, uint32 priority) +void mes_deinit_msg_pool(mes_msg_pool_t **msg_pool) { - uint64 total_size = 0; - total_size += (uint64)sizeof(mes_pool_t); + if (*msg_pool == NULL) { + return; + } + cm_free_prot(*msg_pool); + *msg_pool = NULL; +} - uint32 pool_count = profile->buffer_pool_attr[priority].pool_count; - uint32 queue_count = 0; - uint32 buf_size = 0; - uint32 buf_count = 0; +int mes_init_msg_single_pool(bool8 is_send) +{ + mes_msg_pool_attr_t *mpa = &MES_GLOBAL_INST_MSG.profile.msg_pool_attr; + mq_context_t *mq_ctx = is_send ? &MES_GLOBAL_INST_MSG.send_mq : &MES_GLOBAL_INST_MSG.recv_mq; + mes_msg_pool_tag_t tag = { + .is_send = is_send, + .enable_inst_dimension = CM_FALSE, + .inst_id = 0, + }; + LOG_DEBUG_INF("[mes][msg pool] init single pool, is_send:%u", is_send); + int ret = mes_init_msg_pool(&mq_ctx->single_pool, mpa->total_size, &tag); + if (ret != CM_SUCCESS) { + LOG_RUN_ERR("[mes][msg pool] init single pool failed, is_send:%u", is_send); + mes_deinit_msg_pool(&mq_ctx->single_pool); + return ret; + } + LOG_DEBUG_INF("[mes][msg pool] init single pool success, is_send:%u", is_send); + return ret; +} - for (uint32 i = 0; i < pool_count; i++) { - queue_count = profile->buffer_pool_attr[priority].queue_count; - buf_count = profile->buffer_pool_attr[priority].buf_attr[i].count; - buf_size = profile->buffer_pool_attr[priority].buf_attr[i].size; +int mes_init_msg_inst_pool_set(bool8 is_send) +{ + int ret = CM_SUCCESS; + mes_profile_t *profile = &MES_GLOBAL_INST_MSG.profile; + mq_context_t *mq_ctx = is_send ? &MES_GLOBAL_INST_MSG.send_mq : &MES_GLOBAL_INST_MSG.recv_mq; + mes_msg_pool_attr_t *mpa = &MES_GLOBAL_INST_MSG.profile.msg_pool_attr; + mes_msg_inst_pool_set_t *pool_set = &mq_ctx->inst_pool_set; - total_size += (uint64)(queue_count * sizeof(mes_buf_queue_t)); - total_size += ((uint64)(sizeof(mes_buffer_item_t) + buf_size)) * buf_count; + pool_set->total_size = mpa->total_size; + if (is_send) { + pool_set->inst_pool_count = profile->inst_cnt - 1; + } else { + pool_set->inst_pool_count = profile->inst_cnt; } + pool_set->per_inst_pool_size = pool_set->total_size / pool_set->inst_pool_count; + + LOG_DEBUG_INF("[mes][msg pool] init instance pool set, is_send:%u", is_send); + for (uint8 inst_id = 0; inst_id < pool_set->inst_pool_count; inst_id++) { + if (is_send && inst_id == MES_GLOBAL_INST_MSG.profile.inst_id) { + pool_set->inst_pool[inst_id] = NULL; + continue; + } - return total_size; + mes_msg_pool_tag_t tag = { + .is_send = is_send, + .enable_inst_dimension = CM_TRUE, + .inst_id = inst_id, + }; + LOG_DEBUG_INF("[mes][msg pool] init instance pool, inst_id:%u, is_send:%u", + inst_id, is_send); + ret = mes_init_msg_pool(&pool_set->inst_pool[inst_id], pool_set->per_inst_pool_size, &tag); + if (ret != CM_SUCCESS) { + LOG_RUN_ERR("[mes][msg pool] init instance pool failed, inst_id:%u, is_send:%u", + inst_id, is_send); + for (int i = inst_id; i >= 0; i--) { + mes_deinit_msg_pool(&pool_set->inst_pool[i]); + } + return ret; + } + LOG_DEBUG_INF("[mes][msg pool] init instance pool success, inst_id:%u, is_send:%u", + inst_id, is_send); + } + return ret; } -mes_pool_t *mes_alloc_message_pool(bool32 is_send, uint32 inst_id, uint32 priority) +int mes_init_message_pool(bool8 is_send) { - uint64 total_size = mes_calc_message_pool_size(&MES_GLOBAL_INST_MSG.profile, priority); - char *addr = cm_malloc_prot(total_size); - if (addr == NULL) { - LOG_RUN_ERR("[mes]failed to allocate memory for message pool, total_size = %llu," - "priority:%u, inst_id:%u, is_send:%u,", total_size, priority, inst_id, is_send); - return NULL; + int ret = CM_SUCCESS; + mes_profile_t *profile = &MES_GLOBAL_INST_MSG.profile; + mq_context_t *mq_ctx = is_send ? &MES_GLOBAL_INST_MSG.send_mq : &MES_GLOBAL_INST_MSG.recv_mq; + + cm_spin_lock(&mq_ctx->msg_pool_init_lock, NULL); + if (mq_ctx->msg_pool_inited) { + cm_spin_unlock(&mq_ctx->msg_pool_init_lock); + LOG_DEBUG_INF("[mes][msg pool] no need to reinit message pool, already inited."); + return CM_SUCCESS; } - mes_pool_t *pool = (mes_pool_t *)addr; - if (memset_s(pool, sizeof(mes_pool_t), 0, sizeof(mes_pool_t)) != EOK) { - cm_free_prot(addr); - cm_panic(0); - return NULL; + if (is_send && profile->send_directly && + (profile->enable_compress_priority == 0 || profile->algorithm == COMPRESS_NONE || + profile->algorithm >= COMPRESS_CEIL)) { + // send_directly and disable compress + cm_spin_unlock(&mq_ctx->msg_pool_init_lock); + LOG_RUN_INF("[mes][msg pool] no need to init send message pool, cause send directly and compress disable."); + return CM_SUCCESS; } - pool->mem_chunk.addr = addr; - pool->mem_chunk.offset = (uint64)sizeof(mes_pool_t); - pool->mem_chunk.total_size = total_size; - return pool; + if (!mq_ctx->enable_inst_dimension) { + ret = mes_init_msg_single_pool(is_send); + } else { + ret = mes_init_msg_inst_pool_set(is_send); + } + mq_ctx->msg_pool_inited = CM_TRUE; + cm_spin_unlock(&mq_ctx->msg_pool_init_lock); + return ret; } -void mes_free_message_pool(mes_pool_t *pool) +void mes_deinit_message_pool(mq_context_t *mq_ctx) { - /* - * DO NOT USE CM_FREE_PROT_PTR, because the freed memory contains msg_pool variable's memory. - * when the memory is freed, can't assign anything to it. - */ - cm_panic(pool->mem_chunk.addr != NULL); - cm_free_prot(pool->mem_chunk.addr); + cm_spin_lock(&mq_ctx->msg_pool_init_lock, NULL); + if (!mq_ctx->enable_inst_dimension) { + mes_deinit_msg_pool(&mq_ctx->single_pool); + } else { + mes_msg_inst_pool_set_t *set = &mq_ctx->inst_pool_set; + for (int i = 0; i < set->inst_pool_count; i++) { + mes_deinit_msg_pool(&set->inst_pool[i]); + } + } + mq_ctx->msg_pool_inited = CM_FALSE; + cm_spin_unlock(&mq_ctx->msg_pool_init_lock); } -int mes_init_message_pool(bool32 is_send, uint32 inst_id, mes_priority_t priority) +void mes_deinit_all_message_pool() { - int ret; - mq_context_t *mq_ctx = is_send ? &MES_GLOBAL_INST_MSG.send_mq : &MES_GLOBAL_INST_MSG.recv_mq; + mes_deinit_message_pool(&MES_GLOBAL_INST_MSG.send_mq); + mes_deinit_message_pool(&MES_GLOBAL_INST_MSG.recv_mq); +} - if (inst_id >= MES_MAX_INSTANCES || priority >= MES_PRIORITY_CEIL) { - LOG_RUN_ERR("[mes] mes_init_message_pool failed, invalid inst_id[%u] or priority[%u], is_send:%u.", - inst_id, priority, is_send); - return ERR_MES_PARAM_INVALID; +mes_buffer_item_t* mes_get_buf_item_from_queue(mes_buf_queue_t *queue, uint32 *count) +{ + mes_buffer_item_t *buf_item = NULL; + cm_spin_lock(&queue->lock, NULL); + if (queue->count > 0) { + buf_item = queue->first; + queue->count--; + if (queue->count == 0) { + queue->first = NULL; + queue->last = NULL; + } else { + queue->first = buf_item->next; + CM_ASSERT(queue->first != NULL); + } + CM_ASSERT(buf_item != NULL); + buf_item->next = NULL; + } + + if (count != NULL) { + *count = queue->count; + } + cm_spin_unlock(&queue->lock); + return buf_item; +} + +static void mes_put_buf_item_to_queue(mes_buffer_item_t *buf_item, mes_buf_queue_t *queue, + uint32 *count) +{ + cm_spin_lock(&queue->lock, NULL); + if (queue->count > 0) { + queue->last->next = buf_item; + queue->last = buf_item; + } else { + queue->first = buf_item; + queue->last = buf_item; } + buf_item->next = NULL; + queue->count++; - if ((MES_GLOBAL_INST_MSG.profile.buffer_pool_attr[priority].pool_count == 0) || - (MES_GLOBAL_INST_MSG.profile.buffer_pool_attr[priority].pool_count > MES_MAX_BUFFPOOL_NUM)) { - LOG_RUN_ERR("[mes] pool_count %u is invalid, legal scope is [1, %d], priority:%u, inst_id:%u, is_send:%u.", - MES_GLOBAL_INST_MSG.profile.buffer_pool_attr[priority].pool_count, MES_MAX_BUFFPOOL_NUM, priority, inst_id, - is_send); - return ERR_MES_PARAM_INVALID; + if (count != NULL) { + *count = queue->count; } + cm_spin_unlock(&queue->lock); + return; +} + +static void mes_init_buffer_item_tag(mes_buffer_item_tag_t *tag, mes_msg_buffer_pool_tag_t *pool_tag, + bool8 is_shared, mes_priority_t priority, uint8 queue_no) +{ + tag->is_send = pool_tag->is_send; + tag->inst_id = pool_tag->inst_id; + tag->buf_pool_no = pool_tag->buf_pool_no; + tag->is_shared = is_shared; + tag->priority = priority; + tag->queue_no = queue_no; +} - mes_pool_t *curr_pool = mes_alloc_message_pool(is_send, inst_id, priority); - if (curr_pool == NULL) { - return ERR_MES_MALLOC_FAIL; +static void mes_format_buf_for_queue(mes_buf_queue_t *queue, mes_msg_buffer_pool_t *buf_pool, + bool8 is_shared, mes_priority_t priority) +{ + uint64 buf_item_size = buf_pool->buf_size + sizeof(mes_buffer_item_t); + uint32 init_count = queue->init_count; + if (init_count == 0) { + queue->inited = CM_TRUE; + return; + } + + uint64 size = init_count * buf_item_size; + cm_spin_lock(&buf_pool->mem_chunk_lock, NULL); + char* addr = cm_alloc_memory_from_chunk(&buf_pool->mem_chunk, size); + cm_spin_unlock(&buf_pool->mem_chunk_lock); + + mes_buffer_item_t *buf_item = (mes_buffer_item_t*)addr; + mes_buffer_item_t *next_buf_item = NULL; + mes_msg_buffer_pool_tag_t *pool_tag = &buf_pool->tag; + for (uint32 i = 0; i < init_count - 1; i++) { + addr += buf_item_size; + next_buf_item = (mes_buffer_item_t*)addr; + mes_init_buffer_item_tag(&buf_item->tag, pool_tag, is_shared, priority, queue->queue_no); + mes_put_buf_item_to_queue(buf_item, queue, NULL); + buf_item = next_buf_item; } - curr_pool->count = MES_GLOBAL_INST_MSG.profile.buffer_pool_attr[priority].pool_count; - mes_buffer_pool_attr_t *pool_attr = &MES_GLOBAL_INST_MSG.profile.buffer_pool_attr[priority]; - for (uint32 i = 0; i < curr_pool->count; i++) { - mes_chunk_info_t chunk_info = {.inst_id = inst_id, .priority = priority, .chunk_no = i, .is_send = is_send}; - ret = mes_create_buffer_chunk(&curr_pool->chunk[i], &curr_pool->mem_chunk, - chunk_info, pool_attr->queue_count, &pool_attr->buf_attr[i]); - if (ret != CM_SUCCESS) { - mes_free_message_pool(curr_pool); - LOG_RUN_ERR("[mes] create buf chunk failed, priority:%u.", priority); - return ret; + mes_init_buffer_item_tag(&buf_item->tag, pool_tag, is_shared, priority, queue->queue_no); + buf_item->next = NULL; + mes_put_buf_item_to_queue(buf_item, queue, NULL); + + queue->inited = CM_TRUE; + return; +} + +static mes_buf_queue_t *mes_get_priority_private_queue(mes_msg_buffer_pool_t *buf_pool, mes_priority_t priority) +{ + mes_msg_buffer_inner_pool_t *priority_pool = &buf_pool->private_pool[priority]; + if (priority_pool->queue_num == 0) { + cm_panic_log(0, "[mes] get priority:%u queue failed, queue_num is zero, buf_pool_no:%u.", + priority, buf_pool->tag.buf_pool_no); + } + + int32 pop = cm_atomic32_inc(&priority_pool->pop_cursor); + mes_buf_queue_t *queue = &priority_pool->queues[pop % priority_pool->queue_num]; + if (!queue->inited) { + cm_spin_lock(&queue->init_lock, NULL); + if (!queue->inited) { + mes_format_buf_for_queue(queue, buf_pool, CM_FALSE, priority); } + cm_spin_unlock(&queue->init_lock); } + return queue; +} - mq_ctx->msg_pool[inst_id][priority] = curr_pool; - return CM_SUCCESS; +static mes_buffer_item_t* mes_get_buf_item_from_private_pool(mes_msg_buffer_pool_t *buf_pool, mes_priority_t priority) +{ + mes_buf_queue_t *queue = mes_get_priority_private_queue(buf_pool, priority); + if (queue == NULL) { + return NULL; + } + return mes_get_buf_item_from_queue(queue, NULL); } -void mes_destroy_all_message_pool() +static mes_buffer_item_t* mes_get_buf_item_from_shared_pool(mes_msg_buffer_pool_t *buf_pool, + bool8 enable_flow_control) { - uint32 i; - uint32 priority; + mes_buffer_item_t *buf_item = NULL; + mes_msg_buffer_inner_pool_t *shared_pool = &buf_pool->shared_pool; + int32 pop = cm_atomic32_inc(&shared_pool->pop_cursor); + mes_buf_queue_t *queue = &shared_pool->queues[pop % shared_pool->queue_num]; + + if (!queue->inited) { + cm_spin_lock(&queue->init_lock, NULL); + if (!queue->inited) { + mes_format_buf_for_queue(queue, buf_pool, CM_TRUE, 0); + } + cm_spin_unlock(&queue->init_lock); + } - for (i = 0; i < MES_MAX_INSTANCES; i++) { - for (priority = 0; priority < MES_PRIORITY_CEIL; priority++) { - mes_destroy_message_pool(CM_TRUE, i, priority); - mes_destroy_message_pool(CM_FALSE, i, priority); + uint32 buf_count = 0; + if (enable_flow_control) { + cm_spin_lock(&queue->lock, NULL); + buf_count = queue->count; + cm_spin_unlock(&queue->lock); + if (buf_count > 0 && queue->init_count / buf_count <= RECV_MSG_POOL_FC_THRESHOLD) { + return NULL; } } + + buf_item = mes_get_buf_item_from_queue(queue, &buf_count); + if (buf_count < buf_pool->recycle_threshold) { + buf_pool->need_recycle = CM_TRUE; + buf_pool->recycle_queue_no = pop % shared_pool->queue_num; + } + return buf_item; } -void mes_destroy_message_pool(bool32 is_send, uint32 inst_id, mes_priority_t priority) +static mes_buffer_item_t* mes_steal_buf_item_from_other_private_pool(mes_msg_buffer_pool_t *buf_pool, + mes_priority_t priority) { - if (inst_id >= MES_MAX_INSTANCES || priority >= MES_PRIORITY_CEIL) { - LOG_RUN_WAR("[mes] mes_destroy_message_pool invalid inst_id[%u] or priority[%u]", inst_id, priority); - return; + mes_buf_queue_t *queue; + int32 pop_priority = cm_atomic32_inc(&buf_pool->pop_priority) % buf_pool->priority_cnt; + if (pop_priority == priority) { + // no need steal self + return NULL; + } + + queue = mes_get_priority_private_queue(buf_pool, pop_priority); + if (queue == NULL) { + return NULL; + } + + mes_buffer_item_t *buf_item = mes_get_buf_item_from_queue(queue, NULL); + if (buf_item != NULL && !buf_item->tag.is_shared) { + mes_put_buf_item_to_queue(buf_item, queue, NULL); + buf_item = NULL; } + return buf_item; +} + +static mes_msg_buffer_pool_t* mes_get_buf_pool_by_buf_tag(mes_buffer_item_tag_t *buf_tag) +{ + bool8 is_send = buf_tag->is_send; mq_context_t *mq_ctx = is_send ? &MES_GLOBAL_INST_MSG.send_mq : &MES_GLOBAL_INST_MSG.recv_mq; - mes_pool_t *msg_pool = mq_ctx->msg_pool[inst_id][priority]; - if (msg_pool == NULL) { - return; + if (!mq_ctx->msg_pool_inited) { + int ret = mes_init_message_pool(is_send); + if (ret != CM_SUCCESS) { + LOG_RUN_ERR("[mes][msg pool] mes init msg pool failed, is_send:%u.", + is_send); + return NULL; + } + + if (!mq_ctx->msg_pool_inited) { + LOG_DEBUG_INF("[mes][msg pool] msg pool(is_send:%u) is not inited, " + "so can not get buf_pool.", + is_send); + return NULL; + } } - - mes_free_message_pool(msg_pool); - mq_ctx->msg_pool[inst_id][priority] = NULL; + + if (!mq_ctx->enable_inst_dimension) { + return mq_ctx->single_pool->buf_pool[buf_tag->buf_pool_no]; + } + return mq_ctx->inst_pool_set.inst_pool[buf_tag->inst_id]->buf_pool[buf_tag->buf_pool_no]; } -char *mes_alloc_buf_item(uint32 len, bool32 is_send, uint32 dst_inst, mes_priority_t priority) +static mes_msg_buffer_pool_t* mes_get_buf_pool(bool8 is_send, uint32 dst_inst, uint32 len) { - mes_buf_chunk_t *chunk = NULL; - mes_buf_queue_t *queue = NULL; - mes_buffer_item_t *buf_node = NULL; - uint32 find_times = 0; + mq_context_t *mq_ctx = is_send ? &MES_GLOBAL_INST_MSG.send_mq : &MES_GLOBAL_INST_MSG.recv_mq; + if (!mq_ctx->msg_pool_inited) { + int ret = mes_init_message_pool(is_send); + if (ret != CM_SUCCESS) { + LOG_RUN_ERR("[mes][msg pool] mes init msg pool failed, is_send:%u.", + is_send); + return NULL; + } - if (MES_GLOBAL_INST_MSG.mes_ctx.phase != SHUTDOWN_PHASE_NOT_BEGIN) { - LOG_DEBUG_ERR("[mes] mes_alloc_buf_item fail, phase %u", MES_GLOBAL_INST_MSG.mes_ctx.phase); - return NULL; + if (!mq_ctx->msg_pool_inited) { + LOG_DEBUG_INF("[mes][msg pool] msg pool(is_send:%u) is not inited, " + "so can not get buf_pool.", + is_send); + return NULL; + } } - chunk = mes_get_buffer_chunk(len, is_send, dst_inst, priority); - if (chunk == NULL) { - LOG_RUN_ERR("[mes]: Get buffer failed."); + if (!mq_ctx->enable_inst_dimension) { + mes_msg_pool_t *single_pool = mq_ctx->single_pool; + for (uint32 i = 0; i < single_pool->buf_pool_count; i++) { + mes_msg_buffer_pool_t *buf_pool = single_pool->buf_pool[i]; + if (len <= buf_pool->buf_size) { + return buf_pool; + } + } + LOG_RUN_ERR("[mes] There is not long enough buffer for this message. " + "message len:%u, is_send:%u, dst_inst:%u.", + len, is_send, dst_inst); return NULL; } - do { - queue = mes_get_buffer_queue(chunk); - cm_spin_lock(&queue->lock, NULL); - if (queue->count > 0) { - buf_node = queue->first; - queue->count--; - if (queue->count == 0) { - queue->first = NULL; - queue->last = NULL; - } else { - queue->first = buf_node->next; - } - CM_ASSERT(buf_node != NULL); - buf_node->next = NULL; - cm_spin_unlock(&queue->lock); - break; - } else { - cm_spin_unlock(&queue->lock); - find_times++; - if ((find_times % chunk->queue_num) == 0) { - LOG_RUN_WAR_INHIBIT(LOG_INHIBIT_LEVEL5, "[mes]: There is no buffer, sleep and try again."); - cm_sleep(1); + mes_msg_inst_pool_set_t *pool_set = &mq_ctx->inst_pool_set; + if (pool_set->inst_pool[dst_inst] == NULL) { + // occur when cluster add new instance + cm_spin_lock(&mq_ctx->msg_pool_init_lock, NULL); + if (pool_set->inst_pool[dst_inst] == NULL) { + mes_msg_pool_tag_t tag = { + .is_send = is_send, + .enable_inst_dimension = CM_TRUE, + .inst_id = dst_inst, + }; + LOG_RUN_INF("[mes][msg pool] receive inst:%u message, this instance message pool not inited. " + "now we init this message pool, is_send:%u", + dst_inst, is_send); + int ret = mes_init_msg_pool(&pool_set->inst_pool[dst_inst], pool_set->per_inst_pool_size, &tag); + if (ret != CM_SUCCESS) { + cm_spin_unlock(&mq_ctx->msg_pool_init_lock); + LOG_RUN_ERR("[mes][msg pool] init inst pool failed, " + "inst_id:%u, is_send:%u, enable_inst_dimension:%u", + dst_inst, is_send, CM_TRUE); + mes_deinit_msg_pool(&pool_set->inst_pool[dst_inst]); + return NULL; } + LOG_RUN_INF("[mes][msg pool] init instance:%u message pool success, is_send:%u.", + dst_inst, is_send); + pool_set->total_size = pool_set->total_size + pool_set->per_inst_pool_size; + pool_set->inst_pool_count++; + } + cm_spin_unlock(&mq_ctx->msg_pool_init_lock); + } + + mes_msg_pool_t *inst_pool = pool_set->inst_pool[dst_inst]; + for (int i = 0; i < inst_pool->buf_pool_count; i++) { + mes_msg_buffer_pool_t *buf_pool = inst_pool->buf_pool[i]; + if (len <= buf_pool->buf_size) { + return buf_pool; } - } while (buf_node == NULL); + } - return buf_node->data; + LOG_RUN_ERR("[mes] There is not long enough buffer for this message. " + "message len:%u, is_send:%u, dst_inst:%u.", + len, is_send, dst_inst); + return NULL; } -char *mes_alloc_buf_item_fc(uint32 len, bool32 is_send, uint32 dst_inst, mes_priority_t priority) +char* mes_alloc_buf_item_inner(uint32 len, bool32 is_send, uint32 dst_inst, mes_priority_t priority, + bool8 enable_flow_control) { - mes_buf_chunk_t *chunk = NULL; - mes_buf_queue_t *queue = NULL; - mes_buffer_item_t *buf_node = NULL; + mes_buffer_item_t *buf_item = NULL; uint32 find_times = 0; - chunk = mes_get_buffer_chunk(len, is_send, dst_inst, priority); - if (chunk == NULL) { - LOG_RUN_ERR("[mes]: Get buffer failed."); + if (MES_GLOBAL_INST_MSG.mes_ctx.phase != SHUTDOWN_PHASE_NOT_BEGIN) { + LOG_DEBUG_ERR("[mes] mes_alloc_buf_item_inner fail, phase %u", MES_GLOBAL_INST_MSG.mes_ctx.phase); return NULL; } - uint32_t count = - MES_GLOBAL_INST_MSG.profile.buffer_pool_attr[priority].buf_attr[chunk->chunk_no].count / chunk->queue_num; + mes_msg_buffer_pool_t *buf_pool = mes_get_buf_pool(is_send, dst_inst, len); + if (buf_pool == NULL) { + LOG_RUN_ERR("[mes]: get buf_pool failed."); + return NULL; + } + mes_msg_buffer_pool_t *begin_buf_pool = buf_pool; + mes_msg_pool_t *msg_pool = NULL; do { - queue = mes_get_buffer_queue(chunk); - cm_spin_lock(&queue->lock, NULL); - if (queue->count > 0 && count / queue->count <= RECV_MSG_POOL_FC_THRESHOLD) { - buf_node = queue->first; - queue->count--; - if (queue->count == 0) { - queue->first = NULL; - queue->last = NULL; - } else { - queue->first = buf_node->next; + do { + buf_item = mes_get_buf_item_from_private_pool(buf_pool, priority); + if (buf_item != NULL) { + break; } - buf_node->next = NULL; - cm_spin_unlock(&queue->lock); - break; - } else { - cm_spin_unlock(&queue->lock); + + buf_item = mes_get_buf_item_from_shared_pool(buf_pool, enable_flow_control); + if (buf_item != NULL) { + break; + } + + buf_item = mes_steal_buf_item_from_other_private_pool(buf_pool, priority); + if (buf_item != NULL) { + break; + } + find_times++; - if ((find_times % chunk->queue_num) == 0) { + if (find_times % buf_pool->private_pool[priority].queue_num == 0) { LOG_RUN_WAR_INHIBIT(LOG_INHIBIT_LEVEL5, "[mes]: There is no buffer, sleep and try again."); cm_sleep(1); + break; // try anthor buf_pool + } + } while (buf_item == NULL); + + if (buf_item == NULL) { + uint8 next_buf_pool_no = buf_pool->tag.buf_pool_no + 1; + msg_pool = (mes_msg_pool_t*)buf_pool->msg_pool; + if (next_buf_pool_no >= msg_pool->buf_pool_count) { + buf_pool = begin_buf_pool; + } else { + buf_pool = msg_pool->buf_pool[next_buf_pool_no]; } } - } while (buf_node == NULL); + } while (buf_item == NULL); + return buf_item->data; +} + +char *mes_alloc_buf_item(uint32 len, bool32 is_send, uint32 dst_inst, mes_priority_t priority) +{ + return mes_alloc_buf_item_inner(len, is_send, dst_inst, priority, CM_FALSE); +} - return buf_node->data; +char *mes_alloc_buf_item_fc(uint32 len, bool32 is_send, uint32 dst_inst, mes_priority_t priority) +{ + return mes_alloc_buf_item_inner(len, is_send, dst_inst, priority, CM_TRUE); } static void mes_release_buf_stat(uint32 cmd) @@ -432,54 +1062,236 @@ void mes_free_buf_item(char *buffer) return; } - mes_buffer_item_t *buf_item = (mes_buffer_item_t *)(buffer - MES_BUFFER_ITEM_SIZE); - mes_chunk_info_t chunk_info = buf_item->chunk_info; - mq_context_t *mq_ctx = chunk_info.is_send ? &MES_GLOBAL_INST_MSG.send_mq : &MES_GLOBAL_INST_MSG.recv_mq; - mes_pool_t *msg_pool = mq_ctx->msg_pool[chunk_info.inst_id][chunk_info.priority]; - if (msg_pool == NULL) { - return; - } - mes_buf_chunk_t *chunk = &msg_pool->chunk[chunk_info.chunk_no]; - if (chunk == NULL || chunk->queues == NULL) { - return; - } - mes_buf_queue_t *queue = &chunk->queues[buf_item->queue_no]; - if (queue == NULL) { + mes_buffer_item_t *buf_item = (mes_buffer_item_t*)(buffer - MES_BUFFER_ITEM_SIZE); + mes_buffer_item_tag_t *buf_tag = &buf_item->tag; + mes_msg_buffer_pool_t *buf_pool = mes_get_buf_pool_by_buf_tag(buf_tag); + if (buf_pool == NULL) { return; } - cm_panic(queue->inited); - - uint32 cmd = 0; - cm_spin_lock(&queue->lock, NULL); - if (buffer == NULL) { - cm_spin_unlock(&queue->lock); - return; - } - if (queue->count > 0) { - queue->last->next = buf_item; - queue->last = buf_item; + mes_buf_queue_t *queue = NULL; + bool8 check_recycle = CM_FALSE; + mes_msg_buffer_inner_pool_t *priority_pool; + if (!buf_tag->is_shared) { + priority_pool = &buf_pool->private_pool[buf_tag->priority]; + queue = &priority_pool->queues[buf_tag->queue_no]; + } else if (buf_pool->need_recycle) { + queue = &buf_pool->shared_pool.queues[buf_tag->priority]; + check_recycle = CM_TRUE; } else { - queue->first = buf_item; - queue->last = buf_item; + priority_pool = &buf_pool->private_pool[buf_tag->priority]; + int32 push = cm_atomic32_inc(&priority_pool->push_cursor); + queue = &priority_pool->queues[push % priority_pool->queue_num]; } - buf_item->next = NULL; - queue->count++; - cmd = ((mes_message_head_t *)buffer)->cmd; - buffer = NULL; - cm_spin_unlock(&queue->lock); + uint32 cmd = ((mes_message_head_t *)buffer)->cmd; + uint32 buf_count = 0; + mes_put_buf_item_to_queue(buf_item, queue, &buf_count); + + if (check_recycle && buf_pool->need_recycle && + buf_item->tag.queue_no == buf_pool->recycle_queue_no) { + if (buf_count > buf_pool->recycle_threshold) { + buf_pool->need_recycle = CM_FALSE; + } + } mes_release_buf_stat(cmd); return; } uint32 mes_get_priority_max_msg_size(mes_priority_t priority) { - uint32 max_size = 0; - for (uint32 i = 0; i < MES_GLOBAL_INST_MSG.profile.buffer_pool_attr[priority].pool_count; i++) { - if (max_size < MES_GLOBAL_INST_MSG.profile.buffer_pool_attr[priority].buf_attr[i].size) { - max_size = MES_GLOBAL_INST_MSG.profile.buffer_pool_attr[priority].buf_attr[i].size; + return MES_GLOBAL_INST_MSG.profile.msg_pool_attr.max_buf_size[priority]; +} + +static int mes_precheck_msg_pool_attr(mes_profile_t *profile) +{ + if (profile->inst_cnt >= MES_MAX_INSTANCES) { + LOG_RUN_ERR("[mes][msg pool] precheck, inst_cnt:%u is invalid, legal scope is [1, %u].", + profile->inst_cnt, MES_MAX_INSTANCES); + return CM_ERROR; + } + + if (profile->priority_cnt == 0 || profile->priority_cnt >= MES_PRIORITY_CEIL) { + LOG_RUN_ERR("[mes][msg pool] precheck, priority_cnt:%u is invalid, legal scope is [1, %u].", + profile->priority_cnt, MES_PRIORITY_CEIL); + return CM_ERROR; + } + + return CM_SUCCESS; +} + +static int mes_recheck_extra_size_whether_work(uint64 total_buf_pool_size, uint64 extra_size, + double proportion, uint64 buf_pool_minimum_size) +{ + uint64 alloc = (uint64)((total_buf_pool_size + extra_size) * (proportion)); + if (alloc < buf_pool_minimum_size) { + LOG_RUN_ERR("[mes][msg pool] calculate extra size to eliminate accuracy error failed, " + "alloc size for this buf pool is less than minimum size. " + "alloc_size:%llu, minimum_size:%llu, proportion:%f, total_buf_pool_size:%llu, " + "extra_size:%llu.", + alloc, buf_pool_minimum_size, proportion, total_buf_pool_size, extra_size); + return CM_ERROR; + } + return CM_SUCCESS; +} + +static int mes_calculate_extra_size_in_msg_pool_minimum_info(uint64 *buf_pool_size_list, + uint32 buf_pool_count, uint64 *extra_size) +{ + int ret; + uint64 max_extra = 0; + uint64 temp_extra = 0; + uint64 total_buf_pool_size = 0; + for (uint8 buf_pool_no = 0; buf_pool_no < buf_pool_count; buf_pool_no++) { + total_buf_pool_size += buf_pool_size_list[buf_pool_no]; + } + + double proportion_arr[MES_MAX_BUFFPOOL_NUM] = { 0 }; + for (uint8 buf_pool_no = 0; buf_pool_no < buf_pool_count; buf_pool_no++) { + proportion_arr[buf_pool_no] = (double)buf_pool_size_list[buf_pool_no] / total_buf_pool_size; + temp_extra = (uint64)(buf_pool_size_list[buf_pool_no] * DBL_EPSILON / proportion_arr[buf_pool_no]) + 1; + if (temp_extra > max_extra) { + max_extra = temp_extra; } + + ret = mes_recheck_extra_size_whether_work(total_buf_pool_size, max_extra, + proportion_arr[buf_pool_no], buf_pool_size_list[buf_pool_no]); + if (ret != CM_SUCCESS) { + LOG_RUN_ERR("[mes][msg pool] buf pool:%u recheck proportion failed.", + buf_pool_no); + return ret; + } + } + + double last_proportion_arr[MES_MAX_BUFFPOOL_NUM] = { 0 }; + double left_proportion = 1; + for (uint8 tar_no = 0; tar_no < buf_pool_count; tar_no++) { + left_proportion = 1; + for (uint8 buf_pool_no = 0; buf_pool_no < buf_pool_count; buf_pool_no++) { + if (tar_no == buf_pool_no) { + continue; + } + left_proportion -= proportion_arr[buf_pool_no]; + } + last_proportion_arr[tar_no] = left_proportion; + if (left_proportion < 0) { + LOG_RUN_ERR("[mes][msg pool] calculate extra size to eliminate accuracy error failed, " + "buf_pool_no:%u left_proportion:%f is less than zero.", + tar_no, left_proportion); + return CM_ERROR; + } + } + + for (uint8 buf_pool_no = 0; buf_pool_no < buf_pool_count; buf_pool_no++) { + temp_extra = (uint64)(buf_pool_size_list[buf_pool_no] * DBL_EPSILON / last_proportion_arr[buf_pool_no]) + 1; + if (temp_extra > max_extra) { + max_extra = temp_extra; + } + ret = mes_recheck_extra_size_whether_work(total_buf_pool_size, max_extra, + last_proportion_arr[buf_pool_no], buf_pool_size_list[buf_pool_no]); + if (ret != CM_SUCCESS) { + LOG_RUN_ERR("[mes][msg pool] buf pool:%u recheck proportion twice failed.", + buf_pool_no); + return ret; + } + } + *extra_size = max_extra; + return CM_SUCCESS; +} + +void mes_get_message_pool_minimum_info_when_enable_inst_dimension(uint32 inst_cnt, bool8 is_send, + mes_msg_pool_minimum_info_t *minimum_info, uint64 *extra_size) +{ + if (is_send) { + minimum_info->metadata_size *= (inst_cnt - 1); + for (uint8 buf_pool_no = 0; buf_pool_no < minimum_info->buf_pool_count; buf_pool_no++) { + minimum_info->buf_pool_minimum_size[buf_pool_no] *= (inst_cnt - 1); + } + *extra_size *= (inst_cnt - 1); + } else { + minimum_info->metadata_size *= inst_cnt; + for (uint8 buf_pool_no = 0; buf_pool_no < minimum_info->buf_pool_count; buf_pool_no++) { + minimum_info->buf_pool_minimum_size[buf_pool_no] *= inst_cnt; + } + *extra_size *= inst_cnt; + } +} + +int mes_get_message_pool_minimum_info(mes_profile_t *profile, uint8 is_send, + mes_msg_pool_minimum_info_t *minimum_info) +{ + mes_profile_t out_profile = *profile; + mes_msg_buffer_relation_t buf_rel = { 0 }; + int ret; + + ret = mes_precheck_msg_pool_attr(profile); + if (ret != CM_SUCCESS) { + return ret; + } + + ret = mes_check_msg_pool_attr(profile, &out_profile, CM_FALSE, &buf_rel); + if (ret != CM_SUCCESS) { + return ret; } - return max_size; + + mes_msg_pool_attr_t *msg_pool_attr = &out_profile.msg_pool_attr; + minimum_info->buf_pool_count = profile->msg_pool_attr.buf_pool_count; + // metadata_size + uint64 buf_pool_metadata[MES_MAX_BUFFPOOL_NUM] = { 0 }; + uint64 message_pool_metadata = + mes_get_msg_pool_metadata_size(&out_profile, buf_pool_metadata); + minimum_info->metadata_size = message_pool_metadata; + + uint64 buffer_pool_minimum_list[MES_MAX_BUFFPOOL_NUM] = { 0 }; + ret = mes_get_buffer_pool_minimum_size(&out_profile, is_send, buffer_pool_minimum_list); + if (ret != CM_SUCCESS) { + return ret; + } + uint32 changed_size; + bool8 found; + for (uint8 buf_pool_no = 0; buf_pool_no < buf_rel.buf_count; buf_pool_no++) { + changed_size = buf_rel.changed_buf_size[buf_pool_no]; + found = CM_FALSE; + for (int i = 0; i < msg_pool_attr->buf_pool_count; i++) { + if (msg_pool_attr->buf_pool_attr[i].buf_size == changed_size) { + minimum_info->buf_pool_minimum_size[buf_pool_no] = buffer_pool_minimum_list[i]; + found = CM_TRUE; + break; + } + } + + if (!found) { + cm_panic_log(0, "[mes][msg pool] can not find buf_pool minimum size, which buf_size:%u.", + buf_rel.origin_buf_size[buf_pool_no]); + } + } + + // add extra size to eliminate accuracy error + uint64 extra_size = 0; + ret = mes_calculate_extra_size_in_msg_pool_minimum_info(buffer_pool_minimum_list, + minimum_info->buf_pool_count, &extra_size); + if (ret != CM_SUCCESS) { + return ret; + } + + if (msg_pool_attr->enable_inst_dimension) { + uint32 inst_cnt = 0; + // in dms_calc_mem_usage, inst_cnt is zero + if (profile->inst_cnt == 0) { + inst_cnt = CM_MAX_INSTANCES; + } else { + inst_cnt = profile->inst_cnt; + } + mes_get_message_pool_minimum_info_when_enable_inst_dimension(inst_cnt, is_send, + minimum_info, &extra_size); + } + + minimum_info->buf_pool_total_size = 0; + for (uint8 buf_pool_no = 0; buf_pool_no < buf_rel.buf_count; buf_pool_no++) { + minimum_info->buf_pool_total_size += minimum_info->buf_pool_minimum_size[buf_pool_no]; + } + + minimum_info->total_minimum_size = minimum_info->buf_pool_total_size + + minimum_info->metadata_size + extra_size; + return CM_SUCCESS; } \ No newline at end of file diff --git a/src/cm_mes/mes_msg_pool.h b/src/cm_mes/mes_msg_pool.h index 452768d..c19123e 100644 --- a/src/cm_mes/mes_msg_pool.h +++ b/src/cm_mes/mes_msg_pool.h @@ -38,18 +38,20 @@ extern "C" { #define MES_MAX_BUFFER_QUEUE_NUM (0xFF) -typedef struct st_mes_chunk_info { +typedef struct st_mes_buffer_item_tag { inst_type inst_id; mes_priority_t priority; - uint8 chunk_no; - bool8 is_send; -} mes_chunk_info_t; + uint8 buf_pool_no; + uint8 queue_no; + unsigned char is_send : 1; + unsigned char is_shared : 1; + unsigned char reserved : 6; + uint8 reserved2; +} mes_buffer_item_tag_t; typedef struct st_mes_buffer_item { struct st_mes_buffer_item *next; - mes_chunk_info_t chunk_info; - uint8 queue_no; - uint8 reserved[1]; + mes_buffer_item_tag_t tag; char data[0]; } mes_buffer_item_t; @@ -66,7 +68,7 @@ typedef struct st_mes_buffer_item { typedef struct st_mes_buf_queue { spinlock_t lock; spinlock_t init_lock; // defer format memory to buffer item allocation, to speed mes_init - mes_chunk_info_t chunk_info; + uint32 init_count; uint8 queue_no; volatile bool8 inited; uint8 reserved[2]; @@ -77,25 +79,6 @@ typedef struct st_mes_buf_queue { char *addr; } mes_buf_queue_t; -typedef struct st_mes_buf_chunk { - uint32 buf_size; - uint8 chunk_no; - volatile uint8 queue_num; - uint8 reserved[2]; - mes_buf_queue_t *queues; - char aligned1[CM_CACHE_LINE_SIZE]; - union { - volatile uint8 current_no; - char aligned2[CM_CACHE_LINE_SIZE]; - }; -} mes_buf_chunk_t; - -typedef struct st_mes_pool { - uint32 count; - mes_buf_chunk_t chunk[MES_MAX_BUFFPOOL_NUM]; - memory_chunk_t mem_chunk; -} mes_pool_t; - typedef struct st_message_pool { spinlock_t lock; spinlock_t *lock_arr; @@ -108,14 +91,72 @@ typedef struct st_message_pool { int mr_id; // used for xnet register id } message_pool_t; -int mes_init_message_pool(bool32 is_send, uint32 inst_id, mes_priority_t priority); -void mes_destroy_message_pool(bool32 is_send, uint32 inst_id, mes_priority_t priority); -void mes_destroy_all_message_pool(void); +typedef struct st_mes_msg_buffer_inner_pool { + uint32 queue_num; + mes_buf_queue_t *queues; + atomic32_t pop_cursor; // used for thread find queue + atomic32_t push_cursor; +} mes_msg_buffer_inner_pool_t; + +typedef struct st_mes_msg_buffer_pool_tag { + bool8 is_send; + bool8 enable_inst_dimension; + uint8 inst_id; + uint8 buf_pool_no; +} mes_msg_buffer_pool_tag_t; + +typedef struct st_mes_msg_buffer_pool { + bool8 inited; + mes_msg_buffer_pool_tag_t tag; + uint32 buf_size; + uint32 buf_num; + uint32 priority_cnt; + mes_msg_buffer_inner_pool_t private_pool[MES_PRIORITY_CEIL]; + mes_msg_buffer_inner_pool_t shared_pool; + void *msg_pool; + atomic32_t pop_priority; + bool8 need_recycle; + uint32 recycle_threshold; + uint32 recycle_queue_no; + spinlock_t mem_chunk_lock; + memory_chunk_t mem_chunk; +} mes_msg_buffer_pool_t; + +typedef struct st_mes_msg_pool_tag { + bool8 is_send; + bool8 enable_inst_dimension; + inst_type inst_id; +} mes_msg_pool_tag_t; + +typedef struct st_mes_msg_pool { + mes_msg_pool_tag_t tag; + unsigned long long size; + uint32 buf_pool_count; + mes_msg_buffer_pool_t *buf_pool[MES_MAX_BUFFPOOL_NUM]; + memory_chunk_t mem_chunk; +} mes_msg_pool_t; + +typedef struct st_mes_msg_inst_pool_set { + uint64 total_size; + uint32 inst_pool_count; + uint64 per_inst_pool_size; + mes_msg_pool_t *inst_pool[MES_MAX_INSTANCES]; +} mes_msg_inst_pool_set_t; + +typedef struct st_mes_msg_buffer_relation { + uint8 buf_count; + uint32 origin_buf_size[MES_MAX_BUFFPOOL_NUM]; + uint32 changed_buf_size[MES_MAX_BUFFPOOL_NUM]; +} mes_msg_buffer_relation_t; + +int mes_check_msg_pool_attr(mes_profile_t *profile, mes_profile_t *out_profile, bool8 check_proportion, + mes_msg_buffer_relation_t *buf_rel); +int mes_init_message_pool(bool8 is_send); +void mes_deinit_all_message_pool(); char *mes_alloc_buf_item(uint32 len, bool32 is_send, uint32 dst_inst, mes_priority_t priority); char *mes_alloc_buf_item_fc(uint32 len, bool32 is_send, uint32 dst_inst, mes_priority_t priority); void mes_free_buf_item(char *buffer); uint32 mes_get_priority_max_msg_size(mes_priority_t priority); -uint64 mes_calc_message_pool_size(mes_profile_t *profile, uint32 priority); #ifdef __cplusplus } diff --git a/src/cm_mes/mes_queue.c b/src/cm_mes/mes_queue.c index 0368cf1..f5abac8 100644 --- a/src/cm_mes/mes_queue.c +++ b/src/cm_mes/mes_queue.c @@ -1077,34 +1077,40 @@ void mes_free_channel_msg_queue(bool32 is_send) CM_FREE_PROT_PTR(mq_ctx->channel_private_queue); } -int64 mes_get_mem_capacity_internal(mq_context_t *mq_ctx, mes_priority_t priority) +int64 mes_get_mem_capacity_internal(mq_context_t *mq_ctx) { if (mq_ctx == NULL) { return 0; } - mes_profile_t *profile = mq_ctx->profile; - mes_buffer_pool_attr_t buffer_pool_attr = profile->buffer_pool_attr[priority]; - + mes_msg_pool_t *msg_pool; + mes_msg_buffer_pool_t *buf_pool; int64 mem_capacity = 0; - for (uint32 i = 0; i < buffer_pool_attr.pool_count; i++) { - mes_buffer_attr_t buf_attr = buffer_pool_attr.buf_attr[i]; - mem_capacity += (buf_attr.count * buf_attr.size); + if (!mq_ctx->enable_inst_dimension) { + msg_pool = mq_ctx->single_pool; + for (uint8 buf_pool_no = 0; buf_pool_no < msg_pool->buf_pool_count; buf_pool_no++) { + buf_pool = msg_pool->buf_pool[buf_pool_no]; + mem_capacity += (buf_pool->buf_num * buf_pool->buf_size); + } + } else { + mes_msg_inst_pool_set_t *set = &mq_ctx->inst_pool_set; + for (int inst_id = 0; inst_id < set->inst_pool_count; inst_id++) { + msg_pool = set->inst_pool[inst_id]; + for (uint8 buf_pool_no = 0; buf_pool_no < msg_pool->buf_pool_count; buf_pool_no++) { + buf_pool = msg_pool->buf_pool[buf_pool_no]; + mem_capacity += (buf_pool->buf_num * buf_pool->buf_size); + } + } } return mem_capacity; } -long long mes_get_mem_capacity(bool8 is_send, mes_priority_t priority) +long long mes_get_mem_capacity(bool8 is_send) { - if (SECUREC_UNLIKELY(priority >= MES_PRIORITY_CEIL)) { - LOG_RUN_ERR("[mes] mes_get_mem_capacity invalid priority %u.", priority); - return -1; - } - if (is_send) { - return mes_get_mem_capacity_internal(&MES_GLOBAL_INST_MSG.send_mq, priority); + return mes_get_mem_capacity_internal(&MES_GLOBAL_INST_MSG.send_mq); } - return mes_get_mem_capacity_internal(&MES_GLOBAL_INST_MSG.recv_mq, priority); + return mes_get_mem_capacity_internal(&MES_GLOBAL_INST_MSG.recv_mq); } int mes_get_started_task_count(bool8 is_send) diff --git a/src/cm_mes/mes_queue.h b/src/cm_mes/mes_queue.h index 13b26bd..c96a23f 100644 --- a/src/cm_mes/mes_queue.h +++ b/src/cm_mes/mes_queue.h @@ -137,7 +137,10 @@ typedef struct st_mq_context_t { void *mes_ctx; mes_mq_priority_t priority; spinlock_t msg_pool_init_lock; - mes_pool_t *msg_pool[MES_MAX_INSTANCES][MES_PRIORITY_CEIL]; + bool8 enable_inst_dimension; + mes_msg_pool_t *single_pool; + mes_msg_inst_pool_set_t inst_pool_set; + bool8 msg_pool_inited; } mq_context_t; #define PROC_DIFF_ENDIAN(head) \ -- Gitee