From 870a088dbadc2e77e94bc143f77ab46ce14966a2 Mon Sep 17 00:00:00 2001 From: peibaoyi Date: Fri, 30 Aug 2024 10:59:16 +0800 Subject: [PATCH] =?UTF-8?q?1.=20CBB=E5=86=85=E9=83=A8=E6=97=B6=E9=97=B4?= =?UTF-8?q?=E7=BB=9F=E8=AE=A1=E6=8D=A2=E6=88=90monotonic=E6=97=B6=E9=97=B4?= =?UTF-8?q?=202.=20CBB=E5=86=85=E9=83=A8=E6=B6=88=E6=81=AF=E7=BB=9F?= =?UTF-8?q?=E8=AE=A1=E6=8D=A2=E6=88=90=E7=BB=9F=E8=AE=A1=E4=B8=8A=E5=B1=82?= =?UTF-8?q?app=E7=9A=84cmd=E7=B1=BB=E5=9E=8B=203.=20=E5=88=A0=E9=99=A4CBB?= =?UTF-8?q?=E4=B8=AD=E6=97=A0=E7=94=A8=E7=9A=84=E6=B6=88=E6=81=AF=E5=A4=84?= =?UTF-8?q?=E7=90=86=E6=97=B6=E9=97=B4=E9=98=B6=E6=AE=B5=E7=BB=9F=E8=AE=A1?= =?UTF-8?q?=E4=BB=A3=E7=A0=81=204.=20=E4=BF=AE=E5=A4=8Dmes=5Fmessage=5Fhea?= =?UTF-8?q?d=5Ft=E7=BB=93=E6=9E=84=E4=BD=93=E5=A2=9E=E5=8A=A0app=5Fcmd?= =?UTF-8?q?=E5=90=8E=E5=8F=98=E5=A4=A7=EF=BC=8C=E5=AF=BC=E8=87=B4=E4=B8=8D?= =?UTF-8?q?=E5=85=BC=E5=AE=B9=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/cm_defines/cm_defs.h | 1 - src/cm_mec_adapter/mec_adapter.c | 8 +- src/cm_mes/mes_cb.c | 11 +++ src/cm_mes/mes_func.c | 15 ++-- src/cm_mes/mes_func.h | 133 +++---------------------------- src/cm_mes/mes_interface.h | 26 +++--- src/cm_mes/mes_interface_impl.c | 26 +++++- src/cm_mes/mes_msg_pool.c | 11 +-- src/cm_mes/mes_queue.c | 24 +++--- src/cm_mes/mes_rdma_rpc.c | 4 + src/cm_mes/mes_stat.c | 126 +++++++++++++++-------------- src/cm_mes/mes_stat.h | 98 +++++++++++++++++++++-- src/cm_mes/mes_tcp.c | 36 ++++++--- src/cm_mes/mes_type.h | 8 +- src/cm_protocol/cs_packet.h | 6 +- 15 files changed, 280 insertions(+), 253 deletions(-) diff --git a/src/cm_defines/cm_defs.h b/src/cm_defines/cm_defs.h index 41e7579..e0fc2b7 100644 --- a/src/cm_defines/cm_defs.h +++ b/src/cm_defines/cm_defs.h @@ -114,7 +114,6 @@ extern "C" { #define CM_MES_ROOMS_PER_FREELIST (uint32)(CM_MAX_MES_ROOMS / CM_MAX_ROOM_FREELIST_NUM) #define CM_MAX_MES_MSG_CMD (uint8)255 -/* DAAC */ #define CM_MES_MIN_CHANNEL_NUM (uint32)(1) #define CM_MES_MAX_CHANNEL_NUM (uint32)(32) #define CM_MES_MIN_TASK_NUM (16) diff --git a/src/cm_mec_adapter/mec_adapter.c b/src/cm_mec_adapter/mec_adapter.c index 84e9323..c63acec 100644 --- a/src/cm_mec_adapter/mec_adapter.c +++ b/src/cm_mec_adapter/mec_adapter.c @@ -141,8 +141,7 @@ int mec_accept(cs_pipe_t *pipe) static int mec_get_message_buf(mes_message_t *msg, const mec_message_head_adapter_t *mec_head) { - uint64 stat_time = 0; - mes_get_consume_time_start(&stat_time); + uint64 stat_time = cm_get_time_usec(); char *msg_buf = NULL; mes_priority_t priority = MEC_PRIV_LOW_ADAPTER(mec_head->flags) ? MES_PRIORITY_ONE : MES_PRIORITY_ZERO; uint32 size = mec_head->size; @@ -220,12 +219,11 @@ static status_t mec_check_recv_head_info(const mec_message_head_adapter_t *mec_h int mec_process_event(mes_pipe_t *pipe) { LOG_DEBUG_INF("[mes_mec] mec_process_event start"); - uint64 stat_time = 0; mes_message_t msg; mec_message_head_adapter_t mec_head; mes_message_head_t mes_head; - mes_get_consume_time_start(&stat_time); + uint64 stat_time = cm_get_time_usec(); int ret = cs_read_fixed_size(&pipe->recv_pipe, (char *)&mec_head, MEC_MSG_HEAD_SIZE_ADAPTER); if (ret != CM_SUCCESS) { @@ -275,7 +273,7 @@ int mec_process_event(mes_pipe_t *pipe) return ERR_MES_SOCKET_FAIL; } - mes_consume_with_time(msg.head->cmd, MES_TIME_READ_MES, stat_time); + mes_consume_with_time(msg.head->app_cmd, MES_TIME_READ_SOCKET, stat_time); (void)cm_atomic_inc(&(pipe->recv_count)); diff --git a/src/cm_mes/mes_cb.c b/src/cm_mes/mes_cb.c index e3a15eb..ee3ba40 100644 --- a/src/cm_mes/mes_cb.c +++ b/src/cm_mes/mes_cb.c @@ -30,6 +30,7 @@ extern "C" { static mes_thread_init_t g_cb_thread_init = NULL; static mes_thread_deinit_t g_cb_thread_deinit = NULL; +static mes_app_cmd_cb_t g_cb_app_cmd = NULL; mes_thread_init_t mes_get_worker_init_cb(void) { @@ -51,6 +52,16 @@ void mes_set_worker_deinit_cb(mes_thread_deinit_t callback) g_cb_thread_deinit = callback; } +void mes_set_app_cmd_cb(mes_app_cmd_cb_t callback) +{ + g_cb_app_cmd = callback; +} + +mes_app_cmd_cb_t mes_get_app_cmd_cb() +{ + return g_cb_app_cmd; +} + #ifdef __cplusplus } #endif diff --git a/src/cm_mes/mes_func.c b/src/cm_mes/mes_func.c index 0fe1c47..e9acd2d 100644 --- a/src/cm_mes/mes_func.c +++ b/src/cm_mes/mes_func.c @@ -45,10 +45,10 @@ mes_callback_t g_cbb_mes_callback; static spinlock_t g_profile_lock; static mes_global_ptr_t g_mes_ptr = { - .g_cbb_mes_ptr = &g_cbb_mes, - .g_mes_stat_ptr = &g_mes_stat, - .g_mes_elapsed_stat = &g_mes_elapsed_stat, - .g_mes_msg_size_stat_ptr = &g_mes_msg_size_stat + .mes_ptr = &g_cbb_mes, + .cmd_count_stats_ptr = &g_mes_stat, + .cmd_time_stats_ptr = &g_mes_elapsed_stat, + .cmd_size_stats_ptr = &g_mes_msg_size_stat }; #define MES_CONNECT(pipe) g_cbb_mes_callback.connect_func(pipe) @@ -1060,8 +1060,7 @@ void mes_process_message(mes_msgqueue_t *my_queue, mes_message_t *msg) return; } - uint64 start_time = 0; - mes_get_consume_time_start(&start_time); + uint64 start_time = cm_get_time_usec(); mes_msgitem_t *msgitem = NULL; mes_recv_message_stat(msg); @@ -1081,7 +1080,7 @@ void mes_process_message(mes_msgqueue_t *my_queue, mes_message_t *msg) msgitem->msg.head = msg->head; msgitem->msg.buffer = msg->buffer; - msgitem->enqueue_time = g_timer()->monotonic_now; + msgitem->enqueue_time = cm_get_time_usec(); if (ENABLE_MES_TASK_THREADPOOL) { mes_put_msgitem_to_threadpool(msgitem); @@ -1090,7 +1089,7 @@ void mes_process_message(mes_msgqueue_t *my_queue, mes_message_t *msg) uint32 work_index = 0; mes_put_msgitem_enqueue(msgitem, CM_FALSE, &work_index); - mes_consume_with_time(msg->head->cmd, MES_TIME_PUT_QUEUE, start_time); + mes_consume_with_time(msg->head->app_cmd, MES_TIME_PUT_QUEUE, start_time); if (work_index == CM_INVALID_ID32 || work_index >= MES_MAX_TASK_NUM) { mes_release_message_buf(msg); LOG_RUN_ERR("[mes] mes_process_message, get work index failed."); diff --git a/src/cm_mes/mes_func.h b/src/cm_mes/mes_func.h index 34ed4be..daf5148 100644 --- a/src/cm_mes/mes_func.h +++ b/src/cm_mes/mes_func.h @@ -277,6 +277,13 @@ typedef struct st_mes_instance { mes_task_threadpool_t task_tpool; } mes_instance_t; +typedef struct st_mes_global_ptr { + mes_instance_t* mes_ptr; + mes_stat_t* cmd_count_stats_ptr; + mes_elapsed_stat_t* cmd_time_stats_ptr; + mes_msg_size_stats_t* cmd_size_stats_ptr; +} mes_global_ptr_t; + #define CHANNEL_ID_BITS (8) #define CHANNEL_ID_MASK (((unsigned)1 << CHANNEL_ID_BITS) - 1) // for ssl @@ -322,132 +329,8 @@ extern mes_callback_t g_cbb_mes_callback; bool32 mes_connection_ready(uint32 inst_id); int mes_send_bufflist(mes_bufflist_t *buff_list); - void mes_process_message(mes_msgqueue_t *my_queue, mes_message_t *msg); -typedef struct st_mes_command_stat { - union { - struct { - uint32 cmd; - int64 send_count; - int64 recv_count; - int64 local_count; - atomic32_t occupy_buf; - spinlock_t lock; - }; - char padding[CM_CACHE_LINE_SIZE]; - }; -} mes_command_stat_t; - -typedef struct st_mes_command_time_stat { - union { - struct { - uint64 time; - int64 count; - spinlock_t lock; - }; - char padding[CM_CACHE_LINE_SIZE]; - }; -}mes_command_time_stat_t; - -typedef struct st_mes_time_consume { - mes_command_time_stat_t cmd_time_stats[MES_TIME_CEIL]; - union { - uint32 cmd; - char aligned1[CM_CACHE_LINE_SIZE]; - }; -} mes_time_consume_t; - -typedef struct st_mes_elapsed_stat { - char aligned1[CM_CACHE_LINE_SIZE]; - union { - bool8 mes_elapsed_switch; - char aligned2[CM_CACHE_LINE_SIZE]; - }; - mes_time_consume_t time_consume_stat[CM_MAX_MES_MSG_CMD]; -} mes_elapsed_stat_t; - -typedef struct st_mes_stat { - char aligned1[CM_CACHE_LINE_SIZE]; - union { - bool8 mes_elapsed_switch; - char aligned2[CM_CACHE_LINE_SIZE]; - }; - mes_command_stat_t mes_command_stat[CM_MAX_MES_MSG_CMD]; -} mes_stat_t; - -#define CMD_SIZE_HISTOGRAM_COUNT 10 -#define CMD_SIZE_2_MIN_POWER 7 -#define CMD_SIZE_2_MAX_POWER 15 - -typedef struct st_size_histogram { - spinlock_t lock; - uint64 min_size; - uint64 max_size; - uint64 avg_size; - uint64 count; - char reserved[CM_CACHE_LINE_SIZE - sizeof(spinlock_t) - 4 * sizeof(uint64)]; -} size_histogram_t; - -typedef struct st_mes_msg_size_stats { - bool32 enable; - /* - * 0 -- 128B - * 1 -- 256B - * 2 -- 512B - * 3 -- 1KB - * 4 -- 2KB - * 5 -- 4KB - * 6 -- 8KB - * 7 -- 16KB - * 8 -- 32KB - * 9 -- > 32KB - */ - size_histogram_t histograms[CMD_SIZE_HISTOGRAM_COUNT]; -} mes_msg_size_stats_t; - -extern mes_elapsed_stat_t g_mes_elapsed_stat; -extern mes_stat_t g_mes_stat; -extern mes_msg_size_stats_t g_mes_msg_size_stat; - -typedef struct st_mes_global_ptr { - mes_instance_t* g_cbb_mes_ptr; - mes_stat_t* g_mes_stat_ptr; - mes_elapsed_stat_t* g_mes_elapsed_stat; - mes_msg_size_stats_t* g_mes_msg_size_stat_ptr; -} mes_global_ptr_t; - -status_t mes_verify_ssl_key_pwd(ssl_config_t *ssl_cfg, char *plain, uint32 size); - -static inline void mes_get_consume_time_start(uint64 *stat_time) -{ - if (g_mes_elapsed_stat.mes_elapsed_switch) { - *stat_time = cm_get_time_usec(); - } - return; -} - -static inline void mes_consume_with_time(uint32 cmd, mes_time_stat_t type, uint64 start_time) -{ - if (g_mes_elapsed_stat.mes_elapsed_switch) { - uint64 elapsed_time = cm_get_time_usec() - start_time; - cm_spin_lock(&(g_mes_elapsed_stat.time_consume_stat[cmd].cmd_time_stats[type].lock), NULL); - g_mes_elapsed_stat.time_consume_stat[cmd].cmd_time_stats[type].time += elapsed_time; - g_mes_elapsed_stat.time_consume_stat[cmd].cmd_time_stats[type].count++; - cm_spin_unlock(&(g_mes_elapsed_stat.time_consume_stat[cmd].cmd_time_stats[type].lock)); - cm_atomic_inc(&(g_mes_elapsed_stat.time_consume_stat[cmd].cmd_time_stats[type].count)); - } - return; -} - -static inline void mes_elapsed_stat(uint32 cmd, mes_time_stat_t type) -{ - if (g_mes_elapsed_stat.mes_elapsed_switch) { - cm_atomic_inc(&(g_mes_elapsed_stat.time_consume_stat[cmd].cmd_time_stats[type].count)); - } - return; -} - void mes_mutex_destroy(mes_mutex_t *mutex); int mes_mutex_create(mes_mutex_t *mutex); #ifndef WIN32 @@ -480,6 +363,8 @@ mes_channel_t *mes_get_active_send_channel(uint32 dest_id, uint32 caller_tid, ui void mes_destroy_all_broadcast_msg(); int mes_init_single_inst_broadcast_msg(unsigned int inst_id); int mes_ensure_inst_channel_exist(unsigned int inst_id); +status_t mes_verify_ssl_key_pwd(ssl_config_t *ssl_cfg, char *plain, uint32 size); + #ifdef __cplusplus } #endif diff --git a/src/cm_mes/mes_interface.h b/src/cm_mes/mes_interface.h index 270a2b5..1c176b8 100644 --- a/src/cm_mes/mes_interface.h +++ b/src/cm_mes/mes_interface.h @@ -94,19 +94,15 @@ typedef enum en_mes_priority_t { } mes_priority_t; typedef enum en_mes_time_stat { - MES_TIME_TEST_SEND = 0, - MES_TIME_SEND_IO, - MES_TIME_TEST_RECV, - MES_TIME_TEST_MULTICAST, - MES_TIME_TEST_MULTICAST_AND_WAIT, - MES_TIME_TEST_WAIT_AND_RECV, - MES_TIME_GET_BUF, - MES_TIME_READ_MES, - MES_TIME_PROC_FUN, - MES_TIME_PUT_QUEUE, - MES_TIME_GET_QUEUE, - MES_TIME_QUEUE_PROC, - MES_TIME_PUT_BUF, + MES_TIME_MSG_SEND = 0, // time of `send request message` + MES_TIME_MSG_RECV, // time of `recv response message`, it's not accurate + MES_TIME_WRITE_SOCKET, // time of `write message to socket`, similar to `MSG_SEND` + MES_TIME_GET_BUF, // time of `get appropriate buffer from free list` for receiving message + MES_TIME_READ_SOCKET, // time of `read message from socket`, it contains `GET_BUF` + MES_TIME_PUT_QUEUE, // time of `put message to queue` + MES_TIME_GET_QUEUE, // time of `get message from queue` + MES_TIME_QUEUE_PROC, // time of `process message` + MES_TIME_PUT_BUF, // time of `put buffer to free list` after message processed MES_TIME_CEIL } mes_time_stat_t; @@ -283,6 +279,7 @@ typedef struct st_mes_msg_pool_minimum_info { typedef void (*mes_thread_init_t)(unsigned char need_startup, char **reg_data); typedef void (*mes_thread_deinit_t)(); +typedef unsigned short (*mes_app_cmd_cb_t)(char *msg_buf); /* * @brief mes init @@ -611,6 +608,9 @@ void mes_set_worker_init_cb(mes_thread_init_t callback); */ void mes_set_worker_deinit_cb(mes_thread_deinit_t callback); +void mes_set_app_cmd_cb(mes_app_cmd_cb_t callback); +mes_app_cmd_cb_t mes_get_app_cmd_cb(); + /* * @brief estimated total memory consumed by mes_init * @param profile - config value diff --git a/src/cm_mes/mes_interface_impl.c b/src/cm_mes/mes_interface_impl.c index 6a51fdb..6626b0d 100644 --- a/src/cm_mes/mes_interface_impl.c +++ b/src/cm_mes/mes_interface_impl.c @@ -26,6 +26,15 @@ #define MES_ALLOC_ROOM_SLEEP_TIME 1000 +static uint16 mes_get_app_cmd(char *buff, uint8 cmd) +{ + mes_app_cmd_cb_t cb = mes_get_app_cmd_cb(); + if (cb == NULL) { + return (uint16)cmd; + } + return cb(buff); +} + static inline void mes_clean_broadcast_msg_ptr(mes_waiting_room_t *room) { for (uint32 inst_id = 0; inst_id < MES_MAX_INSTANCES; ++inst_id) { @@ -163,13 +172,22 @@ static int mes_send_data_x_inner(mes_message_head_t *head, unsigned int count, v return CM_ERROR; } + mes_app_cmd_cb_t cb = mes_get_app_cmd_cb(); + if (cb != NULL) { + if (buff_list.cnt > 1) { + head->app_cmd = mes_get_app_cmd(buff_list.buffers[1].buf, head->cmd); + } else { + head->app_cmd = CM_MAX_MES_MSG_CMD; + } + } + bool32 is_send = head->dst_inst == MES_MY_ID ? CM_FALSE : CM_TRUE; - mes_get_consume_time_start(&start_stat_time); + start_stat_time = cm_get_time_usec(); MES_RESET_COMPRESS_ALGORITHM_FLAG(head->flags); int ret = mes_put_buffer_list_queue(&buff_list, is_send); if (ret == CM_SUCCESS) { - mes_send_stat(head->cmd); - mes_consume_with_time(head->cmd, MES_TIME_TEST_SEND, start_stat_time); + mes_send_stat(head->app_cmd, head->size); + mes_consume_with_time(head->app_cmd, MES_TIME_MSG_SEND, start_stat_time); } return ret; } @@ -361,7 +379,7 @@ int mes_get_response(ruid_type ruid, mes_msg_t* response, int timeout_ms) } mes_free_room(room); - mes_consume_with_time((&msg)->head->cmd, MES_TIME_TEST_RECV, start_stat_time); + mes_consume_with_time((&msg)->head->app_cmd, MES_TIME_MSG_RECV, start_stat_time); return CM_SUCCESS; } diff --git a/src/cm_mes/mes_msg_pool.c b/src/cm_mes/mes_msg_pool.c index cb3e4b3..56bf98e 100644 --- a/src/cm_mes/mes_msg_pool.c +++ b/src/cm_mes/mes_msg_pool.c @@ -1113,15 +1113,6 @@ char *mes_alloc_buf_item_fc(uint32 len, bool32 is_send, uint32 dst_inst, mes_pri return mes_alloc_buf_item_inner(len, is_send, dst_inst, priority, CM_TRUE); } -static void mes_release_buf_stat(uint32 cmd) -{ - if (g_mes_stat.mes_elapsed_switch) { - cm_atomic32_dec(&(g_mes_stat.mes_command_stat[cmd].occupy_buf)); - mes_elapsed_stat(cmd, MES_TIME_PUT_BUF); - } - return; -} - void mes_free_buf_item(char *buffer) { if (buffer == NULL) { @@ -1154,7 +1145,7 @@ void mes_free_buf_item(char *buffer) queue = &priority_pool->queues[push % priority_pool->queue_num]; } - uint32 cmd = ((mes_message_head_t *)buffer)->cmd; + uint16 cmd = ((mes_message_head_t *)buffer)->app_cmd; uint32 buf_count = 0; mes_put_buf_item_to_queue(buf_item, queue, &buf_count); diff --git a/src/cm_mes/mes_queue.c b/src/cm_mes/mes_queue.c index b5e455f..56d40b0 100644 --- a/src/cm_mes/mes_queue.c +++ b/src/cm_mes/mes_queue.c @@ -720,10 +720,10 @@ int mes_put_msg_queue(mes_message_t *msg, bool32 is_send) return ERR_MES_ALLOC_MSGITEM_FAIL; } - mes_local_stat(msg->head->cmd); + mes_local_stat(msg->head->app_cmd); msgitem->msg.head = msg->head; msgitem->msg.buffer = msg->buffer; - msgitem->enqueue_time = g_timer()->monotonic_now; + msgitem->enqueue_time = cm_get_time_usec(); if (!is_send && ENABLE_MES_TASK_THREADPOOL) { mes_put_msgitem_to_threadpool(msgitem); @@ -820,19 +820,18 @@ void mes_send_proc(mes_msgitem_t *msgitem, uint32 work_idx) void mes_work_proc(mes_msgitem_t *msgitem, uint32 work_idx) { mes_msg_t app_msg; - uint64 start_stat_time = 0; - mes_get_consume_time_start(&start_stat_time); + uint64 start_stat_time; if (msgitem->msg.head->cmd == MES_CMD_SYNCH_ACK) { mes_notify_msg_recv(&msgitem->msg); } else { - mes_consume_with_time(msgitem->msg.head->cmd, MES_TIME_GET_QUEUE, start_stat_time); - mes_get_consume_time_start(&start_stat_time); + mes_consume_with_time(msgitem->msg.head->app_cmd, MES_TIME_GET_QUEUE, msgitem->enqueue_time); + start_stat_time = cm_get_time_usec(); app_msg.buffer = msgitem->msg.buffer + sizeof(mes_message_head_t); app_msg.size = msgitem->msg.head->size - (unsigned int)sizeof(mes_message_head_t); app_msg.src_inst = (unsigned int)msgitem->msg.head->src_inst; MES_GLOBAL_INST_MSG.proc(work_idx, msgitem->msg.head->ruid, &app_msg); - mes_consume_with_time(msgitem->msg.head->cmd, MES_TIME_QUEUE_PROC, start_stat_time); + mes_consume_with_time(msgitem->msg.head->app_cmd, MES_TIME_QUEUE_PROC, start_stat_time); mes_release_message_buf(&msgitem->msg); } } @@ -931,8 +930,15 @@ void mes_task_proc_inner(thread_t *thread) (uint64)MES_RUID_GET_RSN((head)->ruid), (head)->src_inst, (head)->dst_inst, (head)->size, (head)->flags, my_task_index, mq_ctx->tasks[queue_id + start_task_idx].queue.count, is_empty); if (MES_GLOBAL_INST_MSG.profile.max_wait_time != CM_INVALID_INT32) { - if ((g_timer()->monotonic_now - msgitem->enqueue_time) / MICROSECS_PER_MILLISEC >= - MES_GLOBAL_INST_MSG.profile.max_wait_time) { + /* + * when enable statistics, enquue_time is real-time, + * but monotonic_now in timer is updated every 0.1ms (MES_DEFAULT_SLEEP_TIME), + * so monotonic_now may be less than enqueue_time. + */ + uint64 now = g_timer()->monotonic_now; + if ((now > msgitem->enqueue_time) && + ((now - msgitem->enqueue_time) / MICROSECS_PER_MILLISEC >= + MES_GLOBAL_INST_MSG.profile.max_wait_time)) { LOG_DEBUG_WAR("[mes]proc wait timeout, message is discarded "); mes_release_message_buf(&msgitem->msg); continue; diff --git a/src/cm_mes/mes_rdma_rpc.c b/src/cm_mes/mes_rdma_rpc.c index 1abdcd6..21fed91 100644 --- a/src/cm_mes/mes_rdma_rpc.c +++ b/src/cm_mes/mes_rdma_rpc.c @@ -861,6 +861,8 @@ int mes_rdma_rpc_send_data(const void* msg_data) return ERR_MES_SENDPIPE_NO_READY; } + head->app_cmd = 0; + head->unused = 0; OckRpcClient client = pipe->rdma_client.client_handle; OckRpcMessage request = {.data = (void*)msg_data, .len = head->size}; @@ -921,6 +923,8 @@ int mes_rdma_rpc_send_bufflist(mes_bufflist_t *buff_list) return ERR_MES_SENDPIPE_NO_READY; } + head->app_cmd = 0; + head->unused = 0; OckRpcClientCallParams param; OckRpcMessage msgs[MES_MAX_BUFFERLIST]; init_ockrpc_client_iov_param(¶m, buff_list, msgs, pipe->rdma_client.client_handle); diff --git a/src/cm_mes/mes_stat.c b/src/cm_mes/mes_stat.c index ec3ca34..4d3ada3 100644 --- a/src/cm_mes/mes_stat.c +++ b/src/cm_mes/mes_stat.c @@ -21,6 +21,8 @@ * * ------------------------------------------------------------------------- */ +#include "cm_atomic.h" +#include "cm_spinlock.h" #include "mes_func.h" #include "mes_stat.h" @@ -111,17 +113,28 @@ void mes_init_stat(const mes_profile_t *profile) return; } -void mes_send_stat(uint32 cmd) +void mes_send_stat(uint16 cmd, uint32 size) { - if (g_mes_stat.mes_elapsed_switch) { - (void)cm_atomic_inc(&(g_mes_stat.mes_command_stat[cmd].send_count)); + if (g_mes_stat.mes_elapsed_switch && cmd < CM_MAX_MES_MSG_CMD) { + mes_command_stat_t *stats = &g_mes_stat.mes_command_stat[cmd]; + cm_spin_lock(&stats->lock, NULL); + uint64 avg = stats->avg_size; + if (avg == 0 || avg == size) { + stats->avg_size = size; + } else { + double f1 = 1.0 / (stats->send_count + 1); + double f2 = (stats->send_count) * f1; + stats->avg_size = (uint64)(avg * f2 + size * f1); + } + (void)cm_atomic_inc(&(stats->send_count)); + cm_spin_unlock(&stats->lock); } return; } -void mes_local_stat(uint32 cmd) +void mes_local_stat(uint16 cmd) { - if (g_mes_stat.mes_elapsed_switch) { + if (g_mes_stat.mes_elapsed_switch && cmd < CM_MAX_MES_MSG_CMD) { (void)cm_atomic_inc(&(g_mes_stat.mes_command_stat[cmd].local_count)); (void)cm_atomic32_inc(&(g_mes_stat.mes_command_stat[cmd].occupy_buf)); } @@ -131,47 +144,31 @@ void mes_local_stat(uint32 cmd) void mes_recv_message_stat(const mes_message_t *msg) { if (g_mes_stat.mes_elapsed_switch) { - (void)cm_atomic_inc(&(g_mes_stat.mes_command_stat[msg->head->cmd].recv_count)); - (void)cm_atomic32_inc(&(g_mes_stat.mes_command_stat[msg->head->cmd].occupy_buf)); + (void)cm_atomic_inc(&(g_mes_stat.mes_command_stat[msg->head->app_cmd].recv_count)); + (void)cm_atomic32_inc(&(g_mes_stat.mes_command_stat[msg->head->app_cmd].occupy_buf)); } return; } -static void cm_get_time_of_day(cm_timeval *tv) -{ - (void)cm_gettimeofday(tv); -} - uint64 cm_get_time_usec(void) { if (g_mes_elapsed_stat.mes_elapsed_switch) { - cm_timeval now; - uint64 now_usec; - cm_get_time_of_day(&now); - now_usec = (uint64)now.tv_sec * MICROSECS_PER_SECOND + (uint64)now.tv_usec; - return now_usec; + return cm_clock_monotonic_now(); } - return 0; + return g_timer()->monotonic_now; } -uint64 mes_get_stat_send_count(unsigned int cmd) +void mes_consume_with_time(uint16 cmd, mes_time_stat_t type, uint64 start_time) { - return (uint64)g_mes_stat.mes_command_stat[cmd].send_count; -} - -uint64 mes_get_stat_recv_count(unsigned int cmd) -{ - return (uint64)g_mes_stat.mes_command_stat[cmd].recv_count; -} - -volatile long mes_get_stat_occupy_buf(unsigned int cmd) -{ - return g_mes_stat.mes_command_stat[cmd].occupy_buf; -} - -unsigned char mes_get_elapsed_switch(void) -{ - return (bool8)g_mes_elapsed_stat.mes_elapsed_switch; + if (g_mes_elapsed_stat.mes_elapsed_switch && cmd < CM_MAX_MES_MSG_CMD) { + uint64 elapsed_time = cm_get_time_usec() - start_time; + mes_command_time_stat_t *stats = &g_mes_elapsed_stat.time_consume_stat[cmd].cmd_time_stats[type]; + cm_spin_lock(&stats->lock, NULL); + stats->time += elapsed_time; + stats->count++; + cm_spin_unlock(&stats->lock); + } + return; } void mes_set_elapsed_switch(unsigned char elapsed_switch) @@ -180,32 +177,6 @@ void mes_set_elapsed_switch(unsigned char elapsed_switch) g_mes_stat.mes_elapsed_switch = elapsed_switch; } -uint64 mes_get_elapsed_time(unsigned int cmd, mes_time_stat_t type) -{ - return g_mes_elapsed_stat.time_consume_stat[cmd].cmd_time_stats[type].time; -} - -uint64 mes_get_elapsed_count(unsigned int cmd, mes_time_stat_t type) -{ - return (uint64)g_mes_elapsed_stat.time_consume_stat[cmd].cmd_time_stats[type].count; -} - -void mes_get_wait_event(unsigned int cmd, unsigned long long *event_cnt, unsigned long long *event_time) -{ - unsigned long long cnt = 0; - unsigned long long time = 0; - for (int type = 0; type < MES_TIME_CEIL; ++type) { - cnt += g_mes_elapsed_stat.time_consume_stat[cmd].cmd_time_stats[type].count; - time += g_mes_elapsed_stat.time_consume_stat[cmd].cmd_time_stats[type].time; - } - if (event_cnt != NULL) { - *event_cnt = cnt; - } - if (event_time != NULL) { - *event_time = time; - } -} - #ifdef WIN32 static uint32 cmd_size_to_histogram_index(uint32 size) { @@ -256,4 +227,37 @@ void mes_msg_size_stats(uint32 size) hist->count++; cm_spin_unlock(&hist->lock); } +} + +void mes_elapsed_stat(uint16 cmd, mes_time_stat_t type) +{ + if (g_mes_elapsed_stat.mes_elapsed_switch && cmd < CM_MAX_MES_MSG_CMD) { + cm_atomic_inc(&(g_mes_elapsed_stat.time_consume_stat[cmd].cmd_time_stats[type].count)); + } + return; +} + +void mes_release_buf_stat(uint16 cmd) +{ + if (g_mes_stat.mes_elapsed_switch && cmd < CM_MAX_MES_MSG_CMD) { + cm_atomic32_dec(&(g_mes_stat.mes_command_stat[cmd].occupy_buf)); + mes_elapsed_stat(cmd, MES_TIME_PUT_BUF); + } + return; +} + +void mes_get_wait_event(unsigned int cmd, unsigned long long *event_cnt, unsigned long long *event_time) +{ + unsigned long long cnt = 0; + unsigned long long time = 0; + for (int type = 0; type < MES_TIME_CEIL; ++type) { + cnt += g_mes_elapsed_stat.time_consume_stat[cmd].cmd_time_stats[type].count; + time += g_mes_elapsed_stat.time_consume_stat[cmd].cmd_time_stats[type].time; + } + if (event_cnt != NULL) { + *event_cnt = cnt; + } + if (event_time != NULL) { + *event_time = time; + } } \ No newline at end of file diff --git a/src/cm_mes/mes_stat.h b/src/cm_mes/mes_stat.h index 739552e..9469332 100644 --- a/src/cm_mes/mes_stat.h +++ b/src/cm_mes/mes_stat.h @@ -32,14 +32,102 @@ extern "C" { #endif +typedef struct st_mes_command_stat { + union { + struct { + uint32 cmd; + int64 send_count; + int64 recv_count; + int64 local_count; + atomic32_t occupy_buf; + uint64 avg_size; + spinlock_t lock; + }; + char padding[CM_CACHE_LINE_SIZE]; + }; +} mes_command_stat_t; + +typedef struct st_mes_command_time_stat { + union { + struct { + uint64 time; + int64 count; + spinlock_t lock; + }; + char padding[CM_CACHE_LINE_SIZE]; + }; +}mes_command_time_stat_t; + +typedef struct st_mes_time_consume { + mes_command_time_stat_t cmd_time_stats[MES_TIME_CEIL]; + union { + uint32 cmd; + char aligned1[CM_CACHE_LINE_SIZE]; + }; +} mes_time_consume_t; + +typedef struct st_mes_elapsed_stat { + char aligned1[CM_CACHE_LINE_SIZE]; + union { + bool8 mes_elapsed_switch; + char aligned2[CM_CACHE_LINE_SIZE]; + }; + mes_time_consume_t time_consume_stat[CM_MAX_MES_MSG_CMD]; +} mes_elapsed_stat_t; + +typedef struct st_mes_stat { + char aligned1[CM_CACHE_LINE_SIZE]; + union { + bool8 mes_elapsed_switch; + char aligned2[CM_CACHE_LINE_SIZE]; + }; + mes_command_stat_t mes_command_stat[CM_MAX_MES_MSG_CMD]; +} mes_stat_t; + +#define CMD_SIZE_HISTOGRAM_COUNT 10 +#define CMD_SIZE_2_MIN_POWER 7 +#define CMD_SIZE_2_MAX_POWER 15 + +typedef struct st_size_histogram { + spinlock_t lock; + uint64 min_size; + uint64 max_size; + uint64 avg_size; + uint64 count; + char reserved[CM_CACHE_LINE_SIZE - sizeof(spinlock_t) - 4 * sizeof(uint64)]; +} size_histogram_t; + +typedef struct st_mes_msg_size_stats { + bool32 enable; + /* + * 0 -- 128B + * 1 -- 256B + * 2 -- 512B + * 3 -- 1KB + * 4 -- 2KB + * 5 -- 4KB + * 6 -- 8KB + * 7 -- 16KB + * 8 -- 32KB + * 9 -- > 32KB + */ + size_histogram_t histograms[CMD_SIZE_HISTOGRAM_COUNT]; +} mes_msg_size_stats_t; + +extern mes_elapsed_stat_t g_mes_elapsed_stat; +extern mes_stat_t g_mes_stat; +extern mes_msg_size_stats_t g_mes_msg_size_stat; + void mes_init_stat(const mes_profile_t *profile); -void mes_send_stat(uint32 cmd); +void mes_send_stat(uint16 cmd, uint32 size); void mes_recv_message_stat(const mes_message_t *msg); - -uint64 cm_get_time_usec(void); -void mes_local_stat(uint32 cmd); -void mes_get_wait_event(unsigned int cmd, unsigned long long *event_cnt, unsigned long long *event_time); +void mes_local_stat(uint16 cmd); +void mes_elapsed_stat(uint16 cmd, mes_time_stat_t type); +void mes_release_buf_stat(uint16 cmd); +uint64 cm_get_time_usec(); +void mes_consume_with_time(uint16 cmd, mes_time_stat_t type, uint64 start_time); void mes_msg_size_stats(uint32 size); +void mes_get_wait_event(unsigned int cmd, unsigned long long *event_cnt, unsigned long long *event_time); #ifdef __cplusplus } diff --git a/src/cm_mes/mes_tcp.c b/src/cm_mes/mes_tcp.c index 154a8ae..6b0cde9 100644 --- a/src/cm_mes/mes_tcp.c +++ b/src/cm_mes/mes_tcp.c @@ -96,8 +96,7 @@ static int mes_read_message_head(cs_pipe_t *pipe, mes_message_head_t *head) static int mes_get_message_buf(mes_message_t *msg, const mes_message_head_t *head) { - uint64 stat_time = 0; - mes_get_consume_time_start(&stat_time); + uint64 stat_time = cm_get_time_usec(); char *msg_buf; uint32 size = head->size; if (MES_COMPRESS_ALGORITHM(head->flags)) { @@ -112,7 +111,7 @@ static int mes_get_message_buf(mes_message_t *msg, const mes_message_head_t *hea return ERR_MES_ALLOC_MSGITEM_FAIL; } MES_MESSAGE_ATTACH(msg, msg_buf); - mes_consume_with_time(head->cmd, MES_TIME_GET_BUF, stat_time); + mes_consume_with_time(head->app_cmd, MES_TIME_GET_BUF, stat_time); return CM_SUCCESS; } @@ -156,7 +155,7 @@ static status_t check_recv_head_info(const mes_message_head_t *head, mes_priorit // receive static int mes_process_event(mes_pipe_t *pipe) { - uint64 stat_time = 0; + uint64 stat_time; mes_message_t msg; mes_message_head_t head; @@ -176,7 +175,7 @@ static int mes_process_event(mes_pipe_t *pipe) return mec_process_event(pipe); } - mes_get_consume_time_start(&stat_time); + stat_time = cm_get_time_usec(); int ret = mes_read_message_head(&pipe->recv_pipe, &head); if (ret != CM_SUCCESS) { @@ -218,7 +217,7 @@ static int mes_process_event(mes_pipe_t *pipe) return ERR_MES_SOCKET_FAIL; } - mes_consume_with_time(msg.head->cmd, MES_TIME_READ_MES, stat_time); + mes_consume_with_time(msg.head->app_cmd, MES_TIME_READ_SOCKET, stat_time); (void)cm_atomic_inc(&(pipe->recv_count)); @@ -305,6 +304,8 @@ void mes_tcp_try_connect(uintptr_t pipePtr) char buf[sizeof(mes_message_head_t)]; mes_message_head_t *head = (mes_message_head_t *)buf; head->cmd = MES_CMD_CONNECT; + head->app_cmd = 0; + head->unused = 0; head->dst_inst = MES_INSTANCE_ID(pipe->channel->id); head->src_inst = MES_GLOBAL_INST_MSG.profile.inst_id; head->caller_tid = MES_CHANNEL_ID(pipe->channel->id); // use caller_tid to represent channel id @@ -872,6 +873,11 @@ int mes_init_tcp_resource(void) return CM_SUCCESS; } +static bool32 is_old_mes_cmd(int32 version) +{ + return version < CS_VERSION_6; +} + // send int mes_tcp_send_data(const void *msg_data) { @@ -904,7 +910,7 @@ int mes_tcp_send_data(const void *msg_data) return ERR_MES_SENDPIPE_NO_READY; } - mes_get_consume_time_start(&stat_time); + stat_time = cm_get_time_usec(); if (head->cmd == MES_CMD_SYNCH_ACK) { CM_ASSERT(MES_RUID_GET_RSN((head)->ruid) != 0); } @@ -922,6 +928,10 @@ int mes_tcp_send_data(const void *msg_data) } if (!is_old_mec_version(version)) { + if (is_old_mes_cmd(version)) { + head->app_cmd = 0; + head->unused = 0; + } ret = cs_send_fixed_size(&pipe->send_pipe, (char *)msg_data, (int32)head->size); } else { mec_message_head_adapter_t *mec_head = @@ -941,7 +951,7 @@ int mes_tcp_send_data(const void *msg_data) } pipe->last_send_time = g_timer()->monotonic_now; - mes_consume_with_time(head->cmd, MES_TIME_SEND_IO, stat_time); + mes_consume_with_time(head->app_cmd, MES_TIME_WRITE_SOCKET, stat_time); cm_rwlock_unlock(&pipe->send_lock); (void)cm_atomic_inc(&(pipe->send_count)); @@ -983,7 +993,8 @@ int mes_tcp_send_bufflist(mes_bufflist_t *buff_list) head->dst_inst, priority); return ERR_MES_SENDPIPE_NO_READY; } - mes_get_consume_time_start(&stat_time); + + stat_time = cm_get_time_usec(); if (head->cmd == MES_CMD_SYNCH_ACK) { CM_ASSERT(MES_RUID_GET_RSN((head)->ruid) != 0); @@ -1004,6 +1015,11 @@ int mes_tcp_send_bufflist(mes_bufflist_t *buff_list) buff_list->buffers[0].buf = buff_list->buffers[0].buf + sizeof(mes_message_head_t); buff_list->buffers[0].len = buff_list->buffers[0].len - (unsigned int)sizeof(mes_message_head_t); } + + if (is_old_mes_cmd(version)) { + head->app_cmd = 0; + head->unused = 0; + } if (pipe->msgbuf == NULL) { pipe->msgbuf = (char *)cm_malloc_prot(totalsz); @@ -1049,7 +1065,7 @@ int mes_tcp_send_bufflist(mes_bufflist_t *buff_list) } pipe->last_send_time = g_timer()->monotonic_now; - mes_consume_with_time(head->cmd, MES_TIME_SEND_IO, stat_time); + mes_consume_with_time(head->app_cmd, MES_TIME_WRITE_SOCKET, stat_time); cm_rwlock_unlock(&pipe->send_lock); (void)cm_atomic_inc(&(pipe->send_count)); diff --git a/src/cm_mes/mes_type.h b/src/cm_mes/mes_type.h index 9cb8cfd..01c62b7 100644 --- a/src/cm_mes/mes_type.h +++ b/src/cm_mes/mes_type.h @@ -45,7 +45,9 @@ typedef unsigned long long uint64; typedef struct st_mes_message_head { unsigned int version; - unsigned int cmd; + unsigned int cmd : 8; // mes command + unsigned int app_cmd : 16; // upper application command + unsigned int unused : 8; unsigned int flags; unsigned int caller_tid; unsigned long long ruid; @@ -104,7 +106,9 @@ typedef struct st_mes_bufflist { #define MES_INIT_MESSAGE_HEAD(head, v_version, v_cmd, v_flags, v_src_inst, v_dst_inst, v_ruid, v_size) \ do { \ - (head)->cmd = (uint32)(v_cmd); \ + (head)->cmd = (uint32)(v_cmd); \ + (head)->app_cmd = 0; \ + (head)->unused = 0; \ (head)->version = (uint32)(v_version); \ (head)->flags = (uint32)(v_flags); \ (head)->src_inst = (uint32)(v_src_inst); \ diff --git a/src/cm_protocol/cs_packet.h b/src/cm_protocol/cs_packet.h index 648632e..33c4824 100644 --- a/src/cm_protocol/cs_packet.h +++ b/src/cm_protocol/cs_packet.h @@ -43,6 +43,7 @@ typedef enum en_cs_minor_version { MIN_VERSION_3 = 3, MIN_VERSION_4 = 4, MIN_VERSION_5 = 5, /* send version and proto_code when tcp connect */ + MIN_VERSION_6 = 6, /* add app_cmd in mes message head */ } cs_minor_version_t; typedef enum en_cs_major_version { @@ -63,7 +64,10 @@ typedef enum en_cs_major_version { /* send version and proto_code when tcp connect */ #define CS_VERSION_5 (uint32) CS_PROTOCOL(MJR_VERSION_0, MIN_VERSION_5) -#define CS_LOCAL_VERSION CS_VERSION_5 + +/* add app_cmd in mes message head */ +#define CS_VERSION_6 (uint32) CS_PROTOCOL(MJR_VERSION_0, MIN_VERSION_6) +#define CS_LOCAL_VERSION CS_VERSION_6 #define CS_CMD_UNKONOW (uint8)0 #define CS_CMD_HANDSHAKE (uint8)1 /* process before login, added since v2.0; for SSL only since v9.0 */ -- Gitee