diff --git a/src/common/backend/catalog/builtin_funcs.ini b/src/common/backend/catalog/builtin_funcs.ini index 0f5e4e9f25f17318d8134d537b7408afb00b1b56..e4510b2f0a8f9909ebd014ee7d4a96b8855ed8ff 100755 --- a/src/common/backend/catalog/builtin_funcs.ini +++ b/src/common/backend/catalog/builtin_funcs.ini @@ -9266,6 +9266,10 @@ "pg_terminate_session", 1, AddBuiltinFunc(_0(2099), _1("pg_terminate_session"), _2(2), _3(true), _4(false), _5(pg_terminate_session), _6(16), _7(PG_CATALOG_NAMESPACE), _8(BOOTSTRAP_SUPERUSERID), _9(INTERNALlanguageId), _10(1), _11(0), _12(0), _13(0), _14(false), _15(false), _16(false), _17(false), _18('v'), _19(0), _20(2, 20, 20), _21(NULL), _22(NULL), _23(NULL), _24(NULL), _25("pg_terminate_session"), _26(NULL), _27(NULL), _28(NULL), _29(0), _30(false), _31(NULL), _32(false), _33(NULL), _34('f'), _35(NULL), _36(0), _37(false), _38(NULL), _39(NULL), _40(0)) ), + AddFuncGroup( + "pg_terminate_active_session_socket", 1, + AddBuiltinFunc(_0(3148), _1("pg_terminate_active_session_socket"), _2(2), _3(true), _4(false), _5(pg_terminate_active_session_socket), _6(16), _7(PG_CATALOG_NAMESPACE), _8(BOOTSTRAP_SUPERUSERID), _9(INTERNALlanguageId), _10(1), _11(0), _12(0), _13(0), _14(false), _15(false), _16(false), _17(false), _18('v'), _19(0), _20(2, 20, 20), _21(NULL), _22(NULL), _23(NULL), _24(NULL), _25("pg_terminate_active_session_socket"), _26(NULL), _27(NULL), _28(NULL), _29(0), _30(false), _31(NULL), _32(false), _33(NULL), _34('f'), _35(NULL), _36(0), _37(false), _38(NULL), _39(NULL), _40(0)) + ), AddFuncGroup( "pg_test_err_contain_err", 1, AddBuiltinFunc(_0(9999), _1("pg_test_err_contain_err"), _2(1), _3(true), _4(false), _5(pg_test_err_contain_err), _6(2278), _7(PG_CATALOG_NAMESPACE), _8(BOOTSTRAP_SUPERUSERID), _9(INTERNALlanguageId), _10(1), _11(0), _12(0), _13(0), _14(false), _15(false), _16(false), _17(false), _18('v'), _19(0), _20(1, 23), _21(NULL), _22(NULL), _23(NULL), _24(NULL), _25("pg_test_err_contain_err"), _26(NULL), _27(NULL), _28(NULL), _29(0), _30(false), _31(NULL), _32(false), _33(NULL), _34('f'), _35(NULL), _36(0), _37(false), _38(NULL), _39(NULL), _40(0)) diff --git a/src/common/backend/utils/adt/misc.cpp b/src/common/backend/utils/adt/misc.cpp index 43d6bee20cb87a353bca1b546bbaa2be91e96ed5..8c361e3d442aacbf94c55b1914c344556523d549 100644 --- a/src/common/backend/utils/adt/misc.cpp +++ b/src/common/backend/utils/adt/misc.cpp @@ -343,6 +343,53 @@ Datum pg_terminate_session(PG_FUNCTION_ARGS) PG_RETURN_BOOL(r == 0); } +Datum pg_terminate_active_session_socket(PG_FUNCTION_ARGS) +{ + ThreadId tid = PG_GETARG_INT64(0); + uint64 sid = PG_GETARG_INT64(1); + + if (tid <= 0 || sid <= 0) { + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("Invalid params, tid or sessionid must have real number"))); + } + + if (!initialuser()) { + ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("must be initial user can use this function"))); + } + + ereport(LOG, (errmsg("pg_terminate_active_session_socket: tid %lu and sessionid %lu", tid, sid))); + + if (ENABLE_THREAD_POOL && tid != sid) { + int count = g_threadPoolControler->GetSessionCtrl()->terminate_session_socket(tid, sid); + if (count == 0) { + ereport(WARNING, (errmsg("tid %lu and sessionid %lu do not match with valid active session", tid, sid))); + PG_RETURN_BOOL(false); + } + } + + ProcArrayStruct *array_ptr = g_instance.proc_array_idx; + LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + for (int index = 0; index < array_ptr->numProcs; index++) { + int pg_proc_no = array_ptr->pgprocnos[index]; + volatile PGPROC *proc = g_instance.proc_base_all_procs[pg_proc_no]; + VirtualTransactionId vxid; + GET_VXID_FROM_PGPROC(vxid, *proc); + + if (proc->pid != tid) { + continue; + } + + (void)SendProcSignal(tid, PROCSIG_COMM_CLOSE_ACTIVE_SESSION_SOCKET, vxid.backendId); + ereport(LOG, + (errmsg("pg_terminate_active_session_socket: send signal to tid %lu and sessionid %lu", tid, sid))); + break; + } + LWLockRelease(ProcArrayLock); + + PG_RETURN_BOOL(true); +} + /* * function name: pg_wlm_jump_queue * description : wlm jump the queue with thread id. diff --git a/src/common/backend/utils/cache/knl_localsysdbcache.cpp b/src/common/backend/utils/cache/knl_localsysdbcache.cpp index 160105aa18b38e1273ee768ac64f0d6647079bd2..475d670d4293b1c38cb8f02a70ac0d60a115d62a 100644 --- a/src/common/backend/utils/cache/knl_localsysdbcache.cpp +++ b/src/common/backend/utils/cache/knl_localsysdbcache.cpp @@ -118,11 +118,11 @@ bool IsGotPoolReload() } void ResetGotPoolReload(bool value) { - if (EnableLocalSysCache()) { + if (!t_thrd.int_cxt.ignoreSessionBackendSignal) { u_sess->sig_cxt.got_pool_reload = value; + } + if (EnableLocalSysCache()) { t_thrd.lsc_cxt.lsc->got_pool_reload = value; - } else { - u_sess->sig_cxt.got_pool_reload = value; } } diff --git a/src/common/backend/utils/init/globals.cpp b/src/common/backend/utils/init/globals.cpp index 8c854840a80757d998019a7a1d8aeb0be6cc46c4..0de9b598692ca83e3f94d041a5f0c611d3b6b8ad 100644 --- a/src/common/backend/utils/init/globals.cpp +++ b/src/common/backend/utils/init/globals.cpp @@ -74,7 +74,7 @@ bool will_shutdown = false; * NEXT | 92899 | ? | ? * ********************************************/ -const uint32 GRAND_VERSION_NUM = 92852; +const uint32 GRAND_VERSION_NUM = 92853; /******************************************** * 2.VERSION NUM FOR EACH FEATURE diff --git a/src/gausskernel/process/tcop/postgres.cpp b/src/gausskernel/process/tcop/postgres.cpp index 698d2ab1956f38b5c16d0badead33d721a7da4b0..1b517e099773447b7bc2da851971cf8d2faeb335 100755 --- a/src/gausskernel/process/tcop/postgres.cpp +++ b/src/gausskernel/process/tcop/postgres.cpp @@ -6218,7 +6218,10 @@ void HandlePoolerReload(void) return; ResetGotPoolReload(true); - u_sess->sig_cxt.cp_PoolReload = true; + + if (!t_thrd.int_cxt.ignoreSessionBackendSignal) { + u_sess->sig_cxt.cp_PoolReload = true; + } } // HandleMemoryContextDump @@ -6236,6 +6239,39 @@ void HandleExecutorFlag(void) if (IS_PGXC_DATANODE) u_sess->exec_cxt.executorStopFlag = true; } + +void handle_terminate_active_sess_socket() +{ + /* If doing free_session_context, u_sess will be invalid pointer, just return */ + if (t_thrd.int_cxt.ignoreSessionBackendSignal) { + return; + } + + if (t_thrd.role == THREADPOOL_WORKER) { + if (u_sess->sig_cxt.got_terminate_sess_socket) { + t_thrd.threadpool_cxt.worker->GetGroup()->GetListener()->DelSessionFromEpoll(u_sess, false); + + int sock = u_sess->proc_cxt.MyProcPort->sock; + if (unlikely(u_sess->status == KNL_SESS_UNINIT)) { + u_sess->status = KNL_SESS_CLOSERAW; + } else { + u_sess->status = KNL_SESS_CLOSE; + } + u_sess->proc_cxt.MyProcPort->sock = PGINVALID_SOCKET; + closesocket(sock); + u_sess->sig_cxt.got_terminate_sess_socket = false; + } + } else if (t_thrd.role == THREADPOOL_STREAM) { + if (u_sess->proc_cxt.MyProcPort->is_logic_conn) { + gs_close_gsocket(&(u_sess->proc_cxt.MyProcPort->gs_sock)); + } + } else { + int sock = u_sess->proc_cxt.MyProcPort->sock; + u_sess->proc_cxt.MyProcPort->sock = -1; + closesocket(sock); + } +} + /* * RecoveryConflictInterrupt: out-of-line portion of recovery conflict * handling following receipt of SIGUSR1. Designed to be similar to die() @@ -12065,6 +12101,7 @@ bool checkCompArgs(const char *compFormat) void ResetInterruptCxt() { t_thrd.int_cxt.ignoreBackendSignal = false; + t_thrd.int_cxt.ignoreSessionBackendSignal = false; t_thrd.int_cxt.InterruptHoldoffCount = 0; t_thrd.int_cxt.QueryCancelHoldoffCount = 0; diff --git a/src/gausskernel/process/threadpool/knl_session.cpp b/src/gausskernel/process/threadpool/knl_session.cpp index 8c212478d9ddac0f87618e074e72131e9d397d46..e460bed7cb9aa9f9f1f97e26872377321bcb5b17 100755 --- a/src/gausskernel/process/threadpool/knl_session.cpp +++ b/src/gausskernel/process/threadpool/knl_session.cpp @@ -1615,8 +1615,10 @@ void free_session_context(knl_session_context* session) MemoryContextDeleteChildren(session->top_mem_cxt); MemoryContextDelete(session->top_mem_cxt); (void)syscalllockFree(&session->utils_cxt.deleMemContextMutex); + t_thrd.int_cxt.ignoreSessionBackendSignal = true; pfree_ext(session); use_fake_session(); + t_thrd.int_cxt.ignoreSessionBackendSignal = false; } bool stp_set_commit_rollback_err_msg(stp_xact_err_type type) diff --git a/src/gausskernel/process/threadpool/threadpool_listener.cpp b/src/gausskernel/process/threadpool/threadpool_listener.cpp index eff445d47e9648aa37d8e30ad53b1530aa2fb3c5..d6c5f21f508eb975f402e50199fb6a96ae8cbbb0 100644 --- a/src/gausskernel/process/threadpool/threadpool_listener.cpp +++ b/src/gausskernel/process/threadpool/threadpool_listener.cpp @@ -206,6 +206,19 @@ void ThreadPoolListener::CreateEpoll() } } +void ThreadPoolListener::dispatch_socked_closed_session(knl_session_context* session) +{ + Assert(session->status != KNL_SESS_UNINIT); + m_idleSessionList->Remove(&session->elem); + if (unlikely(session->status == KNL_SESS_UNINIT)) { + session->status = KNL_SESS_CLOSERAW; + } else { + session->status = KNL_SESS_CLOSE; + } + session->proc_cxt.MyProcPort->sock = PGINVALID_SOCKET; + AddIdleSessionToHead(session); +} + void ThreadPoolListener::AddEpoll(knl_session_context* session) { struct epoll_event ev = {0}; @@ -244,16 +257,13 @@ void ThreadPoolListener::AddEpoll(knl_session_context* session) } } if (unlikely(res != 0)) { -#ifdef USE_ASSERT_CHECKING - ereport(PANIC, -#else ereport(WARNING, -#endif (errmodule(MOD_THREAD_POOL), errmsg("epoll_ctl fail %m, sess status:%d, sock:%d, host:%s, port:%s", session->status, session->proc_cxt.MyProcPort->sock, session->proc_cxt.MyProcPort->remote_host, session->proc_cxt.MyProcPort->remote_port))); + dispatch_socked_closed_session(session); } } @@ -483,7 +493,7 @@ void ThreadPoolListener::DispatchSession(knl_session_context* session) } } -void ThreadPoolListener::DelSessionFromEpoll(knl_session_context* session) +void ThreadPoolListener::DelSessionFromEpoll(knl_session_context* session, bool sub_count) { if (ENABLE_THREAD_POOL_DN_LOGICCONN) { struct epoll_event ev = {0}; @@ -497,7 +507,10 @@ void ThreadPoolListener::DelSessionFromEpoll(knl_session_context* session) #endif comm_epoll_ctl(m_epollFd, EPOLL_CTL_DEL, session->proc_cxt.MyProcPort->sock, NULL); } - (void)pg_atomic_fetch_sub_u32((volatile uint32*)&m_group->m_sessionCount, 1); + + if (sub_count) { + (void)pg_atomic_fetch_sub_u32((volatile uint32*)&m_group->m_sessionCount, 1); + } } void ThreadPoolListener::RemoveWorkerFromList(ThreadPoolWorker* worker) diff --git a/src/gausskernel/process/threadpool/threadpool_sessctl.cpp b/src/gausskernel/process/threadpool/threadpool_sessctl.cpp index f0908d677a0b2b5c6fc190a23831ac58b96b7adc..1f16a09d910748b89f3136829ac04a80af1658e5 100755 --- a/src/gausskernel/process/threadpool/threadpool_sessctl.cpp +++ b/src/gausskernel/process/threadpool/threadpool_sessctl.cpp @@ -478,6 +478,33 @@ void ThreadPoolSessControl::HandlePoolerReload() alock.unLock(); } +int ThreadPoolSessControl::terminate_session_socket(ThreadId tid, uint64 session_id) +{ + int count = 0; + AutoMutexLock alock(&m_sessCtrlock); + alock.lock(); + + knl_sess_control* ctrl = NULL; + Dlelem* elem = DLGetHead(&m_activelist); + while (elem != NULL) { + ctrl= (knl_sess_control*)DLE_VAL(elem); + knl_session_context* sess = ctrl->sess; + /* Only active session can be closed socket */ + if (sess == NULL || sess->attachPid != tid || sess->session_id != session_id || + sess->status != KNL_SESS_ATTACH) { + elem = DLGetSucc(elem); + continue; + } + + ctrl->sess->sig_cxt.got_terminate_sess_socket = true; + count++; + elem = DLGetSucc(elem); + break; + } + alock.unLock(); + return count; +} + void ThreadPoolSessControl::calculateSessMemCxtStats( knl_session_context* sess, const MemoryContext context, Tuplestorestate* tupStore, TupleDesc tupDesc) { diff --git a/src/gausskernel/process/threadpool/threadpool_worker.cpp b/src/gausskernel/process/threadpool/threadpool_worker.cpp index 0afdacf68594badc649652a81433f6b90a322fbc..6e9464b96ebe82b99391727d6229a01bcf6abadf 100644 --- a/src/gausskernel/process/threadpool/threadpool_worker.cpp +++ b/src/gausskernel/process/threadpool/threadpool_worker.cpp @@ -716,7 +716,7 @@ void ThreadPoolWorker::CleanUpSession(bool threadexit) } /* Close Session. */ - m_group->GetListener()->DelSessionFromEpoll(m_currentSession); + m_group->GetListener()->DelSessionFromEpoll(m_currentSession, true); if (m_currentSession->proc_cxt.PassConnLimit) { SpinLockAcquire(&g_instance.conn_cxt.ConnCountLock); diff --git a/src/gausskernel/storage/buffer/bufmgr.cpp b/src/gausskernel/storage/buffer/bufmgr.cpp index 8e79236af711c0ec7e26450202fa4179e59f8163..c09cd74d52a5ecfb218f8f02b203bcbbca7079b0 100644 --- a/src/gausskernel/storage/buffer/bufmgr.cpp +++ b/src/gausskernel/storage/buffer/bufmgr.cpp @@ -6075,6 +6075,7 @@ retry: void LockBufferForCleanup(Buffer buffer) { BufferDesc *buf_desc = NULL; + int retry_count = 0; Assert(BufferIsValid(buffer)); Assert(t_thrd.storage_cxt.PinCountWaitBuf == NULL); @@ -6124,10 +6125,11 @@ void LockBufferForCleanup(Buffer buffer) /* Wait to be signaled by UnpinBuffer() */ if (InHotStandby && g_supportHotStandby) { + retry_count++; /* Publish the bufid that Startup process waits on */ parallel_recovery::SetStartupBufferPinWaitBufId(buffer - 1); /* Set alarm and then wait to be signaled by UnpinBuffer() */ - ResolveRecoveryConflictWithBufferPin(); + ResolveRecoveryConflictWithBufferPin(retry_count); /* Reset the published bufid */ parallel_recovery::SetStartupBufferPinWaitBufId(-1); diff --git a/src/gausskernel/storage/ipc/procarray.cpp b/src/gausskernel/storage/ipc/procarray.cpp index c8b43dd590310d24b7c93e624bffabc5157bfb68..0d241a20458df61e367bd59f4836c7be769479fa 100755 --- a/src/gausskernel/storage/ipc/procarray.cpp +++ b/src/gausskernel/storage/ipc/procarray.cpp @@ -3105,11 +3105,13 @@ VirtualTransactionId *GetConflictingVirtualXIDs(TransactionId limitXmin, Oid dbO * * Returns pid of the process signaled, or 0 if not found. */ -ThreadId CancelVirtualTransaction(const VirtualTransactionId& vxid, ProcSignalReason sigmode) +ThreadId CancelVirtualTransaction(const VirtualTransactionId& vxid, ProcSignalReason sigmode, int retry_count) { ProcArrayStruct* arrayP = g_instance.proc_array_idx; int index; ThreadId pid = 0; + uint64 sessionid = 0; + const int max_retry_count = 1000; LWLockAcquire(ProcArrayLock, LW_SHARED); @@ -3123,13 +3125,27 @@ ThreadId CancelVirtualTransaction(const VirtualTransactionId& vxid, ProcSignalRe if (procvxid.backendId == vxid.backendId && procvxid.localTransactionId == vxid.localTransactionId) { proc->recoveryConflictPending = true; pid = proc->pid; + sessionid = proc->sessionid; if (pid != 0) { /* * Kill the pid if it's still here. If not, that's what we * wanted so ignore any errors. */ - (void)SendProcSignal(pid, sigmode, vxid.backendId); + if (retry_count < max_retry_count) { + (void)SendProcSignal(pid, sigmode, vxid.backendId); + } else { + if (ENABLE_THREAD_POOL && pid != sessionid) { + int count = g_threadPoolControler->GetSessionCtrl()->terminate_session_socket(pid, sessionid); + if (count == 0) { + ereport(WARNING, (errmsg("tid %lu and sessionid %lu do not match with valid active session", + pid, sessionid))); + } + } + (void)SendProcSignal(pid, PROCSIG_COMM_CLOSE_ACTIVE_SESSION_SOCKET, vxid.backendId); + ereport(LOG, (errmsg("CancelVirtualTransaction: send signal to tid %lu and sessionid %lu", + pid, sessionid))); + } } break; @@ -3314,11 +3330,13 @@ int CountDBActiveBackends(Oid database_oid) /* * CancelDBBackends --- cancel backends that are using specified database */ -void CancelDBBackends(Oid databaseid, ProcSignalReason sigmode, bool conflictPending) +void CancelDBBackends(Oid databaseid, ProcSignalReason sigmode, bool conflictPending, int retry_count) { ProcArrayStruct* arrayP = g_instance.proc_array_idx; int index; ThreadId pid = 0; + uint64 sessionid = 0; + const int max_retry_count = 1000; /* tell all backends to die */ LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); @@ -3334,13 +3352,27 @@ void CancelDBBackends(Oid databaseid, ProcSignalReason sigmode, bool conflictPen proc->recoveryConflictPending = conflictPending; pid = proc->pid; + sessionid = proc->sessionid; if (pid != 0) { /* * Kill the pid if it's still here. If not, that's what we * wanted so ignore any errors. */ - (void)SendProcSignal(pid, sigmode, procvxid.backendId); + if (retry_count < max_retry_count) { + (void)SendProcSignal(pid, sigmode, procvxid.backendId); + } else { + if (ENABLE_THREAD_POOL && pid != sessionid) { + int count = g_threadPoolControler->GetSessionCtrl()->terminate_session_socket(pid, sessionid); + if (count == 0) { + ereport(WARNING, (errmsg("tid %lu and sessionid %lu do not match with valid active session", + pid, sessionid))); + } + } + (void)SendProcSignal(pid, PROCSIG_COMM_CLOSE_ACTIVE_SESSION_SOCKET, procvxid.backendId); + ereport(LOG, (errmsg("CancelDBBackends: send signal to tid %lu and sessionid %lu", + pid, sessionid))); + } } } } diff --git a/src/gausskernel/storage/ipc/procsignal.cpp b/src/gausskernel/storage/ipc/procsignal.cpp index 0d4bf4edd5c3c30640ae75661942ef5c87bd14c9..d78e54e0e63aa986dfe01934e9eaddebe85a13e7 100755 --- a/src/gausskernel/storage/ipc/procsignal.cpp +++ b/src/gausskernel/storage/ipc/procsignal.cpp @@ -312,6 +312,10 @@ void procsignal_sigusr1_handler(SIGNAL_ARGS) if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN)) RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN); + if (CheckProcSignal(PROCSIG_COMM_CLOSE_ACTIVE_SESSION_SOCKET)) { + handle_terminate_active_sess_socket(); + } + latch_sigusr1_handler(); errno = save_errno; diff --git a/src/gausskernel/storage/ipc/standby.cpp b/src/gausskernel/storage/ipc/standby.cpp index d023429204bf6d8176d2648cd721e212829e14cc..69b80ff499041a3b17a91adce34467fad198e88e 100755 --- a/src/gausskernel/storage/ipc/standby.cpp +++ b/src/gausskernel/storage/ipc/standby.cpp @@ -37,8 +37,9 @@ #include "pgxc/poolutils.h" #include "replication/walreceiver.h" static void ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId* waitlist, TransactionId* xminArray, - ProcSignalReason reason, TimestampTz waitStart, + ProcSignalReason reason, TimestampTz waitStart, int retry_count, TransactionId limitXmin = InvalidTransactionId); + static void ResolveRecoveryConflictWithLock(Oid dbOid, Oid relOid); static void LogAccessExclusiveLocks(int nlocks, xl_standby_lock* locks); static void LogReleaseAccessExclusiveLocks(int nlocks, xl_standby_lock* locks); @@ -181,7 +182,7 @@ static bool WaitExceedsMaxStandbyDelay(TimestampTz startTime) */ static void ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId* waitlist, TransactionId* xminArray, ProcSignalReason reason, TimestampTz waitStart, - TransactionId limitXmin) + int retry_count, TransactionId limitXmin) { char* new_status = NULL; bool waited = false; @@ -236,7 +237,7 @@ static void ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId* waitlis * Now find out who to throw out of the balloon. */ Assert(VirtualTransactionIdIsValid(*waitlist)); - pid = CancelVirtualTransaction(*waitlist, reason); + pid = CancelVirtualTransaction(*waitlist, reason, retry_count); /* * Wait a little bit for it to die so that we avoid flooding * an unresponsive backend when system is heavily loaded. @@ -276,7 +277,7 @@ void ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, const R { VirtualTransactionId* backends = NULL; TimestampTz waitStart; - + int retry_count; /* * If we get passed InvalidTransactionId then we are a little surprised, * but it is theoretically possible in normal running. It also happens @@ -312,6 +313,7 @@ void ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, const R } waitStart = GetCurrentTimestamp(); + retry_count = 0; while (true) { backends = GetConflictingVirtualXIDs(latestRemovedXid, node.dbNode, lsn, limitXminCSN, t_thrd.storage_cxt.xminArray); @@ -319,8 +321,9 @@ void ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, const R break; } + retry_count++; ResolveRecoveryConflictWithVirtualXIDs(backends, t_thrd.storage_cxt.xminArray, - PROCSIG_RECOVERY_CONFLICT_SNAPSHOT, waitStart, latestRemovedXid); + PROCSIG_RECOVERY_CONFLICT_SNAPSHOT, waitStart, retry_count, latestRemovedXid); } } @@ -328,6 +331,7 @@ void ResolveRecoveryConflictWithSnapshotOid(TransactionId latestRemovedXid, Oid { VirtualTransactionId* backends = NULL; TimestampTz waitStart; + int retry_count; /* * If we get passed InvalidTransactionId then we are a little surprised, * but it is theoretically possible in normal running. It also happens @@ -349,6 +353,7 @@ void ResolveRecoveryConflictWithSnapshotOid(TransactionId latestRemovedXid, Oid } waitStart = GetCurrentTimestamp(); + retry_count = 0; while (true) { backends = GetConflictingVirtualXIDs(latestRemovedXid, dbid, lsn, InvalidCommitSeqNo, t_thrd.storage_cxt.xminArray); @@ -356,8 +361,9 @@ void ResolveRecoveryConflictWithSnapshotOid(TransactionId latestRemovedXid, Oid break; } + retry_count++; ResolveRecoveryConflictWithVirtualXIDs(backends, t_thrd.storage_cxt.xminArray, - PROCSIG_RECOVERY_CONFLICT_SNAPSHOT, waitStart, latestRemovedXid); + PROCSIG_RECOVERY_CONFLICT_SNAPSHOT, waitStart, retry_count, latestRemovedXid); } } @@ -366,7 +372,7 @@ void ResolveRecoveryConflictWithTablespace(Oid tsid) { VirtualTransactionId* temp_file_users = NULL; TimestampTz waitStart; - + int retry_count; /* * Standby users may be currently using this tablespace for their * temporary files. We only care about current users because @@ -385,19 +391,22 @@ void ResolveRecoveryConflictWithTablespace(Oid tsid) * We don't wait for commit because drop tablespace is non-transactional. */ waitStart = GetCurrentTimestamp(); + retry_count = 0; while (true) { temp_file_users = GetConflictingVirtualXIDs(InvalidTransactionId, InvalidOid); if (!VirtualTransactionIdIsValid(*temp_file_users)) { break; } + retry_count++; ResolveRecoveryConflictWithVirtualXIDs(temp_file_users, NULL, - PROCSIG_RECOVERY_CONFLICT_TABLESPACE, waitStart); + PROCSIG_RECOVERY_CONFLICT_TABLESPACE, waitStart, retry_count); } } void ResolveRecoveryConflictWithDatabase(Oid dbid) { + int retry_count; /* * We don't do ResolveRecoveryConflictWithVirtualXIDs() here since that * only waits for transactions and completely idle sessions would block @@ -416,8 +425,10 @@ void ResolveRecoveryConflictWithDatabase(Oid dbid) pg_usleep(10000); } } else { + retry_count = 0; while (CountDBBackends(dbid) > 0) { - CancelDBBackends(dbid, PROCSIG_RECOVERY_CONFLICT_DATABASE, true); + retry_count++; + CancelDBBackends(dbid, PROCSIG_RECOVERY_CONFLICT_DATABASE, true, retry_count); /* * Wait awhile for them to die so that we avoid flooding an * unresponsive backend when system is heavily loaded. @@ -434,6 +445,7 @@ static void ResolveRecoveryConflictWithLock(Oid dbOid, Oid relOid) int num_attempts = 0; LOCKTAG locktag; TimestampTz waitStart; + int retry_count; SET_LOCKTAG_RELATION(locktag, dbOid, relOid); @@ -446,13 +458,16 @@ static void ResolveRecoveryConflictWithLock(Oid dbOid, Oid relOid) * justifies the means. */ waitStart = GetCurrentTimestamp(); + retry_count = 0; while (!lock_acquired) { if (++num_attempts < 3) backends = GetLockConflicts(&locktag, AccessExclusiveLock); else backends = GetConflictingVirtualXIDs(InvalidTransactionId, InvalidOid); - ResolveRecoveryConflictWithVirtualXIDs(backends, NULL, PROCSIG_RECOVERY_CONFLICT_LOCK, waitStart); + retry_count++; + ResolveRecoveryConflictWithVirtualXIDs(backends, NULL, PROCSIG_RECOVERY_CONFLICT_LOCK, + waitStart, retry_count); if (LockAcquireExtended(&locktag, AccessExclusiveLock, true, true, false) != LOCKACQUIRE_NOT_AVAIL) lock_acquired = true; @@ -484,7 +499,7 @@ static void ResolveRecoveryConflictWithLock(Oid dbOid, Oid relOid) * so we don't do a deadlock check right away ... only if we have had to wait * at least deadlock_timeout. Most of the logic about that is in proc.c. */ -void ResolveRecoveryConflictWithBufferPin(void) +void ResolveRecoveryConflictWithBufferPin(int retry_count) { bool sig_alarm_enabled = false; TimestampTz ltime; @@ -510,7 +525,7 @@ void ResolveRecoveryConflictWithBufferPin(void) /* * We're already behind, so clear a path as quickly as possible. */ - SendRecoveryConflictWithBufferPin(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN); + SendRecoveryConflictWithBufferPin(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN, retry_count); } else { /* * Wake up at ltime, and check for deadlocks as well if we will be @@ -536,7 +551,7 @@ void ResolveRecoveryConflictWithBufferPin(void) ereport(LOG, (errmsg("buffer pin confilict resolve ok, proc time %d ms", ComputeTimeStamp(now)))); } -void SendRecoveryConflictWithBufferPin(ProcSignalReason reason) +void SendRecoveryConflictWithBufferPin(ProcSignalReason reason, int retry_count) { Assert(reason == PROCSIG_RECOVERY_CONFLICT_BUFFERPIN || reason == PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK); @@ -546,7 +561,7 @@ void SendRecoveryConflictWithBufferPin(ProcSignalReason reason) * conflict flag yet, since most backends will be innocent. Let the * SIGUSR1 handling in each backend decide their own fate. */ - CancelDBBackends(InvalidOid, reason, false); + CancelDBBackends(InvalidOid, reason, false, retry_count); } /* diff --git a/src/include/catalog/upgrade_sql/rollback_catalog_maindb/rollback-post_catalog_maindb_92_853.sql b/src/include/catalog/upgrade_sql/rollback_catalog_maindb/rollback-post_catalog_maindb_92_853.sql new file mode 100644 index 0000000000000000000000000000000000000000..bfaa53d63431a016cbec17b6bcd85ef7dfe9bffc --- /dev/null +++ b/src/include/catalog/upgrade_sql/rollback_catalog_maindb/rollback-post_catalog_maindb_92_853.sql @@ -0,0 +1 @@ +DROP FUNCTION IF EXISTS pg_catalog.pg_terminate_active_session_socket() cascade; diff --git a/src/include/catalog/upgrade_sql/rollback_catalog_otherdb/rollback-post_catalog_otherdb_92_853.sql b/src/include/catalog/upgrade_sql/rollback_catalog_otherdb/rollback-post_catalog_otherdb_92_853.sql new file mode 100644 index 0000000000000000000000000000000000000000..bfaa53d63431a016cbec17b6bcd85ef7dfe9bffc --- /dev/null +++ b/src/include/catalog/upgrade_sql/rollback_catalog_otherdb/rollback-post_catalog_otherdb_92_853.sql @@ -0,0 +1 @@ +DROP FUNCTION IF EXISTS pg_catalog.pg_terminate_active_session_socket() cascade; diff --git a/src/include/catalog/upgrade_sql/upgrade_catalog_maindb/upgrade-post_catalog_maindb_92_853.sql b/src/include/catalog/upgrade_sql/upgrade_catalog_maindb/upgrade-post_catalog_maindb_92_853.sql new file mode 100644 index 0000000000000000000000000000000000000000..1b83f5ba22b33677ba820e52e9cdf1166bab5cf5 --- /dev/null +++ b/src/include/catalog/upgrade_sql/upgrade_catalog_maindb/upgrade-post_catalog_maindb_92_853.sql @@ -0,0 +1,5 @@ +set local inplace_upgrade_next_system_object_oids = IUO_PROC, 3148; +CREATE OR REPLACE FUNCTION pg_catalog.pg_terminate_active_session_socket + (IN threadid BIGINT, + IN sessionid BIGINT) +RETURNS bool LANGUAGE INTERNAL NOT FENCED as 'pg_terminate_active_session_socket'; \ No newline at end of file diff --git a/src/include/catalog/upgrade_sql/upgrade_catalog_otherdb/upgrade-post_catalog_otherdb_92_853.sql b/src/include/catalog/upgrade_sql/upgrade_catalog_otherdb/upgrade-post_catalog_otherdb_92_853.sql new file mode 100644 index 0000000000000000000000000000000000000000..1b83f5ba22b33677ba820e52e9cdf1166bab5cf5 --- /dev/null +++ b/src/include/catalog/upgrade_sql/upgrade_catalog_otherdb/upgrade-post_catalog_otherdb_92_853.sql @@ -0,0 +1,5 @@ +set local inplace_upgrade_next_system_object_oids = IUO_PROC, 3148; +CREATE OR REPLACE FUNCTION pg_catalog.pg_terminate_active_session_socket + (IN threadid BIGINT, + IN sessionid BIGINT) +RETURNS bool LANGUAGE INTERNAL NOT FENCED as 'pg_terminate_active_session_socket'; \ No newline at end of file diff --git a/src/include/knl/knl_session.h b/src/include/knl/knl_session.h index f477006f1d4d6b9c5c97d7e18a3bbcf3dfa98498..5756e2f3692d2cd0ec70c67fd5ddf761d758e3ed 100644 --- a/src/include/knl/knl_session.h +++ b/src/include/knl/knl_session.h @@ -200,6 +200,7 @@ typedef struct knl_u_sig_context { */ volatile sig_atomic_t got_pool_reload; volatile sig_atomic_t cp_PoolReload; + volatile sig_atomic_t got_terminate_sess_socket; } knl_u_sig_context; class AutonomousSession; diff --git a/src/include/knl/knl_thread.h b/src/include/knl/knl_thread.h index f58b2a0ca2c18c93f47c0b83f40cf0bdf642e60c..797b150d76909eebc8ded7724460971af224fcec 100755 --- a/src/include/knl/knl_thread.h +++ b/src/include/knl/knl_thread.h @@ -1425,6 +1425,7 @@ typedef struct knl_t_interrupt_context { volatile bool ignoreBackendSignal; /* ignore signal for threadpool worker */ + volatile bool ignoreSessionBackendSignal; /* ignore signal for u_session */ } knl_t_interrupt_context; typedef int64 pg_time_t; diff --git a/src/include/postgres.h b/src/include/postgres.h index e7d7a0ae5ef13e7e9d0edd25c8e400a4300aba85..ddffafa1a202ac7d9415c80648f9dbd4efd156e5 100644 --- a/src/include/postgres.h +++ b/src/include/postgres.h @@ -959,6 +959,7 @@ extern size_t mmap_threshold; void HandlePoolerReload(void); void HandleMemoryContextDump(void); void HandleExecutorFlag(void); +void handle_terminate_active_sess_socket(); extern void start_xact_command(void); extern void finish_xact_command(void); diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h index 5a837050652459e8f911ef07459cd5b5a7982286..8f183f6fef2d7c783024c461dec3445eb9ee3c7b 100755 --- a/src/include/storage/procarray.h +++ b/src/include/storage/procarray.h @@ -114,13 +114,14 @@ extern VirtualTransactionId* GetCurrentVirtualXIDs( extern VirtualTransactionId *GetConflictingVirtualXIDs(TransactionId limitXmin, Oid dbOid, XLogRecPtr lsn = 0, CommitSeqNo limitXminCSN = InvalidCommitSeqNo, TransactionId* xminArray = NULL); -extern ThreadId CancelVirtualTransaction(const VirtualTransactionId& vxid, ProcSignalReason sigmode); +extern ThreadId CancelVirtualTransaction(const VirtualTransactionId& vxid, ProcSignalReason sigmode, + int retry_count); extern bool MinimumActiveBackends(int min); extern int CountDBBackends(Oid database_oid); extern int CountDBActiveBackends(Oid database_oid); extern int CountSingleNodeActiveBackends(Oid databaseOid, Oid userOid); -extern void CancelDBBackends(Oid databaseid, ProcSignalReason sigmode, bool conflictPending); +extern void CancelDBBackends(Oid databaseid, ProcSignalReason sigmode, bool conflictPending, int retry_count); extern void CancelSingleNodeBackends(Oid databaseOid, Oid userOid, ProcSignalReason sigmode, bool conflictPending); extern int CountUserBackends(Oid roleid); extern bool CountOtherDBBackends(Oid databaseId, int* nbackends, int* nprepared); diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h index 5b01df0b4284a2c8a48dcd3963621b24a6ac66b4..b82cfbb61936bff95aa30fbde1f8f2dc1300e0c6 100755 --- a/src/include/storage/procsignal.h +++ b/src/include/storage/procsignal.h @@ -56,6 +56,9 @@ typedef enum { PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK, PROCSIG_EXECUTOR_FLAG, + /* close active session socket */ + PROCSIG_COMM_CLOSE_ACTIVE_SESSION_SOCKET, + NUM_PROCSIGNALS /* Must be last! */ } ProcSignalReason; diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h index 840043bafefb942f8edb7bcbf7ed241f8a6e5c03..1c4bff4232d5387601591700db3790681f9bdb22 100755 --- a/src/include/storage/standby.h +++ b/src/include/storage/standby.h @@ -30,8 +30,8 @@ void ResolveRecoveryConflictWithSnapshotOid(TransactionId latestRemovedXid, Oid extern void ResolveRecoveryConflictWithTablespace(Oid tsid); extern void ResolveRecoveryConflictWithDatabase(Oid dbid); -extern void ResolveRecoveryConflictWithBufferPin(void); -extern void SendRecoveryConflictWithBufferPin(ProcSignalReason reason); +extern void ResolveRecoveryConflictWithBufferPin(int retry_count); +extern void SendRecoveryConflictWithBufferPin(ProcSignalReason reason, int retry_count = 0); extern void CheckRecoveryConflictDeadlock(void); /* diff --git a/src/include/threadpool/threadpool_listener.h b/src/include/threadpool/threadpool_listener.h index cc8ad06767c2e291bec534347f8e7954b7f98b72..ea340b1aa24b052ea7d261bc6232eed55b21de99 100644 --- a/src/include/threadpool/threadpool_listener.h +++ b/src/include/threadpool/threadpool_listener.h @@ -46,9 +46,10 @@ public: bool TryFeedWorker(ThreadPoolWorker* worker); void AddNewSession(knl_session_context* session); void WaitTask(); - void DelSessionFromEpoll(knl_session_context* session); + void DelSessionFromEpoll(knl_session_context* session, bool sub_count); void RemoveWorkerFromList(ThreadPoolWorker* worker); void AddEpoll(knl_session_context* session); + void dispatch_socked_closed_session(knl_session_context* session); void SendShutDown(); void ReaperAllSession(); void ShutDown() const; diff --git a/src/include/threadpool/threadpool_sessctl.h b/src/include/threadpool/threadpool_sessctl.h index 842f0061cc1a5ee391200935733581e016b24fce..b0809f497a4215e694c5004c2d959e186a9f2a3f 100644 --- a/src/include/threadpool/threadpool_sessctl.h +++ b/src/include/threadpool/threadpool_sessctl.h @@ -59,6 +59,7 @@ public: bool ValidDBoidAndUseroid(Oid dbOid, Oid userOid, knl_sess_control* ctrl); void SigHupHandler(); void HandlePoolerReload(); + int terminate_session_socket(ThreadId tid, uint64 session_id); void CheckSessionTimeout(); #ifndef ENABLE_MULTIPLE_NODES void CheckIdleInTransactionSessionTimeout(); diff --git a/src/include/utils/builtins.h b/src/include/utils/builtins.h index 123b9a67f125ecc18d0840ea3f8e3912c6980ab6..d422afdf6a805cc746022d97096149b0aa8de143 100644 --- a/src/include/utils/builtins.h +++ b/src/include/utils/builtins.h @@ -682,6 +682,7 @@ extern Datum pg_cancel_session(PG_FUNCTION_ARGS); extern Datum pg_cancel_invalid_query(PG_FUNCTION_ARGS); extern Datum pg_terminate_backend(PG_FUNCTION_ARGS); extern Datum pg_terminate_session(PG_FUNCTION_ARGS); +extern Datum pg_terminate_active_session_socket(PG_FUNCTION_ARGS); extern Datum pg_reload_conf(PG_FUNCTION_ARGS); extern Datum pg_tablespace_databases(PG_FUNCTION_ARGS); extern Datum pg_tablespace_location(PG_FUNCTION_ARGS);