diff --git a/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp b/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp index 97656b595384758ceada5becc41274ae5925a468..816195ea19bec5f203926c156ffad657b3c1140d 100644 --- a/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp +++ b/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp @@ -944,5 +944,59 @@ long SSGetBufSleepTime(int retry_times) if (retry_times < ss_buf_retry_threshold) { return 5000L * retry_times; } - return 1000L * 1000 * 20; + return SS_BUF_MAX_WAIT_TIME; } + +bool SSLWLockAcquireTimeout(LWLock* lock, LWLockMode mode) +{ + bool get_lock = false; + int wait_tickets = 2000; + int cur_tickets = 0; + + do { + get_lock = LWLockConditionalAcquire(lock, mode); + if (get_lock) { + break; + } + + pg_usleep(1000L); + cur_tickets++; + if (cur_tickets >= wait_tickets) { + break; + } + } while (true); + + if (!get_lock) { + ereport(WARNING, (errcode(MOD_DMS), (errmsg("[SS lwlock] request LWLock:%p timeout, LWLockMode:%d, timeout:2s", + lock, mode)))); + } + return get_lock; +} + +bool SSWaitIOTimeout(BufferDesc *buf) +{ + bool ret = false; + for (;;) { + uint32 buf_state; + buf_state = LockBufHdr(buf); + UnlockBufHdr(buf, buf_state); + + if (!(buf_state & BM_IO_IN_PROGRESS)) { + ret = true; + break; + } + ret = SSLWLockAcquireTimeout(buf->io_in_progress_lock, LW_SHARED); + if (ret) { + LWLockRelease(buf->io_in_progress_lock); + } + } + + if (!ret) { + BufferTag *tag = &buf->tag; + ereport(WARNING, (errmodule(MOD_DMS), (errmsg("[SS lwlock][%u/%u/%u/%d %d-%u] SSWaitIOTimeout, " + "buf_id:%d, io_in_progress_lock:%p", + tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode, + tag->forkNum, tag->blockNum, buf->buf_id, buf->io_in_progress_lock)))); + } + return ret; +} \ No newline at end of file diff --git a/src/gausskernel/ddes/adapter/ss_dms_callback.cpp b/src/gausskernel/ddes/adapter/ss_dms_callback.cpp index 4adc3cd60ad7f7c910ad8b26250bc530ca66b9ac..21b4ccfc465efdac1c0b884d05d423c77de011e3 100644 --- a/src/gausskernel/ddes/adapter/ss_dms_callback.cpp +++ b/src/gausskernel/ddes/adapter/ss_dms_callback.cpp @@ -537,6 +537,7 @@ static int tryEnterLocalPage(BufferTag *tag, dms_lock_mode_t mode, dms_buf_ctrl_ LWLock *partition_lock = NULL; BufferDesc *buf_desc = NULL; RelFileNode relfilenode = tag->rnode; + bool get_lock = false; #ifdef USE_ASSERT_CHECKING if (IsSegmentPhysicalRelNode(relfilenode)) { @@ -557,7 +558,15 @@ static int tryEnterLocalPage(BufferTag *tag, dms_lock_mode_t mode, dms_buf_ctrl_ PG_TRY(); { do { - (void)LWLockAcquire(partition_lock, LW_SHARED); + get_lock = SSLWLockAcquireTimeout(partition_lock, LW_SHARED); + if (!get_lock) { + ereport(WARNING, (errmodule(MOD_DMS), (errmsg("[SS lwlock][%u/%u/%u/%d %d-%u] request LWLock timeout, " + "lock:%p", + tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode, + tag->forkNum, tag->blockNum, partition_lock)))); + ret = GS_TIMEOUT; + break; + } buf_id = BufTableLookup(tag, hash); if (buf_id < 0) { LWLockRelease(partition_lock); @@ -575,7 +584,12 @@ static int tryEnterLocalPage(BufferTag *tag, dms_lock_mode_t mode, dms_buf_ctrl_ } LWLockRelease(partition_lock); - WaitIO(buf_desc); + bool wait_success = SSWaitIOTimeout(buf_desc); + if (!wait_success) { + DmsReleaseBuffer(buf_desc->buf_id + 1, is_seg); + ret = GS_TIMEOUT; + break; + } if (!(pg_atomic_read_u32(&buf_desc->state) & BM_VALID)) { ereport(WARNING, (errmodule(MOD_DMS), @@ -600,7 +614,16 @@ static int tryEnterLocalPage(BufferTag *tag, dms_lock_mode_t mode, dms_buf_ctrl_ } LWLockMode content_mode = (mode == DMS_LOCK_SHARE) ? LW_SHARED : LW_EXCLUSIVE; - (void)LWLockAcquire(buf_desc->content_lock, content_mode); + get_lock = SSLWLockAcquireTimeout(buf_desc->content_lock, content_mode); + if (!get_lock) { + DmsReleaseBuffer(buf_desc->buf_id + 1, is_seg); + ret = GS_TIMEOUT; + ereport(WARNING, (errmodule(MOD_DMS), (errmsg("[SS lwlock][%u/%u/%u/%d %d-%u] request LWLock timeout, " + "buf_id:%d, lwlock:%p", + tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode, + tag->forkNum, tag->blockNum, buf_id, buf_desc->content_lock)))); + break; + } *buf_ctrl = GetDmsBufCtrl(buf_id); Assert(buf_id >= 0); if ((*buf_ctrl)->been_loaded == false) { @@ -686,7 +709,15 @@ static int CBInvalidatePage(void *db_handle, char pageid[DMS_PAGEID_SIZE], unsig hash = BufTableHashCode(tag); partition_lock = BufMappingPartitionLock(hash); - (void)LWLockAcquire(partition_lock, LW_SHARED); + bool get_lock = SSLWLockAcquireTimeout(partition_lock, LW_SHARED); + if (!get_lock) { + ereport(WARNING, (errmodule(MOD_DMS), (errmsg("[SS lwlock][%u/%u/%u/%d %d-%u] request LWLock timeout, " + "lwlock:%p", + tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode, + tag->forkNum, tag->blockNum, partition_lock)))); + return GS_TIMEOUT; + } + buf_id = BufTableLookup(tag, hash); if (buf_id < 0) { /* not found in shared buffer */ @@ -708,7 +739,12 @@ static int CBInvalidatePage(void *db_handle, char pageid[DMS_PAGEID_SIZE], unsig } LWLockRelease(partition_lock); - WaitIO(buf_desc); + bool wait_success = SSWaitIOTimeout(buf_desc); + if (!wait_success) { + ret = GS_TIMEOUT; + break; + } + if ((!(pg_atomic_read_u32(&buf_desc->state) & BM_VALID)) || (pg_atomic_read_u32(&buf_desc->state) & BM_IO_ERROR)) { ereport(LOG, (errmodule(MOD_DMS), @@ -721,10 +757,18 @@ static int CBInvalidatePage(void *db_handle, char pageid[DMS_PAGEID_SIZE], unsig bool can_invld_owner = (buf_desc->state & (BM_DIRTY | BM_JUST_DIRTIED | BM_PERMANENT)) > 0 ? false : true; if (!invld_owner || (invld_owner && can_invld_owner)) { - (void)LWLockAcquire(buf_desc->content_lock, LW_EXCLUSIVE); - buf_ctrl = GetDmsBufCtrl(buf_id); - buf_ctrl->lock_mode = (unsigned char)DMS_LOCK_NULL; - LWLockRelease(buf_desc->content_lock); + get_lock = SSLWLockAcquireTimeout(buf_desc->content_lock, LW_EXCLUSIVE); + if (!get_lock) { + ereport(WARNING, (errmodule(MOD_DMS), (errmsg("[SS lwlock][%u/%u/%u/%d %d-%u] request LWLock timeout, " + "buf_id:%d, lwlock:%p", + tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode, + tag->forkNum, tag->blockNum, buf_id, buf_desc->content_lock)))); + ret = GS_TIMEOUT; + } else { + buf_ctrl = GetDmsBufCtrl(buf_id); + buf_ctrl->lock_mode = (unsigned char)DMS_LOCK_NULL; + LWLockRelease(buf_desc->content_lock); + } } else { /* invalidate owner which buffer is dirty/permanent */ ereport(DEBUG1, (errmodule(MOD_DMS), errmsg("[%d/%d/%d/%d %d-%d] invalidate owner rejected, buffer is dirty/permanent, state = 0x%x", @@ -1258,13 +1302,14 @@ static int32 CBDrcBufValidate(void *db_handle) } // used for find bufferdesc in dms -static void SSGetBufferDesc(char *pageid, bool *is_valid, BufferDesc** ret_buf_desc) +static bool SSGetBufferDesc(char *pageid, bool *is_valid, BufferDesc** ret_buf_desc) { int buf_id; uint32 hash; LWLock *partition_lock = NULL; BufferTag *tag = (BufferTag *)pageid; BufferDesc *buf_desc; + bool ret = true; RelFileNode relfilenode = tag->rnode; @@ -1285,7 +1330,16 @@ static void SSGetBufferDesc(char *pageid, bool *is_valid, BufferDesc** ret_buf_d uint32 saveInterruptHoldoffCount = t_thrd.int_cxt.InterruptHoldoffCount; PG_TRY(); { - (void)LWLockAcquire(partition_lock, LW_SHARED); + bool get_lock = SSLWLockAcquireTimeout(partition_lock, LW_SHARED); + if (!get_lock) { + ereport(WARNING, (errmodule(MOD_DMS), (errmsg("[SS lwlock][%u/%u/%u/%d %d-%u] request LWLock timeout, " + "lwlock:%p", + tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode, + tag->forkNum, tag->blockNum, partition_lock)))); + ret = false; + break; + } + buf_id = BufTableLookup(tag, hash); if (buf_id >= 0) { buf_desc = GetBufferDescriptor(buf_id); @@ -1297,7 +1351,13 @@ static void SSGetBufferDesc(char *pageid, bool *is_valid, BufferDesc** ret_buf_d } LWLockRelease(partition_lock); - WaitIO(buf_desc); + bool wait_success = SSWaitIOTimeout(buf_desc); + if (!wait_success) { + SSUnPinBuffer(buf_desc); + ret = false; + break; + } + Assert(!(pg_atomic_read_u32(&buf_desc->state) & BM_IO_ERROR)); *is_valid = (pg_atomic_read_u32(&buf_desc->state) & BM_VALID) != 0; *ret_buf_desc = buf_desc; @@ -1312,6 +1372,7 @@ static void SSGetBufferDesc(char *pageid, bool *is_valid, BufferDesc** ret_buf_d ReleaseResource(); } PG_END_TRY(); + return ret; } void SSUnPinBuffer(BufferDesc* buf_desc) @@ -1330,7 +1391,13 @@ static int CBConfirmOwner(void *db_handle, char *pageid, unsigned char *lock_mod bool valid; dms_buf_ctrl_t *buf_ctrl = NULL; - SSGetBufferDesc(pageid, &valid, &buf_desc); + bool ret = SSGetBufferDesc(pageid, &valid, &buf_desc); + if (!ret) { + ereport(WARNING, (errmodule(MOD_DMS), + errmsg("[SS] CBConfirmOwner, require LWLock timeout"))); + return GS_TIMEOUT; + } + if (buf_desc == NULL) { *lock_mode = (uint8)DMS_LOCK_NULL; return GS_SUCCESS; @@ -1363,12 +1430,17 @@ static int CBConfirmConverting(void *db_handle, char *pageid, unsigned char smon BufferDesc *buf_desc = NULL; bool valid; dms_buf_ctrl_t *buf_ctrl = NULL; - bool timeout = false; *lsn = 0; *edp_map = 0; - SSGetBufferDesc(pageid, &valid, &buf_desc); + bool ret = SSGetBufferDesc(pageid, &valid, &buf_desc); + if (!ret) { + ereport(WARNING, (errmodule(MOD_DMS), + errmsg("[SS] CBConfirmConverting, require LWLock timeout"))); + return GS_TIMEOUT; + } + if (buf_desc == NULL) { *lock_mode = (uint8)DMS_LOCK_NULL; return GS_SUCCESS; @@ -1380,38 +1452,23 @@ static int CBConfirmConverting(void *db_handle, char *pageid, unsigned char smon return GS_SUCCESS; } - struct timeval begin_tv; - struct timeval now_tv; - (void)gettimeofday(&begin_tv, NULL); - long begin = GET_US(begin_tv); - long now; - - while (true) { - bool is_locked = LWLockConditionalAcquire(buf_desc->io_in_progress_lock, LW_EXCLUSIVE); - if (is_locked) { - buf_ctrl = GetDmsBufCtrl(buf_desc->buf_id); - *lock_mode = buf_ctrl->lock_mode; - LWLockRelease(buf_desc->io_in_progress_lock); - break; - } - - (void)gettimeofday(&now_tv, NULL); - now = GET_US(now_tv); - if (now - begin > REFORM_CONFIRM_TIMEOUT) { - timeout = true; - break; - } - pg_usleep(REFORM_CONFIRM_INTERVAL); /* sleep 5ms */ - } - - if (!timeout) { + bool get_lock = SSLWLockAcquireTimeout(buf_desc->io_in_progress_lock, LW_EXCLUSIVE); + if (get_lock) { + buf_ctrl = GetDmsBufCtrl(buf_desc->buf_id); + *lock_mode = buf_ctrl->lock_mode; + LWLockRelease(buf_desc->io_in_progress_lock); SSUnPinBuffer(buf_desc); return GS_SUCCESS; } if (smon_chk) { SSUnPinBuffer(buf_desc); - return GS_TIMEDOUT; + BufferTag *tag = &buf_desc->tag; + ereport(WARNING, (errmodule(MOD_DMS), (errmsg("[SS lwlock][%u/%u/%u/%d %d-%u] request LWLock timeout, " + "buf_id:%d, lwlock:%p", + tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode, + tag->forkNum, tag->blockNum, buf_desc->buf_id, buf_desc->io_in_progress_lock)))); + return GS_TIMEOUT; } // without lock @@ -1840,7 +1897,13 @@ static int CBMarkNeedFlush(void *db_handle, char *pageid) BufferDesc *buf_desc = NULL; BufferTag *tag = (BufferTag *)pageid; - SSGetBufferDesc(pageid, &valid, &buf_desc); + bool ret = SSGetBufferDesc(pageid, &valid, &buf_desc); + if (!ret) { + ereport(WARNING, (errmodule(MOD_DMS), + errmsg("[SS] CBMarkNeedFlush, require LWLock timeout"))); + return GS_TIMEOUT; + } + if (buf_desc == NULL) { ereport(WARNING, (errmodule(MOD_DMS), errmsg("[SS] CBMarkNeedFlush, buf_desc not found"))); diff --git a/src/gausskernel/storage/buffer/bufmgr.cpp b/src/gausskernel/storage/buffer/bufmgr.cpp index 63cf83f6f80b7cea7442d6ca98abd20958f590f4..67d058370a80c4a4f0dfb6729b1f3624916bc043 100644 --- a/src/gausskernel/storage/buffer/bufmgr.cpp +++ b/src/gausskernel/storage/buffer/bufmgr.cpp @@ -5976,7 +5976,15 @@ retry: } dms_retry_times++; - pg_usleep(SSGetBufSleepTime(dms_retry_times)); + long sleep_time = SSGetBufSleepTime(dms_retry_times); + if (sleep_time == SS_BUF_MAX_WAIT_TIME && !SS_IN_REFORM) { + volatile BufferTag *tag = &buf->tag; + ereport(WARNING, (errmodule(MOD_DMS), (errmsg("[SS buf][%u/%u/%u/%d %d-%u] request buf timeout, " + "buf_id:%d", + tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode, + tag->forkNum, tag->blockNum, buf->buf_id)))); + } + pg_usleep(sleep_time); goto retry; } } @@ -6085,7 +6093,15 @@ retry: } dms_retry_times++; - pg_usleep(SSGetBufSleepTime(dms_retry_times)); + long sleep_time = SSGetBufSleepTime(dms_retry_times); + if (sleep_time == SS_BUF_MAX_WAIT_TIME && !SS_IN_REFORM) { + volatile BufferTag *tag = &buf->tag; + ereport(WARNING, (errmodule(MOD_DMS), (errmsg("[SS buf][%u/%u/%u/%d %d-%u] request buf timeout, " + "buf_id:%d", + tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode, + tag->forkNum, tag->blockNum, buf->buf_id)))); + } + pg_usleep(sleep_time); goto retry; } } diff --git a/src/include/ddes/dms/ss_dms_bufmgr.h b/src/include/ddes/dms/ss_dms_bufmgr.h index 8807ad54b7319b5759c0bdc38090902c48bd9015..e21f5a908435f0c1cf7534a6e8bce289b7986e2b 100644 --- a/src/include/ddes/dms/ss_dms_bufmgr.h +++ b/src/include/ddes/dms/ss_dms_bufmgr.h @@ -29,6 +29,7 @@ #include "access/xlogproc.h" #define GetDmsBufCtrl(id) (&t_thrd.storage_cxt.dmsBufCtl[(id)]) +#define SS_BUF_MAX_WAIT_TIME (1000L * 1000 * 20) // 20s #define DmsInitLatch(drid, _type, _oid, _idx, _parent_part, _part, _uid) \ do { \ @@ -84,4 +85,6 @@ SMGR_READ_STATUS SmgrNetPageCheckRead(Oid spcNode, Oid dbNode, Oid relNode, Fork BlockNumber blockNo, char *blockbuf); void SSUnPinBuffer(BufferDesc* buf_desc); bool SSOndemandRequestPrimaryRedo(BufferTag tag); +bool SSLWLockAcquireTimeout(LWLock* lock, LWLockMode mode); +bool SSWaitIOTimeout(BufferDesc *buf); #endif diff --git a/src/include/storage/file/fio_device_com.h b/src/include/storage/file/fio_device_com.h index 007a80ef45e74874773df2480a080ff639d8c3ac..47faaf5d24cdab64a99b3c344e94a80e566926b0 100644 --- a/src/include/storage/file/fio_device_com.h +++ b/src/include/storage/file/fio_device_com.h @@ -54,6 +54,6 @@ extern uint64 XLogSegmentSize; #define GS_SUCCESS 0 #define GS_ERROR (-1) -#define GS_TIMEDOUT 1 +#define GS_TIMEOUT 1 #endif /* FIO_DEVICE_COM_H */