From 20bfe8d472b544f2451d9a7c0f0d308fb62c373d Mon Sep 17 00:00:00 2001 From: movead Date: Wed, 27 Sep 2023 09:54:07 +0800 Subject: [PATCH 1/2] for parallel redo, change txn check lsn method from finish lsn to tying lsn --- .../access/transam/parallel_recovery/dispatcher.cpp | 4 ++-- .../access/transam/parallel_recovery/page_redo.cpp | 7 +++++++ .../access/transam/parallel_recovery/txn_redo.cpp | 10 +++++++--- src/include/access/parallel_recovery/dispatcher.h | 2 +- src/include/access/parallel_recovery/page_redo.h | 2 ++ 5 files changed, 19 insertions(+), 6 deletions(-) diff --git a/src/gausskernel/storage/access/transam/parallel_recovery/dispatcher.cpp b/src/gausskernel/storage/access/transam/parallel_recovery/dispatcher.cpp index 615e6cf16a..e426775781 100755 --- a/src/gausskernel/storage/access/transam/parallel_recovery/dispatcher.cpp +++ b/src/gausskernel/storage/access/transam/parallel_recovery/dispatcher.cpp @@ -1888,13 +1888,13 @@ void GetReplayedRecPtrFromUndoWorkers(XLogRecPtr *readPtr, XLogRecPtr *endPtr) *endPtr = minEnd; } -void GetReplayedRecPtrFromWorkers(XLogRecPtr *endPtr) +void GetReplayingRecPtrFromWorkers(XLogRecPtr *endPtr) { XLogRecPtr minEnd = MAX_XLOG_REC_PTR; for (uint32 i = 0; i < g_dispatcher->pageWorkerCount; i++) { if (!RedoWorkerIsIdle(g_dispatcher->pageWorkers[i])) { - XLogRecPtr end = GetCompletedRecPtr(g_dispatcher->pageWorkers[i]); + XLogRecPtr end = GetReplyingRecPtr(g_dispatcher->pageWorkers[i]); if (XLByteLT(end, minEnd)) { minEnd = end; } diff --git a/src/gausskernel/storage/access/transam/parallel_recovery/page_redo.cpp b/src/gausskernel/storage/access/transam/parallel_recovery/page_redo.cpp index c9b465f221..36c324fd97 100755 --- a/src/gausskernel/storage/access/transam/parallel_recovery/page_redo.cpp +++ b/src/gausskernel/storage/access/transam/parallel_recovery/page_redo.cpp @@ -412,6 +412,7 @@ void ApplyProcHead(RedoItem *head) while (head != NULL) { RedoItem *cur = head; g_redoWorker->current_item = &cur->record; + pg_atomic_write_u64((volatile uint64*)&g_redoWorker->curReplayingReadRecPtr, cur->record.ReadRecPtr); head = head->nextByWorker[g_redoWorker->id + 1]; ApplyAndFreeRedoItem(cur); } @@ -971,6 +972,12 @@ XLogRecPtr GetCompletedRecPtr(PageRedoWorker *worker) return pg_atomic_read_u64(&worker->lastReplayedEndRecPtr); } +XLogRecPtr GetReplyingRecPtr(PageRedoWorker *worker) +{ + pg_read_barrier(); + return pg_atomic_read_u64(&worker->curReplayingReadRecPtr); +} + /* automic write for lastReplayedReadRecPtr and lastReplayedEndRecPtr */ void SetCompletedReadEndPtr(PageRedoWorker *worker, XLogRecPtr readPtr, XLogRecPtr endPtr) { diff --git a/src/gausskernel/storage/access/transam/parallel_recovery/txn_redo.cpp b/src/gausskernel/storage/access/transam/parallel_recovery/txn_redo.cpp index 0c707c40fa..c3776dad30 100644 --- a/src/gausskernel/storage/access/transam/parallel_recovery/txn_redo.cpp +++ b/src/gausskernel/storage/access/transam/parallel_recovery/txn_redo.cpp @@ -255,32 +255,35 @@ void ApplyReadyTxnLogRecords(TxnRedoWorker *worker, bool forceAll) while (item != NULL) { XLogReaderState *record = &item->record; XLogRecPtr lrEnd; + XLogRecPtr curRead; if (forceAll) { GetRedoStartTime(t_thrd.xlog_cxt.timeCost[TIME_COST_STEP_6]); XLogRecPtr lrRead; /* lastReplayedReadPtr */ GetReplayedRecPtrFromWorkers(&lrRead, &lrEnd); + GetReplayingRecPtrFromWorkers(&curRead); /* we need to get lastCompletedPageLSN as soon as possible,so */ /* we can not sleep here. */ XLogRecPtr oldReplayedPageLSN = InvalidXLogRecPtr; - while (XLByteLT(lrEnd, record->EndRecPtr)) { + while (XLByteLT(curRead, record->EndRecPtr)) { /* update lastreplaylsn */ if (!XLByteEQ(oldReplayedPageLSN, lrEnd)) { SetXLogReplayRecPtr(lrRead, lrEnd); oldReplayedPageLSN = lrEnd; } GetReplayedRecPtrFromWorkers(&lrRead, &lrEnd); + GetReplayingRecPtrFromWorkers(&curRead); RedoInterruptCallBack(); } CountRedoTime(t_thrd.xlog_cxt.timeCost[TIME_COST_STEP_6]); } - GetReplayedRecPtrFromWorkers(&lrEnd); + GetReplayingRecPtrFromWorkers(&curRead); /* * Make sure we can replay this record. This check is necessary * on the master and on the hot backup after it reaches consistency. */ - if (XLByteLE(record->EndRecPtr, lrEnd)) { + if (XLByteLE(record->EndRecPtr, curRead)) { item = ProcTxnItem(item); } else { break; @@ -295,6 +298,7 @@ void ApplyReadyTxnLogRecords(TxnRedoWorker *worker, bool forceAll) XLogRecPtr oldReplayedPageLSN = InvalidXLogRecPtr; XLogRecPtr lrRead; XLogRecPtr lrEnd; + XLogRecPtr curRead; do { GetReplayedRecPtrFromWorkers(&lrRead, &lrEnd); if (XLByteLT(g_dispatcher->dispatchEndRecPtr, lrEnd)) { diff --git a/src/include/access/parallel_recovery/dispatcher.h b/src/include/access/parallel_recovery/dispatcher.h index cad5d9481a..11b01424c3 100644 --- a/src/include/access/parallel_recovery/dispatcher.h +++ b/src/include/access/parallel_recovery/dispatcher.h @@ -121,7 +121,7 @@ uint32 GetWorkerId(const RelFileNode& node, BlockNumber block, ForkNumber forkNu XLogReaderState* NewReaderState(XLogReaderState* readerState, bool bCopyState = false); void FreeAllocatedRedoItem(); void GetReplayedRecPtrFromWorkers(XLogRecPtr *readPtr, XLogRecPtr *endPtr); -void GetReplayedRecPtrFromWorkers(XLogRecPtr *endPtr); +void GetReplayingRecPtrFromWorkers(XLogRecPtr *endPtr); void GetReplayedRecPtrFromUndoWorkers(XLogRecPtr *readPtr, XLogRecPtr *endPtr); List* CheckImcompleteAction(List* imcompleteActionList); void SetPageWorkStateByThreadId(uint32 threadState); diff --git a/src/include/access/parallel_recovery/page_redo.h b/src/include/access/parallel_recovery/page_redo.h index e3056d9fc2..13e0ff6d7f 100644 --- a/src/include/access/parallel_recovery/page_redo.h +++ b/src/include/access/parallel_recovery/page_redo.h @@ -72,6 +72,7 @@ struct PageRedoWorker { */ XLogRecPtr lastReplayedReadRecPtr; XLogRecPtr lastReplayedEndRecPtr; + XLogRecPtr curReplayingReadRecPtr; #if (!defined __x86_64__) && (!defined __aarch64__) /* protects lastReplayedReadRecPtr and lastReplayedEndRecPtr */ slock_t ptrLck; @@ -224,6 +225,7 @@ bool ProcessPendingPageRedoItems(PageRedoWorker* worker); /* Run-time worker states. */ uint64 GetCompletedRecPtr(PageRedoWorker* worker); +XLogRecPtr GetReplyingRecPtr(PageRedoWorker *worker); bool IsRecoveryRestartPointSafe(PageRedoWorker* worker, XLogRecPtr restartPoint); void SetWorkerRestartPoint(PageRedoWorker* worker, XLogRecPtr restartPoint); -- Gitee From 36a0f3e0e01b45b5c17eb9cb70a4ad86a44f6000 Mon Sep 17 00:00:00 2001 From: movead Date: Wed, 27 Sep 2023 15:56:28 +0800 Subject: [PATCH 2/2] in GetReplyingRecPtr pick bigger between curReplayingReadRecPtr and lastReplayedEndRecPtr; --- .../access/transam/parallel_recovery/page_redo.cpp | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/gausskernel/storage/access/transam/parallel_recovery/page_redo.cpp b/src/gausskernel/storage/access/transam/parallel_recovery/page_redo.cpp index 36c324fd97..89ca2e2623 100755 --- a/src/gausskernel/storage/access/transam/parallel_recovery/page_redo.cpp +++ b/src/gausskernel/storage/access/transam/parallel_recovery/page_redo.cpp @@ -974,8 +974,15 @@ XLogRecPtr GetCompletedRecPtr(PageRedoWorker *worker) XLogRecPtr GetReplyingRecPtr(PageRedoWorker *worker) { + XLogRecPtr curReplayingReadRecPtr; + XLogRecPtr lastReplayedEndRecPtr; + XLogRecPtr result; pg_read_barrier(); - return pg_atomic_read_u64(&worker->curReplayingReadRecPtr); + + curReplayingReadRecPtr = pg_atomic_read_u64(&worker->curReplayingReadRecPtr); + lastReplayedEndRecPtr = pg_atomic_read_u64(&worker->lastReplayedEndRecPtr); + + return lastReplayedEndRecPtr > curReplayingReadRecPtr ? lastReplayedEndRecPtr : curReplayingReadRecPtr; } /* automic write for lastReplayedReadRecPtr and lastReplayedEndRecPtr */ -- Gitee