From 2a152270f1db5d711143d85395f0afb0bae69fd1 Mon Sep 17 00:00:00 2001 From: chenxiaobin19 <1025221611@qq.com> Date: Tue, 15 Aug 2023 10:45:36 +0800 Subject: [PATCH 1/5] =?UTF-8?q?=E5=8F=91=E5=B8=83=E8=AE=A2=E9=98=85?= =?UTF-8?q?=E6=94=AF=E6=8C=81=E5=86=B2=E7=AA=81=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- contrib/test_decoding/test_decoding.cpp | 80 ----- doc/src/sgml/ref/alter_subscription.sgmlin | 11 + src/bin/gs_guc/cluster_guc.conf | 1 + .../backend/catalog/pg_subscription.cpp | 12 +- src/common/backend/parser/gram.y | 10 + .../backend/utils/misc/guc/guc_storage.cpp | 19 + .../optimizer/commands/subscriptioncmds.cpp | 340 ++++++++++-------- .../runtime/executor/execReplication.cpp | 49 ++- .../replication/logical/parallel_decode.cpp | 18 +- .../storage/replication/logical/worker.cpp | 288 ++++++++++++++- src/include/catalog/pg_subscription.h | 9 +- .../knl/knl_guc/knl_session_attr_storage.h | 1 + src/include/knl/knl_thread.h | 1 + src/include/replication/logical.h | 2 + src/include/replication/replicainternal.h | 7 + src/test/regress/input/subscription.source | 8 + .../regress/output/recovery_2pc_tools.source | 1 + src/test/regress/output/subscription.source | 25 +- src/test/subscription/schedule | 4 +- src/test/subscription/testcase/disable.sh | 65 ++++ src/test/subscription/testcase/skiplsn.sh | 69 ++++ 21 files changed, 767 insertions(+), 253 deletions(-) create mode 100644 src/test/subscription/testcase/disable.sh create mode 100644 src/test/subscription/testcase/skiplsn.sh diff --git a/contrib/test_decoding/test_decoding.cpp b/contrib/test_decoding/test_decoding.cpp index 8f5b377b7b..76175dd9b5 100644 --- a/contrib/test_decoding/test_decoding.cpp +++ b/contrib/test_decoding/test_decoding.cpp @@ -199,87 +199,7 @@ static bool pg_decode_filter(LogicalDecodingContext* ctx, RepOriginId origin_id) return true; return false; } -static void tuple_to_stringinfo(Relation relation, StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool isOld) -{ - if ((tuple->tupTableType == HEAP_TUPLE) && (HEAP_TUPLE_IS_COMPRESSED(tuple->t_data) || - (int)HeapTupleHeaderGetNatts(tuple->t_data, tupdesc) > tupdesc->natts)) { - return; - } - - Oid oid; - /* print oid of tuple, it's not included in the TupleDesc */ - if ((oid = HeapTupleHeaderGetOid(tuple->t_data)) != InvalidOid) { - appendStringInfo(s, " oid[oid]:%u", oid); - } - - /* print all columns individually */ - for (int natt = 0; natt < tupdesc->natts; natt++) { - Form_pg_attribute attr; /* the attribute itself */ - Oid typoutput; /* output function */ - bool typisvarlena = false; - Datum origval; /* possibly toasted Datum */ - bool isnull = true; /* column is null? */ - - attr = &tupdesc->attrs[natt]; - - /* - * don't print dropped columns, we can't be sure everything is - * available for them - */ - if (attr->attisdropped) - continue; - - /* - * Don't print system columns, oid will already have been printed if - * present. - */ - if (attr->attnum < 0 || (isOld && !IsRelationReplidentKey(relation, attr->attnum))) - continue; - - Oid typid = attr->atttypid; /* type of current attribute */ - - /* get Datum from tuple */ - if (tuple->tupTableType == HEAP_TUPLE) { - origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull); - } else { - origval = uheap_getattr((UHeapTuple)tuple, natt + 1, tupdesc, &isnull); - } - - /* print attribute name */ - appendStringInfoChar(s, ' '); - appendStringInfoString(s, quote_identifier(NameStr(attr->attname))); - - /* print attribute type */ - appendStringInfoChar(s, '['); - char* type_name = format_type_be(typid); - if (strlen(type_name) == strlen("clob") && strncmp(type_name, "clob", strlen("clob")) == 0) { - errno_t rc = strcpy_s(type_name, sizeof("clob"), "text"); - securec_check_c(rc, "\0", "\0"); - } - appendStringInfoString(s, type_name); - appendStringInfoChar(s, ']'); - - /* query output function */ - getTypeOutputInfo(typid, &typoutput, &typisvarlena); - - /* print separator */ - appendStringInfoChar(s, ':'); - - /* print data */ - if (isnull) - appendStringInfoString(s, "null"); - else if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK_B(origval)) - appendStringInfoString(s, "unchanged-toast-datum"); - else if (!typisvarlena) - PrintLiteral(s, typid, OidOutputFunctionCall(typoutput, origval)); - else { - Datum val; /* definitely detoasted Datum */ - val = PointerGetDatum(PG_DETOAST_DATUM(origval)); - PrintLiteral(s, typid, OidOutputFunctionCall(typoutput, val)); - } - } -} /* * callback for individual changed tuples */ diff --git a/doc/src/sgml/ref/alter_subscription.sgmlin b/doc/src/sgml/ref/alter_subscription.sgmlin index 2c4e4d168a..6ad2fe06ca 100644 --- a/doc/src/sgml/ref/alter_subscription.sgmlin +++ b/doc/src/sgml/ref/alter_subscription.sgmlin @@ -25,6 +25,7 @@ ALTER SUBSCRIPTION name CONNECTION ALTER SUBSCRIPTION name SET PUBLICATION publication_name [, ...] ALTER SUBSCRIPTION name REFRESH PUBLICATION [ WITH ( refresh_option [= value] [, ... ] ) ] ALTER SUBSCRIPTION name ENABLE +ALTER SUBSCRIPTION name DISABLE ALTER SUBSCRIPTION name SET ( subscription_parameter [= value] [, ... ] ) ALTER SUBSCRIPTION name OWNER TO new_owner ALTER SUBSCRIPTION name RENAME TO new_name @@ -134,6 +135,16 @@ ALTER SUBSCRIPTION name RENAME TO < + + DISABLE + + + Disables a running subscription, stopping the logical replication + worker at the end of the transaction. + + + + SET ( subscription_parameter [= value] [, ... ] ) diff --git a/src/bin/gs_guc/cluster_guc.conf b/src/bin/gs_guc/cluster_guc.conf index 6df9ad913d..fcb6c7ec0b 100755 --- a/src/bin/gs_guc/cluster_guc.conf +++ b/src/bin/gs_guc/cluster_guc.conf @@ -744,6 +744,7 @@ enable_remote_excute|bool|0,0|NULL|NULL| light_comm|bool|0,0|NULL|NULL| ignore_standby_lsn_window|int|0,2147483647|ms|NULL| ignore_feedback_xmin_window|int|0,2147483647|ms|NULL| +subscription_conflict_resolution|enum|error,apply_remote,keep_local|NULL|NULL| [cmserver] log_dir|string|0,0|NULL|NULL| log_file_size|int|0,2047|MB|NULL| diff --git a/src/common/backend/catalog/pg_subscription.cpp b/src/common/backend/catalog/pg_subscription.cpp index 86b7dea51a..5f075ff2dd 100644 --- a/src/common/backend/catalog/pg_subscription.cpp +++ b/src/common/backend/catalog/pg_subscription.cpp @@ -103,6 +103,14 @@ Subscription *GetSubscription(Oid subid, bool missing_ok) sub->binary = DatumGetBool(datum); } + /* Get skiplsn */ + datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup, Anum_pg_subscription_subskiplsn, &isnull); + if (unlikely(isnull)) { + sub->skiplsn = InvalidXLogRecPtr; + } else { + sub->skiplsn = TextDatumGetLsn(datum); + } + ReleaseSysCache(tup); return sub; @@ -237,7 +245,7 @@ static List *textarray_to_stringlist(ArrayType *textarray) return res; } -static Datum LsnGetTextDatum(XLogRecPtr lsn) +Datum LsnGetTextDatum(XLogRecPtr lsn) { char clsn[MAXFNAMELEN]; int ret = snprintf_s(clsn, sizeof(clsn), sizeof(clsn) - 1, "%X/%X", (uint32)(lsn >> 32), (uint32)lsn); @@ -246,7 +254,7 @@ static Datum LsnGetTextDatum(XLogRecPtr lsn) return CStringGetTextDatum(clsn); } -static XLogRecPtr TextDatumGetLsn(Datum datum) +XLogRecPtr TextDatumGetLsn(Datum datum) { XLogRecPtr lsn; uint32 lsn_hi; diff --git a/src/common/backend/parser/gram.y b/src/common/backend/parser/gram.y index 86590164f7..e1ab544eb6 100644 --- a/src/common/backend/parser/gram.y +++ b/src/common/backend/parser/gram.y @@ -18574,6 +18574,16 @@ AlterSubscriptionStmt: n->options = list_make1(makeDefElem("enabled", (Node *)makeInteger(TRUE))); $$ = (Node *)n; + } + | ALTER SUBSCRIPTION name DISABLE_P + { + AlterSubscriptionStmt *n = + makeNode(AlterSubscriptionStmt); + n->refresh = false; + n->subname = $3; + n->options = list_make1(makeDefElem("enabled", + (Node *)makeInteger(FALSE))); + $$ = (Node *)n; } ; /***************************************************************************** diff --git a/src/common/backend/utils/misc/guc/guc_storage.cpp b/src/common/backend/utils/misc/guc/guc_storage.cpp index 22ffc8493d..54f09363f4 100755 --- a/src/common/backend/utils/misc/guc/guc_storage.cpp +++ b/src/common/backend/utils/misc/guc/guc_storage.cpp @@ -287,6 +287,13 @@ static const struct config_enum_entry repl_auth_mode_options[] = { {NULL, 0, false} }; +static const struct config_enum_entry ConflictResolvers[] = { + {"error", RESOLVE_ERROR, false}, + {"apply_remote", RESOLVE_APPLY_REMOTE, false}, + {"keep_local", RESOLVE_KEEP_LOCAL, false}, + {NULL, 0, false} +}; + /* * Although only "on", "off", "remote_write", and "local" are documented, we * accept all the likely variants of "on" and "off". @@ -4679,6 +4686,18 @@ static void InitStorageConfigureNamesEnum() NULL, NULL, NULL}, + {{"subscription_conflict_resolution", + PGC_SIGHUP, + NODE_SINGLENODE, + REPLICATION, + gettext_noop("Sets method used for conflict resolution for resolvable conflicts."), + NULL}, + &u_sess->attr.attr_storage.subscription_conflict_resolution, + RESOLVE_ERROR, + ConflictResolvers, + NULL, + NULL, + NULL}, #ifndef ENABLE_MULTIPLE_NODES {{"dcf_log_file_permission", PGC_POSTMASTER, diff --git a/src/gausskernel/optimizer/commands/subscriptioncmds.cpp b/src/gausskernel/optimizer/commands/subscriptioncmds.cpp index eea9de4dcd..aa52f2ebe0 100644 --- a/src/gausskernel/optimizer/commands/subscriptioncmds.cpp +++ b/src/gausskernel/optimizer/commands/subscriptioncmds.cpp @@ -45,10 +45,46 @@ #include "utils/syscache.h" #include "utils/array.h" #include "utils/acl.h" +#include "utils/pg_lsn.h" #include "access/tableam.h" #include "libpq/libpq-fe.h" #include "replication/slot.h" +/* + * Options that can be specified by the user in CREATE/ALTER SUBSCRIPTION + * command. + */ +#define SUBOPT_CONNINFO 0x00000001 +#define SUBOPT_PUBLICATION 0x00000002 +#define SUBOPT_ENABLED 0x00000004 +#define SUBOPT_SLOT_NAME 0x00000008 +#define SUBOPT_SYNCHRONOUS_COMMIT 0x00000010 +#define SUBOPT_BINARY 0x00000020 +#define SUBOPT_COPY_DATA 0x00000040 +#define SUBOPT_CONNECT 0x00000080 +#define SUBOPT_SKIPLSN 0x00000100 + +/* check if the 'val' has 'bits' set */ +#define IsSet(val, bits) (((val) & (bits)) == (bits)) + +/* + * Structure to hold a bitmap representing the user-provided CREATE/ALTER + * SUBSCRIPTION command options and the parsed/default values of each of them. + */ +typedef struct SubOpts +{ + bits32 specified_opts; + char *conninfo; + List *publications; + bool enabled; + char *slot_name; + char *synchronous_commit; + bool binary; + bool copy_data; + bool connect; + XLogRecPtr skiplsn; +} SubOpts; + static bool ConnectPublisher(char* conninfo, char* slotname); static void CreateSlotInPublisherAndInsertSubRel(char *slotname, Oid subid, List *publications, bool *copy_data, bool create_slot); @@ -61,120 +97,127 @@ static bool CheckPublicationsExistOnPublisher(List *publications); * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands. * * Since not all options can be specified in both commands, this function - * will report an error on options if the target output pointer is NULL to - * accommodate that. + * will report an error mutually exclusive options are specified. + * + * Caller is expected to have cleared 'opts'. */ -static void parse_subscription_options(const List *options, char **conninfo, List **publications, bool *enabled_given, - bool *enabled, bool *slot_name_given, char **slot_name, char **synchronous_commit, bool *binary_given, bool *binary, - bool *copy_data_given, bool *copy_data, bool *connect_given, bool *connect) +static void parse_subscription_options(const List *stmt_options, bits32 supported_opts, SubOpts *opts) { ListCell *lc; - if (conninfo) { - *conninfo = NULL; - } - if (publications) { - *publications = NIL; - } - if (enabled) { - *enabled_given = false; - } - if (slot_name) { - *slot_name_given = false; - *slot_name = NULL; - } - if (synchronous_commit) { - *synchronous_commit = NULL; - } - if (binary) { - *binary_given = false; - *binary = false; - } + /* caller must expect some option */ + Assert(supported_opts != 0); - if (copy_data) { - *copy_data_given = false; - *copy_data = true; + /* Set default values for the boolean supported options. */ + if (IsSet(supported_opts, SUBOPT_BINARY)) { + opts->binary = false; } - - if (connect) { - *connect_given = false; - *connect = true; + if (IsSet(supported_opts, SUBOPT_COPY_DATA)) { + opts->copy_data = true; + } + if (IsSet(supported_opts, SUBOPT_CONNECT)) { + opts->connect = true; } /* Parse options */ - foreach (lc, options) { + foreach (lc, stmt_options) { DefElem *defel = (DefElem *)lfirst(lc); - if (strcmp(defel->defname, "conninfo") == 0 && conninfo) { - if (*conninfo) { + if (IsSet(supported_opts, SUBOPT_CONNINFO) && strcmp(defel->defname, "conninfo") == 0) { + if (IsSet(opts->specified_opts, SUBOPT_CONNINFO)) { ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); } - *conninfo = defGetString(defel); - } else if (strcmp(defel->defname, "publication") == 0 && publications) { - if (*publications) { + opts->specified_opts |= SUBOPT_CONNINFO; + opts->conninfo = defGetString(defel); + } else if (IsSet(supported_opts, SUBOPT_PUBLICATION) && strcmp(defel->defname, "publication") == 0) { + if (IsSet(opts->specified_opts, SUBOPT_PUBLICATION)) { ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); } - *publications = defGetStringList(defel); - } else if (strcmp(defel->defname, "enabled") == 0 && enabled) { - if (*enabled_given) { + opts->specified_opts |= SUBOPT_PUBLICATION; + opts->publications = defGetStringList(defel); + } else if (IsSet(supported_opts, SUBOPT_ENABLED) && strcmp(defel->defname, "enabled") == 0) { + if (IsSet(opts->specified_opts, SUBOPT_ENABLED)) { ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); } - *enabled_given = true; - *enabled = defGetBoolean(defel); - } else if (strcmp(defel->defname, "slot_name") == 0 && slot_name) { - if (*slot_name_given) { + opts->specified_opts |= SUBOPT_ENABLED; + opts->enabled = defGetBoolean(defel); + } else if (IsSet(supported_opts, SUBOPT_SLOT_NAME) && strcmp(defel->defname, "slot_name") == 0) { + if (IsSet(opts->specified_opts, SUBOPT_SLOT_NAME)) { ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); } - *slot_name_given = true; - *slot_name = defGetString(defel); + opts->specified_opts |= SUBOPT_SLOT_NAME; + opts->slot_name = defGetString(defel); /* Setting slot_name = NONE is treated as no slot name. */ - if (strcmp(*slot_name, "none") == 0) { - *slot_name = NULL; + if (strcmp(opts->slot_name, "none") == 0) { + opts->slot_name = NULL; } else { - ReplicationSlotValidateName(*slot_name, ERROR); + ReplicationSlotValidateName(opts->slot_name, ERROR); } - } else if (strcmp(defel->defname, "synchronous_commit") == 0 && synchronous_commit) { - if (*synchronous_commit) { + } else if (IsSet(supported_opts, SUBOPT_SYNCHRONOUS_COMMIT) && + strcmp(defel->defname, "synchronous_commit") == 0) { + if (IsSet(opts->specified_opts, SUBOPT_SYNCHRONOUS_COMMIT)) { ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); } - *synchronous_commit = defGetString(defel); + opts->specified_opts |= SUBOPT_SYNCHRONOUS_COMMIT; + opts->synchronous_commit = defGetString(defel); /* Test if the given value is valid for synchronous_commit GUC. */ - (void)set_config_option("synchronous_commit", *synchronous_commit, PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET, - false, 0, false); - } else if (strcmp(defel->defname, "binary") == 0 && binary) { - if (*binary_given) { + (void)set_config_option("synchronous_commit", opts->synchronous_commit, PGC_BACKEND, PGC_S_TEST, + GUC_ACTION_SET, false, 0, false); + } else if (IsSet(supported_opts, SUBOPT_BINARY) && strcmp(defel->defname, "binary") == 0) { + if (IsSet(opts->specified_opts, SUBOPT_BINARY)) { + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + } + + opts->specified_opts |= SUBOPT_BINARY; + opts->binary = defGetBoolean(defel); + } else if (IsSet(supported_opts, SUBOPT_COPY_DATA) && strcmp(defel->defname, "copy_data") == 0) { + if (IsSet(opts->specified_opts, SUBOPT_COPY_DATA)) { ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); } - *binary_given = true; - *binary = defGetBoolean(defel); - } else if (strcmp(defel->defname, "copy_data") == 0 && copy_data) { - if (*copy_data_given) { + opts->specified_opts |= SUBOPT_COPY_DATA; + opts->copy_data = defGetBoolean(defel); + } else if (IsSet(supported_opts, SUBOPT_CONNECT) && strcmp(defel->defname, "connect") == 0) { + if (IsSet(opts->specified_opts, SUBOPT_CONNECT)) { ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); } - *copy_data_given = true; - *copy_data = defGetBoolean(defel); - } else if (strcmp(defel->defname, "connect") == 0 && connect) { - if (*connect_given) { + opts->specified_opts = SUBOPT_CONNECT; + opts->connect = defGetBoolean(defel); + } else if (IsSet(supported_opts, SUBOPT_SKIPLSN) && strcmp(defel->defname, "skiplsn") == 0) { + if (IsSet(opts->specified_opts, SUBOPT_SKIPLSN)) { ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); } - *connect_given = true; - *connect = defGetBoolean(defel); + char *lsn_str = defGetString(defel); + opts->specified_opts = SUBOPT_SKIPLSN; + /* Setting lsn = 'NONE' is treated as resetting LSN */ + if (strcmp(lsn_str, "none") == 0) { + opts->skiplsn = InvalidXLogRecPtr; + } else { + /* Parse the argument as LSN */ + opts->skiplsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in, CStringGetDatum(lsn_str))); + + if (XLogRecPtrIsInvalid(opts->skiplsn)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid WAL location (LSN): %s", lsn_str))); + } } else { ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("unrecognized subscription parameter: %s", defel->defname))); @@ -185,38 +228,39 @@ static void parse_subscription_options(const List *options, char **conninfo, Lis * We've been explicitly asked to not connect, that requires some * additional processing. */ - if (connect && !*connect) { + if (IsSet(supported_opts, SUBOPT_CONNECT) && !opts->connect) { /* Check for incompatible options from the user. */ - if (*enabled_given && *enabled) + if (IsSet(opts->specified_opts, SUBOPT_ENABLED) && opts->enabled) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("%s and %s are mutually exclusive options", "connect = false", "enabled = true"))); - if (*copy_data_given && *copy_data) + if (IsSet(opts->specified_opts, SUBOPT_COPY_DATA) && opts->copy_data) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("%s and %s are mutually exclusive options", "connect = false", "copy_data = true"))); /* Change the defaults of other options. */ - *enabled = false; - *copy_data = false; + opts->enabled = false; + opts->copy_data = false; } /* * Do additional checking for disallowed combination when * slot_name = NONE was used. */ - if (slot_name && *slot_name_given && !*slot_name) { - if (enabled && *enabled_given && *enabled) { + if (!opts->slot_name && IsSet(supported_opts, SUBOPT_SLOT_NAME) && IsSet(opts->specified_opts, SUBOPT_SLOT_NAME)) { + if (opts->enabled && IsSet(supported_opts, SUBOPT_ENABLED) && IsSet(opts->specified_opts, SUBOPT_ENABLED)) { ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("slot_name = NONE and enabled = true are mutually exclusive options"))); } - if (enabled && !*enabled_given && *enabled) { + if (opts->enabled && IsSet(supported_opts, SUBOPT_ENABLED) && !IsSet(opts->specified_opts, SUBOPT_ENABLED)) { ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("subscription with slot_name = NONE must also set enabled = false"))); } } - if (copy_data && *copy_data && u_sess->attr.attr_storage.max_sync_workers_per_subscription == 0) { + if (IsSet(supported_opts, SUBOPT_COPY_DATA) && IsSet(opts->specified_opts, SUBOPT_COPY_DATA) && + u_sess->attr.attr_storage.max_sync_workers_per_subscription == 0) { ereport(WARNING, (errmsg("you need to set max_sync_workers_per_subscription because it is zero but " "copy_data is true"))); } @@ -451,27 +495,20 @@ ObjectAddress CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) Datum values[Natts_pg_subscription]; Oid owner = GetUserId(); HeapTuple tup; - bool enabled_given = false; - bool enabled = true; - char *synchronous_commit; - char *slotname; - bool slotname_given; - bool binary; - bool binary_given; - bool copy_data; - bool copy_data_given; - bool connect; - bool connect_given; char originname[NAMEDATALEN]; List *publications; + bits32 supported_opts; + SubOpts opts = {0}; + opts.enabled = true; int rc; /* * Parse and check options. * Connection and publication should not be specified here. */ - parse_subscription_options(stmt->options, NULL, NULL, &enabled_given, &enabled, &slotname_given, &slotname, - &synchronous_commit, &binary_given, &binary, ©_data_given, ©_data, &connect_given, &connect); + supported_opts = (SUBOPT_ENABLED | SUBOPT_SLOT_NAME | SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | + SUBOPT_COPY_DATA | SUBOPT_CONNECT); + parse_subscription_options(stmt->options, supported_opts, &opts); /* * Since creating a replication slot is not transactional, rolling back @@ -479,7 +516,7 @@ ObjectAddress CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) * CREATE SUBSCRIPTION inside a transaction block if creating a * replication slot. */ - if (enabled) + if (opts.enabled) PreventTransactionChain(isTopLevel, "CREATE SUBSCRIPTION ... WITH (enabled = true)"); if (!superuser()) @@ -495,8 +532,8 @@ ObjectAddress CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) } /* The default for synchronous_commit of subscriptions is off. */ - if (synchronous_commit == NULL) { - synchronous_commit = "off"; + if (opts.synchronous_commit == NULL) { + opts.synchronous_commit = "off"; } publications = stmt->publication; @@ -513,28 +550,29 @@ ObjectAddress CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) values[Anum_pg_subscription_subdbid - 1] = ObjectIdGetDatum(u_sess->proc_cxt.MyDatabaseId); values[Anum_pg_subscription_subname - 1] = DirectFunctionCall1(namein, CStringGetDatum(stmt->subname)); values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner); - values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled); - values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(binary); + values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled); + values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary); /* encrypt conninfo */ char *encryptConninfo = EncryptOrDecryptConninfo(stmt->conninfo, 'E'); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(encryptConninfo); - if (enabled) { - if (!slotname_given) { - slotname = stmt->subname; + if (opts.enabled) { + if (!IsSet(opts.specified_opts, SUBOPT_SLOT_NAME)) { + opts.slot_name = stmt->subname; } - values[Anum_pg_subscription_subslotname - 1] = DirectFunctionCall1(namein, CStringGetDatum(slotname)); + values[Anum_pg_subscription_subslotname - 1] = DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name)); } else { - if (slotname_given && slotname) { + if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME) && opts.slot_name) { ereport(WARNING, (errmsg("When enabled=false, it is dangerous to set slot_name. " "This will cause wal log accumulation on the publisher, " "so slot_name will be forcibly set to NULL."))); } nulls[Anum_pg_subscription_subslotname - 1] = true; } - values[Anum_pg_subscription_subsynccommit - 1] = CStringGetTextDatum(synchronous_commit); + values[Anum_pg_subscription_subsynccommit - 1] = CStringGetTextDatum(opts.synchronous_commit); values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(publications); + values[Anum_pg_subscription_subskiplsn - 1] = LsnGetTextDatum(InvalidXLogRecPtr); tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); @@ -553,8 +591,8 @@ ObjectAddress CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) * Connect to remote side to execute requested commands and fetch table * info. */ - if (connect) { - if (!AttemptConnectPublisher(encryptConninfo, slotname, true)) { + if (opts.connect) { + if (!AttemptConnectPublisher(encryptConninfo, opts.slot_name, true)) { ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), errmsg("Failed to connect to publisher."))); } @@ -567,8 +605,8 @@ ObjectAddress CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) * If requested, create the replication slot on remote side for our * newly created subscription. */ - Assert(!enabled || slotname); - CreateSlotInPublisherAndInsertSubRel(slotname, subid, publications, ©_data, enabled); + Assert(!opts.enabled || opts.slot_name); + CreateSlotInPublisherAndInsertSubRel(opts.slot_name, subid, publications, &opts.copy_data, opts.enabled); (WalReceiverFuncTable[GET_FUNC_IDX]).walrcv_disconnect(); } else { @@ -582,7 +620,7 @@ ObjectAddress CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) securec_check(rc, "", ""); /* Don't wake up logical replication launcher unnecessarily */ - if (enabled) { + if (opts.enabled) { ApplyLauncherWakeupAtCommit(); } @@ -793,17 +831,6 @@ ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) Datum values[Natts_pg_subscription]; HeapTuple tup; Oid subid; - bool enabled_given = false; - bool enabled = false; - bool binary_given = false; - bool binary = false; - char *synchronous_commit = NULL; - char *conninfo = NULL; - char *slot_name = NULL; - bool slotname_given = false; - bool copy_data = false; - bool copy_data_given = false; - List *publications = NIL; Subscription *sub = NULL; int rc; bool checkConn = false; @@ -812,6 +839,8 @@ ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) bool needFreeConninfo = false; char *finalSlotName = NULL; char *encryptConninfo = NULL; + bits32 supported_opts; + SubOpts opts = {0}; rel = heap_open(SubscriptionRelationId, RowExclusiveLock); @@ -831,17 +860,18 @@ ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) /* Lock the subscription so nobody else can do anything with it. */ LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock); - enabled = sub->enabled; + opts.enabled = sub->enabled; finalSlotName = sub->name; encryptConninfo = sub->conninfo; /* Parse options. */ if (!stmt->refresh) { - parse_subscription_options(stmt->options, &conninfo, &publications, &enabled_given, &enabled, &slotname_given, - &slot_name, &synchronous_commit, &binary_given, &binary, NULL, NULL, NULL, NULL); + supported_opts = (SUBOPT_CONNINFO | SUBOPT_PUBLICATION | SUBOPT_ENABLED | SUBOPT_SLOT_NAME | + SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_SKIPLSN); + parse_subscription_options(stmt->options, supported_opts, &opts); } else { - parse_subscription_options(stmt->options, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, - ©_data_given, ©_data, NULL, NULL); + supported_opts = SUBOPT_COPY_DATA; + parse_subscription_options(stmt->options, supported_opts, &opts); PreventTransactionChain(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH"); } @@ -854,38 +884,39 @@ ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) rc = memset_s(replaces, sizeof(replaces), false, sizeof(replaces)); securec_check(rc, "", ""); - if (enabled_given && enabled != sub->enabled) { - values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled); + if (IsSet(opts.specified_opts, SUBOPT_ENABLED) && opts.enabled != sub->enabled) { + values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled); replaces[Anum_pg_subscription_subenabled - 1] = true; } - if (conninfo) { + if (opts.conninfo) { /* Check the connection info string. */ - libpqrcv_check_conninfo(conninfo); - encryptConninfo = EncryptOrDecryptConninfo(conninfo, 'E'); - rc = memset_s(conninfo, strlen(conninfo), 0, strlen(conninfo)); + libpqrcv_check_conninfo(opts.conninfo); + encryptConninfo = EncryptOrDecryptConninfo(opts.conninfo, 'E'); + rc = memset_s(opts.conninfo, strlen(opts.conninfo), 0, strlen(opts.conninfo)); securec_check(rc, "\0", "\0"); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(encryptConninfo); replaces[Anum_pg_subscription_subconninfo - 1] = true; needFreeConninfo = true; /* need to check whether new conninfo can be used to connect to new publisher */ - if (sub->enabled || (enabled_given && enabled)) { + if (sub->enabled || (IsSet(opts.specified_opts, SUBOPT_ENABLED) && opts.enabled)) { checkConn = true; } } - if (slotname_given) { - if (sub->enabled && !slot_name) { + if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME)) { + if (sub->enabled && !opts.slot_name) { ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("cannot set slot_name = NONE for enabled subscription"))); } /* change to non-null value */ - if (slot_name) { - if (sub->enabled || (enabled_given && enabled)) { - values[Anum_pg_subscription_subslotname - 1] = DirectFunctionCall1(namein, CStringGetDatum(slot_name)); + if (opts.slot_name) { + if (sub->enabled || (IsSet(opts.specified_opts, SUBOPT_ENABLED) && opts.enabled)) { + values[Anum_pg_subscription_subslotname - 1] = DirectFunctionCall1(namein, + CStringGetDatum(opts.slot_name)); /* if old slotname is null or same as new slot name, then we need to validate the new slot name */ - validateSlot = sub->slotname == NULL || strcmp(slot_name, sub->slotname) != 0; - finalSlotName = slot_name; + validateSlot = sub->slotname == NULL || strcmp(opts.slot_name, sub->slotname) != 0; + finalSlotName = opts.slot_name; } else { ereport(ERROR, (errmsg("Currently enabled=false, cannot change slot_name to a non-null value."))); } @@ -896,30 +927,34 @@ ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) * when enable this subscription but slot_name is not specified, * it will be set to default value subname. */ - if (!sub->enabled && enabled_given && enabled) { + if (!sub->enabled && IsSet(opts.specified_opts, SUBOPT_ENABLED) && opts.enabled) { values[Anum_pg_subscription_subslotname - 1] = DirectFunctionCall1(namein, CStringGetDatum(sub->name)); } else { nulls[Anum_pg_subscription_subslotname - 1] = true; } } replaces[Anum_pg_subscription_subslotname - 1] = true; - } else if (!sub->enabled && enabled_given && enabled) { + } else if (!sub->enabled && IsSet(opts.specified_opts, SUBOPT_ENABLED) && opts.enabled) { values[Anum_pg_subscription_subslotname - 1] = DirectFunctionCall1(namein, CStringGetDatum(sub->name)); replaces[Anum_pg_subscription_subslotname - 1] = true; } - if (synchronous_commit) { - values[Anum_pg_subscription_subsynccommit - 1] = CStringGetTextDatum(synchronous_commit); + if (opts.synchronous_commit) { + values[Anum_pg_subscription_subsynccommit - 1] = CStringGetTextDatum(opts.synchronous_commit); replaces[Anum_pg_subscription_subsynccommit - 1] = true; } - if (binary_given) { - values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(binary); + if (IsSet(opts.specified_opts, SUBOPT_BINARY)) { + values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary); replaces[Anum_pg_subscription_subbinary - 1] = true; } - if (publications != NIL) { - values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(publications); + if (opts.publications != NIL) { + values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(opts.publications); replaces[Anum_pg_subscription_subpublications - 1] = true; } else { - publications = sub->publications; + opts.publications = sub->publications; + } + if (IsSet(opts.specified_opts, SUBOPT_SKIPLSN)) { + values[Anum_pg_subscription_subskiplsn - 1] = LsnGetTextDatum(opts.skiplsn); + replaces[Anum_pg_subscription_subskiplsn - 1] = true; } tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, replaces); @@ -938,8 +973,9 @@ ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) /* case 0: keep the subscription enabled, it's over here */ /* case 1: deactivating subscription */ - if (sub->enabled && !enabled) { - ereport(ERROR, (errmsg("If you want to deactivate this subscription, use DROP SUBSCRIPTION."))); + if (sub->enabled && !opts.enabled) { + ereport(WARNING, (errmsg("The subscription will be disabled, take care of the xlog would be accumulate " + "because the slot of subscription wouldn't be advanced"))); } /* case 2: keep the subscription active */ @@ -947,7 +983,7 @@ ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) if (!AttemptConnectPublisher(encryptConninfo, finalSlotName, true)) { ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), errmsg( "Failed to connect to publisher."))); } - if (!CheckPublicationsExistOnPublisher(publications)) { + if (!CheckPublicationsExistOnPublisher(opts.publications)) { (WalReceiverFuncTable[GET_FUNC_IDX]).walrcv_disconnect(); ereport(ERROR, (errmsg("There are some publications not exist on the publisher."))); } @@ -958,7 +994,7 @@ ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) * enabling subscription, but slot hasn't been created, * then mark createSlot to true. */ - if (!sub->enabled && enabled && (!sub->slotname || !*(sub->slotname))) { + if (!sub->enabled && opts.enabled && (!sub->slotname || !*(sub->slotname))) { createSlot = true; } @@ -968,13 +1004,13 @@ ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) checkConn ? "The new conninfo cannot connect to new publisher." : "Failed to connect to publisher."))); } - if (!CheckPublicationsExistOnPublisher(publications)) { + if (!CheckPublicationsExistOnPublisher(opts.publications)) { (WalReceiverFuncTable[GET_FUNC_IDX]).walrcv_disconnect(); ereport(ERROR, (errmsg("There are some publications not exist on the publisher."))); } if (createSlot) { - CreateSlotInPublisherAndInsertSubRel(finalSlotName, subid, publications, NULL, true); + CreateSlotInPublisherAndInsertSubRel(finalSlotName, subid, opts.publications, NULL, true); } /* no need to validate replication slot if the slot is created just by ourself */ @@ -984,6 +1020,8 @@ ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) (WalReceiverFuncTable[GET_FUNC_IDX]).walrcv_disconnect(); ApplyLauncherWakeupAtCommit(); + } else if (!sub->enabled && opts.enabled) { + ApplyLauncherWakeupAtCommit(); } if (stmt->refresh) { @@ -991,7 +1029,7 @@ ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions"))); } - AlterSubscription_refresh(sub, copy_data); + AlterSubscription_refresh(sub, opts.copy_data); } if (needFreeConninfo) { diff --git a/src/gausskernel/runtime/executor/execReplication.cpp b/src/gausskernel/runtime/executor/execReplication.cpp index 0f53e32210..9011ccee35 100644 --- a/src/gausskernel/runtime/executor/execReplication.cpp +++ b/src/gausskernel/runtime/executor/execReplication.cpp @@ -53,7 +53,8 @@ static bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, Tuple * This is not generic routine, it expects the idxrel to be replication * identity of a rel and meet all limitations associated with that. */ -static bool build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel, TupleTableSlot *searchslot) +static bool build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel, TupleTableSlot *searchslot, + EState *estate) { int attoff; bool isnull; @@ -66,6 +67,16 @@ static bool build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel Assert(!isnull); opclass = (oidvector *)DatumGetPointer(indclassDatum); + List* expressionsState = NIL; + ExprContext* econtext = GetPerTupleExprContext(estate); + econtext->ecxt_scantuple = searchslot; + ListCell* indexpr_item = NULL; + + if (idxrel->rd_indexprs != NIL) { + expressionsState = ExecPrepareExprList(idxrel->rd_indexprs, estate); + indexpr_item = list_head(expressionsState); + } + /* Build scankey for every attribute in the index. */ for (attoff = 0; attoff < IndexRelationGetNumberOfKeyAttributes(idxrel); attoff++) { Oid op; @@ -92,14 +103,34 @@ static bool build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel regop = get_opcode(op); - /* Initialize the scankey. */ - ScanKeyInit(&skey[attoff], pkattno, BTEqualStrategyNumber, regop, searchslot->tts_values[mainattno - 1]); - skey[attoff].sk_collation = idxrel->rd_indcollation[attoff]; + if (mainattno != 0) { + /* Initialize the scankey. */ + ScanKeyInit(&skey[attoff], pkattno, BTEqualStrategyNumber, regop, searchslot->tts_values[mainattno - 1]); + skey[attoff].sk_collation = idxrel->rd_indcollation[attoff]; - /* Check for null value. */ - if (searchslot->tts_isnull[mainattno - 1]) { - hasnulls = true; - skey[attoff].sk_flags |= SK_ISNULL; + /* Check for null value. */ + if (searchslot->tts_isnull[mainattno - 1]) { + hasnulls = true; + skey[attoff].sk_flags |= SK_ISNULL; + } + } else { + if (idxrel->rd_indexprs == NIL) { + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("wrong number of index expressions"))); + } else { + bool isNull = false; + Datum datum = ExecEvalExprSwitchContext((ExprState*)lfirst(indexpr_item), econtext, &isNull, NULL); + indexpr_item = lnext(indexpr_item); + + ScanKeyInit(&skey[attoff], pkattno, BTEqualStrategyNumber, regop, datum); + skey[attoff].sk_collation = idxrel->rd_indcollation[attoff]; + + /* Check for null value. */ + if (isNull) { + hasnulls = true; + skey[attoff].sk_flags |= SK_ISNULL; + } + } } } @@ -307,7 +338,7 @@ static bool RelationFindReplTupleByIndex(EState *estate, Relation rel, Relation scan->isUpsert = true; /* Build scan key. */ - build_replindex_scan_key(skey, targetRel, idxrel, searchslot); + build_replindex_scan_key(skey, targetRel, idxrel, searchslot, estate); while (true) { found = false; diff --git a/src/gausskernel/storage/replication/logical/parallel_decode.cpp b/src/gausskernel/storage/replication/logical/parallel_decode.cpp index 2246bafbe9..f041823778 100644 --- a/src/gausskernel/storage/replication/logical/parallel_decode.cpp +++ b/src/gausskernel/storage/replication/logical/parallel_decode.cpp @@ -72,13 +72,23 @@ ParallelReorderBufferTXN *ParallelReorderBufferGetOldestTXN(ParallelReorderBuffe return txn; } -void tuple_to_stringinfo(Relation relation, StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool isOld) +void tuple_to_stringinfo(Relation relation, StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool isOld, + bool printOid) { if ((tuple->tupTableType == HEAP_TUPLE) && (HEAP_TUPLE_IS_COMPRESSED(tuple->t_data) || (int)HeapTupleHeaderGetNatts(tuple->t_data, tupdesc) > tupdesc->natts)) { return; } + if (printOid) { + Oid oid; + + /* print oid of tuple, it's not included in the TupleDesc */ + if ((oid = HeapTupleHeaderGetOid(tuple->t_data)) != InvalidOid) { + appendStringInfo(s, " oid[oid]:%u", oid); + } + } + /* print all columns individually */ for (int natt = 0; natt < tupdesc->natts; natt++) { Form_pg_attribute attr; /* the attribute itself */ @@ -90,6 +100,12 @@ void tuple_to_stringinfo(Relation relation, StringInfo s, TupleDesc tupdesc, Hea attr = &tupdesc->attrs[natt]; + /* + * Don't print dropped columns, we can't be sure everything is + * available for them. + * Don't print system columns, oid will already have been printed if + * present. + */ if (attr->attisdropped || attr->attnum < 0 || (isOld && !IsRelationReplidentKey(relation, attr->attnum))) continue; diff --git a/src/gausskernel/storage/replication/logical/worker.cpp b/src/gausskernel/storage/replication/logical/worker.cpp index 9e754cf009..1705754f87 100644 --- a/src/gausskernel/storage/replication/logical/worker.cpp +++ b/src/gausskernel/storage/replication/logical/worker.cpp @@ -115,6 +115,10 @@ static void ApplyWorkerProcessMsg(char type, StringInfo s, XLogRecPtr *lastRcv); static void apply_dispatch(StringInfo s); static void apply_handle_conninfo(StringInfo s); static void UpdateConninfo(char* standbysInfo); +static Oid find_conflict_tuple(EState *estate, TupleTableSlot *remoteslot, TupleTableSlot *localslot, + FakeRelationPartition *fakeRelInfo, TupleTableSlot *originslot = NULL); +static void IsSkippingChanges(XLogRecPtr finish_lsn); +static void StopSkippingChanges(); /* * Should this worker apply changes for given relation. @@ -478,6 +482,8 @@ static void apply_handle_begin(StringInfo s) t_thrd.applyworker_cxt.remoteFinalLsn = begin_data.final_lsn; t_thrd.applyworker_cxt.curRemoteCsn = begin_data.csn; + IsSkippingChanges(begin_data.final_lsn); + pgstat_report_activity(STATE_RUNNING, NULL); } @@ -510,6 +516,10 @@ static void apply_handle_commit(StringInfo s) /* Process any tables that are being synchronized in parallel. */ process_syncing_tables(commit_data.end_lsn); + if (t_thrd.applyworker_cxt.isSkipTransaction) { + StopSkippingChanges(); + } + pgstat_report_activity(STATE_IDLE, NULL); } @@ -570,6 +580,69 @@ static Oid GetRelationIdentityOrPK(Relation rel) return idxoid; } +/* + * Find the tuple in a table using any unique index and returns the conflicting + * index's oid, if any conflict found. + * + * *originslot* contains the old tuple during UPDATE, if conflict with it which + * to be updated, ignore it. + */ +Oid find_conflict_tuple(EState *estate, TupleTableSlot *remoteslot, TupleTableSlot *localslot, + FakeRelationPartition *fakeRelInfo, TupleTableSlot *originslot) +{ + Oid replidxoid = InvalidOid; + bool found = false; + ResultRelInfo* relinfo = estate->es_result_relation_info; + + /* Check the replica identity index first */ + replidxoid = RelationGetReplicaIndex(relinfo->ri_RelationDesc); + found = RelationFindReplTuple(estate, relinfo->ri_RelationDesc, replidxoid, LockTupleExclusive, remoteslot, + localslot, fakeRelInfo); + if (found) { + if (originslot != NULL && ItemPointerCompare(tableam_tops_get_t_self(relinfo->ri_RelationDesc, + localslot->tts_tuple), tableam_tops_get_t_self(relinfo->ri_RelationDesc, + originslot->tts_tuple)) == 0) { + /* If conflict with the tuple to be updated, ignore it. */ + found = false; + } else { + return replidxoid; + } + } + + for (int i = 0; i < relinfo->ri_NumIndices; i++) { + IndexInfo *ii = relinfo->ri_IndexRelationInfo[i]; + Relation idxrel; + Oid idxoid = InvalidOid; + + if (!ii->ii_Unique) { + continue; + } + + idxrel = relinfo->ri_IndexRelationDescs[i]; + idxoid = RelationGetRelid(idxrel); + + if (idxoid == replidxoid) { + continue; + } + + found = RelationFindReplTuple(estate, relinfo->ri_RelationDesc, idxoid, LockTupleExclusive, remoteslot, + localslot, fakeRelInfo); + + if (found) { + if (originslot != NULL && ItemPointerCompare(tableam_tops_get_t_self(relinfo->ri_RelationDesc, + localslot->tts_tuple), tableam_tops_get_t_self(relinfo->ri_RelationDesc, + originslot->tts_tuple)) == 0) { + /* If conflict with the tuple to be updated, ignore it. */ + found = false; + } else { + return idxoid; + } + } + } + + return InvalidOid; +} + /* * Handle INSERT message. */ @@ -582,6 +655,13 @@ static void apply_handle_insert(StringInfo s) TupleTableSlot *remoteslot; MemoryContext oldctx; FakeRelationPartition fakeRelInfo; + TupleTableSlot *localslot; + EPQState epqstate; + Oid conflictIndexOid = InvalidOid; + + if (t_thrd.applyworker_cxt.isSkipTransaction) { + return; + } ensure_transaction(); @@ -600,6 +680,8 @@ static void apply_handle_insert(StringInfo s) estate = create_estate_for_relation(rel); remoteslot = ExecInitExtraTupleSlot(estate, rel->localrel->rd_tam_ops); ExecSetSlotDescriptor(remoteslot, RelationGetDescr(rel->localrel)); + localslot = ExecInitExtraTupleSlot(estate, rel->localrel->rd_tam_ops); + ExecSetSlotDescriptor(localslot, RelationGetDescr(rel->localrel)); /* Input functions may need an active snapshot, so get one */ PushActiveSnapshot(GetTransactionSnapshot()); @@ -611,11 +693,62 @@ static void apply_handle_insert(StringInfo s) ExecOpenIndices(estate->es_result_relation_info, false); - /* Get fake relation and partition for patitioned table */ - GetFakeRelAndPart(estate, rel->localrel, remoteslot, &fakeRelInfo); + if ((conflictIndexOid = find_conflict_tuple(estate, remoteslot, localslot, &fakeRelInfo)) != InvalidOid) { + StringInfoData localtup, remotetup; + initStringInfo(&localtup); + tuple_to_stringinfo(rel->localrel, &localtup, RelationGetDescr(rel->localrel), + (HeapTuple)localslot->tts_tuple, false); + + initStringInfo(&remotetup); + tuple_to_stringinfo(rel->localrel, &remotetup, RelationGetDescr(rel->localrel), + (HeapTuple)tableam_tslot_get_tuple_from_slot(rel->localrel, remoteslot), false); + + switch (u_sess->attr.attr_storage.subscription_conflict_resolution) { + case RESOLVE_ERROR: + ereport(ERROR, (errmsg("CONFLICT: remote insert on relation %s (local index %s). Resolution: error.", + RelationGetRelationName(rel->localrel), get_rel_name(conflictIndexOid)), + errdetail("local tuple: %s, remote tuple: %s, origin: pg_%u, commit_lsn: %X/%X", localtup.data, + remotetup.data, t_thrd.applyworker_cxt.curWorker->subid, + (uint32)(t_thrd.applyworker_cxt.remoteFinalLsn >> BITS_PER_INT), + (uint32)t_thrd.applyworker_cxt.remoteFinalLsn))); + break; + case RESOLVE_APPLY_REMOTE: + ereport(LOG, (errmsg("CONFLICT: remote insert on relation %s (local index %s). " + "Resolution: apply_remote.", + RelationGetRelationName(rel->localrel), get_rel_name(conflictIndexOid)), + errdetail("local tuple: %s, remote tuple: %s, origin: pg_%u, commit_lsn: %X/%X", localtup.data, + remotetup.data, t_thrd.applyworker_cxt.curWorker->subid, + (uint32)(t_thrd.applyworker_cxt.remoteFinalLsn >> BITS_PER_INT), + (uint32)t_thrd.applyworker_cxt.remoteFinalLsn))); + EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1); + EvalPlanQualSetSlot(&epqstate, remoteslot); + /* Do the actual update. */ + ExecSimpleRelationUpdate(estate, &epqstate, localslot, remoteslot, &fakeRelInfo); + + EvalPlanQualEnd(&epqstate); + break; + case RESOLVE_KEEP_LOCAL: + ereport(LOG, (errmsg("CONFLICT: remote insert on relation %s (local index %s). " + "Resolution: keep_local.", + RelationGetRelationName(rel->localrel), get_rel_name(conflictIndexOid)), + errdetail("local tuple: %s, remote tuple: %s, origin: pg_%u, commit_lsn: %X/%X", localtup.data, + remotetup.data, t_thrd.applyworker_cxt.curWorker->subid, + (uint32)(t_thrd.applyworker_cxt.remoteFinalLsn >> BITS_PER_INT), + (uint32)t_thrd.applyworker_cxt.remoteFinalLsn))); + break; + default: + ereport(ERROR, (errmsg("wrong parameter value for subscription_conflict_resolution"))); + break; + } + FreeStringInfo(&localtup); + FreeStringInfo(&remotetup); + } else { + /* Get fake relation and partition for patitioned table */ + GetFakeRelAndPart(estate, rel->localrel, remoteslot, &fakeRelInfo); - /* Do the insert. */ - ExecSimpleRelationInsert(estate, remoteslot, &fakeRelInfo); + /* Do the insert. */ + ExecSimpleRelationInsert(estate, remoteslot, &fakeRelInfo); + } /* Cleanup. */ ExecCloseIndices(estate->es_result_relation_info); @@ -701,10 +834,16 @@ static void apply_handle_update(StringInfo s) bool has_oldtup; TupleTableSlot *localslot; TupleTableSlot *remoteslot; + TupleTableSlot *conflictLocalSlot; RangeTblEntry *target_rte = NULL; bool found = false; MemoryContext oldctx; FakeRelationPartition fakeRelInfo; + Oid conflictIndexOid = InvalidOid; + + if (t_thrd.applyworker_cxt.isSkipTransaction) { + return; + } ensure_transaction(); @@ -728,6 +867,8 @@ static void apply_handle_update(StringInfo s) ExecSetSlotDescriptor(remoteslot, RelationGetDescr(rel->localrel)); localslot = ExecInitExtraTupleSlot(estate, rel->localrel->rd_tam_ops); ExecSetSlotDescriptor(localslot, RelationGetDescr(rel->localrel)); + conflictLocalSlot = ExecInitExtraTupleSlot(estate, rel->localrel->rd_tam_ops); + ExecSetSlotDescriptor(conflictLocalSlot, RelationGetDescr(rel->localrel)); EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1); /* @@ -783,10 +924,64 @@ static void apply_handle_update(StringInfo s) slot_modify_data(remoteslot, localslot, rel, &newtup); MemoryContextSwitchTo(oldctx); - EvalPlanQualSetSlot(&epqstate, remoteslot); + if ((conflictIndexOid = find_conflict_tuple(estate, remoteslot, conflictLocalSlot, &fakeRelInfo, localslot)) + != InvalidOid) { + StringInfoData localtup, remotetup; + initStringInfo(&localtup); + tuple_to_stringinfo(rel->localrel, &localtup, RelationGetDescr(rel->localrel), + (HeapTuple)conflictLocalSlot->tts_tuple, false); + + initStringInfo(&remotetup); + tuple_to_stringinfo(rel->localrel, &remotetup, RelationGetDescr(rel->localrel), + (HeapTuple)tableam_tslot_get_tuple_from_slot(rel->localrel, remoteslot), false); + + switch (u_sess->attr.attr_storage.subscription_conflict_resolution) { + case RESOLVE_ERROR: + ereport(ERROR, (errmsg("CONFLICT: remote update on relation %s (local index %s). " + "Resolution: error.", + RelationGetRelationName(rel->localrel), get_rel_name(conflictIndexOid)), + errdetail("local tuple: %s, remote tuple: %s, origin: pg_%u, commit_lsn: %X/%X", localtup.data, + remotetup.data, t_thrd.applyworker_cxt.curWorker->subid, + (uint32)(t_thrd.applyworker_cxt.remoteFinalLsn >> BITS_PER_INT), + (uint32)t_thrd.applyworker_cxt.remoteFinalLsn))); + break; + case RESOLVE_APPLY_REMOTE: + ereport(LOG, (errmsg("CONFLICT: remote update on relation %s (local index %s). " + "Resolution: apply_remote.", + RelationGetRelationName(rel->localrel), get_rel_name(conflictIndexOid)), + errdetail("local tuple: %s, remote tuple: %s, origin: pg_%u, commit_lsn: %X/%X", localtup.data, + remotetup.data, t_thrd.applyworker_cxt.curWorker->subid, + (uint32)(t_thrd.applyworker_cxt.remoteFinalLsn >> BITS_PER_INT), + (uint32)t_thrd.applyworker_cxt.remoteFinalLsn))); + /* first delete the conflict tuple */ + EvalPlanQualSetSlot(&epqstate, conflictLocalSlot); + ExecSimpleRelationDelete(estate, &epqstate, conflictLocalSlot, &fakeRelInfo); + + EvalPlanQualSetSlot(&epqstate, remoteslot); + /* Do the actual update. */ + ExecSimpleRelationUpdate(estate, &epqstate, localslot, remoteslot, &fakeRelInfo); + break; + case RESOLVE_KEEP_LOCAL: + ereport(LOG, (errmsg("CONFLICT: remote update on relation %s (local index %s). " + "Resolution: keep_local.", + RelationGetRelationName(rel->localrel), get_rel_name(conflictIndexOid)), + errdetail("local tuple: %s, remote tuple: %s, origin: pg_%u, commit_lsn: %X/%X", localtup.data, + remotetup.data, t_thrd.applyworker_cxt.curWorker->subid, + (uint32)(t_thrd.applyworker_cxt.remoteFinalLsn >> BITS_PER_INT), + (uint32)t_thrd.applyworker_cxt.remoteFinalLsn))); + break; + default: + ereport(ERROR, (errmsg("wrong parameter value for subscription_conflict_resolution"))); + break; + } + FreeStringInfo(&localtup); + FreeStringInfo(&remotetup); + } else { + EvalPlanQualSetSlot(&epqstate, remoteslot); - /* Do the actual update. */ - ExecSimpleRelationUpdate(estate, &epqstate, localslot, remoteslot, &fakeRelInfo); + /* Do the actual update. */ + ExecSimpleRelationUpdate(estate, &epqstate, localslot, remoteslot, &fakeRelInfo); + } } else { /* * The tuple to be updated could not be found. @@ -824,6 +1019,10 @@ static void apply_handle_delete(StringInfo s) MemoryContext oldctx; FakeRelationPartition fakeRelInfo; + if (t_thrd.applyworker_cxt.isSkipTransaction) { + return; + } + ensure_transaction(); relid = logicalrep_read_delete(s, &oldtup); @@ -1828,3 +2027,78 @@ bool IsLogicalWorker(void) { return t_thrd.applyworker_cxt.curWorker != NULL; } + +/* + * Start skipping changes of the transaction if the given LSN matches the + * LSN specified by subscription's skiplsn. + */ +static void IsSkippingChanges(XLogRecPtr finish_lsn) +{ + /* + * Quick return if it's not requested to skip this transaction. This + * function is called for every remote transaction and we assume that + * skipping the transaction is not used often. + */ + if (likely(XLogRecPtrIsInvalid(t_thrd.applyworker_cxt.mySubscription->skiplsn) || + t_thrd.applyworker_cxt.mySubscription->skiplsn != finish_lsn)) { + return; + } + + t_thrd.applyworker_cxt.isSkipTransaction = true; +} + +static void StopSkippingChanges() +{ + t_thrd.applyworker_cxt.isSkipTransaction = false; + + /* + * Quick return if it's not requested to skip this transaction. This + * function is called for every remote transaction and we assume that + * skipping the transaction is not used often. + */ + if (!IsTransactionState()) { + StartTransactionCommand(); + } + + HeapTuple tup; + Relation rel; + bool nulls[Natts_pg_subscription]; + bool replaces[Natts_pg_subscription]; + Datum values[Natts_pg_subscription]; + errno_t rc = 0; + + /* + * Protect subskiplsn of pg_subscription from being concurrently updated + * while clearing it. + */ + LockSharedObject(SubscriptionRelationId, t_thrd.applyworker_cxt.mySubscription->oid, 0, AccessShareLock); + rel = heap_open(SubscriptionRelationId, RowExclusiveLock); + + /* Fetch the existing tuple. */ + tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(t_thrd.applyworker_cxt.mySubscription->oid)); + + if (!HeapTupleIsValid(tup)) { + ereport(ERROR, (errmsg("subscription \"%s\" does not exist", t_thrd.applyworker_cxt.mySubscription->name))); + } + + rc = memset_s(values, sizeof(values),0, sizeof(values)); + securec_check_c(rc, "\0", "\0"); + rc = memset_s(nulls, sizeof(nulls),false, sizeof(nulls)); + securec_check_c(rc, "\0", "\0"); + rc = memset_s(replaces, sizeof(replaces), false, sizeof(replaces)); + securec_check_c(rc, "\0", "\0"); + + /* reset subskiplsn */ + values[Anum_pg_subscription_subskiplsn - 1] = LsnGetTextDatum(InvalidXLogRecPtr); + replaces[Anum_pg_subscription_subskiplsn - 1] = true; + + tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, replaces); + /* Update the catalog. */ + simple_heap_update(rel, &tup->t_self, tup); + CatalogUpdateIndexes(rel, tup); + + heap_freetuple(tup); + heap_close(rel, NoLock); + + CommitTransactionCommand(); +} diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 6765cd38cc..8b79927fdf 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -52,13 +52,15 @@ CATALOG(pg_subscription,6126) BKI_SHARED_RELATION BKI_ROWTYPE_OID(6128) BKI_SCHE text subpublications[1]; /* List of publications subscribed to */ bool subbinary; /* True if the subscription wants the * publisher to send data in binary */ + text subskiplsn; /* All changes finished at this LSN are + * skipped */ #endif } FormData_pg_subscription; typedef FormData_pg_subscription *Form_pg_subscription; -#define Natts_pg_subscription 9 +#define Natts_pg_subscription 10 #define Anum_pg_subscription_subdbid 1 #define Anum_pg_subscription_subname 2 #define Anum_pg_subscription_subowner 3 @@ -68,6 +70,7 @@ typedef FormData_pg_subscription *Form_pg_subscription; #define Anum_pg_subscription_subsynccommit 7 #define Anum_pg_subscription_subpublications 8 #define Anum_pg_subscription_subbinary 9 +#define Anum_pg_subscription_subskiplsn 10 typedef struct Subscription { @@ -81,6 +84,8 @@ typedef struct Subscription { char *synccommit; /* Synchronous commit setting for worker */ List *publications; /* List of publication names to subscribe to */ bool binary; /* Indicates if the subscription wants data in binary format */ + XLogRecPtr skiplsn; /* All changes finished at this LSN are + * skipped */ } Subscription; @@ -91,6 +96,8 @@ extern char *get_subscription_name(Oid subid, bool missing_ok); extern int CountDBSubscriptions(Oid dbid); extern void ClearListContent(List *list); +extern Datum LsnGetTextDatum(XLogRecPtr lsn); +extern XLogRecPtr TextDatumGetLsn(Datum datum); #endif /* PG_SUBSCRIPTION_H */ diff --git a/src/include/knl/knl_guc/knl_session_attr_storage.h b/src/include/knl/knl_guc/knl_session_attr_storage.h index 96d0d72ffc..af7a7e4303 100755 --- a/src/include/knl/knl_guc/knl_session_attr_storage.h +++ b/src/include/knl/knl_guc/knl_session_attr_storage.h @@ -266,6 +266,7 @@ typedef struct knl_session_attr_storage { int logical_sender_timeout; int ignore_standby_lsn_window; int ignore_feedback_xmin_window; + int subscription_conflict_resolution; } knl_session_attr_storage; #endif /* SRC_INCLUDE_KNL_KNL_SESSION_ATTR_STORAGE */ diff --git a/src/include/knl/knl_thread.h b/src/include/knl/knl_thread.h index 61c629b060..3a5b4aebd3 100755 --- a/src/include/knl/knl_thread.h +++ b/src/include/knl/knl_thread.h @@ -3336,6 +3336,7 @@ typedef struct knl_t_apply_worker_context { List *tableStates; XLogRecPtr remoteFinalLsn; CommitSeqNo curRemoteCsn; + bool isSkipTransaction; } knl_t_apply_worker_context; typedef struct knl_t_publication_context { diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index fb09647b07..015552c675 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -358,5 +358,7 @@ extern void FreeLogicalLog(ParallelReorderBuffer *rb, logicalLog *logChange, int extern bool LogicalDecodeParseOptionsDefault(const char* defaultStr, void **options); extern DecodeOptionsDefault* LogicalDecodeGetOptionsDefault(); template void LogicalDecodeReportLostChanges(const T *iterstate); +extern void tuple_to_stringinfo(Relation relation, StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool isOld, + bool printOid = false); #endif diff --git a/src/include/replication/replicainternal.h b/src/include/replication/replicainternal.h index b582b44e6f..924601b2b4 100755 --- a/src/include/replication/replicainternal.h +++ b/src/include/replication/replicainternal.h @@ -214,6 +214,13 @@ typedef enum replauthmode{ REPL_AUTH_UUID /* uuid auth */ } ReplAuthMode; +typedef enum +{ + RESOLVE_ERROR, + RESOLVE_APPLY_REMOTE, + RESOLVE_KEEP_LOCAL +} PGLogicalResolveOption; + extern bool data_catchup; extern bool wal_catchup; extern BuildMode build_mode; diff --git a/src/test/regress/input/subscription.source b/src/test/regress/input/subscription.source index 30cd7a96b2..3083dc5b6e 100644 --- a/src/test/regress/input/subscription.source +++ b/src/test/regress/input/subscription.source @@ -70,6 +70,14 @@ ALTER SUBSCRIPTION testsub owner to regress_subscription_user2; -- alter subbinary to true ALTER SUBSCRIPTION testsub SET (binary=true); select subname, subbinary from pg_subscription where subname='testsub'; +-- set subskiplsn +ALTER SUBSCRIPTION testsub SET (skiplsn = '0/ABCDEF'); +select subname, subskiplsn from pg_subscription where subname='testsub'; +ALTER SUBSCRIPTION testsub SET (skiplsn = '0/ABCDEFGH'); +ALTER SUBSCRIPTION testsub SET (skiplsn = 'none'); +select subname, subskiplsn from pg_subscription where subname='testsub'; +-- disable test +ALTER SUBSCRIPTION testsub DISABLE; --rename ALTER SUBSCRIPTION testsub rename to testsub_rename; --- inside a transaction block diff --git a/src/test/regress/output/recovery_2pc_tools.source b/src/test/regress/output/recovery_2pc_tools.source index a65da8a14e..21280f9c65 100644 --- a/src/test/regress/output/recovery_2pc_tools.source +++ b/src/test/regress/output/recovery_2pc_tools.source @@ -650,6 +650,7 @@ select name,vartype,unit,min_val,max_val from pg_settings where name <> 'qunit_c stats_temp_directory | string | | | stream_cluster_run_mode | enum | | | string_hash_compatible | bool | | | + subscription_conflict_resolution | enum | | | support_batch_bind | bool | | | support_extended_features | bool | | | sync_config_strategy | enum | | | diff --git a/src/test/regress/output/subscription.source b/src/test/regress/output/subscription.source index 7ef7842a12..24fe8ecaab 100644 --- a/src/test/regress/output/subscription.source +++ b/src/test/regress/output/subscription.source @@ -19,6 +19,7 @@ ALTER SUBSCRIPTION name CONNECTION 'conninfo' ALTER SUBSCRIPTION name SET PUBLICATION publication_name [, ...] ALTER SUBSCRIPTION name REFRESH PUBLICATION [ WITH ( refresh_option [= value] [, ... ] ) ] ALTER SUBSCRIPTION name ENABLE +ALTER SUBSCRIPTION name DISABLE ALTER SUBSCRIPTION name SET ( subscription_parameter [= value] [, ... ] ) ALTER SUBSCRIPTION name OWNER TO new_owner ALTER SUBSCRIPTION name RENAME TO new_name @@ -153,6 +154,25 @@ select subname, subbinary from pg_subscription where subname='testsub'; testsub | t (1 row) +-- set subskiplsn +ALTER SUBSCRIPTION testsub SET (skiplsn = '0/ABCDEF'); +select subname, subskiplsn from pg_subscription where subname='testsub'; + subname | subskiplsn +---------+------------ + testsub | 0/ABCDEF +(1 row) + +ALTER SUBSCRIPTION testsub SET (skiplsn = '0/ABCDEFGH'); +ERROR: invalid input syntax for type pg_lsn: "0/ABCDEFGH" +ALTER SUBSCRIPTION testsub SET (skiplsn = 'none'); +select subname, subskiplsn from pg_subscription where subname='testsub'; + subname | subskiplsn +---------+------------ + testsub | 0/0 +(1 row) + +-- disable test +ALTER SUBSCRIPTION testsub DISABLE; --rename ALTER SUBSCRIPTION testsub rename to testsub_rename; --- inside a transaction block @@ -337,12 +357,15 @@ SELECT object_name,detail_info FROM pg_query_audit('2022-01-13 9:30:00', '2031-1 testsub | ALTER SUBSCRIPTION testsub SET (synchronous_commit=on); testsub | ALTER SUBSCRIPTION testsub owner to regress_subscription_user2; testsub | ALTER SUBSCRIPTION testsub SET (binary=true); + testsub | ALTER SUBSCRIPTION testsub SET (skiplsn = '0/ABCDEF'); + testsub | ALTER SUBSCRIPTION testsub SET (skiplsn = 'none'); + testsub | ALTER SUBSCRIPTION testsub DISABLE; testsub | ALTER SUBSCRIPTION testsub rename to testsub_rename; sub_len_999 | CREATE SUBSCRIPTION sub_leninsert_only WITH (connect = false); testsub_rename | DROP SUBSCRIPTION IF EXISTS testsub_rename; testsub_maskconninfo | DROP SUBSCRIPTION IF EXISTS testsub_maskconninfo; sub_len_999 | DROP SUBSCRIPTION IF EXISTS sub_len_999; -(17 rows) +(20 rows) --clear audit log SELECT pg_delete_audit('1012-11-10', '3012-11-11'); diff --git a/src/test/subscription/schedule b/src/test/subscription/schedule index 6913c6c8d4..52ee04318f 100644 --- a/src/test/subscription/schedule +++ b/src/test/subscription/schedule @@ -10,4 +10,6 @@ sync encoding ddl matviews -change_wal_level \ No newline at end of file +change_wal_level +skiplsn +disable \ No newline at end of file diff --git a/src/test/subscription/testcase/disable.sh b/src/test/subscription/testcase/disable.sh new file mode 100644 index 0000000000..1797690029 --- /dev/null +++ b/src/test/subscription/testcase/disable.sh @@ -0,0 +1,65 @@ +#!/bin/sh + +source $1/env_utils.sh $1 $2 + +case_db="disable_db" + +function test_1() { + echo "create database and tables." + exec_sql $db $pub_node1_port "CREATE DATABASE $case_db" + exec_sql $db $sub_node1_port "CREATE DATABASE $case_db" + # Create some preexisting content on publisher + exec_sql $case_db $pub_node1_port "CREATE TABLE tab_rep (a int primary key, b int)" + + # Setup structure on subscriber + exec_sql $case_db $sub_node1_port "CREATE TABLE tab_rep (a int primary key, b int)" + + # Setup logical replication + echo "create publication and subscription." + publisher_connstr="port=$pub_node1_port host=$g_local_ip dbname=$case_db user=$username password=$passwd" + exec_sql $case_db $pub_node1_port "CREATE PUBLICATION tap_pub FOR ALL TABLES" + exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub" + + # Wait for initial table sync to finish + wait_for_subscription_sync $case_db $sub_node1_port + + exec_sql $case_db $pub_node1_port "insert into tab_rep values (1,1)" + + wait_for_catchup $case_db $pub_node1_port "tap_sub" + + exec_sql $case_db $sub_node1_port "alter subscription tap_sub disable" + + exec_sql $case_db $pub_node1_port "insert into tab_rep values (2,2)" + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT * FROM tab_rep")" = "1|1" ]; then + echo "check data not sync after disable subscription success" + else + echo "$failed_keyword when check data not sync after disable subscription" + exit 1 + fi + + exec_sql $case_db $sub_node1_port "alter subscription tap_sub enable" + + wait_for_catchup $case_db $pub_node1_port "tap_sub" + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT * FROM tab_rep")" = "1|1 +2|2" ]; then + echo "check data not sync after enable subscription success" + else + echo "$failed_keyword when check data not sync after enable subscription" + exit 1 + fi +} + +function tear_down() { + exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION IF EXISTS tap_sub" + exec_sql $case_db $pub_node1_port "DROP PUBLICATION IF EXISTS tap_pub" + + exec_sql $db $sub_node1_port "DROP DATABASE $case_db" + exec_sql $db $pub_node1_port "DROP DATABASE $case_db" + + echo "tear down" +} + +test_1 +tear_down \ No newline at end of file diff --git a/src/test/subscription/testcase/skiplsn.sh b/src/test/subscription/testcase/skiplsn.sh new file mode 100644 index 0000000000..669e640fc7 --- /dev/null +++ b/src/test/subscription/testcase/skiplsn.sh @@ -0,0 +1,69 @@ +#!/bin/sh + +source $1/env_utils.sh $1 $2 + +case_db="skiplsn_db" + +function test_1() { + echo "create database and tables." + exec_sql $db $pub_node1_port "CREATE DATABASE $case_db" + exec_sql $db $sub_node1_port "CREATE DATABASE $case_db" + # Create some preexisting content on publisher + exec_sql $case_db $pub_node1_port "CREATE TABLE tab_rep (a int primary key, b int)" + + # Setup structure on subscriber + exec_sql $case_db $sub_node1_port "CREATE TABLE tab_rep (a int primary key, b int)" + + # Setup logical replication + echo "create publication and subscription." + publisher_connstr="port=$pub_node1_port host=$g_local_ip dbname=$case_db user=$username password=$passwd" + exec_sql $case_db $pub_node1_port "CREATE PUBLICATION tap_pub FOR ALL TABLES" + exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub" + + # Wait for initial table sync to finish + wait_for_subscription_sync $case_db $sub_node1_port + + exec_sql $case_db $sub_node1_port "insert into tab_rep values (1,1)" + + logfile=$(get_log_file "sub_datanode1") + + location=$(awk 'END{print NR}' $logfile) + + exec_sql $case_db $pub_node1_port "insert into tab_rep values (1,2)" + + content=$(tail -n +$location $logfile) + commitlsn=$(expr "$content" : '.*commit_lsn:\s\([0-9|/|ABCDEF]*\).*') + + while [ -z $commitlsn ] + do + content=$(tail -n +$location $logfile) + commitlsn=$(expr "$content" : '.*commit_lsn:\s\([0-9|/|ABCDEF]*\).*') + done + + exec_sql $case_db $sub_node1_port "alter subscription tap_sub set (skiplsn = '$commitlsn')" + + exec_sql $case_db $pub_node1_port "insert into tab_rep values (2, 2)" + + wait_for_catchup $case_db $pub_node1_port "tap_sub" + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT * FROM tab_rep")" = "1|1 +2|2" ]; then + echo "check data sync after skip conflict success" + else + echo "$failed_keyword when check data sync after skip conflict" + exit 1 + fi +} + +function tear_down() { + exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION IF EXISTS tap_sub" + exec_sql $case_db $pub_node1_port "DROP PUBLICATION IF EXISTS tap_pub" + + exec_sql $db $sub_node1_port "DROP DATABASE $case_db" + exec_sql $db $pub_node1_port "DROP DATABASE $case_db" + + echo "tear down" +} + +test_1 +tear_down \ No newline at end of file -- Gitee From fb651b6de64529b2c2625355424461753368d6a6 Mon Sep 17 00:00:00 2001 From: he-shaoyu Date: Thu, 17 Aug 2023 16:56:13 +0800 Subject: [PATCH 2/5] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E6=B5=8B=E8=AF=95?= =?UTF-8?q?=E7=94=A8=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/test/subscription/schedule | 3 +- .../subscription/testcase/pub_subconflict.sh | 147 ++++++++++++++++++ 2 files changed, 149 insertions(+), 1 deletion(-) create mode 100644 src/test/subscription/testcase/pub_subconflict.sh diff --git a/src/test/subscription/schedule b/src/test/subscription/schedule index 52ee04318f..836ac0e58b 100644 --- a/src/test/subscription/schedule +++ b/src/test/subscription/schedule @@ -12,4 +12,5 @@ ddl matviews change_wal_level skiplsn -disable \ No newline at end of file +disable +pub_subconflict \ No newline at end of file diff --git a/src/test/subscription/testcase/pub_subconflict.sh b/src/test/subscription/testcase/pub_subconflict.sh new file mode 100644 index 0000000000..ee40c6b9d8 --- /dev/null +++ b/src/test/subscription/testcase/pub_subconflict.sh @@ -0,0 +1,147 @@ + +#!/bin/sh + +source $1/env_utils.sh $1 $2 + +case_db="conflict_db" + +function test_1() { + echo "create database and tables." + exec_sql $db $pub_node1_port "CREATE DATABASE $case_db" + exec_sql $db $sub_node1_port "CREATE DATABASE $case_db" + # Create some preexisting content on publisher + # => keep_local + exec_sql $case_db $sub_node1_port "ALTER SYSTEM SET subscription_conflict_resolution = keep_local" + + # Setup structure on subscriber + exec_sql $case_db $pub_node1_port "CREATE TABLE tab_keep (a int primary key, b char)" + exec_sql $case_db $sub_node1_port "CREATE TABLE tab_keep (a int primary key, b char)" + + # Setup logical replication + echo "create publication and subscription. (=> keep_local)" + publisher_connstr="port=$pub_node1_port host=$g_local_ip dbname=$case_db user=$username password=$passwd" + + exec_sql $case_db $pub_node1_port "CREATE PUBLICATION tap_pub1 FOR TABLE tab_keep" + exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION tap_sub1 CONNECTION '$publisher_connstr' PUBLICATION tap_pub1" + + # Wait for initial table sync to finish + wait_for_subscription_sync $case_db $sub_node1_port + + exec_sql $case_db $sub_node1_port "INSERT INTO tab_keep VALUES(1, 'a')" + exec_sql $case_db $pub_node1_port "INSERT INTO tab_keep VALUES(1, 'b')" + + # Wait for catchup + wait_for_catchup $case_db $pub_node1_port "tap_sub1" + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM tab_keep where b = 'a'")" = "1" ]; then + echo "check pub_sub conflict for 1st sub success" + else + echo "$failed_keyword when check pub_sub conflict for 1st sub" + exit 1 + fi + + exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION tap_sub1" + exec_sql $case_db $pub_node1_port "DROP PUBLICATION tap_pub1" + + # => apply_remote + echo "create publication and subscription. (=> apply_remote)" + exec_sql $case_db $sub_node1_port "ALTER SYSTEM SET subscription_conflict_resolution = apply_remote" + + exec_sql $case_db $pub_node1_port "CREATE TABLE tab_apply (a int primary key, b char)" + exec_sql $case_db $sub_node1_port "CREATE TABLE tab_apply (a int primary key, b char)" + + exec_sql $case_db $pub_node1_port "CREATE PUBLICATION tap_pub2 FOR TABLE tab_apply" + exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION tap_sub2 CONNECTION '$publisher_connstr' PUBLICATION tap_pub2" + + # Wait for initial table sync to finish + wait_for_subscription_sync $case_db $sub_node1_port + + exec_sql $case_db $sub_node1_port "INSERT INTO tab_apply VALUES(1, 'k')" + exec_sql $case_db $pub_node1_port "INSERT INTO tab_apply VALUES(1, 'a')" + + # Wait for catchup + wait_for_catchup $case_db $pub_node1_port "tap_sub2" + + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM tab_apply where b = 'a'")" = "1" ]; then + echo "check pub_sub conflict for 2nd sub success" + else + echo "$failed_keyword when check pub_sub conflict for 2nd sub" + exit 1 + fi + + exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION tap_sub2" + exec_sql $case_db $pub_node1_port "DROP PUBLICATION tap_pub2" + + # update apply_remote + echo "create publication and subscription. (update apply_remote)" + exec_sql $case_db $pub_node1_port "CREATE TABLE tab_updateApply (a int primary key, b char)" + exec_sql $case_db $sub_node1_port "CREATE TABLE tab_updateApply (a int primary key, b char)" + + exec_sql $case_db $pub_node1_port "CREATE PUBLICATION tap_pub3 FOR TABLE tab_updateApply" + exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION tap_sub3 CONNECTION '$publisher_connstr' PUBLICATION tap_pub3" + + exec_sql $case_db $pub_node1_port "INSERT INTO tab_updateApply VALUES(1, 'a')" + + # Wait for catchup + wait_for_catchup $case_db $pub_node1_port "tap_sub3" + + exec_sql $case_db $sub_node1_port "INSERT INTO tab_updateApply VALUES(2, 'b')" + exec_sql $case_db $pub_node1_port "UPDATE tab_updateApply SET a = 2 where a = 1" + + # Wait for catchup + wait_for_catchup $case_db $pub_node1_port "tap_sub3" + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT * FROM tab_updateApply")" = "2|a" ]; then + echo "check pub_sub conflict for 3rd sub success" + else + echo "$failed_keyword when check pub_sub conflict for 3rd sub" + exit 1 + fi + + exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION tap_sub3" + exec_sql $case_db $pub_node1_port "DROP PUBLICATION tap_pub3" + + #unique conflict + echo "create publication and subscription. (unique conflict)" + exec_sql $case_db $pub_node1_port "CREATE TABLE tab_unique (a int primary key, b char, c int unique)" + exec_sql $case_db $sub_node1_port "CREATE TABLE tab_unique (a int primary key, b char, c int unique)" + + exec_sql $case_db $pub_node1_port "CREATE PUBLICATION tap_pub4 FOR TABLE tab_unique" + exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION tap_sub4 CONNECTION '$publisher_connstr' PUBLICATION tap_pub4" + + exec_sql $case_db $sub_node1_port "INSERT INTO tab_unique VALUES (1, 'a', 1), (2, 'c', 2), (3, 'c', 3)" + + # Wait for catchup + wait_for_catchup $case_db $pub_node1_port "tap_sub4" + + exec_sql $case_db $pub_node1_port "INSERT INTO tab_unique VALUES(3, 'k', 4)" + + wait_for_catchup $case_db $pub_node1_port "tap_sub4" + + exec_sql $case_db $pub_node1_port "INSERT INTO tab_unique VALUES(1, 'b', 2)" + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT * FROM tab_unique")" = "1|a|1 +2|c|2 +3|k|4" ]; then + echo "check pub_sub conflict for 4th sub success" + else + echo "$failed_keyword when check pub_sub conflict for 4th sub" + exit 1 + fi + + exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION tap_sub4" + exec_sql $case_db $pub_node1_port "DROP PUBLICATION tap_pub4" +} + +function tear_down(){ + exec_sql $case_db $sub_node1_port "ALTER SYSTEM SET subscription_conflict_resolution = error" + + exec_sql $db $sub_node1_port "DROP DATABASE $case_db" + exec_sql $db $pub_node1_port "DROP DATABASE $case_db" + + echo "tear down" +} + +test_1 +tear_down \ No newline at end of file -- Gitee From b06cb282a6b87dea1d008a49ab45ac021827171a Mon Sep 17 00:00:00 2001 From: chenxiaobin19 <1025221611@qq.com> Date: Fri, 18 Aug 2023 15:17:39 +0800 Subject: [PATCH 3/5] =?UTF-8?q?=E6=A3=80=E8=A7=86=E6=84=8F=E8=A7=81?= =?UTF-8?q?=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../optimizer/commands/subscriptioncmds.cpp | 4 +- .../process/threadpool/knl_instance.cpp | 2 + .../storage/replication/logical/launcher.cpp | 4 + .../storage/replication/logical/worker.cpp | 94 +++++++++++++++---- src/include/knl/knl_instance.h | 3 + src/include/replication/worker_internal.h | 2 + 6 files changed, 88 insertions(+), 21 deletions(-) diff --git a/src/gausskernel/optimizer/commands/subscriptioncmds.cpp b/src/gausskernel/optimizer/commands/subscriptioncmds.cpp index aa52f2ebe0..e4da27801b 100644 --- a/src/gausskernel/optimizer/commands/subscriptioncmds.cpp +++ b/src/gausskernel/optimizer/commands/subscriptioncmds.cpp @@ -195,7 +195,7 @@ static void parse_subscription_options(const List *stmt_options, bits32 supporte errmsg("conflicting or redundant options"))); } - opts->specified_opts = SUBOPT_CONNECT; + opts->specified_opts |= SUBOPT_CONNECT; opts->connect = defGetBoolean(defel); } else if (IsSet(supported_opts, SUBOPT_SKIPLSN) && strcmp(defel->defname, "skiplsn") == 0) { if (IsSet(opts->specified_opts, SUBOPT_SKIPLSN)) { @@ -205,7 +205,7 @@ static void parse_subscription_options(const List *stmt_options, bits32 supporte } char *lsn_str = defGetString(defel); - opts->specified_opts = SUBOPT_SKIPLSN; + opts->specified_opts |= SUBOPT_SKIPLSN; /* Setting lsn = 'NONE' is treated as resetting LSN */ if (strcmp(lsn_str, "none") == 0) { opts->skiplsn = InvalidXLogRecPtr; diff --git a/src/gausskernel/process/threadpool/knl_instance.cpp b/src/gausskernel/process/threadpool/knl_instance.cpp index 1b010de53a..165f93de0b 100755 --- a/src/gausskernel/process/threadpool/knl_instance.cpp +++ b/src/gausskernel/process/threadpool/knl_instance.cpp @@ -991,6 +991,8 @@ void knl_instance_init() for (int i = 0; i < DB_CMPT_MAX; i++) { pthread_mutex_init(&g_instance.loadPluginLock[i], NULL); } + g_instance.needCheckConflictSubIds = NIL; + pthread_mutex_init(&g_instance.subIdsLock, NULL); #endif knl_g_datadir_init(&g_instance.datadir_cxt); diff --git a/src/gausskernel/storage/replication/logical/launcher.cpp b/src/gausskernel/storage/replication/logical/launcher.cpp index 9bffe943b9..1f136a7fa6 100644 --- a/src/gausskernel/storage/replication/logical/launcher.cpp +++ b/src/gausskernel/storage/replication/logical/launcher.cpp @@ -286,6 +286,10 @@ void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid user TIMESTAMP_NOBEGIN(worker->reply_time); worker->workerLaunchTime = GetCurrentTimestamp(); + pthread_mutex_lock(&g_instance.subIdsLock); + worker->needCheckConflict = list_member_oid(g_instance.needCheckConflictSubIds, subid); + pthread_mutex_unlock(&g_instance.subIdsLock); + t_thrd.applylauncher_cxt.applyLauncherShm->startingWorker = worker; LWLockRelease(LogicalRepWorkerLock); diff --git a/src/gausskernel/storage/replication/logical/worker.cpp b/src/gausskernel/storage/replication/logical/worker.cpp index 1705754f87..796d6c63c3 100644 --- a/src/gausskernel/storage/replication/logical/worker.cpp +++ b/src/gausskernel/storage/replication/logical/worker.cpp @@ -516,6 +516,17 @@ static void apply_handle_commit(StringInfo s) /* Process any tables that are being synchronized in parallel. */ process_syncing_tables(commit_data.end_lsn); + + if (t_thrd.applyworker_cxt.curWorker->needCheckConflict) { + t_thrd.applyworker_cxt.curWorker->needCheckConflict = false; + MemoryContext oldctx = MemoryContextSwitchTo(INSTANCE_GET_MEM_CXT_GROUP(MEMORY_CONTEXT_DEFAULT)); + pthread_mutex_lock(&g_instance.subIdsLock); + g_instance.needCheckConflictSubIds = list_delete_oid(g_instance.needCheckConflictSubIds, + t_thrd.applyworker_cxt.curWorker->subid); + pthread_mutex_unlock(&g_instance.subIdsLock); + MemoryContextSwitchTo(oldctx); + } + if (t_thrd.applyworker_cxt.isSkipTransaction) { StopSkippingChanges(); } @@ -596,16 +607,19 @@ Oid find_conflict_tuple(EState *estate, TupleTableSlot *remoteslot, TupleTableSl /* Check the replica identity index first */ replidxoid = RelationGetReplicaIndex(relinfo->ri_RelationDesc); - found = RelationFindReplTuple(estate, relinfo->ri_RelationDesc, replidxoid, LockTupleExclusive, remoteslot, - localslot, fakeRelInfo); - if (found) { - if (originslot != NULL && ItemPointerCompare(tableam_tops_get_t_self(relinfo->ri_RelationDesc, - localslot->tts_tuple), tableam_tops_get_t_self(relinfo->ri_RelationDesc, - originslot->tts_tuple)) == 0) { - /* If conflict with the tuple to be updated, ignore it. */ - found = false; - } else { - return replidxoid; + if (OidIsValid(replidxoid)) { + found = RelationFindReplTuple(estate, relinfo->ri_RelationDesc, replidxoid, LockTupleExclusive, remoteslot, + localslot, fakeRelInfo); + + if (found) { + if (originslot != NULL && ItemPointerCompare(tableam_tops_get_t_self(relinfo->ri_RelationDesc, + localslot->tts_tuple), tableam_tops_get_t_self(relinfo->ri_RelationDesc, + originslot->tts_tuple)) == 0) { + /* If conflict with the tuple to be updated, ignore it. */ + found = false; + } else { + return replidxoid; + } } } @@ -693,7 +707,8 @@ static void apply_handle_insert(StringInfo s) ExecOpenIndices(estate->es_result_relation_info, false); - if ((conflictIndexOid = find_conflict_tuple(estate, remoteslot, localslot, &fakeRelInfo)) != InvalidOid) { + if (t_thrd.applyworker_cxt.curWorker->needCheckConflict && + (conflictIndexOid = find_conflict_tuple(estate, remoteslot, localslot, &fakeRelInfo)) != InvalidOid) { StringInfoData localtup, remotetup; initStringInfo(&localtup); tuple_to_stringinfo(rel->localrel, &localtup, RelationGetDescr(rel->localrel), @@ -743,11 +758,31 @@ static void apply_handle_insert(StringInfo s) FreeStringInfo(&localtup); FreeStringInfo(&remotetup); } else { - /* Get fake relation and partition for patitioned table */ - GetFakeRelAndPart(estate, rel->localrel, remoteslot, &fakeRelInfo); + PG_TRY(); + { + /* Get fake relation and partition for patitioned table */ + GetFakeRelAndPart(estate, rel->localrel, remoteslot, &fakeRelInfo); - /* Do the insert. */ - ExecSimpleRelationInsert(estate, remoteslot, &fakeRelInfo); + /* Do the insert. */ + ExecSimpleRelationInsert(estate, remoteslot, &fakeRelInfo); + } + PG_CATCH(); + { + ErrorData* errdata = NULL; + + (void*)MemoryContextSwitchTo(oldctx); + errdata = CopyErrorData(); + if (errdata->sqlerrcode == ERRCODE_UNIQUE_VIOLATION) { + (void*)MemoryContextSwitchTo(INSTANCE_GET_MEM_CXT_GROUP(MEMORY_CONTEXT_DEFAULT)); + pthread_mutex_lock(&g_instance.subIdsLock); + g_instance.needCheckConflictSubIds = lappend_oid(g_instance.needCheckConflictSubIds, + t_thrd.applyworker_cxt.curWorker->subid); + pthread_mutex_unlock(&g_instance.subIdsLock); + (void*)MemoryContextSwitchTo(oldctx); + } + PG_RE_THROW(); + } + PG_END_TRY(); } /* Cleanup. */ @@ -924,7 +959,8 @@ static void apply_handle_update(StringInfo s) slot_modify_data(remoteslot, localslot, rel, &newtup); MemoryContextSwitchTo(oldctx); - if ((conflictIndexOid = find_conflict_tuple(estate, remoteslot, conflictLocalSlot, &fakeRelInfo, localslot)) + if (t_thrd.applyworker_cxt.curWorker->needCheckConflict && + (conflictIndexOid = find_conflict_tuple(estate, remoteslot, conflictLocalSlot, &fakeRelInfo, localslot)) != InvalidOid) { StringInfoData localtup, remotetup; initStringInfo(&localtup); @@ -977,10 +1013,30 @@ static void apply_handle_update(StringInfo s) FreeStringInfo(&localtup); FreeStringInfo(&remotetup); } else { - EvalPlanQualSetSlot(&epqstate, remoteslot); + PG_TRY(); + { + EvalPlanQualSetSlot(&epqstate, remoteslot); - /* Do the actual update. */ - ExecSimpleRelationUpdate(estate, &epqstate, localslot, remoteslot, &fakeRelInfo); + /* Do the actual update. */ + ExecSimpleRelationUpdate(estate, &epqstate, localslot, remoteslot, &fakeRelInfo); + } + PG_CATCH(); + { + ErrorData* errdata = NULL; + + (void*)MemoryContextSwitchTo(oldctx); + errdata = CopyErrorData(); + if (errdata->sqlerrcode == ERRCODE_UNIQUE_VIOLATION) { + (void*)MemoryContextSwitchTo(INSTANCE_GET_MEM_CXT_GROUP(MEMORY_CONTEXT_DEFAULT)); + pthread_mutex_lock(&g_instance.subIdsLock); + g_instance.needCheckConflictSubIds = lappend_oid(g_instance.needCheckConflictSubIds, + t_thrd.applyworker_cxt.curWorker->subid); + pthread_mutex_unlock(&g_instance.subIdsLock); + (void*)MemoryContextSwitchTo(oldctx); + } + PG_RE_THROW(); + } + PG_END_TRY(); } } else { /* diff --git a/src/include/knl/knl_instance.h b/src/include/knl/knl_instance.h index c3096bb848..0aac937044 100755 --- a/src/include/knl/knl_instance.h +++ b/src/include/knl/knl_instance.h @@ -1341,6 +1341,9 @@ typedef struct knl_instance_context { void *raw_parser_hook[DB_CMPT_MAX]; char *llvmIrFilePath[DB_CMPT_MAX]; pthread_mutex_t loadPluginLock[DB_CMPT_MAX]; + + List* needCheckConflictSubIds; + pthread_mutex_t subIdsLock; #endif pg_atomic_uint32 extensionNum; knl_g_audit_context audit_cxt; diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 4642c343e3..7ac83f729d 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -46,6 +46,8 @@ typedef struct LogicalRepWorker TimestampTz last_recv_time; XLogRecPtr reply_lsn; TimestampTz reply_time; + + bool needCheckConflict; } LogicalRepWorker; typedef struct ApplyLauncherShmStruct { -- Gitee From 27b7a765d7958e0e3eaac93a411543123cb247d1 Mon Sep 17 00:00:00 2001 From: chenxiaobin19 <1025221611@qq.com> Date: Sat, 26 Aug 2023 15:52:31 +0800 Subject: [PATCH 4/5] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=8F=91=E5=B8=83?= =?UTF-8?q?=E8=AE=A2=E9=98=85=E5=9F=BA=E7=A1=80=E6=95=B0=E6=8D=AE=E5=A4=8D?= =?UTF-8?q?=E5=88=B6=E6=97=B6=E5=BE=80=E9=9D=9E=E7=A9=BA=E5=88=97=E4=BC=A0?= =?UTF-8?q?=E7=A9=BA=E5=80=BC=E7=9A=84core=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../storage/replication/logical/tablesync.cpp | 10 +++- src/test/subscription/schedule | 3 +- src/test/subscription/testcase/bugs.sh | 59 +++++++++++++++++++ 3 files changed, 70 insertions(+), 2 deletions(-) diff --git a/src/gausskernel/storage/replication/logical/tablesync.cpp b/src/gausskernel/storage/replication/logical/tablesync.cpp index 11484ae5d2..5ea58c2e3a 100644 --- a/src/gausskernel/storage/replication/logical/tablesync.cpp +++ b/src/gausskernel/storage/replication/logical/tablesync.cpp @@ -776,7 +776,15 @@ static void copy_table(Relation rel) /* Create CopyState for ingestion of the data from publisher. */ attnamelist = make_copy_attnamelist(relmapentry); - cstate = BeginCopyFrom(rel, NULL, attnamelist, NIL, &mem_info, (const char*)cmd.data, copy_read_data); + cstate = BeginCopyFrom(rel, NULL, attnamelist, NIL, &mem_info, NULL, copy_read_data); + + RangeTblEntry *rte = makeNode(RangeTblEntry); + rte->rtekind = RTE_RELATION; + rte->relid = RelationGetRelid(rel); + rte->relkind = rel->rd_rel->relkind; + rte->requiredPerms = ACL_SELECT; + + cstate->range_table = list_make1(rte); /* Do the copy */ (void)CopyFrom(cstate); diff --git a/src/test/subscription/schedule b/src/test/subscription/schedule index 836ac0e58b..0635e09413 100644 --- a/src/test/subscription/schedule +++ b/src/test/subscription/schedule @@ -13,4 +13,5 @@ matviews change_wal_level skiplsn disable -pub_subconflict \ No newline at end of file +pub_subconflict +bugs \ No newline at end of file diff --git a/src/test/subscription/testcase/bugs.sh b/src/test/subscription/testcase/bugs.sh index e69de29bb2..ec35e54733 100644 --- a/src/test/subscription/testcase/bugs.sh +++ b/src/test/subscription/testcase/bugs.sh @@ -0,0 +1,59 @@ +#!/bin/sh + +source $1/env_utils.sh $1 $2 + +case_db="bugs_db" + +function test_1() { + echo "create database and tables." + exec_sql $db $pub_node1_port "CREATE DATABASE $case_db" + exec_sql $db $sub_node1_port "CREATE DATABASE $case_db" + # Create some preexisting content on publisher + exec_sql $case_db $pub_node1_port "CREATE TABLE tab_rep (a int primary key, b text)" + exec_sql $case_db $pub_node1_port "INSERT INTO tab_rep VALUES (1)" + + # Setup structure on subscriber + exec_sql $case_db $sub_node1_port "CREATE TABLE tab_rep (a int primary key, b text not null default 0)" + + # Setup logical replication + echo "create publication and subscription." + publisher_connstr="port=$pub_node1_port host=$g_local_ip dbname=$case_db user=$username password=$passwd" + exec_sql $case_db $pub_node1_port "CREATE PUBLICATION tap_pub FOR ALL TABLES" + exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub" + + logfile=$(get_log_file "sub_datanode1") + + location=$(awk 'END{print NR}' $logfile) + + content=$(tail -n +$location $logfile) + targetstr=$(expr "$content" : '.*\(Failing row contains\).*') + + attempt=0 + while [ -z "$targetstr" ] + do + content=$(tail -n +$location $logfile) + targetstr=$(expr "$content" : '.*\(Failing row contains\).*') + attempt=`expr $attempt \+ 1` + + sleep 1 + if [ $attempt -eq 5 ]; then + echo "$failed_keyword when check failing row log" + exit 1 + fi + done + + echo "check failing row log success" +} + +function tear_down() { + exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION IF EXISTS tap_sub" + exec_sql $case_db $pub_node1_port "DROP PUBLICATION IF EXISTS tap_pub" + + exec_sql $db $sub_node1_port "DROP DATABASE $case_db" + exec_sql $db $pub_node1_port "DROP DATABASE $case_db" + + echo "tear down" +} + +test_1 +tear_down \ No newline at end of file -- Gitee From 8a6b40b5a1c512be1a8589477429c1771ae1ce32 Mon Sep 17 00:00:00 2001 From: chenxiaobin19 <1025221611@qq.com> Date: Fri, 1 Sep 2023 11:48:50 +0800 Subject: [PATCH 5/5] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dskiplsn=E5=81=B6=E7=8E=B0?= =?UTF-8?q?=E5=A4=B1=E6=95=88&&=E5=88=86=E5=8C=BA=E8=A1=A8=E5=A4=84?= =?UTF-8?q?=E7=90=86=E5=86=B2=E7=AA=81=E5=9C=BA=E6=99=AF=E5=AF=B9=E8=B1=A1?= =?UTF-8?q?=E6=9C=AA=E9=87=8A=E6=94=BE=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../runtime/executor/execReplication.cpp | 2 +- .../storage/replication/logical/worker.cpp | 48 ++++++---- src/test/subscription/testcase/bugs.sh | 90 +++++++++++++++++++ 3 files changed, 123 insertions(+), 17 deletions(-) diff --git a/src/gausskernel/runtime/executor/execReplication.cpp b/src/gausskernel/runtime/executor/execReplication.cpp index 9011ccee35..f07a5d8a9e 100644 --- a/src/gausskernel/runtime/executor/execReplication.cpp +++ b/src/gausskernel/runtime/executor/execReplication.cpp @@ -73,7 +73,7 @@ static bool build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel ListCell* indexpr_item = NULL; if (idxrel->rd_indexprs != NIL) { - expressionsState = ExecPrepareExprList(idxrel->rd_indexprs, estate); + expressionsState = (List*)ExecPrepareExpr((Expr*)idxrel->rd_indexprs, estate); indexpr_item = list_head(expressionsState); } diff --git a/src/gausskernel/storage/replication/logical/worker.cpp b/src/gausskernel/storage/replication/logical/worker.cpp index 796d6c63c3..d9f1f137ad 100644 --- a/src/gausskernel/storage/replication/logical/worker.cpp +++ b/src/gausskernel/storage/replication/logical/worker.cpp @@ -116,7 +116,7 @@ static void apply_dispatch(StringInfo s); static void apply_handle_conninfo(StringInfo s); static void UpdateConninfo(char* standbysInfo); static Oid find_conflict_tuple(EState *estate, TupleTableSlot *remoteslot, TupleTableSlot *localslot, - FakeRelationPartition *fakeRelInfo, TupleTableSlot *originslot = NULL); + TupleTableSlot *originslot = NULL); static void IsSkippingChanges(XLogRecPtr finish_lsn); static void StopSkippingChanges(); @@ -599,17 +599,26 @@ static Oid GetRelationIdentityOrPK(Relation rel) * to be updated, ignore it. */ Oid find_conflict_tuple(EState *estate, TupleTableSlot *remoteslot, TupleTableSlot *localslot, - FakeRelationPartition *fakeRelInfo, TupleTableSlot *originslot) + TupleTableSlot *originslot) { Oid replidxoid = InvalidOid; bool found = false; ResultRelInfo* relinfo = estate->es_result_relation_info; + FakeRelationPartition fakeRelInfo; /* Check the replica identity index first */ replidxoid = RelationGetReplicaIndex(relinfo->ri_RelationDesc); if (OidIsValid(replidxoid)) { found = RelationFindReplTuple(estate, relinfo->ri_RelationDesc, replidxoid, LockTupleExclusive, remoteslot, - localslot, fakeRelInfo); + localslot, &fakeRelInfo); + + /* Cleanup. */ + if (fakeRelInfo.needRleaseDummyRel && fakeRelInfo.partRel) { + releaseDummyRelation(&fakeRelInfo.partRel); + } + if (fakeRelInfo.partList) { + releasePartitionList(relinfo->ri_RelationDesc, &fakeRelInfo.partList, NoLock); + } if (found) { if (originslot != NULL && ItemPointerCompare(tableam_tops_get_t_self(relinfo->ri_RelationDesc, @@ -640,7 +649,15 @@ Oid find_conflict_tuple(EState *estate, TupleTableSlot *remoteslot, TupleTableSl } found = RelationFindReplTuple(estate, relinfo->ri_RelationDesc, idxoid, LockTupleExclusive, remoteslot, - localslot, fakeRelInfo); + localslot, &fakeRelInfo); + + /* Cleanup. */ + if (fakeRelInfo.needRleaseDummyRel && fakeRelInfo.partRel) { + releaseDummyRelation(&fakeRelInfo.partRel); + } + if (fakeRelInfo.partList) { + releasePartitionList(relinfo->ri_RelationDesc, &fakeRelInfo.partList, NoLock); + } if (found) { if (originslot != NULL && ItemPointerCompare(tableam_tops_get_t_self(relinfo->ri_RelationDesc, @@ -673,12 +690,12 @@ static void apply_handle_insert(StringInfo s) EPQState epqstate; Oid conflictIndexOid = InvalidOid; + ensure_transaction(); + if (t_thrd.applyworker_cxt.isSkipTransaction) { return; } - ensure_transaction(); - relid = logicalrep_read_insert(s, &newtup); rel = logicalrep_rel_open(relid, RowExclusiveLock); if (!should_apply_changes_for_rel(rel)) { @@ -707,8 +724,11 @@ static void apply_handle_insert(StringInfo s) ExecOpenIndices(estate->es_result_relation_info, false); + /* Get fake relation and partition for patitioned table */ + GetFakeRelAndPart(estate, rel->localrel, remoteslot, &fakeRelInfo); + if (t_thrd.applyworker_cxt.curWorker->needCheckConflict && - (conflictIndexOid = find_conflict_tuple(estate, remoteslot, localslot, &fakeRelInfo)) != InvalidOid) { + (conflictIndexOid = find_conflict_tuple(estate, remoteslot, localslot)) != InvalidOid) { StringInfoData localtup, remotetup; initStringInfo(&localtup); tuple_to_stringinfo(rel->localrel, &localtup, RelationGetDescr(rel->localrel), @@ -760,9 +780,6 @@ static void apply_handle_insert(StringInfo s) } else { PG_TRY(); { - /* Get fake relation and partition for patitioned table */ - GetFakeRelAndPart(estate, rel->localrel, remoteslot, &fakeRelInfo); - /* Do the insert. */ ExecSimpleRelationInsert(estate, remoteslot, &fakeRelInfo); } @@ -876,12 +893,12 @@ static void apply_handle_update(StringInfo s) FakeRelationPartition fakeRelInfo; Oid conflictIndexOid = InvalidOid; + ensure_transaction(); + if (t_thrd.applyworker_cxt.isSkipTransaction) { return; } - ensure_transaction(); - relid = logicalrep_read_update(s, &has_oldtup, &oldtup, &newtup); rel = logicalrep_rel_open(relid, RowExclusiveLock); if (!should_apply_changes_for_rel(rel)) { @@ -960,8 +977,7 @@ static void apply_handle_update(StringInfo s) MemoryContextSwitchTo(oldctx); if (t_thrd.applyworker_cxt.curWorker->needCheckConflict && - (conflictIndexOid = find_conflict_tuple(estate, remoteslot, conflictLocalSlot, &fakeRelInfo, localslot)) - != InvalidOid) { + (conflictIndexOid = find_conflict_tuple(estate, remoteslot, conflictLocalSlot, localslot)) != InvalidOid) { StringInfoData localtup, remotetup; initStringInfo(&localtup); tuple_to_stringinfo(rel->localrel, &localtup, RelationGetDescr(rel->localrel), @@ -1075,12 +1091,12 @@ static void apply_handle_delete(StringInfo s) MemoryContext oldctx; FakeRelationPartition fakeRelInfo; + ensure_transaction(); + if (t_thrd.applyworker_cxt.isSkipTransaction) { return; } - ensure_transaction(); - relid = logicalrep_read_delete(s, &oldtup); rel = logicalrep_rel_open(relid, RowExclusiveLock); if (!should_apply_changes_for_rel(rel)) { diff --git a/src/test/subscription/testcase/bugs.sh b/src/test/subscription/testcase/bugs.sh index ec35e54733..5eaa094916 100644 --- a/src/test/subscription/testcase/bugs.sh +++ b/src/test/subscription/testcase/bugs.sh @@ -8,6 +8,8 @@ function test_1() { echo "create database and tables." exec_sql $db $pub_node1_port "CREATE DATABASE $case_db" exec_sql $db $sub_node1_port "CREATE DATABASE $case_db" + + # BUG1: coredump when apply null value into not null column. # Create some preexisting content on publisher exec_sql $case_db $pub_node1_port "CREATE TABLE tab_rep (a int primary key, b text)" exec_sql $case_db $pub_node1_port "INSERT INTO tab_rep VALUES (1)" @@ -43,6 +45,94 @@ function test_1() { done echo "check failing row log success" + + exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION IF EXISTS tap_sub;TRUNCATE TABLE tab_rep" + exec_sql $case_db $pub_node1_port "DROP PUBLICATION IF EXISTS tap_pub;TRUNCATE TABLE tab_rep" + + # BUG2: skiplsn does not work occasionally + # Setup logical replication + echo "create publication and subscription." + publisher_connstr="port=$pub_node1_port host=$g_local_ip dbname=$case_db user=$username password=$passwd" + exec_sql $case_db $pub_node1_port "CREATE PUBLICATION tap_pub FOR ALL TABLES" + exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub with (copy_data=false); ALTER SUBSCRIPTION tap_sub DISABLE" + + exec_sql $case_db $pub_node1_port "INSERT INTO tab_rep VALUES (1, 'pub1'); INSERT INTO tab_rep VALUES (2, 'sub2');" + dumpfile=$(exec_sql $case_db $pub_node1_port "select gs_xlogdump_xid(xmin) from tab_rep where a = 1;") + skiplsn=$(grep 'start_lsn' $dumpfile | sed -n '5p' | awk '{print $2}') + + exec_sql $case_db $sub_node1_port "alter subscription tap_sub set (skiplsn = '$skiplsn')" + exec_sql $case_db $sub_node1_port "alter subscription tap_sub enable" + + wait_for_catchup $case_db $pub_node1_port "tap_sub" + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT * FROM tab_rep")" = "2|sub2" ]; then + echo "check data skip success" + else + echo "$failed_keyword when check data skip" + exit 1 + fi + + exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION IF EXISTS tap_sub;DROP TABLE tab_rep" + exec_sql $case_db $pub_node1_port "DROP PUBLICATION IF EXISTS tap_pub;DROP TABLE tab_rep" + + # BUG3: partition relation not closed when handle conflict + # Create partition table + ddl=" +create table t_pubsub_0349( + id int primary key constraint id_nn not null, + use_filename varchar(20), + filename varchar2(255) +)partition by range(id)( + partition p1 values less than(30), + partition p2 values less than(60), + partition p3 values less than(90), + partition p4 values less than(maxvalue));" + exec_sql $case_db $pub_node1_port "$ddl" + exec_sql $case_db $sub_node1_port "$ddl" + + echo "create publication and subscription." + publisher_connstr="port=$pub_node1_port host=$g_local_ip dbname=$case_db user=$username password=$passwd" + exec_sql $case_db $pub_node1_port "CREATE PUBLICATION tap_pub FOR ALL TABLES" + exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub" + + wait_for_subscription_sync $case_db $sub_node1_port + + exec_sql $case_db $sub_node1_port "ALTER SYSTEM SET subscription_conflict_resolution = apply_remote" + + exec_sql $case_db $sub_node1_port "INSERT INTO t_pubsub_0349 VALUES (1, 'a', 'a');" + exec_sql $case_db $pub_node1_port "INSERT INTO t_pubsub_0349 VALUES (1, 'a', 'c');" + + wait_for_catchup $case_db $pub_node1_port "tap_sub" + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT * FROM t_pubsub_0349")" = "1|a|c" ]; then + echo "check insert conflict handle success" + else + echo "$failed_keyword when check insert conflict handle" + exit 1 + fi + + exec_sql $case_db $sub_node1_port "INSERT INTO t_pubsub_0349 VALUES (2, 'a', 'a');" + exec_sql $case_db $pub_node1_port "UPDATE t_pubsub_0349 SET id = 2 WHERE id = 1;" + + wait_for_catchup $case_db $pub_node1_port "tap_sub" + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT * FROM t_pubsub_0349")" = "2|a|c" ]; then + echo "check update conflict handle success" + else + echo "$failed_keyword when check update conflict handle" + exit 1 + fi + + logfile=$(get_log_file "sub_datanode1") + leakstr=$(grep 'partcache reference leak' $logfile -m 1) + if [ -z "$leakstr" ]; then + echo "check relation close success" + else + echo "$failed_keyword when check relation close" + exit 1 + fi + + exec_sql $case_db $sub_node1_port "ALTER SYSTEM SET subscription_conflict_resolution = error" } function tear_down() { -- Gitee