diff --git a/src/common/backend/catalog/builtin_funcs.ini b/src/common/backend/catalog/builtin_funcs.ini index 10c7458a4c1a9963a3040d0636907faff9cf54f4..154859411e4d05fe342ee20b42b2f5dd11df156f 100755 --- a/src/common/backend/catalog/builtin_funcs.ini +++ b/src/common/backend/catalog/builtin_funcs.ini @@ -8961,6 +8961,10 @@ "pg_terminate_session", 1, AddBuiltinFunc(_0(2099), _1("pg_terminate_session"), _2(2), _3(true), _4(false), _5(pg_terminate_session), _6(16), _7(PG_CATALOG_NAMESPACE), _8(BOOTSTRAP_SUPERUSERID), _9(INTERNALlanguageId), _10(1), _11(0), _12(0), _13(0), _14(false), _15(false), _16(false), _17(false), _18('v'), _19(0), _20(2, 20, 20), _21(NULL), _22(NULL), _23(NULL), _24(NULL), _25("pg_terminate_session"), _26(NULL), _27(NULL), _28(NULL), _29(0), _30(false), _31(NULL), _32(false), _33(NULL), _34('f'), _35(NULL), _36(0), _37(false), _38(NULL), _39(NULL), _40(0)) ), + AddFuncGroup( + "pg_terminate_active_session_socket", 1, + AddBuiltinFunc(_0(3148), _1("pg_terminate_active_session_socket"), _2(2), _3(true), _4(false), _5(pg_terminate_active_session_socket), _6(16), _7(PG_CATALOG_NAMESPACE), _8(BOOTSTRAP_SUPERUSERID), _9(INTERNALlanguageId), _10(1), _11(0), _12(0), _13(0), _14(false), _15(false), _16(false), _17(false), _18('v'), _19(0), _20(2, 20, 20), _21(NULL), _22(NULL), _23(NULL), _24(NULL), _25("pg_terminate_active_session_socket"), _26(NULL), _27(NULL), _28(NULL), _29(0), _30(false), _31(NULL), _32(false), _33(NULL), _34('f'), _35(NULL), _36(0), _37(false), _38(NULL), _39(NULL), _40(0)) + ), AddFuncGroup( "pg_test_err_contain_err", 1, AddBuiltinFunc(_0(9999), _1("pg_test_err_contain_err"), _2(1), _3(true), _4(false), _5(pg_test_err_contain_err), _6(2278), _7(PG_CATALOG_NAMESPACE), _8(BOOTSTRAP_SUPERUSERID), _9(INTERNALlanguageId), _10(1), _11(0), _12(0), _13(0), _14(false), _15(false), _16(false), _17(false), _18('v'), _19(0), _20(1, 23), _21(NULL), _22(NULL), _23(NULL), _24(NULL), _25("pg_test_err_contain_err"), _26(NULL), _27(NULL), _28(NULL), _29(0), _30(false), _31(NULL), _32(false), _33(NULL), _34('f'), _35(NULL), _36(0), _37(false), _38(NULL), _39(NULL), _40(0)) diff --git a/src/common/backend/parser/parse_utilcmd.cpp b/src/common/backend/parser/parse_utilcmd.cpp index 07f62561873323232ff20629bb22e3847014b388..574157df354f1f5b7fcdeb29731be1c34892ece8 100644 --- a/src/common/backend/parser/parse_utilcmd.cpp +++ b/src/common/backend/parser/parse_utilcmd.cpp @@ -1553,6 +1553,17 @@ static void transformTableLikeClause( * table can have different column numbers. */ attmap = (AttrNumber*)palloc0(sizeof(AttrNumber) * tupleDesc->natts); + int colCount = list_length(cxt->columns); + for (parent_attno = 1; parent_attno <= tupleDesc->natts; parent_attno++) { + Form_pg_attribute attribute = tupleDesc->attrs[parent_attno - 1]; + if (attribute->attisdropped && (!u_sess->attr.attr_sql.enable_cluster_resize || RelationIsTsStore(relation))) + continue; + if (attribute->attkvtype == ATT_KV_HIDE && table_like_clause->options != CREATE_TABLE_LIKE_ALL) { + continue; + } + colCount++; + attmap[parent_attno - 1] = colCount; + } /* * Insert the copied attributes into the cxt for the new table definition. @@ -1655,8 +1666,6 @@ static void transformTableLikeClause( */ cxt->columns = lappend(cxt->columns, def); - attmap[parent_attno - 1] = list_length(cxt->columns); - /* * Copy default, if present and the default has been requested */ diff --git a/src/common/backend/utils/adt/misc.cpp b/src/common/backend/utils/adt/misc.cpp index f4370494174dbbdeb71f3f6fbeef6ec625482813..129b8254bf54045f1c34da513b756dad3832a576 100644 --- a/src/common/backend/utils/adt/misc.cpp +++ b/src/common/backend/utils/adt/misc.cpp @@ -340,6 +340,53 @@ Datum pg_terminate_session(PG_FUNCTION_ARGS) PG_RETURN_BOOL(r == 0); } +Datum pg_terminate_active_session_socket(PG_FUNCTION_ARGS) +{ + ThreadId tid = PG_GETARG_INT64(0); + uint64 sid = PG_GETARG_INT64(1); + + if (tid <= 0 || sid <= 0) { + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("Invalid params, tid or sessionid must have real number"))); + } + + if (!initialuser()) { + ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("must be initial user can use this function"))); + } + + ereport(LOG, (errmsg("pg_terminate_active_session_socket: tid %lu and sessionid %lu", tid, sid))); + + if (ENABLE_THREAD_POOL && tid != sid) { + int count = g_threadPoolControler->GetSessionCtrl()->terminate_session_socket(tid, sid); + if (count == 0) { + ereport(WARNING, (errmsg("tid %lu and sessionid %lu do not match with valid active session", tid, sid))); + PG_RETURN_BOOL(false); + } + } + + ProcArrayStruct *array_ptr = g_instance.proc_array_idx; + LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + for (int index = 0; index < array_ptr->numProcs; index++) { + int pg_proc_no = array_ptr->pgprocnos[index]; + volatile PGPROC *proc = g_instance.proc_base_all_procs[pg_proc_no]; + VirtualTransactionId vxid; + GET_VXID_FROM_PGPROC(vxid, *proc); + + if (proc->pid != tid) { + continue; + } + + (void)SendProcSignal(tid, PROCSIG_COMM_CLOSE_ACTIVE_SESSION_SOCKET, vxid.backendId); + ereport(LOG, + (errmsg("pg_terminate_active_session_socket: send signal to tid %lu and sessionid %lu", tid, sid))); + break; + } + LWLockRelease(ProcArrayLock); + + PG_RETURN_BOOL(true); +} + /* * function name: pg_wlm_jump_queue * description : wlm jump the queue with thread id. diff --git a/src/common/backend/utils/cache/knl_localsysdbcache.cpp b/src/common/backend/utils/cache/knl_localsysdbcache.cpp index 5902e98c216f8b8fb8206627f82dc8a5c1060d98..a36f8c693feffa7fafbd119d97ac18d78e14b1a8 100644 --- a/src/common/backend/utils/cache/knl_localsysdbcache.cpp +++ b/src/common/backend/utils/cache/knl_localsysdbcache.cpp @@ -120,11 +120,11 @@ bool IsGotPoolReload() } void ResetGotPoolReload(bool value) { - if (EnableLocalSysCache()) { + if (!t_thrd.int_cxt.ignoreSessionBackendSignal) { u_sess->sig_cxt.got_pool_reload = value; + } + if (EnableLocalSysCache()) { t_thrd.lsc_cxt.lsc->got_pool_reload = value; - } else { - u_sess->sig_cxt.got_pool_reload = value; } } diff --git a/src/common/backend/utils/init/globals.cpp b/src/common/backend/utils/init/globals.cpp index 87dbef78f00c7ca4be99aaca004dcd349dfcc86e..7607aafcc9712427c64a3364e8c51f4143beec4e 100644 --- a/src/common/backend/utils/init/globals.cpp +++ b/src/common/backend/utils/init/globals.cpp @@ -59,7 +59,7 @@ bool open_join_children = true; bool will_shutdown = false; /* hard-wired binary version number */ -const uint32 GRAND_VERSION_NUM = 92609; +const uint32 GRAND_VERSION_NUM = 92610; const uint32 SQL_PATCH_VERSION_NUM = 92608; const uint32 PREDPUSH_SAME_LEVEL_VERSION_NUM = 92522; diff --git a/src/gausskernel/process/tcop/postgres.cpp b/src/gausskernel/process/tcop/postgres.cpp index e37744caee5795319a23e71adbbd83fee8ce44b1..7c535af485554e2f5e83656612c437fa1f319d84 100755 --- a/src/gausskernel/process/tcop/postgres.cpp +++ b/src/gausskernel/process/tcop/postgres.cpp @@ -6043,7 +6043,10 @@ void HandlePoolerReload(void) return; ResetGotPoolReload(true); - u_sess->sig_cxt.cp_PoolReload = true; + + if (!t_thrd.int_cxt.ignoreSessionBackendSignal) { + u_sess->sig_cxt.cp_PoolReload = true; + } } // HandleMemoryContextDump @@ -6061,6 +6064,39 @@ void HandleExecutorFlag(void) if (IS_PGXC_DATANODE) u_sess->exec_cxt.executorStopFlag = true; } + +void handle_terminate_active_sess_socket() +{ + /* If doing free_session_context, u_sess will be invalid pointer, just return */ + if (t_thrd.int_cxt.ignoreSessionBackendSignal) { + return; + } + + if (t_thrd.role == THREADPOOL_WORKER) { + if (u_sess->sig_cxt.got_terminate_sess_socket) { + t_thrd.threadpool_cxt.worker->GetGroup()->GetListener()->DelSessionFromEpoll(u_sess, false); + + int sock = u_sess->proc_cxt.MyProcPort->sock; + if (unlikely(u_sess->status == KNL_SESS_UNINIT)) { + u_sess->status = KNL_SESS_CLOSERAW; + } else { + u_sess->status = KNL_SESS_CLOSE; + } + u_sess->proc_cxt.MyProcPort->sock = PGINVALID_SOCKET; + closesocket(sock); + u_sess->sig_cxt.got_terminate_sess_socket = false; + } + } else if (t_thrd.role == THREADPOOL_STREAM) { + if (u_sess->proc_cxt.MyProcPort->is_logic_conn) { + gs_close_gsocket(&(u_sess->proc_cxt.MyProcPort->gs_sock)); + } + } else { + int sock = u_sess->proc_cxt.MyProcPort->sock; + u_sess->proc_cxt.MyProcPort->sock = -1; + closesocket(sock); + } +} + /* * RecoveryConflictInterrupt: out-of-line portion of recovery conflict * handling following receipt of SIGUSR1. Designed to be similar to die() @@ -11801,6 +11837,7 @@ bool checkCompArgs(const char *compFormat) void ResetInterruptCxt() { t_thrd.int_cxt.ignoreBackendSignal = false; + t_thrd.int_cxt.ignoreSessionBackendSignal = false; t_thrd.int_cxt.InterruptHoldoffCount = 0; t_thrd.int_cxt.QueryCancelHoldoffCount = 0; diff --git a/src/gausskernel/process/threadpool/knl_session.cpp b/src/gausskernel/process/threadpool/knl_session.cpp index 5fd6442e13a5ca35f5bc7ef9c439c556dad5c1f0..0785f32efc56086416dc030a8bb6f136619d825e 100755 --- a/src/gausskernel/process/threadpool/knl_session.cpp +++ b/src/gausskernel/process/threadpool/knl_session.cpp @@ -1574,8 +1574,10 @@ void free_session_context(knl_session_context* session) MemoryContextDeleteChildren(session->top_mem_cxt); MemoryContextDelete(session->top_mem_cxt); (void)syscalllockFree(&session->utils_cxt.deleMemContextMutex); + t_thrd.int_cxt.ignoreSessionBackendSignal = true; pfree_ext(session); use_fake_session(); + t_thrd.int_cxt.ignoreSessionBackendSignal = false; } bool stp_set_commit_rollback_err_msg(stp_xact_err_type type) diff --git a/src/gausskernel/process/threadpool/threadpool_listener.cpp b/src/gausskernel/process/threadpool/threadpool_listener.cpp index f89790746c2cec8c1f31783aab86b39b8de666a3..43de803d7d236402b3645e95665f8fd1e85041f6 100644 --- a/src/gausskernel/process/threadpool/threadpool_listener.cpp +++ b/src/gausskernel/process/threadpool/threadpool_listener.cpp @@ -205,6 +205,19 @@ void ThreadPoolListener::CreateEpoll() } } +void ThreadPoolListener::dispatch_socked_closed_session(knl_session_context* session) +{ + Assert(session->status != KNL_SESS_UNINIT); + m_idleSessionList->Remove(&session->elem); + if (unlikely(session->status == KNL_SESS_UNINIT)) { + session->status = KNL_SESS_CLOSERAW; + } else { + session->status = KNL_SESS_CLOSE; + } + session->proc_cxt.MyProcPort->sock = PGINVALID_SOCKET; + AddIdleSessionToHead(session); +} + void ThreadPoolListener::AddEpoll(knl_session_context* session) { struct epoll_event ev = {0}; @@ -236,16 +249,13 @@ void ThreadPoolListener::AddEpoll(knl_session_context* session) } } if (unlikely(res != 0)) { -#ifdef USE_ASSERT_CHECKING - ereport(PANIC, -#else ereport(WARNING, -#endif (errmodule(MOD_THREAD_POOL), errmsg("epoll_ctl fail %m, sess status:%d, sock:%d, host:%s, port:%s", session->status, session->proc_cxt.MyProcPort->sock, session->proc_cxt.MyProcPort->remote_host, session->proc_cxt.MyProcPort->remote_port))); + dispatch_socked_closed_session(session); } } @@ -462,7 +472,7 @@ void ThreadPoolListener::DispatchSession(knl_session_context* session) } } -void ThreadPoolListener::DelSessionFromEpoll(knl_session_context* session) +void ThreadPoolListener::DelSessionFromEpoll(knl_session_context* session, bool sub_count) { if (ENABLE_THREAD_POOL_DN_LOGICCONN) { struct epoll_event ev = {0}; @@ -476,7 +486,10 @@ void ThreadPoolListener::DelSessionFromEpoll(knl_session_context* session) #endif comm_epoll_ctl(m_epollFd, EPOLL_CTL_DEL, session->proc_cxt.MyProcPort->sock, NULL); } - (void)pg_atomic_fetch_sub_u32((volatile uint32*)&m_group->m_sessionCount, 1); + + if (sub_count) { + (void)pg_atomic_fetch_sub_u32((volatile uint32*)&m_group->m_sessionCount, 1); + } } void ThreadPoolListener::RemoveWorkerFromList(ThreadPoolWorker* worker) diff --git a/src/gausskernel/process/threadpool/threadpool_sessctl.cpp b/src/gausskernel/process/threadpool/threadpool_sessctl.cpp index 69d3e05b0fc9b64ce5c26dae55e57079e3f4aa05..67ddf10da48635582e8474024880bd3b0c74947f 100755 --- a/src/gausskernel/process/threadpool/threadpool_sessctl.cpp +++ b/src/gausskernel/process/threadpool/threadpool_sessctl.cpp @@ -478,6 +478,33 @@ void ThreadPoolSessControl::HandlePoolerReload() alock.unLock(); } +int ThreadPoolSessControl::terminate_session_socket(ThreadId tid, uint64 session_id) +{ + int count = 0; + AutoMutexLock alock(&m_sessCtrlock); + alock.lock(); + + knl_sess_control* ctrl = NULL; + Dlelem* elem = DLGetHead(&m_activelist); + while (elem != NULL) { + ctrl= (knl_sess_control*)DLE_VAL(elem); + knl_session_context* sess = ctrl->sess; + /* Only active session can be closed socket */ + if (sess == NULL || sess->attachPid != tid || sess->session_id != session_id || + sess->status != KNL_SESS_ATTACH) { + elem = DLGetSucc(elem); + continue; + } + + ctrl->sess->sig_cxt.got_terminate_sess_socket = true; + count++; + elem = DLGetSucc(elem); + break; + } + alock.unLock(); + return count; +} + void ThreadPoolSessControl::calculateSessMemCxtStats( knl_session_context* sess, const MemoryContext context, Tuplestorestate* tupStore, TupleDesc tupDesc) { diff --git a/src/gausskernel/process/threadpool/threadpool_worker.cpp b/src/gausskernel/process/threadpool/threadpool_worker.cpp index e5b96bd6ac615452add34778846b85b2baef5584..fbb42bcf268ecba0f33b09423c329cce76a44288 100644 --- a/src/gausskernel/process/threadpool/threadpool_worker.cpp +++ b/src/gausskernel/process/threadpool/threadpool_worker.cpp @@ -726,7 +726,7 @@ void ThreadPoolWorker::CleanUpSession(bool threadexit) } /* Close Session. */ - m_group->GetListener()->DelSessionFromEpoll(m_currentSession); + m_group->GetListener()->DelSessionFromEpoll(m_currentSession, true); if (m_currentSession->proc_cxt.PassConnLimit) { SpinLockAcquire(&g_instance.conn_cxt.ConnCountLock); diff --git a/src/gausskernel/storage/buffer/bufmgr.cpp b/src/gausskernel/storage/buffer/bufmgr.cpp index 19ac17d7bc05c45c540d391946047e6e9ad7e5af..8de23dd916cf56c1f6641d608a2e29daab64f262 100644 --- a/src/gausskernel/storage/buffer/bufmgr.cpp +++ b/src/gausskernel/storage/buffer/bufmgr.cpp @@ -5624,6 +5624,7 @@ bool ConditionalLockBuffer(Buffer buffer) void LockBufferForCleanup(Buffer buffer) { BufferDesc *buf_desc = NULL; + int retry_count = 0; Assert(BufferIsValid(buffer)); Assert(t_thrd.storage_cxt.PinCountWaitBuf == NULL); @@ -5673,10 +5674,11 @@ void LockBufferForCleanup(Buffer buffer) /* Wait to be signaled by UnpinBuffer() */ if (InHotStandby && g_supportHotStandby) { + retry_count++; /* Publish the bufid that Startup process waits on */ parallel_recovery::SetStartupBufferPinWaitBufId(buffer - 1); /* Set alarm and then wait to be signaled by UnpinBuffer() */ - ResolveRecoveryConflictWithBufferPin(); + ResolveRecoveryConflictWithBufferPin(retry_count); /* Reset the published bufid */ parallel_recovery::SetStartupBufferPinWaitBufId(-1); diff --git a/src/gausskernel/storage/ipc/procarray.cpp b/src/gausskernel/storage/ipc/procarray.cpp index 933d898db407f652948989afafdfe33795c2469d..a835c951a06bd395c6257c1ab6f6aef36ec8cec3 100755 --- a/src/gausskernel/storage/ipc/procarray.cpp +++ b/src/gausskernel/storage/ipc/procarray.cpp @@ -3051,11 +3051,13 @@ VirtualTransactionId *GetConflictingVirtualXIDs(TransactionId limitXmin, Oid dbO * * Returns pid of the process signaled, or 0 if not found. */ -ThreadId CancelVirtualTransaction(const VirtualTransactionId& vxid, ProcSignalReason sigmode) +ThreadId CancelVirtualTransaction(const VirtualTransactionId& vxid, ProcSignalReason sigmode, int retry_count) { ProcArrayStruct* arrayP = g_instance.proc_array_idx; int index; ThreadId pid = 0; + uint64 sessionid = 0; + const int max_retry_count = 1000; LWLockAcquire(ProcArrayLock, LW_SHARED); @@ -3069,13 +3071,27 @@ ThreadId CancelVirtualTransaction(const VirtualTransactionId& vxid, ProcSignalRe if (procvxid.backendId == vxid.backendId && procvxid.localTransactionId == vxid.localTransactionId) { proc->recoveryConflictPending = true; pid = proc->pid; + sessionid = proc->sessionid; if (pid != 0) { /* * Kill the pid if it's still here. If not, that's what we * wanted so ignore any errors. */ - (void)SendProcSignal(pid, sigmode, vxid.backendId); + if (retry_count < max_retry_count) { + (void)SendProcSignal(pid, sigmode, vxid.backendId); + } else { + if (ENABLE_THREAD_POOL && pid != sessionid) { + int count = g_threadPoolControler->GetSessionCtrl()->terminate_session_socket(pid, sessionid); + if (count == 0) { + ereport(WARNING, (errmsg("tid %lu and sessionid %lu do not match with valid active session", + pid, sessionid))); + } + } + (void)SendProcSignal(pid, PROCSIG_COMM_CLOSE_ACTIVE_SESSION_SOCKET, vxid.backendId); + ereport(LOG, (errmsg("CancelVirtualTransaction: send signal to tid %lu and sessionid %lu", + pid, sessionid))); + } } break; @@ -3260,11 +3276,13 @@ int CountDBActiveBackends(Oid database_oid) /* * CancelDBBackends --- cancel backends that are using specified database */ -void CancelDBBackends(Oid databaseid, ProcSignalReason sigmode, bool conflictPending) +void CancelDBBackends(Oid databaseid, ProcSignalReason sigmode, bool conflictPending, int retry_count) { ProcArrayStruct* arrayP = g_instance.proc_array_idx; int index; ThreadId pid = 0; + uint64 sessionid = 0; + const int max_retry_count = 1000; /* tell all backends to die */ LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); @@ -3280,13 +3298,27 @@ void CancelDBBackends(Oid databaseid, ProcSignalReason sigmode, bool conflictPen proc->recoveryConflictPending = conflictPending; pid = proc->pid; + sessionid = proc->sessionid; if (pid != 0) { /* * Kill the pid if it's still here. If not, that's what we * wanted so ignore any errors. */ - (void)SendProcSignal(pid, sigmode, procvxid.backendId); + if (retry_count < max_retry_count) { + (void)SendProcSignal(pid, sigmode, procvxid.backendId); + } else { + if (ENABLE_THREAD_POOL && pid != sessionid) { + int count = g_threadPoolControler->GetSessionCtrl()->terminate_session_socket(pid, sessionid); + if (count == 0) { + ereport(WARNING, (errmsg("tid %lu and sessionid %lu do not match with valid active session", + pid, sessionid))); + } + } + (void)SendProcSignal(pid, PROCSIG_COMM_CLOSE_ACTIVE_SESSION_SOCKET, procvxid.backendId); + ereport(LOG, (errmsg("CancelDBBackends: send signal to tid %lu and sessionid %lu", + pid, sessionid))); + } } } } diff --git a/src/gausskernel/storage/ipc/procsignal.cpp b/src/gausskernel/storage/ipc/procsignal.cpp index 0d4bf4edd5c3c30640ae75661942ef5c87bd14c9..d78e54e0e63aa986dfe01934e9eaddebe85a13e7 100755 --- a/src/gausskernel/storage/ipc/procsignal.cpp +++ b/src/gausskernel/storage/ipc/procsignal.cpp @@ -312,6 +312,10 @@ void procsignal_sigusr1_handler(SIGNAL_ARGS) if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN)) RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN); + if (CheckProcSignal(PROCSIG_COMM_CLOSE_ACTIVE_SESSION_SOCKET)) { + handle_terminate_active_sess_socket(); + } + latch_sigusr1_handler(); errno = save_errno; diff --git a/src/gausskernel/storage/ipc/standby.cpp b/src/gausskernel/storage/ipc/standby.cpp index b54b52d88c19380fa95af142f0b2c89bdb36b4c7..36aa61e4707e656ba0c804142f3aa672aaa0312e 100755 --- a/src/gausskernel/storage/ipc/standby.cpp +++ b/src/gausskernel/storage/ipc/standby.cpp @@ -36,7 +36,7 @@ #include "utils/snapmgr.h" #include "replication/walreceiver.h" static void ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId* waitlist, TransactionId* xminArray, - ProcSignalReason reason, TimestampTz waitStart); + ProcSignalReason reason, TimestampTz waitStart, int retry_count); static void ResolveRecoveryConflictWithLock(Oid dbOid, Oid relOid); static void LogAccessExclusiveLocks(int nlocks, xl_standby_lock* locks); static void LogReleaseAccessExclusiveLocks(int nlocks, xl_standby_lock* locks); @@ -185,8 +185,8 @@ static bool WaitExceedsMaxStandbyDelay(TimestampTz startTime) * a specific rmgr. Here we just issue the orders to the procs. The procs * then throw the required error as instructed. */ -static void ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId* waitlist, TransactionId* xminArray, - ProcSignalReason reason, TimestampTz waitStart) +static void ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist, TransactionId *xminArray, + ProcSignalReason reason, TimestampTz waitStart, int retry_count) { char* new_status = NULL; bool waited = false; @@ -240,7 +240,7 @@ static void ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId* waitlis * Now find out who to throw out of the balloon. */ Assert(VirtualTransactionIdIsValid(*waitlist)); - pid = CancelVirtualTransaction(*waitlist, reason); + pid = CancelVirtualTransaction(*waitlist, reason, retry_count); /* * Wait a little bit for it to die so that we avoid flooding * an unresponsive backend when system is heavily loaded. @@ -280,6 +280,7 @@ void ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, const R { VirtualTransactionId* backends = NULL; TimestampTz waitStart; + int retry_count; /* * If we get passed InvalidTransactionId then we are a little surprised, * but it is theoretically possible in normal running. It also happens @@ -315,6 +316,7 @@ void ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, const R } waitStart = GetCurrentTimestamp(); + retry_count = 0; while (true) { backends = GetConflictingVirtualXIDs(latestRemovedXid, node.dbNode, lsn, limitXminCSN, t_thrd.storage_cxt.xminArray); @@ -322,8 +324,9 @@ void ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, const R break; } + retry_count++; ResolveRecoveryConflictWithVirtualXIDs(backends, t_thrd.storage_cxt.xminArray, - PROCSIG_RECOVERY_CONFLICT_SNAPSHOT, waitStart); + PROCSIG_RECOVERY_CONFLICT_SNAPSHOT, waitStart, retry_count); } } @@ -331,6 +334,7 @@ void ResolveRecoveryConflictWithSnapshotOid(TransactionId latestRemovedXid, Oid { VirtualTransactionId* backends = NULL; TimestampTz waitStart; + int retry_count; /* * If we get passed InvalidTransactionId then we are a little surprised, * but it is theoretically possible in normal running. It also happens @@ -352,6 +356,7 @@ void ResolveRecoveryConflictWithSnapshotOid(TransactionId latestRemovedXid, Oid } waitStart = GetCurrentTimestamp(); + retry_count = 0; while (true) { backends = GetConflictingVirtualXIDs(latestRemovedXid, dbid, lsn, InvalidCommitSeqNo, t_thrd.storage_cxt.xminArray); @@ -359,8 +364,9 @@ void ResolveRecoveryConflictWithSnapshotOid(TransactionId latestRemovedXid, Oid break; } + retry_count++; ResolveRecoveryConflictWithVirtualXIDs(backends, t_thrd.storage_cxt.xminArray, - PROCSIG_RECOVERY_CONFLICT_SNAPSHOT, waitStart); + PROCSIG_RECOVERY_CONFLICT_SNAPSHOT, waitStart, retry_count); } } @@ -369,6 +375,7 @@ void ResolveRecoveryConflictWithTablespace(Oid tsid) { VirtualTransactionId* temp_file_users = NULL; TimestampTz waitStart; + int retry_count; /* * Standby users may be currently using this tablespace for their * temporary files. We only care about current users because @@ -387,19 +394,22 @@ void ResolveRecoveryConflictWithTablespace(Oid tsid) * We don't wait for commit because drop tablespace is non-transactional. */ waitStart = GetCurrentTimestamp(); + retry_count = 0; while (true) { temp_file_users = GetConflictingVirtualXIDs(InvalidTransactionId, InvalidOid); if (!VirtualTransactionIdIsValid(*temp_file_users)) { break; } + retry_count++; ResolveRecoveryConflictWithVirtualXIDs(temp_file_users, NULL, - PROCSIG_RECOVERY_CONFLICT_TABLESPACE, waitStart); + PROCSIG_RECOVERY_CONFLICT_TABLESPACE, waitStart, retry_count); } } void ResolveRecoveryConflictWithDatabase(Oid dbid) { + int retry_count; /* * We don't do ResolveRecoveryConflictWithVirtualXIDs() here since that * only waits for transactions and completely idle sessions would block @@ -411,8 +421,10 @@ void ResolveRecoveryConflictWithDatabase(Oid dbid) * block during InitPostgres() and then disconnect when they see the * database has been removed. */ + retry_count = 0; while (CountDBActiveBackends(dbid) > 0) { - CancelDBBackends(dbid, PROCSIG_RECOVERY_CONFLICT_DATABASE, true); + retry_count++; + CancelDBBackends(dbid, PROCSIG_RECOVERY_CONFLICT_DATABASE, true, retry_count); /* * Wait awhile for them to die so that we avoid flooding an @@ -429,6 +441,7 @@ static void ResolveRecoveryConflictWithLock(Oid dbOid, Oid relOid) int num_attempts = 0; LOCKTAG locktag; TimestampTz waitStart; + int retry_count; SET_LOCKTAG_RELATION(locktag, dbOid, relOid); @@ -441,13 +454,15 @@ static void ResolveRecoveryConflictWithLock(Oid dbOid, Oid relOid) * justifies the means. */ waitStart = GetCurrentTimestamp(); + retry_count = 0; while (!lock_acquired) { if (++num_attempts < 3) backends = GetLockConflicts(&locktag, AccessExclusiveLock); else backends = GetConflictingVirtualXIDs(InvalidTransactionId, InvalidOid); - ResolveRecoveryConflictWithVirtualXIDs(backends, NULL, PROCSIG_RECOVERY_CONFLICT_LOCK, waitStart); + retry_count++; + ResolveRecoveryConflictWithVirtualXIDs(backends, NULL, PROCSIG_RECOVERY_CONFLICT_LOCK, waitStart, retry_count); if (LockAcquireExtended(&locktag, AccessExclusiveLock, true, true, false) != LOCKACQUIRE_NOT_AVAIL) lock_acquired = true; @@ -479,7 +494,7 @@ static void ResolveRecoveryConflictWithLock(Oid dbOid, Oid relOid) * so we don't do a deadlock check right away ... only if we have had to wait * at least deadlock_timeout. Most of the logic about that is in proc.c. */ -void ResolveRecoveryConflictWithBufferPin(void) +void ResolveRecoveryConflictWithBufferPin(int retry_count) { bool sig_alarm_enabled = false; TimestampTz ltime; @@ -505,7 +520,7 @@ void ResolveRecoveryConflictWithBufferPin(void) /* * We're already behind, so clear a path as quickly as possible. */ - SendRecoveryConflictWithBufferPin(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN); + SendRecoveryConflictWithBufferPin(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN, retry_count); } else { /* * Wake up at ltime, and check for deadlocks as well if we will be @@ -531,7 +546,7 @@ void ResolveRecoveryConflictWithBufferPin(void) ereport(LOG, (errmsg("buffer pin confilict resolve ok, proc time %d ms", ComputeTimeStamp(now)))); } -void SendRecoveryConflictWithBufferPin(ProcSignalReason reason) +void SendRecoveryConflictWithBufferPin(ProcSignalReason reason, int retry_count) { Assert(reason == PROCSIG_RECOVERY_CONFLICT_BUFFERPIN || reason == PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK); @@ -541,7 +556,7 @@ void SendRecoveryConflictWithBufferPin(ProcSignalReason reason) * conflict flag yet, since most backends will be innocent. Let the * SIGUSR1 handling in each backend decide their own fate. */ - CancelDBBackends(InvalidOid, reason, false); + CancelDBBackends(InvalidOid, reason, false, retry_count); } /* diff --git a/src/gausskernel/storage/page/pageparse.cpp b/src/gausskernel/storage/page/pageparse.cpp index 604bde04ae1d72404cc1d76a96b27433a3c5def1..babf41f4d360af71b21f495709deb99d4508bfbb 100644 --- a/src/gausskernel/storage/page/pageparse.cpp +++ b/src/gausskernel/storage/page/pageparse.cpp @@ -355,6 +355,15 @@ static void ParseTupleHeader(const PageHeader page, uint lineno, char *strOutput static void ParseHeapPage(const PageHeader page, BlockNumber blockNum, char *strOutput, BlockNumber block_endpoint) { errno_t rc = EOK; + if (PageIsNew(page)) { + rc = snprintf_s(strOutput + (int)strlen(strOutput), MAXOUTPUTLEN, MAXOUTPUTLEN - 1, + "Page information of block %u/%u : new page\n", blockNum, block_endpoint); + securec_check_ss(rc, "\0", "\0"); + ParseHeapHeader(page, strOutput, blockNum, block_endpoint); + rc = snprintf_s(strOutput + (int)strlen(strOutput), MAXOUTPUTLEN, MAXOUTPUTLEN - 1, "\n"); + securec_check_ss(rc, "\0", "\0"); + return; + } if (page->pd_lower < GetPageHeaderSize(page) || page->pd_lower > page->pd_upper || page->pd_upper > page->pd_special || page->pd_special > BLCKSZ || page->pd_special != MAXALIGN(page->pd_special)) { @@ -531,7 +540,8 @@ static void ParseOnePage(const PageHeader page, BlockNumber blockNum, char *strO { errno_t rc = EOK; if (strcmp(relation_type, "heap") == 0) { - if (PG_HEAP_PAGE_LAYOUT_VERSION != (uint16)PageGetPageLayoutVersion(page) || PageGetSpecialSize(page) != 0) { + if ((PG_HEAP_PAGE_LAYOUT_VERSION != (uint16)PageGetPageLayoutVersion(page) && + (uint16)PageGetPageLayoutVersion(page) != 0) || PageGetSpecialSize(page) != 0) { ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), (errmsg("The target page is not heap, the given page version is: %u", (uint16)PageGetPageLayoutVersion(page))))); @@ -554,14 +564,16 @@ static void ParseOnePage(const PageHeader page, BlockNumber blockNum, char *strO } ParseUHeapPage((void *)page, blockNum, block_endpoint, strOutput, dumpUndo); } else if (strcmp(relation_type, "btree") == 0) { - if (PG_COMM_PAGE_LAYOUT_VERSION != (uint16)PageGetPageLayoutVersion(page) || PageGetSpecialSize(page) == 0) { + if ((PG_COMM_PAGE_LAYOUT_VERSION != (uint16)PageGetPageLayoutVersion(page) && + (uint16)PageGetPageLayoutVersion(page) != 0) || PageGetSpecialSize(page) == 0) { ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), (errmsg("The target page is not btree, the given page version is: %u", (uint16)PageGetPageLayoutVersion(page))))); } ParseIndexPage((void *)page, BTREE_INDEX, blockNum, block_endpoint, strOutput); } else if (strcmp(relation_type, "ubtree") == 0) { - if (PG_COMM_PAGE_LAYOUT_VERSION != (uint16)PageGetPageLayoutVersion(page) || PageGetSpecialSize(page) == 0) { + if ((PG_COMM_PAGE_LAYOUT_VERSION != (uint16)PageGetPageLayoutVersion(page) && + (uint16)PageGetPageLayoutVersion(page) != 0) || PageGetSpecialSize(page) == 0) { ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), (errmsg("The target page is not ubtree, the given page version is: %u", (uint16)PageGetPageLayoutVersion(page))))); diff --git a/src/include/catalog/upgrade_sql/rollback_catalog_maindb/rollback-post_catalog_maindb_92_610.sql b/src/include/catalog/upgrade_sql/rollback_catalog_maindb/rollback-post_catalog_maindb_92_610.sql new file mode 100644 index 0000000000000000000000000000000000000000..bfaa53d63431a016cbec17b6bcd85ef7dfe9bffc --- /dev/null +++ b/src/include/catalog/upgrade_sql/rollback_catalog_maindb/rollback-post_catalog_maindb_92_610.sql @@ -0,0 +1 @@ +DROP FUNCTION IF EXISTS pg_catalog.pg_terminate_active_session_socket() cascade; diff --git a/src/include/catalog/upgrade_sql/rollback_catalog_otherdb/rollback-post_catalog_otherdb_92_610.sql b/src/include/catalog/upgrade_sql/rollback_catalog_otherdb/rollback-post_catalog_otherdb_92_610.sql new file mode 100644 index 0000000000000000000000000000000000000000..bfaa53d63431a016cbec17b6bcd85ef7dfe9bffc --- /dev/null +++ b/src/include/catalog/upgrade_sql/rollback_catalog_otherdb/rollback-post_catalog_otherdb_92_610.sql @@ -0,0 +1 @@ +DROP FUNCTION IF EXISTS pg_catalog.pg_terminate_active_session_socket() cascade; diff --git a/src/include/catalog/upgrade_sql/upgrade_catalog_maindb/upgrade-post_catalog_maindb_92_610.sql b/src/include/catalog/upgrade_sql/upgrade_catalog_maindb/upgrade-post_catalog_maindb_92_610.sql new file mode 100644 index 0000000000000000000000000000000000000000..1b83f5ba22b33677ba820e52e9cdf1166bab5cf5 --- /dev/null +++ b/src/include/catalog/upgrade_sql/upgrade_catalog_maindb/upgrade-post_catalog_maindb_92_610.sql @@ -0,0 +1,5 @@ +set local inplace_upgrade_next_system_object_oids = IUO_PROC, 3148; +CREATE OR REPLACE FUNCTION pg_catalog.pg_terminate_active_session_socket + (IN threadid BIGINT, + IN sessionid BIGINT) +RETURNS bool LANGUAGE INTERNAL NOT FENCED as 'pg_terminate_active_session_socket'; \ No newline at end of file diff --git a/src/include/catalog/upgrade_sql/upgrade_catalog_otherdb/upgrade-post_catalog_otherdb_92_610.sql b/src/include/catalog/upgrade_sql/upgrade_catalog_otherdb/upgrade-post_catalog_otherdb_92_610.sql new file mode 100644 index 0000000000000000000000000000000000000000..1b83f5ba22b33677ba820e52e9cdf1166bab5cf5 --- /dev/null +++ b/src/include/catalog/upgrade_sql/upgrade_catalog_otherdb/upgrade-post_catalog_otherdb_92_610.sql @@ -0,0 +1,5 @@ +set local inplace_upgrade_next_system_object_oids = IUO_PROC, 3148; +CREATE OR REPLACE FUNCTION pg_catalog.pg_terminate_active_session_socket + (IN threadid BIGINT, + IN sessionid BIGINT) +RETURNS bool LANGUAGE INTERNAL NOT FENCED as 'pg_terminate_active_session_socket'; \ No newline at end of file diff --git a/src/include/knl/knl_session.h b/src/include/knl/knl_session.h index 50bc622d6456292b152bd9ce70058be32fd14967..b7bbfdbdadfe823fbb082174a4755b650a269144 100644 --- a/src/include/knl/knl_session.h +++ b/src/include/knl/knl_session.h @@ -196,6 +196,7 @@ typedef struct knl_u_sig_context { */ volatile sig_atomic_t got_pool_reload; volatile sig_atomic_t cp_PoolReload; + volatile sig_atomic_t got_terminate_sess_socket; } knl_u_sig_context; class AutonomousSession; diff --git a/src/include/knl/knl_thread.h b/src/include/knl/knl_thread.h index 59cb57eaa79818b711c44c47151094814636fd2a..cdf81313a0b1f9670472d3f1a07091bb11ed5658 100755 --- a/src/include/knl/knl_thread.h +++ b/src/include/knl/knl_thread.h @@ -1427,6 +1427,7 @@ typedef struct knl_t_interrupt_context { volatile bool ignoreBackendSignal; /* ignore signal for threadpool worker */ + volatile bool ignoreSessionBackendSignal; /* ignore signal for u_session */ } knl_t_interrupt_context; typedef int64 pg_time_t; diff --git a/src/include/postgres.h b/src/include/postgres.h index 156cc0cae15e8bc04b147a08dea6cbc96c456b60..25b2c3769514c4c18bbc6b0102de389944d46b1a 100644 --- a/src/include/postgres.h +++ b/src/include/postgres.h @@ -958,6 +958,7 @@ extern size_t mmap_threshold; void HandlePoolerReload(void); void HandleMemoryContextDump(void); void HandleExecutorFlag(void); +void handle_terminate_active_sess_socket(); extern void start_xact_command(void); extern void finish_xact_command(void); diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h index b84a8c6b5ccb17546ed4851bef006454e802f47a..7928ee7e506f2bcc8812f60d4b55d38cb0fbd893 100755 --- a/src/include/storage/procarray.h +++ b/src/include/storage/procarray.h @@ -114,13 +114,14 @@ extern VirtualTransactionId* GetCurrentVirtualXIDs( extern VirtualTransactionId *GetConflictingVirtualXIDs(TransactionId limitXmin, Oid dbOid, XLogRecPtr lsn = 0, CommitSeqNo limitXminCSN = InvalidCommitSeqNo, TransactionId* xminArray = NULL); -extern ThreadId CancelVirtualTransaction(const VirtualTransactionId& vxid, ProcSignalReason sigmode); +extern ThreadId CancelVirtualTransaction(const VirtualTransactionId& vxid, ProcSignalReason sigmode, + int retry_count); extern bool MinimumActiveBackends(int min); extern int CountDBBackends(Oid database_oid); extern int CountDBActiveBackends(Oid database_oid); extern int CountSingleNodeActiveBackends(Oid databaseOid, Oid userOid); -extern void CancelDBBackends(Oid databaseid, ProcSignalReason sigmode, bool conflictPending); +extern void CancelDBBackends(Oid databaseid, ProcSignalReason sigmode, bool conflictPending, int retry_count); extern void CancelSingleNodeBackends(Oid databaseOid, Oid userOid, ProcSignalReason sigmode, bool conflictPending); extern int CountUserBackends(Oid roleid); extern bool CountOtherDBBackends(Oid databaseId, int* nbackends, int* nprepared); diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h index 5b01df0b4284a2c8a48dcd3963621b24a6ac66b4..b82cfbb61936bff95aa30fbde1f8f2dc1300e0c6 100755 --- a/src/include/storage/procsignal.h +++ b/src/include/storage/procsignal.h @@ -56,6 +56,9 @@ typedef enum { PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK, PROCSIG_EXECUTOR_FLAG, + /* close active session socket */ + PROCSIG_COMM_CLOSE_ACTIVE_SESSION_SOCKET, + NUM_PROCSIGNALS /* Must be last! */ } ProcSignalReason; diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h index 165c386e0e606c63372ebc27b95b2afd510b6b7d..87972602ea9bd5a10efd1bf24248e287af5ee9c7 100755 --- a/src/include/storage/standby.h +++ b/src/include/storage/standby.h @@ -30,8 +30,8 @@ void ResolveRecoveryConflictWithSnapshotOid(TransactionId latestRemovedXid, Oid extern void ResolveRecoveryConflictWithTablespace(Oid tsid); extern void ResolveRecoveryConflictWithDatabase(Oid dbid); -extern void ResolveRecoveryConflictWithBufferPin(void); -extern void SendRecoveryConflictWithBufferPin(ProcSignalReason reason); +extern void ResolveRecoveryConflictWithBufferPin(int retry_count); +extern void SendRecoveryConflictWithBufferPin(ProcSignalReason reason, int retry_count = 0); extern void CheckRecoveryConflictDeadlock(void); /* diff --git a/src/include/threadpool/threadpool_listener.h b/src/include/threadpool/threadpool_listener.h index 07117bb55a723294d7f5c2f1aed84b9cfc58d2fa..0090c1e431182be8db8f65ef7f5ce9399d78b382 100644 --- a/src/include/threadpool/threadpool_listener.h +++ b/src/include/threadpool/threadpool_listener.h @@ -45,9 +45,10 @@ public: bool TryFeedWorker(ThreadPoolWorker* worker); void AddNewSession(knl_session_context* session); void WaitTask(); - void DelSessionFromEpoll(knl_session_context* session); + void DelSessionFromEpoll(knl_session_context* session, bool sub_count); void RemoveWorkerFromList(ThreadPoolWorker* worker); void AddEpoll(knl_session_context* session); + void dispatch_socked_closed_session(knl_session_context* session); void SendShutDown(); void ReaperAllSession(); void ShutDown() const; diff --git a/src/include/threadpool/threadpool_sessctl.h b/src/include/threadpool/threadpool_sessctl.h index 842f0061cc1a5ee391200935733581e016b24fce..b0809f497a4215e694c5004c2d959e186a9f2a3f 100644 --- a/src/include/threadpool/threadpool_sessctl.h +++ b/src/include/threadpool/threadpool_sessctl.h @@ -59,6 +59,7 @@ public: bool ValidDBoidAndUseroid(Oid dbOid, Oid userOid, knl_sess_control* ctrl); void SigHupHandler(); void HandlePoolerReload(); + int terminate_session_socket(ThreadId tid, uint64 session_id); void CheckSessionTimeout(); #ifndef ENABLE_MULTIPLE_NODES void CheckIdleInTransactionSessionTimeout(); diff --git a/src/include/utils/builtins.h b/src/include/utils/builtins.h index 5de8a4d12d7998b61045f2bea92baacf01a1266f..07649e31b42bb5c256e8772159a517f78477dc82 100755 --- a/src/include/utils/builtins.h +++ b/src/include/utils/builtins.h @@ -607,6 +607,7 @@ extern Datum pg_cancel_session(PG_FUNCTION_ARGS); extern Datum pg_cancel_invalid_query(PG_FUNCTION_ARGS); extern Datum pg_terminate_backend(PG_FUNCTION_ARGS); extern Datum pg_terminate_session(PG_FUNCTION_ARGS); +extern Datum pg_terminate_active_session_socket(PG_FUNCTION_ARGS); extern Datum pg_reload_conf(PG_FUNCTION_ARGS); extern Datum pg_tablespace_databases(PG_FUNCTION_ARGS); extern Datum pg_tablespace_location(PG_FUNCTION_ARGS);