diff --git a/src/cm_agent/cma_main.cpp b/src/cm_agent/cma_main.cpp index 2371eb6fe1c6402f2d09d3b65642132394c55678..207a3de69982ba66edb9ee434539e7f180c4899f 100644 --- a/src/cm_agent/cma_main.cpp +++ b/src/cm_agent/cma_main.cpp @@ -88,7 +88,7 @@ char *g_eventTriggers[EVENT_COUNT] = {NULL}; static const uint32 MAX_MSG_BUF_POOL_SIZE = 102400; static const uint32 MAX_MSG_BUF_POOL_COUNT = 200; - +static const int32 INVALID_ID = -1; /* unify log style */ void create_system_call_log(void); int check_one_instance_status(const char *processName, const char *cmdLine, int *isPhonyDead); @@ -2113,15 +2113,21 @@ void GetEventTrigger() ParseEventTriggers(eventTriggerString); } -void ExecuteEventTrigger(const EventTriggerType triggerType) +void ExecuteEventTrigger(const EventTriggerType triggerType, int32 staPrimId) { if (g_eventTriggers[triggerType] == NULL) { return; } write_runlog(LOG, "Event trigger %s was triggered.\n", triggerTypeStringMap[triggerType].typeStr); char execTriggerCmd[MAX_COMMAND_LEN] = {0}; - int rc = snprintf_s(execTriggerCmd, MAX_COMMAND_LEN, MAX_COMMAND_LEN - 1, + int rc; + if (staPrimId != INVALID_ID && triggerType == EVENT_FAILOVER) { + rc = snprintf_s(execTriggerCmd, MAX_COMMAND_LEN, MAX_COMMAND_LEN - 1, + SYSTEMQUOTE "%s %d >> %s 2>&1 &" SYSTEMQUOTE, g_eventTriggers[triggerType], staPrimId, system_call_log); + } else { + rc = snprintf_s(execTriggerCmd, MAX_COMMAND_LEN, MAX_COMMAND_LEN - 1, SYSTEMQUOTE "%s >> %s 2>&1 &" SYSTEMQUOTE, g_eventTriggers[triggerType], system_call_log); + } securec_check_intval(rc, (void)rc); write_runlog(LOG, "event trigger command: \"%s\".\n", execTriggerCmd); RunCmd(execTriggerCmd); diff --git a/src/cm_agent/cma_process_messages.cpp b/src/cm_agent/cma_process_messages.cpp index ff8bded189eb1b3aea456db0391b3bda3c3191a4..584bbebc528e46545599db18b6e20a4008a2a779 100644 --- a/src/cm_agent/cma_process_messages.cpp +++ b/src/cm_agent/cma_process_messages.cpp @@ -443,7 +443,8 @@ void GetDnFailoverCommand(char *command, uint32 cmdLen, const char *dataDir, uin securec_check_intval(rc, (void)rc); } -static void process_failover_command(const char* dataDir, int instanceType, uint32 instance_id, uint32 term) +static void process_failover_command(const char* dataDir, int instanceType, + uint32 instance_id, uint32 term, int32 staPrimId) { char command[MAXPGPATH]; errno_t rc; @@ -501,7 +502,7 @@ static void process_failover_command(const char* dataDir, int instanceType, uint RunCmd(command); if (instanceType == INSTANCE_TYPE_DATANODE) { - ExecuteEventTrigger(EVENT_FAILOVER); + ExecuteEventTrigger(EVENT_FAILOVER, staPrimId); } return; @@ -1403,23 +1404,39 @@ static void MsgCmAgentFailover(const AgentMsgPkg* msg, char *dataPath, const cm_ { int instanceType; int ret; + uint32 node, term, instanceId; + int32 staPrimId = -1; - const cm_to_agent_failover *msgTypeFailoverPtr = - (const cm_to_agent_failover *)CmGetMsgBytesPtr(msg, sizeof(cm_to_agent_failover)); - if (msgTypeFailoverPtr == NULL) { - return; + if (undocumentedVersion != 0 && undocumentedVersion < FAILOVER_STAPRI_VERSION) { + const cm_to_agent_failover *failoverMsg = + (const cm_to_agent_failover *)CmGetMsgBytesPtr(msg, sizeof(cm_to_agent_failover)); + if (failoverMsg == NULL) { + return; + } + term = failoverMsg->term; + node = failoverMsg->node; + instanceId = failoverMsg->instanceId; + } else { + const cm_to_agent_failover_sta *failoverMsg = + (const cm_to_agent_failover_sta *)CmGetMsgBytesPtr(msg, sizeof(cm_to_agent_failover_sta)); + if (failoverMsg == NULL) { + return; + } + term = failoverMsg->term; + node = failoverMsg->node; + instanceId = failoverMsg->instanceId; + staPrimId = failoverMsg->staPrimId; } - uint32 term = msgTypeFailoverPtr->term; + ret = FindInstancePathAndType( - msgTypeFailoverPtr->node, msgTypeFailoverPtr->instanceId, dataPath, &instanceType); + node, instanceId, dataPath, &instanceType); if (ret != 0) { write_runlog(ERROR, "can't find the instance node is %u, instance is %u\n", - msgTypeFailoverPtr->node, - msgTypeFailoverPtr->instanceId); + node, instanceId); return; } - process_failover_command(dataPath, instanceType, msgTypeFailoverPtr->instanceId, term); + process_failover_command(dataPath, instanceType, instanceId, term, staPrimId); } static void MsgCmAgentBuild(const AgentMsgPkg* msg, char *dataPath, const cm_msg_type* msgTypePtr) diff --git a/src/cm_server/cms_arbitrate_datanode_pms.cpp b/src/cm_server/cms_arbitrate_datanode_pms.cpp index 82447693edf186774a1e1aea0af47ce926243639..e46fc197ae945d969a45629dab6ca3fce1367b64 100644 --- a/src/cm_server/cms_arbitrate_datanode_pms.cpp +++ b/src/cm_server/cms_arbitrate_datanode_pms.cpp @@ -122,8 +122,18 @@ static void SendLock2Messange(const DnArbCtx *ctx, const char *dhost, int dlen, (void)RespondMsg(ctx->recvMsgInfo, 'S', (char *)&lock2MsgPtr, sizeof(cm_to_agent_lock2)); } +static void copy_cm_to_agent_failover_msg(cm_to_agent_failover* failover_msg_ptr, + cm_to_agent_failover_sta* staMsg, int32 staId) +{ + staMsg->msg_type = failover_msg_ptr->msg_type; + staMsg->node = failover_msg_ptr->node; + staMsg->instanceId = failover_msg_ptr->instanceId; + staMsg->term = failover_msg_ptr->term; + staMsg->staPrimId = staId; +} + static void send_failover_message(MsgRecvInfo* recvMsgInfo, uint32 node, uint32 instanceId, uint32 group_index, - int member_index, cm_to_agent_failover* failover_msg_ptr) + int member_index, cm_to_agent_failover* failover_msg_ptr, int32 staPrimId) { cm_instance_role_group* role_group = &g_instance_role_group_ptr[group_index]; int count = role_group->count; @@ -178,7 +188,14 @@ static void send_failover_message(MsgRecvInfo* recvMsgInfo, uint32 node, uint32 node_restarting, pass_term, dnReportStatus[i].sendFailoverTimes); } - (void)RespondMsg(recvMsgInfo, 'S', (char*)failover_msg_ptr, sizeof(cm_to_agent_failover)); + if (undocumentedVersion != 0 && undocumentedVersion < FAILOVER_STAPRI_VERSION) { + (void)RespondMsg(recvMsgInfo, 'S', (char*)failover_msg_ptr, sizeof(cm_to_agent_failover)); + } else { + cm_to_agent_failover_sta staMsg; + copy_cm_to_agent_failover_msg(failover_msg_ptr, &staMsg, staPrimId); + (void)RespondMsg(recvMsgInfo, 'S', (char*)(&staMsg), sizeof(cm_to_agent_failover_sta)); + } + dnReportStatus[member_index].arbitrateFlag = true; dnReportStatus[member_index].sendFailoverTimes++; cm_pending_notify_broadcast_msg(group_index, instanceId); @@ -1394,6 +1411,16 @@ static bool InstanceForceFinishRedo(DnArbCtx *ctx) return false; } +static int32 GetFailoverMsgStaPriID(DnArbCtx *ctx) +{ + ArbiCond *cond = &(ctx->cond); + if (cond->staticPriIdx != INVALID_INDEX) { + cm_instance_role_status *role = ctx->roleGroup->instanceMember; + return role[cond->staticPriIdx].instanceId; + } + return INVALID_INDEX; +} + static bool InstanceForceFailover(DnArbCtx *ctx) { bool res = InstanceForceFinishRedo(ctx); @@ -1410,7 +1437,9 @@ static bool InstanceForceFailover(DnArbCtx *ctx) if (cond->candiIdx == ctx->memIdx && CanFailoverDn(isMajority) && cond->redoDone > HALF_COUNT(cond->vaildCount)) { cm_to_agent_failover failoverMsg; - send_failover_message(ctx->recvMsgInfo, ctx->node, ctx->instId, ctx->groupIdx, ctx->memIdx, &failoverMsg); + int32 staPrimId = GetFailoverMsgStaPriID(ctx); + send_failover_message(ctx->recvMsgInfo, ctx->node, ctx->instId, ctx->groupIdx, + ctx->memIdx, &failoverMsg, staPrimId); write_runlog(LOG, "[ForceFailover], line %d: Redo done, non force failover message sent to instance %u, " "requested by cm_ctl, arbitrate_time=%u\n", __LINE__, ctx->instId, cond->maxMemArbiTime); return true; @@ -1764,9 +1793,11 @@ static void SendFailoverMsg(DnArbCtx *ctx, uint32 arbitInterval, bool isStaPrim, ctx->repGroup->time = 0; ClearDnArbiCond(ctx->groupIdx, CLEAR_ARBI_TIME); cm_to_agent_failover failoverMsg; + int32 staPrimId = GetFailoverMsgStaPriID(ctx); if ((!cond->instMainta && !IsSyncListEmpty(ctx->groupIdx, ctx->instId, ctx->maintaMode)) || isStaPrim) { GroupStatusShow(sfMsg->tyName, ctx->groupIdx, ctx->instId, cond->vaildCount, cond->finishRedo); - send_failover_message(ctx->recvMsgInfo, ctx->node, ctx->instId, ctx->groupIdx, ctx->memIdx, &failoverMsg); + send_failover_message(ctx->recvMsgInfo, ctx->node, ctx->instId, ctx->groupIdx, + ctx->memIdx, &failoverMsg, staPrimId); write_runlog(LOG, "%s, line %d: Failover message has sent to instance %u in reduce standy condition(%d), %s.\n", sfMsg->tyName, __LINE__, ctx->instId, cond->isDegrade, sfMsg->sendMsg); } else { @@ -1922,7 +1953,9 @@ static void SendFailoverInQuarmBackup(DnArbCtx *ctx) cm_to_agent_failover failoverMsg; if (!cond->instMainta || ctx->localRole->role == INSTANCE_ROLE_PRIMARY) { GroupStatusShow(sfMsg.tyName, ctx->groupIdx, ctx->instId, cond->vaildCount, cond->finishRedo); - send_failover_message(ctx->recvMsgInfo, ctx->node, ctx->instId, ctx->groupIdx, ctx->memIdx, &failoverMsg); + int32 staPrimId = GetFailoverMsgStaPriID(ctx); + send_failover_message(ctx->recvMsgInfo, ctx->node, ctx->instId, ctx->groupIdx, + ctx->memIdx, &failoverMsg, staPrimId); ctx->repGroup->lastFailoverDn = ctx->instId; write_runlog(LOG, "%s, line %d: Failover message has sent to instance %u, %s.\n", sfMsg.tyName, __LINE__, ctx->instId, sfMsg.sendMsg); diff --git a/src/include/cm/cm_agent/cma_main.h b/src/include/cm/cm_agent/cma_main.h index ad31966925b999063d96d4b5d6cba2d8387a3ad8..78d2be5e41c7393026cb1b7a772cc037ce1363f9 100644 --- a/src/include/cm/cm_agent/cma_main.h +++ b/src/include/cm/cm_agent/cma_main.h @@ -247,7 +247,7 @@ extern pthread_rwlock_t g_datanodesFailoverLock; extern pthread_rwlock_t g_gtmsFailoverLock; extern int g_gtmMode; extern char *g_eventTriggers[EVENT_COUNT]; -extern void ExecuteEventTrigger(const EventTriggerType triggerType); +extern void ExecuteEventTrigger(const EventTriggerType triggerType, int32 staPrimId = -1); extern int node_match_find(const char *node_type, const char *node_port, const char *node_host, const char *node_port1, const char *node_host1, int *node_index, int *instance_index, int *inode_type); diff --git a/src/include/cm/cm_agent/cma_process_messages.h b/src/include/cm/cm_agent/cma_process_messages.h index 9f24840ce58c428e01848df5ff676b5236b8427e..1d693531f4087b1eff194bb2600d17d80c00c093 100644 --- a/src/include/cm/cm_agent/cma_process_messages.h +++ b/src/include/cm/cm_agent/cma_process_messages.h @@ -40,7 +40,8 @@ void *ProcessRecvCmsMsgMain(void *arg); extern void process_notify_command(const char* data_dir, int instance_type, int role, uint32 term); extern void process_restart_command(const char* data_dir, int instance_type); extern int FindInstancePathAndType(uint32 node, uint32 instanceId, char* data_path, int* instance_type); -extern void process_failover_command(const char* dataDir, int instance_type, uint32 instance_id, uint32 term); +extern void process_failover_command(const char* dataDir, int instance_type, + uint32 instance_id, uint32 term, int32 staPrimId); extern void process_rep_most_available_command(const char* dataDir, int instance_type); extern void process_heartbeat_command(int cluster_status); #endif diff --git a/src/include/cm/cm_msg.h b/src/include/cm/cm_msg.h index 66ef5422105968a7f8eaf51ade1c8e35b786c704..5e8bb88434491230e6dfaa6ee4b5ce0a0a228c0b 100644 --- a/src/include/cm/cm_msg.h +++ b/src/include/cm/cm_msg.h @@ -669,6 +669,16 @@ typedef struct cm_to_agent_failover_st { uint32 term; } cm_to_agent_failover; +typedef struct cm_to_agent_failover_sta_st { + int msg_type; + uint32 node; + uint32 instanceId; + int instance_type; + int wait_seconds; + int32 staPrimId; + uint32 term; +} cm_to_agent_failover_sta; + typedef struct cm_to_agent_build_st { int msg_type; uint32 node; diff --git a/src/include/opengauss/cm/elog.h b/src/include/opengauss/cm/elog.h index 00d01e55c08bd2356f581e4326a3f6acb9ab75d1..077b8448f02383dd8dd9e4242923ee31d96b6e59 100644 --- a/src/include/opengauss/cm/elog.h +++ b/src/include/opengauss/cm/elog.h @@ -186,6 +186,7 @@ extern const char* prefix_name; extern volatile uint32 undocumentedVersion; #define INPLACE_UPGRADE_PRECOMMIT_VERSION 1 #define DORADO_UPGRADE_VERSION (92574) +#define FAILOVER_STAPRI_VERSION (94503) template void write_runlog2(int elevel, T... args)