diff --git a/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp b/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp index bb31f1a6996c4c40705f86faa11094f420fddc2a..7d4a4c9bb28eba73eb547fffccb78faee95bb367 100644 --- a/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp +++ b/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp @@ -91,13 +91,10 @@ static void CalcSegDmsPhysicalLoc(BufferDesc* buf_desc, Buffer buffer, bool chec { if (IsSegmentFileNode(buf_desc->tag.rnode)) { SegmentCheck(!IsSegmentPhysicalRelNode(buf_desc->tag.rnode)); + ereport(WARNING, (errmsg("buffer:%d is segdata page, bufdesc seginfo is empty", buffer))); SegPageLocation loc = seg_get_physical_location(buf_desc->tag.rnode, buf_desc->tag.forkNum, buf_desc->tag.blockNum, check_standby); SegmentCheck(loc.blocknum != InvalidBlockNumber); - - ereport(DEBUG1, (errmsg("buffer:%d is segdata page, bufdesc seginfo is empty, calc segfileno:%d, segblkno:%u", - buffer, (int32)loc.extent_size, loc.blocknum))); - buf_desc->seg_fileno = (uint8)EXTENT_SIZE_TO_TYPE((int)loc.extent_size); buf_desc->seg_blockno = loc.blocknum; } @@ -287,9 +284,9 @@ Buffer TerminateReadPage(BufferDesc* buf_desc, ReadBufferMode read_mode, const X SmgrNetPageCheckDiskLSN(buf_desc, read_mode, pblk); #endif - TerminateBufferIO(buf_desc, false, BM_VALID); buffer = BufferDescriptorGetBuffer(buf_desc); - if (!RecoveryInProgress() || g_instance.dms_cxt.SSRecoveryInfo.in_flushcopy) { + if ((!RecoveryInProgress() || g_instance.dms_cxt.SSRecoveryInfo.in_flushcopy) && + buf_desc->seg_fileno == EXTENT_INVALID) { CalcSegDmsPhysicalLoc(buf_desc, buffer, !g_instance.dms_cxt.SSRecoveryInfo.in_flushcopy); } } @@ -300,6 +297,7 @@ Buffer TerminateReadPage(BufferDesc* buf_desc, ReadBufferMode read_mode, const X } ClearReadHint(buf_desc->buf_id); + TerminateBufferIO(buf_desc, false, BM_VALID); return buffer; } @@ -493,6 +491,14 @@ int32 CheckBuf4Rebuild(BufferDesc *buf_desc) return DMS_SUCCESS; } +#ifdef USE_ASSERT_CHECKING + if (IsSegmentPhysicalRelNode(buf_desc->tag.rnode)) { + SegNetPageCheckDiskLSN(buf_desc, RBM_NORMAL, NULL); + } else { + SmgrNetPageCheckDiskLSN(buf_desc, RBM_NORMAL, NULL); + } +#endif + dms_context_t dms_ctx; InitDmsBufContext(&dms_ctx, buf_desc->tag); bool is_dirty = (buf_desc->state & (BM_DIRTY | BM_JUST_DIRTIED)) > 0 ? true : false; @@ -651,7 +657,7 @@ void SSRecheckBufferPool() void CheckPageNeedSkipInRecovery(Buffer buf) { dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf - 1); - if (buf_ctrl->lock_mode == DMS_LOCK_EXCLUSIVE) { + if (buf_ctrl->lock_mode == DMS_LOCK_EXCLUSIVE || !(buf_ctrl->state & BUF_DIRTY_NEED_FLUSH)) { return; } @@ -682,6 +688,11 @@ bool SSPageCheckIfCanEliminate(BufferDesc* buf_desc) if (!ENABLE_DMS) { return true; } + + if (ENABLE_DSS_AIO && buf_desc->aio_in_progress) { + return false; + } + /** this page produced in flush_copy phase, should not eliminate and mark dirty now * when mark dirty: replay xlog * why not use SS_IN_FLUSHCOPY to judge @@ -693,4 +704,44 @@ bool SSPageCheckIfCanEliminate(BufferDesc* buf_desc) } return true; -} \ No newline at end of file +} + +bool SSSegRead(SMgrRelation reln, ForkNumber forknum, char *buffer) +{ + Buffer buf = BlockGetBuffer(buffer); + if (BufferIsInvalid(buf)) { + return false; + } + + BufferDesc *buf_desc = BufferGetBufferDescriptor(buf); + bool ret = false; + + if ((pg_atomic_read_u32(&buf_desc->state) & BM_VALID) && buf_desc->seg_fileno != EXTENT_INVALID) { + SMGR_READ_STATUS rdStatus; + if (reln->seg_space == NULL) { + reln->seg_space = spc_open(reln->smgr_rnode.node.spcNode, reln->smgr_rnode.node.dbNode, false); + } + + SegmentCheck(reln->seg_space); + RelFileNode fakenode = { + .spcNode = reln->smgr_rnode.node.spcNode, + .dbNode = reln->smgr_rnode.node.dbNode, + .relNode = buf_desc->seg_fileno, + .bucketNode = SegmentBktId, + .opt = 0 + }; + + seg_physical_read(reln->seg_space, fakenode, forknum, buf_desc->seg_blockno, (char *)buffer); + if (PageIsVerified((Page)buffer, buf_desc->seg_blockno)) { + rdStatus = SMGR_RD_OK; + } else { + rdStatus = SMGR_RD_CRC_ERROR; + } + + if (rdStatus == SMGR_RD_OK) { + ret = true; + } + } + + return ret; +} diff --git a/src/gausskernel/ddes/adapter/ss_dms_callback.cpp b/src/gausskernel/ddes/adapter/ss_dms_callback.cpp index 77dbf485f6fffd71b636b74ae2151b298c56f717..8d7c9ec9a18a6f6a79436464ffb25cf409934612 100644 --- a/src/gausskernel/ddes/adapter/ss_dms_callback.cpp +++ b/src/gausskernel/ddes/adapter/ss_dms_callback.cpp @@ -535,6 +535,9 @@ static void tryEnterLocalPage(BufferTag *tag, dms_lock_mode_t mode, dms_buf_ctrl (void)LWLockAcquire(buf_desc->content_lock, content_mode); *buf_ctrl = GetDmsBufCtrl(buf_id); Assert(buf_id >= 0); + Assert((*buf_ctrl)->lock_mode != DMS_LOCK_NULL); + (*buf_ctrl)->seg_fileno = buf_desc->seg_fileno; + (*buf_ctrl)->seg_blockno = buf_desc->seg_blockno; } while (0); } PG_CATCH(); @@ -670,6 +673,17 @@ static void CBVerifyPage(dms_buf_ctrl_t *buf_ctrl, char *new_page) } BufferDesc *buf_desc = GetBufferDescriptor(buf_ctrl->buf_id); + + if (buf_desc->seg_fileno == EXTENT_INVALID) { + buf_desc->seg_fileno = buf_ctrl->seg_fileno; + buf_desc->seg_blockno = buf_ctrl->seg_blockno; + } else if (buf_desc->seg_fileno != buf_ctrl->seg_fileno || buf_desc->seg_blockno != buf_ctrl->seg_blockno) { + ereport(PANIC, (errmsg("[%u/%u/%u/%d/%d %d-%u] location mismatch, seg_fileno:%d, seg_blockno:%u", + buf_desc->tag.rnode.spcNode, buf_desc->tag.rnode.dbNode, buf_desc->tag.rnode.relNode, + buf_desc->tag.rnode.bucketNode, buf_desc->tag.rnode.opt, buf_desc->tag.forkNum, buf_desc->tag.blockNum, + buf_desc->seg_fileno, buf_desc->seg_blockno))); + } + /* page content is not valid */ if ((pg_atomic_read_u32(&buf_desc->state) & BM_VALID) == 0) { return; @@ -1305,13 +1319,6 @@ static int CBFlushCopy(void *db_handle, char *pageid) LockBuffer(buffer, BUFFER_LOCK_SHARE); BufferDesc* buf_desc = GetBufferDescriptor(buffer - 1); XLogRecPtr pagelsn = BufferGetLSN(buf_desc); -#ifdef USE_ASSERT_CHECKING - if (IsSegmentPhysicalRelNode(buf_desc->tag.rnode)) { - SegNetPageCheckDiskLSN(buf_desc, RBM_NORMAL, spc); - } else { - SmgrNetPageCheckDiskLSN(buf_desc, RBM_NORMAL, NULL); - } -#endif if (XLByteLT(g_instance.dms_cxt.ckptRedo, pagelsn)) { dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buffer - 1); diff --git a/src/gausskernel/ddes/adapter/ss_transaction.cpp b/src/gausskernel/ddes/adapter/ss_transaction.cpp index 708473e759d3d5bd242cdb436ea7053d245d0b1d..cd2ebe496ad46eb76a31d43bdbda708c4367dccf 100644 --- a/src/gausskernel/ddes/adapter/ss_transaction.cpp +++ b/src/gausskernel/ddes/adapter/ss_transaction.cpp @@ -120,7 +120,7 @@ CommitSeqNo SSTransactionIdGetCommitSeqNo(TransactionId transactionId, bool isCo dms_txn_info.snapshotxmin = InvalidTransactionId; } - if (SS_IN_REFORM && (t_thrd.role == WORKER || t_thrd.role == THREADPOOL_WORKER)) { + if (SS_IN_REFORM && (t_thrd.role == WORKER || t_thrd.role == THREADPOOL_WORKER || t_thrd.role == STREAM_WORKER)) { ereport(FATAL, (errmsg("SSTransactionIdGetCommitSeqNo failed during reform, xid=%lu.", transactionId))); } @@ -141,7 +141,8 @@ CommitSeqNo SSTransactionIdGetCommitSeqNo(TransactionId transactionId, bool isCo } break; } else { - if (SS_IN_REFORM && (t_thrd.role == WORKER || t_thrd.role == THREADPOOL_WORKER)) { + if (SS_IN_REFORM && + (t_thrd.role == WORKER || t_thrd.role == THREADPOOL_WORKER || t_thrd.role == STREAM_WORKER)) { ereport(FATAL, (errmsg("SSTransactionIdGetCommitSeqNo failed during reform, xid=%lu.", transactionId))); } pg_usleep(USECS_PER_SEC); @@ -203,7 +204,8 @@ bool SSTransactionIdDidCommit(TransactionId transactionId, bool* ret_did_commit) transactionId, did_commit))); break; } else { - if (SS_IN_REFORM && (t_thrd.role == WORKER || t_thrd.role == THREADPOOL_WORKER)) { + if (SS_IN_REFORM && + (t_thrd.role == WORKER || t_thrd.role == THREADPOOL_WORKER || t_thrd.role == STREAM_WORKER)) { ereport(FATAL, (errmsg("SSTransactionIdDidCommit failed during reform, xid=%lu.", transactionId))); } else if (SS_IN_REFORM) { return false; @@ -240,7 +242,8 @@ bool SSTransactionIdIsInProgress(TransactionId transactionId, bool *in_progress) transactionId, *in_progress))); break; } else { - if (SS_IN_REFORM && (t_thrd.role == WORKER || t_thrd.role == THREADPOOL_WORKER)) { + if (SS_IN_REFORM && + (t_thrd.role == WORKER || t_thrd.role == THREADPOOL_WORKER || t_thrd.role == STREAM_WORKER)) { ereport(FATAL, (errmsg("SSTransactionIdIsInProgress failed during reform, xid=%lu.", transactionId))); } else if (SS_IN_REFORM) { return false; @@ -267,7 +270,8 @@ TransactionId SSMultiXactIdGetUpdateXid(TransactionId xmax, uint16 t_infomask, u ereport(DEBUG1, (errmsg("SS get update xid success, multixact xid=%lu, uxid=%lu.", xmax, update_xid))); break; } else { - if (SS_IN_REFORM && (t_thrd.role == WORKER || t_thrd.role == THREADPOOL_WORKER)) { + if (SS_IN_REFORM && + (t_thrd.role == WORKER || t_thrd.role == THREADPOOL_WORKER || t_thrd.role == STREAM_WORKER)) { ereport(FATAL, (errmsg("SSMultiXactIdGetUpdateXid failed during reform, xid=%lu.", xmax))); } pg_usleep(USECS_PER_SEC); diff --git a/src/gausskernel/ddes/ddes_commit_id b/src/gausskernel/ddes/ddes_commit_id index f468fccc27a685f85afbed100fe739880fc122e1..bf92e1722596e0f07e4fde9ee0d4afebe23d3f91 100644 --- a/src/gausskernel/ddes/ddes_commit_id +++ b/src/gausskernel/ddes/ddes_commit_id @@ -1,2 +1,2 @@ -dms_commit_id=3c50e0355d843eb2da9b9976c9d4c2bfaedb79ca +dms_commit_id=3bc68b32cf1c4e0fa85963ad18c0b7b4d6e83a26 dss_commit_id=5df6ee25ef2bb07849029c760051ca96aeb416bb diff --git a/src/gausskernel/process/postmaster/postmaster.cpp b/src/gausskernel/process/postmaster/postmaster.cpp index b52856f69336cf07747ef18505cf084424c2d8b4..38f066686def2693ff7dc33dc8222ce4bd12f637 100644 --- a/src/gausskernel/process/postmaster/postmaster.cpp +++ b/src/gausskernel/process/postmaster/postmaster.cpp @@ -3477,15 +3477,15 @@ static int ServerLoop(void) } /* if workload manager is off, we still use this thread to build user hash table */ if ((ENABLE_WORKLOAD_CONTROL || !WLMIsInfoInit()) && g_instance.pid_cxt.WLMCollectPID == 0 && - pmState == PM_RUN && !dummyStandbyMode && !SS_IN_REFORM) + pmState == PM_RUN && !dummyStandbyMode && !SS_STANDBY_MODE && !SS_IN_REFORM) g_instance.pid_cxt.WLMCollectPID = initialize_util_thread(WLM_WORKER); if (ENABLE_WORKLOAD_CONTROL && (g_instance.pid_cxt.WLMMonitorPID == 0) && (pmState == PM_RUN) && - !dummyStandbyMode && !SS_IN_REFORM) + !dummyStandbyMode && !SS_STANDBY_MODE && !SS_IN_REFORM) g_instance.pid_cxt.WLMMonitorPID = initialize_util_thread(WLM_MONITOR); if (ENABLE_WORKLOAD_CONTROL && (g_instance.pid_cxt.WLMArbiterPID == 0) && (pmState == PM_RUN) && - !dummyStandbyMode && !SS_IN_REFORM) + !dummyStandbyMode && !SS_STANDBY_MODE && !SS_IN_REFORM) g_instance.pid_cxt.WLMArbiterPID = initialize_util_thread(WLM_ARBITER); if (IS_PGXC_COORDINATOR && g_instance.attr.attr_sql.max_resource_package && @@ -6311,17 +6311,19 @@ static void reaper(SIGNAL_ARGS) /* if workload manager is off, we still use this thread to build user hash table */ if ((ENABLE_WORKLOAD_CONTROL || !WLMIsInfoInit()) && g_instance.pid_cxt.WLMCollectPID == 0 && - !dummyStandbyMode && !SS_IN_REFORM) { + !dummyStandbyMode && !SS_STANDBY_MODE && !SS_IN_REFORM) { /* DN need rebuild hash when upgrade to primary */ if (IS_PGXC_DATANODE) g_instance.wlm_cxt->stat_manager.infoinit = 0; g_instance.pid_cxt.WLMCollectPID = initialize_util_thread(WLM_WORKER); } - if (ENABLE_WORKLOAD_CONTROL && (g_instance.pid_cxt.WLMMonitorPID == 0) && !dummyStandbyMode) + if (ENABLE_WORKLOAD_CONTROL && (g_instance.pid_cxt.WLMMonitorPID == 0) && !dummyStandbyMode && + !SS_STANDBY_MODE && !SS_IN_REFORM) g_instance.pid_cxt.WLMMonitorPID = initialize_util_thread(WLM_MONITOR); - if (ENABLE_WORKLOAD_CONTROL && (g_instance.pid_cxt.WLMArbiterPID == 0) && !dummyStandbyMode) + if (ENABLE_WORKLOAD_CONTROL && (g_instance.pid_cxt.WLMArbiterPID == 0) && !dummyStandbyMode && + !SS_STANDBY_MODE && !SS_IN_REFORM) g_instance.pid_cxt.WLMArbiterPID = initialize_util_thread(WLM_ARBITER); if (IS_PGXC_COORDINATOR && g_instance.attr.attr_sql.max_resource_package && diff --git a/src/gausskernel/storage/buffer/bufmgr.cpp b/src/gausskernel/storage/buffer/bufmgr.cpp index 1752b5388759e987d3a63870a555502be58e2d39..f41e18bba7c3cec00b011a03f9de340c56009cd9 100644 --- a/src/gausskernel/storage/buffer/bufmgr.cpp +++ b/src/gausskernel/storage/buffer/bufmgr.cpp @@ -2211,8 +2211,6 @@ Buffer ReadBuffer_common_for_dms(ReadBufferMode readmode, BufferDesc* buf_desc, #ifdef USE_ASSERT_CHECKING buf_desc->lsn_dirty = InvalidXLogRecPtr; #endif - /* Set BM_VALID, terminate IO, and wake up any waiters */ - TerminateBufferIO(buf_desc, false, BM_VALID); t_thrd.vacuum_cxt.VacuumPageMiss++; if (t_thrd.vacuum_cxt.VacuumCostActive) diff --git a/src/gausskernel/storage/smgr/segstore.cpp b/src/gausskernel/storage/smgr/segstore.cpp index 0a7141918fd56873daaaa44f8bd7adfb5cc2f5d8..f36fa8221205f89000217b49ac59e3370d1d2e63 100755 --- a/src/gausskernel/storage/smgr/segstore.cpp +++ b/src/gausskernel/storage/smgr/segstore.cpp @@ -1511,6 +1511,10 @@ SMGR_READ_STATUS seg_read(SMgrRelation reln, ForkNumber forknum, BlockNumber blo { LOG_SMGR_API(reln->smgr_rnode, forknum, blocknum, "seg_read"); + if (ENABLE_DMS && SSSegRead(reln, forknum, buffer)) { + return SMGR_RD_OK; + } + Buffer seg_buffer = read_head_buffer(reln, forknum, false); if (ENABLE_DMS) { LockBuffer(seg_buffer, BUFFER_LOCK_SHARE); diff --git a/src/include/ddes/dms/dms_api.h b/src/include/ddes/dms/dms_api.h index ae679ee88a6c50205fa5487a1c2124e944c1b697..67722b855f8a58428cc77370d27893687c25c71a 100644 --- a/src/include/ddes/dms/dms_api.h +++ b/src/include/ddes/dms/dms_api.h @@ -284,6 +284,8 @@ typedef struct st_dms_buf_ctrl { unsigned int pblk_relno; unsigned int pblk_blkno; unsigned long long pblk_lsn; + unsigned char seg_fileno; + unsigned int seg_blockno; #endif }dms_buf_ctrl_t; @@ -784,7 +786,7 @@ typedef struct st_logger_param { #define DMS_LOCAL_MINOR_VER_WEIGHT 1000 #define DMS_LOCAL_MAJOR_VERSION 0 #define DMS_LOCAL_MINOR_VERSION 0 -#define DMS_LOCAL_VERSION 48 +#define DMS_LOCAL_VERSION 49 #ifdef __cplusplus } diff --git a/src/include/ddes/dms/ss_dms_bufmgr.h b/src/include/ddes/dms/ss_dms_bufmgr.h index 0f9624b11c076da0cfd94e8b1a3978c5aeca5be6..c7c0631505225cf19d9197833b9ab78361010eb4 100644 --- a/src/include/ddes/dms/ss_dms_bufmgr.h +++ b/src/include/ddes/dms/ss_dms_bufmgr.h @@ -75,5 +75,6 @@ void SegNetPageCheckDiskLSN(BufferDesc* buf_desc, ReadBufferMode read_mode, SegS unsigned int DMSGetProcType4RequestPage(); void BufValidateDrc(BufferDesc *buf_desc); bool SSPageCheckIfCanEliminate(BufferDesc* buf_desc); +bool SSSegRead(SMgrRelation reln, ForkNumber forknum, char *buffer); #endif