diff --git a/src/gausskernel/storage/replication/syncrep.cpp b/src/gausskernel/storage/replication/syncrep.cpp index 1671219a5fdcb0f0b80914d279b20ed14d1bf84f..5747a39e95e93e829372ffefa40d0711e7b493c1 100755 --- a/src/gausskernel/storage/replication/syncrep.cpp +++ b/src/gausskernel/storage/replication/syncrep.cpp @@ -79,8 +79,10 @@ typedef enum SyncStandbyNumState { STANDBIES_NOT_ENOUGH, STANDBIES_ENOUGH } SyncStandbyNumState; -static SyncStandbyNumState check_sync_standbys_num(const List* sync_standbys); -static bool judge_sync_standbys_num(const List* sync_standbys, SyncStandbyNumState* state); + +static SyncStandbyNumState check_sync_standbys_num(const SyncRepStandbyData* sync_standbys, int num_standbys); +static bool judge_sync_standbys_num(const SyncRepStandbyData* sync_standbys, int num_standbys, SyncStandbyNumState* state); + static void SyncRepQueueInsert(int mode); static bool SyncRepCancelWait(void); @@ -92,9 +94,9 @@ static void SyncRepGetStandbyGroupAndPriority(int* gid, int* prio); static bool SyncRepGetSyncLeftTime(XLogRecPtr XactCommitLSN, TimestampTz* leftTime); #endif static void SyncRepGetOldestSyncRecPtr(XLogRecPtr* receivePtr, XLogRecPtr* writePtr, XLogRecPtr* flushPtr, - XLogRecPtr* replayPtr, List* sync_standbys); + XLogRecPtr* replayPtr, SyncRepStandbyData* sync_standbys, int num_standbys, int groupid); static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr* receivePtr, XLogRecPtr* writePtr, XLogRecPtr* flushPtr, - XLogRecPtr* replayPtr, List* sync_standbys, uint8 nth); + XLogRecPtr* replayPtr, SyncRepStandbyData* sync_standbys, int num_standbys, int groupid, uint8 nth); static void SyncPaxosQueueInsert(void); static void SyncPaxosCancelWait(void); @@ -105,10 +107,10 @@ static bool SyncRepQueueIsOrderedByLSN(int mode); static bool SyncPaxosQueueIsOrderedByLSN(void); #endif -static List *SyncRepGetSyncStandbysPriority(bool *am_sync, int groupid, List** catchup_standbys = NULL); -static List *SyncRepGetSyncStandbysQuorum(bool *am_sync, int groupid, List** catchup_standbys = NULL); +static int SyncRepGetSyncStandbysInGroup(SyncRepStandbyData** sync_standbys, int groupid, List** catchup_standbys); + +static int standby_priority_comparator(const void *a, const void *b); static inline void free_sync_standbys_list(List* sync_standbys); -static bool judge_sync_standbys_num(const List* sync_standbys, SyncStandbyNumState* state); static int cmp_lsn(const void *a, const void *b); static bool DelayIntoMostAvaSync(bool checkSyncNum, SyncStandbyNumState state = STANDBIES_EMPTY); @@ -569,7 +571,7 @@ void SyncRepInitConfig(void) SyncRepGetStandbyGroupAndPriority(&group, &priority); if (t_thrd.walsender_cxt.MyWalSnd->sync_standby_group != group || t_thrd.walsender_cxt.MyWalSnd->sync_standby_priority != priority) { - LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); + SpinLockAcquire(&t_thrd.walsender_cxt.MyWalSnd->mutex); t_thrd.walsender_cxt.MyWalSnd->sync_standby_group = group; t_thrd.walsender_cxt.MyWalSnd->sync_standby_priority = priority; @@ -580,7 +582,7 @@ void SyncRepInitConfig(void) */ SyncRepCheckSyncStandbyAlive(); - LWLockRelease(SyncRepLock); + SpinLockRelease(&t_thrd.walsender_cxt.MyWalSnd->mutex); ereport(DEBUG1, (errmsg("standby \"%s\" now has synchronous standby group and priority: %d %d", u_sess->attr.attr_common.application_name, group, priority))); } @@ -731,27 +733,38 @@ void SetXactLastCommitToSyncedStandby(XLogRecPtr recptr) } #endif -static SyncStandbyNumState check_sync_standbys_num(const List* sync_standbys) +static SyncStandbyNumState check_sync_standbys_num(const SyncRepStandbyData* sync_standbys, int num_standbys) { + int i; + int* num_group_standbys; + const SyncRepStandbyData* stby; + + SyncStandbyNumState res = STANDBIES_ENOUGH; + + if (t_thrd.syncrep_cxt.SyncRepConfig == NULL) return STANDBIES_ENOUGH; - ListCell* lc = NULL; - List* per_group = NIL; - int gid = 0; - int alive = 0; - SyncStandbyNumState res = STANDBIES_ENOUGH; - foreach(lc, sync_standbys) { - per_group = (List*)lfirst(lc); - if (list_length(per_group) < t_thrd.syncrep_cxt.SyncRepConfig[gid]->num_sync) - res = STANDBIES_NOT_ENOUGH; + if(num_standbys == 0) { + return STANDBIES_EMPTY; + } + + num_group_standbys = (int*)palloc0(t_thrd.syncrep_cxt.SyncRepConfigGroups * sizeof(int)); - alive += list_length(per_group); - gid++; + for(i = 0; i < num_standbys; i++) { + stby = sync_standbys + i; + (*(num_group_standbys + stby->sync_standby_group))++; } - if (alive == 0) res = STANDBIES_EMPTY; + for(i = 0; i < t_thrd.syncrep_cxt.SyncRepConfigGroups; i++) { + if((*(num_group_standbys + stby->sync_standby_group)) < t_thrd.syncrep_cxt.SyncRepConfig[i]->num_sync) { + res = STANDBIES_NOT_ENOUGH; + break; + } + } + + pfree(num_group_standbys); return res; } @@ -766,17 +779,17 @@ static SyncStandbyNumState check_sync_standbys_num(const List* sync_standbys) static bool DelayIntoMostAvaSync(bool checkSyncNum, SyncStandbyNumState state) { bool result = false; + SyncRepStandbyData *sync_standbys; + int num_standbys; if (!t_thrd.walsender_cxt.WalSndCtl->most_available_sync || !u_sess->attr.attr_storage.keep_sync_window) { return result; } if (checkSyncNum) { - List *sync_standbys = NIL; - bool am_sync = false; - sync_standbys = SyncRepGetSyncStandbys(&am_sync); - state = check_sync_standbys_num(sync_standbys); - free_sync_standbys_list(sync_standbys); + num_standbys = SyncRepGetSyncStandbys(&sync_standbys); + state = check_sync_standbys_num(sync_standbys, num_standbys); + pfree(sync_standbys); } if (state == STANDBIES_ENOUGH) { @@ -810,9 +823,9 @@ static bool DelayIntoMostAvaSync(bool checkSyncNum, SyncStandbyNumState state) * the alive sync standbys, even if the quantity does not meet the configuration * requirements. */ -static bool judge_sync_standbys_num(const List* sync_standbys, SyncStandbyNumState* state) +static bool judge_sync_standbys_num(const SyncRepStandbyData* sync_standbys, int num_standbys, SyncStandbyNumState* state) { - *state = check_sync_standbys_num(sync_standbys); + *state = check_sync_standbys_num(sync_standbys, num_standbys); if (*state == STANDBIES_ENOUGH) { DelayIntoMostAvaSync(false, STANDBIES_ENOUGH); // just for refresh keep_sync_window if needed @@ -836,7 +849,8 @@ static bool judge_sync_standbys_num(const List* sync_standbys, SyncStandbyNumSta */ bool SyncRepGetSyncRecPtr(XLogRecPtr *receivePtr, XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr* replayPtr, bool *am_sync, bool check_am_sync) { - List *sync_standbys = NIL; + SyncRepStandbyData *sync_standbys = NULL; + int i,num_standbys; *receivePtr = InvalidXLogRecPtr; *writePtr = InvalidXLogRecPtr; @@ -845,7 +859,17 @@ bool SyncRepGetSyncRecPtr(XLogRecPtr *receivePtr, XLogRecPtr *writePtr, XLogRecP *am_sync = false; /* Get standbys that are considered as synchronous at this moment */ - sync_standbys = SyncRepGetSyncStandbys(am_sync); + num_standbys = SyncRepGetSyncStandbys(&sync_standbys); + + + /* Am I among the candidate sync standbys? */ + for (i = 0; i < num_standbys; i++) { + if (sync_standbys[i].is_me) { + *am_sync = true; + break; + } + } + /* * Quick exit if we are not managing a sync standby (or not check for check_am_sync is false) * or there are not enough synchronous standbys. @@ -854,8 +878,8 @@ bool SyncRepGetSyncRecPtr(XLogRecPtr *receivePtr, XLogRecPtr *writePtr, XLogRecP SyncStandbyNumState state; if ((!(*am_sync) && check_am_sync) || t_thrd.syncrep_cxt.SyncRepConfig == NULL || - !judge_sync_standbys_num(sync_standbys, &state)) { - free_sync_standbys_list(sync_standbys); + !judge_sync_standbys_num(sync_standbys, num_standbys, &state)) { + pfree(sync_standbys); return false; } @@ -879,22 +903,18 @@ bool SyncRepGetSyncRecPtr(XLogRecPtr *receivePtr, XLogRecPtr *writePtr, XLogRecP *receivePtr = *writePtr; *replayPtr = *flushPtr; } else { - int i = 0; - ListCell *lc = NULL; - foreach(lc, sync_standbys) { - List *per_group = (List*)lfirst(lc); - if (per_group == NIL) { - /* do nothing, this group of sync standbys are all offline or no sync standbys. */ - } else if (t_thrd.syncrep_cxt.SyncRepConfig[i]->syncrep_method == SYNC_REP_PRIORITY) { - SyncRepGetOldestSyncRecPtr(receivePtr, writePtr, flushPtr, replayPtr, per_group); + for(i = 0; i < t_thrd.syncrep_cxt.SyncRepConfigGroups; i++) { + if (t_thrd.syncrep_cxt.SyncRepConfig[i]->syncrep_method == SYNC_REP_PRIORITY) { + SyncRepGetOldestSyncRecPtr(receivePtr, writePtr, flushPtr, replayPtr, sync_standbys, num_standbys, i); } else { SyncRepGetNthLatestSyncRecPtr(receivePtr, writePtr, flushPtr, replayPtr, - per_group, t_thrd.syncrep_cxt.SyncRepConfig[i]->num_sync); + sync_standbys, num_standbys, i, t_thrd.syncrep_cxt.SyncRepConfig[i]->num_sync); + } - i++; + } } - free_sync_standbys_list(sync_standbys); + pfree(sync_standbys); return true; } @@ -907,17 +927,17 @@ bool SyncRepGetSyncRecPtr(XLogRecPtr *receivePtr, XLogRecPtr *writePtr, XLogRecP */ static bool SyncRepGetSyncLeftTime(XLogRecPtr XactCommitLSN, TimestampTz* leftTime) { - List* sync_standbys = NIL; + SyncRepStandbyData *sync_standbys; + int num_standbys; List* catchup_standbys = NIL; - bool am_sync = false; ListCell* cell = NULL; *leftTime = 0; /* Get standbys that are considered as synchronous at this moment. */ - sync_standbys = SyncRepGetSyncStandbys(&am_sync, &catchup_standbys); + num_standbys = SyncRepGetSyncStandbys(&sync_standbys, &catchup_standbys); /* Skip here if there is at lease one sync standby, or no standby in catchup. */ - if (check_sync_standbys_num(sync_standbys) != STANDBIES_EMPTY || list_length(catchup_standbys) == 0) { - free_sync_standbys_list(sync_standbys); + if (check_sync_standbys_num(sync_standbys, num_standbys) != STANDBIES_EMPTY || list_length(catchup_standbys) == 0) { + pfree(sync_standbys); list_free(catchup_standbys); return false; } @@ -940,7 +960,7 @@ static bool SyncRepGetSyncLeftTime(XLogRecPtr XactCommitLSN, TimestampTz* leftTi } } - free_sync_standbys_list(sync_standbys); + pfree(sync_standbys); list_free(catchup_standbys); return true; } @@ -950,28 +970,30 @@ static bool SyncRepGetSyncLeftTime(XLogRecPtr XactCommitLSN, TimestampTz* leftTi * Calculate the oldest Write, Flush and Apply positions among sync standbys. */ static void SyncRepGetOldestSyncRecPtr(XLogRecPtr* receivePtr, XLogRecPtr* writePtr, XLogRecPtr* flushPtr, - XLogRecPtr* replayPtr, List* sync_standbys) + XLogRecPtr* replayPtr, SyncRepStandbyData* sync_standbys, int num_standbys, int groupid) { - ListCell *cell = NULL; + int i; + SyncRepStandbyData* stby; + XLogRecPtr receive; + XLogRecPtr write; + XLogRecPtr flush; + XLogRecPtr apply; /* * Scan through all sync standbys and calculate the oldest * Write, Flush and Apply positions. */ - foreach (cell, sync_standbys) { - WalSnd* walsnd = &t_thrd.walsender_cxt.WalSndCtl->walsnds[lfirst_int(cell)]; - XLogRecPtr receive; - XLogRecPtr write; - XLogRecPtr flush; - XLogRecPtr apply; - - SpinLockAcquire(&walsnd->mutex); - receive = walsnd->receive; - write = walsnd->write; - flush = walsnd->flush; - apply = walsnd->apply; - SpinLockRelease(&walsnd->mutex); - + for(i = 0; i < num_standbys; i++) { + stby = sync_standbys + i; + if(stby->sync_standby_group != groupid) { + continue; + } + + receive = stby->receive; + write = stby->write; + flush = stby->flush; + apply = stby->apply; + if (XLogRecPtrIsInvalid(*writePtr) || !XLByteLE(*writePtr, write)) *writePtr = write; if (XLogRecPtrIsInvalid(*flushPtr) || !XLByteLE(*flushPtr, flush)) @@ -988,49 +1010,61 @@ static void SyncRepGetOldestSyncRecPtr(XLogRecPtr* receivePtr, XLogRecPtr* write * standbys. */ static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr* receivePtr, XLogRecPtr* writePtr, XLogRecPtr* flushPtr, - XLogRecPtr* replayPtr, List* sync_standbys, uint8 nth) + XLogRecPtr* replayPtr, SyncRepStandbyData* sync_standbys, int num_standbys, int groupid, uint8 nth) { + List *stby_list = NIL; ListCell *cell = NULL; XLogRecPtr *receive_array = NULL; XLogRecPtr *write_array = NULL; XLogRecPtr *flush_array = NULL; XLogRecPtr* apply_array = NULL; - int len; - int i = 0; + int group_len; + int i; + SyncRepStandbyData* stby; - len = list_length(sync_standbys); - receive_array = (XLogRecPtr*)palloc(sizeof(XLogRecPtr) * len); - write_array = (XLogRecPtr*)palloc(sizeof(XLogRecPtr) * len); - flush_array = (XLogRecPtr*)palloc(sizeof(XLogRecPtr) * len); - apply_array = (XLogRecPtr*)palloc(sizeof(XLogRecPtr) * len); - foreach (cell, sync_standbys) { - WalSnd* walsnd = &t_thrd.walsender_cxt.WalSndCtl->walsnds[lfirst_int(cell)]; + for(i = 0; i < num_standbys; i++) { + stby = sync_standbys + i; + if(stby->sync_standby_group != groupid) { + continue; + } + stby_list = lappend_int(stby_list, i); + } - if (walsnd->is_cross_cluster) { + group_len = list_length(stby_list); + + receive_array = (XLogRecPtr*)palloc(sizeof(XLogRecPtr) * group_len); + write_array = (XLogRecPtr*)palloc(sizeof(XLogRecPtr) * group_len); + flush_array = (XLogRecPtr*)palloc(sizeof(XLogRecPtr) * group_len); + apply_array = (XLogRecPtr*)palloc(sizeof(XLogRecPtr) * group_len); + + i = 0; + foreach(cell, stby_list) { + stby = sync_standbys + lfirst_int(cell); + + if (stby->is_cross_cluster) { continue; } - SpinLockAcquire(&walsnd->mutex); - receive_array[i] = walsnd->receive; - write_array[i] = walsnd->write; - flush_array[i] = walsnd->flush; - apply_array[i] = walsnd->apply; - SpinLockRelease(&walsnd->mutex); + + receive_array[i] = stby->receive; + write_array[i] = stby->write; + flush_array[i] = stby->flush; + apply_array[i] = stby->apply; i++; } - qsort(receive_array, len, sizeof(XLogRecPtr), cmp_lsn); - qsort(write_array, len, sizeof(XLogRecPtr), cmp_lsn); - qsort(flush_array, len, sizeof(XLogRecPtr), cmp_lsn); - qsort(apply_array, len, sizeof(XLogRecPtr), cmp_lsn); + qsort(receive_array, group_len, sizeof(XLogRecPtr), cmp_lsn); + qsort(write_array, group_len, sizeof(XLogRecPtr), cmp_lsn); + qsort(flush_array, group_len, sizeof(XLogRecPtr), cmp_lsn); + qsort(apply_array, group_len, sizeof(XLogRecPtr), cmp_lsn); /* * rewrite nth if current sync standby num < nth, when most_available_sync is true, * primary only wait the alive sync standbys if list_length(sync_standbys) doesn't satisfy num_sync in quroum. */ - if (t_thrd.walsender_cxt.WalSndCtl->most_available_sync && list_length(sync_standbys) < nth) { - nth = (uint8)list_length(sync_standbys); + if (t_thrd.walsender_cxt.WalSndCtl->most_available_sync && group_len < nth) { + nth = (uint8)group_len; } /* Get Nth latest Write, Flush, Apply positions */ @@ -1043,6 +1077,9 @@ static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr* receivePtr, XLogRecPtr* wr if (XLogRecPtrIsInvalid(*replayPtr) || XLByteLE(apply_array[nth - 1], *replayPtr)) *replayPtr = apply_array[nth - 1]; + + list_free(stby_list); + pfree(receive_array); receive_array = NULL; pfree(write_array); @@ -1344,261 +1381,122 @@ void SyncRepUpdateSyncStandbysDefined(void) } /* - * Return the list of sync standbys, or NIL if no sync standby is connected. + * Return data about walsenders that are candidates to be sync standbys. * - * If there are multiple standbys with the same priority, - * the first one found is selected preferentially. - * The caller must hold SyncRepLock. - * - * On return, *am_sync is set to true if this walsender is connecting to - * sync standby. Otherwise it's set to false. + * *sync_standbys is set to a palloc'd array of structs of per-walsender data, + * and the number of valid entries (candidate sync senders) is returned. + * (This might be more or fewer than num_sync; caller must check.) */ -List *SyncRepGetSyncStandbys(bool *am_sync, List** catchup_standbys) +int SyncRepGetSyncStandbys(SyncRepStandbyData** sync_standbys, List** catchup_standbys) { - /* Set default result */ - if (am_sync != NULL) - *am_sync = false; + int num_sync = 0; + *sync_standbys = (SyncRepStandbyData *)palloc( + g_instance.attr.attr_storage.max_wal_senders * sizeof(SyncRepStandbyData)); /* Quick exit if sync replication is not requested */ if (t_thrd.syncrep_cxt.SyncRepConfig == NULL) - return NIL; + return 0; - List* results = NIL; for(int i = 0; i < t_thrd.syncrep_cxt.SyncRepConfigGroups; i++) { - if (t_thrd.syncrep_cxt.SyncRepConfig[i]->syncrep_method == SYNC_REP_PRIORITY) { - results = lappend(results, SyncRepGetSyncStandbysPriority(am_sync, i,catchup_standbys)); - } else { - results = lappend(results, SyncRepGetSyncStandbysQuorum(am_sync, i, catchup_standbys)); - } + num_sync += SyncRepGetSyncStandbysInGroup(sync_standbys, i,catchup_standbys); } - return results; + return num_sync; } -/* - * Return the list of all the candidates for quorum sync standbys, - * or NIL if no such standby is connected. - * - * The caller must hold SyncRepLock. This function must be called only in - * a quorum-based sync replication. - * - * On return, *am_sync is set to true if this walsender is connecting to - * sync standby. Otherwise it's set to false. - */ -static List *SyncRepGetSyncStandbysQuorum(bool *am_sync, int groupid, List** catchup_standbys) + +static int SyncRepGetSyncStandbysInGroup(SyncRepStandbyData** sync_standbys, int groupid, List** catchup_standbys) { - List *result = NIL; int i; + int num_sync = 0; /* how many sync standbys in current group */ volatile WalSnd *walsnd = NULL; /* Use volatile pointer to prevent code rearrangement */ - - Assert(t_thrd.syncrep_cxt.SyncRepConfig[groupid]->syncrep_method == SYNC_REP_QUORUM); + SyncRepStandbyData *stby = NULL; + + /* state/peer_state/peer_role is not included in SyncRepStandbyData */ + WalSndState state; + DbState peer_state; + ServerMode peer_role; for (i = 0; i < g_instance.attr.attr_storage.max_wal_senders; i++) { walsnd = &t_thrd.walsender_cxt.WalSndCtl->walsnds[i]; + stby = *sync_standbys + num_sync; + + SpinLockAcquire(&walsnd->mutex); + stby->pid = walsnd->pid; + stby->lwpId = walsnd->lwpId; + state = walsnd->state; + peer_state = walsnd->peer_state; + peer_role = walsnd->peer_role; + stby->receive = walsnd->receive; + stby->write = walsnd->write; + stby->flush = walsnd->flush; + stby->apply = walsnd->apply; + stby->sync_standby_priority = walsnd->sync_standby_priority; + stby->sync_standby_group = walsnd->sync_standby_group; + stby->is_cross_cluster = walsnd->is_cross_cluster; + SpinLockRelease(&walsnd->mutex); /* Must be active */ - if (walsnd->pid == 0) + if (stby->pid == 0) continue; /* Must be synchronous */ - if (walsnd->sync_standby_priority == 0 || walsnd->sync_standby_group != groupid) + if (stby->sync_standby_priority == 0 || stby->sync_standby_group != groupid) continue; - if ((walsnd->state == WALSNDSTATE_CATCHUP || walsnd->peer_state == CATCHUP_STATE) && + if ((state == WALSNDSTATE_CATCHUP || peer_state == CATCHUP_STATE) && catchup_standbys != NULL) { *catchup_standbys = lappend_int(*catchup_standbys, i); } /* Must have a valid flush position */ - if (XLogRecPtrIsInvalid(walsnd->flush)) + if (XLogRecPtrIsInvalid(stby->flush)) continue; /* Must be streaming */ - if (walsnd->state != WALSNDSTATE_STREAMING) + if (state != WALSNDSTATE_STREAMING) continue; - if (walsnd->peer_role == STANDBY_CLUSTER_MODE) { + if (t_thrd.syncrep_cxt.SyncRepConfig[groupid]->syncrep_method == SYNC_REP_QUORUM && peer_role == STANDBY_CLUSTER_MODE) { continue; } - /* - * Consider this standby as a candidate for quorum sync standbys - * and append it to the result. - */ - result = lappend_int(result, i); - if (am_sync != NULL && walsnd == t_thrd.walsender_cxt.MyWalSnd) - *am_sync = true; + stby->walsnd_index = i; + stby->is_me = (walsnd == t_thrd.walsender_cxt.MyWalSnd); + num_sync++; } - return result; + if (t_thrd.syncrep_cxt.SyncRepConfig[groupid]->syncrep_method == SYNC_REP_PRIORITY && + num_sync > t_thrd.syncrep_cxt.SyncRepConfig[groupid]->num_sync) { + /* Sort by priority ... */ + qsort(*sync_standbys, num_sync, sizeof(SyncRepStandbyData), + standby_priority_comparator); + /* ... then report just the first num_sync ones */ + num_sync = t_thrd.syncrep_cxt.SyncRepConfig[groupid]->num_sync; + } + + return num_sync; } /* - * Return the list of sync standbys chosen based on their priorities, - * or NIL if no sync standby is connected. - * - * If there are multiple standbys with the same priority, - * the first one found is selected preferentially. - * - * The caller must hold SyncRepLock. This function must be called only in - * a priority-based sync replication. - * - * On return, *am_sync is set to true if this walsender is connecting to - * sync standby. Otherwise it's set to false. + * qsort comparator to sort SyncRepStandbyData entries by priority */ -static List *SyncRepGetSyncStandbysPriority(bool *am_sync, int groupid, List** catchup_standbys) +static int +standby_priority_comparator(const void *a, const void *b) { - List *result = NIL; - List *pending = NIL; - int lowest_priority; - int next_highest_priority; - int this_priority; - int priority; - int i; - bool am_in_pending = false; - volatile WalSnd *walsnd = NULL; /* Use volatile pointer to prevent code rearrangement */ - - Assert(t_thrd.syncrep_cxt.SyncRepConfig[groupid]->syncrep_method == SYNC_REP_PRIORITY); - - lowest_priority = t_thrd.syncrep_cxt.SyncRepConfig[groupid]->nmembers; - next_highest_priority = lowest_priority + 1; - - /* - * Find the sync standbys which have the highest priority (i.e, 1). Also - * store all the other potential sync standbys into the pending list, in - * order to scan it later and find other sync standbys from it quickly. - */ - for (i = 0; i < g_instance.attr.attr_storage.max_wal_senders; i++) { - walsnd = &t_thrd.walsender_cxt.WalSndCtl->walsnds[i]; - - /* Must be active */ - if (walsnd->pid == 0) - continue; - - /* Must be synchronous */ - this_priority = walsnd->sync_standby_priority; - if (this_priority == 0 || walsnd->sync_standby_group != groupid) - continue; - - if ((walsnd->state == WALSNDSTATE_CATCHUP || walsnd->peer_state == CATCHUP_STATE) && - catchup_standbys != NULL) { - *catchup_standbys = lappend_int(*catchup_standbys, i); - } - - /* Must have a valid flush position */ - if (XLogRecPtrIsInvalid(walsnd->flush)) - continue; - - /* Must be streaming */ - if (walsnd->state != WALSNDSTATE_STREAMING) - continue; - - /* - * If the priority is equal to 1, consider this standby as sync and - * append it to the result. Otherwise append this standby to the - * pending list to check if it's actually sync or not later. - */ - if (this_priority == 1) { - result = lappend_int(result, i); - if (am_sync != NULL && walsnd == t_thrd.walsender_cxt.MyWalSnd) - *am_sync = true; - if (list_length(result) == t_thrd.syncrep_cxt.SyncRepConfig[groupid]->num_sync) { - list_free(pending); - return result; /* Exit if got enough sync standbys */ - } - } else { - pending = lappend_int(pending, i); - if (am_sync != NULL && walsnd == t_thrd.walsender_cxt.MyWalSnd) - am_in_pending = true; - - /* - * Track the highest priority among the standbys in the pending - * list, in order to use it as the starting priority for later - * scan of the list. This is useful to find quickly the sync - * standbys from the pending list later because we can skip - * unnecessary scans for the unused priorities. - */ - if (this_priority < next_highest_priority) - next_highest_priority = this_priority; - } - } - - /* - * Consider all pending standbys as sync if the number of them plus - * already-found sync ones is lower than the configuration requests. - */ - if (list_length(result) + list_length(pending) <= t_thrd.syncrep_cxt.SyncRepConfig[groupid]->num_sync) { - bool needfree = (result != NIL && pending != NIL); + const SyncRepStandbyData *sa = (const SyncRepStandbyData *) a; + const SyncRepStandbyData *sb = (const SyncRepStandbyData *) b; - /* - * Set *am_sync to true if this walsender is in the pending list - * because all pending standbys are considered as sync. - */ - if (am_sync != NULL && !(*am_sync)) - *am_sync = am_in_pending; - - result = list_concat(result, pending); - if (needfree) { - pfree(pending); - pending = NULL; - } - return result; - } + /* First, sort by increasing priority value */ + if (sa->sync_standby_priority != sb->sync_standby_priority) + return sa->sync_standby_priority - sb->sync_standby_priority; /* - * Find the sync standbys from the pending list. + * We might have equal priority values; arbitrarily break ties by position + * in the WALSnd array. (This is utterly bogus, since that is arrival + * order dependent, but there are regression tests that rely on it.) */ - priority = next_highest_priority; - while (priority <= lowest_priority) { - ListCell *cell = NULL; - ListCell *prev = NULL; - ListCell *next = NULL; - - next_highest_priority = lowest_priority + 1; - - for (cell = list_head(pending); cell != NULL; cell = next) { - i = lfirst_int(cell); - walsnd = &t_thrd.walsender_cxt.WalSndCtl->walsnds[i]; - - next = lnext(cell); - - this_priority = walsnd->sync_standby_priority; - if (this_priority == priority) { - result = lappend_int(result, i); - if (am_sync != NULL && walsnd == t_thrd.walsender_cxt.MyWalSnd) - *am_sync = true; - - /* - * We should always exit here after the scan of pending list - * starts because we know that the list has enough elements to - * reach SyncRepConfig[groupid]->num_sync. - */ - if (list_length(result) == t_thrd.syncrep_cxt.SyncRepConfig[groupid]->num_sync) { - list_free(pending); - return result; /* Exit if got enough sync standbys */ - } - - /* - * Remove the entry for this sync standby from the list to - * prevent us from looking at the same entry again. - */ - pending = list_delete_cell(pending, cell, prev); - - continue; - } - - if (this_priority < next_highest_priority) - next_highest_priority = this_priority; - - prev = cell; - } - - priority = next_highest_priority; - } - - /* never reached, but keep compiler quiet */ - Assert(false); - return result; + return sa->walsnd_index - sb->walsnd_index; } /* diff --git a/src/gausskernel/storage/replication/walsender.cpp b/src/gausskernel/storage/replication/walsender.cpp index 93b0d7a6e9606216bc4a448ae86b97906fc0c310..ebbd7e6c37374ec33990406c4cd844745c7d9256 100755 --- a/src/gausskernel/storage/replication/walsender.cpp +++ b/src/gausskernel/storage/replication/walsender.cpp @@ -3030,7 +3030,7 @@ static void ProcessStandbySwitchRequestMessage(void) t_thrd.walsender_cxt.Demotion != t_thrd.walsender_cxt.WalSndCtl->demotion) { SpinLockRelease(&t_thrd.walsender_cxt.WalSndCtl->mutex); ereport(NOTICE, (errmsg("master is doing switchover,\ - probably another standby already requested switchover."))); + probably another standby already requested switchover."))); return; } else if (message.demoteMode <= t_thrd.walsender_cxt.Demotion) { SpinLockRelease(&t_thrd.walsender_cxt.WalSndCtl->mutex); @@ -6097,8 +6097,8 @@ Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS) int *sync_priority = NULL; int i = 0; volatile HaShmemData *hashmdata = t_thrd.postmaster_cxt.HaShmData; - List *sync_standbys = NIL; - ListCell *lc = NULL; + SyncRepStandbyData *sync_standbys; + int num_standbys; Tuplestorestate *tupstore = BuildTupleResult(fcinfo, &tupdesc); @@ -6125,11 +6125,10 @@ Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS) } /* - * Get the currently active synchronous standbys. + * Get the currently active synchronous standbys.This could be out of + * date before we're done, but we'll use the data anyway. */ - LWLockAcquire(SyncRepLock, LW_SHARED); - sync_standbys = SyncRepGetSyncStandbys(NULL); - LWLockRelease(SyncRepLock); + num_standbys = SyncRepGetSyncStandbys(&sync_standbys); for (i = 0; i < g_instance.attr.attr_storage.max_wal_senders; i++) { /* use volatile pointer to prevent code rearrangement */ @@ -6155,11 +6154,14 @@ Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS) Datum values[PG_STAT_GET_WAL_SENDERS_COLS]; bool nulls[PG_STAT_GET_WAL_SENDERS_COLS]; int j = 0; + int k = 0; errno_t rc = 0; int ret = 0; int group = 0; int priority = 0; + bool is_sync_standby = false; + SpinLockAcquire(&hashmdata->mutex); local_role = hashmdata->current_mode; if (walsnd->pid == 0 || walsnd->lwpId == 0) { @@ -6207,6 +6209,18 @@ Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS) AlignFreeShareStorageCtl(ctlInfo); } + /* + * if the walsener's pid has changed,we consider is is not a sync standby + */ + for(k = 0; k < num_standbys; k++) { + if(sync_standbys[k].walsnd_index == i + && sync_standbys[k].pid == walsnd->pid + && sync_standbys[k].lwpId == walsnd->lwpId) { + is_sync_standby = true; + break; + } + } + rc = memset_s(nulls, sizeof(nulls), 0, sizeof(nulls)); securec_check(rc, "\0", "\0"); @@ -6352,7 +6366,7 @@ Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS) */ if (priority == 0) { values[j++] = CStringGetTextDatum("Async"); - } else if (list_member_int((List*)list_nth(sync_standbys, group), i)) { + } else if (is_sync_standby) { values[j++] = GetWalsndSyncRepConfig(walsnd)->syncrep_method == SYNC_REP_PRIORITY ? CStringGetTextDatum("Sync") : CStringGetTextDatum("Quorum"); @@ -6381,10 +6395,11 @@ Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS) tuplestore_putvalues(tupstore, tupdesc, values, nulls); } - foreach(lc, sync_standbys) { - list_free((List*)lfirst(lc)); + if (sync_standbys != NULL) { + pfree(sync_standbys); + sync_standbys = NULL; } - list_free(sync_standbys); + if (sync_priority != NULL) { pfree(sync_priority); sync_priority = NULL; diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h index 13bc75455465e8168b871ed74a74f528ff5378c9..a6ad5899689a2003a25b4538a8bf227803ab6eb3 100755 --- a/src/include/replication/syncrep.h +++ b/src/include/replication/syncrep.h @@ -49,6 +49,30 @@ extern volatile bool most_available_sync; #define GetWalsndSyncRepConfig(walsnder) \ (t_thrd.syncrep_cxt.SyncRepConfig[(walsnder)->sync_standby_group]) + +/* + * SyncRepGetCandidateStandbys returns an array of these structs, + * one per candidate synchronous walsender. + */ +typedef struct SyncRepStandbyData +{ + /* Copies of relevant fields from WalSnd shared-memory struct */ + ThreadId pid; + int lwpId; + XLogRecPtr receive; + XLogRecPtr write; + XLogRecPtr flush; + XLogRecPtr apply; + uint8 sync_standby_group; + int sync_standby_priority; + /* Index of this walsender in the WalSnd shared-memory array */ + int walsnd_index; + /* This flag indicates whether this struct is about our own process */ + bool is_me; + bool is_cross_cluster; +} SyncRepStandbyData; + + /* * Struct for the configuration of synchronous replication. * @@ -96,7 +120,7 @@ extern void SyncRepUpdateSyncStandbysDefined(void); extern void SyncRepCheckSyncStandbyAlive(void); /* called by wal sender and user backend */ -extern List* SyncRepGetSyncStandbys(bool* am_sync, List** catchup_standbys = NULL); +extern int SyncRepGetSyncStandbys(SyncRepStandbyData** sync_standbys, List** catchup_standbys = NULL); extern bool check_synchronous_standby_names(char** newval, void** extra, GucSource source); extern void assign_synchronous_standby_names(const char* newval, void* extra);