From 649b5fda4f00a2f9fb65b81864f29bb9cc2bcb4d Mon Sep 17 00:00:00 2001 From: dongning12 Date: Mon, 17 Apr 2023 11:27:40 +0800 Subject: [PATCH] =?UTF-8?q?=E3=80=90=E8=B5=84=E6=BA=90=E6=B1=A0=E5=8C=96?= =?UTF-8?q?=E3=80=91opengauss=E4=BE=A7=E9=80=82=E9=85=8DDMS=E7=9A=84?= =?UTF-8?q?=E5=B9=B6=E8=A1=8C=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/bin/gs_guc/cluster_guc.conf | 1 + .../backend/utils/misc/guc/guc_storage.cpp | 14 +++ .../utils/misc/postgresql_single.conf.sample | 1 + src/gausskernel/ddes/adapter/ss_dms.cpp | 7 ++ .../ddes/adapter/ss_dms_bufmgr.cpp | 27 ----- .../ddes/adapter/ss_dms_callback.cpp | 107 +++++++++++++++--- src/gausskernel/ddes/adapter/ss_init.cpp | 1 + src/gausskernel/storage/buffer/bufmgr.cpp | 8 ++ src/include/ddes/dms/ss_dms.h | 4 + src/include/ddes/dms/ss_dms_bufmgr.h | 3 +- .../knl/knl_guc/knl_instance_attr_storage.h | 1 + src/include/storage/proc.h | 3 + .../regress/output/recovery_2pc_tools.source | 1 + 13 files changed, 131 insertions(+), 47 deletions(-) diff --git a/src/bin/gs_guc/cluster_guc.conf b/src/bin/gs_guc/cluster_guc.conf index 86d904f1ee..1f8313e4f0 100755 --- a/src/bin/gs_guc/cluster_guc.conf +++ b/src/bin/gs_guc/cluster_guc.conf @@ -721,6 +721,7 @@ ss_interconnect_type|string|0,0|NULL|NULL| ss_log_level|int|0,887|NULL|NULL| ss_log_backup_file_count|int|0,1024|NULL|NULL| ss_log_max_file_size|int|1024,4194304|kB|NULL| +ss_parallel_thread_count|int|0,64|NULL|NULL| ss_instance_id|int|0,63|NULL|NULL| ss_interconnect_url|string|0,0|NULL|NULL| ss_rdma_work_config|string|0,0|NULL|NULL| diff --git a/src/common/backend/utils/misc/guc/guc_storage.cpp b/src/common/backend/utils/misc/guc/guc_storage.cpp index 868851d016..a98a463d8d 100755 --- a/src/common/backend/utils/misc/guc/guc_storage.cpp +++ b/src/common/backend/utils/misc/guc/guc_storage.cpp @@ -3567,6 +3567,20 @@ static void InitStorageConfigureNamesInt() 4 * 1024 *1024, NULL, assign_ss_log_max_file_size, + NULL}, + {{"ss_parallel_thread_count", + PGC_POSTMASTER, + NODE_SINGLENODE, + SHARED_STORAGE_OPTIONS, + gettext_noop("Sets ss reform parallel thread count"), + NULL, + GUC_SUPERUSER_ONLY}, + &g_instance.attr.attr_storage.dms_attr.parallel_thread_num, + 16, + 0, + 64, + NULL, + NULL, NULL}, /* End-of-list marker */ {{NULL, diff --git a/src/common/backend/utils/misc/postgresql_single.conf.sample b/src/common/backend/utils/misc/postgresql_single.conf.sample index 0b5a3e8bcd..60d178295e 100644 --- a/src/common/backend/utils/misc/postgresql_single.conf.sample +++ b/src/common/backend/utils/misc/postgresql_single.conf.sample @@ -845,3 +845,4 @@ job_queue_processes = 10 # Number of concurrent jobs, optional: [0..1000] #ss_log_level = 7 #ss_log_backup_file_count = 10 #ss_log_max_file_size = 10MB +#ss_parallel_thread_count = 16 \ No newline at end of file diff --git a/src/gausskernel/ddes/adapter/ss_dms.cpp b/src/gausskernel/ddes/adapter/ss_dms.cpp index 67b4a767dc..0cef80debc 100644 --- a/src/gausskernel/ddes/adapter/ss_dms.cpp +++ b/src/gausskernel/ddes/adapter/ss_dms.cpp @@ -106,6 +106,7 @@ int ss_dms_func_init() SS_RETURN_IFERR(DMS_LOAD_SYMBOL_FUNC(dms_wait_reform)); SS_RETURN_IFERR(DMS_LOAD_SYMBOL_FUNC(dms_get_event)); SS_RETURN_IFERR(DMS_LOAD_SYMBOL_FUNC(dms_buf_res_rebuild_drc)); + SS_RETURN_IFERR(DMS_LOAD_SYMBOL_FUNC(dms_buf_res_rebuild_drc_parallel)); SS_RETURN_IFERR(DMS_LOAD_SYMBOL_FUNC(dms_is_recovery_session)); SS_RETURN_IFERR(DMS_LOAD_SYMBOL_FUNC(drc_get_page_master_id)); SS_RETURN_IFERR(DMS_LOAD_SYMBOL_FUNC(dms_release_page_batch)); @@ -268,6 +269,12 @@ int dms_buf_res_rebuild_drc(dms_context_t *dms_ctx, dms_buf_ctrl_t *ctrl, unsign return g_ss_dms_func.dms_buf_res_rebuild_drc(dms_ctx, ctrl, lsn, is_dirty); } +int dms_buf_res_rebuild_drc_parallel(dms_context_t *dms_ctx, dms_ctrl_info_t *ctrl_info, unsigned char thread_index, + unsigned char for_rebuild) +{ + return g_ss_dms_func.dms_buf_res_rebuild_drc_parallel(dms_ctx, ctrl_info, thread_index, for_rebuild); +} + int dms_is_recovery_session(unsigned int sid) { return g_ss_dms_func.dms_is_recovery_session(sid); diff --git a/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp b/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp index 34c8714a6d..d09aaa85ac 100644 --- a/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp +++ b/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp @@ -533,33 +533,6 @@ void BufValidateDrc(BufferDesc *buf_desc) dms_validate_drc(&dms_ctx, buf_ctrl, lsn, (unsigned char)is_dirty); } -int32 CheckBuf4Rebuild(BufferDesc *buf_desc) -{ -#ifdef USE_ASSERT_CHECKING - if (IsSegmentPhysicalRelNode(buf_desc->tag.rnode)) { - SegNetPageCheckDiskLSN(buf_desc, RBM_NORMAL, NULL); - } else { - SmgrNetPageCheckDiskLSN(buf_desc, RBM_NORMAL, NULL); - } -#endif - - dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf_desc->buf_id); - Assert(buf_ctrl != NULL); - Assert(buf_ctrl->is_edp != 1); - Assert(XLogRecPtrIsValid(g_instance.dms_cxt.ckptRedo)); - dms_context_t dms_ctx; - InitDmsBufContext(&dms_ctx, buf_desc->tag); - bool is_dirty = (buf_desc->state & (BM_DIRTY | BM_JUST_DIRTIED)) > 0 ? true : false; - int ret = dms_buf_res_rebuild_drc(&dms_ctx, buf_ctrl, (unsigned long long)BufferGetLSN(buf_desc), is_dirty); - if (ret != DMS_SUCCESS) { - ereport(LOG, (errmsg("Failed to rebuild page, rel:%u/%u/%u/%d, forknum:%d, blocknum:%u.", - buf_desc->tag.rnode.spcNode, buf_desc->tag.rnode.dbNode, buf_desc->tag.rnode.relNode, - buf_desc->tag.rnode.bucketNode, buf_desc->tag.forkNum, buf_desc->tag.blockNum))); - return ret; - } - return DMS_SUCCESS; -} - int SSLockAcquire(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock, bool dontWait, dms_opengauss_lock_req_type_t reqType) { diff --git a/src/gausskernel/ddes/adapter/ss_dms_callback.cpp b/src/gausskernel/ddes/adapter/ss_dms_callback.cpp index 83a5336155..cfcb2eb0ae 100644 --- a/src/gausskernel/ddes/adapter/ss_dms_callback.cpp +++ b/src/gausskernel/ddes/adapter/ss_dms_callback.cpp @@ -1065,34 +1065,102 @@ static bool SSCheckBufferIfCanGoRebuild(BufferDesc* buf_desc, uint32 buf_state) return ret; } -static int32 CBDrcBufRebuild(void *db_handle) +static int32 SSRebuildBuf(BufferDesc *buf_desc, unsigned char thread_index) { - /* Load Control File */ - int src_id = SSGetPrimaryInstId(); - SSReadControlFile(src_id, true); +#ifdef USE_ASSERT_CHECKING + if (IsSegmentPhysicalRelNode(buf_desc->tag.rnode)) { + SegNetPageCheckDiskLSN(buf_desc, RBM_NORMAL, NULL); + } else { + SmgrNetPageCheckDiskLSN(buf_desc, RBM_NORMAL, NULL); + } +#endif + + dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf_desc->buf_id); + Assert(buf_ctrl != NULL); + Assert(buf_ctrl->is_edp != 1); + Assert(XLogRecPtrIsValid(g_instance.dms_cxt.ckptRedo)); + dms_context_t dms_ctx; + InitDmsBufContext(&dms_ctx, buf_desc->tag); + dms_ctrl_info_t ctrl_info = { 0 }; + ctrl_info.ctrl = *buf_ctrl; + ctrl_info.lsn = (unsigned long long)BufferGetLSN(buf_desc); + ctrl_info.is_dirty = (buf_desc->state & (BM_DIRTY | BM_JUST_DIRTIED)) > 0 ? true : false; + int ret = dms_buf_res_rebuild_drc_parallel(&dms_ctx, &ctrl_info, thread_index, true); + if (ret != DMS_SUCCESS) { + ereport(WARNING, (errmsg("Failed to rebuild page, rel:%u/%u/%u/%d, forknum:%d, blocknum:%u.", + buf_desc->tag.rnode.spcNode, buf_desc->tag.rnode.dbNode, buf_desc->tag.rnode.relNode, + buf_desc->tag.rnode.bucketNode, buf_desc->tag.forkNum, buf_desc->tag.blockNum))); + return ret; + } + return DMS_SUCCESS; +} +static int32 CBDrcBufRebuildInternal(int begin, int len, unsigned char thread_index) +{ uint32 buf_state; - for (int i = 0; i < TOTAL_BUFFER_NUM; i++) { + Assert(begin >= 0 && len > 0 && (begin + len) <= TOTAL_BUFFER_NUM); + for (int i = begin; i < begin + len; i++) { BufferDesc *buf_desc = GetBufferDescriptor(i); - buf_state = LockBufHdr(buf_desc); - if (SSCheckBufferIfCanGoRebuild(buf_desc, buf_state)) { - int ret = CheckBuf4Rebuild(buf_desc); - if (ret != DMS_SUCCESS) { - if (LWLockHeldByMe(buf_desc->io_in_progress_lock)) { - LWLockRelease(buf_desc->io_in_progress_lock); + if (LWLockConditionalAcquire(buf_desc->content_lock, LW_EXCLUSIVE)) { + buf_state = LockBufHdr(buf_desc); + if (buf_state & BM_VALID) { + dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf_desc->buf_id); + buf_ctrl->lock_mode = DMS_LOCK_NULL; + } + UnlockBufHdr(buf_desc, buf_state); + LWLockRelease(buf_desc->content_lock); + } else { + buf_state = LockBufHdr(buf_desc); + if (SSCheckBufferIfCanGoRebuild(buf_desc, buf_state)) { + int ret = SSRebuildBuf(buf_desc, thread_index); + if (ret != DMS_SUCCESS) { + if (LWLockHeldByMe(buf_desc->io_in_progress_lock)) { + LWLockRelease(buf_desc->io_in_progress_lock); + } + UnlockBufHdr(buf_desc, buf_state); + return ret; } - UnlockBufHdr(buf_desc, buf_state); - return ret; } + if (LWLockHeldByMe(buf_desc->io_in_progress_lock)) { + LWLockRelease(buf_desc->io_in_progress_lock); + } + UnlockBufHdr(buf_desc, buf_state); } - if (LWLockHeldByMe(buf_desc->io_in_progress_lock)) { - LWLockRelease(buf_desc->io_in_progress_lock); - } - UnlockBufHdr(buf_desc, buf_state); } + ereport(LOG, (errmodule(MOD_DMS), + errmsg("[SS reform] rebuild buf thread_index:%d, buf_if start from:%d to:%d, max_buf_id:%d", + (int)thread_index, begin, (begin + len - 1), (TOTAL_BUFFER_NUM - 1)))); return GS_SUCCESS; } +/* + * as you can see, thread_num represets the number of thread. thread_index reprsents the n-th thread, begin from 0. + * special case: + * when parallel disable, rebuild phase still call this function, + * do you think thread_num is 1, and thread_index is 0 ? + * actually thread_num and thread_index are 255. It just a agreement in DMS + */ +const int dms_invalid_thread_index = 255; +const int dms_invalid_thread_num = 255; +static int32 CBDrcBufRebuildParallel(void* db_handle, unsigned char thread_index, unsigned char thread_num, + unsigned char for_rebuild) +{ + Assert((thread_index == dms_invalid_thread_index && thread_num == dms_invalid_thread_num) || + (thread_index != dms_invalid_thread_index && thread_num != dms_invalid_thread_num && + thread_index < thread_num)); + int buf_num = TOTAL_BUFFER_NUM / thread_num; + int buf_begin = thread_index * buf_num; + if (thread_index == thread_num - 1) { + buf_num = TOTAL_BUFFER_NUM - buf_begin; + } + + if (thread_index == dms_invalid_thread_index && thread_num == dms_invalid_thread_num) { + buf_begin = 0; + buf_num = TOTAL_BUFFER_NUM; + } + return CBDrcBufRebuildInternal(buf_begin, buf_num, thread_index); +} + static int32 CBDrcBufValidate(void *db_handle) { /* Load Control File */ @@ -1514,6 +1582,9 @@ static void CBReformStartNotify(void *db_handle, dms_role_t role, unsigned char } } + int old_primary = SSGetPrimaryInstId(); + SSReadControlFile(old_primary, true); + /* cluster has no transactions during startup reform */ if (!g_instance.dms_cxt.SSRecoveryInfo.startup_reform) { SendPostmasterSignal(PMSIGNAL_DMS_REFORM); @@ -1643,7 +1714,7 @@ void DmsInitCallback(dms_callback_t *callback) callback->opengauss_recovery_primary = CBRecoveryPrimary; callback->get_dms_status = CBGetDmsStatus; callback->set_dms_status = CBSetDmsStatus; - callback->dms_reform_rebuild_buf_res = CBDrcBufRebuild; + callback->dms_reform_rebuild_parallel = CBDrcBufRebuildParallel; callback->dms_thread_init = DmsCallbackThreadShmemInit; callback->confirm_owner = CBConfirmOwner; callback->confirm_converting = CBConfirmConverting; diff --git a/src/gausskernel/ddes/adapter/ss_init.cpp b/src/gausskernel/ddes/adapter/ss_init.cpp index 4913be62c7..9a07261bfe 100644 --- a/src/gausskernel/ddes/adapter/ss_init.cpp +++ b/src/gausskernel/ddes/adapter/ss_init.cpp @@ -371,6 +371,7 @@ static void setDMSProfile(dms_profile_t* profile) profile->inst_map = 0; profile->enable_reform = (unsigned char)dms_attr->enable_reform; profile->load_balance_mode = 1; /* primary-standby */ + profile->parallel_thread_num = dms_attr->parallel_thread_num; if (dms_attr->enable_ssl && g_instance.attr.attr_security.EnableSSL) { InitDmsSSL(); diff --git a/src/gausskernel/storage/buffer/bufmgr.cpp b/src/gausskernel/storage/buffer/bufmgr.cpp index 09401934f8..93bbda82d2 100644 --- a/src/gausskernel/storage/buffer/bufmgr.cpp +++ b/src/gausskernel/storage/buffer/bufmgr.cpp @@ -2435,6 +2435,14 @@ found_branch: Assert(!(pg_atomic_read_u32(&bufHdr->state) & BM_VALID)); do { + if (!DmsCheckBufAccessible()) { + if(LWLockHeldByMe(bufHdr->io_in_progress_lock)) { + TerminateBufferIO(bufHdr, false, 0); + } + pg_usleep(5000L); + continue; + } + bool startio; if (LWLockHeldByMe(bufHdr->io_in_progress_lock)) { startio = true; diff --git a/src/include/ddes/dms/ss_dms.h b/src/include/ddes/dms/ss_dms.h index 5532f331db..f285f2ca4f 100644 --- a/src/include/ddes/dms/ss_dms.h +++ b/src/include/ddes/dms/ss_dms.h @@ -57,6 +57,8 @@ typedef struct st_ss_dms_func { void (*dms_get_event)(dms_wait_event_t event_type, unsigned long long *event_cnt, unsigned long long *event_time); int (*dms_buf_res_rebuild_drc)(dms_context_t *dms_ctx, dms_buf_ctrl_t *ctrl, unsigned long long lsn, unsigned char is_dirty); + int (*dms_buf_res_rebuild_drc_parallel)(dms_context_t *dms_ctx, dms_ctrl_info_t *ctrl_info, unsigned char thread_index, + unsigned char for_rebuild); int (*dms_is_recovery_session)(unsigned int sid); int (*drc_get_page_master_id)(char pageid[DMS_PAGEID_SIZE], unsigned char *master_id); int (*dms_release_page_batch)(dms_context_t *dms_ctx, dcs_batch_buf_t *owner_map, unsigned int *owner_count); @@ -103,6 +105,8 @@ int dms_wait_reform(unsigned int *has_offline); void dms_get_event(dms_wait_event_t event_type, unsigned long long *event_cnt, unsigned long long *event_time); int dms_buf_res_rebuild_drc(dms_context_t *dms_ctx, dms_buf_ctrl_t *ctrl, unsigned long long lsn, unsigned char is_dirty); +int dms_buf_res_rebuild_drc_parallel(dms_context_t *dms_ctx, dms_ctrl_info_t *ctrl_info, unsigned char thread_index, + unsigned char for_rebuild); int dms_is_recovery_session(unsigned int sid); int drc_get_page_master_id(char pageid[DMS_PAGEID_SIZE], unsigned char *master_id); int dms_release_page_batch(dms_context_t *dms_ctx, dcs_batch_buf_t *owner_map, unsigned int *owner_count); diff --git a/src/include/ddes/dms/ss_dms_bufmgr.h b/src/include/ddes/dms/ss_dms_bufmgr.h index 67752c59b4..a2b67a7cbf 100644 --- a/src/include/ddes/dms/ss_dms_bufmgr.h +++ b/src/include/ddes/dms/ss_dms_bufmgr.h @@ -50,7 +50,7 @@ typedef struct SSBroadcastDDLLock { void InitDmsBufCtrl(void); void InitDmsContext(dms_context_t* dmsContext); - +void InitDmsBufContext(dms_context_t* dmsBufCxt, BufferTag buftag); void MarkReadHint(int buf_id, char persistence, bool extend, const XLogPhyBlock *pblk); bool LockModeCompatible(dms_buf_ctrl_t *buf_ctrl, LWLockMode mode); bool StartReadPage(BufferDesc *buf_desc, LWLockMode mode); @@ -61,7 +61,6 @@ Buffer TerminateReadSegPage(BufferDesc *buf_desc, ReadBufferMode read_mode, SegS Buffer DmsReadPage(Buffer buffer, LWLockMode mode, ReadBufferMode read_mode, bool *with_io); Buffer DmsReadSegPage(Buffer buffer, LWLockMode mode, ReadBufferMode read_mode, bool *with_io); bool DmsReleaseOwner(BufferTag buf_tag, int buf_id, unsigned char* released); -int32 CheckBuf4Rebuild(BufferDesc* buf_desc); int SSLockAcquire(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock, bool dontWait, dms_opengauss_lock_req_type_t reqType = LOCK_NORMAL_MODE); int SSLockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock); diff --git a/src/include/knl/knl_guc/knl_instance_attr_storage.h b/src/include/knl/knl_guc/knl_instance_attr_storage.h index f260ed0ec7..323359e13d 100755 --- a/src/include/knl/knl_guc/knl_instance_attr_storage.h +++ b/src/include/knl/knl_guc/knl_instance_attr_storage.h @@ -121,6 +121,7 @@ typedef struct knl_instance_attr_dms { int32 sslog_level; int32 sslog_backup_file_count; int32 sslog_max_file_size; //Unit:KB + int parallel_thread_num; } knl_instance_attr_dms; typedef struct knl_instance_attr_storage { diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index b4c772915d..aa5ead11ca 100755 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -451,6 +451,8 @@ const int MAX_COMPACTION_THREAD_NUM = 10; #define NUM_DMS_REFORM_CALLLBACK_PROCS (5) #define NUM_DMS_LSNR_CALLBACK_PROC (1) #define NUM_DMS_SMON_CALLBACK_PROC (1) +#define NUM_DMS_PARALLEL_CALLBACK_PROC (g_instance.attr.attr_storage.dms_attr.parallel_thread_num <= 1 ? 0 : \ + g_instance.attr.attr_storage.dms_attr.parallel_thread_num) #define NUM_DMS_RDMA_THREAD_CNT (g_instance.attr.attr_storage.dms_attr.work_thread_count * 2) #define NUM_DMS_CALLBACK_PROCS \ (g_instance.attr.attr_storage.dms_attr.enable_dms ? \ @@ -460,6 +462,7 @@ const int MAX_COMPACTION_THREAD_NUM = 10; NUM_DMS_RDMA_THREAD_CNT) + \ NUM_DMS_LSNR_CALLBACK_PROC + \ NUM_DMS_SMON_CALLBACK_PROC + \ + NUM_DMS_PARALLEL_CALLBACK_PROC + \ NUM_DMS_REFORM_CALLLBACK_PROCS ) : 0) #define GLOBAL_ALL_PROCS \ diff --git a/src/test/regress/output/recovery_2pc_tools.source b/src/test/regress/output/recovery_2pc_tools.source index 9a40c0474a..d1d3d7102f 100644 --- a/src/test/regress/output/recovery_2pc_tools.source +++ b/src/test/regress/output/recovery_2pc_tools.source @@ -630,6 +630,7 @@ select name,vartype,unit,min_val,max_val from pg_settings where name <> 'qunit_c ss_log_max_file_size | integer | kB | 1024 | 4194304 ssl_renegotiation_limit | integer | kB | 0 | 2147483647 ss_ock_log_path | string | | | + ss_parallel_thread_count | integer | | 0 | 64 ss_rdma_work_config | string | | | ss_recv_msg_pool_size | integer | kB | 1024 | 1048576 ss_scrlock_server_bind_core | string | | | -- Gitee