From 563971b4c27e9574e395f309f0b04fc0e5ab09f6 Mon Sep 17 00:00:00 2001 From: movead Date: Wed, 27 Sep 2023 09:54:07 +0800 Subject: [PATCH] for parallel redo, change txn check lsn method from finish lsn to tying lsn --- .../access/transam/parallel_recovery/dispatcher.cpp | 4 ++-- .../access/transam/parallel_recovery/page_redo.cpp | 7 +++++++ .../access/transam/parallel_recovery/txn_redo.cpp | 10 +++++++--- src/include/access/parallel_recovery/dispatcher.h | 2 +- src/include/access/parallel_recovery/page_redo.h | 2 ++ 5 files changed, 19 insertions(+), 6 deletions(-) diff --git a/src/gausskernel/storage/access/transam/parallel_recovery/dispatcher.cpp b/src/gausskernel/storage/access/transam/parallel_recovery/dispatcher.cpp index 8c1ae45485..a34c369187 100755 --- a/src/gausskernel/storage/access/transam/parallel_recovery/dispatcher.cpp +++ b/src/gausskernel/storage/access/transam/parallel_recovery/dispatcher.cpp @@ -1935,13 +1935,13 @@ void GetReplayedRecPtrFromUndoWorkers(XLogRecPtr *readPtr, XLogRecPtr *endPtr) *endPtr = minEnd; } -void GetReplayedRecPtrFromWorkers(XLogRecPtr *endPtr) +void GetReplayingRecPtrFromWorkers(XLogRecPtr *endPtr) { XLogRecPtr minEnd = MAX_XLOG_REC_PTR; for (uint32 i = 0; i < g_dispatcher->pageWorkerCount; i++) { if (!RedoWorkerIsIdle(g_dispatcher->pageWorkers[i])) { - XLogRecPtr end = GetCompletedRecPtr(g_dispatcher->pageWorkers[i]); + XLogRecPtr end = GetReplyingRecPtr(g_dispatcher->pageWorkers[i]); if (XLByteLT(end, minEnd)) { minEnd = end; } diff --git a/src/gausskernel/storage/access/transam/parallel_recovery/page_redo.cpp b/src/gausskernel/storage/access/transam/parallel_recovery/page_redo.cpp index a7981319c7..5a46bb1b8f 100755 --- a/src/gausskernel/storage/access/transam/parallel_recovery/page_redo.cpp +++ b/src/gausskernel/storage/access/transam/parallel_recovery/page_redo.cpp @@ -411,6 +411,7 @@ void ApplyProcHead(RedoItem *head) while (head != NULL) { RedoItem *cur = head; g_redoWorker->current_item = &cur->record; + pg_atomic_write_u64((volatile uint64*)&g_redoWorker->curReplayingReadRecPtr, cur->record.ReadRecPtr); head = head->nextByWorker[g_redoWorker->id + 1]; ApplyAndFreeRedoItem(cur); } @@ -1022,6 +1023,12 @@ XLogRecPtr GetCompletedRecPtr(PageRedoWorker *worker) return pg_atomic_read_u64(&worker->lastReplayedEndRecPtr); } +XLogRecPtr GetReplyingRecPtr(PageRedoWorker *worker) +{ + pg_read_barrier(); + return pg_atomic_read_u64(&worker->curReplayingReadRecPtr); +} + /* automic write for lastReplayedReadRecPtr and lastReplayedEndRecPtr */ void SetCompletedReadEndPtr(PageRedoWorker *worker, XLogRecPtr readPtr, XLogRecPtr endPtr) { diff --git a/src/gausskernel/storage/access/transam/parallel_recovery/txn_redo.cpp b/src/gausskernel/storage/access/transam/parallel_recovery/txn_redo.cpp index 0079b4a61b..077a79491a 100644 --- a/src/gausskernel/storage/access/transam/parallel_recovery/txn_redo.cpp +++ b/src/gausskernel/storage/access/transam/parallel_recovery/txn_redo.cpp @@ -272,33 +272,36 @@ void ApplyReadyTxnLogRecords(TxnRedoWorker *worker, bool forceAll) while (item != NULL) { XLogReaderState *record = &item->record; XLogRecPtr lrEnd; + XLogRecPtr curRead; pg_atomic_write_u64(&worker->txn_trying_lsn, record->EndRecPtr); if (forceAll) { GetRedoStartTime(t_thrd.xlog_cxt.timeCost[TIME_COST_STEP_6]); XLogRecPtr lrRead; /* lastReplayedReadPtr */ GetReplayedRecPtrFromWorkers(&lrRead, &lrEnd); + GetReplayingRecPtrFromWorkers(&curRead); /* we need to get lastCompletedPageLSN as soon as possible,so */ /* we can not sleep here. */ XLogRecPtr oldReplayedPageLSN = InvalidXLogRecPtr; - while (XLByteLT(lrEnd, record->EndRecPtr)) { + while (XLByteLT(curRead, record->EndRecPtr)) { /* update lastreplaylsn */ if (!XLByteEQ(oldReplayedPageLSN, lrEnd)) { SetXLogReplayRecPtr(lrRead, lrEnd); oldReplayedPageLSN = lrEnd; } GetReplayedRecPtrFromWorkers(&lrRead, &lrEnd); + GetReplayingRecPtrFromWorkers(&curRead); RedoInterruptCallBack(); } CountRedoTime(t_thrd.xlog_cxt.timeCost[TIME_COST_STEP_6]); } - GetReplayedRecPtrFromWorkers(&lrEnd); + GetReplayingRecPtrFromWorkers(&curRead); /* * Make sure we can replay this record. This check is necessary * on the master and on the hot backup after it reaches consistency. */ - if (XLByteLE(record->EndRecPtr, lrEnd)) { + if (XLByteLE(record->EndRecPtr, curRead)) { item = ProcTxnItem(item); } else { break; @@ -313,6 +316,7 @@ void ApplyReadyTxnLogRecords(TxnRedoWorker *worker, bool forceAll) XLogRecPtr oldReplayedPageLSN = InvalidXLogRecPtr; XLogRecPtr lrRead; XLogRecPtr lrEnd; + XLogRecPtr curRead; do { GetReplayedRecPtrFromWorkers(&lrRead, &lrEnd); if (XLByteLT(g_dispatcher->dispatchEndRecPtr, lrEnd)) { diff --git a/src/include/access/parallel_recovery/dispatcher.h b/src/include/access/parallel_recovery/dispatcher.h index 047c36002e..86c173e4bd 100644 --- a/src/include/access/parallel_recovery/dispatcher.h +++ b/src/include/access/parallel_recovery/dispatcher.h @@ -123,7 +123,7 @@ uint32 GetWorkerId(const RelFileNode& node, BlockNumber block, ForkNumber forkNu XLogReaderState* NewReaderState(XLogReaderState* readerState, bool bCopyState = false); void FreeAllocatedRedoItem(); void GetReplayedRecPtrFromWorkers(XLogRecPtr *readPtr, XLogRecPtr *endPtr); -void GetReplayedRecPtrFromWorkers(XLogRecPtr *endPtr); +void GetReplayingRecPtrFromWorkers(XLogRecPtr *endPtr); void GetReplayedRecPtrFromUndoWorkers(XLogRecPtr *readPtr, XLogRecPtr *endPtr); List* CheckImcompleteAction(List* imcompleteActionList); void SetPageWorkStateByThreadId(uint32 threadState); diff --git a/src/include/access/parallel_recovery/page_redo.h b/src/include/access/parallel_recovery/page_redo.h index 22dbfa51ad..b1a4aadfb0 100644 --- a/src/include/access/parallel_recovery/page_redo.h +++ b/src/include/access/parallel_recovery/page_redo.h @@ -72,6 +72,7 @@ struct PageRedoWorker { */ XLogRecPtr lastReplayedReadRecPtr; XLogRecPtr lastReplayedEndRecPtr; + XLogRecPtr curReplayingReadRecPtr; #if (!defined __x86_64__) && (!defined __aarch64__) /* protects lastReplayedReadRecPtr and lastReplayedEndRecPtr */ slock_t ptrLck; @@ -223,6 +224,7 @@ bool ProcessPendingPageRedoItems(PageRedoWorker* worker); /* Run-time worker states. */ uint64 GetCompletedRecPtr(PageRedoWorker* worker); +XLogRecPtr GetReplyingRecPtr(PageRedoWorker *worker); bool IsRecoveryRestartPointSafe(PageRedoWorker* worker, XLogRecPtr restartPoint); void SetWorkerRestartPoint(PageRedoWorker* worker, XLogRecPtr restartPoint); -- Gitee