diff --git a/src/bin/pg_controldata/pg_controldata.cpp b/src/bin/pg_controldata/pg_controldata.cpp
index e157fe200b114d39492f7224a9fa6ffda650feb0..f8fe52c25d347993503fd06625e549c853f8d4fc 100644
--- a/src/bin/pg_controldata/pg_controldata.cpp
+++ b/src/bin/pg_controldata/pg_controldata.cpp
@@ -88,8 +88,8 @@ static const char* SSClusterState(SSGlobalClusterState state)
 {
     switch (state) {
         case CLUSTER_IN_ONDEMAND_BUILD:
             return _("in on-demand build");
-        case CLUSTER_IN_ONDEMAND_RECOVERY:
-            return _("in on-demand recovery");
+        case CLUSTER_IN_ONDEMAND_REDO:
+            return _("in on-demand redo");
         case CLUSTER_NORMAL:
             return _("normal");
         default:
diff --git a/src/common/backend/parser/analyze.cpp b/src/common/backend/parser/analyze.cpp
index 1a9b8f89e3376cd129d415b34167ce13cf0d53cf..123332e80108c6f846c86f0c8f7250407f941512 100644
--- a/src/common/backend/parser/analyze.cpp
+++ b/src/common/backend/parser/analyze.cpp
@@ -615,7 +615,7 @@ Query* transformStmt(ParseState* pstate, Node* parseTree, bool isFirstNode, bool
         result->rightRefState = nullptr;
     }
 
-    PreventCommandDuringSSOndemandRecovery(parseTree);
+    PreventCommandDuringSSOndemandRedo(parseTree);
 
     return result;
 }
diff --git a/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp b/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp
index 0cf347208ce259595e938806664f1f3fce41b27e..1cc615efe0e03686731b8c540059a6710415f413 100644
--- a/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp
+++ b/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp
@@ -504,7 +504,7 @@ Buffer DmsReadPage(Buffer buffer, LWLockMode mode, ReadBufferMode read_mode, boo
 bool SSOndemandRequestPrimaryRedo(BufferTag tag)
 {
     dms_context_t dms_ctx;
-    int32 redo_status = ONDEMAND_REDO_INVALID;
+    int32 redo_status = ONDEMAND_REDO_TIMEOUT;
 
     if (!SS_STANDBY_ONDEMAND_RECOVERY) {
         return true;
@@ -522,7 +522,7 @@ bool SSOndemandRequestPrimaryRedo(BufferTag tag)
         ereport(LOG,
             (errmodule(MOD_DMS),
                 errmsg("[on-demand] request primary node redo page failed, page id [%d/%d/%d/%d/%d %d-%d], "
-                    "redo statu %d", tag.rnode.spcNode, tag.rnode.dbNode, tag.rnode.relNode, (int)tag.rnode.bucketNode,
+                    "redo status %d", tag.rnode.spcNode, tag.rnode.dbNode, tag.rnode.relNode, (int)tag.rnode.bucketNode,
                     (int)tag.rnode.opt, tag.forkNum, tag.blockNum, redo_status)));
         return false;
     }
diff --git a/src/gausskernel/ddes/adapter/ss_reform_common.cpp b/src/gausskernel/ddes/adapter/ss_reform_common.cpp
index 1d64bb66186fdf371017ab148c489ed63e71b3ef..d0dee7cf1c66a849689066406b7ea6da068ee129 100644
--- a/src/gausskernel/ddes/adapter/ss_reform_common.cpp
+++ b/src/gausskernel/ddes/adapter/ss_reform_common.cpp
@@ -37,15 +37,6 @@
 #include "storage/smgr/segment_internal.h"
 #include "replication/walreceiver.h"
 
-/*
- * Add xlog reader private structure for page read.
- */
-typedef struct XLogPageReadPrivate {
-    int emode;
-    bool fetching_ckpt; /* are we fetching a checkpoint record? */
-    bool randAccess;
-} XLogPageReadPrivate;
-
 int SSXLogFileReadAnyTLI(XLogSegNo segno, int emode, uint32 sources, char* xlog_path)
 {
     char path[MAXPGPATH];
@@ -87,7 +78,7 @@ int SSXLogFileReadAnyTLI(XLogSegNo segno, int emode, uint32 sources, char* xlog_
     }
 
     /* Couldn't find it.  For simplicity, complain about front timeline */
-    errorno = snprintf_s(path, MAXPGPATH, MAXPGPATH - 1, "%s/%08X%08X%08X", SS_XLOGDIR,
+    errorno = snprintf_s(path, MAXPGPATH, MAXPGPATH - 1, "%s/%08X%08X%08X", xlog_path,
         t_thrd.xlog_cxt.recoveryTargetTLI, (uint32)((segno) / XLogSegmentsPerXLogId),
         (uint32)((segno) % XLogSegmentsPerXLogId));
     securec_check_ss(errorno, "", "");
diff --git a/src/gausskernel/process/tcop/utility.cpp b/src/gausskernel/process/tcop/utility.cpp
index b4f3303f99b70ec0cc00c560040ec91a2fb77e63..05d22c428ebace9b32dfad97dc33b40e52be4d85 100755
--- a/src/gausskernel/process/tcop/utility.cpp
+++ b/src/gausskernel/process/tcop/utility.cpp
@@ -643,7 +643,7 @@ void PreventCommandDuringRecovery(const char* cmd_name)
             errmsg("cannot execute %s during recovery", cmd_name)));
 }
 
-void PreventCommandDuringSSOndemandRecovery(Node* parseTree)
+void PreventCommandDuringSSOndemandRedo(Node* parseTree)
 {
     switch(nodeTag(parseTree)) {
         case T_InsertStmt:
diff --git a/src/gausskernel/process/threadpool/knl_thread.cpp b/src/gausskernel/process/threadpool/knl_thread.cpp
index af937063b0d09e150cdcc4a1cba33116c351df3f..8bcf7571a9ce89ee8d53e9ac508ef4371f8b093d 100755
--- a/src/gausskernel/process/threadpool/knl_thread.cpp
+++ b/src/gausskernel/process/threadpool/knl_thread.cpp
@@ -1429,6 +1429,7 @@ static void knl_t_storage_init(knl_t_storage_context* storage_cxt)
     storage_cxt->timeoutRemoteOpera = 0;
     storage_cxt->dmsBufCtl = NULL;
     storage_cxt->ondemandXLogMem = NULL;
+    storage_cxt->ondemandXLogFileIdCache = NULL;
 }
 
 static void knl_t_port_init(knl_t_port_context* port_cxt)
diff --git a/src/gausskernel/storage/access/transam/ondemand_extreme_rto/dispatcher.cpp b/src/gausskernel/storage/access/transam/ondemand_extreme_rto/dispatcher.cpp
index c5bcc4e0adfa179e83f1ae8f1c4206052354ec61..cab1ab125f9f05b5e806f38f06a1a543eef15c08 100644
--- a/src/gausskernel/storage/access/transam/ondemand_extreme_rto/dispatcher.cpp
+++ b/src/gausskernel/storage/access/transam/ondemand_extreme_rto/dispatcher.cpp
@@ -62,6 +62,7 @@
 #include "access/ondemand_extreme_rto/spsc_blocking_queue.h"
 #include "access/ondemand_extreme_rto/redo_item.h"
 #include "access/ondemand_extreme_rto/batch_redo.h"
+#include "access/ondemand_extreme_rto/xlog_read.h"
 #include "catalog/storage.h"
 #include 
@@ -98,7 +99,7 @@ static const int XLOG_INFO_SHIFT_SIZE = 4; /* xlog info flag shift size */
 static const int32 MAX_PENDING = 1;
 static const int32 MAX_PENDING_STANDBY = 1;
-static const int32 ITEM_QUQUE_SIZE_RATIO = 5;
+static const int32 ITEM_QUQUE_SIZE_RATIO = 1;
 
 static const uint32 EXIT_WAIT_DELAY = 100; /* 100 us */
 uint32 g_readManagerTriggerFlag = TRIGGER_NORMAL;
@@ -439,9 +440,10 @@ void StartRecoveryWorkers(XLogReaderState *xlogreader, uint32 privateLen)
         ALLOCSET_DEFAULT_MAXSIZE, SHARED_CONTEXT);
     g_instance.comm_cxt.predo_cxt.redoItemHash = PRRedoItemHashInitialize(g_instance.comm_cxt.redoItemCtx);
+    g_dispatcher->maxItemNum = (get_batch_redo_num() + 4) * PAGE_WORK_QUEUE_SIZE *
+        ITEM_QUQUE_SIZE_RATIO; // 4: a startup, readmanager, txnmanager, txnworker
     uint32 maxParseBufNum = (uint32)((uint64)g_instance.attr.attr_storage.dms_attr.ondemand_recovery_mem_size * 1024 /
         (sizeof(XLogRecParseState) + sizeof(ParseBufferDesc) + sizeof(RedoMemSlot)));
-    g_dispatcher->maxItemNum = 4 * PAGE_WORK_QUEUE_SIZE * ITEM_QUQUE_SIZE_RATIO + maxParseBufNum;
     XLogParseBufferInitFunc(&(g_dispatcher->parseManager), maxParseBufNum, &recordRefOperate, RedoInterruptCallBack);
     /* alloc for record readbuf */
     SSAllocRecordReadBuffer(xlogreader, privateLen);
@@ -663,6 +665,7 @@ static void StopRecoveryWorkers(int code, Datum arg)
     pg_atomic_write_u32(&g_dispatcher->rtoXlogBufState.readWorkerState, WORKER_STATE_EXIT);
     ShutdownWalRcv();
+    CloseAllXlogFileInFdCache();
     FreeAllocatedRedoItem();
     SSDestroyRecoveryWorkers();
     g_startupTriggerState = TRIGGER_NORMAL;
@@ -1910,7 +1913,7 @@ void WaitRedoFinish()
     SpinLockRelease(&t_thrd.shemem_ptr_cxt.XLogCtl->info_lck);
 
     /* for other nodes in cluster */
-    g_instance.dms_cxt.SSReformerControl.clusterStatus = CLUSTER_IN_ONDEMAND_RECOVERY;
+    g_instance.dms_cxt.SSReformerControl.clusterStatus = CLUSTER_IN_ONDEMAND_REDO;
     SSSaveReformerCtrl();
 
 #ifdef USE_ASSERT_CHECKING
@@ -1931,7 +1934,7 @@ void WaitRedoFinish()
     AllItemCheck();
 #endif
     SpinLockAcquire(&t_thrd.shemem_ptr_cxt.XLogCtl->info_lck);
-    t_thrd.shemem_ptr_cxt.XLogCtl->IsOnDemandRecoveryDone = true;
+    t_thrd.shemem_ptr_cxt.XLogCtl->IsOnDemandRedoDone = true;
     SpinLockRelease(&t_thrd.shemem_ptr_cxt.XLogCtl->info_lck);
 }
diff --git a/src/gausskernel/storage/access/transam/ondemand_extreme_rto/page_redo.cpp b/src/gausskernel/storage/access/transam/ondemand_extreme_rto/page_redo.cpp
index 4b5a6415aaa28797bdfba81556616ae5461b2bf9..51e880f1f6104e5fa02c34d030cfeec0e22df9b3 100644
--- a/src/gausskernel/storage/access/transam/ondemand_extreme_rto/page_redo.cpp
+++ b/src/gausskernel/storage/access/transam/ondemand_extreme_rto/page_redo.cpp
@@ -109,6 +109,7 @@ RedoItem g_GlobalLsnForwarder;
 RedoItem g_cleanupMark;
 RedoItem g_closefdMark;
 RedoItem g_cleanInvalidPageMark;
+RedoItem g_forceDistributeMark;
 
 static const int PAGE_REDO_WORKER_ARG = 3;
 static const int REDO_SLEEP_50US = 50;
@@ -553,6 +554,22 @@ void WaitAllRedoWorkerQueueEmpty()
     }
 }
 
