From b3d5af3f09ac31bed8253b0f4a187d6601f74458 Mon Sep 17 00:00:00 2001
From: "arcoalien@qq.com" <arcoalien@qq.com>
Date: Wed, 1 Nov 2023 14:37:23 +0800
Subject: [PATCH 1/2] use 6-way CAS lock in pin buffer and lwlock

---
 .../pg_buffercache/pg_buffercache_pages.cpp   |   2 +-
 src/common/backend/utils/adt/pgstatfuncs.cpp  |   2 +-
 .../ddes/adapter/ss_dms_bufmgr.cpp            |  12 +-
 .../ddes/adapter/ss_dms_callback.cpp          |  26 +-
 .../optimizer/commands/verifyrepair.cpp       |   4 +-
 .../process/postmaster/pagewriter.cpp         |  18 +-
 .../storage/access/heap/rewriteheap.cpp       |  12 +-
 .../storage/access/redo/redo_xlogutils.cpp    |   2 +-
 .../storage/access/transam/double_write.cpp   |   2 +-
 .../access/transam/seg_double_write.cpp       |   2 +-
 .../access/transam/single_double_write.cpp    |   2 +-
 .../storage/access/ustore/knl_uheap.cpp       |   5 +-
 src/gausskernel/storage/buffer/buf_init.cpp   |   2 +-
 src/gausskernel/storage/buffer/bufmgr.cpp     | 275 +++++++++---------
 src/gausskernel/storage/buffer/freelist.cpp   |  22 +-
 src/gausskernel/storage/buffer/localbuf.cpp   |  44 +--
 src/gausskernel/storage/lmgr/lwlock.cpp       | 161 +++++-----
 src/gausskernel/storage/nvm/nvmbuffer.cpp     |  58 ++--
 .../storage/smgr/segment/segbuffer.cpp        |  61 ++--
 .../storage/smgr/segment/segxlog.cpp          |   2 +-
 .../storage/smgr/segment/space.cpp            |   4 +-
 src/include/access/double_write.h             |   6 +-
 src/include/access/xlogproc.h                 |   6 +-
 src/include/storage/buf/buf_internals.h       |  55 ++--
 src/include/storage/buf/bufmgr.h              |  10 +-
 src/include/storage/lock/lwlock.h             |   2 +-
 src/include/storage/smgr/segment.h            |   2 +-
 src/include/utils/atomic.h                    |  10 +
 28 files changed, 407 insertions(+), 402 deletions(-)

diff --git a/contrib/pg_buffercache/pg_buffercache_pages.cpp b/contrib/pg_buffercache/pg_buffercache_pages.cpp
index 322abd1a58..3d4e31bf8b 100644
--- a/contrib/pg_buffercache/pg_buffercache_pages.cpp
+++ b/contrib/pg_buffercache/pg_buffercache_pages.cpp
@@ -61,7 +61,7 @@ Datum pg_buffercache_pages(PG_FUNCTION_ARGS)
     if (SRF_IS_FIRSTCALL()) {
         int i;
         BufferDescPadded *bufHdrPadded = NULL;
-        uint32 buf_state;
+        uint64 buf_state;
 
         funcctx = SRF_FIRSTCALL_INIT();
 
diff --git a/src/common/backend/utils/adt/pgstatfuncs.cpp b/src/common/backend/utils/adt/pgstatfuncs.cpp
index 25231bfb07..4893c2777c 100644
--- a/src/common/backend/utils/adt/pgstatfuncs.cpp
+++ b/src/common/backend/utils/adt/pgstatfuncs.cpp
@@ -8689,7 +8689,7 @@ Datum pg_buffercache_pages(PG_FUNCTION_ARGS)
      */
     for (i = 0; i < g_instance.attr.attr_storage.NBuffers; i++) {
-        uint32 buf_state;
+        uint64 buf_state;
         bufHdr = GetBufferDescriptor(i);
 
         /* Lock each buffer header before inspecting.
*/ diff --git a/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp b/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp index 19ddf86723..bdf9b75416 100644 --- a/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp +++ b/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp @@ -321,7 +321,7 @@ Buffer TerminateReadPage(BufferDesc* buf_desc, ReadBufferMode read_mode, const X static bool DmsStartBufferIO(BufferDesc *buf_desc, LWLockMode mode) { - uint32 buf_state; + uint64 buf_state; dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf_desc->buf_id); if (IsSegmentBufferID(buf_desc->buf_id)) { @@ -335,7 +335,7 @@ static bool DmsStartBufferIO(BufferDesc *buf_desc, LWLockMode mode) } if (LockModeCompatible(buf_ctrl, mode)) { - if (!(pg_atomic_read_u32(&buf_desc->state) & BM_IO_IN_PROGRESS)) { + if (!(pg_atomic_read_u64(&buf_desc->state) & BM_IO_IN_PROGRESS)) { return false; } } @@ -680,7 +680,7 @@ void SSCheckBufferIfNeedMarkDirty(Buffer buf) void SSRecheckBufferPool() { - uint32 buf_state; + uint64 buf_state; for (int i = 0; i < TOTAL_BUFFER_NUM; i++) { /* * BUF_DIRTY_NEED_FLUSH was removed during mark buffer dirty and lsn_on_disk was set during sync buffer @@ -690,7 +690,7 @@ void SSRecheckBufferPool() */ BufferDesc *buf_desc = GetBufferDescriptor(i); pg_memory_barrier(); - buf_state = pg_atomic_read_u32(&buf_desc->state); + buf_state = pg_atomic_read_u64(&buf_desc->state); if (!(buf_state & BM_VALID || buf_state & BM_TAG_VALID)) { continue; } @@ -783,7 +783,7 @@ bool SSSegRead(SMgrRelation reln, ForkNumber forknum, char *buffer) BufferDesc *buf_desc = BufferGetBufferDescriptor(buf); bool ret = false; - if ((pg_atomic_read_u32(&buf_desc->state) & BM_VALID) && buf_desc->extra->seg_fileno != EXTENT_INVALID) { + if ((pg_atomic_read_u64(&buf_desc->state) & BM_VALID) && buf_desc->extra->seg_fileno != EXTENT_INVALID) { SMGR_READ_STATUS rdStatus; if (reln->seg_space == NULL) { reln->seg_space = spc_open(reln->smgr_rnode.node.spcNode, reln->smgr_rnode.node.dbNode, false); @@ -982,7 +982,7 @@ bool SSWaitIOTimeout(BufferDesc *buf) { bool ret = false; for (;;) { - uint32 buf_state; + uint64 buf_state; buf_state = LockBufHdr(buf); UnlockBufHdr(buf, buf_state); diff --git a/src/gausskernel/ddes/adapter/ss_dms_callback.cpp b/src/gausskernel/ddes/adapter/ss_dms_callback.cpp index bf5d1c6f15..477a89261e 100644 --- a/src/gausskernel/ddes/adapter/ss_dms_callback.cpp +++ b/src/gausskernel/ddes/adapter/ss_dms_callback.cpp @@ -555,7 +555,7 @@ static int tryEnterLocalPage(BufferTag *tag, dms_lock_mode_t mode, dms_buf_ctrl_ break; } - if (!(pg_atomic_read_u32(&buf_desc->state) & BM_VALID)) { + if (!(pg_atomic_read_u64(&buf_desc->state) & BM_VALID)) { ereport(WARNING, (errmodule(MOD_DMS), errmsg("[%d/%d/%d/%d %d-%d] try enter page failed, buffer is not valid, state = 0x%x", tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode, @@ -566,7 +566,7 @@ static int tryEnterLocalPage(BufferTag *tag, dms_lock_mode_t mode, dms_buf_ctrl_ break; } - if (pg_atomic_read_u32(&buf_desc->state) & BM_IO_ERROR) { + if (pg_atomic_read_u64(&buf_desc->state) & BM_IO_ERROR) { ereport(WARNING, (errmodule(MOD_DMS), errmsg("[%d/%d/%d/%d %d-%d] try enter page failed, buffer is io error, state = 0x%x", tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode, @@ -636,7 +636,7 @@ static unsigned char CBPageDirty(dms_buf_ctrl_t *buf_ctrl) return 0; } BufferDesc *buf_desc = GetBufferDescriptor(buf_ctrl->buf_id); - bool is_dirty = (pg_atomic_read_u32(&buf_desc->state) & (BM_DIRTY | BM_JUST_DIRTIED)) > 0; + bool is_dirty = 
(pg_atomic_read_u64(&buf_desc->state) & (BM_DIRTY | BM_JUST_DIRTIED)) > 0; return (unsigned char)is_dirty; } @@ -670,7 +670,7 @@ static int CBInvalidatePage(void *db_handle, char pageid[DMS_PAGEID_SIZE], unsig BufferTag* tag = (BufferTag *)pageid; uint32 hash; LWLock *partition_lock = NULL; - uint32 buf_state; + uint64 buf_state; int ret = DMS_SUCCESS; hash = BufTableHashCode(tag); @@ -753,8 +753,8 @@ static int CBInvalidatePage(void *db_handle, char pageid[DMS_PAGEID_SIZE], unsig return ret; } - if ((!(pg_atomic_read_u32(&buf_desc->state) & BM_VALID)) || - (pg_atomic_read_u32(&buf_desc->state) & BM_IO_ERROR)) { + if ((!(pg_atomic_read_u64(&buf_desc->state) & BM_VALID)) || + (pg_atomic_read_u64(&buf_desc->state) & BM_IO_ERROR)) { ereport(LOG, (errmodule(MOD_DMS), errmsg("[%d/%d/%d/%d %d-%d] invalidate page, buffer is not valid or io error, state = 0x%x", tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode, @@ -832,7 +832,7 @@ static void CBVerifyPage(dms_buf_ctrl_t *buf_ctrl, char *new_page) } /* page content is not valid */ - if ((pg_atomic_read_u32(&buf_desc->state) & BM_VALID) == 0) { + if ((pg_atomic_read_u64(&buf_desc->state) & BM_VALID) == 0) { return; } @@ -1162,7 +1162,7 @@ static void CBSetDmsStatus(void *db_handle, int dms_status) g_instance.dms_cxt.dms_status = (dms_status_t)dms_status; } -static bool SSCheckBufferIfCanGoRebuild(BufferDesc* buf_desc, uint32 buf_state) +static bool SSCheckBufferIfCanGoRebuild(BufferDesc* buf_desc, uint64 buf_state) { bool ret = false; dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf_desc->buf_id); @@ -1180,7 +1180,7 @@ static bool SSCheckBufferIfCanGoRebuild(BufferDesc* buf_desc, uint32 buf_state) * page. The stucked process will request the page again when it add content lock and the reformer will * become owner when it request the page. 
*/ - ereport(WARNING, (errmsg("[%u/%u/%u/%d/0 %d-%u] Set lock mode to NULL, desc state:%u, ctrl state:%u, lock mode:%d.", + ereport(WARNING, (errmsg("[%u/%u/%u/%d/0 %d-%u] Set lock mode to NULL, desc state:%lu, ctrl state:%u, lock mode:%d.", buf_desc->tag.rnode.spcNode, buf_desc->tag.rnode.dbNode, buf_desc->tag.rnode.relNode, buf_desc->tag.rnode.bucketNode, buf_desc->tag.forkNum, buf_desc->tag.blockNum, buf_state, buf_ctrl->state, buf_ctrl->lock_mode))); buf_ctrl->lock_mode = DMS_LOCK_NULL; @@ -1222,7 +1222,7 @@ static int32 SSRebuildBuf(BufferDesc *buf_desc, unsigned char thread_index) static int32 CBDrcBufRebuildInternal(int begin, int len, unsigned char thread_index) { - uint32 buf_state; + uint64 buf_state; Assert(begin >= 0 && len > 0 && (begin + len) <= TOTAL_BUFFER_NUM); for (int i = begin; i < begin + len; i++) { BufferDesc *buf_desc = GetBufferDescriptor(i); @@ -1296,7 +1296,7 @@ static int32 CBDrcBufValidate(void *db_handle) SSReadControlFile(src_id, true); int buf_cnt = 0; - uint32 buf_state; + uint64 buf_state; ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS reform]CBDrcBufValidate starts before reform done."))); for (int i = 0; i < TOTAL_BUFFER_NUM; i++) { @@ -1371,8 +1371,8 @@ static bool SSGetBufferDesc(char *pageid, bool *is_valid, BufferDesc** ret_buf_d break; } - Assert(!(pg_atomic_read_u32(&buf_desc->state) & BM_IO_ERROR)); - *is_valid = (pg_atomic_read_u32(&buf_desc->state) & BM_VALID) != 0; + Assert(!(pg_atomic_read_u64(&buf_desc->state) & BM_IO_ERROR)); + *is_valid = (pg_atomic_read_u64(&buf_desc->state) & BM_VALID) != 0; *ret_buf_desc = buf_desc; } else { LWLockRelease(partition_lock); diff --git a/src/gausskernel/optimizer/commands/verifyrepair.cpp b/src/gausskernel/optimizer/commands/verifyrepair.cpp index ade79d42d8..b830d4a493 100644 --- a/src/gausskernel/optimizer/commands/verifyrepair.cpp +++ b/src/gausskernel/optimizer/commands/verifyrepair.cpp @@ -786,7 +786,7 @@ bool isNeedRepairPageByMem(char* disk_page_res, BlockNumber blockNum, char* mem_ securec_check_ss(rc, "\0", "\0"); } else { buf_desc = GetBufferDescriptor(buf - 1); - uint32 old_buf_state = LockBufHdr(buf_desc); + uint64 old_buf_state = LockBufHdr(buf_desc); isDirty = old_buf_state & BM_DIRTY; UnlockBufHdr(buf_desc, old_buf_state); Page page = BufferGetPage(buf); @@ -1994,4 +1994,4 @@ static void checkInstanceType() ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), (errmsg("Must be in primary DN.")))); } -} \ No newline at end of file +} diff --git a/src/gausskernel/process/postmaster/pagewriter.cpp b/src/gausskernel/process/postmaster/pagewriter.cpp index 3e22761bcf..ad3b0a2430 100755 --- a/src/gausskernel/process/postmaster/pagewriter.cpp +++ b/src/gausskernel/process/postmaster/pagewriter.cpp @@ -603,7 +603,7 @@ static uint32 ckpt_qsort_dirty_page_for_flush(bool *is_new_relfilenode, uint32 f scan_end = MIN(MAX_SCAN_NUM, dirty_page_num); for (i = 0; i < scan_end; i++) { - uint32 buf_state; + uint64 buf_state; Buffer buffer; BufferDesc* buf_desc = NULL; CkptSortItem* item = NULL; @@ -1902,7 +1902,7 @@ static void ckpt_try_prune_dirty_page_queue() if (can_found) { uint64 temp_loc; uint64 move_loc; - uint32 buf_state; + uint64 buf_state; volatile DirtyPageQueueSlot* slot = NULL; volatile DirtyPageQueueSlot* move_slot = NULL; BufferDesc* bufhdr = NULL; @@ -2010,7 +2010,7 @@ static uint32 incre_ckpt_pgwr_flush_dirty_page(WritebackContext *wb_context, const CkptSortItem *dirty_buf_list, int start, int batch_num) { uint32 num_actual_flush = 0; - uint32 buf_state; + uint64 buf_state; uint32 
sync_state; BufferDesc *buf_desc = NULL; int buf_id; @@ -2164,7 +2164,7 @@ static bool check_buffer_dirty_flag(BufferDesc* buf_desc) bool segment_buf = (buf_desc->buf_id >= SegmentBufferStartID); Block tmpBlock = BufHdrGetBlock(buf_desc); - uint32 local_buf_state = pg_atomic_read_u32(&buf_desc->state); + uint64 local_buf_state = pg_atomic_read_u64(&buf_desc->state); bool check_lsn_not_match = (local_buf_state & BM_VALID) && !(local_buf_state & BM_DIRTY) && XLByteLT(buf_desc->extra->lsn_on_disk, PageGetLSN(tmpBlock)) && RecoveryInProgress() && !segment_buf; @@ -2178,7 +2178,7 @@ static bool check_buffer_dirty_flag(BufferDesc* buf_desc) PinBuffer(buf_desc, NULL); if (LWLockConditionalAcquire(buf_desc->content_lock, LW_SHARED)) { pg_memory_barrier(); - local_buf_state = pg_atomic_read_u32(&buf_desc->state); + local_buf_state = pg_atomic_read_u64(&buf_desc->state); check_lsn_not_match = (local_buf_state & BM_VALID) && !(local_buf_state & BM_DIRTY) && XLByteLT(buf_desc->extra->lsn_on_disk, PageGetLSN(tmpBlock)) && RecoveryInProgress(); if (check_lsn_not_match) { @@ -2187,7 +2187,7 @@ static bool check_buffer_dirty_flag(BufferDesc* buf_desc) UnpinBuffer(buf_desc, true); const uint32 shiftSize = 32; ereport(DEBUG1, (errmodule(MOD_INCRE_BG), - errmsg("check lsn is not matched on disk:%X/%X on page %X/%X, relnode info:%u/%u/%u %u %u stat:%u", + errmsg("check lsn is not matched on disk:%X/%X on page %X/%X, relnode info:%u/%u/%u %u %u stat:%lu", (uint32)(buf_desc->extra->lsn_on_disk >> shiftSize), (uint32)(buf_desc->extra->lsn_on_disk), (uint32)(PageGetLSN(tmpBlock) >> shiftSize), (uint32)(PageGetLSN(tmpBlock)), buf_desc->tag.rnode.spcNode, buf_desc->tag.rnode.dbNode, buf_desc->tag.rnode.relNode, @@ -2373,7 +2373,7 @@ static uint32 get_candidate_buf_and_flush_list(uint32 start, uint32 end, uint32 uint32 need_flush_num = 0; uint32 candidates = 0; BufferDesc *buf_desc = NULL; - uint32 local_buf_state; + uint64 local_buf_state; CkptSortItem* item = NULL; bool check_not_need_flush = false; bool check_usecount = false; @@ -2387,7 +2387,7 @@ static uint32 get_candidate_buf_and_flush_list(uint32 start, uint32 end, uint32 for (uint32 buf_id = start; buf_id < end; buf_id++) { buf_desc = GetBufferDescriptor(buf_id); - local_buf_state = pg_atomic_read_u32(&buf_desc->state); + local_buf_state = pg_atomic_read_u64(&buf_desc->state); /* during recovery, check the data page whether not properly marked as dirty */ if (RecoveryInProgress() && check_buffer_dirty_flag(buf_desc)) { @@ -2466,7 +2466,7 @@ static void push_to_candidate_list(BufferDesc *buf_desc) uint32 thread_id = t_thrd.pagewriter_cxt.pagewriter_id; PageWriterProc *pgwr = &g_instance.ckpt_cxt_ctl->pgwr_procs.writer_proc[thread_id]; int buf_id = buf_desc->buf_id; - uint32 buf_state = pg_atomic_read_u32(&buf_desc->state); + uint64 buf_state = pg_atomic_read_u64(&buf_desc->state); bool emptyUsageCount = (!NEED_CONSIDER_USECOUNT || BUF_STATE_GET_USAGECOUNT(buf_state) == 0); if (BUF_STATE_GET_REFCOUNT(buf_state) > 0 || !emptyUsageCount) { diff --git a/src/gausskernel/storage/access/heap/rewriteheap.cpp b/src/gausskernel/storage/access/heap/rewriteheap.cpp index e501edce1a..b417269c78 100644 --- a/src/gausskernel/storage/access/heap/rewriteheap.cpp +++ b/src/gausskernel/storage/access/heap/rewriteheap.cpp @@ -270,7 +270,7 @@ RewriteState begin_heap_rewrite(Relation old_heap, Relation new_heap, Transactio state->rs_block_count = 0; for (int i = 0; i < REWRITE_BUFFERS_QUEUE_COUNT * 2; i++) { - pg_atomic_init_u32(&(state->rs_buffers_handler[i].state), 0); + 
pg_atomic_init_u64(&(state->rs_buffers_handler[i].state), 0); } } ADIO_ELSE() @@ -1027,7 +1027,7 @@ void rewrite_page_list_write(RewriteState state) for (int i = 0; i < n_bufs; i++) { AioDispatchDesc_t *aioDescp = NULL; BufferDesc *bufHdr = (BufferDesc *)(state->rs_buffers_handler + i); - uint32 buf_state; + uint64 buf_state; /* * Allocate an iocb, fill it in, and write the addr in the @@ -1143,8 +1143,8 @@ static void rewrite_flush_page(RewriteState state, Page page) CheckIOState((char *)(&(state->rs_buffers_handler[i]))); ereport(DEBUG1, (errmodule(MOD_ADIO), - errmsg("rewrite_flush_page, CheckIOState, flags(%d)", - (int)(pg_atomic_read_u32(&state->rs_buffers_handler[i].state) & BUF_FLAG_MASK)))); + errmsg("rewrite_flush_page, CheckIOState, flags(%lu)", + (pg_atomic_read_u64(&state->rs_buffers_handler[i].state) & BUF_FLAG_MASK)))); } } } else { @@ -1174,8 +1174,8 @@ static void rewrite_end_flush_page(RewriteState state) for (int i = 0; i < REWRITE_BUFFERS_QUEUE_COUNT * 2; i++) { CheckIOState((char *)(&(state->rs_buffers_handler_ptr[i]))); ereport(DEBUG1, (errmodule(MOD_ADIO), - errmsg("rewrite_end_flush_page, CheckIOState, flags(%d)", - (int)(pg_atomic_read_u32(&state->rs_buffers_handler[i].state) & BUF_FLAG_MASK)))); + errmsg("rewrite_end_flush_page, CheckIOState, flags(%lu)", + (pg_atomic_read_u64(&state->rs_buffers_handler[i].state) & BUF_FLAG_MASK)))); } } #endif diff --git a/src/gausskernel/storage/access/redo/redo_xlogutils.cpp b/src/gausskernel/storage/access/redo/redo_xlogutils.cpp index 0fb813d5b5..2758b411d4 100644 --- a/src/gausskernel/storage/access/redo/redo_xlogutils.cpp +++ b/src/gausskernel/storage/access/redo/redo_xlogutils.cpp @@ -1112,7 +1112,7 @@ Block XLogRedoBufferGetPage(RedoBufferManager *buffermanager, Buffer bufferid) return blkdata; } -void XLogRedoBufferSetState(RedoBufferManager *buffermanager, RedoMemSlot *bufferslot, uint32 state) +void XLogRedoBufferSetState(RedoBufferManager *buffermanager, RedoMemSlot *bufferslot, uint64 state) { RedoMemManager *memctl = &(buffermanager->memctl); RedoBufferDesc *bufferdesc = NULL; diff --git a/src/gausskernel/storage/access/transam/double_write.cpp b/src/gausskernel/storage/access/transam/double_write.cpp index 4cc88c68a1..e9e52ad914 100644 --- a/src/gausskernel/storage/access/transam/double_write.cpp +++ b/src/gausskernel/storage/access/transam/double_write.cpp @@ -2102,7 +2102,7 @@ static XLogRecPtr dw_copy_page(ThrdDwCxt* thrd_dw_cxt, int buf_desc_id, bool* is XLogRecPtr page_lsn = InvalidXLogRecPtr; Block block; uint16 page_num; - uint32 buf_state; + uint64 buf_state; errno_t rc; *is_skipped = true; diff --git a/src/gausskernel/storage/access/transam/seg_double_write.cpp b/src/gausskernel/storage/access/transam/seg_double_write.cpp index 75909b718a..c170d824ec 100644 --- a/src/gausskernel/storage/access/transam/seg_double_write.cpp +++ b/src/gausskernel/storage/access/transam/seg_double_write.cpp @@ -314,7 +314,7 @@ uint16 seg_dw_single_flush(BufferDesc *buf_desc, bool* flush_old_file) BufferTag phy_tag = buf_desc->tag; Block block = BufHdrGetBlock(buf_desc); - uint32 buf_state = LockBufHdr(buf_desc); + uint64 buf_state = LockBufHdr(buf_desc); XLogRecPtr page_lsn = BufferGetLSN(buf_desc); UnlockBufHdr(buf_desc, buf_state); diff --git a/src/gausskernel/storage/access/transam/single_double_write.cpp b/src/gausskernel/storage/access/transam/single_double_write.cpp index 886cfa0749..25e4aef814 100644 --- a/src/gausskernel/storage/access/transam/single_double_write.cpp +++ 
b/src/gausskernel/storage/access/transam/single_double_write.cpp @@ -670,7 +670,7 @@ uint16 first_version_dw_single_flush(BufferDesc *buf_desc) (void)LWLockAcquire(g_instance.ckpt_cxt_ctl->snapshotBlockLock, LW_SHARED); LWLockRelease(g_instance.ckpt_cxt_ctl->snapshotBlockLock); - uint32 buf_state = LockBufHdr(buf_desc); + uint64 buf_state = LockBufHdr(buf_desc); Block block = BufHdrGetBlock(buf_desc); XLogRecPtr page_lsn = BufferGetLSN(buf_desc); UnlockBufHdr(buf_desc, buf_state); diff --git a/src/gausskernel/storage/access/ustore/knl_uheap.cpp b/src/gausskernel/storage/access/ustore/knl_uheap.cpp index 10eae062f3..3e1c3dc087 100644 --- a/src/gausskernel/storage/access/ustore/knl_uheap.cpp +++ b/src/gausskernel/storage/access/ustore/knl_uheap.cpp @@ -4438,8 +4438,9 @@ void UHeapResetPreparedUndo() BufferDesc *bufdesc = GetBufferDescriptor(t_thrd.ustore_cxt.undo_buffers[i].buf - 1); if (LWLockHeldByMeInMode(BufferDescriptorGetContentLock(bufdesc), LW_EXCLUSIVE)) { LWLock *lock = BufferDescriptorGetContentLock(bufdesc); - ereport(PANIC, (errmodule(MOD_USTORE), errmsg( - "xid %lu, oid %u, blockno %u. buffer %d is not unlocked, lock state %u.", + ereport(PANIC, ( + errmodule(MOD_USTORE), + errmsg("xid %lu, oid %u, blockno %u. buffer %d is not unlocked, lock state %lu.", GetTopTransactionId(), bufdesc->tag.rnode.relNode, BufferGetBlockNumber(t_thrd.ustore_cxt.undo_buffers[i].buf), t_thrd.ustore_cxt.undo_buffers[i].buf, lock->state))); diff --git a/src/gausskernel/storage/buffer/buf_init.cpp b/src/gausskernel/storage/buffer/buf_init.cpp index 5a16ba890f..431754a13d 100644 --- a/src/gausskernel/storage/buffer/buf_init.cpp +++ b/src/gausskernel/storage/buffer/buf_init.cpp @@ -166,7 +166,7 @@ void InitBufferPool(void) BufferDesc *buf = GetBufferDescriptor(i); CLEAR_BUFFERTAG(buf->tag); - pg_atomic_init_u32(&buf->state, 0); + pg_atomic_init_u64(&buf->state, 0); buf->wait_backend_pid = 0; buf->extra = &extra[i]; diff --git a/src/gausskernel/storage/buffer/bufmgr.cpp b/src/gausskernel/storage/buffer/bufmgr.cpp index 4f1fd92577..e1b39d8f1c 100644 --- a/src/gausskernel/storage/buffer/bufmgr.cpp +++ b/src/gausskernel/storage/buffer/bufmgr.cpp @@ -131,7 +131,7 @@ static bool ReadBuffer_common_ReadBlock(SMgrRelation smgr, char relpersistence, bool *need_repair); static Buffer ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, BlockNumber blockNum, ReadBufferMode mode, BufferAccessStrategy strategy, bool *hit, const XLogPhyBlock *pblk); -static void TerminateBufferIO_common(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits); +static void TerminateBufferIO_common(BufferDesc *buf, bool clear_dirty, uint64 set_flag_bits); /* * Ensure that the the PrivateRefCountArray has sufficient space to store one @@ -349,7 +349,7 @@ void ForgetPrivateRefCountEntry(PrivateRefCountEntry *ref) } static void BufferSync(int flags); -static void TerminateBufferIO_common(BufferDesc* buf, bool clear_dirty, uint32 set_flag_bits); +static void TerminateBufferIO_common(BufferDesc* buf, bool clear_dirty, uint64 set_flag_bits); void shared_buffer_write_error_callback(void* arg); static BufferDesc* BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, BlockNumber blockNum, BufferAccessStrategy strategy, bool* foundPtr, const XLogPhyBlock *pblk); @@ -454,7 +454,7 @@ void PrefetchBuffer(Relation reln, ForkNumber forkNum, BlockNumber blockNum) */ static bool ConditionalStartBufferIO(BufferDesc *buf, bool for_input) { - uint32 buf_state; + uint64 buf_state; /* * Grab the io_in_progress lock 
so that other processes can wait for @@ -487,7 +487,7 @@ static bool ConditionalStartBufferIO(BufferDesc *buf, bool for_input) * At this point, there is no I/O active on this buffer * We are holding the BufHdr lock and the io_in_progress_lock. */ - buf_state = pg_atomic_read_u32(&buf->state); + buf_state = pg_atomic_read_u64(&buf->state); if (for_input ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY)) { /* Another thread already did the I/O */ UnlockBufHdr(buf, buf_state); @@ -560,8 +560,8 @@ static volatile BufferDesc *PageListBufferAlloc(SMgrRelation smgr, char relpersi BufferTag old_tag; /* previous identity of buffer */ uint32 old_hash; /* hash value for oldTag */ LWLock *old_partition_lock = NULL; /* buffer partition lock for it */ - uint32 old_flags; - uint32 buf_state; + uint64 old_flags; + uint64 buf_state; /* create a tag so we can lookup the buffer */ INIT_BUFFERTAG(new_tag, smgr->smgr_rnode.node, fork_num, block_num); @@ -943,7 +943,7 @@ void PageListPrefetch(Relation reln, ForkNumber fork_num, BlockNumber *block_lis * are NOT valid. For a shared buffer the IO_IN_PROGRESS * flag is set. */ - Assert(!(pg_atomic_read_u32(&buf_desc->state) & BM_VALID)); /* spinlock not needed */ + Assert(!(pg_atomic_read_u64(&buf_desc->state) & BM_VALID)); /* spinlock not needed */ buf_block = is_local_buf ? LocalBufHdrGetBlock(buf_desc) : BufHdrGetBlock(buf_desc); @@ -1161,7 +1161,7 @@ void PageListBackWrite(uint32 *buf_list, int32 nbufs, uint32 flags = 0, SMgrRela XLogRecPtr recptr; SMgrRelation smgrReln; BufferDesc *bufHdr = NULL; - uint32 buf_state; + uint64 buf_state; t_thrd.storage_cxt.InProgressAioBuf = NULL; @@ -1579,13 +1579,13 @@ void AsyncUnpinBuffer(volatile void *buf_desc, bool forget_buffer) */ void AsyncCompltrPinBuffer(volatile void *buf_desc) { - uint32 buf_state; + uint64 buf_state; BufferDesc *buf = (BufferDesc *)buf_desc; buf_state = LockBufHdr(buf); /* Increment the shared reference count */ - buf_state += BUF_REFCOUNT_ONE; + buf_state = __sync_fetch_and_add(&buf->state, 1); UnlockBufHdr(buf, buf_state); } @@ -1598,14 +1598,14 @@ void AsyncCompltrPinBuffer(volatile void *buf_desc) */ void AsyncCompltrUnpinBuffer(volatile void *buf_desc) { - uint32 buf_state; + uint64 buf_state; BufferDesc *buf = (BufferDesc *)buf_desc; buf_state = LockBufHdr(buf); /* Decrement the shared reference count */ Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0); - buf_state -= BUF_REFCOUNT_ONE; + buf_state -= 1; /* Support the function LockBufferForCleanup() */ if ((buf_state & BM_PIN_COUNT_WAITER) && BUF_STATE_GET_REFCOUNT(buf_state) == 1) { @@ -1613,10 +1613,13 @@ void AsyncCompltrUnpinBuffer(volatile void *buf_desc) ThreadId wait_backend_pid = buf->wait_backend_pid; buf_state &= ~BM_PIN_COUNT_WAITER; + __sync_add_and_fetch(&buf->state, -1); UnlockBufHdr(buf, buf_state); ProcSendSignal(wait_backend_pid); - } else + } else { + buf_state = __sync_add_and_fetch(&buf->state, -1); UnlockBufHdr(buf, buf_state); + } } /* @@ -1874,11 +1877,11 @@ Buffer ReadBuffer_common_for_localbuf(RelFileNode rnode, char relpersistence, Fo */ /* Only need to adjust flags */ - uint32 buf_state = pg_atomic_read_u32(&bufHdr->state); + uint64 buf_state = pg_atomic_read_u64(&bufHdr->state); Assert(buf_state & BM_VALID); buf_state &= ~BM_VALID; - pg_atomic_write_u32(&bufHdr->state, buf_state); + pg_atomic_write_u32(((volatile uint32 *)&bufHdr->state) + 1, buf_state >> 32); } /* @@ -1893,16 +1896,16 @@ Buffer ReadBuffer_common_for_localbuf(RelFileNode rnode, char relpersistence, Fo * it's not been recycled) but come right back 
here to try smgrextend * again. */ - Assert(!(pg_atomic_read_u32(&bufHdr->state) & BM_VALID)); /* spinlock not needed */ + Assert(!(pg_atomic_read_u64(&bufHdr->state) & BM_VALID)); /* spinlock not needed */ bufBlock = LocalBufHdrGetBlock(bufHdr); (void)ReadBuffer_common_ReadBlock(smgr, relpersistence, forkNum, blockNum, mode, isExtend, bufBlock, NULL, &need_reapir); - uint32 buf_state = pg_atomic_read_u32(&bufHdr->state); + uint64 buf_state = pg_atomic_read_u64(&bufHdr->state); buf_state |= BM_VALID; - pg_atomic_write_u32(&bufHdr->state, buf_state); + pg_atomic_write_u32(((volatile uint32 *)&bufHdr->state) + 1, buf_state >> 32); return BufferDescriptorGetBuffer(bufHdr); } @@ -2165,7 +2168,7 @@ Buffer ReadBuffer_common_for_dms(ReadBufferMode readmode, BufferDesc* buf_desc, #ifdef USE_ASSERT_CHECKING bool need_verify = (!RecoveryInProgress() && !SS_IN_ONDEMAND_RECOVERY && - ((pg_atomic_read_u32(&buf_desc->state) & BM_VALID) != 0) && ENABLE_VERIFY_PAGE_VERSION); + ((pg_atomic_read_u64(&buf_desc->state) & BM_VALID) != 0) && ENABLE_VERIFY_PAGE_VERSION); char *past_image = NULL; if (need_verify) { past_image = (char *)palloc(BLCKSZ); @@ -2399,11 +2402,11 @@ found_branch: */ if (isLocalBuf) { /* Only need to adjust flags */ - uint32 buf_state = pg_atomic_read_u32(&bufHdr->state); + uint64 buf_state = pg_atomic_read_u64(&bufHdr->state); Assert(buf_state & BM_VALID); buf_state &= ~BM_VALID; - pg_atomic_write_u32(&bufHdr->state, buf_state); + pg_atomic_write_u32(((volatile uint32 *)&bufHdr->state) + 1, buf_state >> 32); } else { /* * Loop to handle the very small possibility that someone re-sets @@ -2411,7 +2414,7 @@ found_branch: * it. */ do { - uint32 buf_state = LockBufHdr(bufHdr); + uint64 buf_state = LockBufHdr(bufHdr); Assert(buf_state & BM_VALID); buf_state &= ~BM_VALID; @@ -2424,7 +2427,7 @@ found_branch: if (ENABLE_DMS) { MarkReadHint(bufHdr->buf_id, relpersistence, isExtend, pblk); if (mode != RBM_FOR_REMOTE && relpersistence != RELPERSISTENCE_TEMP && !isLocalBuf) { - Assert(!(pg_atomic_read_u32(&bufHdr->state) & BM_VALID)); + Assert(!(pg_atomic_read_u64(&bufHdr->state) & BM_VALID)); do { if (!DmsCheckBufAccessible()) { @@ -2451,7 +2454,7 @@ found_branch: } if (!startio) { - Assert(pg_atomic_read_u32(&bufHdr->state) & BM_VALID); + Assert(pg_atomic_read_u64(&bufHdr->state) & BM_VALID); found = true; goto found_branch; } @@ -2503,7 +2506,7 @@ found_branch: * it's not been recycled) but come right back here to try smgrextend * again. */ - Assert(!(pg_atomic_read_u32(&bufHdr->state) & BM_VALID)); /* spinlock not needed */ + Assert(!(pg_atomic_read_u64(&bufHdr->state) & BM_VALID)); /* spinlock not needed */ bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr); @@ -2517,8 +2520,8 @@ found_branch: } if (needputtodirty) { /* set BM_DIRTY to overwrite later */ - uint32 old_buf_state = LockBufHdr(bufHdr); - uint32 buf_state = old_buf_state | (BM_DIRTY | BM_JUST_DIRTIED); + uint64 old_buf_state = LockBufHdr(bufHdr); + uint64 buf_state = old_buf_state | (BM_DIRTY | BM_JUST_DIRTIED); /* * When the page is marked dirty for the first time, needs to push the dirty page queue. 
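
Note on the pattern above: with the state word widened to 64 bits, the patch updates local-buffer flags by storing only the upper 32-bit half, e.g. pg_atomic_write_u32(((volatile uint32 *)&bufHdr->state) + 1, buf_state >> 32), instead of writing the whole word. A minimal sketch of the idea follows (illustrative names only, not part of the patch); it assumes a little-endian layout, where "+ 1" addresses the half holding the BM_* flag bits while pin counts live in the low bytes:

    /* Sketch only: flag bits assumed to sit in the high 32 bits of the
     * 64-bit state, pin-count bytes in the low 32 bits (little-endian). */
    #include <cstdint>

    static void sketch_set_flags(volatile uint64_t *state, uint64_t new_state)
    {
        /* Store just the high word: a flag update can then never overwrite a
         * concurrent lock-free pin increment happening in the low bytes. */
        volatile uint32_t *high_half = ((volatile uint32_t *)state) + 1;
        __atomic_store_n(high_half, (uint32_t)(new_state >> 32), __ATOMIC_RELEASE);
    }

The point of the split is that flag writers (which hold the header lock) and lock-free pinners touch disjoint halves of the word, so neither can lose the other's update.
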
@@ -2559,10 +2562,10 @@ found_branch: if (isLocalBuf) { /* Only need to adjust flags */ - uint32 buf_state = pg_atomic_read_u32(&bufHdr->state); + uint64 buf_state = pg_atomic_read_u64(&bufHdr->state); buf_state |= BM_VALID; - pg_atomic_write_u32(&bufHdr->state, buf_state); + pg_atomic_write_u32(((volatile uint32 *)&bufHdr->state) + 1, buf_state >> 32); } else { bufHdr->extra->lsn_on_disk = PageGetLSN(bufBlock); #ifdef USE_ASSERT_CHECKING @@ -2585,8 +2588,8 @@ found_branch: void SimpleMarkBufDirty(BufferDesc *buf) { /* set BM_DIRTY to overwrite later */ - uint32 oldBufState = LockBufHdr(buf); - uint32 bufState = oldBufState | (BM_DIRTY | BM_JUST_DIRTIED); + uint64 oldBufState = LockBufHdr(buf); + uint64 bufState = oldBufState | (BM_DIRTY | BM_JUST_DIRTIED); /* * When the page is marked dirty for the first time, needs to push the dirty page queue. @@ -2612,7 +2615,7 @@ void SimpleMarkBufDirty(BufferDesc *buf) } -void PageCheckIfCanEliminate(BufferDesc *buf, uint32 *oldFlags, bool *needGetLock) +void PageCheckIfCanEliminate(BufferDesc *buf, uint64 *oldFlags, bool *needGetLock) { if (ENABLE_DMS) { return; @@ -2640,7 +2643,7 @@ void PageCheckIfCanEliminate(BufferDesc *buf, uint32 *oldFlags, bool *needGetLoc } #ifdef USE_ASSERT_CHECKING -void PageCheckWhenChosedElimination(const BufferDesc *buf, uint32 oldFlags) +void PageCheckWhenChosedElimination(const BufferDesc *buf, uint64 oldFlags) { if (SS_REFORM_REFORMER) { return; @@ -2689,11 +2692,11 @@ static BufferDesc *BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumbe BufferTag old_tag; /* previous identity of selected buffer */ uint32 old_hash; /* hash value for oldTag */ LWLock *old_partition_lock = NULL; /* buffer partition lock for it */ - uint32 old_flags; + uint64 old_flags; int buf_id; BufferDesc *buf = NULL; bool valid = false; - uint32 buf_state; + uint64 buf_state; /* create a tag so we can lookup the buffer */ INIT_BUFFERTAG(new_tag, smgr->smgr_rnode.node, fork_num, block_num); @@ -3095,13 +3098,13 @@ void InvalidateBuffer(BufferDesc *buf) BufferTag old_tag; uint32 old_hash; /* hash value for oldTag */ LWLock *old_partition_lock = NULL; /* buffer partition lock for it */ - uint32 old_flags; - uint32 buf_state; + uint64 old_flags; + uint64 buf_state; /* Save the original buffer tag before dropping the spinlock */ old_tag = ((BufferDesc *)buf)->tag; - buf_state = pg_atomic_read_u32(&buf->state); + buf_state = pg_atomic_read_u64(&buf->state); Assert(buf_state & BM_LOCKED); UnlockBufHdr(buf, buf_state); @@ -3230,8 +3233,8 @@ static void recheck_page_content(const BufferDesc *buf_desc) void MarkBufferDirty(Buffer buffer) { BufferDesc *buf_desc = NULL; - uint32 buf_state; - uint32 old_buf_state; + uint64 buf_state; + uint64 old_buf_state; if (!BufferIsValid(buffer)) { ereport(ERROR, (errcode(ERRCODE_INVALID_BUFFER), (errmsg("bad buffer ID: %d", buffer)))); @@ -3301,22 +3304,22 @@ void MarkBufferDirty(Buffer buffer) void MarkBufferMetaFlag(Buffer bufId, bool isSet) { BufferDesc *buf = GetBufferDescriptor(bufId - 1); - uint32 bufState; - uint32 oldBufState; + uint64 bufState; + uint64 oldBufState; for (;;) { - oldBufState = pg_atomic_read_u32(&buf->state); + oldBufState = pg_atomic_read_u64(&buf->state); if (oldBufState & BM_LOCKED) { oldBufState = WaitBufHdrUnlocked(buf); } bufState = oldBufState; if (isSet) { bufState |= BM_IS_META; - ereport(DEBUG1, (errmsg("mark buffer %d meta buffer stat %u.", bufId, bufState))); + ereport(DEBUG1, (errmsg("mark buffer %d meta buffer stat %lu.", bufId, bufState))); } else { bufState &= 
~(BM_IS_META); - ereport(DEBUG1, (errmsg("unmark buffer %d meta buffer stat %u.", bufId, bufState))); + ereport(DEBUG1, (errmsg("unmark buffer %d meta buffer stat %lu.", bufId, bufState))); } - if (pg_atomic_compare_exchange_u32(&buf->state, &oldBufState, bufState)) { + if (pg_atomic_compare_exchange_u64(&buf->state, &oldBufState, bufState)) { break; } } @@ -3392,37 +3395,41 @@ bool PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy) ref = GetPrivateRefCountEntry(b, true); if (ref == NULL) { - uint32 buf_state; - uint32 old_buf_state; + uint64 buf_state; + uint64 new_buf_state; ReservePrivateRefCountEntry(); ref = NewPrivateRefCountEntry(b); - old_buf_state = pg_atomic_read_u32(&buf->state); for (;;) { - if (old_buf_state & BM_LOCKED) { - old_buf_state = WaitBufHdrUnlocked(buf); + buf_state = __sync_add_and_fetch(&buf->state, 1); + if (buf_state & BM_LOCKED) { + buf_state = __sync_fetch_and_add(&buf->state, -1); + WaitBufHdrUnlocked(buf); + continue; } - buf_state = old_buf_state; - - /* increase refcount */ - buf_state += BUF_REFCOUNT_ONE; + while (BUF_STATE_GET_USAGECOUNT(buf_state) != BM_MAX_USAGE_COUNT) { + if (buf_state & BM_LOCKED) { + buf_state = WaitBufHdrUnlocked(buf); + continue; + } - /* increase usagecount unless already max */ - if (BUF_STATE_GET_USAGECOUNT(buf_state) != BM_MAX_USAGE_COUNT) { - buf_state += BUF_USAGECOUNT_ONE; + new_buf_state = buf_state; + new_buf_state += BUF_USAGECOUNT_ONE; + if (pg_atomic_compare_exchange_u64(&buf->state, &buf_state, new_buf_state)) { + break; + } } - if (pg_atomic_compare_exchange_u32(&buf->state, &old_buf_state, buf_state)) { - result = (buf_state & BM_VALID) != 0; - break; - } + result = (buf_state & BM_VALID) != 0; + break; } } else { /* If we previously pinned the buffer, it must surely be valid */ result = true; } + ref->refcount++; Assert(ref->refcount > 0); ResourceOwnerRememberBuffer(t_thrd.utils_cxt.CurrentResourceOwner, b); @@ -3454,7 +3461,7 @@ void PinBuffer_Locked(volatile BufferDesc *buf) { Buffer b; PrivateRefCountEntry *ref = NULL; - uint32 buf_state; + uint64 buf_state; /* * As explained, We don't expect any preexisting pins. That allows us to @@ -3466,9 +3473,10 @@ void PinBuffer_Locked(volatile BufferDesc *buf) * Since we hold the buffer spinlock, we can update the buffer state and * release the lock in one operation. */ - buf_state = pg_atomic_read_u32(&buf->state); + buf_state = pg_atomic_read_u64(&buf->state); Assert(buf_state & BM_LOCKED); - buf_state += BUF_REFCOUNT_ONE; + + buf_state = __sync_add_and_fetch(&buf->state, 1); UnlockBufHdr(buf, buf_state); b = BufferDescriptorGetBuffer(buf); @@ -3505,31 +3513,19 @@ void UnpinBuffer(BufferDesc *buf, bool fixOwner) ref->refcount--; if (ref->refcount == 0) { - uint32 buf_state; - uint32 old_buf_state; + uint64 buf_state; /* I'd better not still hold any locks on the buffer */ Assert(!LWLockHeldByMe(buf->content_lock)); Assert(!LWLockHeldByMe(buf->io_in_progress_lock)); - - /* - * Decrement the shared reference count. - * - * Since buffer spinlock holder can update status using just write, - * it's not safe to use atomic decrement here; thus use a CAS loop. 
- */ - old_buf_state = pg_atomic_read_u32(&buf->state); - for (;;) { - if (old_buf_state & BM_LOCKED) - old_buf_state = WaitBufHdrUnlocked(buf); - - buf_state = old_buf_state; - - buf_state -= BUF_REFCOUNT_ONE; - - if (pg_atomic_compare_exchange_u32(&buf->state, &old_buf_state, buf_state)) { - break; + for(;;) { + buf_state = __sync_add_and_fetch(&buf->state, -1); + if(buf_state & BM_LOCKED) { + buf_state = __sync_add_and_fetch(&buf->state, 1); + WaitBufHdrUnlocked(buf); + continue; } + break; } /* Support the function LockBufferForCleanup() */ @@ -3568,7 +3564,7 @@ void UnpinBuffer(BufferDesc *buf, bool fixOwner) */ static void BufferSync(int flags) { - uint32 buf_state; + uint64 buf_state; int buf_id; int num_to_scan; int num_spaces; @@ -3579,7 +3575,7 @@ static void BufferSync(int flags) Oid last_tsid; binaryheap *ts_heap = NULL; int i; - uint32 mask = BM_DIRTY; + uint64 mask = BM_DIRTY; WritebackContext wb_context; gstrace_entry(GS_TRC_ID_BufferSync); @@ -3620,7 +3616,7 @@ static void BufferSync(int flags) * SyncOneBuffer. */ pg_memory_barrier(); - buf_state = pg_atomic_read_u32(&buf_desc->state); + buf_state = pg_atomic_read_u64(&buf_desc->state); if ((buf_state & mask) == mask) { buf_state = LockBufHdr(buf_desc); if ((buf_state & mask) == mask) { @@ -3762,7 +3758,7 @@ static void BufferSync(int flags) * write the buffer though we didn't need to. It doesn't seem worth * guarding against this, though. */ - if (pg_atomic_read_u32(&buf_desc->state) & BM_CHECKPOINT_NEEDED) { + if (pg_atomic_read_u64(&buf_desc->state) & BM_CHECKPOINT_NEEDED) { if (SyncOneBuffer(buf_id, false, &wb_context) & BUF_WRITTEN) { TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id); u_sess->stat_cxt.BgWriterStats->m_buf_written_checkpoints++; @@ -4226,7 +4222,7 @@ uint32 SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext* wb_c BufferDesc *buf_desc = GetBufferDescriptor(buf_id); uint32 result = 0; BufferTag tag; - uint32 buf_state; + uint64 buf_state; ReservePrivateRefCountEntry(); @@ -4434,7 +4430,7 @@ void PrintBufferLeakWarning(Buffer buffer) int32 loccount; char *path = NULL; BackendId backend; - uint32 buf_state; + uint64 buf_state; Assert(BufferIsValid(buffer)); if (BufferIsLocal(buffer)) { @@ -4449,9 +4445,9 @@ void PrintBufferLeakWarning(Buffer buffer) /* theoretically we should lock the bufhdr here */ path = relpathbackend(((BufferDesc *)buf)->tag.rnode, backend, ((BufferDesc *)buf)->tag.forkNum); - buf_state = pg_atomic_read_u32(&buf->state); + buf_state = pg_atomic_read_u64(&buf->state); ereport(WARNING, (errmsg("buffer refcount leak: [%03d] " - "(rel=%s, blockNum=%u, flags=0x%x, refcount=%u %d)", + "(rel=%s, blockNum=%u, flags=0x%lx, refcount=%lu %d)", buffer, path, buf->tag.blockNum, buf_state & BUF_FLAG_MASK, BUF_STATE_GET_REFCOUNT(buf_state), loccount))); pfree(path); @@ -4602,7 +4598,7 @@ void BufferGetTag(Buffer buffer, RelFileNode *rnode, ForkNumber *forknum, BlockN pgstatCountBlocksWriteTime4SessionLevel(INSTR_TIME_GET_MICROSEC(io_time)); \ } while (0) -void GetFlushBufferInfo(void *buf, RedoBufferInfo *bufferinfo, uint32 *buf_state, ReadBufferMethod flushmethod) +void GetFlushBufferInfo(void *buf, RedoBufferInfo *bufferinfo, uint64 *buf_state, ReadBufferMethod flushmethod) { if (flushmethod == WITH_NORMAL_CACHE || flushmethod == WITH_LOCAL_CACHE) { BufferDesc *bufdesc = (BufferDesc *)buf; @@ -4674,7 +4670,7 @@ void FlushBuffer(void *buf, SMgrRelation reln, ReadBufferMethod flushmethod, boo instr_time io_start, io_time; Block bufBlock; char *bufToWrite = NULL; - uint32 buf_state; + 
uint64 buf_state; RedoBufferInfo bufferinfo = {0}; t_thrd.dms_cxt.buf_in_aio = false; @@ -4977,7 +4973,7 @@ bool BufferIsPermanent(Buffer buffer) * old value or the new value, but not random garbage. */ buf_desc = GetBufferDescriptor(buffer - 1); - return (pg_atomic_read_u32(&buf_desc->state) & BM_PERMANENT) != 0; + return (pg_atomic_read_u64(&buf_desc->state) & BM_PERMANENT) != 0; } /* @@ -4990,7 +4986,7 @@ XLogRecPtr BufferGetLSNAtomic(Buffer buffer) char *page = BufferGetPage(buffer); XLogRecPtr lsn; - uint32 buf_state; + uint64 buf_state; /* If we don't need locking for correctness, fastpath out. */ if (BufferIsLocal(buffer)) { return PageGetLSN(page); @@ -5011,7 +5007,7 @@ void DropSegRelNodeSharedBuffer(RelFileNode node, ForkNumber forkNum) { for (int i = 0; i < SegmentBufferStartID; i++) { BufferDesc *buf_desc = GetBufferDescriptor(i); - uint32 buf_state; + uint64 buf_state; if (buf_desc->extra->seg_fileno != node.relNode || buf_desc->tag.rnode.spcNode != node.spcNode || buf_desc->tag.rnode.dbNode != node.dbNode) { @@ -5029,7 +5025,7 @@ void DropSegRelNodeSharedBuffer(RelFileNode node, ForkNumber forkNum) for (int i = SegmentBufferStartID; i < TOTAL_BUFFER_NUM; i++) { BufferDesc *buf_desc = GetBufferDescriptor(i); - uint32 buf_state; + uint64 buf_state; /* * As in DropRelFileNodeBuffers, an unlocked precheck should be safe * and saves some cycles. @@ -5056,7 +5052,7 @@ void RangeForgetBuffer(RelFileNode node, ForkNumber forkNum, BlockNumber firstDe { for (int i = 0; i < SegmentBufferStartID; i++) { BufferDesc *buf_desc = GetBufferDescriptor(i); - uint32 buf_state; + uint64 buf_state; if (!RelFileNodeEquals(buf_desc->tag.rnode, node)) continue; @@ -5076,7 +5072,7 @@ void DropRelFileNodeShareBuffers(RelFileNode node, ForkNumber forkNum, BlockNumb for (i = 0; i < SegmentBufferStartID; i++) { BufferDesc *buf_desc = GetBufferDescriptor(i); - uint32 buf_state; + uint64 buf_state; /* * We can make this a tad faster by prechecking the buffer tag before * we attempt to lock the buffer; this saves a lot of lock @@ -5181,7 +5177,7 @@ void DropRelFileNodeAllBuffersUsingHash(HTAB *relfilenode_hashtbl) for (i = 0; i < SegmentBufferStartID; i++) { BufferDesc *buf_desc = GetBufferDescriptor(i); - uint32 buf_state; + uint64 buf_state; bool found = false; bool equal = false; bool find_dir = false; @@ -5233,7 +5229,7 @@ void DropRelFileNodeOneForkAllBuffersUsingHash(HTAB *relfilenode_hashtbl) int i; for (i = 0; i < SegmentBufferStartID; i++) { BufferDesc *buf_desc = GetBufferDescriptor(i); - uint32 buf_state; + uint64 buf_state; bool found = false; bool equal = false; bool find_dir = false; @@ -5344,7 +5340,7 @@ static FORCE_INLINE void ScanCompareAndInvalidateBuffer(const RelFileNode *rnode return; } - uint32 buf_state = LockBufHdr(bufHdr); + uint64 buf_state = LockBufHdr(bufHdr); if (find_dir) { equal = RelFileNodeRelEquals(bufHdr->tag.rnode, rnodes[match_idx]); @@ -5405,7 +5401,7 @@ void DropDatabaseBuffers(Oid dbid) for (i = 0; i < TOTAL_BUFFER_NUM; i++) { BufferDesc *buf_desc = GetBufferDescriptor(i); - uint32 buf_state; + uint64 buf_state; /* * As in DropRelFileNodeBuffers, an unlocked precheck should be safe * and saves some cycles. 
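
Note: the new pin fast path earlier in this file (PinBuffer/UnpinBuffer) drops the old read-check-CAS loop in favor of a single atomic add on the low bytes, undoing the add and spinning only when the header spinlock bit is seen. A self-contained sketch of that protocol, with stand-in names and flag positions (the real BM_LOCKED/BM_VALID values live in buf_internals.h); it assumes BM_LOCKED sits in the upper half, so a low-byte increment cannot disturb it:

    #include <cstdint>

    static const uint64_t SK_BM_LOCKED = 1ULL << 63; /* stand-in for BM_LOCKED */
    static const uint64_t SK_BM_VALID  = 1ULL << 62; /* stand-in for BM_VALID */

    /* spin until the header spinlock bit clears (cf. WaitBufHdrUnlocked) */
    static uint64_t sk_wait_unlocked(volatile uint64_t *state)
    {
        uint64_t s = __atomic_load_n(state, __ATOMIC_ACQUIRE);
        while (s & SK_BM_LOCKED)
            s = __atomic_load_n(state, __ATOMIC_ACQUIRE);
        return s;
    }

    static bool sk_pin(volatile uint64_t *state)
    {
        for (;;) {
            uint64_t s = __sync_add_and_fetch(state, 1);         /* optimistic pin */
            if (s & SK_BM_LOCKED) {
                (void)__sync_add_and_fetch(state, (uint64_t)-1); /* roll it back */
                sk_wait_unlocked(state);                         /* then retry */
                continue;
            }
            return (s & SK_BM_VALID) != 0;   /* pinned; report whether valid */
        }
    }

Unpin is symmetric (add -1; if BM_LOCKED was set, re-add 1, wait and retry). The scheme is safe only because a header-lock holder confines its writes to the upper half of the word, as in the sketch above.
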
@@ -5486,7 +5482,7 @@ static inline bool flush_buffer_match(BufferDesc *buf_desc, Relation rel, Oid db */ static void flush_wait_page_writer(BufferDesc *buf_desc, Relation rel, Oid db_id) { - uint32 buf_state; + uint64 buf_state; for (;;) { buf_state = LockBufHdr(buf_desc); if (flush_buffer_match(buf_desc, rel, db_id) && dw_buf_valid_aio_finished(buf_desc, buf_state) && @@ -5507,7 +5503,7 @@ void flush_all_buffers(Relation rel, Oid db_id, HTAB *hashtbl) { int i; BufferDesc *buf_desc = NULL; - uint32 buf_state; + uint64 buf_state; uint32 size = 0; uint32 total = 0; @@ -5713,11 +5709,11 @@ void MarkBufferDirtyHint(Buffer buffer, bool buffer_std) * is only intended to be used in cases where failing to write out the * data would be harmless anyway, it doesn't really matter. */ - if ((pg_atomic_read_u32(&buf_desc->state) & (BM_DIRTY | BM_JUST_DIRTIED)) != (BM_DIRTY | BM_JUST_DIRTIED)) { + if ((pg_atomic_read_u64(&buf_desc->state) & (BM_DIRTY | BM_JUST_DIRTIED)) != (BM_DIRTY | BM_JUST_DIRTIED)) { XLogRecPtr lsn = InvalidXLogRecPtr; bool delayChkpt = false; - uint32 buf_state; - uint32 old_buf_state; + uint64 buf_state; + uint64 old_buf_state; /* * If we need to protect hint bit updates from torn writes, WAL-log a @@ -5730,8 +5726,8 @@ void MarkBufferDirtyHint(Buffer buffer, bool buffer_std) * The incremental checkpoint is protected by the doublewriter, the * half-write problem does not occur. */ - if (!ENABLE_INCRE_CKPT && XLogHintBitIsNeeded() && - (pg_atomic_read_u32(&buf_desc->state) & BM_PERMANENT)) { + if (unlikely(!ENABLE_INCRE_CKPT && XLogHintBitIsNeeded() && + (pg_atomic_read_u64(&buf_desc->state) & BM_PERMANENT))) { /* * If we're in recovery we cannot dirty a page because of a hint. * We can set the hint, just not dirty the page as a result so the @@ -5850,7 +5846,7 @@ void UnlockBuffers(void) BufferDesc *buf = t_thrd.storage_cxt.PinCountWaitBuf; if (buf != NULL) { - uint32 buf_state; + uint64 buf_state; buf_state = LockBufHdr(buf); /* * Don't complain if flag bit not set; it could have been reset but we @@ -6129,7 +6125,7 @@ void LockBufferForCleanup(Buffer buffer) buf_desc = GetBufferDescriptor(buffer - 1); for (;;) { - uint32 buf_state; + uint64 buf_state; /* Try to acquire lock */ LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); @@ -6239,7 +6235,7 @@ bool ConditionalLockUHeapBufferForCleanup(Buffer buffer) bool ConditionalLockBufferForCleanup(Buffer buffer) { BufferDesc *buf_desc = NULL; - uint32 buf_state, refcount; + uint64 buf_state, refcount; Assert(BufferIsValid(buffer)); @@ -6293,7 +6289,7 @@ bool ConditionalLockBufferForCleanup(Buffer buffer) bool IsBufferCleanupOK(Buffer buffer) { BufferDesc *bufHdr; - uint32 buf_state; + uint64 buf_state; Assert(BufferIsValid(buffer)); @@ -6348,7 +6344,7 @@ void WaitIO(BufferDesc *buf) * AbortBufferIO. */ for (;;) { - uint32 buf_state; + uint64 buf_state; /* * It may not be necessary to acquire the spinlock to check the flag @@ -6376,7 +6372,7 @@ void CheckIOState(volatile void *buf_desc) { BufferDesc *buf = (BufferDesc *)buf_desc; for (;;) { - uint32 buf_state; + uint64 buf_state; /* * It may not be necessary to acquire the spinlock to check the flag @@ -6417,17 +6413,17 @@ void CheckIOState(volatile void *buf_desc) */ bool StartBufferIO(BufferDesc *buf, bool for_input) { - uint32 buf_state; + uint64 buf_state; bool dms_need_flush = false; // used in dms Assert(!t_thrd.storage_cxt.InProgressBuf); /* To check the InProgressBuf must be NULL. 
*/ if (t_thrd.storage_cxt.InProgressBuf) { - ereport(PANIC, (errmsg("InProgressBuf not null: id %d flags %u, buf: id %d flags %u", + ereport(PANIC, (errmsg("InProgressBuf not null: id %d flags %lu, buf: id %d flags %lu", t_thrd.storage_cxt.InProgressBuf->buf_id, - pg_atomic_read_u32(&t_thrd.storage_cxt.InProgressBuf->state) & BUF_FLAG_MASK, - buf->buf_id, pg_atomic_read_u32(&buf->state) & BUF_FLAG_MASK))); + pg_atomic_read_u64(&t_thrd.storage_cxt.InProgressBuf->state) & BUF_FLAG_MASK, + buf->buf_id, pg_atomic_read_u64(&buf->state) & BUF_FLAG_MASK))); } for (; ;) { @@ -6505,7 +6501,7 @@ bool StartBufferIO(BufferDesc *buf, bool for_input) * releasing the io_in_progress_lock. ADIO does not use the * thread InProgressBuf or forInput */ -void TerminateBufferIO(volatile BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits) +void TerminateBufferIO(volatile BufferDesc *buf, bool clear_dirty, uint64 set_flag_bits) { Assert(buf == t_thrd.storage_cxt.InProgressBuf); TerminateBufferIO_common((BufferDesc *)buf, clear_dirty, set_flag_bits); @@ -6533,7 +6529,7 @@ void TerminateBufferIO(volatile BufferDesc *buf, bool clear_dirty, uint32 set_fl * The routine acquires the buf header spinlock, and changes the buf->flags. * it leaves the buffer without the io_in_progress_lock held. */ -void AsyncTerminateBufferIO(void *buffer, bool clear_dirty, uint32 set_flag_bits) +void AsyncTerminateBufferIO(void *buffer, bool clear_dirty, uint64 set_flag_bits) { BufferDesc *buf = (BufferDesc *)buffer; @@ -6545,9 +6541,9 @@ void AsyncTerminateBufferIO(void *buffer, bool clear_dirty, uint32 set_flag_bits * TerminateBufferIO_common: Common code called by TerminateBufferIO() and * AsyncTerminateBufferIO() to set th buffer flags. */ -static void TerminateBufferIO_common(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits) +static void TerminateBufferIO_common(BufferDesc *buf, bool clear_dirty, uint64 set_flag_bits) { - uint32 buf_state; + uint64 buf_state; buf_state = LockBufHdr(buf); @@ -6701,7 +6697,7 @@ extern void AsyncAbortBufferIOByVacuum(void *buffer) */ void AbortBufferIO_common(BufferDesc *buf, bool isForInput) { - uint32 buf_state; + uint64 buf_state; buf_state = LockBufHdr(buf); Assert(buf_state & BM_IO_IN_PROGRESS); @@ -6758,16 +6754,16 @@ void shared_buffer_write_error_callback(void *arg) /* * Lock buffer header - set BM_LOCKED in buffer state. 
*/ -uint32 LockBufHdr(BufferDesc *desc) +uint64 LockBufHdr(BufferDesc *desc) { #ifndef ENABLE_THREAD_CHECK SpinDelayStatus delayStatus = init_spin_delay(desc); #endif - uint32 old_buf_state; + uint64 old_buf_state; while (true) { /* set BM_LOCKED flag */ - old_buf_state = pg_atomic_fetch_or_u32(&desc->state, BM_LOCKED); + old_buf_state = pg_atomic_fetch_or_u64(&desc->state, BM_LOCKED); /* if it wasn't set before we're OK */ if (!(old_buf_state & BM_LOCKED)) break; @@ -6781,19 +6777,18 @@ uint32 LockBufHdr(BufferDesc *desc) /* ENABLE_THREAD_CHECK only, acquire semantic */ TsAnnotateHappensAfter(&desc->state); - return old_buf_state | BM_LOCKED; } const int MAX_SPINS_RETRY_TIMES = 100; -bool retryLockBufHdr(BufferDesc *desc, uint32 *buf_state) +bool retryLockBufHdr(BufferDesc *desc, uint64 *buf_state) { - uint32 old_buf_state = pg_atomic_read_u32(&desc->state); - uint32 retry_times = 0; + uint64 old_buf_state = pg_atomic_read_u64(&desc->state); + uint64 retry_times = 0; /* set BM_LOCKED flag */ for (retry_times = 0; retry_times < MAX_SPINS_RETRY_TIMES; retry_times++) { - old_buf_state = pg_atomic_fetch_or_u32(&desc->state, BM_LOCKED); + old_buf_state = pg_atomic_fetch_or_u64(&desc->state, BM_LOCKED); /* if it wasn't set before we're OK */ if (!(old_buf_state & BM_LOCKED)) { *buf_state = old_buf_state | BM_LOCKED; @@ -6818,20 +6813,20 @@ bool retryLockBufHdr(BufferDesc *desc, uint32 *buf_state) * Obviously the buffer could be locked by the time the value is returned, so * this is primarily useful in CAS style loops. */ -uint32 WaitBufHdrUnlocked(BufferDesc *buf) +uint64 WaitBufHdrUnlocked(BufferDesc *buf) { #ifndef ENABLE_THREAD_CHECK SpinDelayStatus delay_status = init_spin_delay(buf); #endif - uint32 buf_state; + uint64 buf_state; - buf_state = pg_atomic_read_u32(&buf->state); + buf_state = pg_atomic_read_u64(&buf->state); while (buf_state & BM_LOCKED) { #ifndef ENABLE_THREAD_CHECK perform_spin_delay(&delay_status); #endif - buf_state = pg_atomic_read_u32(&buf->state); + buf_state = pg_atomic_read_u64(&buf->state); } #ifndef ENABLE_THREAD_CHECK @@ -7266,7 +7261,7 @@ void ForgetBuffer(RelFileNode rnode, ForkNumber forkNum, BlockNumber blockNum) LWLock* partitionLock; /* buffer partition lock for it */ int bufId; BufferDesc *bufHdr; - uint32 bufState; + uint64 bufState; /* create a tag so we can lookup the buffer */ INIT_BUFFERTAG(tag, smgr->smgr_rnode.node, forkNum, blockNum); diff --git a/src/gausskernel/storage/buffer/freelist.cpp b/src/gausskernel/storage/buffer/freelist.cpp index 8fd8e08ad6..314db64c6a 100644 --- a/src/gausskernel/storage/buffer/freelist.cpp +++ b/src/gausskernel/storage/buffer/freelist.cpp @@ -71,7 +71,7 @@ const int MAX_RETRY_TIMES = 1000; const float NEED_DELAY_RETRY_GET_BUF = 0.8; /* Prototypes for internal functions */ -static BufferDesc* GetBufferFromRing(BufferAccessStrategy strategy, uint32* buf_state); +static BufferDesc* GetBufferFromRing(BufferAccessStrategy strategy, uint64* buf_state); static void AddBufferToRing(BufferAccessStrategy strategy, volatile BufferDesc* buf); void PageListBackWrite(uint32* bufList, int32 n, /* buffer list, bufs to scan, */ @@ -79,7 +79,7 @@ void PageListBackWrite(uint32* bufList, int32 n, SMgrRelation use_smgrReln = NULL, /* opt relation */ int32* bufs_written = NULL, /* opt written count returned */ int32* bufs_reusable = NULL); /* opt reusable count returned */ -static BufferDesc* get_buf_from_candidate_list(BufferAccessStrategy strategy, uint32* buf_state); +static BufferDesc* get_buf_from_candidate_list(BufferAccessStrategy 
strategy, uint64* buf_state); static void perform_delay(StrategyDelayStatus *status) { @@ -177,12 +177,12 @@ static inline uint32 ClockSweepTick(int max_nbuffer_can_use) * If the fraction is too small, we will increase dynamiclly to avoid elog(ERROR) * in `Startup' process because of ERROR will promote to FATAL. */ -BufferDesc* StrategyGetBuffer(BufferAccessStrategy strategy, uint32* buf_state) +BufferDesc* StrategyGetBuffer(BufferAccessStrategy strategy, uint64* buf_state) { BufferDesc *buf = NULL; int bgwproc_no; int try_counter; - uint32 local_buf_state = 0; /* to avoid repeated (de-)referencing */ + uint64 local_buf_state = 0; /* to avoid repeated (de-)referencing */ int max_buffer_can_use; bool am_standby = RecoveryInProgress(); StrategyDelayStatus retry_lock_status = { 0, 0 }; @@ -306,8 +306,8 @@ retry: Min(u_sess->attr.attr_storage.shared_buffers_fraction + 0.1, 1.0); goto retry; } else if (dw_page_writer_running()) { - ereport(LOG, (errmsg("double writer is on, no buffer available, this buffer dirty is %u, " - "this buffer refcount is %u, now dirty page num is %ld", + ereport(LOG, (errmsg("double writer is on, no buffer available, this buffer dirty is %lu, " + "this buffer refcount is %lu, now dirty page num is %ld", (local_buf_state & BM_DIRTY), BUF_STATE_GET_REFCOUNT(local_buf_state), get_dirty_page_num()))); perform_delay(&retry_buf_status); @@ -541,11 +541,11 @@ const float MAX_RETRY_RING_PCT = 0.1; * * The bufhdr spin lock is held on the returned buffer. */ -static BufferDesc *GetBufferFromRing(BufferAccessStrategy strategy, uint32 *buf_state) +static BufferDesc *GetBufferFromRing(BufferAccessStrategy strategy, uint64 *buf_state) { BufferDesc *buf = NULL; Buffer buf_num; - uint32 local_buf_state; /* to avoid repeated (de-)referencing */ + uint64 local_buf_state; /* to avoid repeated (de-)referencing */ uint16 retry_times = 0; RETRY: @@ -605,7 +605,7 @@ RETRY: * shouldn't re-use it. 
*/ buf = GetBufferDescriptor(buf_num - 1); - if (pg_atomic_read_u32(&buf->state) & (BM_DIRTY | BM_IS_META)) { + if (pg_atomic_read_u64(&buf->state) & (BM_DIRTY | BM_IS_META)) { if (retry_times < Min(MAX_RETRY_RING_TIMES, strategy->ring_size * MAX_RETRY_RING_PCT)) { goto RETRY; } else if (get_curr_candidate_nums(CAND_LIST_NORMAL) >= (uint32)g_instance.attr.attr_storage.NBuffers * @@ -705,10 +705,10 @@ void wakeup_pagewriter_thread() const int CANDIDATE_DIRTY_LIST_LEN = 100; const float HIGH_WATER = 0.75; -static BufferDesc* get_buf_from_candidate_list(BufferAccessStrategy strategy, uint32* buf_state) +static BufferDesc* get_buf_from_candidate_list(BufferAccessStrategy strategy, uint64* buf_state) { BufferDesc* buf = NULL; - uint32 local_buf_state; + uint64 local_buf_state; int buf_id = 0; int list_num = g_instance.ckpt_cxt_ctl->pgwr_procs.sub_num; int list_id = 0; diff --git a/src/gausskernel/storage/buffer/localbuf.cpp b/src/gausskernel/storage/buffer/localbuf.cpp index 01dc544d97..b683d085fc 100644 --- a/src/gausskernel/storage/buffer/localbuf.cpp +++ b/src/gausskernel/storage/buffer/localbuf.cpp @@ -103,16 +103,16 @@ void LocalBufferFlushAllBuffer() for (i = 0; i < u_sess->storage_cxt.NLocBuffer; i++) { BufferDesc *bufHdr = &u_sess->storage_cxt.LocalBufferDescriptors[i].bufferdesc; - uint32 buf_state; + uint64 buf_state; - buf_state = pg_atomic_read_u32(&bufHdr->state); + buf_state = pg_atomic_read_u64(&bufHdr->state); Assert(u_sess->storage_cxt.LocalRefCount[i] == 0); if ((buf_state & BM_VALID) && (buf_state & BM_DIRTY)) { LocalBufferFlushForExtremRTO(bufHdr); buf_state &= ~BM_DIRTY; - pg_atomic_write_u32(&bufHdr->state, buf_state); + pg_atomic_write_u32(((volatile uint32 *)&bufHdr->state) + 1, buf_state >> 32); u_sess->instr_cxt.pg_buffer_usage->local_blks_written++; } @@ -144,7 +144,7 @@ BufferDesc *LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber int b; int try_counter; bool found = false; - uint32 buf_state; + uint64 buf_state; INIT_BUFFERTAG(new_tag, smgr->smgr_rnode.node, forkNum, blockNum); @@ -161,20 +161,20 @@ BufferDesc *LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber #ifdef LBDEBUG fprintf(stderr, "LB ALLOC (%u,%d,%d) %d\n", smgr->smgr_rnode.node.relNode, forkNum, blockNum, -b - 1); #endif - buf_state = pg_atomic_read_u32(&buf_desc->state); + buf_state = pg_atomic_read_u64(&buf_desc->state); /* this part is equivalent to PinBuffer for a shared buffer */ if (u_sess->storage_cxt.LocalRefCount[b] == 0) { if (BUF_STATE_GET_USAGECOUNT(buf_state) < BM_MAX_USAGE_COUNT) { buf_state += BUF_USAGECOUNT_ONE; - pg_atomic_write_u32(&buf_desc->state, buf_state); + pg_atomic_write_u32(((volatile uint32 *)&buf_desc->state) + 1, buf_state >> 32); } } u_sess->storage_cxt.LocalRefCount[b]++; ResourceOwnerRememberBuffer(t_thrd.utils_cxt.CurrentResourceOwner, BufferDescriptorGetBuffer(buf_desc)); *foundPtr = (buf_state & BM_VALID) ? 
TRUE : FALSE; /* If previous read attempt have failed; try again */ #ifdef EXTREME_RTO_DEBUG - ereport(LOG, (errmsg("LocalBufferAlloc %u/%u/%u %u %u find in local buf %u/%u/%u %u %u id %d state %X, lsn %lu", + ereport(LOG, (errmsg("LocalBufferAlloc %u/%u/%u %u %u find in local buf %u/%u/%u %u %u id %d state %lu, lsn %lu", smgr->smgr_rnode.node.spcNode, smgr->smgr_rnode.node.dbNode, smgr->smgr_rnode.node.relNode, forkNum, blockNum, hresult->key.rnode.spcNode, hresult->key.rnode.dbNode, hresult->key.rnode.relNode, hresult->key.forkNum, hresult->key.blockNum, hresult->id, @@ -202,11 +202,11 @@ BufferDesc *LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber buf_desc = &u_sess->storage_cxt.LocalBufferDescriptors[b].bufferdesc; if (u_sess->storage_cxt.LocalRefCount[b] == 0) { - buf_state = pg_atomic_read_u32(&buf_desc->state); + buf_state = pg_atomic_read_u64(&buf_desc->state); if (BUF_STATE_GET_USAGECOUNT(buf_state) > 0) { buf_state -= BUF_USAGECOUNT_ONE; - pg_atomic_write_u32(&buf_desc->state, buf_state); + pg_atomic_write_u32(((volatile uint32 *)&buf_desc->state) + 1, buf_state >> 32); try_counter = u_sess->storage_cxt.NLocBuffer; } else { /* Found a usable buffer */ @@ -231,7 +231,7 @@ BufferDesc *LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber /* Mark not-dirty now in case we error out below */ buf_state &= ~BM_DIRTY; - pg_atomic_write_u32(&buf_desc->state, buf_state); + pg_atomic_write_u32(((volatile uint32 *)&buf_desc->state) + 1, buf_state >> 32); u_sess->instr_cxt.pg_buffer_usage->local_blks_written++; } @@ -255,7 +255,7 @@ BufferDesc *LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber /* mark buffer invalid just in case hash insert fails */ CLEAR_BUFFERTAG(buf_desc->tag); buf_state &= ~(BM_VALID | BM_TAG_VALID); - pg_atomic_write_u32(&buf_desc->state, buf_state); + pg_atomic_write_u32(((volatile uint32 *)&buf_desc->state) + 1, buf_state >> 32); } hresult = (LocalBufferLookupEnt *)hash_search(u_sess->storage_cxt.LocalBufHash, (void *)&new_tag, HASH_ENTER, @@ -273,7 +273,7 @@ BufferDesc *LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber buf_state |= BM_TAG_VALID; buf_state &= ~BUF_USAGECOUNT_MASK; buf_state += BUF_USAGECOUNT_ONE; - pg_atomic_write_u32(&buf_desc->state, buf_state); + pg_atomic_write_u32(((volatile uint32 *)&buf_desc->state) + 1, buf_state >> 32); buf_desc->extra->seg_fileno = EXTENT_INVALID; @@ -289,7 +289,7 @@ void MarkLocalBufferDirty(Buffer buffer) { int buf_id; BufferDesc *buf_desc = NULL; - uint32 buf_state; + uint64 buf_state; Assert(BufferIsLocal(buffer)); @@ -303,7 +303,7 @@ void MarkLocalBufferDirty(Buffer buffer) buf_desc = &u_sess->storage_cxt.LocalBufferDescriptors[buf_id].bufferdesc; - buf_state = pg_atomic_fetch_or_u32(&buf_desc->state, BM_DIRTY); + buf_state = pg_atomic_fetch_or_u64(&buf_desc->state, BM_DIRTY); if (!(buf_state & BM_DIRTY)) { u_sess->instr_cxt.pg_buffer_usage->local_blks_dirtied++; pgstatCountLocalBlocksDirtied4SessionLevel(); @@ -328,9 +328,9 @@ void DropRelFileNodeLocalBuffers(const RelFileNode &rnode, ForkNumber forkNum, B for (i = 0; i < u_sess->storage_cxt.NLocBuffer; i++) { BufferDesc* buf_desc = &u_sess->storage_cxt.LocalBufferDescriptors[i].bufferdesc; LocalBufferLookupEnt* hresult = NULL; - uint32 buf_state; + uint64 buf_state; - buf_state = pg_atomic_read_u32(&buf_desc->state); + buf_state = pg_atomic_read_u64(&buf_desc->state); if ((buf_state & BM_TAG_VALID) && RelFileNodeEquals(rnode, buf_desc->tag.rnode) && buf_desc->tag.forkNum == forkNum && 
buf_desc->tag.blockNum >= firstDelBlock) { @@ -351,7 +351,7 @@ void DropRelFileNodeLocalBuffers(const RelFileNode &rnode, ForkNumber forkNum, B CLEAR_BUFFERTAG(buf_desc->tag); buf_state &= ~BUF_FLAG_MASK; buf_state &= ~BUF_USAGECOUNT_MASK; - pg_atomic_write_u32(&buf_desc->state, buf_state); + pg_atomic_write_u32(((volatile uint32 *)&buf_desc->state) + 1, buf_state >> 32); } } } @@ -370,9 +370,9 @@ void DropRelFileNodeAllLocalBuffers(const RelFileNode &rnode) for (i = 0; i < u_sess->storage_cxt.NLocBuffer; i++) { BufferDesc* buf_desc = &u_sess->storage_cxt.LocalBufferDescriptors[i].bufferdesc; LocalBufferLookupEnt* hresult = NULL; - uint32 buf_state; + uint64 buf_state; - buf_state = pg_atomic_read_u32(&buf_desc->state); + buf_state = pg_atomic_read_u64(&buf_desc->state); if ((buf_state & BM_TAG_VALID) && RelFileNodeEquals(rnode, buf_desc->tag.rnode)) { if (u_sess->storage_cxt.LocalRefCount[i] != 0) { @@ -396,7 +396,7 @@ void DropRelFileNodeAllLocalBuffers(const RelFileNode &rnode) CLEAR_BUFFERTAG(buf_desc->tag); buf_state &= ~BUF_FLAG_MASK; buf_state &= ~BUF_USAGECOUNT_MASK; - pg_atomic_write_u32(&buf_desc->state, buf_state); + pg_atomic_write_u32(((volatile uint32 *)&buf_desc->state) + 1, buf_state >> 32); } } } @@ -569,7 +569,7 @@ void ForgetLocalBuffer(RelFileNode rnode, ForkNumber forkNum, BlockNumber blockN BufferTag tag; /* identity of target block */ LocalBufferLookupEnt *hresult; BufferDesc *bufHdr; - uint32 bufState; + uint64 bufState; /* * If somehow this is the first request in the session, there's nothing to @@ -594,6 +594,6 @@ void ForgetLocalBuffer(RelFileNode rnode, ForkNumber forkNum, BlockNumber blockN /* mark buffer invalid */ bufHdr = GetLocalBufferDescriptor(hresult->id); CLEAR_BUFFERTAG(bufHdr->tag); - bufState = pg_atomic_read_u32(&bufHdr->state); + bufState = pg_atomic_read_u64(&bufHdr->state); bufState &= ~(BM_VALID | BM_TAG_VALID | BM_DIRTY); } diff --git a/src/gausskernel/storage/lmgr/lwlock.cpp b/src/gausskernel/storage/lmgr/lwlock.cpp index 1f727770eb..cc86c0d619 100644 --- a/src/gausskernel/storage/lmgr/lwlock.cpp +++ b/src/gausskernel/storage/lmgr/lwlock.cpp @@ -100,19 +100,22 @@ #define MAX(A, B) ((B) > (A) ? (B) : (A)) #endif -#define LW_FLAG_HAS_WAITERS ((uint32)1 << 30) -#define LW_FLAG_RELEASE_OK ((uint32)1 << 29) -#define LW_FLAG_LOCKED ((uint32)1 << 28) +#define LW_FLAG_HAS_WAITERS ((uint64)1LU << 30 << 32) +#define LW_FLAG_RELEASE_OK ((uint64)1LU << 29 << 32) +#define LW_FLAG_LOCKED ((uint64)1LU << 28 << 32) -#define LW_VAL_EXCLUSIVE ((uint32)1 << 24) +#define LW_VAL_EXCLUSIVE (((uint64)1LU << 24 << 32) + (1LU << 47) + (1LU << 39) + (1LU << 31) + (1LU << 23) + (1LU << 15) + (1LU << 7)) #define LW_VAL_SHARED 1 -#define LW_LOCK_MASK ((uint32)((1 << 25) - 1)) +#define LW_LOCK_MASK ((uint64)((1LU << 25 << 32) - 1)) #ifdef LOCK_DEBUG /* Must be greater than MAX_BACKENDS - which is 2^23-1, so we're fine. 
*/ - #define LW_SHARED_MASK ((uint32)(1 << 23)) + #define LW_SHARED_MASK ((uint64)(1LU << 23 << 32)) #endif +#define LOCK_THREADID_MASK ((((uintptr_t)&t_thrd) >> 20) % 6) +#define LOCK_REFCOUNT_ONE_BY_THREADID (1LU << (8 * LOCK_THREADID_MASK)) + #define LWLOCK_TRANCHE_SIZE 128 const char **LWLockTrancheArray = NULL; @@ -230,9 +233,9 @@ inline static void PRINT_LWDEBUG(const char *where, LWLock *lock, LWLockMode mod { /* hide statement & context here, otherwise the log is just too verbose */ if (Trace_lwlocks) { - uint32 state = pg_atomic_read_u32(&lock->state); + uint64 state = pg_atomic_read_u64(&lock->state); ereport(LOG, (errhidestmt(true), errhidecontext(true), - errmsg("%d: %s(%s): excl %u shared %u haswaiters %u waiters %u rOK %d", + errmsg("%d: %s(%s): excl %lu shared %lu haswaiters %lu waiters %u rOK %ld", t_thrd.proc_cxt.MyProcPid, where, T_NAME(lock), !!(state & LW_VAL_EXCLUSIVE), state & LW_SHARED_MASK, !!(state & LW_FLAG_HAS_WAITERS), pg_atomic_read_u32(&lock->nwaiters), !!(state & LW_FLAG_RELEASE_OK)))); @@ -798,7 +801,7 @@ LWLock *LWLockAssign(int trancheId) */ void LWLockInitialize(LWLock *lock, int tranche_id) { - pg_atomic_init_u32(&lock->state, LW_FLAG_RELEASE_OK); + pg_atomic_init_u64(&lock->state, LW_FLAG_RELEASE_OK); /* ENABLE_THREAD_CHECK only, Register RWLock in Tsan */ TsAnnotateRWLockCreate(&lock->rwlock); @@ -820,7 +823,7 @@ static void LWThreadSuicide(PGPROC *proc, int extraWaits, LWLock *lock, LWLockMo Assert(false); /* for debug */ /* allow LWLockRelease to release waiters again. */ - pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK); + pg_atomic_fetch_or_u64(&lock->state, LW_FLAG_RELEASE_OK); /* Fix the process wait semaphore's count for any absorbed wakeups. */ while (extraWaits-- > 0) { @@ -840,7 +843,7 @@ static void LWThreadSuicide(PGPROC *proc, int extraWaits, LWLock *lock, LWLockMo */ static bool LWLockAttemptLock(LWLock *lock, LWLockMode mode) { - uint32 old_state; + uint64 old_state; AssertArg(mode == LW_EXCLUSIVE || mode == LW_SHARED); @@ -848,59 +851,47 @@ static bool LWLockAttemptLock(LWLock *lock, LWLockMode mode) * Read once outside the loop, later iterations will get the newer value * via compare & exchange. */ - old_state = pg_atomic_read_u32(&lock->state); /* loop until we've determined whether we could acquire the lock or not */ - while (true) { - uint32 desired_state; - bool lock_free = false; - - desired_state = old_state; - - if (mode == LW_EXCLUSIVE) { - lock_free = ((old_state & LW_LOCK_MASK) == 0); - if (lock_free) { - desired_state += LW_VAL_EXCLUSIVE; + uint64 desired_state = 0; + + if (mode == LW_SHARED) { + uint32 maskId = LOCK_THREADID_MASK; + uint64 refoneByThread = LOCK_REFCOUNT_ONE_BY_THREADID; + old_state = pg_atomic_read_u64((volatile uint64 *)&lock->state); + do { + if ((old_state & (LW_VAL_EXCLUSIVE)) != 0) { + return true; } - } else { - lock_free = ((old_state & LW_VAL_EXCLUSIVE) == 0); - if (lock_free) { - desired_state += LW_VAL_SHARED; + + desired_state = old_state + refoneByThread; + } while (!pg_atomic_compare_exchange_u8((((volatile uint8*)&lock->state) + maskId), ((uint8*)&old_state) + maskId, (desired_state >> (8 * maskId)))); + } else if (mode == LW_EXCLUSIVE) { + old_state = pg_atomic_read_u64(&lock->state); + do { + if ((old_state & LW_LOCK_MASK) != 0) { + return true; } - } - /* - * Attempt to swap in the state we are expecting. If we didn't see - * lock to be free, that's just the old value. If we saw it as free, - * we'll attempt to mark it acquired. 
The reason that we always swap - * in the value is that this doubles as a memory barrier. We could try - * to be smarter and only swap in values if we saw the lock as free, - * but benchmark haven't shown it as beneficial so far. - * - * Retry if the value changed since we last looked at it. - */ - if (pg_atomic_compare_exchange_u32(&lock->state, &old_state, desired_state)) { - if (lock_free) { - /* ENABLE_THREAD_CHECK only, Must acquire vector clock info from other - * thread after got the lock */ - if (desired_state & LW_VAL_EXCLUSIVE) { - TsAnnotateRWLockAcquired(&lock->rwlock, 1); - } else { - TsAnnotateRWLockAcquired(&lock->rwlock, 0); - } + desired_state = old_state + LW_VAL_EXCLUSIVE; + } while (!pg_atomic_compare_exchange_u64(&lock->state, &old_state, desired_state)); + } - /* Great! Got the lock. */ + /* ENABLE_THREAD_CHECK only, Must acquire vector clock info from other + * thread after got the lock */ + if (desired_state & LW_VAL_EXCLUSIVE) { + TsAnnotateRWLockAcquired(&lock->rwlock, 1); + } else { + TsAnnotateRWLockAcquired(&lock->rwlock, 0); + } + + /* Great! Got the lock. */ #ifdef LOCK_DEBUG - if (mode == LW_EXCLUSIVE) { - lock->owner = t_thrd.proc; - } -#endif - return false; - } else { - return true; /* someobdy else has the lock */ - } - } + if (mode == LW_EXCLUSIVE) { + lock->owner = t_thrd.proc; } +#endif + return false; } /* @@ -913,7 +904,7 @@ static bool LWLockAttemptLock(LWLock *lock, LWLockMode mode) */ static void LWLockWaitListLock(LWLock *lock) { - uint32 old_state; + uint64 old_state; #ifdef LWLOCK_STATS lwlock_stats *lwstats = NULL; uint32 delays = 0; @@ -923,7 +914,7 @@ static void LWLockWaitListLock(LWLock *lock) while (true) { /* always try once to acquire lock directly */ - old_state = pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_LOCKED); + old_state = pg_atomic_fetch_or_u64(&lock->state, LW_FLAG_LOCKED); if (!(old_state & LW_FLAG_LOCKED)) { break; /* got lock */ } @@ -938,7 +929,7 @@ static void LWLockWaitListLock(LWLock *lock) #ifndef ENABLE_THREAD_CHECK perform_spin_delay(&delayStatus); #endif - old_state = pg_atomic_read_u32(&lock->state); + old_state = pg_atomic_read_u64(&lock->state); } #ifdef LWLOCK_STATS delays += delayStatus.delays; @@ -971,13 +962,13 @@ static void LWLockWaitListLock(LWLock *lock) */ static void LWLockWaitListUnlock(LWLock *lock) { - uint32 old_state PG_USED_FOR_ASSERTS_ONLY; + uint64 old_state PG_USED_FOR_ASSERTS_ONLY; /* ENABLE_THREAD_CHECK only, Must release vector clock info to other * threads before unlock */ TsAnnotateRWLockReleased(&lock->listlock, 1); - old_state = pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_LOCKED); + old_state = pg_atomic_fetch_and_u64(&lock->state, ~LW_FLAG_LOCKED); Assert(old_state & LW_FLAG_LOCKED); } @@ -1024,18 +1015,18 @@ static void LWLockWakeup(LWLock *lock) } } - Assert(dlist_is_empty(&wakeup) || (pg_atomic_read_u32(&lock->state) & LW_FLAG_HAS_WAITERS)); + Assert(dlist_is_empty(&wakeup) || (pg_atomic_read_u64(&lock->state) & LW_FLAG_HAS_WAITERS)); /* unset required flags, and release lock, in one fell swoop */ { - uint32 old_state; - uint32 desired_state; + uint64 old_state; + uint64 desired_state; /* ENABLE_THREAD_CHECK only, Must release vector clock info to other * threads before unlock */ TsAnnotateRWLockReleased(&lock->listlock, 1); - old_state = pg_atomic_read_u32(&lock->state); + old_state = pg_atomic_read_u64(&lock->state); while (true) { desired_state = old_state; @@ -1051,7 +1042,7 @@ static void LWLockWakeup(LWLock *lock) } desired_state &= ~LW_FLAG_LOCKED; // release lock - if 
(pg_atomic_compare_exchange_u32(&lock->state, &old_state, desired_state)) { + if (pg_atomic_compare_exchange_u64(&lock->state, &old_state, desired_state)) { break; } } @@ -1107,7 +1098,7 @@ static void LWLockQueueSelf(LWLock *lock, LWLockMode mode) LWLockWaitListLock(lock); /* setting the flag is protected by the spinlock */ - pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_HAS_WAITERS); + pg_atomic_fetch_or_u64(&lock->state, LW_FLAG_HAS_WAITERS); t_thrd.proc->lwWaiting = true; t_thrd.proc->lwWaitMode = mode; @@ -1164,8 +1155,8 @@ static void LWLockDequeueSelf(LWLock *lock, LWLockMode mode) } } - if (dlist_is_empty(&lock->waiters) && (pg_atomic_read_u32(&lock->state) & LW_FLAG_HAS_WAITERS) != 0) { - pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_HAS_WAITERS); + if (dlist_is_empty(&lock->waiters) && (pg_atomic_read_u64(&lock->state) & LW_FLAG_HAS_WAITERS) != 0) { + pg_atomic_fetch_and_u64(&lock->state, ~LW_FLAG_HAS_WAITERS); } /* XXX: combine with fetch_and above? */ @@ -1183,7 +1174,7 @@ static void LWLockDequeueSelf(LWLock *lock, LWLockMode mode) * * Reset releaseOk if somebody woke us before we removed ourselves - * they'll have set it to false. */ - pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK); + pg_atomic_fetch_or_u64(&lock->state, LW_FLAG_RELEASE_OK); /* * Now wait for the scheduled wakeup, otherwise our ->lwWaiting would @@ -1237,7 +1228,7 @@ static bool LWLockConflictsWithVar(LWLock *lock, uint64 *valptr, uint64 oldval, * barrier here as far as the current usage is concerned. But that might * not be safe in general. */ - mustwait = (pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE) != 0; + mustwait = (pg_atomic_read_u64(&lock->state) & LW_VAL_EXCLUSIVE) != 0; if (!mustwait) { *result = true; return false; @@ -1404,14 +1395,14 @@ bool LWLockAcquire(LWLock *lock, LWLockMode mode, bool need_update_lockid) break; } /* allow LWLockRelease to release waiters again. */ - pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK); + pg_atomic_fetch_or_u64(&lock->state, LW_FLAG_RELEASE_OK); LWThreadSuicide(proc, extraWaits, lock, mode); } extraWaits++; } /* Retrying, allow LWLockRelease to release waiters again. */ - pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK); + pg_atomic_fetch_or_u64(&lock->state, LW_FLAG_RELEASE_OK); #ifdef LOCK_DEBUG { @@ -1673,7 +1664,7 @@ bool LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newva * Set RELEASE_OK flag, to make sure we get woken up as soon as the * lock is released. */ - pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK); + pg_atomic_fetch_or_u64(&lock->state, LW_FLAG_RELEASE_OK); /* * We're now guaranteed to be woken up if necessary. 
Recheck the lock @@ -1771,7 +1762,7 @@ void LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 val) LWLockWaitListLock(lock); - Assert(pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE); + Assert(pg_atomic_read_u64(&lock->state) & LW_VAL_EXCLUSIVE); /* Update the lock's value */ *valptr = val; @@ -1812,7 +1803,7 @@ void LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 val) void LWLockRelease(LWLock *lock) { LWLockMode mode = LW_EXCLUSIVE; - uint32 oldstate; + uint64 oldstate; bool check_waiters = false; int i; @@ -1841,12 +1832,12 @@ void LWLockRelease(LWLock *lock) /* ENABLE_THREAD_CHECK only, Must release vector clock info to other * threads before unlock */ TsAnnotateRWLockReleased(&lock->rwlock, 1); - oldstate = pg_atomic_sub_fetch_u32(&lock->state, LW_VAL_EXCLUSIVE); + oldstate = pg_atomic_sub_fetch_u64(&lock->state, LW_VAL_EXCLUSIVE); } else { /* ENABLE_THREAD_CHECK only, Must release vector clock info to other * threads before unlock */ TsAnnotateRWLockReleased(&lock->rwlock, 0); - oldstate = pg_atomic_sub_fetch_u32(&lock->state, LW_VAL_SHARED); + oldstate = __sync_sub_and_fetch(&lock->state, LOCK_REFCOUNT_ONE_BY_THREADID); } /* nobody else can have that kind of lock */ @@ -1949,7 +1940,7 @@ bool LWLockHeldByMeInMode(LWLock *lock, LWLockMode mode) /* reset a lwlock */ void LWLockReset(LWLock *lock) { - pg_atomic_init_u32(&lock->state, LW_FLAG_RELEASE_OK); + pg_atomic_init_u64(&lock->state, LW_FLAG_RELEASE_OK); /* ENABLE_THREAD_CHECK only */ TsAnnotateRWLockDestroy(&lock->listlock); @@ -1973,7 +1964,7 @@ void LWLockReset(LWLock *lock) */ void LWLockOwn(LWLock *lock) { - uint32 expected_state; + uint64 expected_state; /* Ensure we will have room to remember the lock */ if (t_thrd.storage_cxt.num_held_lwlocks >= MAX_SIMUL_LWLOCKS) { @@ -1981,7 +1972,7 @@ void LWLockOwn(LWLock *lock) } /* Ensure that lock is held */ - expected_state = pg_atomic_read_u32(&lock->state); + expected_state = pg_atomic_read_u64(&lock->state); if (!((expected_state & LW_LOCK_MASK) > 0 || (expected_state & LW_VAL_EXCLUSIVE) > 0)) { ereport(ERROR, (errcode(ERRCODE_LOCK_NOT_AVAILABLE), errmsg("lock %s is not held", T_NAME(lock)))); } @@ -2004,11 +1995,11 @@ void LWLockOwn(LWLock *lock) */ void LWLockDisown(LWLock *lock) { - uint32 expected_state; + uint64 expected_state; int i; /* Ensure that lock is held */ - expected_state = pg_atomic_read_u32(&lock->state); + expected_state = pg_atomic_read_u64(&lock->state); if (!((expected_state & LW_LOCK_MASK) > 0 || (expected_state & LW_VAL_EXCLUSIVE) > 0)) { ereport(ERROR, (errcode(ERRCODE_LOCK_NOT_AVAILABLE), errmsg("lock %s is not held", T_NAME(lock)))); } @@ -2094,8 +2085,8 @@ void wakeup_victim(LWLock *lock, ThreadId victim_tid) } /* update flag LW_FLAG_HAS_WAITERS before waking up victim */ - if (dlist_is_empty(&lock->waiters) && (pg_atomic_read_u32(&lock->state) & LW_FLAG_HAS_WAITERS) != 0) { - pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_HAS_WAITERS); + if (dlist_is_empty(&lock->waiters) && (pg_atomic_read_u64(&lock->state) & LW_FLAG_HAS_WAITERS) != 0) { + pg_atomic_fetch_and_u64(&lock->state, ~LW_FLAG_HAS_WAITERS); } LWLockWaitListUnlock(lock); diff --git a/src/gausskernel/storage/nvm/nvmbuffer.cpp b/src/gausskernel/storage/nvm/nvmbuffer.cpp index 5ade48cab9..568d537f06 100644 --- a/src/gausskernel/storage/nvm/nvmbuffer.cpp +++ b/src/gausskernel/storage/nvm/nvmbuffer.cpp @@ -32,7 +32,7 @@ #include "utils/resowner.h" #include "pgstat.h" -static BufferDesc *NvmStrategyGetBuffer(uint32 *buf_state); +static BufferDesc *NvmStrategyGetBuffer(uint64 
*buf_state); extern PrivateRefCountEntry *GetPrivateRefCountEntry(Buffer buffer, bool do_move); static const int MILLISECOND_TO_MICROSECOND = 1000; @@ -57,12 +57,12 @@ static bool NvmPinBuffer(BufferDesc *buf, bool *migrate) { int b = BufferDescriptorGetBuffer(buf); bool result = false; - uint32 buf_state; - uint32 old_buf_state; + uint64 buf_state; + uint64 old_buf_state; *migrate = false; - old_buf_state = pg_atomic_read_u32(&buf->state); + old_buf_state = pg_atomic_read_u64(&buf->state); for (;;) { if (unlikely(old_buf_state & BM_IN_MIGRATE)) { *migrate = true; @@ -82,7 +82,7 @@ static bool NvmPinBuffer(BufferDesc *buf, bool *migrate) buf_state += BUF_USAGECOUNT_ONE; } - if (pg_atomic_compare_exchange_u32(&buf->state, &old_buf_state, buf_state)) { + if (pg_atomic_compare_exchange_u64(&buf->state, &old_buf_state, buf_state)) { result = (buf_state & BM_VALID) != 0; break; } @@ -126,10 +126,10 @@ static bool NvmPinBufferFast(BufferDesc *buf) */ static bool WaitUntilUnPin(BufferDesc *buf) { - uint32 old_buf_state; + uint64 old_buf_state; int waits = 0; for (;;) { - old_buf_state = pg_atomic_read_u32(&buf->state); + old_buf_state = pg_atomic_read_u64(&buf->state); if (BUF_STATE_GET_REFCOUNT(old_buf_state) == 1) { return true; } else { @@ -150,15 +150,15 @@ static void WaitBufHdrUnMigrate(BufferDesc *buf) #ifndef ENABLE_THREAD_CHECK SpinDelayStatus delay_status = init_spin_delay(buf); #endif - uint32 buf_state; + uint64 buf_state; - buf_state = pg_atomic_read_u32(&buf->state); + buf_state = pg_atomic_read_u64(&buf->state); while (buf_state & BM_IN_MIGRATE) { #ifndef ENABLE_THREAD_CHECK perform_spin_delay(&delay_status); #endif - buf_state = pg_atomic_read_u32(&buf->state); + buf_state = pg_atomic_read_u64(&buf->state); } #ifndef ENABLE_THREAD_CHECK @@ -169,10 +169,10 @@ static void WaitBufHdrUnMigrate(BufferDesc *buf) static bool SetBufferMigrateFlag(Buffer buffer) { BufferDesc *buf = GetBufferDescriptor(buffer - 1); - uint32 bufState; - uint32 oldBufState; + uint64 bufState; + uint64 oldBufState; for (;;) { - oldBufState = pg_atomic_read_u32(&buf->state); + oldBufState = pg_atomic_read_u64(&buf->state); if (oldBufState & BM_LOCKED) { oldBufState = WaitBufHdrUnlocked(buf); } @@ -183,8 +183,8 @@ static bool SetBufferMigrateFlag(Buffer buffer) bufState = oldBufState; bufState |= BM_IN_MIGRATE; - ereport(DEBUG1, (errmsg("mark buffer %d migrate buffer stat %u.", buffer, bufState))); - if (pg_atomic_compare_exchange_u32(&buf->state, &oldBufState, bufState)) { + ereport(DEBUG1, (errmsg("mark buffer %d migrate buffer stat %lu.", buffer, bufState))); + if (pg_atomic_compare_exchange_u64(&buf->state, &oldBufState, bufState)) { return true; } } @@ -193,17 +193,17 @@ static bool SetBufferMigrateFlag(Buffer buffer) static void UnSetBufferMigrateFlag(Buffer buffer) { BufferDesc *buf = GetBufferDescriptor(buffer - 1); - uint32 bufState; - uint32 oldBufState; + uint64 bufState; + uint64 oldBufState; for (;;) { - oldBufState = pg_atomic_read_u32(&buf->state); + oldBufState = pg_atomic_read_u64(&buf->state); if (oldBufState & BM_LOCKED) { oldBufState = WaitBufHdrUnlocked(buf); } bufState = oldBufState; bufState &= ~(BM_IN_MIGRATE); - ereport(DEBUG1, (errmsg("unmark buffer %d migrate buffer stat %u.", buffer, bufState))); - if (pg_atomic_compare_exchange_u32(&buf->state, &oldBufState, bufState)) { + ereport(DEBUG1, (errmsg("unmark buffer %d migrate buffer stat %lu.", buffer, bufState))); + if (pg_atomic_compare_exchange_u64(&buf->state, &oldBufState, bufState)) { break; } } @@ -211,7 +211,7 @@ static void 
UnSetBufferMigrateFlag(Buffer buffer) static void NvmWaitBufferIO(BufferDesc *buf) { - uint32 buf_state; + uint64 buf_state; Assert(!t_thrd.storage_cxt.InProgressBuf); @@ -219,8 +219,8 @@ static void NvmWaitBufferIO(BufferDesc *buf) if (t_thrd.storage_cxt.InProgressBuf) { ereport(PANIC, (errmsg("InProgressBuf not null: id %d flags %u, buf: id %d flags %u", t_thrd.storage_cxt.InProgressBuf->buf_id, - pg_atomic_read_u32(&t_thrd.storage_cxt.InProgressBuf->state) & BUF_FLAG_MASK, - buf->buf_id, pg_atomic_read_u32(&buf->state) & BUF_FLAG_MASK))); + pg_atomic_read_u64(&t_thrd.storage_cxt.InProgressBuf->state) & BUF_FLAG_MASK, + buf->buf_id, pg_atomic_read_u64(&buf->state) & BUF_FLAG_MASK))); } bool ioDone = false; @@ -266,12 +266,12 @@ BufferDesc *NvmBufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber fo BufferTag old_tag; /* previous identity of selected buffer */ uint32 old_hash; /* hash value for oldTag */ LWLock *old_partition_lock = NULL; /* buffer partition lock for it */ - uint32 old_flags; + uint64 old_flags; int buf_id; BufferDesc *buf = NULL; BufferDesc *nvmBuf = NULL; bool valid = false; - uint32 buf_state, nvm_buf_state; + uint64 buf_state, nvm_buf_state; bool migrate = false; errno_t rc; @@ -803,10 +803,10 @@ static inline uint32 NvmClockSweepTick(void) return victim; } -static BufferDesc* get_nvm_buf_from_candidate_list(uint32* buf_state) +static BufferDesc* get_nvm_buf_from_candidate_list(uint64* buf_state) { BufferDesc* buf = NULL; - uint32 local_buf_state; + uint64 local_buf_state; int buf_id = 0; int list_num = g_instance.ckpt_cxt_ctl->pgwr_procs.sub_num; volatile PgBackendStatus* beentry = t_thrd.shemem_ptr_cxt.MyBEEntry; @@ -840,11 +840,11 @@ static BufferDesc* get_nvm_buf_from_candidate_list(uint32* buf_state) } const int RETRY_COUNT = 3; -static BufferDesc *NvmStrategyGetBuffer(uint32* buf_state) +static BufferDesc *NvmStrategyGetBuffer(uint64* buf_state) { BufferDesc *buf = NULL; int try_counter = NVM_BUFFER_NUM * RETRY_COUNT; - uint32 local_buf_state = 0; /* to avoid repeated (de-)referencing */ + uint64 local_buf_state = 0; /* to avoid repeated (de-)referencing */ /* Check the Candidate list */ if (ENABLE_INCRE_CKPT && pg_atomic_read_u32(&g_instance.ckpt_cxt_ctl->current_page_writer_count) > 1) { diff --git a/src/gausskernel/storage/smgr/segment/segbuffer.cpp b/src/gausskernel/storage/smgr/segment/segbuffer.cpp index aeb1732919..ced3fad28c 100644 --- a/src/gausskernel/storage/smgr/segment/segbuffer.cpp +++ b/src/gausskernel/storage/smgr/segment/segbuffer.cpp @@ -46,9 +46,10 @@ thread_local static bool isForInput = false; static const int TEN_MICROSECOND = 10; #define BufHdrLocked(bufHdr) ((bufHdr)->state & BM_LOCKED) -#define SegBufferIsPinned(bufHdr) ((bufHdr)->state & BUF_REFCOUNT_MASK) +#define SegBufferIsPinned(bufHdr) BUF_STATE_GET_REFCOUNT((bufHdr)->state) -static BufferDesc *SegStrategyGetBuffer(uint32 *buf_state); +static BufferDesc *SegStrategyGetBuffer(uint64 *buf_state); +static bool SegStartBufferIO(BufferDesc *buf, bool forInput); extern PrivateRefCountEntry *GetPrivateRefCountEntry(Buffer buffer, bool do_move); extern void ForgetPrivateRefCountEntry(PrivateRefCountEntry *ref); @@ -74,7 +75,7 @@ void AbortSegBufferIO(void) static bool SegStartBufferIO(BufferDesc *buf, bool forInput) { - uint32 buf_state; + uint64 buf_state; bool dms_need_flush = false; // used in dms SegmentCheck(!InProgressBuf); @@ -123,11 +124,11 @@ static bool SegStartBufferIO(BufferDesc *buf, bool forInput) return true; } -void SegTerminateBufferIO(BufferDesc *buf, bool 
clear_dirty, uint32 set_flag_bits) +void SegTerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint64 set_flag_bits) { SegmentCheck(buf == InProgressBuf); - uint32 buf_state = LockBufHdr(buf); + uint64 buf_state = LockBufHdr(buf); SegmentCheck(buf_state & BM_IO_IN_PROGRESS); @@ -193,8 +194,8 @@ bool SegPinBuffer(BufferDesc *buf) PrivateRefCountEntry * ref = GetPrivateRefCountEntry(b, true); if (ref == NULL) { - uint32 buf_state; - uint32 old_buf_state = pg_atomic_read_u32(&buf->state); + uint64 buf_state; + uint64 old_buf_state = pg_atomic_read_u64(&buf->state); ReservePrivateRefCountEntry(); ref = NewPrivateRefCountEntry(b); @@ -206,7 +207,7 @@ bool SegPinBuffer(BufferDesc *buf) buf_state = old_buf_state; buf_state += BUF_REFCOUNT_ONE; - if (pg_atomic_compare_exchange_u32(&buf->state, &old_buf_state, buf_state)) { + if (pg_atomic_compare_exchange_u64(&buf->state, &old_buf_state, buf_state)) { result = old_buf_state & BM_VALID; break; } @@ -236,9 +237,11 @@ static bool SegPinBufferLocked(BufferDesc *buf, const BufferTag *tag) */ Assert(GetPrivateRefCountEntry(BufferDescriptorGetBuffer(buf), false) == NULL); - uint32 buf_state = pg_atomic_read_u32(&buf->state); + uint64 buf_state = pg_atomic_read_u64(&buf->state); Assert(buf_state & BM_LOCKED); - buf_state += BUF_REFCOUNT_ONE; + + buf_state = __sync_add_and_fetch(&buf->state, 1); + UnlockBufHdr(buf, buf_state); b = BufferDescriptorGetBuffer(buf); @@ -265,10 +268,10 @@ void SegUnpinBuffer(BufferDesc *buf) ref->refcount--; if (ref->refcount == 0) { - uint32 buf_state; - uint32 old_buf_state; + uint64 buf_state; + uint64 old_buf_state; - old_buf_state = pg_atomic_read_u32(&buf->state); + old_buf_state = pg_atomic_read_u64(&buf->state); for (;;) { if (old_buf_state & BM_LOCKED) { old_buf_state = WaitBufHdrUnlocked(buf); @@ -277,7 +280,7 @@ void SegUnpinBuffer(BufferDesc *buf) buf_state = old_buf_state; SegmentCheck(BUF_STATE_GET_REFCOUNT(buf_state) > 0); buf_state -= BUF_REFCOUNT_ONE; - if (pg_atomic_compare_exchange_u32(&buf->state, &old_buf_state, buf_state)) { + if (pg_atomic_compare_exchange_u64(&buf->state, &old_buf_state, buf_state)) { break; } } @@ -303,7 +306,7 @@ void SegUnlockReleaseBuffer(Buffer buffer) void SegMarkBufferDirty(Buffer buf) { - uint32 old_buf_state, buf_state; + uint64 old_buf_state, buf_state; BufferDesc *bufHdr; bufHdr = GetBufferDescriptor(buf - 1); @@ -393,7 +396,7 @@ void SegFlushBuffer(BufferDesc *buf, SMgrRelation reln) } SegmentCheck(spc != NULL); - uint32 buf_state = LockBufHdr(buf); + uint64 buf_state = LockBufHdr(buf); buf_state &= ~BM_JUST_DIRTIED; UnlockBufHdr(buf, buf_state); @@ -527,7 +530,7 @@ Buffer ReadSegBufferForDMS(BufferDesc* bufHdr, ReadBufferMode mode, SegSpace *sp } else { #ifdef USE_ASSERT_CHECKING bool need_verify = (!RecoveryInProgress() && !SS_IN_ONDEMAND_RECOVERY && - ((pg_atomic_read_u32(&bufHdr->state) & BM_VALID) != 0) && ENABLE_DSS && + ((pg_atomic_read_u64(&bufHdr->state) & BM_VALID) != 0) && ENABLE_DSS && ENABLE_VERIFY_PAGE_VERSION); char *past_image = NULL; if (need_verify) { @@ -594,12 +597,12 @@ Buffer ReadBufferFast(SegSpace *spc, RelFileNode rnode, ForkNumber forkNum, Bloc BufferDesc *bufHdr = SegBufferAlloc(spc, rnode, forkNum, blockNum, &found); if (!found) { - SegmentCheck(!(pg_atomic_read_u32(&bufHdr->state) & BM_VALID)); + SegmentCheck(!(pg_atomic_read_u64(&bufHdr->state) & BM_VALID)); char *bufBlock = (char *)BufHdrGetBlock(bufHdr); if (ENABLE_DMS && mode != RBM_FOR_REMOTE) { - Assert(!(pg_atomic_read_u32(&bufHdr->state) & BM_VALID)); + 
Assert(!(pg_atomic_read_u64(&bufHdr->state) & BM_VALID)); do { bool startio; @@ -610,7 +613,7 @@ Buffer ReadBufferFast(SegSpace *spc, RelFileNode rnode, ForkNumber forkNum, Bloc } if (!startio) { - Assert(pg_atomic_read_u32(&bufHdr->state) & BM_VALID); + Assert(pg_atomic_read_u64(&bufHdr->state) & BM_VALID); found = true; goto found_branch; } @@ -639,7 +642,7 @@ Buffer ReadBufferFast(SegSpace *spc, RelFileNode rnode, ForkNumber forkNum, Bloc * previous attempts to read the buffer must have failed, * but DRC has been created, so load page directly again */ - Assert(pg_atomic_read_u32(&bufHdr->state) & BM_IO_ERROR); + Assert(pg_atomic_read_u64(&bufHdr->state) & BM_IO_ERROR); buf_ctrl->state |= BUF_NEED_LOAD; } @@ -714,8 +717,8 @@ BufferDesc *SegBufferAlloc(SegSpace *spc, RelFileNode rnode, ForkNumber forkNum, { BufferDesc *buf; BufferTag new_tag, old_tag; - uint32 buf_state; - uint32 old_flags; + uint64 buf_state; + uint64 old_flags; uint32 new_hash, old_hash; LWLock *new_partition_lock; LWLock *old_partition_lock; @@ -855,10 +858,10 @@ static inline uint32 ClockSweepTick(void) return victim; } -static BufferDesc* get_segbuf_from_candidate_list(uint32* buf_state) +static BufferDesc* get_segbuf_from_candidate_list(uint64* buf_state) { BufferDesc* buf = NULL; - uint32 local_buf_state; + uint64 local_buf_state; int buf_id = 0; if (ENABLE_INCRE_CKPT && pg_atomic_read_u32(&g_instance.ckpt_cxt_ctl->current_page_writer_count) > 0) { @@ -895,7 +898,7 @@ static BufferDesc* get_segbuf_from_candidate_list(uint32* buf_state) /* lock the buffer descriptor before return */ const int RETRY_COUNT = 3; -static BufferDesc *SegStrategyGetBuffer(uint32 *buf_state) +static BufferDesc *SegStrategyGetBuffer(uint64 *buf_state) { // todo: add free list BufferDesc *buf = get_segbuf_from_candidate_list(buf_state); @@ -910,7 +913,7 @@ static BufferDesc *SegStrategyGetBuffer(uint32 *buf_state) int buf_id = BufferIdOfSegmentBuffer(ClockSweepTick()); buf = GetBufferDescriptor(buf_id); - uint32 state = LockBufHdr(buf); + uint64 state = LockBufHdr(buf); if (BUF_STATE_GET_REFCOUNT(state) == 0) { *buf_state = state; @@ -923,7 +926,7 @@ static BufferDesc *SegStrategyGetBuffer(uint32 *buf_state) UnlockBufHdr(buf, state); ereport(DEBUG5, (errmodule(MOD_SEGMENT_PAGE), - (errmsg("SegStrategyGetBuffer get a pinned buffer, %d, buffer tag <%u, %u, %u, %u>.%d.%u, state %u", + (errmsg("SegStrategyGetBuffer get a pinned buffer, %d, buffer tag <%u, %u, %u, %u>.%d.%u, state %lu", buf_id, buf->tag.rnode.spcNode, buf->tag.rnode.dbNode, buf->tag.rnode.relNode, buf->tag.rnode.bucketNode, buf->tag.forkNum, buf->tag.blockNum, state)))); } @@ -937,7 +940,7 @@ void SegDropSpaceMetaBuffers(Oid spcNode, Oid dbNode) smgrcloseall(); for (i = SegmentBufferStartID; i < TOTAL_BUFFER_NUM; i++) { BufferDesc *buf_desc = GetBufferDescriptor(i); - uint32 buf_state; + uint64 buf_state; /* * As in DropRelFileNodeBuffers, an unlocked precheck should be safe * and saves some cycles. 
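SegPinBuffer above shows the pattern that repeats through this file and bufmgr.cpp: pinning is now a full 64-bit compare-and-swap (or, in SegPinBufferLocked, a plain __sync_add_and_fetch) on a state word whose low 32 bits hold the refcount and whose high 32 bits hold the flags and usage count, the layout the buf_internals.h hunk further down spells out. The split is what makes UnlockBufHdr's blind store safe: it writes only the flag half, so it can no longer overwrite a refcount update that raced in between LockBufHdr's read and the unlock. Below is a minimal sketch of the two sides, with illustrative names, assuming a little-endian layout and GCC builtins as the patch itself does:

#include <stdint.h>
#include <stdbool.h>

#define LOCKED (1ULL << 22 << 32)   /* header lock, bit 54, mirrors BM_LOCKED */
#define VALID  (1ULL << 24 << 32)   /* page is valid, bit 56, mirrors BM_VALID */

/* Pin without the header lock: a full-word CAS that only ever changes
 * the low (refcount) half; flag bits ride along unmodified. */
static bool pin(volatile uint64_t *state)
{
    uint64_t old = __atomic_load_n(state, __ATOMIC_SEQ_CST);
    uint64_t desired;
    do {
        desired = old + 1;                     /* BUF_REFCOUNT_ONE */
    } while (!__atomic_compare_exchange_n(state, &old, desired, true,
                                          __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST));
    return (desired & VALID) != 0;             /* caller learns page validity */
}

/* Unlock the header: blind-store only the flag half. Concurrent pins in
 * the low half survive because word [1] holds bits 32..63 on a
 * little-endian target; a big-endian port would have to index word [0]. */
static void unlock_hdr(volatile uint64_t *state, uint64_t s)
{
    volatile uint32_t *high = (volatile uint32_t *)state + 1;
    __atomic_store_n(high, (uint32_t)((s & ~LOCKED) >> 32), __ATOMIC_RELEASE);
}

One wrinkle worth noting in that buf_internals.h hunk: BM_IN_MIGRATE and BM_IS_TMP_BUF are still built from a 32-bit 1U, and shifting a uint32 past bit 31 is undefined behavior in C, so they presumably want the same 1LU base the neighboring flag definitions use.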
diff --git a/src/gausskernel/storage/smgr/segment/segxlog.cpp b/src/gausskernel/storage/smgr/segment/segxlog.cpp index 8efa9fa38e..ca364f22de 100644 --- a/src/gausskernel/storage/smgr/segment/segxlog.cpp +++ b/src/gausskernel/storage/smgr/segment/segxlog.cpp @@ -534,7 +534,7 @@ void move_extent_flush_buffer(XLogMoveExtent *xlog_data) BufferDesc *buf_desc = BufferGetBufferDescriptor(buffer); if (buf_desc->extra->seg_blockno == old_seg_blockno) { - uint32 buf_state = LockBufHdr(buf_desc); + uint64 buf_state = LockBufHdr(buf_desc); if (buf_state & BM_DIRTY) { /* spin-lock should be released before IO */ UnlockBufHdr(buf_desc, buf_state); diff --git a/src/gausskernel/storage/smgr/segment/space.cpp b/src/gausskernel/storage/smgr/segment/space.cpp index 1e444cf3dd..2c0467e68f 100644 --- a/src/gausskernel/storage/smgr/segment/space.cpp +++ b/src/gausskernel/storage/smgr/segment/space.cpp @@ -557,7 +557,7 @@ static void copy_extent(SegExtentGroup *seg, RelFileNode logic_rnode, uint32 log * physical location, they will find data on disk are too old, incurring LSN check failing. */ BufferDesc *buf_desc = BufferGetBufferDescriptor(buf); - uint32 buf_state = LockBufHdr(buf_desc); + uint64 buf_state = LockBufHdr(buf_desc); UnlockBufHdr(buf_desc, buf_state); if (buf_state & BM_DIRTY) { FlushOneBufferIncludeDW(buf_desc); @@ -1094,7 +1094,7 @@ static void invalidate_metadata_buffer(SegExtentGroup *seg, BlockNumber target_s { for (int i = SegmentBufferStartID; i < TOTAL_BUFFER_NUM; i++) { BufferDesc *bufdesc = GetBufferDescriptor(i); - uint32 state; + uint64 state; if (IsBufferToBeTruncated(bufdesc, seg, target_size)) { state = LockBufHdr(bufdesc); diff --git a/src/include/access/double_write.h b/src/include/access/double_write.h index 734269684d..d50d354b5c 100644 --- a/src/include/access/double_write.h +++ b/src/include/access/double_write.h @@ -146,7 +146,7 @@ const uint16 DW_FIRST_DATA_PAGE_NUM = (32768 - DW_SECOND_DATA_PAGE_NUM - DW_SECO const uint16 DW_SECOND_BUFTAG_START_IDX = 1 + DW_FIRST_DATA_PAGE_NUM + 1; /* two head */ const uint16 DW_SECOND_DATA_START_IDX = DW_SECOND_BUFTAG_START_IDX + DW_SECOND_BUFTAG_PAGE_NUM; -inline bool dw_buf_valid_dirty(uint32 buf_state) +inline bool dw_buf_valid_dirty(uint64 buf_state) { if (ENABLE_DMS && ENABLE_DSS_AIO) { return true; @@ -155,7 +155,7 @@ inline bool dw_buf_valid_dirty(uint32 buf_state) return ((buf_state & (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY)); } -inline bool dw_buf_ckpt_needed(uint32 buf_state) +inline bool dw_buf_ckpt_needed(uint64 buf_state) { return ((buf_state & (BM_VALID | BM_DIRTY | BM_CHECKPOINT_NEEDED)) == (BM_VALID | BM_DIRTY | BM_CHECKPOINT_NEEDED)); } @@ -312,7 +312,7 @@ inline bool dw_page_writer_running() /** * If enable dms and aio, the aio_in_process should be false. 
*/ -inline bool dw_buf_valid_aio_finished(BufferDesc *buf_desc, uint32 buf_state) +inline bool dw_buf_valid_aio_finished(BufferDesc *buf_desc, uint64 buf_state) { if (!ENABLE_DMS || !ENABLE_DSS_AIO) { return true; diff --git a/src/include/access/xlogproc.h b/src/include/access/xlogproc.h index 0510e86a94..564b35b3f2 100755 --- a/src/include/access/xlogproc.h +++ b/src/include/access/xlogproc.h @@ -98,7 +98,7 @@ typedef struct { int dirtyflag; /* true if the buffer changed */ } RedoBufferInfo; -extern void GetFlushBufferInfo(void *buf, RedoBufferInfo *bufferinfo, uint32 *buf_state, ReadBufferMethod flushmethod); +extern void GetFlushBufferInfo(void *buf, RedoBufferInfo *bufferinfo, uint64 *buf_state, ReadBufferMethod flushmethod); #define MakeRedoBufferDirty(bufferinfo) ((bufferinfo)->dirtyflag = true) #define RedoBufferDirtyClear(bufferinfo) ((bufferinfo)->dirtyflag = false) @@ -108,7 +108,7 @@ extern void GetFlushBufferInfo(void *buf, RedoBufferInfo *bufferinfo, uint32 *bu typedef struct { RedoBufferTag blockinfo; - pg_atomic_uint32 state; + pg_atomic_uint64 state; } RedoBufferDesc; typedef struct { @@ -822,7 +822,7 @@ extern void XLogRedoBufferRelease(RedoBufferManager* buffermanager, Buffer buffe extern BlockNumber XLogRedoBufferGetBlkNumber(RedoBufferManager* buffermanager, Buffer bufferid); extern Block XLogRedoBufferGetBlk(RedoBufferManager* buffermanager, RedoMemSlot* bufferslot); extern Block XLogRedoBufferGetPage(RedoBufferManager* buffermanager, Buffer bufferid); -extern void XLogRedoBufferSetState(RedoBufferManager* buffermanager, RedoMemSlot* bufferslot, uint32 state); +extern void XLogRedoBufferSetState(RedoBufferManager* buffermanager, RedoMemSlot* bufferslot, uint64 state); #define XLogRedoBufferInitFunc(bufferManager, buffernum, defOperate, interruptOperte) do { \ XLogRedoBufferInit(bufferManager, buffernum, defOperate, interruptOperte); \ diff --git a/src/include/storage/buf/buf_internals.h b/src/include/storage/buf/buf_internals.h index b5a2d24b63..bfaa2e6b09 100644 --- a/src/include/storage/buf/buf_internals.h +++ b/src/include/storage/buf/buf_internals.h @@ -38,16 +38,19 @@ * * The definition of buffer state components is below. */ -#define BUF_REFCOUNT_ONE 1 -#define BUF_REFCOUNT_MASK ((1U << 16) - 1) -#define BUF_USAGECOUNT_MASK 0x003C0000U -#define BUF_USAGECOUNT_ONE (1U << 18) -#define BUF_USAGECOUNT_SHIFT 18 -#define BUF_FLAG_MASK 0xFFC00000U + +#define BUF_REFCOUNT_ONE 1LU +#define BUF_REFCOUNT_ONE_16 (1LU << 16) +#define BUF_REFCOUNT_ONE_32 (1LU << 32) +#define BUF_REFCOUNT_MASK ((1LU << 17) - 1) +#define BUF_USAGECOUNT_MASK 0x003C000000000000LU +#define BUF_USAGECOUNT_ONE (1LU << 18 << 32) +#define BUF_USAGECOUNT_SHIFT (18 + 32) +#define BUF_FLAG_MASK 0xFFC0000000000000LU /* Get refcount and usagecount from buffer state */ -#define BUF_STATE_GET_REFCOUNT(state) ((state)&BUF_REFCOUNT_MASK) #define BUF_STATE_GET_USAGECOUNT(state) (((state)&BUF_USAGECOUNT_MASK) >> BUF_USAGECOUNT_SHIFT) +#define BUF_STATE_GET_REFCOUNT(state) (((state)&0xFFFFFFFF)) /* * Flags for buffer descriptors @@ -55,19 +58,20 @@ * Note: TAG_VALID essentially means that there is a buffer hashtable * entry associated with the buffer's tag. 
*/ -#define BM_IN_MIGRATE (1U << 16) /* buffer is migrating */ -#define BM_IS_META (1U << 17) -#define BM_LOCKED (1U << 22) /* buffer header is locked */ -#define BM_DIRTY (1U << 23) /* data needs writing */ -#define BM_VALID (1U << 24) /* data is valid */ -#define BM_TAG_VALID (1U << 25) /* tag is assigned */ -#define BM_IO_IN_PROGRESS (1U << 26) /* read or write in progress */ -#define BM_IO_ERROR (1U << 27) /* previous I/O failed */ -#define BM_JUST_DIRTIED (1U << 28) /* dirtied since write started */ -#define BM_PIN_COUNT_WAITER (1U << 29) /* have waiter for sole pin */ -#define BM_CHECKPOINT_NEEDED (1U << 30) /* must write for checkpoint */ +#define BM_IN_MIGRATE (1U << 16 << 32) /* buffer is migrating */ +#define BM_IS_TMP_BUF (1U << 21 << 32) /* temp buf, can not write to disk */ +#define BM_IS_META (1LU << 17 << 32) +#define BM_LOCKED (1LU << 22 << 32) /* buffer header is locked */ +#define BM_DIRTY (1LU << 23 << 32) /* data needs writing */ +#define BM_VALID (1LU << 24 << 32) /* data is valid */ +#define BM_TAG_VALID (1LU << 25 << 32) /* tag is assigned */ +#define BM_IO_IN_PROGRESS (1LU << 26 << 32) /* read or write in progress */ +#define BM_IO_ERROR (1LU << 27 << 32) /* previous I/O failed */ +#define BM_JUST_DIRTIED (1LU << 28 << 32) /* dirtied since write started */ +#define BM_PIN_COUNT_WAITER (1LU << 29 << 32) /* have waiter for sole pin */ +#define BM_CHECKPOINT_NEEDED (1LU << 30 << 32) /* must write for checkpoint */ #define BM_PERMANENT \ - (1U << 31) /* permanent relation (not \ + (1LU << 31 << 32) /* permanent relation (not \ * unlogged, or init fork) ) */ /* * The maximum allowed value of usage_count represents a tradeoff between @@ -208,10 +212,11 @@ typedef struct BufferDescExtra { typedef struct BufferDesc { BufferTag tag; /* ID of page contained in buffer */ - int buf_id; /* buffer's index number (from 0) */ /* state of the tag, containing flags, refcount and usagecount */ - pg_atomic_uint32 state; + pg_atomic_uint64 state; + + int buf_id; /* buffer's index number (from 0) */ ThreadId wait_backend_pid; /* backend PID of pin-count waiter */ @@ -266,7 +271,7 @@ typedef union BufferDescPadded { * Functions for acquiring/releasing a shared buffer header's spinlock. Do * not apply these to local buffers! */ -extern uint32 LockBufHdr(BufferDesc* desc); +extern uint64 LockBufHdr(BufferDesc* desc); #ifdef ENABLE_THREAD_CHECK extern "C" { @@ -282,10 +287,10 @@ extern "C" { /* ENABLE_THREAD_CHECK only, release semantic */ \ TsAnnotateHappensBefore(&desc->state); \ pg_write_barrier(); \ - pg_atomic_write_u32(&(desc)->state, (s) & (~BM_LOCKED)); \ + pg_atomic_write_u32((((volatile uint32 *)&(desc)->state) + 1), ( ( (s) & (~BM_LOCKED) ) >> 32) ); \ } while (0) -extern bool retryLockBufHdr(BufferDesc* desc, uint32* buf_state); +extern bool retryLockBufHdr(BufferDesc* desc, uint64* buf_state); /* * The PendingWriteback & WritebackContext structure are used to keep * information about pending flush requests to be issued to the OS. 
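The lwlock.h change below (pg_atomic_uint32 to pg_atomic_uint64 state) is the header side of the six-way CAS scheme implemented in lwlock.cpp earlier in this patch: the low 48 bits of the lock word are six byte-wide shared-holder counters, LOCK_THREADID_MASK hashes a thread's thread-local base address to one of the six slots, and a shared acquire compare-and-swaps only its own byte through the new pg_atomic_compare_exchange_u8, so readers in different slots never contend on the same CAS. LW_VAL_EXCLUSIVE is bit 56 plus the top bit of every slot byte: a writer that wins the full-word CAS flips all six bytes at once, which forces any in-flight single-byte reader CAS to fail. A compilable sketch of the protocol follows, with illustrative names, waiter queueing left out, and success/failure returned the intuitive way around (LWLockAttemptLock itself returns true when the caller must wait); GCC builtins and little-endian are assumed, as in the patch:

#include <stdint.h>
#include <stdbool.h>

/* bit 56 marks the writer; bits 7,15,23,31,39,47 mark it in every slot byte */
#define VAL_EXCLUSIVE ((1ULL << 56) + (1ULL << 47) + (1ULL << 39) + \
                       (1ULL << 31) + (1ULL << 23) + (1ULL << 15) + (1ULL << 7))
#define LOCK_MASK ((1ULL << 57) - 1)   /* all six counters plus the writer bit */

/* Shared acquire: CAS a single byte of the 64-bit lock word. */
static bool shared_acquire(volatile uint64_t *state, unsigned slot)
{
    volatile uint8_t *byte = (volatile uint8_t *)state + slot; /* little-endian */
    for (;;) {
        uint64_t old = __atomic_load_n(state, __ATOMIC_SEQ_CST);
        if (old & VAL_EXCLUSIVE)
            return false;                    /* writer present: caller must wait */
        uint64_t desired = old + (1ULL << (8 * slot));
        if (desired & VAL_EXCLUSIVE)
            return false;                    /* slot already at 127 holders */
        uint8_t expected = (uint8_t)(old >> (8 * slot));
        if (__atomic_compare_exchange_n(byte, &expected,
                                        (uint8_t)(desired >> (8 * slot)),
                                        false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
            return true;                     /* only this one byte was contended */
    }
}

/* Shared release: 64-bit subtract; it cannot borrow across bytes because
 * a writer can never set bit 7 of our slot while we still hold a count. */
static void shared_release(volatile uint64_t *state, unsigned slot)
{
    __sync_sub_and_fetch(state, 1ULL << (8 * slot));
}

/* Exclusive acquire: classic full-word CAS against a completely free word. */
static bool exclusive_acquire(volatile uint64_t *state)
{
    uint64_t old = __atomic_load_n(state, __ATOMIC_SEQ_CST);
    do {
        if (old & LOCK_MASK)
            return false;                    /* readers or a writer present */
    } while (!__atomic_compare_exchange_n(state, &old, old + VAL_EXCLUSIVE,
                                          true, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST));
    return true;
}

Reserving bit 7 of each byte caps a slot at 127 concurrent shared holders, which is why the second patch in this series adds the desired_state & LW_VAL_EXCLUSIVE overflow check to the shared path of LWLockAttemptLock rather than letting an increment spill into the writer marker.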
@@ -333,7 +338,7 @@ extern void IssuePendingWritebacks(WritebackContext* context); extern void ScheduleBufferTagForWriteback(WritebackContext* context, BufferTag* tag); /* freelist.c */ -extern BufferDesc *StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *buf_state); +extern BufferDesc *StrategyGetBuffer(BufferAccessStrategy strategy, uint64 *buf_state); extern void StrategyFreeBuffer(volatile BufferDesc* buf); extern bool StrategyRejectBuffer(BufferAccessStrategy strategy, BufferDesc* buf); diff --git a/src/include/storage/buf/bufmgr.h b/src/include/storage/buf/bufmgr.h index a02c2837d9..fd108ec952 100644 --- a/src/include/storage/buf/bufmgr.h +++ b/src/include/storage/buf/bufmgr.h @@ -283,11 +283,11 @@ extern void UnlockReleaseBuffer(Buffer buffer); extern void MarkBufferDirty(Buffer buffer); extern void IncrBufferRefCount(Buffer buffer); extern Buffer ReleaseAndReadBuffer(Buffer buffer, Relation relation, BlockNumber blockNum); -void PageCheckIfCanEliminate(BufferDesc *buf, uint32 *oldFlags, bool *needGetLock); +void PageCheckIfCanEliminate(BufferDesc *buf, uint64 *oldFlags, bool *needGetLock); #ifdef USE_ASSERT_CHECKING -void PageCheckWhenChosedElimination(const BufferDesc *buf, uint32 oldFlags); +void PageCheckWhenChosedElimination(const BufferDesc *buf, uint64 oldFlags); #endif -uint32 WaitBufHdrUnlocked(BufferDesc* buf); +uint64 WaitBufHdrUnlocked(BufferDesc* buf); void WaitIO(BufferDesc *buf); void InvalidateBuffer(BufferDesc *buf); extern void ReservePrivateRefCountEntry(void); @@ -365,9 +365,9 @@ extern bool HoldingBufferPinThatDelaysRecovery(void); extern void AsyncUnpinBuffer(volatile void* bufHdr, bool forgetBuffer); extern void AsyncCompltrPinBuffer(volatile void* bufHdr); extern void AsyncCompltrUnpinBuffer(volatile void* bufHdr); -extern void TerminateBufferIO(volatile BufferDesc* buf, bool clear_dirty, uint32 set_flag_bits); +extern void TerminateBufferIO(volatile BufferDesc* buf, bool clear_dirty, uint64 set_flag_bits); -extern void AsyncTerminateBufferIO(void* bufHdr, bool clear_dirty, uint32 set_flag_bits); +extern void AsyncTerminateBufferIO(void* bufHdr, bool clear_dirty, uint64 set_flag_bits); extern void AsyncAbortBufferIO(void* buf, bool isForInput); extern void AsyncTerminateBufferIOByVacuum(void* buffer); extern void AsyncAbortBufferIOByVacuum(void* buffer); diff --git a/src/include/storage/lock/lwlock.h b/src/include/storage/lock/lwlock.h index dfbb6123a7..86dcf187a8 100644 --- a/src/include/storage/lock/lwlock.h +++ b/src/include/storage/lock/lwlock.h @@ -319,7 +319,7 @@ struct PGPROC; typedef struct LWLock { uint16 tranche; /* tranche ID */ - pg_atomic_uint32 state; /* state of exlusive/nonexclusive lockers */ + pg_atomic_uint64 state; /* state of exlusive/nonexclusive lockers */ dlist_head waiters; /* list of waiting PGPROCs */ #ifdef LOCK_DEBUG pg_atomic_uint32 nwaiters; /* number of waiters */ diff --git a/src/include/storage/smgr/segment.h b/src/include/storage/smgr/segment.h index 8cb73ac3ee..adc02f33ef 100644 --- a/src/include/storage/smgr/segment.h +++ b/src/include/storage/smgr/segment.h @@ -147,7 +147,7 @@ Buffer try_get_moved_pagebuf(RelFileNode *rnode, int forknum, BlockNumber logic_ void SetInProgressFlags(BufferDesc *bufDesc, bool input); bool HasInProgressBuf(void); -void SegTerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits); +void SegTerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint64 set_flag_bits); #ifdef USE_ASSERT_CHECKING void SegFlushCheckDiskLSN(SegSpace *spc, RelFileNode rNode, ForkNumber forknum, 
BlockNumber blocknum, char *buf); #endif diff --git a/src/include/utils/atomic.h b/src/include/utils/atomic.h index 510a695aab..ca866780fe 100644 --- a/src/include/utils/atomic.h +++ b/src/include/utils/atomic.h @@ -240,6 +240,16 @@ static inline bool pg_atomic_compare_exchange_u32(volatile uint32* ptr, uint32* return ret; } +static inline bool pg_atomic_compare_exchange_u8(volatile uint8* ptr, uint8* expected, uint8 newval) +{ + bool ret = false; + uint8 current; + current = __sync_val_compare_and_swap(ptr, *expected, newval); + ret = current == *expected; + *expected = current; + return ret; +} + /* * @Description: Atomic write in a 32-bit address. * @IN ptr: int32 pointer -- Gitee From 9dc2e1d6fafc072bcbe654122cce5a0a31e2c2fd Mon Sep 17 00:00:00 2001 From: "arcoalien@qq.com" Date: Thu, 2 Nov 2023 11:43:58 +0800 Subject: [PATCH 2/2] add semi-lock-free dynamic hash operations MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/common/backend/utils/hash/dynahash.cpp | 15 +++-- .../ddes/adapter/ss_dms_callback.cpp | 64 ++++++------------- .../storage/access/heap/visibilitymap.cpp | 2 +- .../storage/access/nbtree/nbtsearch.cpp | 4 +- src/gausskernel/storage/buffer/bufmgr.cpp | 37 +++++------ src/gausskernel/storage/lmgr/lwlock.cpp | 3 + .../storage/smgr/segment/segbuffer.cpp | 31 ++++++--- .../storage/smgr/segment/space.cpp | 9 ++- 8 files changed, 77 insertions(+), 88 deletions(-) diff --git a/src/common/backend/utils/hash/dynahash.cpp b/src/common/backend/utils/hash/dynahash.cpp index 537a95d20b..b8fb6eeb3f 100644 --- a/src/common/backend/utils/hash/dynahash.cpp +++ b/src/common/backend/utils/hash/dynahash.cpp @@ -1607,6 +1607,12 @@ void* buf_hash_operate(HTAB* hashp, const BufferTag* keyPtr, uint32 hashvalue, b case HASH_REMOVE: if (currBucket != NULL) { + CLEAR_BUFFERTAG(*((BufferTag*)(ELEMENTKEY(currBucket)))); + pg_memory_barrier(); + + /* remove record from hash bucket's chain. */ + *prevBucketPtr = currBucket->link; + freelist_idx = FREELIST_IDX(hctl, hashvalue); /* if partitioned, must lock to touch nentries and freeList */ if (IS_PARTITIONED(hctl)) { @@ -1615,9 +1621,6 @@ void* buf_hash_operate(HTAB* hashp, const BufferTag* keyPtr, uint32 hashvalue, b Assert(hctl->freeList[freelist_idx].nentries > 0); hctl->freeList[freelist_idx].nentries--; - /* remove record from hash bucket's chain. */ - *prevBucketPtr = currBucket->link; - /* add the record to the freelist for this table. */ currBucket->link = hctl->freeList[freelist_idx].freeList; hctl->freeList[freelist_idx].freeList = currBucket; @@ -1654,14 +1657,16 @@ void* buf_hash_operate(HTAB* hashp, const BufferTag* keyPtr, uint32 hashvalue, b } } - /* link into hashbucket chain */ - *prevBucketPtr = currBucket; currBucket->link = NULL; /* copy key into record */ currBucket->hashvalue = hashvalue; BUFFERTAGS_PTR_SET((BufferTag*)(ELEMENTKEY(currBucket)), keyPtr); + pg_memory_barrier(); + /* link into hashbucket chain */ + *prevBucketPtr = currBucket; + /* * Caller is expected to fill the data field on return.
DO NOT * insert any code that could possibly throw error here, as doing diff --git a/src/gausskernel/ddes/adapter/ss_dms_callback.cpp b/src/gausskernel/ddes/adapter/ss_dms_callback.cpp index 477a89261e..8c02e196b3 100644 --- a/src/gausskernel/ddes/adapter/ss_dms_callback.cpp +++ b/src/gausskernel/ddes/adapter/ss_dms_callback.cpp @@ -498,7 +498,6 @@ static int tryEnterLocalPage(BufferTag *tag, dms_lock_mode_t mode, dms_buf_ctrl_ int ret = DMS_SUCCESS; int buf_id = -1; uint32 hash; - LWLock *partition_lock = NULL; BufferDesc *buf_desc = NULL; RelFileNode relfilenode = tag->rnode; bool get_lock = false; @@ -516,24 +515,13 @@ static int tryEnterLocalPage(BufferTag *tag, dms_lock_mode_t mode, dms_buf_ctrl_ *buf_ctrl = NULL; hash = BufTableHashCode(tag); - partition_lock = BufMappingPartitionLock(hash); uint32 saveInterruptHoldoffCount = t_thrd.int_cxt.InterruptHoldoffCount; PG_TRY(); { do { - get_lock = SSLWLockAcquireTimeout(partition_lock, LW_SHARED); - if (!get_lock) { - ereport(WARNING, (errmodule(MOD_DMS), (errmsg("[SS lwlock][%u/%u/%u/%d %d-%u] request LWLock timeout, " - "lock:%p", - tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode, - tag->forkNum, tag->blockNum, partition_lock)))); - ret = GS_TIMEOUT; - break; - } buf_id = BufTableLookup(tag, hash); if (buf_id < 0) { - LWLockRelease(partition_lock); break; } @@ -546,7 +534,11 @@ static int tryEnterLocalPage(BufferTag *tag, dms_lock_mode_t mode, dms_buf_ctrl_ (void)PinBuffer(buf_desc, NULL); is_seg = false; } - LWLockRelease(partition_lock); + + if (!BUFFERTAGS_PTR_EQUAL(&buf_desc->tag, tag)) { + DmsReleaseBuffer(buf_desc->buf_id + 1, is_seg); + break; + } bool wait_success = SSWaitIOTimeout(buf_desc); if (!wait_success) { @@ -669,25 +661,13 @@ static int CBInvalidatePage(void *db_handle, char pageid[DMS_PAGEID_SIZE], unsig int buf_id = -1; BufferTag* tag = (BufferTag *)pageid; uint32 hash; - LWLock *partition_lock = NULL; uint64 buf_state; int ret = DMS_SUCCESS; - + bool get_lock; hash = BufTableHashCode(tag); - partition_lock = BufMappingPartitionLock(hash); - bool get_lock = SSLWLockAcquireTimeout(partition_lock, LW_SHARED); - if (!get_lock) { - ereport(WARNING, (errmodule(MOD_DMS), (errmsg("[SS lwlock][%u/%u/%u/%d %d-%u] request LWLock timeout, " - "lwlock:%p", - tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode, - tag->forkNum, tag->blockNum, partition_lock)))); - return GS_TIMEOUT; - } - buf_id = BufTableLookup(tag, hash); if (buf_id < 0) { /* not found in shared buffer */ - LWLockRelease(partition_lock); return ret; } @@ -699,9 +679,9 @@ static int CBInvalidatePage(void *db_handle, char pageid[DMS_PAGEID_SIZE], unsig buf_desc = GetBufferDescriptor(buf_id); if (SS_PRIMARY_MODE) { buf_state = LockBufHdr(buf_desc); - if (BUF_STATE_GET_REFCOUNT(buf_state) != 0 || BUF_STATE_GET_USAGECOUNT(buf_state) != 0) { + if (BUF_STATE_GET_REFCOUNT(buf_state) != 0 || BUF_STATE_GET_USAGECOUNT(buf_state) != 0 || + !BUFFERTAGS_PTR_EQUAL(&buf_desc->tag, tag)) { UnlockBufHdr(buf_desc, buf_state); - LWLockRelease(partition_lock); return DMS_ERROR; } @@ -711,7 +691,6 @@ static int CBInvalidatePage(void *db_handle, char pageid[DMS_PAGEID_SIZE], unsig tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode, tag->forkNum, tag->blockNum, buf_desc->state))); UnlockBufHdr(buf_desc, buf_state); - LWLockRelease(partition_lock); buf_ctrl->lock_mode = (unsigned char)DMS_LOCK_NULL; buf_ctrl->seg_fileno = EXTENT_INVALID; buf_ctrl->seg_blockno = InvalidBlockNumber; @@ -734,7 +713,6 
@@ static int CBInvalidatePage(void *db_handle, char pageid[DMS_PAGEID_SIZE], unsig } UnlockBufHdr(buf_desc, buf_state); - LWLockRelease(partition_lock); return ret; } @@ -744,7 +722,11 @@ static int CBInvalidatePage(void *db_handle, char pageid[DMS_PAGEID_SIZE], unsig ResourceOwnerEnlargeBuffers(t_thrd.utils_cxt.CurrentResourceOwner); (void)PinBuffer(buf_desc, NULL); } - LWLockRelease(partition_lock); + + if (!BUFFERTAGS_PTR_EQUAL(&buf_desc->tag, tag)) { + DmsReleaseBuffer(buf_id + 1, IsSegmentBufferID(buf_id)); + return ret; + } bool wait_success = SSWaitIOTimeout(buf_desc); if (!wait_success) { @@ -1319,7 +1301,6 @@ static bool SSGetBufferDesc(char *pageid, bool *is_valid, BufferDesc** ret_buf_d { int buf_id; uint32 hash; - LWLock *partition_lock = NULL; BufferTag *tag = (BufferTag *)pageid; BufferDesc *buf_desc; bool ret = true; @@ -1338,21 +1319,10 @@ static bool SSGetBufferDesc(char *pageid, bool *is_valid, BufferDesc** ret_buf_d #endif hash = BufTableHashCode(tag); - partition_lock = BufMappingPartitionLock(hash); uint32 saveInterruptHoldoffCount = t_thrd.int_cxt.InterruptHoldoffCount; PG_TRY(); { - bool get_lock = SSLWLockAcquireTimeout(partition_lock, LW_SHARED); - if (!get_lock) { - ereport(WARNING, (errmodule(MOD_DMS), (errmsg("[SS lwlock][%u/%u/%u/%d %d-%u] request LWLock timeout, " - "lwlock:%p", - tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode, - tag->forkNum, tag->blockNum, partition_lock)))); - ret = false; - break; - } - buf_id = BufTableLookup(tag, hash); if (buf_id >= 0) { buf_desc = GetBufferDescriptor(buf_id); @@ -1362,7 +1332,12 @@ static bool SSGetBufferDesc(char *pageid, bool *is_valid, BufferDesc** ret_buf_d ResourceOwnerEnlargeBuffers(t_thrd.utils_cxt.CurrentResourceOwner); (void)PinBuffer(buf_desc, NULL); } - LWLockRelease(partition_lock); + + if (!BUFFERTAGS_PTR_EQUAL(&buf_desc->tag, tag)) { + SSUnPinBuffer(buf_desc); + *ret_buf_desc = NULL; + break; + } bool wait_success = SSWaitIOTimeout(buf_desc); if (!wait_success) { @@ -1375,7 +1350,6 @@ static bool SSGetBufferDesc(char *pageid, bool *is_valid, BufferDesc** ret_buf_d *is_valid = (pg_atomic_read_u64(&buf_desc->state) & BM_VALID) != 0; *ret_buf_desc = buf_desc; } else { - LWLockRelease(partition_lock); *ret_buf_desc = NULL; } } diff --git a/src/gausskernel/storage/access/heap/visibilitymap.cpp b/src/gausskernel/storage/access/heap/visibilitymap.cpp index 89df8ba17c..4b118e6fd3 100644 --- a/src/gausskernel/storage/access/heap/visibilitymap.cpp +++ b/src/gausskernel/storage/access/heap/visibilitymap.cpp @@ -242,7 +242,7 @@ void visibilitymap_set(Relation rel, BlockNumber heapBlk, Buffer heapBuf, XLogRe Assert(PageIsAllVisible(heapPage)); if (ENABLE_DMS) { BufferDesc* buf_desc = GetBufferDescriptor(heapBuf - 1); - if ((pg_atomic_read_u32(&buf_desc->state) & BM_DIRTY) == 0) { + if ((pg_atomic_read_u64(&buf_desc->state) & BM_DIRTY) == 0) { MarkBufferDirty(heapBuf); } } diff --git a/src/gausskernel/storage/access/nbtree/nbtsearch.cpp b/src/gausskernel/storage/access/nbtree/nbtsearch.cpp index 40365059b2..0c146edd00 100644 --- a/src/gausskernel/storage/access/nbtree/nbtsearch.cpp +++ b/src/gausskernel/storage/access/nbtree/nbtsearch.cpp @@ -77,7 +77,6 @@ BTStack _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey, Buffe ItemId itemid; IndexTuple itup; BlockNumber blkno; - BlockNumber par_blkno; BTStack new_stack = NULL; /* @@ -107,7 +106,6 @@ BTStack _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey, Buffe itemid = PageGetItemId(page, offnum); itup = 
(IndexTuple)PageGetItem(page, itemid);
         blkno = BTreeInnerTupleGetDownLink(itup);
-        par_blkno = BufferGetBlockNumber(*bufP);
 
         /*
          * We need to save the location of the index entry we chose in the
@@ -121,7 +119,7 @@ BTStack _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey, Buffe
          */
         if (needStack) {
             new_stack = (BTStack)palloc(sizeof(BTStackData));
-            new_stack->bts_blkno = par_blkno;
+            new_stack->bts_blkno = BufferGetBlockNumber(*bufP);
             new_stack->bts_offset = offnum;
             new_stack->bts_btentry = blkno;
             new_stack->bts_parent = stack_in;
diff --git a/src/gausskernel/storage/buffer/bufmgr.cpp b/src/gausskernel/storage/buffer/bufmgr.cpp
index e1b39d8f1c..2a7a8abca9 100644
--- a/src/gausskernel/storage/buffer/bufmgr.cpp
+++ b/src/gausskernel/storage/buffer/bufmgr.cpp
@@ -2703,10 +2703,9 @@ static BufferDesc *BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumbe
 
     /* determine its hash code and partition lock ID */
     new_hash = BufTableHashCode(&new_tag);
-    new_partition_lock = BufMappingPartitionLock(new_hash);
 
+retry:
     /* see if the block is in the buffer pool already */
-    (void)LWLockAcquire(new_partition_lock, LW_SHARED);
     pgstat_report_waitevent(WAIT_EVENT_BUF_HASH_SEARCH);
     buf_id = BufTableLookup(&new_tag, new_hash);
     pgstat_report_waitevent(WAIT_EVENT_END);
@@ -2720,8 +2719,10 @@ static BufferDesc *BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumbe
 
         valid = PinBuffer(buf, strategy);
 
-        /* Can release the mapping lock as soon as we've pinned it */
-        LWLockRelease(new_partition_lock);
+        if (!BUFFERTAGS_PTR_EQUAL(&buf->tag, &new_tag)) {
+            UnpinBuffer(buf, true);
+            goto retry;
+        }
 
         *found = TRUE;
 
@@ -2753,11 +2754,7 @@ static BufferDesc *BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumbe
         return buf;
     }
 
-    /*
-     * Didn't find it in the buffer pool. We'll have to initialize a new
-     * buffer. Remember to unlock the mapping lock while doing the work.
-     */
-    LWLockRelease(new_partition_lock);
+    new_partition_lock = BufMappingPartitionLock(new_hash);
 
     /* Loop here in case we have to try another victim buffer */
     for (;;) {
         bool needGetLock = false;
@@ -2891,6 +2888,7 @@ static BufferDesc *BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumbe
          * To change the association of a valid buffer, we'll need to have
          * exclusive lock on both the old and new mapping partitions.
          */
+        old_flags = buf_state & BUF_FLAG_MASK;
         if (old_flags & BM_TAG_VALID) {
             /*
              * Need to compute the old tag's hashcode and partition lock ID.
@@ -2913,6 +2911,7 @@ static BufferDesc *BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumbe
             old_partition_lock = NULL;
         }
 
+        buf_state = LockBufHdr(buf);
         /*
          * Try to make a hashtable entry for the buffer under its new tag.
          * This could fail because while we were writing someone else
@@ -2928,6 +2927,7 @@ static BufferDesc *BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumbe
              * pool in the first place. First, give up the buffer we were
              * planning to use.
              */
+            UnlockBufHdr(buf, buf_state);
             UnpinBuffer(buf, true);
 
             /* Can give up that buffer's mapping partition lock now */
@@ -2964,10 +2964,6 @@ static BufferDesc *BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumbe
             return buf;
         }
 
-        /*
-         * Need to lock the buffer header too in order to change its tag.
-         */
-        buf_state = LockBufHdr(buf);
         /*
          * Somebody could have pinned or re-dirtied the buffer while we were
          * doing the I/O and making the new hashtable entry. If so, we can't
@@ -2998,11 +2994,12 @@ static BufferDesc *BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumbe
             }
         }
 
-        UnlockBufHdr(buf, buf_state);
         BufTableDelete(&new_tag, new_hash);
         if ((old_flags & BM_TAG_VALID) && old_partition_lock != new_partition_lock) {
             LWLockRelease(old_partition_lock);
         }
+
+        UnlockBufHdr(buf, buf_state);
         LWLockRelease(new_partition_lock);
         UnpinBuffer(buf, true);
     }
@@ -3034,8 +3031,6 @@ static BufferDesc *BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumbe
         buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
     }
 
-    UnlockBufHdr(buf, buf_state);
-
     if (ENABLE_DMS) {
         GetDmsBufCtrl(buf->buf_id)->lock_mode = DMS_LOCK_NULL;
         GetDmsBufCtrl(buf->buf_id)->been_loaded = false;
@@ -3061,7 +3056,7 @@ static BufferDesc *BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumbe
         buf->extra->seg_blockno = InvalidBlockNumber;
     }
     LWLockRelease(new_partition_lock);
-
+    UnlockBufHdr(buf, buf_state);
     /*
      * Buffer contents are currently invalid. Try to get the io_in_progress
      * lock. If StartBufferIO returns false, then someone else managed to
@@ -7258,7 +7253,6 @@ void ForgetBuffer(RelFileNode rnode, ForkNumber forkNum, BlockNumber blockNum)
     SMgrRelation smgr = smgropen(rnode, InvalidBackendId);
     BufferTag tag;          /* identity of target block */
     uint32 hash;            /* hash value for tag */
-    LWLock* partitionLock;  /* buffer partition lock for it */
    int bufId;
    BufferDesc *bufHdr;
    uint64 bufState;
@@ -7268,12 +7262,9 @@ void ForgetBuffer(RelFileNode rnode, ForkNumber forkNum, BlockNumber blockNum)
 
     /* determine its hash code and partition lock ID */
     hash = BufTableHashCode(&tag);
-    partitionLock = BufMappingPartitionLock(hash);
 
     /* see if the block is in the buffer pool */
-    LWLockAcquire(partitionLock, LW_SHARED);
     bufId = BufTableLookup(&tag, hash);
-    LWLockRelease(partitionLock);
 
     /* didn't find it, so nothing to do */
     if (bufId < 0) {
@@ -7284,6 +7275,10 @@ void ForgetBuffer(RelFileNode rnode, ForkNumber forkNum, BlockNumber blockNum)
     bufHdr = GetBufferDescriptor(bufId);
     bufState = LockBufHdr(bufHdr);
 
+    if (!BUFFERTAGS_PTR_EQUAL(&bufHdr->tag, &tag)) {
+        UnlockBufHdr(bufHdr, bufState);
+        return;
+    }
     /*
      * The buffer might been evicted after we released the partition lock and
      * before we acquired the buffer header lock. If so, the buffer we've
diff --git a/src/gausskernel/storage/lmgr/lwlock.cpp b/src/gausskernel/storage/lmgr/lwlock.cpp
index cc86c0d619..31c7c2c4a3 100644
--- a/src/gausskernel/storage/lmgr/lwlock.cpp
+++ b/src/gausskernel/storage/lmgr/lwlock.cpp
@@ -865,6 +865,9 @@ static bool LWLockAttemptLock(LWLock *lock, LWLockMode mode)
             }
 
             desired_state = old_state + refoneByThread;
+            if ((desired_state & (LW_VAL_EXCLUSIVE)) != 0) {
+                return true;
+            }
         } while (!pg_atomic_compare_exchange_u8((((volatile uint8*)&lock->state) + maskId), ((uint8*)&old_state) + maskId, (desired_state >> (8 * maskId))));
     } else if (mode == LW_EXCLUSIVE) {
         old_state = pg_atomic_read_u64(&lock->state);
diff --git a/src/gausskernel/storage/smgr/segment/segbuffer.cpp b/src/gausskernel/storage/smgr/segment/segbuffer.cpp
index ced3fad28c..2b1ad95f19 100644
--- a/src/gausskernel/storage/smgr/segment/segbuffer.cpp
+++ b/src/gausskernel/storage/smgr/segment/segbuffer.cpp
@@ -727,18 +727,31 @@ BufferDesc *SegBufferAlloc(SegSpace *spc, RelFileNode rnode, ForkNumber forkNum,
 
     INIT_BUFFERTAG(new_tag, rnode, forkNum, blockNum);
 
     new_hash = BufTableHashCode(&new_tag);
-    new_partition_lock = BufMappingPartitionLock(new_hash);
 
-    LWLockAcquire(new_partition_lock, LW_SHARED);
+retry:
     int buf_id = BufTableLookup(&new_tag, new_hash);
     if (buf_id >= 0) {
-        return FoundBufferInHashTable(buf_id, new_partition_lock, foundPtr);
+        buf = GetBufferDescriptor(buf_id);
+        bool valid = SegPinBuffer(buf);
+        if (!BUFFERTAGS_PTR_EQUAL(&buf->tag, &new_tag)) {
+            SegUnpinBuffer(buf);
+            goto retry;
+        }
+
+        *foundPtr = true;
+
+        if (!valid) {
+            if (SegStartBufferIO(buf, true)) {
+                *foundPtr = false;
+            }
+        }
+
+        return buf;
     }
 
     *foundPtr = FALSE;
-    LWLockRelease(new_partition_lock);
-
+    new_partition_lock = BufMappingPartitionLock(new_hash);
     for (;;) {
         ReservePrivateRefCountEntry();
 
@@ -771,6 +784,7 @@ BufferDesc *SegBufferAlloc(SegSpace *spc, RelFileNode rnode, ForkNumber forkNum,
             }
         }
 
+        old_flags = buf_state & BUF_FLAG_MASK;
         old_flag_valid = old_flags & BM_TAG_VALID;
 
         if (old_flag_valid) {
@@ -787,9 +801,11 @@ BufferDesc *SegBufferAlloc(SegSpace *spc, RelFileNode rnode, ForkNumber forkNum,
             old_partition_lock = NULL;
         }
 
+        buf_state = LockBufHdr(buf);
         buf_id = BufTableInsert(&new_tag, new_hash, buf->buf_id);
 
         if (buf_id >= 0) {
+            UnlockBufHdr(buf, buf_state);
             SegUnpinBuffer(buf);
             if (old_flag_valid && old_partition_lock != new_partition_lock)
                 LWLockRelease(old_partition_lock);
@@ -797,7 +813,6 @@ BufferDesc *SegBufferAlloc(SegSpace *spc, RelFileNode rnode, ForkNumber forkNum,
             return FoundBufferInHashTable(buf_id, new_partition_lock, foundPtr);
         }
 
-        buf_state = LockBufHdr(buf);
         old_flags = buf_state & BUF_FLAG_MASK;
 
         if (BUF_STATE_GET_REFCOUNT(buf_state) == 1 && !(old_flags & BM_DIRTY)) {
@@ -815,11 +830,11 @@ BufferDesc *SegBufferAlloc(SegSpace *spc, RelFileNode rnode, ForkNumber forkNum,
                 break;
             }
         }
-        UnlockBufHdr(buf, buf_state);
         BufTableDelete(&new_tag, new_hash);
         if (old_flag_valid && old_partition_lock != new_partition_lock) {
            LWLockRelease(old_partition_lock);
        }
+        UnlockBufHdr(buf, buf_state);
         LWLockRelease(new_partition_lock);
         SegUnpinBuffer(buf);
     }
@@ -828,7 +843,6 @@ BufferDesc *SegBufferAlloc(SegSpace *spc, RelFileNode rnode, ForkNumber forkNum,
     buf_state &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_CHECKPOINT_NEEDED | BM_IO_ERROR | BM_PERMANENT |
                    BUF_USAGECOUNT_MASK);
     buf_state |= BM_TAG_VALID | BM_PERMANENT | BUF_USAGECOUNT_ONE;
-    UnlockBufHdr(buf, buf_state);
 
     if (ENABLE_DMS) {
         GetDmsBufCtrl(buf->buf_id)->lock_mode = DMS_LOCK_NULL;
@@ -842,6 +856,7 @@ BufferDesc *SegBufferAlloc(SegSpace *spc, RelFileNode rnode, ForkNumber forkNum,
         }
     }
     LWLockRelease(new_partition_lock);
+    UnlockBufHdr(buf, buf_state);
 
     *foundPtr = !SegStartBufferIO(buf, true);
     return buf;
diff --git a/src/gausskernel/storage/smgr/segment/space.cpp b/src/gausskernel/storage/smgr/segment/space.cpp
index 2c0467e68f..c34489e3c3 100644
--- a/src/gausskernel/storage/smgr/segment/space.cpp
+++ b/src/gausskernel/storage/smgr/segment/space.cpp
@@ -479,9 +479,6 @@ Buffer try_get_moved_pagebuf(RelFileNode *rnode, int forknum, BlockNumber logic_
     INIT_BUFFERTAG(tag, *rnode, forknum, logic_blocknum);
 
     uint32 hashcode = BufTableHashCode(&tag);
-    LWLock *partition_lock = BufMappingPartitionLock(hashcode);
-
-    LWLockAcquire(partition_lock, LW_SHARED);
     int buf_id = BufTableLookup(&tag, hashcode);
     if (buf_id >= 0) {
         BufferDesc *buf = GetBufferDescriptor(buf_id);
@@ -491,7 +488,10 @@ Buffer try_get_moved_pagebuf(RelFileNode *rnode, int forknum, BlockNumber logic_
 
         /* Pin the buffer to avoid invalidated by others */
         bool valid = PinBuffer(buf, NULL);
-        LWLockRelease(partition_lock);
+        if (!BUFFERTAGS_PTR_EQUAL(&buf->tag, &tag)) {
+            UnpinBuffer(buf, true);
+            return InvalidBuffer;
+        }
 
         if (!valid) {
             UnpinBuffer(buf, true);
@@ -500,7 +500,6 @@ Buffer try_get_moved_pagebuf(RelFileNode *rnode, int forknum, BlockNumber logic_
         return BufferDescriptorGetBuffer(buf);
     }
 
-    LWLockRelease(partition_lock);
     return InvalidBuffer;
 }
-- 
Gitee
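
Note on the recurring pattern above: the lookup paths in this patch (BufferAlloc, SegBufferAlloc, try_get_moved_pagebuf) all replace the shared BufMappingPartitionLock acquisition with the same lock-free sequence — probe the buffer-mapping hash table, pin the candidate buffer, then re-check its tag and retry (or bail out) on a mismatch; ForgetBuffer performs the equivalent re-check under the buffer header lock instead of a pin. The sketch below shows that sequence in isolation. It is a minimal illustration, not the engine's code: every name in it (Tag, Buf, pool, lookup, pin, unpin, find_buffer) is an invented stand-in for the real BufferTag / BufferDesc / BufTableLookup / PinBuffer machinery, and it assumes the invariant the patch relies on — a buffer's tag is only rewritten while its pin count is zero, so a tag that still matches after pinning proves the pin landed on the right page.

#include <atomic>
#include <cstdint>

/* Simplified stand-ins for BufferTag / BufferDesc (hypothetical, for illustration only). */
struct Tag {
    uint32_t rel, blk;
    bool operator==(const Tag &o) const { return rel == o.rel && blk == o.blk; }
};

struct Buf {
    Tag tag;                        /* only rewritten while refcount == 0 */
    std::atomic<uint64_t> refcount; /* pin count (the low bits of `state` in the patch) */
};

static Buf pool[16];                /* toy buffer pool */

/* Stand-in for BufTableLookup: probe with no partition lock held. */
static Buf *lookup(const Tag &tag)
{
    for (Buf &b : pool) {
        if (b.tag == tag) {
            return &b;
        }
    }
    return nullptr;
}

static void pin(Buf *b)   { b->refcount.fetch_add(1, std::memory_order_acquire); }
static void unpin(Buf *b) { b->refcount.fetch_sub(1, std::memory_order_release); }

/*
 * Lock-free lookup: probe, pin, then re-check the tag. The buffer may be
 * evicted and re-tagged between the probe and the pin, but never while it
 * is pinned, so a tag match taken after pinning is authoritative; on a
 * mismatch we drop the pin and retry, like the patch's `goto retry`.
 */
Buf *find_buffer(const Tag &tag)
{
    for (;;) {
        Buf *buf = lookup(tag);
        if (buf == nullptr) {
            return nullptr;         /* caller falls through to allocation */
        }
        pin(buf);                   /* eviction can no longer re-tag this buffer */
        if (buf->tag == tag) {
            return buf;             /* pinned and verified */
        }
        unpin(buf);                 /* lost the race: buffer now holds another page */
    }
}

The same invariant appears to be why the allocation paths now take LockBufHdr before BufTableInsert and hold the header lock until the new tag is published: a reader that pins the victim mid-retag still sees the stale tag, fails the re-check, and retries rather than returning the wrong page.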