From 73439a96aa39dc4262f992636f201a1788bd357c Mon Sep 17 00:00:00 2001 From: chenzhikai <895543892@qq.com> Date: Tue, 13 Jun 2023 16:13:34 +0800 Subject: [PATCH] dorado standby cluster failover --- src/bin/pg_ctl/pg_ctl.cpp | 191 +++++++- .../ddes/adapter/ss_dms_callback.cpp | 5 + .../ddes/adapter/ss_dms_recovery.cpp | 2 +- .../ddes/adapter/ss_reform_common.cpp | 108 +---- .../process/postmaster/checkpointer.cpp | 3 +- .../process/postmaster/postmaster.cpp | 21 + .../storage/access/transam/xlog.cpp | 451 +++++++++++++++++- src/include/access/xlog.h | 8 + src/include/ddes/dms/ss_reform_common.h | 5 +- 9 files changed, 664 insertions(+), 130 deletions(-) diff --git a/src/bin/pg_ctl/pg_ctl.cpp b/src/bin/pg_ctl/pg_ctl.cpp index e2dff5607a..3d72a582f9 100755 --- a/src/bin/pg_ctl/pg_ctl.cpp +++ b/src/bin/pg_ctl/pg_ctl.cpp @@ -220,6 +220,9 @@ static char remove_member_file[MAXPGPATH]; static char change_role_file[MAXPGPATH]; static char* new_role = "passive"; static char start_minority_file[MAXPGPATH]; +static int get_instance_id(void); +static int ss_get_primary_id(void); +static bool ss_read_dorado_config(void); static unsigned int vote_num = 0; static unsigned int xmode = 2; static char postport_lock_file[MAXPGPATH]; @@ -2579,6 +2582,8 @@ static void do_switchover(uint32 term) int ret; char term_path[MAXPGPATH]; + bool is_cluster_standby = ss_read_dorado_config(); + pg_log(PG_WARNING, _("switchover term (%u)\n"), term); ret = snprintf_s(term_path, MAXPGPATH, MAXPGPATH - 1, "%s/term_file", pg_data); @@ -2618,7 +2623,10 @@ static void do_switchover(uint32 term) } origin_run_mode = run_mode = get_runmode(); - if (run_mode == PRIMARY_MODE) { + instance_config.dss.instance_id = get_instance_id(); + + if ((run_mode == PRIMARY_MODE && !is_cluster_standby) || + (instance_config.dss.instance_id == ss_get_primary_id() && is_cluster_standby)) { pg_log(PG_WARNING, _("switchover completed (%s)\n"), pg_data); return; } else if (UNKNOWN_MODE == run_mode) { @@ -2708,24 +2716,38 @@ 
static void do_switchover(uint32 term) exit(1); } } - if ((run_mode = get_runmode()) == origin_run_mode) { - pg_log(PG_PRINT, "."); - pg_usleep(1000000); /* 1 sec */ - } - /* - * we query the status of server, if connection is failed, it will - * retry 3 times. - */ - else if (failed_count < 3) { - failed_count++; - pg_log(PG_PRINT, "."); - pg_usleep(1000000); /* 1 sec */ + /* + * in share storage dorado cluster,the server mode of standby cluster not have primary + * we can determine whether it is completed by reading the node ID and primaryid + */ + if (is_cluster_standby) { + if (instance_config.dss.instance_id != ss_get_primary_id()) { + pg_log(PG_PRINT, "."); + pg_usleep(1000000); /* 1 sec */ + } else { + break; + } } else { - break; + if ((run_mode = get_runmode()) == origin_run_mode) { + pg_log(PG_PRINT, "."); + pg_usleep(1000000); /* 1 sec */ + } + /* + * we query the status of server, if connection is failed, it will + * retry 3 times. + */ + else if (failed_count < 3) { + failed_count++; + pg_log(PG_PRINT, "."); + pg_usleep(1000000); /* 1 sec */ + } else { + break; + } } } pg_log(PG_PRINT, _("\n")); - if ((origin_run_mode == STANDBY_MODE && run_mode != PRIMARY_MODE) || + if ((!is_cluster_standby && origin_run_mode == STANDBY_MODE && run_mode != PRIMARY_MODE) || + (is_cluster_standby && instance_config.dss.instance_id != ss_get_primary_id()) || (origin_run_mode == CASCADE_STANDBY_MODE && run_mode != STANDBY_MODE)) { pg_log(PG_WARNING, _("\n switchover timeout after %d seconds. 
please manually check the cluster status.\n"), wait_seconds); } else { @@ -7173,3 +7195,190 @@ static void free_ctl() FREE_AND_RESET(pgha_str); FREE_AND_RESET(pgha_opt); }
+
+/*
+ * Ask the local server for its ss_instance_id setting.
+ * Returns the instance id, or -1 when the server cannot be reached or its reply is unusable.
+ */
+static int get_instance_id(void)
+{
+    PGconn* conn = NULL;
+    PGresult* res = NULL;
+    const char* sql_string = "show ss_instance_id;";
+    int instance_id = -1;
+
+    conn = get_connectionex();
+    if (PQstatus(conn) != CONNECTION_OK) {
+        pg_log(PG_WARNING, _("could not connect to server: %s"), PQerrorMessage(conn));
+        return -1;
+    }
+
+    /* Query the instance id from the local server. */
+    res = PQexec(conn, sql_string);
+    if (PQresultStatus(res) != PGRES_TUPLES_OK) {
+        PQclear(res);
+        pg_log(PG_WARNING, _("could not get ss_instance_id from the local server: %s"), PQerrorMessage(conn));
+        close_connection();
+        conn = NULL;
+        return -1;
+    }
+
+    if (PQnfields(res) != 1 || PQntuples(res) != 1) {
+        int ntuples = PQntuples(res);
+        int nfields = PQnfields(res);
+
+        PQclear(res);
+        pg_log(PG_WARNING,
+            _("invalid response from the local server: "
+              "Expected 1 tuple with 1 fields, got %d tuples with %d fields."),
+            ntuples,
+            nfields);
+        close_connection();
+        conn = NULL;
+        return -1;
+    }
+
+    /* PQgetvalue() returns storage owned by res; convert it before PQclear() frees that storage. */
+    instance_id = atoi(PQgetvalue(res, 0, 0));
+
+    PQclear(res);
+    close_connection();
+    conn = NULL;
+
+    return instance_id;
+}
+
+/*
+ * Read pg_control from the DSS volume group and return the primaryInstId stored in the
+ * reformer control page. Exits the process when the DSS settings are missing or pg_control
+ * cannot be read. The descriptor and the read buffer are released before returning, because
+ * do_switchover() polls this function repeatedly.
+ */
+static int ss_get_primary_id(void)
+{
+    if (instance_config.dss.socketpath == NULL) {
+        pg_log(PG_WARNING, _("socketpath cannot be NULL when enable dss\n"));
+        exit(1);
+    }
+
+    if (instance_config.dss.vgname == NULL) {
+        pg_log(PG_WARNING, _("the DATADIR is not correct with enable dss\n"));
+        exit(1);
+    }
+
+    int fd = -1;
+    int len = 0;
+    int err = 0;
+    int primary_id = -1;
+    struct stat statbuf;
+    char control_file_path[MAXPGPATH];
+    char* tmpBuffer = NULL;
+    ss_reformer_ctrl_t* reformerCtrl = NULL;
+    /* byte offset of the reformer control page inside pg_control */
+    const size_t reformer_off = (size_t)REFORMER_CTL_INSTANCEID * PG_CONTROL_SIZE;
+
+    err = memset_s(control_file_path, MAXPGPATH, 0, MAXPGPATH);
+    securec_check_c(err, "\0", "\0");
+    err = snprintf_s(control_file_path, MAXPGPATH, MAXPGPATH - 1, "%s/pg_control", instance_config.dss.vgname);
+    securec_check_ss_c(err, "\0", "\0");
+
+    if (dss_device_init(instance_config.dss.socketpath, true) != DSS_SUCCESS) {
+        pg_log(PG_WARNING, _("failed to init dss device\n"));
+        exit(1);
+    }
+
+    fd = open(control_file_path, O_RDONLY | PG_BINARY, 0);
+    if (fd < 0) {
+        /* nothing was opened, so there is no descriptor to close here */
+        pg_log(PG_WARNING, _("failed to open pg_control\n"));
+        exit(1);
+    }
+
+    if (stat(control_file_path, &statbuf) < 0) {
+        pg_log(PG_WARNING, _("failed to stat pg_control\n"));
+        close(fd);
+        fd = -1;
+        exit(1);
+    }
+
+    /* refuse a file that is too short to contain the reformer control page */
+    if (statbuf.st_size < 0 || (size_t)statbuf.st_size < reformer_off + sizeof(ss_reformer_ctrl_t)) {
+        pg_log(PG_WARNING, _("pg_control is too short to hold the reformer control page\n"));
+        close(fd);
+        fd = -1;
+        exit(1);
+    }
+
+    len = (int)statbuf.st_size;
+    tmpBuffer = (char*)malloc(len + 1);
+    if (tmpBuffer == NULL) {
+        pg_log(PG_WARNING, _("out of memory\n"));
+        close(fd);
+        fd = -1;
+        exit(1);
+    }
+
+    if ((read(fd, tmpBuffer, len)) != len) {
+        free(tmpBuffer);
+        tmpBuffer = NULL;
+        close(fd);
+        fd = -1;
+        pg_log(PG_WARNING, _("failed to read pg_control\n"));
+        exit(1);
+    }
+
+    /* copy the value out, then release the buffer and the descriptor */
+    reformerCtrl = (ss_reformer_ctrl_t*)(tmpBuffer + reformer_off);
+    primary_id = reformerCtrl->primaryInstId;
+
+    free(tmpBuffer);
+    tmpBuffer = NULL;
+    close(fd);
+    fd = -1;
+
+    return primary_id;
+}
+
+/*
+ * Read postgresql.conf and report whether cluster_run_mode is cluster_standby
+ * (dorado standby cluster). When it is, ss_dss_conn_path and ss_dss_vg_name are
+ * loaded into instance_config.dss as well.
+ */
+static bool ss_read_dorado_config(void)
+{
+    char config_file[MAXPGPATH] = {0};
+    char cluster_run_mode[MAXPGPATH] = {0};
+    char** optlines = NULL;
+    int ret = EOK;
+
+    ret = snprintf_s(config_file, MAXPGPATH, MAXPGPATH - 1, "%s/postgresql.conf", pg_data);
+    securec_check_ss_c(ret, "\0", "\0");
+    config_file[MAXPGPATH - 1] = '\0';
+    optlines = readfile(config_file);
+    if (optlines == NULL) {
+        /* nothing to inspect; treat this as "not a dorado standby cluster" */
+        return false;
+    }
+
+    (void)find_guc_optval((const char**)optlines, "cluster_run_mode", cluster_run_mode);
+
+    /* this is not a dorado standby cluster, so there is nothing else to do */
+    if (strncmp(cluster_run_mode, "cluster_standby", sizeof("cluster_standby")) != 0) {
+        freefile(optlines);
+        optlines = NULL;
+        return false;
+    }
+
+    /* zero-filled, so the buffers hold an empty string even if find_guc_optval() writes nothing */
+    instance_config.dss.socketpath = (char*)calloc(MAXPGPATH, sizeof(char));
+    instance_config.dss.vgname = (char*)calloc(MAXPGPATH, sizeof(char));
+    if (instance_config.dss.socketpath == NULL || instance_config.dss.vgname == NULL) {
+        pg_log(PG_WARNING, _("out of memory\n"));
+        freefile(optlines);
+        exit(1);
+    }
+    (void)find_guc_optval((const char**)optlines, "ss_dss_conn_path", instance_config.dss.socketpath);
+    (void)find_guc_optval((const char**)optlines, "ss_dss_vg_name", (char*)instance_config.dss.vgname);
+    freefile(optlines);
+    optlines = NULL;
+    return true;
+}
\ No newline at end of file diff --git a/src/gausskernel/ddes/adapter/ss_dms_callback.cpp b/src/gausskernel/ddes/adapter/ss_dms_callback.cpp index 23c98cb14b..857218a250 100644 --- a/src/gausskernel/ddes/adapter/ss_dms_callback.cpp +++ b/src/gausskernel/ddes/adapter/ss_dms_callback.cpp @@ -33,6 +33,7 @@ #include "access/xact.h" #include "access/transam.h" #include "access/csnlog.h" +#include "access/xlog.h" #include "ddes/dms/ss_dms_bufmgr.h" #include "storage/buf/buf_internals.h" #include "ddes/dms/ss_transaction.h" @@ -1527,6 +1528,10 @@ static void CBReformSetDmsRole(void *db_handle, unsigned int reformer_id) dms_role_t new_dms_role = reformer_id == (unsigned int)SS_MY_INST_ID ? DMS_ROLE_REFORMER : DMS_ROLE_PARTNER; if (new_dms_role == DMS_ROLE_REFORMER) { ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS switchover]begin to set currrent DSS as primary"))); + /* standby of standby cluster need to set mode to STANDBY_MODE in dual cluster*/ + if (DORADO_STANDBY_CLUSTER) { + t_thrd.postmaster_cxt.HaShmData->current_mode = STANDBY_MODE; + } while (dss_set_server_status_wrapper() != GS_SUCCESS) { pg_usleep(REFORM_WAIT_LONG); ereport(WARNING, (errmodule(MOD_DMS), diff --git a/src/gausskernel/ddes/adapter/ss_dms_recovery.cpp b/src/gausskernel/ddes/adapter/ss_dms_recovery.cpp index a8d8c71d3b..e9ae32649b 100644 --- a/src/gausskernel/ddes/adapter/ss_dms_recovery.cpp +++ b/src/gausskernel/ddes/adapter/ss_dms_recovery.cpp @@ -138,7 +138,7 @@ bool SSRecoveryApplyDelay() if (!ENABLE_REFORM) { return false; } - + while (g_instance.dms_cxt.SSRecoveryInfo.recovery_pause_flag) { /* might change the trigger file's location */ RedoInterruptCallBack(); diff --git a/src/gausskernel/ddes/adapter/ss_reform_common.cpp b/src/gausskernel/ddes/adapter/ss_reform_common.cpp index 0dd6a2d109..a0f67de258 100644 --- a/src/gausskernel/ddes/adapter/ss_reform_common.cpp +++ b/src/gausskernel/ddes/adapter/ss_reform_common.cpp @@ -46,7 +46,7 @@ typedef struct
XLogPageReadPrivate { bool randAccess; } XLogPageReadPrivate; -static int SSXLogFileReadAnyTLI(XLogSegNo segno, int emode, uint32 sources, char* xlog_path) +int SSXLogFileReadAnyTLI(XLogSegNo segno, int emode, uint32 sources, char* xlog_path) { char path[MAXPGPATH]; ListCell *cell = NULL; @@ -111,112 +111,6 @@ static int emode_for_corrupt_record(int emode, XLogRecPtr RecPtr) return emode; } -static int SSReadXLog(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int expectReadLen, - XLogRecPtr targetRecPtr, char *buf, TimeLineID *readTLI, char* xlog_path) -{ - /* Load reader private data */ - XLogPageReadPrivate *readprivate = (XLogPageReadPrivate *)xlogreader->private_data; - int emode = IsExtremeRedo() ? LOG : readprivate->emode; - bool randAccess = IsExtremeRedo() ? false : readprivate->randAccess; - XLogRecPtr RecPtr = targetPagePtr; - uint32 targetPageOff; - -#ifdef USE_ASSERT_CHECKING - XLogSegNo targetSegNo; - - XLByteToSeg(targetPagePtr, targetSegNo); -#endif - targetPageOff = targetPagePtr % XLogSegSize; - - /* - * See if we need to switch to a new segment because the requested record - * is not in the currently open one. - */ - if (t_thrd.xlog_cxt.readFile >= 0 && !XLByteInSeg(targetPagePtr, t_thrd.xlog_cxt.readSegNo)) { - close(t_thrd.xlog_cxt.readFile); - t_thrd.xlog_cxt.readFile = -1; - t_thrd.xlog_cxt.readSource = 0; - } - - XLByteToSeg(targetPagePtr, t_thrd.xlog_cxt.readSegNo); - XLByteAdvance(RecPtr, expectReadLen); - - /* In archive or crash recovery. */ - if (t_thrd.xlog_cxt.readFile < 0) { - uint32 sources; - - /* Reset curFileTLI if random fetch. 
*/ - if (randAccess) { - t_thrd.xlog_cxt.curFileTLI = 0; - } - - sources = XLOG_FROM_PG_XLOG; - if (t_thrd.xlog_cxt.InArchiveRecovery) { - sources |= XLOG_FROM_ARCHIVE; - } - - t_thrd.xlog_cxt.readFile = SSXLogFileReadAnyTLI(t_thrd.xlog_cxt.readSegNo, emode, sources, xlog_path); - - if (t_thrd.xlog_cxt.readFile < 0) { - return -1; - } - } - - /* - * At this point, we have the right segment open and if we're streaming we - * know the requested record is in it. - */ - Assert(t_thrd.xlog_cxt.readFile != -1); - - /* read size for XLOG_FROM_PG_XLOG */ - t_thrd.xlog_cxt.readLen = XLOG_BLCKSZ; - - /* Read the requested page */ - t_thrd.xlog_cxt.readOff = targetPageOff; - - bool ret = SSReadXlogInternal(xlogreader, targetPagePtr, targetRecPtr, buf); - if (!ret) { - ereport(LOG, (errcode_for_file_access(), errmsg("read xlog(start:%X/%X, pos:%u len:%d) failed : %m", - static_cast(targetPagePtr >> BIT_NUM_INT32), - static_cast(targetPagePtr), targetPageOff, - expectReadLen))); - ereport(emode_for_corrupt_record(emode, RecPtr), - (errcode_for_file_access(), - errmsg("could not read from log file %s to offset %u: %m", - XLogFileNameP(t_thrd.xlog_cxt.ThisTimeLineID, t_thrd.xlog_cxt.readSegNo), - t_thrd.xlog_cxt.readOff))); - goto next_record_is_invalid; - } - - Assert(targetSegNo == t_thrd.xlog_cxt.readSegNo); - Assert(targetPageOff == t_thrd.xlog_cxt.readOff); - Assert((uint32)expectReadLen <= t_thrd.xlog_cxt.readLen); - - *readTLI = t_thrd.xlog_cxt.curFileTLI; - - return (int)t_thrd.xlog_cxt.readLen; - -next_record_is_invalid: - t_thrd.xlog_cxt.failedSources |= t_thrd.xlog_cxt.readSource; - - if (t_thrd.xlog_cxt.readFile >= 0) { - close(t_thrd.xlog_cxt.readFile); - } - t_thrd.xlog_cxt.readFile = -1; - t_thrd.xlog_cxt.readLen = 0; - t_thrd.xlog_cxt.readSource = 0; - - return -1; -} - -int SSXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, - XLogRecPtr targetRecPtr, char *readBuf, TimeLineID *readTLI, char* xlog_path) -{ - int read_len = 
SSReadXLog(xlogreader, targetPagePtr, Max(XLOG_BLCKSZ, reqLen), targetRecPtr, - readBuf, readTLI, g_instance.dms_cxt.SSRecoveryInfo.recovery_xlogDir); - return read_len; -} - bool SSReadXlogInternal(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, XLogRecPtr targetRecPtr, char *buf) { uint32 preReadOff; diff --git a/src/gausskernel/process/postmaster/checkpointer.cpp b/src/gausskernel/process/postmaster/checkpointer.cpp index aae43be215..a5084828cb 100755 --- a/src/gausskernel/process/postmaster/checkpointer.cpp +++ b/src/gausskernel/process/postmaster/checkpointer.cpp @@ -40,6 +40,7 @@ #include #include "access/xlog_internal.h" +#include "access/xlog.h" #include "libpq/pqsignal.h" #include "miscadmin.h" #include "pgstat.h" @@ -541,7 +542,7 @@ void CheckpointerMain(void) } else { CheckPointBuffers(flags, true); } - } else if (!do_restartpoint) { + } else if (!do_restartpoint && !DORADO_STANDBY_CLUSTER_MAINSTANDBY_NODE) { CreateCheckPoint(flags); ckpt_performed = true; if (!bgwriter_first_startup && CheckFpwBeforeFirstCkpt()) { diff --git a/src/gausskernel/process/postmaster/postmaster.cpp b/src/gausskernel/process/postmaster/postmaster.cpp index 903c4deaa7..7896e89bd1 100644 --- a/src/gausskernel/process/postmaster/postmaster.cpp +++ b/src/gausskernel/process/postmaster/postmaster.cpp @@ -6510,11 +6510,19 @@ dms_demote: signal_child(g_instance.pid_cxt.StatementPID, SIGTERM); } + if (g_instance.pid_cxt.StartupPID != 0 && DORADO_STANDBY_CLUSTER) { + signal_child(g_instance.pid_cxt.StartupPID, SIGTERM); + } + /* and the walwriter too, to avoid checkpoint hang after ss switchover */ if (g_instance.pid_cxt.WalWriterPID != 0) signal_child(g_instance.pid_cxt.WalWriterPID, SIGTERM); StopAliveBuildSender(); + if (g_instance.pid_cxt.WalReceiverPID != 0 && DORADO_STANDBY_CLUSTER) { + signal_child(g_instance.pid_cxt.WalReceiverPID, SIGTERM); + } + if (g_instance.pid_cxt.WalWriterAuxiliaryPID != 0) signal_child(g_instance.pid_cxt.WalWriterAuxiliaryPID, SIGTERM); @@ 
-10053,6 +10061,11 @@ static void sigusr1_handler(SIGNAL_ARGS) /* shut down all backends and autovac workers */ (void)SignalSomeChildren(SIGTERM, BACKEND_TYPE_NORMAL | BACKEND_TYPE_AUTOVAC); + if (g_instance.pid_cxt.PgStatPID != 0 && + g_instance.attr.attr_common.cluster_run_mode == RUN_MODE_STANDBY) { + signal_child(g_instance.pid_cxt.PgStatPID, SIGQUIT); + } + /* and the autovac launcher too */ if (g_instance.pid_cxt.AutoVacPID != 0) signal_child(g_instance.pid_cxt.AutoVacPID, SIGTERM); @@ -10191,6 +10204,11 @@ static void sigusr1_handler(SIGNAL_ARGS) if (g_instance.pid_cxt.AutoVacPID != 0) signal_child(g_instance.pid_cxt.AutoVacPID, SIGTERM); + if (g_instance.pid_cxt.PgStatPID != 0 && + g_instance.attr.attr_common.cluster_run_mode == RUN_MODE_STANDBY) { + signal_child(g_instance.pid_cxt.PgStatPID, SIGQUIT); + } + if (g_instance.pid_cxt.PgJobSchdPID != 0) signal_child(g_instance.pid_cxt.PgJobSchdPID, SIGTERM); /* @@ -12504,6 +12522,9 @@ const char* wal_get_db_state_string(DbState db_state) static ServerMode get_cur_mode(void) { if (ENABLE_DMS) { + if (DORADO_STANDBY_CLUSTER) { + return STANDBY_MODE; + } /* except for main standby in standby cluster, current mode of instance is determined by SS_OFFICIAL_PRIMARY*/ if (g_instance.attr.attr_storage.xlog_file_path !=0 && SS_OFFICIAL_PRIMARY && t_thrd.postmaster_cxt.HaShmData->current_mode == STANDBY_MODE) { diff --git a/src/gausskernel/storage/access/transam/xlog.cpp b/src/gausskernel/storage/access/transam/xlog.cpp index 8e6acb2eec..21501735dc 100755 --- a/src/gausskernel/storage/access/transam/xlog.cpp +++ b/src/gausskernel/storage/access/transam/xlog.cpp @@ -424,6 +424,11 @@ static uint64 XLogRecPtrToBytePos(XLogRecPtr ptr); static XLogRecPtr XLogInsertRecordSingle(XLogRecData *rdata, XLogRecPtr fpw_lsn); static bool DoEarlyExit(); +static int SSXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, + XLogRecPtr targetRecPtr, char *readBuf, TimeLineID *readTLI, char* xlog_path); +static int 
SSReadXLog(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int expectReadLen, + XLogRecPtr targetRecPtr, char *buf, TimeLineID *readTLI, char* xlog_path); + void ArchiveXlogForForceFinishRedo(XLogReaderState *xlogreader, TermFileData *term_file); TermFileData GetTermFileDataAndClear(void); XLogRecPtr mpfl_read_max_flush_lsn(); @@ -11679,7 +11684,11 @@ void ShutdownXLOG(int code, Datum arg) { if (SS_PRIMARY_DEMOTING) { ereport(LOG, (errmsg("[SS switchover] primary demote: doing shutdown checkpoint"))); - CreateCheckPoint(CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_IMMEDIATE); + if (DORADO_STANDBY_CLUSTER) { + CreateRestartPoint(CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_IMMEDIATE); + } else { + CreateCheckPoint(CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_IMMEDIATE); + } ckpt_shutdown_pagewriter(); if (g_instance.ckpt_cxt_ctl->dirty_page_queue != NULL) { @@ -11707,8 +11716,13 @@ void ShutdownXLOG(int code, Datum arg) (void)RequestXLogSwitch(); } - if (g_instance.wal_cxt.upgradeSwitchMode != ExtremelyFast) - CreateCheckPoint(CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_IMMEDIATE); + if (g_instance.wal_cxt.upgradeSwitchMode != ExtremelyFast) { + if (DORADO_STANDBY_CLUSTER) { + CreateRestartPoint(CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_IMMEDIATE); + } else { + CreateCheckPoint(CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_IMMEDIATE); + } + } } } @@ -19815,3 +19829,434 @@ bool SSModifySharedLunAllowed() } return false; } + +static int SSReadXLog(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int expectReadLen, + XLogRecPtr targetRecPtr, char *buf, TimeLineID *readTLI, char* xlog_path) +{ + /* Load reader private data */ + XLogPageReadPrivate *readprivate = (XLogPageReadPrivate *)xlogreader->private_data; + int emode = IsExtremeRedo() ? LOG : readprivate->emode; + XLogRecPtr RecPtr = targetPagePtr; + uint32 targetPageOff; + bool processtrxn = false; + bool fetching_ckpt = readprivate->fetching_ckpt; + bool randAccess = IsExtremeRedo() ? 
false : readprivate->randAccess; + XLogCtlData *xlogctl = t_thrd.shemem_ptr_cxt.XLogCtl; + XLogSegNo replayedSegNo; + +#ifdef USE_ASSERT_CHECKING + XLogSegNo targetSegNo; + + XLByteToSeg(targetPagePtr, targetSegNo); +#endif + targetPageOff = targetPagePtr % XLogSegSize; + + if (t_thrd.xlog_cxt.readFile >= 0 && !XLByteInSeg(targetPagePtr, t_thrd.xlog_cxt.readSegNo)) { + /* + * Request a restartpoint if we've replayed too much xlog since the + * last one. + */ + if (t_thrd.xlog_cxt.StandbyModeRequested && + (t_thrd.xlog_cxt.bgwriterLaunched || t_thrd.xlog_cxt.pagewriter_launched) && !dummyStandbyMode) { + if (get_real_recovery_parallelism() > 1) { + XLByteToSeg(GetXLogReplayRecPtr(NULL), replayedSegNo); + } else { + replayedSegNo = t_thrd.xlog_cxt.readSegNo; + } + if (XLogCheckpointNeeded(replayedSegNo)) { + (void)GetRedoRecPtr(); + if (XLogCheckpointNeeded(replayedSegNo)) { + RequestCheckpoint(CHECKPOINT_CAUSE_XLOG); + } + } + } + + close(t_thrd.xlog_cxt.readFile); + t_thrd.xlog_cxt.readFile = -1; + t_thrd.xlog_cxt.readSource = 0; + } + + XLByteToSeg(targetPagePtr, t_thrd.xlog_cxt.readSegNo); + XLByteAdvance(RecPtr, expectReadLen); + +retry: + /* See if we need to retrieve more data */ + if (t_thrd.xlog_cxt.readFile < 0 || !DORADO_STANDBY_CLUSTER || + (t_thrd.xlog_cxt.readSource == XLOG_FROM_STREAM && XLByteLT(t_thrd.xlog_cxt.receivedUpto, RecPtr))) { + if (t_thrd.xlog_cxt.StandbyMode && t_thrd.xlog_cxt.startup_processing && DORADO_STANDBY_CLUSTER) { + /* + * In standby mode, wait for the requested record to become + * available, either via restore_command succeeding to restore the + * segment, or via walreceiver having streamed the record. + */ + for (;;) { + /* + * Need to check here also for the case where consistency level is + * already reached without replaying any record i.e. just after reading + * of checkpoint data it has reached to minRecoveryPoint. 
Also + * whenever we are going to loop in the data receive from master + * node its bettwe we check if consistency level has reached. So + * instead of keeping in all places before ReadRecord, we can keep + * here in centralised location. + */ + ProcTxnWorkLoad(false); + + CheckRecoveryConsistency(); + if (WalRcvInProgress()) { + XLogRecPtr expectedRecPtr = RecPtr; + bool havedata = false; + + /* + * If we find an invalid record in the WAL streamed from + * master, something is seriously wrong. There's little + * chance that the problem will just go away, but PANIC is + * not good for availability either, especially in hot + * standby mode. Disconnect, and retry from + * archive/pg_xlog again. The WAL in the archive should be + * identical to what was streamed, so it's unlikely that + * it helps, but one can hope... + */ + if (t_thrd.xlog_cxt.failedSources & XLOG_FROM_STREAM) { + ProcTxnWorkLoad(true); + ereport(LOG, (errmsg("read from stream failed, request xlog receivedupto at %X/%X. targetRecPtr:%X/%x", + (uint32)(t_thrd.xlog_cxt.receivedUpto >> 32), + (uint32)t_thrd.xlog_cxt.receivedUpto, + (uint32)(targetRecPtr >> 32), + (uint32)targetRecPtr + ))); + ShutdownWalRcv(); + continue; + } + + /* + * Walreceiver is active, so see if new data has arrived. + * + * We only advance XLogReceiptTime when we obtain fresh + * WAL from walreceiver and observe that we had already + * processed everything before the most recent "chunk" + * that it flushed to disk. In steady state where we are + * keeping up with the incoming data, XLogReceiptTime will + * be updated on each cycle. When we are behind, + * XLogReceiptTime will not advance, so the grace time + * alloted to conflicting queries will decrease. 
+ */ + if (RecPtr % XLogSegSize == 0) { + XLByteAdvance(expectedRecPtr, SizeOfXLogLongPHD); + } else if (RecPtr % XLOG_BLCKSZ == 0) { + XLByteAdvance(expectedRecPtr, SizeOfXLogShortPHD); + } + + if (XLByteLT(expectedRecPtr, t_thrd.xlog_cxt.receivedUpto)) { + havedata = true; + } else { + XLogRecPtr latestChunkStart; + + t_thrd.xlog_cxt.receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart); + if (XLByteLT(expectedRecPtr, t_thrd.xlog_cxt.receivedUpto)) { + havedata = true; + if (!XLByteLT(RecPtr, latestChunkStart)) { + t_thrd.xlog_cxt.XLogReceiptTime = GetCurrentTimestamp(); + SetCurrentChunkStartTime(t_thrd.xlog_cxt.XLogReceiptTime); + } + } else { + havedata = false; + } + } + if (havedata) { + /* + * Great, streamed far enough. Open the file if it's + * not open already. Use XLOG_FROM_STREAM so that + * source info is set correctly and XLogReceiptTime + * isn't changed. + */ + if (t_thrd.xlog_cxt.readFile < 0) { + t_thrd.xlog_cxt.readFile = XLogFileRead(t_thrd.xlog_cxt.readSegNo, PANIC, + t_thrd.xlog_cxt.recoveryTargetTLI, XLOG_FROM_STREAM, + false); + Assert(t_thrd.xlog_cxt.readFile >= 0); + } else { + /* just make sure source info is correct... 
*/ + t_thrd.xlog_cxt.readSource = XLOG_FROM_STREAM; + t_thrd.xlog_cxt.XLogReceiptSource = XLOG_FROM_STREAM; + } + + break; + } + + t_thrd.xlog_cxt.RedoDone = IsRedoDonePromoting(); + pg_memory_barrier(); + + if (IS_SHARED_STORAGE_MODE) { + uint32 disableConnectionNode = + pg_atomic_read_u32(&g_instance.comm_cxt.localinfo_cxt.need_disable_connection_node); + if (disableConnectionNode && WalRcvIsRunning()) { + ereport(LOG, (errmsg("request xlog receivedupto at %X/%X.", + (uint32)(t_thrd.xlog_cxt.receivedUpto >> 32), + (uint32)t_thrd.xlog_cxt.receivedUpto))); + ShutdownWalRcv(); + } + } + if (!processtrxn) { + ProcTxnWorkLoad(true); + processtrxn = true; + goto retry; + } + /* + * Wait for more WAL to arrive, or timeout to be reached + */ + WaitLatch(&t_thrd.shemem_ptr_cxt.XLogCtl->recoveryWakeupLatch, WL_LATCH_SET | WL_TIMEOUT, 1000L); + processtrxn = false; + ResetLatch(&t_thrd.shemem_ptr_cxt.XLogCtl->recoveryWakeupLatch); + } else { + uint32 sources; + /* + * Until walreceiver manages to reconnect, poll the + * archive. + */ + if (t_thrd.xlog_cxt.readFile >= 0) { + close(t_thrd.xlog_cxt.readFile); + t_thrd.xlog_cxt.readFile = -1; + } + /* Reset curFileTLI if random fetch. */ + if (randAccess) { + t_thrd.xlog_cxt.curFileTLI = 0; + } + + /* + * Try to restore the file from archive, or read an + * existing file from pg_xlog. + */ + sources = XLOG_FROM_ARCHIVE | XLOG_FROM_PG_XLOG; + ereport(DEBUG5, (errmsg("failedSources: %u", t_thrd.xlog_cxt.failedSources))); + if (!(sources & ~t_thrd.xlog_cxt.failedSources)) { + /* + * We've exhausted all options for retrieving the + * file. Retry. + */ + t_thrd.xlog_cxt.failedSources = 0; + + /* + * Before we sleep, re-scan for possible new timelines + * if we were requested to recover to the latest + * timeline. 
+ */ + if (t_thrd.xlog_cxt.recoveryTargetIsLatest) { + if (rescanLatestTimeLine()) { + continue; + } + } + + if (t_thrd.startup_cxt.shutdown_requested) { + ereport(LOG, (errmsg("startup shutdown"))); + proc_exit(0); + } + + if (!xlogctl->IsRecoveryDone) { + g_instance.comm_cxt.predo_cxt.redoPf.redo_done_time = GetCurrentTimestamp(); + g_instance.comm_cxt.predo_cxt.redoPf.recovery_done_ptr = t_thrd.xlog_cxt.ReadRecPtr; + ereport(LOG, (errmodule(MOD_REDO), errcode(ERRCODE_LOG), + errmsg("XLogPageRead IsRecoveryDone is set true," + "ReadRecPtr:%X/%X, EndRecPtr:%X/%X", + (uint32)(t_thrd.xlog_cxt.ReadRecPtr >> 32), + (uint32)(t_thrd.xlog_cxt.ReadRecPtr), + (uint32)(t_thrd.xlog_cxt.EndRecPtr >> 32), + (uint32)(t_thrd.xlog_cxt.EndRecPtr)))); + parallel_recovery::redo_dump_all_stats(); + } + + /* + * signal postmaster to update local redo end + * point to gaussdb state file. + */ + ProcTxnWorkLoad(true); + if (!xlogctl->IsRecoveryDone) { + SendPostmasterSignal(PMSIGNAL_LOCAL_RECOVERY_DONE); + if (DORADO_STANDBY_CLUSTER && SS_PERFORMING_SWITCHOVER) { + g_instance.dms_cxt.SSClusterState = NODESTATE_STANDBY_PROMOTED; + } + } + + SpinLockAcquire(&xlogctl->info_lck); + xlogctl->IsRecoveryDone = true; + SpinLockRelease(&xlogctl->info_lck); + static uint64 printFrequency = 0; + if (!(IS_SHARED_STORAGE_MODE) || + pg_atomic_read_u32(&t_thrd.walreceiverfuncs_cxt.WalRcv->rcvDoneFromShareStorage)) { + knl_g_set_redo_finish_status(REDO_FINISH_STATUS_LOCAL | REDO_FINISH_STATUS_CM); + if ((printFrequency & 0xFF) == 0) { + ereport(LOG, (errmodule(MOD_REDO), errcode(ERRCODE_LOG), + errmsg("XLogPageRead set redo finish status," + "ReadRecPtr:%X/%X, EndRecPtr:%X/%X", + (uint32)(t_thrd.xlog_cxt.ReadRecPtr >> 32), + (uint32)(t_thrd.xlog_cxt.ReadRecPtr), + (uint32)(t_thrd.xlog_cxt.EndRecPtr >> 32), + (uint32)(t_thrd.xlog_cxt.EndRecPtr)))); + } + printFrequency++; + /* + * If it hasn't been long since last attempt, sleep 1s to + * avoid busy-waiting. 
+ */ + if (IS_SHARED_STORAGE_MODE) { + uint32 connMode = + pg_atomic_read_u32(&g_instance.comm_cxt.localinfo_cxt.need_disable_connection_node); + if (connMode) { + pg_atomic_write_u32(&g_instance.comm_cxt.localinfo_cxt.need_disable_connection_node, + false); + } + pg_usleep(2000000L); + } else { +#ifdef ENABLE_LITE_MODE + pg_usleep(1000000L); +#else + pg_usleep(50000L); +#endif + } + } + /* + * If primary_conninfo is set, launch walreceiver to + * try to stream the missing WAL, before retrying to + * restore from archive/pg_xlog. + * + * If fetching_ckpt is TRUE, RecPtr points to the + * initial checkpoint location. In that case, we use + * RedoStartLSN as the streaming start position + * instead of RecPtr, so that when we later jump + * backwards to start redo at RedoStartLSN, we will + * have the logs streamed already. + */ + load_server_mode(); + + if (IS_SHARED_STORAGE_STANBY_MODE && !IS_SHARED_STORAGE_MAIN_STANDBY_MODE) { + ProcTxnWorkLoad(false); + /* use volatile pointer to prevent code rearrangement */ + volatile WalRcvData *walrcv = t_thrd.walreceiverfuncs_cxt.WalRcv; +#ifndef ENABLE_MULTIPLE_NODES + rename_recovery_conf_for_roach(); +#endif + + ereport(LOG, (errmsg("request xlog stream from shared storage at %X/%X.", + fetching_ckpt ? (uint32)(t_thrd.xlog_cxt.RedoStartLSN >> 32) + : (uint32)(targetRecPtr >> 32), + fetching_ckpt ? (uint32)t_thrd.xlog_cxt.RedoStartLSN + : (uint32)targetRecPtr))); + ShutdownWalRcv(); + t_thrd.xlog_cxt.receivedUpto = 0; + SpinLockAcquire(&walrcv->mutex); + walrcv->receivedUpto = 0; + SpinLockRelease(&walrcv->mutex); + + RequestXLogStreaming(fetching_ckpt ? 
&t_thrd.xlog_cxt.RedoStartLSN : &targetRecPtr, 0, + REPCONNTARGET_SHARED_STORAGE, 0); + continue; + } + } + /* Don't try to read from a source that just failed */ + sources &= ~t_thrd.xlog_cxt.failedSources; + t_thrd.xlog_cxt.readFile = SSXLogFileReadAnyTLI(t_thrd.xlog_cxt.readSegNo, + emode, sources, xlog_path); + if (t_thrd.xlog_cxt.readFile >= 0) { + break; + } + + ereport(DEBUG5, (errmsg("do not find any more files.sources=%u failedSources=%u", sources, + t_thrd.xlog_cxt.failedSources))); + /* + * Nope, not found in archive and/or pg_xlog. + */ + t_thrd.xlog_cxt.failedSources |= sources; + + } + + /* + * This possibly-long loop needs to handle interrupts of + * startup process. + */ + RedoInterruptCallBack(); + } + } else { + /* In archive or crash recovery. */ + if (t_thrd.xlog_cxt.readFile < 0) { + uint32 sources; + + /* Reset curFileTLI if random fetch. */ + if (randAccess) { + t_thrd.xlog_cxt.curFileTLI = 0; + } + + sources = XLOG_FROM_PG_XLOG; + if (t_thrd.xlog_cxt.InArchiveRecovery) { + sources |= XLOG_FROM_ARCHIVE; + } + + t_thrd.xlog_cxt.readFile = SSXLogFileReadAnyTLI(t_thrd.xlog_cxt.readSegNo, + emode, sources, xlog_path); + + if (t_thrd.xlog_cxt.readFile < 0) { + return -1; + } + } + } + } + + /* + * At this point, we have the right segment open and if we're streaming we + * know the requested record is in it. 
+ */ + Assert(t_thrd.xlog_cxt.readFile != -1); + + if (t_thrd.xlog_cxt.readSource == XLOG_FROM_STREAM) { + if ((targetPagePtr / XLOG_BLCKSZ) != (t_thrd.xlog_cxt.receivedUpto / XLOG_BLCKSZ)) { + t_thrd.xlog_cxt.readLen = XLOG_BLCKSZ; + } else { + t_thrd.xlog_cxt.readLen = t_thrd.xlog_cxt.receivedUpto % XLogSegSize - targetPageOff; + } + } else { + t_thrd.xlog_cxt.readLen = XLOG_BLCKSZ; + } + + /* Read the requested page */ + t_thrd.xlog_cxt.readOff = targetPageOff; + + bool ret = SSReadXlogInternal(xlogreader, targetPagePtr, targetRecPtr, buf); + if (!ret) { + ereport(LOG, (errcode_for_file_access(), errmsg("read xlog(start:%X/%X, pos:%u len:%d) failed : %m", + static_cast(targetPagePtr >> BIT_NUM_INT32), + static_cast(targetPagePtr), targetPageOff, + expectReadLen))); + ereport(emode_for_corrupt_record(emode, RecPtr), + (errcode_for_file_access(), + errmsg("could not read from log file %s to offset %u: %m", + XLogFileNameP(t_thrd.xlog_cxt.ThisTimeLineID, t_thrd.xlog_cxt.readSegNo), + t_thrd.xlog_cxt.readOff))); + goto next_record_is_invalid; + } + + Assert(targetSegNo == t_thrd.xlog_cxt.readSegNo); + Assert(targetPageOff == t_thrd.xlog_cxt.readOff); + Assert((uint32)expectReadLen <= t_thrd.xlog_cxt.readLen); + + *readTLI = t_thrd.xlog_cxt.curFileTLI; + + return (int)t_thrd.xlog_cxt.readLen; + +next_record_is_invalid: + t_thrd.xlog_cxt.failedSources |= t_thrd.xlog_cxt.readSource; + + if (t_thrd.xlog_cxt.readFile >= 0) { + close(t_thrd.xlog_cxt.readFile); + } + t_thrd.xlog_cxt.readFile = -1; + t_thrd.xlog_cxt.readLen = 0; + t_thrd.xlog_cxt.readSource = 0; + + return -1; +} + +int SSXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, + XLogRecPtr targetRecPtr, char *readBuf, TimeLineID *readTLI, char* xlog_path) +{ + int read_len = SSReadXLog(xlogreader, targetPagePtr, reqLen, targetRecPtr, + readBuf, readTLI, g_instance.dms_cxt.SSRecoveryInfo.recovery_xlogDir); + return read_len; +} diff --git a/src/include/access/xlog.h 
b/src/include/access/xlog.h index 3ec4ad7899..3b991f20d1 100755 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -84,6 +84,14 @@ extern volatile uint64 sync_system_identifier; #define XLOG_FROM_PG_XLOG (1 << 1) /* Existing file in pg_xlog */ #define XLOG_FROM_STREAM (1 << 2) /* Streamed from master */ +#define DORADO_STANDBY_CLUSTER (g_instance.attr.attr_common.cluster_run_mode == RUN_MODE_STANDBY && \ + g_instance.attr.attr_storage.xlog_file_path != 0) +#define DORADO_PRIMARY_CLUSTER (g_instance.attr.attr_common.cluster_run_mode == RUN_MODE_PRIMARY && \ + g_instance.attr.attr_storage.xlog_file_path != 0) +#define DORADO_STANDBY_CLUSTER_MAINSTANDBY_NODE ((t_thrd.postmaster_cxt.HaShmData->current_mode == STANDBY_MODE) && \ + (g_instance.attr.attr_common.cluster_run_mode == RUN_MODE_STANDBY) && \ + (g_instance.attr.attr_storage.xlog_file_path != 0)) + /* * Recovery target type. * Only set during a Point in Time recovery, not when standby_mode = on diff --git a/src/include/ddes/dms/ss_reform_common.h b/src/include/ddes/dms/ss_reform_common.h index b3b4539c7e..40cad18845 100644 --- a/src/include/ddes/dms/ss_reform_common.h +++ b/src/include/ddes/dms/ss_reform_common.h @@ -35,12 +35,11 @@ typedef struct SSBroadcastCancelTrx { SSBroadcastOp type; // must be first } SSBroadcastCancelTrx; -int SSXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, - XLogRecPtr targetRecPtr, char *readBuf, TimeLineID *readTLI, char* xlog_path); bool SSReadXlogInternal(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, XLogRecPtr targetRecPtr, char *buf); XLogReaderState *SSXLogReaderAllocate(XLogPageReadCB pagereadfunc, void *private_data, Size alignedSize); void SSGetXlogPath(); void SSSaveReformerCtrl(); void SSClearSegCache(); int SSCancelTransactionOfAllStandby(SSBroadcastOp type); -int SSProcessCancelTransaction(SSBroadcastOp type); \ No newline at end of file +int SSProcessCancelTransaction(SSBroadcastOp type); +int 
SSXLogFileReadAnyTLI(XLogSegNo segno, int emode, uint32 sources, char* xlog_path); \ No newline at end of file -- Gitee