diff --git a/src/common/backend/utils/init/globals.cpp b/src/common/backend/utils/init/globals.cpp index 45c289ada374a1ce52955e8956086e09d5f345c7..7a607f672d5782dc50c0364ee391378e4e29bfcb 100644 --- a/src/common/backend/utils/init/globals.cpp +++ b/src/common/backend/utils/init/globals.cpp @@ -89,7 +89,6 @@ const uint32 TIMESCALE_DB_VERSION_NUM = 92904; const uint32 MULTI_CHARSET_VERSION_NUM = 92903; const uint32 NBTREE_INSERT_OPTIMIZATION_VERSION_NUM = 92902; const uint32 NBTREE_DEDUPLICATION_VERSION_NUM = 92902; -const uint32 ONDEMAND_REDO_VERSION_NUM = 92901; const uint32 SRF_FUSION_VERSION_NUM = 92847; const uint32 INDEX_HINT_VERSION_NUM = 92845; const uint32 INNER_UNIQUE_VERSION_NUM = 92845; diff --git a/src/common/backend/utils/init/miscinit.cpp b/src/common/backend/utils/init/miscinit.cpp index 329706f92cbc89fb8890d2deb94be52c1cdded71..bd2e1763a99604159187370ac2733654f33d1e89 100644 --- a/src/common/backend/utils/init/miscinit.cpp +++ b/src/common/backend/utils/init/miscinit.cpp @@ -2040,17 +2040,6 @@ void register_backend_version(uint32 backend_version){ } } -void SSUpgradeFileBeforeCommit() -{ - // upgrade reform control file - if (pg_atomic_read_u32(&WorkingGrandVersionNum) < ONDEMAND_REDO_VERSION_NUM) { - if (SS_PRIMARY_MODE) { - SSReadControlFile(REFORM_CTRL_PAGE); - SSSaveReformerCtrl(true); - } - } -} - /* * Check whether the version contains the backend_version parameter. */ diff --git a/src/common/backend/utils/misc/guc-file.l b/src/common/backend/utils/misc/guc-file.l index fc79beb87c8a385189aeec4e88f0c4ca4ede994a..3a7096b9cab2c05ef40b5ccc899b62bcb475f7a7 100644 --- a/src/common/backend/utils/misc/guc-file.l +++ b/src/common/backend/utils/misc/guc-file.l @@ -330,7 +330,6 @@ ProcessConfigFile(GucContext context) case MASTER_THREAD: { if (strcmp(item->name, "upgrade_mode") == 0) { if (strcmp(pre_value, "0") != 0 && strcmp(post_value, "0") == 0) { - SSUpgradeFileBeforeCommit(); pg_atomic_write_u32(&WorkingGrandVersionNum, GRAND_VERSION_NUM); } } diff --git a/src/common/backend/utils/misc/guc/guc_storage.cpp b/src/common/backend/utils/misc/guc/guc_storage.cpp index 1ee3c7c7fecdd27ebd84044505bac3ed3c4817fc..bc44a8843a0010177251a6f756f2c06cd88e367d 100755 --- a/src/common/backend/utils/misc/guc/guc_storage.cpp +++ b/src/common/backend/utils/misc/guc/guc_storage.cpp @@ -215,7 +215,6 @@ static bool check_ss_rdma_work_config(char** newval, void** extra, GucSource sou static bool check_ss_dss_vg_name(char** newval, void** extra, GucSource source); static bool check_ss_dss_conn_path(char** newval, void** extra, GucSource source); static bool check_ss_enable_ssl(bool* newval, void** extra, GucSource source); -static bool check_ss_enable_ondemand_recovery(bool* newval, void** extra, GucSource source); static bool check_normal_cluster_replication_config_para(char** newval, void** extra, GucSource source); static bool check_ss_cluster_replication_control_para(bool* newval, void** extra, GucSource source); @@ -1054,7 +1053,7 @@ static void InitStorageConfigureNamesBool() GUC_SUPERUSER_ONLY}, &g_instance.attr.attr_storage.dms_attr.enable_ondemand_recovery, false, - check_ss_enable_ondemand_recovery, + NULL, NULL, NULL}, @@ -6320,17 +6319,6 @@ static bool check_ss_enable_ssl(bool *newval, void **extra, GucSource source) return true; } -static bool check_ss_enable_ondemand_recovery(bool* newval, void** extra, GucSource source) -{ - if (*newval) { - if (pg_atomic_read_u32(&WorkingGrandVersionNum) < ONDEMAND_REDO_VERSION_NUM) { - ereport(ERROR, (errmsg("Do not allow enable ondemand_recovery if openGauss run in old version."))); - return false; - } - } - return true; -} - #ifdef USE_ASSERT_CHECKING static void assign_ss_enable_verify_page(bool newval, void *extra) { diff --git a/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp b/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp index 816195ea19bec5f203926c156ffad657b3c1140d..d9df56c34551372ae0e01fda30356e5a0c06c662 100644 --- a/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp +++ b/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp @@ -484,16 +484,15 @@ Buffer DmsReadPage(Buffer buffer, LWLockMode mode, ReadBufferMode read_mode, boo } // standby node must notify primary node for prepare lastest page in ondemand recovery - if (SS_STANDBY_ONDEMAND_RECOVERY) { - while (!SSOndemandRequestPrimaryRedo(buf_desc->tag)) { - SSReadControlFile(REFORM_CTRL_PAGE); - if (SS_STANDBY_ONDEMAND_NORMAL) { - break; // ondemand recovery finish, skip - } else if (SS_STANDBY_ONDEMAND_BUILD) { - return 0; // in new reform - } - // still need requset page + while (SS_STANDBY_ONDEMAND_NOT_NORMAL) { + /* in new reform */ + if (unlikely(SS_STANDBY_ONDEMAND_BUILD)) { + return 0; + } + if (SSOndemandRequestPrimaryRedo(buf_desc->tag)) { + break; } + SSReadControlFile(REFORM_CTRL_PAGE); } if (!StartReadPage(buf_desc, mode)) { @@ -507,7 +506,9 @@ bool SSOndemandRequestPrimaryRedo(BufferTag tag) dms_context_t dms_ctx; int32 redo_status = ONDEMAND_REDO_TIMEOUT; - if (!SS_STANDBY_ONDEMAND_RECOVERY) { + if (unlikely(SS_STANDBY_ONDEMAND_BUILD)) { + return false; + } else if (SS_STANDBY_ONDEMAND_NORMAL) { return true; } diff --git a/src/gausskernel/ddes/adapter/ss_dms_callback.cpp b/src/gausskernel/ddes/adapter/ss_dms_callback.cpp index d371ec6ad0826f7ad8ddcfd7ffb6771610509ca5..0c23b9214f00daa1c4635ca03bb99e09ba859950 100644 --- a/src/gausskernel/ddes/adapter/ss_dms_callback.cpp +++ b/src/gausskernel/ddes/adapter/ss_dms_callback.cpp @@ -430,8 +430,6 @@ static void CBSwitchoverResult(void *db_handle, int result) static int SetPrimaryIdOnStandby(int primary_id) { - g_instance.dms_cxt.SSReformerControl.primaryInstId = primary_id; - for (int ntries = 0;; ntries++) { SSReadControlFile(REFORM_CTRL_PAGE); /* need to double check */ if (g_instance.dms_cxt.SSReformerControl.primaryInstId == primary_id) { @@ -462,6 +460,7 @@ static int CBSaveStableList(void *db_handle, unsigned long long list_stable, uns unsigned long long list_in, unsigned int save_ctrl) { int primary_id = (int)reformer_id; + LWLockAcquire(ControlFileLock, LW_EXCLUSIVE); g_instance.dms_cxt.SSReformerControl.primaryInstId = primary_id; g_instance.dms_cxt.SSReformerControl.list_stable = list_stable; int ret = DMS_ERROR; @@ -473,7 +472,8 @@ static int CBSaveStableList(void *db_handle, unsigned long long list_stable, uns Assert(g_instance.dms_cxt.SSClusterState == NODESTATE_STANDBY_PROMOTED || g_instance.dms_cxt.SSClusterState == NODESTATE_STANDBY_FAILOVER_PROMOTING); } - SSSaveReformerCtrl(); + SSUpdateReformerCtrl(); + LWLockRelease(ControlFileLock); Assert(g_instance.dms_cxt.SSReformerControl.primaryInstId == (int)primary_id); ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS %s] set current instance:%d as primary.", SS_PERFORMING_SWITCHOVER ? "switchover" : "reform", primary_id))); @@ -483,6 +483,7 @@ static int CBSaveStableList(void *db_handle, unsigned long long list_stable, uns } ret = DMS_SUCCESS; } else { /* we are on standby */ + LWLockRelease(ControlFileLock); ret = SetPrimaryIdOnStandby(primary_id); } return ret; @@ -1116,6 +1117,9 @@ static int32 CBProcessBroadcast(void *db_handle, dms_broadcast_context_t *broad_ case BCAST_SEND_SNAPSHOT: ret = SSUpdateLatestSnapshotOfStandby(data, len); break; + case BCAST_RELOAD_REFORM_CTRL_PAGE: + ret = SSReloadReformCtrlPage(len); + break; default: ereport(WARNING, (errmodule(MOD_DMS), errmsg("invalid broadcast operate type"))); ret = DMS_ERROR; @@ -1863,7 +1867,6 @@ static int CBReformDoneNotify(void *db_handle) g_instance.dms_cxt.SSRecoveryInfo.startup_reform = false; g_instance.dms_cxt.SSRecoveryInfo.restart_failover_flag = false; g_instance.dms_cxt.SSRecoveryInfo.failover_ckpt_status = NOT_ACTIVE; - SSReadControlFile(REFORM_CTRL_PAGE); Assert(g_instance.dms_cxt.SSRecoveryInfo.in_flushcopy == false); g_instance.dms_cxt.SSReformInfo.new_bitmap = g_instance.dms_cxt.SSReformerControl.list_stable; ereport(LOG, (errmsg("[SS reform] new cluster node bitmap: %lld", g_instance.dms_cxt.SSReformInfo.new_bitmap))); diff --git a/src/gausskernel/ddes/adapter/ss_dms_recovery.cpp b/src/gausskernel/ddes/adapter/ss_dms_recovery.cpp index f64bc60db265c6ec153417212853799d4d1b77a1..d965542cee298b85f78784f1b643c2230fb599be 100644 --- a/src/gausskernel/ddes/adapter/ss_dms_recovery.cpp +++ b/src/gausskernel/ddes/adapter/ss_dms_recovery.cpp @@ -51,8 +51,10 @@ int SSGetPrimaryInstId() void SSSavePrimaryInstId(int id) { + LWLockAcquire(ControlFileLock, LW_EXCLUSIVE); g_instance.dms_cxt.SSReformerControl.primaryInstId = id; - SSSaveReformerCtrl(); + SSUpdateReformerCtrl(); + LWLockRelease(ControlFileLock); } void SSWaitStartupExit() diff --git a/src/gausskernel/ddes/adapter/ss_reform_common.cpp b/src/gausskernel/ddes/adapter/ss_reform_common.cpp index 38d411a29e74702a1660bf4529b85ac66394c788..1871fceaff5e6203475661f08fd62854741ba0cf 100644 --- a/src/gausskernel/ddes/adapter/ss_reform_common.cpp +++ b/src/gausskernel/ddes/adapter/ss_reform_common.cpp @@ -249,120 +249,13 @@ void SSDoradoGetInstidList() closedir(dssdir); } -static void SSSaveOldReformerCtrl() -{ - ss_reformer_ctrl_t new_ctrl = g_instance.dms_cxt.SSReformerControl; - ss_old_reformer_ctrl_t old_ctrl = {new_ctrl.list_stable, new_ctrl.primaryInstId, new_ctrl.crc}; - - int len = sizeof(ss_old_reformer_ctrl_t); - int write_size = (int)BUFFERALIGN(len); - char buffer[write_size] __attribute__((__aligned__(ALIGNOF_BUFFER))) = { 0 }; - char *fname[2]; - int fd = -1; - - errno_t err = memcpy_s(&buffer, write_size, &old_ctrl, len); - securec_check(err, "\0", "\0"); - - INIT_CRC32C(((ss_old_reformer_ctrl_t *)buffer)->crc); - COMP_CRC32C(((ss_old_reformer_ctrl_t *)buffer)->crc, (char *)buffer, offsetof(ss_old_reformer_ctrl_t, crc)); - FIN_CRC32C(((ss_old_reformer_ctrl_t *)buffer)->crc); - - fname[0] = XLOG_CONTROL_FILE_BAK; - fname[1] = XLOG_CONTROL_FILE; - - for (int i = 0; i < BAK_CTRL_FILE_NUM; i++) { - if (i == 0) { - fd = BasicOpenFile(fname[i], O_CREAT | O_RDWR | PG_BINARY, S_IRUSR | S_IWUSR); - } else { - fd = BasicOpenFile(fname[i], O_RDWR | PG_BINARY, S_IRUSR | S_IWUSR); - } - - if (fd < 0) { - ereport(FATAL, (errcode_for_file_access(), errmsg("could not open control file \"%s\": %m", fname[i]))); - } - - SSWriteInstanceControlFile(fd, buffer, REFORM_CTRL_PAGE, write_size); - if (close(fd)) { - ereport(PANIC, (errcode_for_file_access(), errmsg("could not close control file: %m"))); - } - } -} - -static bool SSReadOldReformerCtrl() -{ - ss_reformer_ctrl_t *new_ctrl = &g_instance.dms_cxt.SSReformerControl; - ss_old_reformer_ctrl_t old_ctrl; - pg_crc32c crc; - int fd = -1; - bool retry = false; - char *fname = XLOG_CONTROL_FILE; - -loop: - fd = BasicOpenFile(fname, O_RDWR | PG_BINARY, S_IRUSR | S_IWUSR); - if (fd < 0) { - ereport(FATAL, (errcode_for_file_access(), errmsg("could not open control file \"%s\": %m", fname))); - } - - off_t seekpos = (off_t)BLCKSZ * REFORM_CTRL_PAGE; - int len = sizeof(ss_old_reformer_ctrl_t); - - int read_size = (int)BUFFERALIGN(len); - char buffer[read_size] __attribute__((__aligned__(ALIGNOF_BUFFER))); - if (pread(fd, buffer, read_size, seekpos) != read_size) { - ereport(PANIC, (errcode_for_file_access(), errmsg("could not read from control file: %m"))); - } - - errno_t rc = memcpy_s(&old_ctrl, len, buffer, len); - securec_check(rc, "", ""); - if (close(fd) < 0) { - ereport(PANIC, (errcode_for_file_access(), errmsg("could not close control file: %m"))); - } - - /* Now check the CRC. */ - INIT_CRC32C(crc); - COMP_CRC32C(crc, (char *)&old_ctrl, offsetof(ss_old_reformer_ctrl_t, crc)); - FIN_CRC32C(crc); - - if (!EQ_CRC32C(crc, old_ctrl.crc)) { - if (retry == false) { - ereport(WARNING, - (errmsg("control file \"%s\" contains incorrect checksum in upgrade mode, try backup file", fname))); - fname = XLOG_CONTROL_FILE_BAK; - retry = true; - goto loop; - } else { - ereport(WARNING, - (errmsg("backup control file \"%s\" contains incorrect checksum in upgrade mode, " - "try again in post-upgrade mode", fname))); - return false; - } - } - - // new params set to initial value - new_ctrl->version = REFORM_CTRL_VERSION; - new_ctrl->recoveryInstId = INVALID_INSTANCEID; - new_ctrl->clusterStatus = CLUSTER_NORMAL; - - // exist param inherit - new_ctrl->primaryInstId = old_ctrl.primaryInstId; - new_ctrl->list_stable = old_ctrl.list_stable; - new_ctrl->crc = old_ctrl.crc; - - return true; -} - -void SSSaveReformerCtrl(bool force) +void SSUpdateReformerCtrl() { int fd = -1; int len; errno_t err = EOK; char *fname[2]; - if ((pg_atomic_read_u32(&WorkingGrandVersionNum) < ONDEMAND_REDO_VERSION_NUM) && !force) { - SSSaveOldReformerCtrl(); - return; - } - len = sizeof(ss_reformer_ctrl_t); int write_size = (int)BUFFERALIGN(len); char buffer[write_size] __attribute__((__aligned__(ALIGNOF_BUFFER))) = { 0 }; @@ -405,24 +298,12 @@ void SSReadControlFile(int id, bool updateDmsCtx) int read_size = 0; int len = 0; fname = XLOG_CONTROL_FILE; - - if ((pg_atomic_read_u32(&WorkingGrandVersionNum) < ONDEMAND_REDO_VERSION_NUM) && (id == REFORM_CTRL_PAGE)) { - if (SSReadOldReformerCtrl()) { - return; - } - - // maybe primary node already upgrade pg_control file, sleep and try read in lastest mode again - if (SS_STANDBY_MODE) { - pg_usleep(5000000); /* 5 sec */ - goto loop; - } else { - ereport(PANIC, (errmsg("incorrect checksum in control file"))); - } - } + LWLockAcquire(ControlFileLock, LW_EXCLUSIVE); loop: fd = BasicOpenFile(fname, O_RDWR | PG_BINARY, S_IRUSR | S_IWUSR); if (fd < 0) { + LWLockRelease(ControlFileLock); ereport(FATAL, (errcode_for_file_access(), errmsg("could not open control file \"%s\": %m", fname))); } @@ -437,6 +318,7 @@ loop: read_size = (int)BUFFERALIGN(len); char buffer[read_size] __attribute__((__aligned__(ALIGNOF_BUFFER))); if (pread(fd, buffer, read_size, seekpos) != read_size) { + LWLockRelease(ControlFileLock); ereport(PANIC, (errcode_for_file_access(), errmsg("could not read from control file: %m"))); } @@ -444,6 +326,7 @@ loop: rc = memcpy_s(&g_instance.dms_cxt.SSReformerControl, len, buffer, len); securec_check(rc, "", ""); if (close(fd) < 0) { + LWLockRelease(ControlFileLock); ereport(PANIC, (errcode_for_file_access(), errmsg("could not close control file: %m"))); } @@ -459,9 +342,11 @@ loop: retry = true; goto loop; } else { + LWLockRelease(ControlFileLock); ereport(FATAL, (errmsg("incorrect checksum in control file"))); } } + g_instance.dms_cxt.SSRecoveryInfo.cluster_ondemand_status= g_instance.dms_cxt.SSReformerControl.clusterStatus; } else { ControlFileData* controlFile = NULL; ControlFileData tempControlFile; @@ -474,6 +359,7 @@ loop: rc = memcpy_s(controlFile, (size_t)len, buffer, (size_t)len); securec_check(rc, "", ""); if (close(fd) < 0) { + LWLockRelease(ControlFileLock); ereport(PANIC, (errcode_for_file_access(), errmsg("could not close control file: %m"))); } @@ -489,6 +375,7 @@ loop: retry = true; goto loop; } else { + LWLockRelease(ControlFileLock); ereport(FATAL, (errmsg("incorrect checksum in control file"))); } } @@ -497,6 +384,7 @@ loop: g_instance.dms_cxt.ckptRedo = controlFile->checkPointCopy.redo; } } + LWLockRelease(ControlFileLock); } void SSClearSegCache() diff --git a/src/gausskernel/ddes/adapter/ss_transaction.cpp b/src/gausskernel/ddes/adapter/ss_transaction.cpp index 642ebbe0be95a81a9b652488dca4d78713c22072..247cfee09c805cd39a2db8e45ca6eb979342bf70 100644 --- a/src/gausskernel/ddes/adapter/ss_transaction.cpp +++ b/src/gausskernel/ddes/adapter/ss_transaction.cpp @@ -28,6 +28,7 @@ #include "storage/buf/bufmgr.h" #include "storage/smgr/segment_internal.h" #include "ddes/dms/ss_transaction.h" +#include "ddes/dms/ss_reform_common.h" #include "ddes/dms/ss_dms_bufmgr.h" #include "storage/sinvaladt.h" #include "replication/libpqsw.h" @@ -502,6 +503,16 @@ void SSIsPageHitDms(RelFileNode& node, BlockNumber page, int pagesNum, uint64 *p ereport(DEBUG1, (errmsg("SS get page map success, buffer_id = %u.", page))); } +int SSReloadReformCtrlPage(uint32 len) +{ + if (unlikely(len != sizeof(SSBroadcastCmdOnly))) { + return DMS_ERROR; + } + + SSReadControlFile(REFORM_CTRL_PAGE); + return DMS_SUCCESS; +} + int SSCheckDbBackends(char *data, uint32 len, char *output_msg, uint32 *output_msg_len) { if (unlikely(len != sizeof(SSBroadcastDbBackends))) { @@ -553,6 +564,24 @@ bool SSCheckDbBackendsFromAllStandby(Oid dbid) return false; } +void SSRequestAllStandbyReloadReformCtrlPage() +{ + dms_context_t dms_ctx; + InitDmsContext(&dms_ctx); + int ret; + SSBroadcastCmdOnly ssmsg; + ssmsg.type = BCAST_RELOAD_REFORM_CTRL_PAGE; + do { + ret = dms_broadcast_msg(&dms_ctx, (char *)&ssmsg, sizeof(SSBroadcastCmdOnly), + (unsigned char)false, SS_BROADCAST_WAIT_ONE_SECOND); + + if (ret == DMS_SUCCESS) { + return; + } + pg_usleep(5000L); + } while (ret != DMS_SUCCESS); +} + void SSSendSharedInvalidMessages(const SharedInvalidationMessage *msgs, int n) { dms_context_t dms_ctx; diff --git a/src/gausskernel/process/postmaster/postmaster.cpp b/src/gausskernel/process/postmaster/postmaster.cpp index 2a4e68afb36e62f81941a1e4e9dc65f42de036ff..535cee05a37d3d4e809eb01181f0b6d3eff3fb0d 100644 --- a/src/gausskernel/process/postmaster/postmaster.cpp +++ b/src/gausskernel/process/postmaster/postmaster.cpp @@ -10029,8 +10029,10 @@ static void sigusr1_handler(SIGNAL_ARGS) ereport(LOG, (errmsg("Failover between two dorado cluster start, change current run mode to primary_cluster"))); g_instance.attr.attr_common.cluster_run_mode = RUN_MODE_PRIMARY; + LWLockAcquire(ControlFileLock, LW_EXCLUSIVE); g_instance.dms_cxt.SSReformerControl.clusterRunMode = RUN_MODE_PRIMARY; - SSSaveReformerCtrl(); + SSUpdateReformerCtrl(); + LWLockRelease(ControlFileLock); t_thrd.xlog_cxt.server_mode = PRIMARY_MODE; SetHaShmemData(); } diff --git a/src/gausskernel/process/threadpool/knl_instance.cpp b/src/gausskernel/process/threadpool/knl_instance.cpp index 518e71f7f19e0a67be9f691607de7b8c0787ca91..dd85b265926824289f2a0c1d1a3779b2c680948c 100755 --- a/src/gausskernel/process/threadpool/knl_instance.cpp +++ b/src/gausskernel/process/threadpool/knl_instance.cpp @@ -184,6 +184,7 @@ static void knl_g_dms_init(knl_g_dms_context *dms_cxt) dms_cxt->SSReformInfo.redo_total_bytes = 0; dms_cxt->SSClusterState = NODESTATE_NORMAL; dms_cxt->SSRecoveryInfo.recovery_inst_id = INVALID_INSTANCEID; + dms_cxt->SSRecoveryInfo.cluster_ondemand_status= CLUSTER_NORMAL; dms_cxt->SSRecoveryInfo.recovery_pause_flag = true; dms_cxt->SSRecoveryInfo.failover_ckpt_status = NOT_ACTIVE; dms_cxt->SSRecoveryInfo.new_primary_reset_walbuf_flag = false; diff --git a/src/gausskernel/storage/access/transam/ondemand_extreme_rto/dispatcher.cpp b/src/gausskernel/storage/access/transam/ondemand_extreme_rto/dispatcher.cpp index 9e8e0336c016d409e4e38912fb0bd5817471e7c7..a19a0a37496df1e6f674571ca9b489f3bb77b181 100644 --- a/src/gausskernel/storage/access/transam/ondemand_extreme_rto/dispatcher.cpp +++ b/src/gausskernel/storage/access/transam/ondemand_extreme_rto/dispatcher.cpp @@ -78,6 +78,7 @@ #include "utils/atomic.h" #include "pgstat.h" #include "ddes/dms/ss_reform_common.h" +#include "ddes/dms/ss_transaction.h" #ifdef PGXC #include "pgxc/pgxc.h" @@ -1908,14 +1909,18 @@ void WaitRedoFinish() pmState = PM_RUN; write_stderr_with_prefix("[On-demand] LOG: database system is ready to accept connections"); + g_instance.dms_cxt.SSRecoveryInfo.cluster_ondemand_status= CLUSTER_IN_ONDEMAND_REDO; + /* for other nodes in cluster */ + LWLockAcquire(ControlFileLock, LW_EXCLUSIVE); + g_instance.dms_cxt.SSReformerControl.clusterStatus = CLUSTER_IN_ONDEMAND_REDO; + SSUpdateReformerCtrl(); + LWLockRelease(ControlFileLock); + SSRequestAllStandbyReloadReformCtrlPage(); + SpinLockAcquire(&t_thrd.shemem_ptr_cxt.XLogCtl->info_lck); t_thrd.shemem_ptr_cxt.XLogCtl->IsOnDemandBuildDone = true; SpinLockRelease(&t_thrd.shemem_ptr_cxt.XLogCtl->info_lck); - /* for other nodes in cluster */ - g_instance.dms_cxt.SSReformerControl.clusterStatus = CLUSTER_IN_ONDEMAND_REDO; - SSSaveReformerCtrl(); - #ifdef USE_ASSERT_CHECKING XLogRecPtr minStart = MAX_XLOG_REC_PTR; XLogRecPtr minEnd = MAX_XLOG_REC_PTR; diff --git a/src/gausskernel/storage/access/transam/xlog.cpp b/src/gausskernel/storage/access/transam/xlog.cpp index 30d497a054f535b1034abdd16391f993cf8d6dc6..9f586fa90fb3ad85fa6732236aff8a7ca49c2b8d 100755 --- a/src/gausskernel/storage/access/transam/xlog.cpp +++ b/src/gausskernel/storage/access/transam/xlog.cpp @@ -155,6 +155,7 @@ #include "vectorsonic/vsonichash.h" #include "ddes/dms/ss_reform_common.h" +#include "ddes/dms/ss_transaction.h" #include "ddes/dms/ss_dms_recovery.h" #include "ddes/dms/ss_dms_bufmgr.h" #include "storage/file/fio_device.h" @@ -9630,10 +9631,14 @@ void StartupXLOG(void) t_thrd.xlog_cxt.InRecovery == true) { if (SSOndemandRecoveryExitNormal) { g_instance.dms_cxt.SSRecoveryInfo.in_ondemand_recovery = true; + g_instance.dms_cxt.SSRecoveryInfo.cluster_ondemand_status= CLUSTER_IN_ONDEMAND_BUILD; /* for other nodes in cluster and ondeamnd recovery failed */ + LWLockAcquire(ControlFileLock, LW_EXCLUSIVE); g_instance.dms_cxt.SSReformerControl.clusterStatus = CLUSTER_IN_ONDEMAND_BUILD; g_instance.dms_cxt.SSReformerControl.recoveryInstId = g_instance.dms_cxt.SSRecoveryInfo.recovery_inst_id; - SSSaveReformerCtrl(); + SSUpdateReformerCtrl(); + LWLockRelease(ControlFileLock); + SSRequestAllStandbyReloadReformCtrlPage(); SetOndemandExtremeRtoMode(); ereport(LOG, (errmsg("[On-demand] replayed in extreme rto ondemand recovery mode"))); } else { @@ -9643,8 +9648,11 @@ void StartupXLOG(void) } if (SS_PRIMARY_MODE || SS_REPLICATION_MAIN_STANBY_NODE) { - g_instance.dms_cxt.SSReformerControl.clusterRunMode = (ClusterRunMode)g_instance.attr.attr_common.cluster_run_mode; - SSSaveReformerCtrl(); + LWLockAcquire(ControlFileLock, LW_EXCLUSIVE); + g_instance.dms_cxt.SSReformerControl.clusterRunMode = + (ClusterRunMode)g_instance.attr.attr_common.cluster_run_mode; + SSUpdateReformerCtrl(); + LWLockRelease(ControlFileLock); } ReadRemainSegsFile(); @@ -10206,8 +10214,12 @@ void StartupXLOG(void) ereport(LOG, (errmsg("redo is not required"))); if (SS_IN_ONDEMAND_RECOVERY) { g_instance.dms_cxt.SSRecoveryInfo.in_ondemand_recovery = false; + g_instance.dms_cxt.SSRecoveryInfo.cluster_ondemand_status= CLUSTER_NORMAL; + LWLockAcquire(ControlFileLock, LW_EXCLUSIVE); g_instance.dms_cxt.SSReformerControl.clusterStatus = CLUSTER_NORMAL; - SSSaveReformerCtrl(); + SSUpdateReformerCtrl(); + LWLockRelease(ControlFileLock); + SSRequestAllStandbyReloadReformCtrlPage(); } } } @@ -10695,9 +10707,13 @@ void StartupXLOG(void) } if (SS_PRIMARY_MODE) { + g_instance.dms_cxt.SSRecoveryInfo.cluster_ondemand_status= CLUSTER_NORMAL; /* for other nodes in cluster */ + LWLockAcquire(ControlFileLock, LW_EXCLUSIVE); g_instance.dms_cxt.SSReformerControl.clusterStatus = CLUSTER_NORMAL; - SSSaveReformerCtrl(); + SSUpdateReformerCtrl(); + LWLockRelease(ControlFileLock); + SSRequestAllStandbyReloadReformCtrlPage(); } ereport(LOG, (errmsg("redo done, nextXid: " XID_FMT ", startupMaxXid: " XID_FMT ", recentLocalXmin: " XID_FMT diff --git a/src/gausskernel/storage/buffer/bufmgr.cpp b/src/gausskernel/storage/buffer/bufmgr.cpp index 65601d55a184356d42804f3705b18987655115d9..65ba79f4d3556f7f8045231a2f583719c1625056 100644 --- a/src/gausskernel/storage/buffer/bufmgr.cpp +++ b/src/gausskernel/storage/buffer/bufmgr.cpp @@ -2423,16 +2423,15 @@ found_branch: /* DMS: Try get page remote */ if (ENABLE_DMS) { // standby node must notify primary node for prepare lastest page in ondemand recovery - if (SS_STANDBY_ONDEMAND_RECOVERY) { - while (!SSOndemandRequestPrimaryRedo(bufHdr->tag)) { - SSReadControlFile(REFORM_CTRL_PAGE); - if (SS_STANDBY_ONDEMAND_NORMAL) { - break; // ondemand recovery finish, skip - } else if (SS_STANDBY_ONDEMAND_BUILD) { - return 0; // in new reform - } - // still need requset page + while (SS_STANDBY_ONDEMAND_NOT_NORMAL) { + /* in new reform */ + if (unlikely(SS_STANDBY_ONDEMAND_BUILD)) { + return 0; + } + if (SSOndemandRequestPrimaryRedo(bufHdr->tag)) { + break; } + SSReadControlFile(REFORM_CTRL_PAGE); } MarkReadHint(bufHdr->buf_id, relpersistence, isExtend, pblk); if (mode != RBM_FOR_REMOTE && relpersistence != RELPERSISTENCE_TEMP && !isLocalBuf) { diff --git a/src/include/ddes/dms/ss_common_attr.h b/src/include/ddes/dms/ss_common_attr.h index 92479483815865301e82f9eb9472360d7a6304c4..eb2588efead6fa1d7e01ae3f20ac5049231f4c3b 100644 --- a/src/include/ddes/dms/ss_common_attr.h +++ b/src/include/ddes/dms/ss_common_attr.h @@ -99,16 +99,17 @@ /* Mode in dorado hyperreplication and dms enabled as follow */ #define SS_CLUSTER_ONDEMAND_NOT_NORAML \ - (ENABLE_DMS && (g_instance.dms_cxt.SSReformerControl.clusterStatus != CLUSTER_NORMAL)) + (ENABLE_DMS && (g_instance.dms_cxt.SSRecoveryInfo.cluster_ondemand_status!= CLUSTER_NORMAL)) #define SS_CLUSTER_ONDEMAND_BUILD \ - (ENABLE_DMS && (g_instance.dms_cxt.SSReformerControl.clusterStatus == CLUSTER_IN_ONDEMAND_BUILD)) + (ENABLE_DMS && (g_instance.dms_cxt.SSRecoveryInfo.cluster_ondemand_status== CLUSTER_IN_ONDEMAND_BUILD)) #define SS_CLUSTER_ONDEMAND_RECOVERY \ - (ENABLE_DMS && (g_instance.dms_cxt.SSReformerControl.clusterStatus == CLUSTER_IN_ONDEMAND_REDO)) + (ENABLE_DMS && (g_instance.dms_cxt.SSRecoveryInfo.cluster_ondemand_status== CLUSTER_IN_ONDEMAND_REDO)) #define SS_CLUSTER_ONDEMAND_NORMAL \ - (ENABLE_DMS && (g_instance.dms_cxt.SSReformerControl.clusterStatus == CLUSTER_NORMAL)) + (ENABLE_DMS && (g_instance.dms_cxt.SSRecoveryInfo.cluster_ondemand_status== CLUSTER_NORMAL)) #define SS_STANDBY_ONDEMAND_BUILD (SS_STANDBY_MODE && SS_CLUSTER_ONDEMAND_BUILD) #define SS_STANDBY_ONDEMAND_RECOVERY (SS_STANDBY_MODE && SS_CLUSTER_ONDEMAND_RECOVERY) #define SS_STANDBY_ONDEMAND_NORMAL (SS_STANDBY_MODE && SS_CLUSTER_ONDEMAND_NORMAL) +#define SS_STANDBY_ONDEMAND_NOT_NORMAL (SS_STANDBY_MODE && SS_CLUSTER_ONDEMAND_NOT_NORAML) /* DMS_BUF_NEED_LOAD */ #define BUF_NEED_LOAD 0x1 @@ -164,6 +165,7 @@ typedef enum SSBroadcastOp { BCAST_DDLLOCKRELEASE_ALL, BCAST_CHECK_DB_BACKENDS, BCAST_SEND_SNAPSHOT, + BCAST_RELOAD_REFORM_CTRL_PAGE, BCAST_END } SSBroadcastOp; diff --git a/src/include/ddes/dms/ss_dms_recovery.h b/src/include/ddes/dms/ss_dms_recovery.h index ba4caf5e54a1db38442f1267f4ba9970466a0ca6..7f2a1321d58d13344a07a18b2cd51aad7db42066 100644 --- a/src/include/ddes/dms/ss_dms_recovery.h +++ b/src/include/ddes/dms/ss_dms_recovery.h @@ -88,6 +88,7 @@ typedef struct ss_recovery_info { volatile failover_ckpt_status_t failover_ckpt_status; char recovery_xlog_dir[MAXPGPATH]; int recovery_inst_id; + volatile SSGlobalClusterState cluster_ondemand_status; int instid_list[DMS_MAX_INSTANCE]; LWLock* update_seg_lock; bool new_primary_reset_walbuf_flag; diff --git a/src/include/ddes/dms/ss_reform_common.h b/src/include/ddes/dms/ss_reform_common.h index a11429d1dc8184d461073a49c4551471008c60c6..ec517364851eb5008dfa006aba20c9187af09ae2 100644 --- a/src/include/ddes/dms/ss_reform_common.h +++ b/src/include/ddes/dms/ss_reform_common.h @@ -44,7 +44,7 @@ int SSReadXlogInternal(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, XL XLogReaderState *SSXLogReaderAllocate(XLogPageReadCB pagereadfunc, void *private_data, Size alignedSize); void SSGetRecoveryXlogPath(); void SSDoradoGetInstidList(); -void SSSaveReformerCtrl(bool force = false); +void SSUpdateReformerCtrl(); void SSReadControlFile(int id, bool updateDmsCtx = false); void SSClearSegCache(); int SSCancelTransactionOfAllStandby(SSBroadcastOp type); diff --git a/src/include/ddes/dms/ss_transaction.h b/src/include/ddes/dms/ss_transaction.h index 723da9f9234d5c0b5da5334eaee8b23b5b343c0c..dabe4fecf168491b809a4f0487f781c49dcbc914 100644 --- a/src/include/ddes/dms/ss_transaction.h +++ b/src/include/ddes/dms/ss_transaction.h @@ -122,5 +122,7 @@ bool SSCheckDbBackendsFromAllStandby(Oid dbid); void SSStandbyUpdateRedirectInfo(); void SSSendLatestSnapshotToStandby(TransactionId xmin, TransactionId xmax, CommitSeqNo csn); int SSUpdateLatestSnapshotOfStandby(char *data, uint32 len); +int SSReloadReformCtrlPage(uint32 len); +void SSRequestAllStandbyReloadReformCtrlPage(); #endif diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h index abe48258f369469f9176744fd0fc3e5c77eec225..dd1de5b577c0d3792da68c77d231e54df5910197 100644 --- a/src/include/miscadmin.h +++ b/src/include/miscadmin.h @@ -42,7 +42,6 @@ extern const uint32 PARAM_MARK_VERSION_NUM; extern const uint32 TIMESCALE_DB_VERSION_NUM; extern const uint32 NBTREE_INSERT_OPTIMIZATION_VERSION_NUM; extern const uint32 NBTREE_DEDUPLICATION_VERSION_NUM; -extern const uint32 ONDEMAND_REDO_VERSION_NUM; extern const uint32 MULTI_CHARSET_VERSION_NUM; extern const uint32 SRF_FUSION_VERSION_NUM; extern const uint32 INNER_UNIQUE_VERSION_NUM; @@ -139,7 +138,6 @@ extern const uint32 GB18030_2022_VERSION_NUM; extern void register_backend_version(uint32 backend_version); extern bool contain_backend_version(uint32 version_number); -extern void SSUpgradeFileBeforeCommit(); #define INPLACE_UPGRADE_PRECOMMIT_VERSION 1