diff --git a/src/gausskernel/storage/access/transam/parallel_recovery/dispatcher.cpp b/src/gausskernel/storage/access/transam/parallel_recovery/dispatcher.cpp index 615e6cf16a6ef97f2bf03919dbfb997ef6431a25..e426775781a68b83c23ecdfeb0dad739f87a3590 100755 --- a/src/gausskernel/storage/access/transam/parallel_recovery/dispatcher.cpp +++ b/src/gausskernel/storage/access/transam/parallel_recovery/dispatcher.cpp @@ -1888,13 +1888,13 @@ void GetReplayedRecPtrFromUndoWorkers(XLogRecPtr *readPtr, XLogRecPtr *endPtr) *endPtr = minEnd; } -void GetReplayedRecPtrFromWorkers(XLogRecPtr *endPtr) +void GetReplayingRecPtrFromWorkers(XLogRecPtr *endPtr) { XLogRecPtr minEnd = MAX_XLOG_REC_PTR; for (uint32 i = 0; i < g_dispatcher->pageWorkerCount; i++) { if (!RedoWorkerIsIdle(g_dispatcher->pageWorkers[i])) { - XLogRecPtr end = GetCompletedRecPtr(g_dispatcher->pageWorkers[i]); + XLogRecPtr end = GetReplyingRecPtr(g_dispatcher->pageWorkers[i]); if (XLByteLT(end, minEnd)) { minEnd = end; } diff --git a/src/gausskernel/storage/access/transam/parallel_recovery/page_redo.cpp b/src/gausskernel/storage/access/transam/parallel_recovery/page_redo.cpp index c9b465f22109c39c00a32f72a4eb9f3cf635692a..89ca2e2623a1a819d638e0cae8e9cec2c0a46b1b 100755 --- a/src/gausskernel/storage/access/transam/parallel_recovery/page_redo.cpp +++ b/src/gausskernel/storage/access/transam/parallel_recovery/page_redo.cpp @@ -412,6 +412,7 @@ void ApplyProcHead(RedoItem *head) while (head != NULL) { RedoItem *cur = head; g_redoWorker->current_item = &cur->record; + pg_atomic_write_u64((volatile uint64*)&g_redoWorker->curReplayingReadRecPtr, cur->record.ReadRecPtr); head = head->nextByWorker[g_redoWorker->id + 1]; ApplyAndFreeRedoItem(cur); } @@ -971,6 +972,19 @@ XLogRecPtr GetCompletedRecPtr(PageRedoWorker *worker) return pg_atomic_read_u64(&worker->lastReplayedEndRecPtr); } +XLogRecPtr GetReplyingRecPtr(PageRedoWorker *worker) +{ + XLogRecPtr curReplayingReadRecPtr; + XLogRecPtr lastReplayedEndRecPtr; + XLogRecPtr result; + pg_read_barrier(); + + curReplayingReadRecPtr = pg_atomic_read_u64(&worker->curReplayingReadRecPtr); + lastReplayedEndRecPtr = pg_atomic_read_u64(&worker->lastReplayedEndRecPtr); + + return lastReplayedEndRecPtr > curReplayingReadRecPtr ? lastReplayedEndRecPtr : curReplayingReadRecPtr; +} + /* automic write for lastReplayedReadRecPtr and lastReplayedEndRecPtr */ void SetCompletedReadEndPtr(PageRedoWorker *worker, XLogRecPtr readPtr, XLogRecPtr endPtr) { diff --git a/src/gausskernel/storage/access/transam/parallel_recovery/txn_redo.cpp b/src/gausskernel/storage/access/transam/parallel_recovery/txn_redo.cpp index 0c707c40fa09f6258b7a4bee9236e2f11cf961c7..c3776dad30210b21156a707a828749607870d796 100644 --- a/src/gausskernel/storage/access/transam/parallel_recovery/txn_redo.cpp +++ b/src/gausskernel/storage/access/transam/parallel_recovery/txn_redo.cpp @@ -255,32 +255,35 @@ void ApplyReadyTxnLogRecords(TxnRedoWorker *worker, bool forceAll) while (item != NULL) { XLogReaderState *record = &item->record; XLogRecPtr lrEnd; + XLogRecPtr curRead; if (forceAll) { GetRedoStartTime(t_thrd.xlog_cxt.timeCost[TIME_COST_STEP_6]); XLogRecPtr lrRead; /* lastReplayedReadPtr */ GetReplayedRecPtrFromWorkers(&lrRead, &lrEnd); + GetReplayingRecPtrFromWorkers(&curRead); /* we need to get lastCompletedPageLSN as soon as possible,so */ /* we can not sleep here. */ XLogRecPtr oldReplayedPageLSN = InvalidXLogRecPtr; - while (XLByteLT(lrEnd, record->EndRecPtr)) { + while (XLByteLT(curRead, record->EndRecPtr)) { /* update lastreplaylsn */ if (!XLByteEQ(oldReplayedPageLSN, lrEnd)) { SetXLogReplayRecPtr(lrRead, lrEnd); oldReplayedPageLSN = lrEnd; } GetReplayedRecPtrFromWorkers(&lrRead, &lrEnd); + GetReplayingRecPtrFromWorkers(&curRead); RedoInterruptCallBack(); } CountRedoTime(t_thrd.xlog_cxt.timeCost[TIME_COST_STEP_6]); } - GetReplayedRecPtrFromWorkers(&lrEnd); + GetReplayingRecPtrFromWorkers(&curRead); /* * Make sure we can replay this record. This check is necessary * on the master and on the hot backup after it reaches consistency. */ - if (XLByteLE(record->EndRecPtr, lrEnd)) { + if (XLByteLE(record->EndRecPtr, curRead)) { item = ProcTxnItem(item); } else { break; @@ -295,6 +298,7 @@ void ApplyReadyTxnLogRecords(TxnRedoWorker *worker, bool forceAll) XLogRecPtr oldReplayedPageLSN = InvalidXLogRecPtr; XLogRecPtr lrRead; XLogRecPtr lrEnd; + XLogRecPtr curRead; do { GetReplayedRecPtrFromWorkers(&lrRead, &lrEnd); if (XLByteLT(g_dispatcher->dispatchEndRecPtr, lrEnd)) { diff --git a/src/include/access/parallel_recovery/dispatcher.h b/src/include/access/parallel_recovery/dispatcher.h index cad5d9481aef597045da4e017da3957049ec7d8c..11b01424c3c7a741967c58ea164c8d99539d7a7c 100644 --- a/src/include/access/parallel_recovery/dispatcher.h +++ b/src/include/access/parallel_recovery/dispatcher.h @@ -121,7 +121,7 @@ uint32 GetWorkerId(const RelFileNode& node, BlockNumber block, ForkNumber forkNu XLogReaderState* NewReaderState(XLogReaderState* readerState, bool bCopyState = false); void FreeAllocatedRedoItem(); void GetReplayedRecPtrFromWorkers(XLogRecPtr *readPtr, XLogRecPtr *endPtr); -void GetReplayedRecPtrFromWorkers(XLogRecPtr *endPtr); +void GetReplayingRecPtrFromWorkers(XLogRecPtr *endPtr); void GetReplayedRecPtrFromUndoWorkers(XLogRecPtr *readPtr, XLogRecPtr *endPtr); List* CheckImcompleteAction(List* imcompleteActionList); void SetPageWorkStateByThreadId(uint32 threadState); diff --git a/src/include/access/parallel_recovery/page_redo.h b/src/include/access/parallel_recovery/page_redo.h index e3056d9fc24d8327905845436ccd2a79e6b6b38b..13e0ff6d7f1e7fde3ce53381f6466ab62039cebd 100644 --- a/src/include/access/parallel_recovery/page_redo.h +++ b/src/include/access/parallel_recovery/page_redo.h @@ -72,6 +72,7 @@ struct PageRedoWorker { */ XLogRecPtr lastReplayedReadRecPtr; XLogRecPtr lastReplayedEndRecPtr; + XLogRecPtr curReplayingReadRecPtr; #if (!defined __x86_64__) && (!defined __aarch64__) /* protects lastReplayedReadRecPtr and lastReplayedEndRecPtr */ slock_t ptrLck; @@ -224,6 +225,7 @@ bool ProcessPendingPageRedoItems(PageRedoWorker* worker); /* Run-time worker states. */ uint64 GetCompletedRecPtr(PageRedoWorker* worker); +XLogRecPtr GetReplyingRecPtr(PageRedoWorker *worker); bool IsRecoveryRestartPointSafe(PageRedoWorker* worker, XLogRecPtr restartPoint); void SetWorkerRestartPoint(PageRedoWorker* worker, XLogRecPtr restartPoint);