+bool OndemandXLogParseMemApproachLimit()
+{
+    float4 ratio = (float4)g_dispatcher->parseManager.memctl.usedblknum / g_dispatcher->parseManager.memctl.totalblknum;
+    if (ratio > ONDEMAND_DISTRIBUTE_RATIO) {
+        return true;
+    }
+    return false;
+}
+
+void BatchRedoSendDistributeMarkToPageRedoManager(RedoItem *distributeMark)
+{
+    PageRedoPipeline *myRedoLine = &g_dispatcher->pageLines[g_redoWorker->slotId];
+    AddPageRedoItem(myRedoLine->managerThd, distributeMark);
+    pg_usleep(1000000); // 1 sec
+}
+
 bool BatchRedoDistributeItems(void **eleArry, uint32 eleNum)
 {
     bool parsecomplete = false;
@@ -568,6 +585,9 @@ bool BatchRedoDistributeItems(void **eleArry, uint32 eleNum)
         } else if (eleArry[i] == (void *)&g_cleanInvalidPageMark) {
             forget_range_invalid_pages((void *)eleArry[i]);
         } else {
+            if (OndemandXLogParseMemApproachLimit()) {
+                BatchRedoSendDistributeMarkToPageRedoManager(&g_forceDistributeMark);
+            }
             GetRedoStartTime(g_redoWorker->timeCostList[TIME_COST_STEP_3]);
             RedoItem *item = (RedoItem *)eleArry[i];
             UpdateRecordGlobals(item, g_redoWorker->standbyState);
@@ -723,7 +743,6 @@ void RedoPageManagerDistributeToRedoThd(PageRedoPipeline *myRedoLine,
 void RedoPageManagerDistributeBlockRecord(HTAB *redoItemHash, XLogRecParseState *parsestate)
 {
-    static uint32 total_count = 0;
     PageRedoPipeline *myRedoLine = &g_dispatcher->pageLines[g_redoWorker->slotId];
     const uint32 WorkerNumPerMng = myRedoLine->redoThdNum;
     HASH_SEQ_STATUS status;
@@ -731,7 +750,6 @@ void RedoPageManagerDistributeBlockRecord(HTAB *redoItemHash, XLogRecParseState
     HTAB *curMap = redoItemHash;
 
     hash_seq_init(&status, curMap);
-    total_count++;
     while ((redoItemEntry = (RedoItemHashEntry *)hash_seq_search(&status)) != NULL) {
         uint32 workId = GetWorkerId(&redoItemEntry->redoItemTag, WorkerNumPerMng);
         ReleaseRecParseState(myRedoLine, curMap, redoItemEntry, workId);
@@ -1053,19 +1071,14 @@ static void WaitNextBarrier(XLogRecParseState *parseState)
 
 static void OnDemandPageManagerRedoSegParseState(XLogRecParseState *preState)
 {
-    static uint32 seg_total_count = 0;
-    static uint32 seg_full_count = 0;
-
     Assert(g_redoWorker->slotId == 0);
     switch (preState->blockparse.blockhead.block_valid) {
         case BLOCK_DATA_SEG_EXTEND:
-            seg_total_count++;
             GetRedoStartTime(g_redoWorker->timeCostList[TIME_COST_STEP_4]);
             OnDemandPageManagerProcSegPipeLineSyncState(preState);
             CountRedoTime(g_redoWorker->timeCostList[TIME_COST_STEP_4]);
             break;
         case BLOCK_DATA_SEG_FULL_SYNC_TYPE:
-            seg_full_count++;
             GetRedoStartTime(g_redoWorker->timeCostList[TIME_COST_STEP_8]);
             OnDemandPageManagerProcSegFullSyncState(preState);
             CountRedoTime(g_redoWorker->timeCostList[TIME_COST_STEP_8]);
@@ -1082,6 +1095,7 @@ static void OnDemandPageManagerRedoSegParseState(XLogRecParseState *preState)
 void PageManagerRedoParseState(XLogRecParseState *preState)
 {
     HTAB *hashMap = g_instance.comm_cxt.predo_cxt.redoItemHash[g_redoWorker->slotId];
+    RedoItem *item = GetRedoItemPtr((XLogReaderState *)preState->refrecord);
 
     switch (preState->blockparse.blockhead.block_valid) {
         case BLOCK_DATA_MAIN_DATA_TYPE:
@@ -1092,6 +1106,11 @@ void PageManagerRedoParseState(XLogRecParseState *preState)
             PRTrackAddBlock(preState, hashMap);
             SetCompletedReadEndPtr(g_redoWorker, preState->blockparse.blockhead.start_ptr,
                 preState->blockparse.blockhead.end_ptr);
+#ifdef USE_ASSERT_CHECKING
+            DoRecordCheck(preState, InvalidXLogRecPtr, false);
+#endif
+            DereferenceRedoItem(item); // for less on-demand recovery memory consumption
+            preState->refrecord = NULL;
             CountRedoTime(g_redoWorker->timeCostList[TIME_COST_STEP_3]);
             break;
         case BLOCK_DATA_DDL_TYPE:
@@ -1176,6 +1195,16 @@ bool PageManagerRedoDistributeItems(void **eleArry, uint32 eleNum)
         } else if (eleArry[i] == (void *)&g_cleanInvalidPageMark) {
             forget_range_invalid_pages((void *)eleArry[i]);
             continue;
+        } else if (eleArry[i] == (void *)&g_forceDistributeMark) {
+            // double check
+            if (OndemandXLogParseMemApproachLimit()) {
+                RedoPageManagerDistributeBlockRecord(hashMap, NULL);
+                ereport(WARNING, (errcode(ERRCODE_LOG),
+                    errmsg("[On-demand] Parse buffer num approach critical value, distribute block record by force,"
+                    " slotid %d, usedblknum %d, totalblknum %d", g_redoWorker->slotId,
+                    g_dispatcher->parseManager.memctl.usedblknum, g_dispatcher->parseManager.memctl.totalblknum)));
+            }
+            continue;
         }
         XLogRecParseState *recordblockstate = (XLogRecParseState *)eleArry[i];
         XLogRecParseState *nextState = recordblockstate;
@@ -1196,19 +1225,6 @@
         } while (nextState != NULL);
     }
 
-    float4 ratio = g_dispatcher->parseManager.memctl.usedblknum / g_dispatcher->parseManager.memctl.totalblknum;
-    while (ratio > ONDEMAND_DISTRIBUTE_RATIO) {
-        ereport(WARNING, (errcode(ERRCODE_LOG),
-            errmsg("[On-demand] Parse buffer num approach critical value, distribute block record by force,"
-            " slotid %d, usedblknum %d, totalblknum %d", g_redoWorker->slotId,
-            g_dispatcher->parseManager.memctl.usedblknum, g_dispatcher->parseManager.memctl.totalblknum)));
-        GetRedoStartTime(g_redoWorker->timeCostList[TIME_COST_STEP_9]);
-        RedoPageManagerDistributeBlockRecord(hashMap, NULL);
-        CountRedoTime(g_redoWorker->timeCostList[TIME_COST_STEP_9]);
-        pg_usleep(1000000); /* 1 sec */
-        ratio = g_dispatcher->parseManager.memctl.usedblknum / g_dispatcher->parseManager.memctl.totalblknum;
-    }
-
     return false;
 }
 
@@ -1608,12 +1624,12 @@ void RedoPageWorkerMain()
         bool needRelease = true;
         XLogRecParseState *procState = redoblockstateHead;
+        XLogRecParseState *reloadBlockState = NULL;
 
         Assert(procState->distributeStatus != XLOG_NO_DISTRIBUTE);
         MemoryContext oldCtx = MemoryContextSwitchTo(g_redoWorker->oldCtx);
         while (procState != NULL) {
             XLogRecParseState *redoblockstate = procState;
-            g_redoWorker->curRedoBlockState = (XLogBlockDataParse*)(&redoblockstate->blockparse.extra_rec);
             // nextrecord will be redo in backwards position
             procState = (procState->distributeStatus == XLOG_TAIL_DISTRIBUTE) ?
                 NULL : (XLogRecParseState *)procState->nextrecord;
@@ -1624,8 +1640,11 @@ void RedoPageWorkerMain()
                 case BLOCK_DATA_FSM_TYPE:
                     needRelease = false;
                     GetRedoStartTime(g_redoWorker->timeCostList[TIME_COST_STEP_3]);
-                    notfound = XLogBlockRedoForExtremeRTO(redoblockstate, &bufferinfo, notfound,
+                    // reload from disk, because RedoPageManager already released refrecord in on-demand build stage
+                    reloadBlockState = OndemandRedoReloadXLogRecord(redoblockstate);
+                    notfound = XLogBlockRedoForExtremeRTO(reloadBlockState, &bufferinfo, notfound,
                         g_redoWorker->timeCostList[TIME_COST_STEP_4], g_redoWorker->timeCostList[TIME_COST_STEP_5]);
+                    OndemandRedoReleaseXLogRecord(reloadBlockState);
                     DereferenceRecParseState(redoblockstate);
                     SetCompletedReadEndPtr(g_redoWorker, redoblockstate->blockparse.blockhead.start_ptr,
                         redoblockstate->blockparse.blockhead.end_ptr);
@@ -1754,10 +1773,6 @@ static inline bool ReadPageWorkerStop()
 void PushToWorkerLsn()
 {
-    static uint32 cur_recor_count = 0;
-
-    cur_recor_count++;
-
     if (!IsExtremeRtoRunning()) {
         return;
     }
@@ -1767,7 +1782,6 @@ void PushToWorkerLsn()
         refCount = pg_atomic_read_u32(&g_GlobalLsnForwarder.record.refcount);
         RedoInterruptCallBack();
     } while (refCount != 0 && !ReadPageWorkerStop());
-    cur_recor_count = 0;
     SendLsnFowarder();
 }
diff --git a/src/gausskernel/storage/access/transam/ondemand_extreme_rto/redo_utils.cpp b/src/gausskernel/storage/access/transam/ondemand_extreme_rto/redo_utils.cpp
index 7ba976b25c40451c1b758205cc8ceb7509bd2e7b..c93e275f06911188189eddef4d37392a8314c437 100644
--- a/src/gausskernel/storage/access/transam/ondemand_extreme_rto/redo_utils.cpp
+++ b/src/gausskernel/storage/access/transam/ondemand_extreme_rto/redo_utils.cpp
@@ -25,8 +25,17 @@
 #include "access/ondemand_extreme_rto/batch_redo.h"
 #include "access/ondemand_extreme_rto/dispatcher.h"
 #include "access/ondemand_extreme_rto/redo_utils.h"
+#include "access/ondemand_extreme_rto/xlog_read.h"
 #include "storage/lock/lwlock.h"
 
+/*
+ * Add xlog reader private structure for page read.
+ */
+typedef struct XLogPageReadPrivate {
+    const char* datadir;
+    TimeLineID tli;
+} XLogPageReadPrivate;
+
 Size OndemandRecoveryShmemSize(void)
 {
     Size size = 0;
@@ -48,6 +57,22 @@ void OndemandRecoveryShmemInit(void)
     }
 }
 
+void OndemandXlogFileIdCacheInit(void)
+{
+    HASHCTL ctl;
+
+    /* hash accessed by database file id */
+    errno_t rc = memset_s(&ctl, sizeof(ctl), 0, sizeof(ctl));
+    securec_check(rc, "", "");
+    ctl.keysize = sizeof(XLogFileId);
+    ctl.entrysize = sizeof(XLogFileIdCacheEntry);
+    ctl.hash = tag_hash;
+    t_thrd.storage_cxt.ondemandXLogFileIdCache = hash_create("Ondemand extreme rto xlogfile handle cache", 8, &ctl,
+        HASH_ELEM | HASH_FUNCTION | HASH_SHRCTX);
+    if (!t_thrd.storage_cxt.ondemandXLogFileIdCache)
+        ereport(FATAL, (errmsg("could not initialize ondemand xlogfile handle hash table")));
+}
+
 /* add for batch redo mem manager */
 void *OndemandXLogMemCtlInit(RedoMemManager *memctl, Size itemsize, int itemnum)
 {
@@ -73,7 +98,7 @@ RedoMemSlot *OndemandXLogMemAlloc(RedoMemManager *memctl)
 {
     RedoMemSlot *nextfreeslot = NULL;
     do {
-        LWLockAcquire(OndemandXlogMemAllocLock, LW_EXCLUSIVE);
+        LWLockAcquire(OndemandXLogMemAllocLock, LW_EXCLUSIVE);
         if (memctl->firstfreeslot == InvalidBuffer) {
             memctl->firstfreeslot = AtomicExchangeBuffer(&memctl->firstreleaseslot, InvalidBuffer);
             pg_read_barrier();
@@ -85,7 +110,7 @@ RedoMemSlot *OndemandXLogMemAlloc(RedoMemManager *memctl)
             memctl->usedblknum++;
             nextfreeslot->freeNext = InvalidBuffer;
         }
-        LWLockRelease(OndemandXlogMemAllocLock);
+        LWLockRelease(OndemandXLogMemAllocLock);
 
         if (memctl->doInterrupt != NULL) {
             memctl->doInterrupt();
@@ -106,14 +131,14 @@ void OndemandXLogMemRelease(RedoMemManager *memctl, Buffer bufferid)
     }
     bufferslot = &(memctl->memslot[bufferid - 1]);
     Assert(bufferslot->freeNext == InvalidBuffer);
-    LWLockAcquire(OndemandXlogMemAllocLock, LW_EXCLUSIVE);
+    LWLockAcquire(OndemandXLogMemAllocLock, LW_EXCLUSIVE);
     Buffer oldFirst = AtomicReadBuffer(&memctl->firstreleaseslot);
     pg_memory_barrier();
     do {
         AtomicWriteBuffer(&bufferslot->freeNext, oldFirst);
     } while (!AtomicCompareExchangeBuffer(&memctl->firstreleaseslot, &oldFirst, bufferid));
     memctl->usedblknum--;
-    LWLockRelease(OndemandXlogMemAllocLock);
+    LWLockRelease(OndemandXLogMemAllocLock);
 }
 
@@ -142,46 +167,63 @@ void OndemandXLogParseBufferDestory(RedoParseManager *parsemanager)
 XLogRecParseState *OndemandXLogParseBufferAllocList(RedoParseManager *parsemanager, XLogRecParseState *blkstatehead,
     void *record)
 {
-    RedoMemManager *memctl = &(parsemanager->memctl);
-    RedoMemSlot *allocslot = NULL;
-    ParseBufferDesc *descstate = NULL;
     XLogRecParseState *recordstate = NULL;
 
-    allocslot = OndemandXLogMemAlloc(memctl);
-    if (allocslot == NULL) {
-        ereport(PANIC, (errmodule(MOD_REDO), errcode(ERRCODE_LOG),
-            errmsg("XLogParseBufferAlloc Allocated buffer failed!, taoalblknum:%u, usedblknum:%u",
-                memctl->totalblknum, memctl->usedblknum)));
-        return NULL;
+    if (parsemanager == NULL) {
+        recordstate = (XLogRecParseState*)palloc(sizeof(XLogRecParseState));
+        errno_t rc = memset_s((void*)recordstate, sizeof(XLogRecParseState), 0, sizeof(XLogRecParseState));
+        securec_check(rc, "\0", "\0");
+        recordstate->manager = &(ondemand_extreme_rto::g_dispatcher->parseManager);
+        recordstate->distributeStatus = XLOG_SKIP_DISTRIBUTE;
+    } else {
+        RedoMemManager *memctl = &(parsemanager->memctl);
+        RedoMemSlot *allocslot = NULL;
+        ParseBufferDesc *descstate = NULL;
+
+        allocslot = OndemandXLogMemAlloc(memctl);
+        if (allocslot == NULL) {
+            ereport(PANIC, (errmodule(MOD_REDO), errcode(ERRCODE_LOG),
+                errmsg("XLogParseBufferAlloc Allocated buffer failed!, taoalblknum:%u, usedblknum:%u",
+                    memctl->totalblknum, memctl->usedblknum)));
+            return NULL;
+        }
+
+        pg_read_barrier();
+        Assert(allocslot->buf_id != InvalidBuffer);
+        Assert(memctl->itemsize == (sizeof(XLogRecParseState) + sizeof(ParseBufferDesc)));
+        descstate = (ParseBufferDesc *)((char *)parsemanager->parsebuffers + memctl->itemsize * (allocslot->buf_id - 1));
+        descstate->buff_id = allocslot->buf_id;
+        Assert(descstate->state == 0);
+        descstate->state = 1;
+        descstate->refcount = 0;
+        recordstate = (XLogRecParseState *)((char *)descstate + sizeof(ParseBufferDesc));
+        recordstate->manager = parsemanager;
+        recordstate->distributeStatus = XLOG_NO_DISTRIBUTE;
+
+        if (parsemanager->refOperate != NULL) {
+            parsemanager->refOperate->refCount(record);
+        }
     }
 
-    pg_read_barrier();
-    Assert(allocslot->buf_id != InvalidBuffer);
-    Assert(memctl->itemsize == (sizeof(XLogRecParseState) + sizeof(ParseBufferDesc)));
-    descstate = (ParseBufferDesc *)((char *)parsemanager->parsebuffers + memctl->itemsize * (allocslot->buf_id - 1));
-    descstate->buff_id = allocslot->buf_id;
-    Assert(descstate->state == 0);
-    descstate->state = 1;
-    descstate->refcount = 0;
-    recordstate = (XLogRecParseState *)((char *)descstate + sizeof(ParseBufferDesc));
     recordstate->nextrecord = NULL;
-    recordstate->manager = parsemanager;
     recordstate->refrecord = record;
     recordstate->isFullSync = false;
-    recordstate->distributeStatus = XLOG_NO_DISTRIBUTE;
 
     if (blkstatehead != NULL) {
         recordstate->nextrecord = blkstatehead->nextrecord;
         blkstatehead->nextrecord = (void *)recordstate;
     }
 
-    if (parsemanager->refOperate != NULL)
-        parsemanager->refOperate->refCount(record);
-
     return recordstate;
 }
 
 void OndemandXLogParseBufferRelease(XLogRecParseState *recordstate)
 {
+    if (recordstate->distributeStatus == XLOG_SKIP_DISTRIBUTE) {
+        // alloc in pageRedoWorker or backends
+        pfree(recordstate);
+        return;
+    }
+
     RedoMemManager *memctl = &(recordstate->manager->memctl);
     ParseBufferDesc *descstate = NULL;
 
@@ -209,6 +251,7 @@ BufferDesc *RedoForOndemandExtremeRTOQuery(BufferDesc *bufHdr, char relpersisten
     ondemand_extreme_rto::RedoItemHashEntry *redoItemEntry = NULL;
     ondemand_extreme_rto::RedoItemTag redoItemTag;
     XLogRecParseState *procState = NULL;
+    XLogRecParseState *reloadBlockState = NULL;
     XLogBlockHead *procBlockHead = NULL;
     XLogBlockHead *blockHead = NULL;
     RedoBufferInfo bufferInfo;
@@ -280,7 +323,10 @@ BufferDesc *RedoForOndemandExtremeRTOQuery(BufferDesc *bufHdr, char relpersisten
             case BLOCK_DATA_VM_TYPE:
             case BLOCK_DATA_FSM_TYPE:
                 needMarkDirty = true;
-                XlogBlockRedoForOndemandExtremeRTOQuery(redoBlockState, &bufferInfo);
+                // reload from disk, because RedoPageManager already released refrecord in on-demand build stage
+                reloadBlockState = OndemandRedoReloadXLogRecord(redoBlockState);
+                XlogBlockRedoForOndemandExtremeRTOQuery(reloadBlockState, &bufferInfo);
+                OndemandRedoReleaseXLogRecord(reloadBlockState);
                 break;
             case BLOCK_DATA_XLOG_COMMON_TYPE:
             case BLOCK_DATA_DDL_TYPE:
@@ -307,6 +353,71 @@ BufferDesc *RedoForOndemandExtremeRTOQuery(BufferDesc *bufHdr, char relpersisten
     return bufHdr;
 }
 
+bool IsTargetBlockState(XLogRecParseState *targetblockstate, XLogRecParseState* curblockstate)
+{
+    if (memcmp(&targetblockstate->blockparse.blockhead, &curblockstate->blockparse.blockhead, sizeof(XLogBlockHead)) != 0) {
+        return false;
+    }
+    return true;
+}
+
+// only used in ondemand redo stage
+XLogRecParseState *OndemandRedoReloadXLogRecord(XLogRecParseState *redoblockstate)
+{
+    uint32 blockNum = 0;
+    char *errormsg = NULL;
+    XLogRecParseState *recordBlockState = NULL;
+    XLogPageReadPrivate readPrivate = {
+        .datadir = NULL,
+        .tli = GetRecoveryTargetTLI()
+    };
+
+    XLogReaderState *xlogreader = XLogReaderAllocate(&SimpleXLogPageReadInFdCache, &readPrivate); // do not use pre-read
+
+    // step1: read record
+    XLogRecord *record = XLogReadRecord(xlogreader, redoblockstate->blockparse.blockhead.start_ptr, &errormsg,
+        true, g_instance.dms_cxt.SSRecoveryInfo.recovery_xlog_dir);
+    if (record == NULL) {
+        ereport(PANIC, (errmodule(MOD_REDO), errcode(ERRCODE_LOG),
+            errmsg("[On-demand] reload xlog record failed at %X/%X, errormsg: %s",
+            (uint32)(redoblockstate->blockparse.blockhead.start_ptr >> 32),
+            (uint32)redoblockstate->blockparse.blockhead.start_ptr, errormsg)));
+    }
+
+    // step2: parse to block
+    do {
+        recordBlockState = XLogParseToBlockForExtermeRTO(xlogreader, &blockNum);
+        if (recordBlockState != NULL) {
+            break;
+        }
+        Assert(blockNum != 0); // out of memory
+    } while (true);
+
+    // step3: find target parse state
+    XLogRecParseState *nextState = recordBlockState;
+    XLogRecParseState *targetState = NULL;
+    do {
+        XLogRecParseState *preState = nextState;
+        nextState = (XLogRecParseState *)nextState->nextrecord;
+        preState->nextrecord = NULL;
+
+        if (IsTargetBlockState(preState, redoblockstate)) {
+            targetState = preState;
+        } else {
+            OndemandXLogParseBufferRelease(preState);
+        }
+    } while (nextState != NULL);
+
+    return targetState;
+}
+
+// only used in ondemand redo stage
+void OndemandRedoReleaseXLogRecord(XLogRecParseState *reloadBlockState)
+{
+    XLogReaderFree((XLogReaderState*)reloadBlockState->refrecord);
+    OndemandXLogParseBufferRelease(reloadBlockState);
+}
+
 void OnDemandSendRecoveryEndMarkToWorkersAndWaitForReach(int code)
 {
     ondemand_extreme_rto::SendRecoveryEndMarkToWorkersAndWaitForReach(code);
diff --git a/src/gausskernel/storage/access/transam/ondemand_extreme_rto/xlog_read.cpp b/src/gausskernel/storage/access/transam/ondemand_extreme_rto/xlog_read.cpp
index e3c8726dfb85340951d3314b2b56a62f2125b3af..075e17027172f6dc9ebcba7394f735f5b22ae99a 100644
--- a/src/gausskernel/storage/access/transam/ondemand_extreme_rto/xlog_read.cpp
+++ b/src/gausskernel/storage/access/transam/ondemand_extreme_rto/xlog_read.cpp
@@ -23,6 +23,7 @@
 #include "access/ondemand_extreme_rto/spsc_blocking_queue.h"
 #include "access/ondemand_extreme_rto/dispatcher.h"
+#include "access/ondemand_extreme_rto/xlog_read.h"
 #include "access/multi_redo_api.h"
 #include "access/xlog.h"
 #include "ddes/dms/ss_reform_common.h"
@@ -30,6 +31,7 @@
 #include "replication/dcf_replication.h"
 #include "replication/shared_storage_walreceiver.h"
 #include "storage/ipc.h"
+#include "storage/file/fio_device.h"
 
 namespace ondemand_extreme_rto {
 static bool DoEarlyExit()
@@ -251,14 +253,7 @@ int ParallelXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
     xlogreader->readBuf = g_dispatcher->rtoXlogBufState.readBuf;
 
     for (;;) {
-        uint32 readSource = pg_atomic_read_u32(&(g_recordbuffer->readSource));
-        if (readSource & XLOG_FROM_STREAM) {
-            readLen = ParallelXLogReadWorkBufRead(xlogreader, targetPagePtr, reqLen, targetRecPtr, readTLI);
-        } else {
-            readLen = SSXLogPageRead(xlogreader, targetPagePtr, reqLen, targetRecPtr, xlogreader->readBuf,
-                readTLI, NULL);
-        }
-
+        readLen = SSXLogPageRead(xlogreader, targetPagePtr, reqLen, targetRecPtr, xlogreader->readBuf, readTLI, NULL);
         if (readLen > 0 || t_thrd.xlog_cxt.recoveryTriggered || !t_thrd.xlog_cxt.StandbyMode ||
             DoEarlyExit()) {
             return readLen;
         }
@@ -704,4 +699,100 @@ XLogRecord *XLogParallelReadNextRecord(XLogReaderState *xlogreader)
     }
 }
 
-} // namespace ondemand_extreme_rto
\ No newline at end of file
+} // namespace ondemand_extreme_rto
+
+typedef struct XLogPageReadPrivate {
+    const char *datadir;
+    TimeLineID tli;
+} XLogPageReadPrivate;
+
+void InitXLogFileId(XLogRecPtr targetPagePtr, TimeLineID timeLine, XLogFileId* id)
+{
+    XLByteToSeg(targetPagePtr, id->segno);
+    id->tli = timeLine;
+}
+
+/* XLogreader callback function, to read a WAL page */
+int SimpleXLogPageReadInFdCache(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr,
+    char *readBuf, TimeLineID *pageTLI, char* xlog_path)
+{
+    XLogPageReadPrivate *readprivate = (XLogPageReadPrivate *)xlogreader->private_data;
+    uint32 targetPageOff;
+    int ss_c = 0;
+    char xlogfpath[MAXPGPATH];
+    XLogFileId xlogfileid;
+    XLogFileIdCacheEntry *entry;
+    int xlogreadfd = -1;
+    bool found = false;
+
+    InitXLogFileId(targetPagePtr, readprivate->tli, &xlogfileid);
+
+    (void)LWLockAcquire(OndemandXLogFileHandleLock, LW_SHARED);
+    entry = (XLogFileIdCacheEntry *)hash_search(t_thrd.storage_cxt.ondemandXLogFileIdCache,
+        (void *)&xlogfileid, HASH_FIND, &found);
+    if (found) {
+        xlogreadfd = entry->fd;
+    }
+    LWLockRelease(OndemandXLogFileHandleLock);
+
+    if (xlogreadfd == -1) {
+        Assert(!found);
+
+        (void)LWLockAcquire(OndemandXLogFileHandleLock, LW_EXCLUSIVE);
+        entry = (XLogFileIdCacheEntry *)hash_search(t_thrd.storage_cxt.ondemandXLogFileIdCache,
+            (void *)&xlogfileid, HASH_ENTER, &found);
+        if (entry == NULL) {
+            report_invalid_record(xlogreader,
+                "SimpleXLogPageReadInFdCache could not create xlogfile handle entry \"%s\": %s\n",
+                xlogfpath, strerror(errno));
+            LWLockRelease(OndemandXLogFileHandleLock);
+            return -1;
+        }
+
+        if (!found) {
+            ss_c = snprintf_s(xlogfpath, MAXPGPATH, MAXPGPATH - 1, "%s/%08X%08X%08X", xlog_path, readprivate->tli,
+                (uint32)((xlogfileid.segno) / XLogSegmentsPerXLogId),
+                (uint32)((xlogfileid.segno) % XLogSegmentsPerXLogId));
+            securec_check_ss(ss_c, "", "");
+            entry->fd = open(xlogfpath, O_RDONLY | PG_BINARY, 0);
+            if (entry->fd < 0) {
+                report_invalid_record(xlogreader, "SimpleXLogPageReadInFdCache could not open file \"%s\": %s\n",
+                    xlogfpath, strerror(errno));
+                LWLockRelease(OndemandXLogFileHandleLock);
+                return -1;
+            }
+        }
+        xlogreadfd = entry->fd;
+        LWLockRelease(OndemandXLogFileHandleLock);
+    }
+
+    /*
+     * At this point, we have the right segment open.
+     */
+    Assert(xlogreadfd != -1);
+
+    targetPageOff = targetPagePtr % XLogSegSize;
+    /* Read the requested page */
+    if (pread(xlogreadfd, readBuf, XLOG_BLCKSZ, (off_t)targetPageOff) != XLOG_BLCKSZ) {
+        report_invalid_record(xlogreader, "SimpleXLogPageReadInFdCache could not pread from file \"%s\": %s\n",
+            xlogfpath, strerror(errno));
+        return -1;
+    }
+    *pageTLI = readprivate->tli;
+    return XLOG_BLCKSZ;
+}
+
+void CloseAllXlogFileInFdCache(void)
+{
+    HASH_SEQ_STATUS status;
+    XLogFileIdCacheEntry *entry = NULL;
+    hash_seq_init(&status, t_thrd.storage_cxt.ondemandXLogFileIdCache);
+
+    while ((entry = (XLogFileIdCacheEntry *)hash_seq_search(&status)) != NULL) {
+        close(entry->fd);
+        if (hash_search(t_thrd.storage_cxt.ondemandXLogFileIdCache, (void *)&entry->id, HASH_REMOVE, NULL) == NULL) {
+            ereport(ERROR, (errcode(ERRCODE_DATA_CORRUPTED), errmsg("[On-demand] xlogfile handle cache hash table corrupted")));
+        }
+    }
+    return;
+}
diff --git a/src/gausskernel/storage/access/transam/xlog.cpp b/src/gausskernel/storage/access/transam/xlog.cpp
index 86389e038e796930349cf1454101bbffeedd3c62..4cd9ef70d2b531fb6a364b2b516f0caa2f063fbb 100755
--- a/src/gausskernel/storage/access/transam/xlog.cpp
+++ b/src/gausskernel/storage/access/transam/xlog.cpp
@@ -6377,7 +6377,7 @@ void XLOGShmemInit(void)
     t_thrd.shemem_ptr_cxt.XLogCtl->SharedRecoveryInProgress = true;
     t_thrd.shemem_ptr_cxt.XLogCtl->IsRecoveryDone = false;
     t_thrd.shemem_ptr_cxt.XLogCtl->IsOnDemandBuildDone = false;
-    t_thrd.shemem_ptr_cxt.XLogCtl->IsOnDemandRecoveryDone = false;
+    t_thrd.shemem_ptr_cxt.XLogCtl->IsOnDemandRedoDone = false;
     t_thrd.shemem_ptr_cxt.XLogCtl->SharedHotStandbyActive = false;
     t_thrd.shemem_ptr_cxt.XLogCtl->WalWriterSleeping = false;
     t_thrd.shemem_ptr_cxt.XLogCtl->xlogFlushPtrForPerRead = InvalidXLogRecPtr;
@@ -8501,7 +8501,7 @@ static void XLogMakeUpRemainSegsContent(char *contentBuffer)
 void XLogCheckRemainSegs()
 {
-    if (SS_ONDEMAND_BUILD_DONE && !SS_ONDEMAND_RECOVERY_DONE) {
+    if (SS_ONDEMAND_BUILD_DONE && !SS_ONDEMAND_REDO_DONE) {
         return;
     }
 
@@ -9302,7 +9302,7 @@ void StartupXLOG(void)
     t_thrd.shemem_ptr_cxt.XLogCtl->ckptXid = checkPoint.oldestXid;
     t_thrd.shemem_ptr_cxt.XLogCtl->IsRecoveryDone = false;
     t_thrd.shemem_ptr_cxt.XLogCtl->IsOnDemandBuildDone = false;
-    t_thrd.shemem_ptr_cxt.XLogCtl->IsOnDemandRecoveryDone = false;
+    t_thrd.shemem_ptr_cxt.XLogCtl->IsOnDemandRedoDone = false;
     latestCompletedXid = checkPoint.nextXid;
     TransactionIdRetreat(latestCompletedXid);
diff --git a/src/gausskernel/storage/access/transam/xlogutils.cpp b/src/gausskernel/storage/access/transam/xlogutils.cpp
index 211d34d504b9add939b7f3f7213ce1cd7d6da89a..c5db30a85fc027be7d5cf963f220e683fd486391 100644
--- a/src/gausskernel/storage/access/transam/xlogutils.cpp
+++ b/src/gausskernel/storage/access/transam/xlogutils.cpp
@@ -513,7 +513,7 @@ static void CollectInvalidPagesStates(uint32 *nstates_ptr, InvalidPagesState ***
 /* Complain about any remaining invalid-page entries */
 void XLogCheckInvalidPages(void)
 {
-    if (SS_ONDEMAND_BUILD_DONE && !SS_ONDEMAND_RECOVERY_DONE) {
+    if (SS_ONDEMAND_BUILD_DONE && !SS_ONDEMAND_REDO_DONE) {
         return;
     }
 
diff --git a/src/gausskernel/storage/ipc/ipci.cpp b/src/gausskernel/storage/ipc/ipci.cpp
index 607dd3435602e962809bd03c2bf74d4001fe5722..0f1a238384e0d4a7e2317af42ad4bf6fc0e4778d 100644
--- a/src/gausskernel/storage/ipc/ipci.cpp
+++ b/src/gausskernel/storage/ipc/ipci.cpp
@@ -451,6 +451,7 @@ void CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
     if (g_instance.attr.attr_storage.dms_attr.enable_ondemand_recovery) {
         OndemandRecoveryShmemInit();
+        OndemandXlogFileIdCacheInit();
     }
 
     if (g_instance.ckpt_cxt_ctl->prune_queue_lock == NULL) {
diff --git a/src/gausskernel/storage/lmgr/lwlocknames.txt b/src/gausskernel/storage/lmgr/lwlocknames.txt
index 3a3fe6b7551da9fe097c3f72e15f0bfbbd5f204b..dffbc1a6ef32e12e708cfd1d3b73bd0692c081e1 100755
--- a/src/gausskernel/storage/lmgr/lwlocknames.txt
+++ b/src/gausskernel/storage/lmgr/lwlocknames.txt
@@ -138,4 +138,5 @@ GsStackLock 128
 ConfigFileLock 129
 DropArchiveSlotLock 130
 AboCacheLock 131
-OndemandXlogMemAllocLock 132
+OndemandXLogMemAllocLock 132
+OndemandXLogFileHandleLock 133
diff --git a/src/include/access/ondemand_extreme_rto/dispatcher.h b/src/include/access/ondemand_extreme_rto/dispatcher.h
index 17f9958cfac2b1e18351cdfec002e5169d01e83d..70a216f6a89eef07f59d181c1892d123e9c0aa54 100644
--- a/src/include/access/ondemand_extreme_rto/dispatcher.h
+++ b/src/include/access/ondemand_extreme_rto/dispatcher.h
@@ -179,6 +179,7 @@ typedef struct {
 extern LogDispatcher *g_dispatcher;
 extern RedoItem g_GlobalLsnForwarder;
 extern RedoItem g_cleanupMark;
+extern RedoItem g_forceDistributeMark;
 extern THR_LOCAL RecordBufferState *g_recordbuffer;
 const static uint64 OUTPUT_WAIT_COUNT = 0x7FFFFFF;
diff --git a/src/include/access/ondemand_extreme_rto/page_redo.h b/src/include/access/ondemand_extreme_rto/page_redo.h
index 005d76a9daa632e2e6cbaddf3c35d6b8818bb33a..285d66c2aecee8941ba9b4124fe582f366ae83ad 100644
--- a/src/include/access/ondemand_extreme_rto/page_redo.h
+++ b/src/include/access/ondemand_extreme_rto/page_redo.h
@@ -42,7 +42,7 @@ namespace ondemand_extreme_rto {
 
 #define ONDEMAND_DISTRIBUTE_RATIO 0.9
 
-static const uint32 PAGE_WORK_QUEUE_SIZE = 2097152;
+static const uint32 PAGE_WORK_QUEUE_SIZE = 65536;
 
 static const uint32 ONDEMAND_EXTREME_RTO_ALIGN_LEN = 16; /* need 128-bit aligned */
 static const uint32 MAX_REMOTE_READ_INFO_NUM = 100;
@@ -185,7 +185,6 @@ struct PageRedoWorker {
     RedoBufferManager bufferManager;
     RedoTimeCost timeCostList[TIME_COST_NUM];
     char page[BLCKSZ];
-    XLogBlockDataParse *curRedoBlockState;
 };
 
diff --git a/src/include/access/ondemand_extreme_rto/redo_utils.h b/src/include/access/ondemand_extreme_rto/redo_utils.h
index a5a95a2b56ee56c249000f94b0e8ae7df7396a18..6a56cb3d9843345c3b5cb8937885b5fd9fd45971 100644
--- a/src/include/access/ondemand_extreme_rto/redo_utils.h
+++ b/src/include/access/ondemand_extreme_rto/redo_utils.h
@@ -28,12 +28,15 @@
 
 Size OndemandRecoveryShmemSize(void);
 void OndemandRecoveryShmemInit(void);
+void OndemandXlogFileIdCacheInit(void);
 void OndemandXLogParseBufferInit(RedoParseManager *parsemanager, int buffernum, RefOperate *refOperate,
     InterruptFunc interruptOperte);
 void OndemandXLogParseBufferDestory(RedoParseManager *parsemanager);
 XLogRecParseState *OndemandXLogParseBufferAllocList(RedoParseManager *parsemanager, XLogRecParseState *blkstatehead,
     void *record);
 void OndemandXLogParseBufferRelease(XLogRecParseState *recordstate);
+XLogRecParseState *OndemandRedoReloadXLogRecord(XLogRecParseState *redoblockstate);
+void OndemandRedoReleaseXLogRecord(XLogRecParseState *reloadBlockState);
 void OnDemandSendRecoveryEndMarkToWorkersAndWaitForReach(int code);
 void OnDemandWaitRedoFinish();
diff --git a/src/include/access/ondemand_extreme_rto/xlog_read.h b/src/include/access/ondemand_extreme_rto/xlog_read.h
index 6642a013e8d9770b075df028e308bef9f900afe7..10cb8c2515510d5a31fa26d160cabf9333c060e5 100644
--- a/src/include/access/ondemand_extreme_rto/xlog_read.h
+++ b/src/include/access/ondemand_extreme_rto/xlog_read.h
@@ -31,6 +31,21 @@ namespace ondemand_extreme_rto {
 XLogRecord* XLogParallelReadNextRecord(XLogReaderState* xlogreader);
 XLogRecord *ReadNextXLogRecord(XLogReaderState **xlogreaderptr, int emode);
+XLogRecord *ParallelReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg);
 } // namespace ondemand_extreme_rto
 
+
+typedef struct XLogFileId {
+    XLogSegNo segno;
+    TimeLineID tli;
+} XLogFileId;
+
+typedef struct XLogFileIdCacheEntry {
+    XLogFileId id;
+    int fd;
+} XLogFileIdCacheEntry;
+
+int SimpleXLogPageReadInFdCache(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
+    XLogRecPtr targetRecPtr, char *readBuf, TimeLineID *pageTLI, char* xlog_path = NULL);
+void CloseAllXlogFileInFdCache(void);
 #endif /* ONDEMAND_EXTREME_RTO_XLOG_READ_H */
\ No newline at end of file
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 66ba231bd01962c09a194a1982b5b727b80ac1a2..04fc92a3862f062de1959428f0bffc9ee3db901d 100755
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -541,7 +541,7 @@ typedef struct XLogCtlData {
     bool IsRecoveryDone;
     bool IsOnDemandBuildDone;
-    bool IsOnDemandRecoveryDone;
+    bool IsOnDemandRedoDone;
 
     /*
      * SharedHotStandbyActive indicates if we're still in crash or archive
diff --git a/src/include/access/xlogproc.h b/src/include/access/xlogproc.h
index 9d76ca5fac3932121284eb08e55b65ded155d818..fba9b6e3fd385d08ea0b2b3f4f63b0c5c8d38604 100755
--- a/src/include/access/xlogproc.h
+++ b/src/include/access/xlogproc.h
@@ -693,6 +693,7 @@ typedef enum {
     XLOG_HEAD_DISTRIBUTE,
     XLOG_MID_DISTRIBUTE,
     XLOG_TAIL_DISTRIBUTE,
+    XLOG_SKIP_DISTRIBUTE,
 } XlogDistributePos;
 
 typedef struct {
diff --git a/src/include/ddes/dms/ss_common_attr.h b/src/include/ddes/dms/ss_common_attr.h
index c56b1978a6b18762251bd2d91de3938e5c86b225..cefce498968bd4e1bf37ad2eb6ff8a40d4140fa7 100644
--- a/src/include/ddes/dms/ss_common_attr.h
+++ b/src/include/ddes/dms/ss_common_attr.h
@@ -150,7 +150,7 @@
 #define SS_CLUSTER_ONDEMAND_BUILD \
     (ENABLE_DMS && (g_instance.dms_cxt.SSReformerControl.clusterStatus == CLUSTER_IN_ONDEMAND_BUILD))
 #define SS_CLUSTER_ONDEMAND_RECOVERY \
-    (ENABLE_DMS && (g_instance.dms_cxt.SSReformerControl.clusterStatus == CLUSTER_IN_ONDEMAND_RECOVERY))
+    (ENABLE_DMS && (g_instance.dms_cxt.SSReformerControl.clusterStatus == CLUSTER_IN_ONDEMAND_REDO))
 #define SS_CLUSTER_ONDEMAND_NORMAL \
     (ENABLE_DMS && (g_instance.dms_cxt.SSReformerControl.clusterStatus == CLUSTER_NORMAL))
 #define SS_STANDBY_ONDEMAND_BUILD (SS_STANDBY_MODE && SS_CLUSTER_ONDEMAND_BUILD)
@@ -226,7 +226,7 @@ typedef enum SSReformType {
 
 typedef enum SSGlobalClusterState {
     CLUSTER_IN_ONDEMAND_BUILD = 0,
-    CLUSTER_IN_ONDEMAND_RECOVERY,
+    CLUSTER_IN_ONDEMAND_REDO,
     CLUSTER_NORMAL
 } SSGlobalClusterState;
 
@@ -234,7 +234,7 @@ typedef enum SSOndemandRequestRedoStatus {
     ONDEMAND_REDO_DONE = 0,
     ONDEMAND_REDO_SKIP,
     ONDEMAND_REDO_FAIL,
-    ONDEMAND_REDO_INVALID
+    ONDEMAND_REDO_TIMEOUT
 } SSOndemandRequestRedoStatus;
 
diff --git a/src/include/ddes/dms/ss_dms_recovery.h b/src/include/ddes/dms/ss_dms_recovery.h
index 98d210b17d8891810eea678db80290c778c03d2d..8645feef71fc0e7ea37510f349bfa52f78368b77 100644
--- a/src/include/ddes/dms/ss_dms_recovery.h
+++ b/src/include/ddes/dms/ss_dms_recovery.h
@@ -35,11 +35,11 @@
 #define SS_IN_ONDEMAND_RECOVERY (ENABLE_DMS && g_instance.dms_cxt.SSRecoveryInfo.in_ondemand_recovery == true)
 #define SS_ONDEMAND_BUILD_DONE (ENABLE_DMS && SS_IN_ONDEMAND_RECOVERY \
     && t_thrd.shemem_ptr_cxt.XLogCtl->IsOnDemandBuildDone == true)
-#define SS_ONDEMAND_RECOVERY_DONE (ENABLE_DMS && SS_IN_ONDEMAND_RECOVERY \
-    && t_thrd.shemem_ptr_cxt.XLogCtl->IsOnDemandRecoveryDone == true)
+#define SS_ONDEMAND_REDO_DONE (SS_IN_ONDEMAND_RECOVERY \
+    && t_thrd.shemem_ptr_cxt.XLogCtl->IsOnDemandRedoDone == true)
 #define SS_REPLAYED_BY_ONDEMAND (ENABLE_DMS && !SS_IN_ONDEMAND_RECOVERY && \
     t_thrd.shemem_ptr_cxt.XLogCtl->IsOnDemandBuildDone == true && \
-    t_thrd.shemem_ptr_cxt.XLogCtl->IsOnDemandRecoveryDone == true)
+    t_thrd.shemem_ptr_cxt.XLogCtl->IsOnDemandRedoDone == true)
 
 #define REFORM_CTRL_VERSION 1
diff --git a/src/include/knl/knl_thread.h b/src/include/knl/knl_thread.h
index f58b2a0ca2c18c93f47c0b83f40cf0bdf642e60c..524c1524f5a48c8dca01955c78def517da60fd1d 100755
--- a/src/include/knl/knl_thread.h
+++ b/src/include/knl/knl_thread.h
@@ -2806,6 +2806,7 @@ typedef struct knl_t_storage_context {
     char* PcaBufferBlocks;
     dms_buf_ctrl_t* dmsBufCtl;
     char* ondemandXLogMem;
+    struct HTAB* ondemandXLogFileIdCache;
 } knl_t_storage_context;
 
 typedef struct knl_t_port_context {
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index 5dce1abc43ea678c910323dc09de1d4d02abd583..9e4c9276754b546fa1f28d04883b8a9fcf300382 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -416,7 +416,7 @@ extern bool stack_is_too_deep(void);
 /* in tcop/utility.c */
 extern void PreventCommandIfReadOnly(const char* cmdname);
 extern void PreventCommandDuringRecovery(const char* cmdname);
-extern void PreventCommandDuringSSOndemandRecovery(Node* parseTree);
+extern void PreventCommandDuringSSOndemandRedo(Node* parseTree);
 
 extern int trace_recovery(int trace_level);