diff --git a/src/dcf_interface.c b/src/dcf_interface.c index 5d7c4b1b5b18ba47ab5e8512409a1a2bdd4263da..38e6e59d4baabac4c72b9711a048a461f3dcd0ff 100644 --- a/src/dcf_interface.c +++ b/src/dcf_interface.c @@ -280,11 +280,9 @@ static void exception_thread_entry(thread_t *thread) { dcf_exception_report_t *cur_exception = (dcf_exception_report_t *)thread->argument; (void)cm_set_thread_name("exception reporting thread"); - bool32 is_triggered = CM_FALSE; while (!thread->closed) { (void)cm_event_timedwait(&(cur_exception->exception_event), CM_SLEEP_1000_FIXED); - if (cur_exception->exception != DCF_RUNNING_NORMAL && is_triggered == CM_FALSE) { - is_triggered = CM_TRUE; + if (cur_exception->exception != DCF_RUNNING_NORMAL) { if (g_cb_exception_notify != NULL) { int ret = g_cb_exception_notify(cur_exception->stream_id, (uint32) (cur_exception->exception)); LOG_DEBUG_INF("exception report callback: g_cb_exception_notify, retcode=%d", ret); @@ -1810,9 +1808,9 @@ int dcf_check_if_all_logs_applied(unsigned int stream_id, unsigned int *all_appl /* Internal Invocation */ -void dcf_set_exception(int stream_id, dcf_exception_t exception) +void dcf_set_exception(unsigned int stream_id, dcf_exception_t exception) { - g_dcf_exception.stream_id = (uint32)stream_id; + g_dcf_exception.stream_id = stream_id; g_dcf_exception.exception = exception; } @@ -1889,7 +1887,6 @@ int dcf_broadcast_msg(unsigned int stream_id, const char* msg, unsigned int msg_ return CM_SUCCESS; } -#define DCF_PAUSE_TIME 1000000 /* Suspend replication log for this node */ @@ -1901,7 +1898,7 @@ int dcf_pause_rep(unsigned int stream_id, unsigned int node_id, unsigned int tim return CM_ERROR; } LOG_OPER("dcf set pausing time for replication, stream_id=%u node_id=%d time_us=%u", stream_id, node_id, time_us); - if (time_us > DCF_PAUSE_TIME) { + if (time_us != DCF_MAX_PAUSE_TIME && time_us > DCF_MAX_NORMAL_USE_PAUSE_TIME) { LOG_DEBUG_ERR("time_us %u is greater than 1000000.", time_us); return CM_ERROR; } @@ -2310,6 +2307,38 @@ void dcf_set_timer(void *timer) cm_set_timer((gs_timer_t *)timer); } +int dcf_is_diff_endian(unsigned int stream_id, unsigned int dest_node_id) +{ + cm_reset_error(); + return (int)mec_is_diff_endian(stream_id, dest_node_id); +} + +int dcf_get_conn_status(unsigned int stream_id, unsigned int dest_node_id) +{ + uint32 count; + uint32 node_list[CM_MAX_NODE_COUNT]; + uint32 node_id; + uint32 dest_node; + cm_reset_error(); + + if (md_get_cur_node() == dest_node_id) { + return CM_SUCCESS; + } + + CM_RETURN_IFERR(md_get_stream_nodes(stream_id, node_list, &count)); + dest_node = dest_node_id; + + for (uint32 i = 0; i < count; i++) { + node_id = node_list[i]; + if (node_id == dest_node) { + status_t ret = mec_check_one_connect(node_id) ? CM_SUCCESS : CM_ERROR; + LOG_DEBUG_INF("check connect to dest node: %u, ret:%d", dest_node, ret); + return ret; + } + } + return CM_ERROR; +} + #ifdef __cplusplus } #endif diff --git a/src/interface/dcf_interface.h b/src/interface/dcf_interface.h index d89147b7732aa84bd1b9584db7eb8571e690e988..8302d4f10be721dc80fe5997538007f13f987a24 100644 --- a/src/interface/dcf_interface.h +++ b/src/interface/dcf_interface.h @@ -61,6 +61,10 @@ typedef enum en_dcf_commit_index_type { DCF_INDEX_LEVEL_CEIL, } dcf_commit_index_type_t; +#define DCF_MAX_PAUSE_TIME ((unsigned int)(-1)) +#define DCF_MIN_PAUSE_TIME 0 +#define DCF_MAX_NORMAL_USE_PAUSE_TIME 1000000U // 1 second + /* param_name: The parameter types are as follows "ELECTION_TIMEOUT" --unit:ms @@ -259,6 +263,14 @@ EXPORT_API int dcf_query_stream_info(unsigned int stream_id, char *buffer, unsig EXPORT_API int dcf_query_leader_info(unsigned int stream_id, char *ip, unsigned int ip_len, unsigned int *port, unsigned int *node_id); + +/* + check if different endian with dest_node + * @return = 0 means false, same endian with dest_node + * @return != 0 means true, different endian with dest_node + */ +EXPORT_API int dcf_is_diff_endian(unsigned int stream_id, unsigned int dest_node_id); + /* Get the specific error code of API call */ @@ -351,9 +363,9 @@ EXPORT_API int dcf_check_if_all_logs_applied(unsigned int stream_id, unsigned in EXPORT_API int dcf_set_trace_key(unsigned long long trace_key); /* - Internal Invocation + Set dcf exception type, usually set DCF_RUNNING_NORMAL */ -void dcf_set_exception(int stream_id, dcf_exception_t exception); +EXPORT_API void dcf_set_exception(unsigned int stream_id, dcf_exception_t exception); /** * send msg @@ -432,6 +444,13 @@ EXPORT_API int dcf_set_election_priority(unsigned int stream_id, unsigned long l */ EXPORT_API void dcf_set_timer(void *timer); +/* + get current dcf node connection status between dest dcf node + * @return != 0 means error, disconnected + * @return = 0 means success, connection ok +*/ +EXPORT_API int dcf_get_conn_status(unsigned int stream_id, unsigned int dest_node_id); + #ifdef __cplusplus } #endif diff --git a/src/metadata/md_defs.h b/src/metadata/md_defs.h index 4706ce3662f35c4df308e8ccb16d5b9c86a9b0be..190ae7643f4d571d2c1c15037362e8aeb7991aeb 100644 --- a/src/metadata/md_defs.h +++ b/src/metadata/md_defs.h @@ -50,6 +50,8 @@ extern "C" { #define CM_INVALID_STREAM_ID 0 #define CM_INVALID_TERM_ID 0 #define CM_INVALID_INDEX_ID 0 +#define CM_INVALID_MIN_KEY 0 +#define CM_INVALID_MAX_KEY ((uint64)(-1)) #define CM_MAX_ELC_INIT_WAIT_TIMES (20) #define CM_MD_MISMATCH_REP_INTERVAL 5000 // ms diff --git a/src/network/mec.h b/src/network/mec.h index 80643e3756f07c2d2b758c8dd5c5936f40c7079b..6597f6ce0a401d8220c8b165f74ec1d8d441ae5c 100644 --- a/src/network/mec.h +++ b/src/network/mec.h @@ -153,9 +153,12 @@ uint32 mec_get_send_que_count(msg_priv_t priv); uint32 mec_get_recv_que_count(msg_priv_t priv); int64 mec_get_send_mem_capacity(msg_priv_t priv); int64 mec_get_recv_mem_capacity(msg_priv_t priv); +bool32 mec_check_one_connect(uint32 inst_id); bool32 mec_check_all_connect(); bool32 mec_is_ready(uint32 stream_id, uint32 dst_inst, msg_priv_t priv); status_t mec_get_peer_version(uint32 stream_id, uint32 dst_inst, uint32 *peer_version); +bool32 mec_is_diff_endian(uint32 stream_id, uint32 dst_inst); + static inline uint32 mec_get_recv_pack_version(const mec_message_t *pack) { return pack->head->version; diff --git a/src/network/mec/mec_api.c b/src/network/mec/mec_api.c index d3aa177c6d59ed7e8f2b321d517a03a15191621a..82ac381628c97851d0554cdde2f2b917e24bd4c3 100644 --- a/src/network/mec/mec_api.c +++ b/src/network/mec/mec_api.c @@ -641,13 +641,33 @@ status_t mec_get_peer_version(uint32 stream_id, uint32 dst_inst, uint32 *peer_ve uint32 channel_id = MEC_STREAM_TO_CHANNEL_ID(stream_id, profile->channel_num); mec_channel_t *channel = &mec_ctx->channels[dst_inst][channel_id]; if (channel == NULL) { - LOG_DEBUG_ERR("[MEC]null channel or peer_version, stream_id %u, dst_inst %u", stream_id, dst_inst); + LOG_DEBUG_ERR("[MEC] null channel or peer_version, stream_id %u, dst_inst %u", stream_id, dst_inst); return CM_ERROR; } *peer_version = channel->pipe[PRIV_HIGH].send_pipe.version; return CM_SUCCESS; } +bool32 mec_is_diff_endian(uint32 stream_id, uint32 dst_inst) +{ + mec_context_t *mec_ctx = get_mec_ctx(); + mec_profile_t *profile = get_mec_profile(); + uint32 channel_id = MEC_STREAM_TO_CHANNEL_ID(stream_id, profile->channel_num); + mec_channel_t *channel = &mec_ctx->channels[dst_inst][channel_id]; + if (channel == NULL) { + LOG_DEBUG_ERR("[MEC] null channel, stream_id %u, dst_inst %u", stream_id, dst_inst); + return CM_FALSE; + } + uint32 low_options = channel->pipe[PRIV_LOW].send_pipe.options; + uint32 hig_options = channel->pipe[PRIV_HIGH].send_pipe.options; + if (CS_DIFFERENT_ENDIAN(low_options) || CS_DIFFERENT_ENDIAN(hig_options)) { + LOG_RUN_INF("[MEC] diff endian, stream_id %u, dst_inst %u, low_options %u, hig_options %u", + stream_id, dst_inst, low_options, hig_options); + return CM_TRUE; + } + return CM_FALSE; +} + #ifdef __cplusplus } #endif diff --git a/src/network/mec/mec_func.c b/src/network/mec/mec_func.c index cbd476c2ef3b4df98df3a0744b4d4c588e7d071b..85b1c5b2dc2a77a978aa83205829244019fc5e9a 100644 --- a/src/network/mec/mec_func.c +++ b/src/network/mec/mec_func.c @@ -335,7 +335,7 @@ status_t mec_discard_recv_msg(mec_pipe_t *pipe) return CM_ERROR; } - LOG_DEBUG_WAR("[MEC]discard the message, msg len[%u], src inst[%d], dst inst[%d], " + LOG_DEBUG_ERR("[MEC]discard the message, msg len[%u], src inst[%d], dst inst[%d], " "cmd[%u], flag[%u], stream id[%u], serial no[%u], batch size[%u], frag no[%u].", pack.head->size, pack.head->src_inst, pack.head->dst_inst, pack.head->cmd, pack.head->flags, pack.head->stream_id, pack.head->serial_no, pack.head->batch_size, diff --git a/src/replication/rep_common.c b/src/replication/rep_common.c index 391e0775eb7994be9c35fcb1aacb3749c59a87e3..f15dc4afa7adda05eda705b9d14f866bba06babe 100644 --- a/src/replication/rep_common.c +++ b/src/replication/rep_common.c @@ -520,4 +520,3 @@ void print_state() } } } - diff --git a/src/replication/rep_follower.c b/src/replication/rep_follower.c index b99c142356df1d651fe7a06beebe07c3cd6c5ff7..05b78a6f27fbf6e85e8b646ef8087fea54bcb14d 100644 --- a/src/replication/rep_follower.c +++ b/src/replication/rep_follower.c @@ -34,10 +34,11 @@ typedef struct st_rep_follower_state_t { uint64 last_ack_time; uint64 leader_term; uint64 leader_last_index; + bool32 has_received; }rep_follower_state_t; // follower state -rep_follower_state_t g_follower_state[CM_MAX_STREAM_COUNT]; +rep_follower_state_t g_follower_state[CM_MAX_STREAM_COUNT] = {0}; #define LEADER_COMMIT_IDX (g_follower_state[stream_id].leader_commit_log) #define LEADER_LAST_IDX (g_follower_state[stream_id].leader_last_index) diff --git a/src/storage/stream.c b/src/storage/stream.c index 72c1359047bc203b9e4b78cb240cc3d4191ca88b..d3491c0dba986326a093e28a1dce62fbc5b9122c 100644 --- a/src/storage/stream.c +++ b/src/storage/stream.c @@ -173,6 +173,8 @@ status_t load_stream(stream_t *stream) stream->last_term = stream->log_storage.last_term; stream->last_index = stream->log_storage.last_index; stream->entry_cache.begin_index = stream->last_index + 1; + stream->boot_last_index = stream->last_index; + stream->has_received = CM_FALSE; return CM_SUCCESS; } @@ -195,19 +197,32 @@ static inline void stream_trunc_cache_suffix(entry_cache_t *cache, uint64 last_i stream_clear_entry_cache(cache, begin, last_index); } -static inline status_t stream_trunc_suffix(stream_t *stream, uint64 last_index_kept, uint64 term) +static inline status_t stream_trunc_suffix(stream_t *stream, uint64 last_index_kept, uint64 term, bool8 check_apply) { - LOG_DEBUG_INF("[STG]truncate suffix conflict id (%llu, %llu)", term, last_index_kept + 1); - if (stream->applied_index > last_index_kept) { + LOG_RUN_INF("[STG]truncate suffix conflict id (%llu, %llu)", term, last_index_kept + 1); + cm_spin_lock(&stream->lock, NULL); + if (check_apply && stream->applied_index > last_index_kept) { + cm_spin_unlock(&stream->lock); + LOG_RUN_ERR("[STG]Can not truncate index which has been applied %llu", stream->applied_index); + return CM_ERROR; + } + uint64 old_last_index = stream->last_index; + if (last_index_kept >= old_last_index) { cm_spin_unlock(&stream->lock); - LOG_DEBUG_ERR("[STG]Can not truncate index which has been applied"); + LOG_RUN_ERR("[STG] no need truncate, last_index_dept=%llu not less than old_last_index=%llu", + last_index_kept, old_last_index); return CM_SUCCESS; } stream->last_term = term; - uint64 old_last_index = stream->last_index; - stream->last_index = last_index_kept + 1; + stream->last_index = last_index_kept; cm_spin_unlock(&stream->lock); stream_trunc_cache_suffix(&stream->entry_cache, last_index_kept, old_last_index); + uint64 old_last_disk_index = storage_get_last_id(&stream->log_storage).index; + if (last_index_kept >= old_last_disk_index) { + LOG_RUN_ERR("[STG] no need truncate, last_index_dept=%llu not less than old_last_disk_index=%llu", + last_index_kept, old_last_disk_index); + return CM_SUCCESS; + } return storage_trunc_suffix(&stream->log_storage, last_index_kept, term); } @@ -239,7 +254,7 @@ static status_t stream_check_conflict(stream_t *stream, uint64 index, uint64 ter // check conflict log index with leader term if (term != stream_get_term(stream, index)) { // stream lock will be unlocked in this function - return stream_trunc_suffix(stream, index - 1, term); + return stream_trunc_suffix(stream, index - 1, term, CM_TRUE); } cm_spin_unlock(&stream->lock); // duplicate index which already appended diff --git a/src/storage/stream.h b/src/storage/stream.h index 947e2d7974a6d3b4b043a0661593828ec8e003c0..b20487378a81528f94d75f5eaebdb6fe066b5a40 100644 --- a/src/storage/stream.h +++ b/src/storage/stream.h @@ -59,6 +59,8 @@ typedef struct st_stream { stg_meta_t stg_meta; log_storage_t log_storage; entry_cache_t entry_cache; + uint64 boot_last_index; + bool32 has_received; } stream_t; void disk_thread_entry(thread_t *thread);