diff --git a/src/gausskernel/process/tcop/postgres.cpp b/src/gausskernel/process/tcop/postgres.cpp index eb9b4305ebad6fd7b38b60e2b42d17bb021eb56c..be2cb137278eeec13ff04b5a7efcbeab2adc8865 100755 --- a/src/gausskernel/process/tcop/postgres.cpp +++ b/src/gausskernel/process/tcop/postgres.cpp @@ -9785,7 +9785,7 @@ int PostgresMain(int argc, char* argv[], const char* dbname, const char* usernam statement_init_metric_context(); instr_stmt_report_trace_id(u_sess->trace_cxt.trace_id); exec_parse_message(query_string, stmt_name, paramTypes, paramTypeNames, paramModes, numParams); - if (ENABLE_REMOTE_EXECUTE && (libpqsw_redirect() || libpqsw_get_set_command()) && + if (ENABLE_REMOTE_EXECUTE && (libpqsw_redirect() || libpqsw_get_set_command() || libpqsw_command_is_prepare()) && !libpqsw_only_localrun()) { get_redirect_manager()->push_message(firstchar, &input_message, diff --git a/src/gausskernel/storage/replication/libpqsw.cpp b/src/gausskernel/storage/replication/libpqsw.cpp index 98054bacfef39be7b17573215cc9418072444f43..68b5f6587ee6e26bf0467a0cee1897b142dd3fc6 100644 --- a/src/gausskernel/storage/replication/libpqsw.cpp +++ b/src/gausskernel/storage/replication/libpqsw.cpp @@ -564,12 +564,23 @@ bool libpqsw_redirect() return libpqsw_get_redirect() || libpqsw_get_batch() || libpqsw_get_transaction(); } +bool libpqsw_command_is_prepare() +{ + return get_redirect_manager()->get_prepare_command(); +} + /* query if enable set command*/ bool libpqsw_get_set_command() { return get_redirect_manager()->state.set_command; } +bool libpqsw_is_prepare_within_PBE() +{ + RedirectManager* redirect_manager = get_redirect_manager(); + return (redirect_manager->get_pbe_state() && redirect_manager->get_prepare_command()); +} + /* if skip readonly check in P or Q message */ bool libpqsw_skip_check_readonly() { if (!g_instance.attr.attr_sql.enableRemoteExcute) { @@ -807,6 +818,10 @@ static bool libpqsw_process_bind_message(StringInfo msg, CachedPlanSource* psrc) */ static bool libpqsw_process_transfer_message(int qtype, StringInfo msg) { + if (libpqsw_is_prepare_within_PBE()) { + return true; + } + if (qtype == 'E') { if (libpqsw_remote_in_transaction()) { libpqsw_set_transaction(true); @@ -835,6 +850,10 @@ static bool libpqsw_process_transfer_message(int qtype, StringInfo msg) static bool libpqsw_need_localexec_withinPBE(int qtype, StringInfo msg, bool afterpush, bool remote_execute) { + if (libpqsw_is_prepare_within_PBE()) { + return false; + } + bool ret = false; RedirectManager* redirect_manager = get_redirect_manager(); /* For B E and Select, no push */ @@ -1023,6 +1042,10 @@ bool libpqsw_process_message(int qtype, StringInfo msg) trace_msg_func trace_func = get_msg_trace_func(qtype); trace_func(qtype, msg); // the extend query start msg + if (qtype != 'Q') { + redirect_manager->set_pbe_state(true); + } + if (qtype == 'P') { return false; } @@ -1062,7 +1085,7 @@ bool libpqsw_process_message(int qtype, StringInfo msg) return false; } - if (!libpqsw_redirect()) { + if (!libpqsw_redirect() && !libpqsw_is_prepare_within_PBE()) { return false; } @@ -1080,11 +1103,13 @@ bool libpqsw_process_message(int qtype, StringInfo msg) libpqsw_set_batch(false); libpqsw_set_redirect(false); libpqsw_set_set_command(false); + get_redirect_manager()->set_prepare_command(false); } } /* for begin in pbe and in trxn */ - if (SS_STANDBY_MODE && libpqsw_need_localexec_withinPBE(qtype, msg, true, ready_to_excute)) { + if (SS_STANDBY_MODE && (libpqsw_need_localexec_withinPBE(qtype, msg, true, ready_to_excute) || + libpqsw_is_prepare_within_PBE())) { return false; } @@ -1096,6 +1121,9 @@ bool libpqsw_process_parse_message(const char* commandTag, List* query_list) { libpqsw_set_command_tag(commandTag); bool need_redirect = libpqsw_before_redirect(commandTag, query_list, NULL); + if (!need_redirect && ((strcmp(commandTag, "PREPARE") == 0) || (strcmp(commandTag, "EXECUTE") == 0))) { + get_redirect_manager()->set_prepare_command(true); + } if (IsAbortedTransactionBlockState() && !libpqsw_end_command(commandTag)) { need_redirect = false; @@ -1116,6 +1144,33 @@ bool libpqsw_process_parse_message(const char* commandTag, List* query_list) return need_redirect; } +static bool libpqsw_process_prepare_within_PBE(const char* commandTag, List* query_list, const char* query_string) +{ + libpqsw_set_command_tag(commandTag); + RedirectManager* redirect_manager = get_redirect_manager(); + bool need_redirect = libpqsw_before_redirect(commandTag, query_list, query_string); + if (need_redirect && !libpqsw_need_localexec_forSimpleQuery(commandTag, query_list, LIBPQ_SW_PARSE)) { + /* clean self, need be carefull here */ + int len = t_thrd.libpq_cxt.PqSendPointer - t_thrd.libpq_cxt.PqSendStart; + if (len > 0) { + errno_t rc = memset_s(t_thrd.libpq_cxt.PqSendBuffer + t_thrd.libpq_cxt.PqSendStart, len, 0, len); + securec_check(rc, "\0", "\0"); + t_thrd.libpq_cxt.PqSendPointer = t_thrd.libpq_cxt.PqSendStart; + } + return true; + } else { + /* clean message queue */ + RedirectMessageManager* message_manager = &(redirect_manager->messages_manager); + if (message_manager->message_empty()) { + return false; + } + message_manager->reset(); + get_redirect_manager()->set_prepare_command(false); + return false; + } + return false; +} + /* process Q type msg, true if need in redirect mode*/ bool libpqsw_process_query_message(const char* commandTag, List* query_list, const char* query_string, bool is_multistmt, bool is_last) { @@ -1123,6 +1178,10 @@ bool libpqsw_process_query_message(const char* commandTag, List* query_list, con return false; } + if (libpqsw_is_prepare_within_PBE()) { + return libpqsw_process_prepare_within_PBE(commandTag, query_list, query_string); + } + bool enableCe = false; libpqsw_set_command_tag(commandTag); bool need_redirect = libpqsw_before_redirect(commandTag, query_list, query_string); diff --git a/src/include/replication/libpqsw.h b/src/include/replication/libpqsw.h index 5b102d06273f39128ea1dfbfab9d3355e9d84b1b..8772be670ade108b9c61c148457cc08a31a40640 100644 --- a/src/include/replication/libpqsw.h +++ b/src/include/replication/libpqsw.h @@ -89,6 +89,8 @@ void libpqsw_create_conn(); void libpqsw_trace_q_msg(const char* commandTag, const char* queryString); void libpqsw_disconnect(bool clear_queue); void libpqsw_check_ddl_on_primary(const char* commandTag); +bool libpqsw_command_is_prepare(); +bool libpqsw_is_prepare_within_PBE(); #ifdef _cplusplus } @@ -132,6 +134,8 @@ typedef struct { bool already_connected; bool client_enable_ce; bool have_savepoint; + bool isPBEPacket; + bool isPrepareCommand; } RedirectState; // the max len =(PBEPBEDS) == 8, 20 is enough @@ -263,6 +267,8 @@ public: state.already_connected = false; state.client_enable_ce = false; state.have_savepoint = false; + state.isPBEPacket = false; + state.isPrepareCommand = false; ss_standby_state = 0; server_proc_slot = 0; ss_standby_sxid = 0; @@ -306,6 +312,26 @@ public: return state.enable_remote_excute; } + void set_pbe_state(bool flag) + { + state.isPBEPacket = flag; + } + + bool get_pbe_state() + { + return state.isPBEPacket; + } + + void set_prepare_command(bool flag) + { + state.isPrepareCommand = flag; + } + + bool get_prepare_command() + { + return state.isPrepareCommand; + } + bool log_enable(); void logtrace(int level, const char* fmt, ...)