From 2a152270f1db5d711143d85395f0afb0bae69fd1 Mon Sep 17 00:00:00 2001
From: chenxiaobin19 <1025221611@qq.com>
Date: Tue, 15 Aug 2023 10:45:36 +0800
Subject: [PATCH 1/5] =?UTF-8?q?=E5=8F=91=E5=B8=83=E8=AE=A2=E9=98=85?=
=?UTF-8?q?=E6=94=AF=E6=8C=81=E5=86=B2=E7=AA=81=E5=A4=84=E7=90=86?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
contrib/test_decoding/test_decoding.cpp | 80 -----
doc/src/sgml/ref/alter_subscription.sgmlin | 11 +
src/bin/gs_guc/cluster_guc.conf | 1 +
.../backend/catalog/pg_subscription.cpp | 12 +-
src/common/backend/parser/gram.y | 10 +
.../backend/utils/misc/guc/guc_storage.cpp | 19 +
.../optimizer/commands/subscriptioncmds.cpp | 340 ++++++++++--------
.../runtime/executor/execReplication.cpp | 49 ++-
.../replication/logical/parallel_decode.cpp | 18 +-
.../storage/replication/logical/worker.cpp | 288 ++++++++++++++-
src/include/catalog/pg_subscription.h | 9 +-
.../knl/knl_guc/knl_session_attr_storage.h | 1 +
src/include/knl/knl_thread.h | 1 +
src/include/replication/logical.h | 2 +
src/include/replication/replicainternal.h | 7 +
src/test/regress/input/subscription.source | 8 +
.../regress/output/recovery_2pc_tools.source | 1 +
src/test/regress/output/subscription.source | 25 +-
src/test/subscription/schedule | 4 +-
src/test/subscription/testcase/disable.sh | 65 ++++
src/test/subscription/testcase/skiplsn.sh | 69 ++++
21 files changed, 767 insertions(+), 253 deletions(-)
create mode 100644 src/test/subscription/testcase/disable.sh
create mode 100644 src/test/subscription/testcase/skiplsn.sh
diff --git a/contrib/test_decoding/test_decoding.cpp b/contrib/test_decoding/test_decoding.cpp
index 8f5b377b7b..76175dd9b5 100644
--- a/contrib/test_decoding/test_decoding.cpp
+++ b/contrib/test_decoding/test_decoding.cpp
@@ -199,87 +199,7 @@ static bool pg_decode_filter(LogicalDecodingContext* ctx, RepOriginId origin_id)
return true;
return false;
}
-static void tuple_to_stringinfo(Relation relation, StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool isOld)
-{
- if ((tuple->tupTableType == HEAP_TUPLE) && (HEAP_TUPLE_IS_COMPRESSED(tuple->t_data) ||
- (int)HeapTupleHeaderGetNatts(tuple->t_data, tupdesc) > tupdesc->natts)) {
- return;
- }
-
- Oid oid;
- /* print oid of tuple, it's not included in the TupleDesc */
- if ((oid = HeapTupleHeaderGetOid(tuple->t_data)) != InvalidOid) {
- appendStringInfo(s, " oid[oid]:%u", oid);
- }
-
- /* print all columns individually */
- for (int natt = 0; natt < tupdesc->natts; natt++) {
- Form_pg_attribute attr; /* the attribute itself */
- Oid typoutput; /* output function */
- bool typisvarlena = false;
- Datum origval; /* possibly toasted Datum */
- bool isnull = true; /* column is null? */
-
- attr = &tupdesc->attrs[natt];
-
- /*
- * don't print dropped columns, we can't be sure everything is
- * available for them
- */
- if (attr->attisdropped)
- continue;
-
- /*
- * Don't print system columns, oid will already have been printed if
- * present.
- */
- if (attr->attnum < 0 || (isOld && !IsRelationReplidentKey(relation, attr->attnum)))
- continue;
-
- Oid typid = attr->atttypid; /* type of current attribute */
-
- /* get Datum from tuple */
- if (tuple->tupTableType == HEAP_TUPLE) {
- origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);
- } else {
- origval = uheap_getattr((UHeapTuple)tuple, natt + 1, tupdesc, &isnull);
- }
-
- /* print attribute name */
- appendStringInfoChar(s, ' ');
- appendStringInfoString(s, quote_identifier(NameStr(attr->attname)));
-
- /* print attribute type */
- appendStringInfoChar(s, '[');
- char* type_name = format_type_be(typid);
- if (strlen(type_name) == strlen("clob") && strncmp(type_name, "clob", strlen("clob")) == 0) {
- errno_t rc = strcpy_s(type_name, sizeof("clob"), "text");
- securec_check_c(rc, "\0", "\0");
- }
- appendStringInfoString(s, type_name);
- appendStringInfoChar(s, ']');
-
- /* query output function */
- getTypeOutputInfo(typid, &typoutput, &typisvarlena);
-
- /* print separator */
- appendStringInfoChar(s, ':');
-
- /* print data */
- if (isnull)
- appendStringInfoString(s, "null");
- else if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK_B(origval))
- appendStringInfoString(s, "unchanged-toast-datum");
- else if (!typisvarlena)
- PrintLiteral(s, typid, OidOutputFunctionCall(typoutput, origval));
- else {
- Datum val; /* definitely detoasted Datum */
- val = PointerGetDatum(PG_DETOAST_DATUM(origval));
- PrintLiteral(s, typid, OidOutputFunctionCall(typoutput, val));
- }
- }
-}
/*
* callback for individual changed tuples
*/
diff --git a/doc/src/sgml/ref/alter_subscription.sgmlin b/doc/src/sgml/ref/alter_subscription.sgmlin
index 2c4e4d168a..6ad2fe06ca 100644
--- a/doc/src/sgml/ref/alter_subscription.sgmlin
+++ b/doc/src/sgml/ref/alter_subscription.sgmlin
@@ -25,6 +25,7 @@ ALTER SUBSCRIPTION name CONNECTION
ALTER SUBSCRIPTION name SET PUBLICATION publication_name [, ...]
ALTER SUBSCRIPTION name REFRESH PUBLICATION [ WITH ( refresh_option [= value] [, ... ] ) ]
ALTER SUBSCRIPTION name ENABLE
+ALTER SUBSCRIPTION name DISABLE
ALTER SUBSCRIPTION name SET ( subscription_parameter [= value] [, ... ] )
ALTER SUBSCRIPTION name OWNER TO new_owner
ALTER SUBSCRIPTION name RENAME TO new_name
@@ -134,6 +135,16 @@ ALTER SUBSCRIPTION name RENAME TO <
+
+ DISABLE
+
+
+ Disables a running subscription, stopping the logical replication
+ worker at the end of the transaction.
+
+
+
+
SET ( subscription_parameter [= value] [, ... ] )
diff --git a/src/bin/gs_guc/cluster_guc.conf b/src/bin/gs_guc/cluster_guc.conf
index 6df9ad913d..fcb6c7ec0b 100755
--- a/src/bin/gs_guc/cluster_guc.conf
+++ b/src/bin/gs_guc/cluster_guc.conf
@@ -744,6 +744,7 @@ enable_remote_excute|bool|0,0|NULL|NULL|
light_comm|bool|0,0|NULL|NULL|
ignore_standby_lsn_window|int|0,2147483647|ms|NULL|
ignore_feedback_xmin_window|int|0,2147483647|ms|NULL|
+subscription_conflict_resolution|enum|error,apply_remote,keep_local|NULL|NULL|
[cmserver]
log_dir|string|0,0|NULL|NULL|
log_file_size|int|0,2047|MB|NULL|
diff --git a/src/common/backend/catalog/pg_subscription.cpp b/src/common/backend/catalog/pg_subscription.cpp
index 86b7dea51a..5f075ff2dd 100644
--- a/src/common/backend/catalog/pg_subscription.cpp
+++ b/src/common/backend/catalog/pg_subscription.cpp
@@ -103,6 +103,14 @@ Subscription *GetSubscription(Oid subid, bool missing_ok)
sub->binary = DatumGetBool(datum);
}
+ /* Get skiplsn */
+ datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup, Anum_pg_subscription_subskiplsn, &isnull);
+ if (unlikely(isnull)) {
+ sub->skiplsn = InvalidXLogRecPtr;
+ } else {
+ sub->skiplsn = TextDatumGetLsn(datum);
+ }
+
ReleaseSysCache(tup);
return sub;
@@ -237,7 +245,7 @@ static List *textarray_to_stringlist(ArrayType *textarray)
return res;
}
-static Datum LsnGetTextDatum(XLogRecPtr lsn)
+Datum LsnGetTextDatum(XLogRecPtr lsn)
{
char clsn[MAXFNAMELEN];
int ret = snprintf_s(clsn, sizeof(clsn), sizeof(clsn) - 1, "%X/%X", (uint32)(lsn >> 32), (uint32)lsn);
@@ -246,7 +254,7 @@ static Datum LsnGetTextDatum(XLogRecPtr lsn)
return CStringGetTextDatum(clsn);
}
-static XLogRecPtr TextDatumGetLsn(Datum datum)
+XLogRecPtr TextDatumGetLsn(Datum datum)
{
XLogRecPtr lsn;
uint32 lsn_hi;
diff --git a/src/common/backend/parser/gram.y b/src/common/backend/parser/gram.y
index 86590164f7..e1ab544eb6 100644
--- a/src/common/backend/parser/gram.y
+++ b/src/common/backend/parser/gram.y
@@ -18574,6 +18574,16 @@ AlterSubscriptionStmt:
n->options = list_make1(makeDefElem("enabled",
(Node *)makeInteger(TRUE)));
$$ = (Node *)n;
+ }
+ | ALTER SUBSCRIPTION name DISABLE_P
+ {
+ AlterSubscriptionStmt *n =
+ makeNode(AlterSubscriptionStmt);
+ n->refresh = false;
+ n->subname = $3;
+ n->options = list_make1(makeDefElem("enabled",
+ (Node *)makeInteger(FALSE)));
+ $$ = (Node *)n;
} ;
/*****************************************************************************
diff --git a/src/common/backend/utils/misc/guc/guc_storage.cpp b/src/common/backend/utils/misc/guc/guc_storage.cpp
index 22ffc8493d..54f09363f4 100755
--- a/src/common/backend/utils/misc/guc/guc_storage.cpp
+++ b/src/common/backend/utils/misc/guc/guc_storage.cpp
@@ -287,6 +287,13 @@ static const struct config_enum_entry repl_auth_mode_options[] = {
{NULL, 0, false}
};
+static const struct config_enum_entry ConflictResolvers[] = {
+ {"error", RESOLVE_ERROR, false},
+ {"apply_remote", RESOLVE_APPLY_REMOTE, false},
+ {"keep_local", RESOLVE_KEEP_LOCAL, false},
+ {NULL, 0, false}
+};
+
/*
* Although only "on", "off", "remote_write", and "local" are documented, we
* accept all the likely variants of "on" and "off".
@@ -4679,6 +4686,18 @@ static void InitStorageConfigureNamesEnum()
NULL,
NULL,
NULL},
+ {{"subscription_conflict_resolution",
+ PGC_SIGHUP,
+ NODE_SINGLENODE,
+ REPLICATION,
+ gettext_noop("Sets method used for conflict resolution for resolvable conflicts."),
+ NULL},
+ &u_sess->attr.attr_storage.subscription_conflict_resolution,
+ RESOLVE_ERROR,
+ ConflictResolvers,
+ NULL,
+ NULL,
+ NULL},
#ifndef ENABLE_MULTIPLE_NODES
{{"dcf_log_file_permission",
PGC_POSTMASTER,
diff --git a/src/gausskernel/optimizer/commands/subscriptioncmds.cpp b/src/gausskernel/optimizer/commands/subscriptioncmds.cpp
index eea9de4dcd..aa52f2ebe0 100644
--- a/src/gausskernel/optimizer/commands/subscriptioncmds.cpp
+++ b/src/gausskernel/optimizer/commands/subscriptioncmds.cpp
@@ -45,10 +45,46 @@
#include "utils/syscache.h"
#include "utils/array.h"
#include "utils/acl.h"
+#include "utils/pg_lsn.h"
#include "access/tableam.h"
#include "libpq/libpq-fe.h"
#include "replication/slot.h"
+/*
+ * Options that can be specified by the user in CREATE/ALTER SUBSCRIPTION
+ * command.
+ */
+#define SUBOPT_CONNINFO 0x00000001
+#define SUBOPT_PUBLICATION 0x00000002
+#define SUBOPT_ENABLED 0x00000004
+#define SUBOPT_SLOT_NAME 0x00000008
+#define SUBOPT_SYNCHRONOUS_COMMIT 0x00000010
+#define SUBOPT_BINARY 0x00000020
+#define SUBOPT_COPY_DATA 0x00000040
+#define SUBOPT_CONNECT 0x00000080
+#define SUBOPT_SKIPLSN 0x00000100
+
+/* check if the 'val' has 'bits' set */
+#define IsSet(val, bits) (((val) & (bits)) == (bits))
+
+/*
+ * Structure to hold a bitmap representing the user-provided CREATE/ALTER
+ * SUBSCRIPTION command options and the parsed/default values of each of them.
+ */
+typedef struct SubOpts
+{
+ bits32 specified_opts;
+ char *conninfo;
+ List *publications;
+ bool enabled;
+ char *slot_name;
+ char *synchronous_commit;
+ bool binary;
+ bool copy_data;
+ bool connect;
+ XLogRecPtr skiplsn;
+} SubOpts;
+
static bool ConnectPublisher(char* conninfo, char* slotname);
static void CreateSlotInPublisherAndInsertSubRel(char *slotname, Oid subid, List *publications,
bool *copy_data, bool create_slot);
@@ -61,120 +97,127 @@ static bool CheckPublicationsExistOnPublisher(List *publications);
* Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
*
* Since not all options can be specified in both commands, this function
- * will report an error on options if the target output pointer is NULL to
- * accommodate that.
+ * will report an error mutually exclusive options are specified.
+ *
+ * Caller is expected to have cleared 'opts'.
*/
-static void parse_subscription_options(const List *options, char **conninfo, List **publications, bool *enabled_given,
- bool *enabled, bool *slot_name_given, char **slot_name, char **synchronous_commit, bool *binary_given, bool *binary,
- bool *copy_data_given, bool *copy_data, bool *connect_given, bool *connect)
+static void parse_subscription_options(const List *stmt_options, bits32 supported_opts, SubOpts *opts)
{
ListCell *lc;
- if (conninfo) {
- *conninfo = NULL;
- }
- if (publications) {
- *publications = NIL;
- }
- if (enabled) {
- *enabled_given = false;
- }
- if (slot_name) {
- *slot_name_given = false;
- *slot_name = NULL;
- }
- if (synchronous_commit) {
- *synchronous_commit = NULL;
- }
- if (binary) {
- *binary_given = false;
- *binary = false;
- }
+ /* caller must expect some option */
+ Assert(supported_opts != 0);
- if (copy_data) {
- *copy_data_given = false;
- *copy_data = true;
+ /* Set default values for the boolean supported options. */
+ if (IsSet(supported_opts, SUBOPT_BINARY)) {
+ opts->binary = false;
}
-
- if (connect) {
- *connect_given = false;
- *connect = true;
+ if (IsSet(supported_opts, SUBOPT_COPY_DATA)) {
+ opts->copy_data = true;
+ }
+ if (IsSet(supported_opts, SUBOPT_CONNECT)) {
+ opts->connect = true;
}
/* Parse options */
- foreach (lc, options) {
+ foreach (lc, stmt_options) {
DefElem *defel = (DefElem *)lfirst(lc);
- if (strcmp(defel->defname, "conninfo") == 0 && conninfo) {
- if (*conninfo) {
+ if (IsSet(supported_opts, SUBOPT_CONNINFO) && strcmp(defel->defname, "conninfo") == 0) {
+ if (IsSet(opts->specified_opts, SUBOPT_CONNINFO)) {
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options")));
}
- *conninfo = defGetString(defel);
- } else if (strcmp(defel->defname, "publication") == 0 && publications) {
- if (*publications) {
+ opts->specified_opts |= SUBOPT_CONNINFO;
+ opts->conninfo = defGetString(defel);
+ } else if (IsSet(supported_opts, SUBOPT_PUBLICATION) && strcmp(defel->defname, "publication") == 0) {
+ if (IsSet(opts->specified_opts, SUBOPT_PUBLICATION)) {
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options")));
}
- *publications = defGetStringList(defel);
- } else if (strcmp(defel->defname, "enabled") == 0 && enabled) {
- if (*enabled_given) {
+ opts->specified_opts |= SUBOPT_PUBLICATION;
+ opts->publications = defGetStringList(defel);
+ } else if (IsSet(supported_opts, SUBOPT_ENABLED) && strcmp(defel->defname, "enabled") == 0) {
+ if (IsSet(opts->specified_opts, SUBOPT_ENABLED)) {
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options")));
}
- *enabled_given = true;
- *enabled = defGetBoolean(defel);
- } else if (strcmp(defel->defname, "slot_name") == 0 && slot_name) {
- if (*slot_name_given) {
+ opts->specified_opts |= SUBOPT_ENABLED;
+ opts->enabled = defGetBoolean(defel);
+ } else if (IsSet(supported_opts, SUBOPT_SLOT_NAME) && strcmp(defel->defname, "slot_name") == 0) {
+ if (IsSet(opts->specified_opts, SUBOPT_SLOT_NAME)) {
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options")));
}
- *slot_name_given = true;
- *slot_name = defGetString(defel);
+ opts->specified_opts |= SUBOPT_SLOT_NAME;
+ opts->slot_name = defGetString(defel);
/* Setting slot_name = NONE is treated as no slot name. */
- if (strcmp(*slot_name, "none") == 0) {
- *slot_name = NULL;
+ if (strcmp(opts->slot_name, "none") == 0) {
+ opts->slot_name = NULL;
} else {
- ReplicationSlotValidateName(*slot_name, ERROR);
+ ReplicationSlotValidateName(opts->slot_name, ERROR);
}
- } else if (strcmp(defel->defname, "synchronous_commit") == 0 && synchronous_commit) {
- if (*synchronous_commit) {
+ } else if (IsSet(supported_opts, SUBOPT_SYNCHRONOUS_COMMIT) &&
+ strcmp(defel->defname, "synchronous_commit") == 0) {
+ if (IsSet(opts->specified_opts, SUBOPT_SYNCHRONOUS_COMMIT)) {
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options")));
}
- *synchronous_commit = defGetString(defel);
+ opts->specified_opts |= SUBOPT_SYNCHRONOUS_COMMIT;
+ opts->synchronous_commit = defGetString(defel);
/* Test if the given value is valid for synchronous_commit GUC. */
- (void)set_config_option("synchronous_commit", *synchronous_commit, PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET,
- false, 0, false);
- } else if (strcmp(defel->defname, "binary") == 0 && binary) {
- if (*binary_given) {
+ (void)set_config_option("synchronous_commit", opts->synchronous_commit, PGC_BACKEND, PGC_S_TEST,
+ GUC_ACTION_SET, false, 0, false);
+ } else if (IsSet(supported_opts, SUBOPT_BINARY) && strcmp(defel->defname, "binary") == 0) {
+ if (IsSet(opts->specified_opts, SUBOPT_BINARY)) {
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+ }
+
+ opts->specified_opts |= SUBOPT_BINARY;
+ opts->binary = defGetBoolean(defel);
+ } else if (IsSet(supported_opts, SUBOPT_COPY_DATA) && strcmp(defel->defname, "copy_data") == 0) {
+ if (IsSet(opts->specified_opts, SUBOPT_COPY_DATA)) {
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
}
- *binary_given = true;
- *binary = defGetBoolean(defel);
- } else if (strcmp(defel->defname, "copy_data") == 0 && copy_data) {
- if (*copy_data_given) {
+ opts->specified_opts |= SUBOPT_COPY_DATA;
+ opts->copy_data = defGetBoolean(defel);
+ } else if (IsSet(supported_opts, SUBOPT_CONNECT) && strcmp(defel->defname, "connect") == 0) {
+ if (IsSet(opts->specified_opts, SUBOPT_CONNECT)) {
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
}
- *copy_data_given = true;
- *copy_data = defGetBoolean(defel);
- } else if (strcmp(defel->defname, "connect") == 0 && connect) {
- if (*connect_given) {
+ opts->specified_opts = SUBOPT_CONNECT;
+ opts->connect = defGetBoolean(defel);
+ } else if (IsSet(supported_opts, SUBOPT_SKIPLSN) && strcmp(defel->defname, "skiplsn") == 0) {
+ if (IsSet(opts->specified_opts, SUBOPT_SKIPLSN)) {
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
}
- *connect_given = true;
- *connect = defGetBoolean(defel);
+ char *lsn_str = defGetString(defel);
+ opts->specified_opts = SUBOPT_SKIPLSN;
+ /* Setting lsn = 'NONE' is treated as resetting LSN */
+ if (strcmp(lsn_str, "none") == 0) {
+ opts->skiplsn = InvalidXLogRecPtr;
+ } else {
+ /* Parse the argument as LSN */
+ opts->skiplsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in, CStringGetDatum(lsn_str)));
+
+ if (XLogRecPtrIsInvalid(opts->skiplsn))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("invalid WAL location (LSN): %s", lsn_str)));
+ }
} else {
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR), errmsg("unrecognized subscription parameter: %s", defel->defname)));
@@ -185,38 +228,39 @@ static void parse_subscription_options(const List *options, char **conninfo, Lis
* We've been explicitly asked to not connect, that requires some
* additional processing.
*/
- if (connect && !*connect) {
+ if (IsSet(supported_opts, SUBOPT_CONNECT) && !opts->connect) {
/* Check for incompatible options from the user. */
- if (*enabled_given && *enabled)
+ if (IsSet(opts->specified_opts, SUBOPT_ENABLED) && opts->enabled)
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("%s and %s are mutually exclusive options",
"connect = false", "enabled = true")));
- if (*copy_data_given && *copy_data)
+ if (IsSet(opts->specified_opts, SUBOPT_COPY_DATA) && opts->copy_data)
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("%s and %s are mutually exclusive options",
"connect = false", "copy_data = true")));
/* Change the defaults of other options. */
- *enabled = false;
- *copy_data = false;
+ opts->enabled = false;
+ opts->copy_data = false;
}
/*
* Do additional checking for disallowed combination when
* slot_name = NONE was used.
*/
- if (slot_name && *slot_name_given && !*slot_name) {
- if (enabled && *enabled_given && *enabled) {
+ if (!opts->slot_name && IsSet(supported_opts, SUBOPT_SLOT_NAME) && IsSet(opts->specified_opts, SUBOPT_SLOT_NAME)) {
+ if (opts->enabled && IsSet(supported_opts, SUBOPT_ENABLED) && IsSet(opts->specified_opts, SUBOPT_ENABLED)) {
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR),
errmsg("slot_name = NONE and enabled = true are mutually exclusive options")));
}
- if (enabled && !*enabled_given && *enabled) {
+ if (opts->enabled && IsSet(supported_opts, SUBOPT_ENABLED) && !IsSet(opts->specified_opts, SUBOPT_ENABLED)) {
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR),
errmsg("subscription with slot_name = NONE must also set enabled = false")));
}
}
- if (copy_data && *copy_data && u_sess->attr.attr_storage.max_sync_workers_per_subscription == 0) {
+ if (IsSet(supported_opts, SUBOPT_COPY_DATA) && IsSet(opts->specified_opts, SUBOPT_COPY_DATA) &&
+ u_sess->attr.attr_storage.max_sync_workers_per_subscription == 0) {
ereport(WARNING, (errmsg("you need to set max_sync_workers_per_subscription because it is zero but "
"copy_data is true")));
}
@@ -451,27 +495,20 @@ ObjectAddress CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
Datum values[Natts_pg_subscription];
Oid owner = GetUserId();
HeapTuple tup;
- bool enabled_given = false;
- bool enabled = true;
- char *synchronous_commit;
- char *slotname;
- bool slotname_given;
- bool binary;
- bool binary_given;
- bool copy_data;
- bool copy_data_given;
- bool connect;
- bool connect_given;
char originname[NAMEDATALEN];
List *publications;
+ bits32 supported_opts;
+ SubOpts opts = {0};
+ opts.enabled = true;
int rc;
/*
* Parse and check options.
* Connection and publication should not be specified here.
*/
- parse_subscription_options(stmt->options, NULL, NULL, &enabled_given, &enabled, &slotname_given, &slotname,
- &synchronous_commit, &binary_given, &binary, ©_data_given, ©_data, &connect_given, &connect);
+ supported_opts = (SUBOPT_ENABLED | SUBOPT_SLOT_NAME | SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
+ SUBOPT_COPY_DATA | SUBOPT_CONNECT);
+ parse_subscription_options(stmt->options, supported_opts, &opts);
/*
* Since creating a replication slot is not transactional, rolling back
@@ -479,7 +516,7 @@ ObjectAddress CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
* CREATE SUBSCRIPTION inside a transaction block if creating a
* replication slot.
*/
- if (enabled)
+ if (opts.enabled)
PreventTransactionChain(isTopLevel, "CREATE SUBSCRIPTION ... WITH (enabled = true)");
if (!superuser())
@@ -495,8 +532,8 @@ ObjectAddress CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
}
/* The default for synchronous_commit of subscriptions is off. */
- if (synchronous_commit == NULL) {
- synchronous_commit = "off";
+ if (opts.synchronous_commit == NULL) {
+ opts.synchronous_commit = "off";
}
publications = stmt->publication;
@@ -513,28 +550,29 @@ ObjectAddress CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
values[Anum_pg_subscription_subdbid - 1] = ObjectIdGetDatum(u_sess->proc_cxt.MyDatabaseId);
values[Anum_pg_subscription_subname - 1] = DirectFunctionCall1(namein, CStringGetDatum(stmt->subname));
values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
- values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled);
- values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(binary);
+ values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled);
+ values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary);
/* encrypt conninfo */
char *encryptConninfo = EncryptOrDecryptConninfo(stmt->conninfo, 'E');
values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(encryptConninfo);
- if (enabled) {
- if (!slotname_given) {
- slotname = stmt->subname;
+ if (opts.enabled) {
+ if (!IsSet(opts.specified_opts, SUBOPT_SLOT_NAME)) {
+ opts.slot_name = stmt->subname;
}
- values[Anum_pg_subscription_subslotname - 1] = DirectFunctionCall1(namein, CStringGetDatum(slotname));
+ values[Anum_pg_subscription_subslotname - 1] = DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
} else {
- if (slotname_given && slotname) {
+ if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME) && opts.slot_name) {
ereport(WARNING, (errmsg("When enabled=false, it is dangerous to set slot_name. "
"This will cause wal log accumulation on the publisher, "
"so slot_name will be forcibly set to NULL.")));
}
nulls[Anum_pg_subscription_subslotname - 1] = true;
}
- values[Anum_pg_subscription_subsynccommit - 1] = CStringGetTextDatum(synchronous_commit);
+ values[Anum_pg_subscription_subsynccommit - 1] = CStringGetTextDatum(opts.synchronous_commit);
values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(publications);
+ values[Anum_pg_subscription_subskiplsn - 1] = LsnGetTextDatum(InvalidXLogRecPtr);
tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
@@ -553,8 +591,8 @@ ObjectAddress CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
* Connect to remote side to execute requested commands and fetch table
* info.
*/
- if (connect) {
- if (!AttemptConnectPublisher(encryptConninfo, slotname, true)) {
+ if (opts.connect) {
+ if (!AttemptConnectPublisher(encryptConninfo, opts.slot_name, true)) {
ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), errmsg("Failed to connect to publisher.")));
}
@@ -567,8 +605,8 @@ ObjectAddress CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
* If requested, create the replication slot on remote side for our
* newly created subscription.
*/
- Assert(!enabled || slotname);
- CreateSlotInPublisherAndInsertSubRel(slotname, subid, publications, ©_data, enabled);
+ Assert(!opts.enabled || opts.slot_name);
+ CreateSlotInPublisherAndInsertSubRel(opts.slot_name, subid, publications, &opts.copy_data, opts.enabled);
(WalReceiverFuncTable[GET_FUNC_IDX]).walrcv_disconnect();
} else {
@@ -582,7 +620,7 @@ ObjectAddress CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
securec_check(rc, "", "");
/* Don't wake up logical replication launcher unnecessarily */
- if (enabled) {
+ if (opts.enabled) {
ApplyLauncherWakeupAtCommit();
}
@@ -793,17 +831,6 @@ ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
Datum values[Natts_pg_subscription];
HeapTuple tup;
Oid subid;
- bool enabled_given = false;
- bool enabled = false;
- bool binary_given = false;
- bool binary = false;
- char *synchronous_commit = NULL;
- char *conninfo = NULL;
- char *slot_name = NULL;
- bool slotname_given = false;
- bool copy_data = false;
- bool copy_data_given = false;
- List *publications = NIL;
Subscription *sub = NULL;
int rc;
bool checkConn = false;
@@ -812,6 +839,8 @@ ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
bool needFreeConninfo = false;
char *finalSlotName = NULL;
char *encryptConninfo = NULL;
+ bits32 supported_opts;
+ SubOpts opts = {0};
rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
@@ -831,17 +860,18 @@ ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
/* Lock the subscription so nobody else can do anything with it. */
LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
- enabled = sub->enabled;
+ opts.enabled = sub->enabled;
finalSlotName = sub->name;
encryptConninfo = sub->conninfo;
/* Parse options. */
if (!stmt->refresh) {
- parse_subscription_options(stmt->options, &conninfo, &publications, &enabled_given, &enabled, &slotname_given,
- &slot_name, &synchronous_commit, &binary_given, &binary, NULL, NULL, NULL, NULL);
+ supported_opts = (SUBOPT_CONNINFO | SUBOPT_PUBLICATION | SUBOPT_ENABLED | SUBOPT_SLOT_NAME |
+ SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_SKIPLSN);
+ parse_subscription_options(stmt->options, supported_opts, &opts);
} else {
- parse_subscription_options(stmt->options, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL,
- ©_data_given, ©_data, NULL, NULL);
+ supported_opts = SUBOPT_COPY_DATA;
+ parse_subscription_options(stmt->options, supported_opts, &opts);
PreventTransactionChain(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH");
}
@@ -854,38 +884,39 @@ ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
rc = memset_s(replaces, sizeof(replaces), false, sizeof(replaces));
securec_check(rc, "", "");
- if (enabled_given && enabled != sub->enabled) {
- values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled);
+ if (IsSet(opts.specified_opts, SUBOPT_ENABLED) && opts.enabled != sub->enabled) {
+ values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled);
replaces[Anum_pg_subscription_subenabled - 1] = true;
}
- if (conninfo) {
+ if (opts.conninfo) {
/* Check the connection info string. */
- libpqrcv_check_conninfo(conninfo);
- encryptConninfo = EncryptOrDecryptConninfo(conninfo, 'E');
- rc = memset_s(conninfo, strlen(conninfo), 0, strlen(conninfo));
+ libpqrcv_check_conninfo(opts.conninfo);
+ encryptConninfo = EncryptOrDecryptConninfo(opts.conninfo, 'E');
+ rc = memset_s(opts.conninfo, strlen(opts.conninfo), 0, strlen(opts.conninfo));
securec_check(rc, "\0", "\0");
values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(encryptConninfo);
replaces[Anum_pg_subscription_subconninfo - 1] = true;
needFreeConninfo = true;
/* need to check whether new conninfo can be used to connect to new publisher */
- if (sub->enabled || (enabled_given && enabled)) {
+ if (sub->enabled || (IsSet(opts.specified_opts, SUBOPT_ENABLED) && opts.enabled)) {
checkConn = true;
}
}
- if (slotname_given) {
- if (sub->enabled && !slot_name) {
+ if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME)) {
+ if (sub->enabled && !opts.slot_name) {
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR), errmsg("cannot set slot_name = NONE for enabled subscription")));
}
/* change to non-null value */
- if (slot_name) {
- if (sub->enabled || (enabled_given && enabled)) {
- values[Anum_pg_subscription_subslotname - 1] = DirectFunctionCall1(namein, CStringGetDatum(slot_name));
+ if (opts.slot_name) {
+ if (sub->enabled || (IsSet(opts.specified_opts, SUBOPT_ENABLED) && opts.enabled)) {
+ values[Anum_pg_subscription_subslotname - 1] = DirectFunctionCall1(namein,
+ CStringGetDatum(opts.slot_name));
/* if old slotname is null or same as new slot name, then we need to validate the new slot name */
- validateSlot = sub->slotname == NULL || strcmp(slot_name, sub->slotname) != 0;
- finalSlotName = slot_name;
+ validateSlot = sub->slotname == NULL || strcmp(opts.slot_name, sub->slotname) != 0;
+ finalSlotName = opts.slot_name;
} else {
ereport(ERROR, (errmsg("Currently enabled=false, cannot change slot_name to a non-null value.")));
}
@@ -896,30 +927,34 @@ ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
* when enable this subscription but slot_name is not specified,
* it will be set to default value subname.
*/
- if (!sub->enabled && enabled_given && enabled) {
+ if (!sub->enabled && IsSet(opts.specified_opts, SUBOPT_ENABLED) && opts.enabled) {
values[Anum_pg_subscription_subslotname - 1] = DirectFunctionCall1(namein, CStringGetDatum(sub->name));
} else {
nulls[Anum_pg_subscription_subslotname - 1] = true;
}
}
replaces[Anum_pg_subscription_subslotname - 1] = true;
- } else if (!sub->enabled && enabled_given && enabled) {
+ } else if (!sub->enabled && IsSet(opts.specified_opts, SUBOPT_ENABLED) && opts.enabled) {
values[Anum_pg_subscription_subslotname - 1] = DirectFunctionCall1(namein, CStringGetDatum(sub->name));
replaces[Anum_pg_subscription_subslotname - 1] = true;
}
- if (synchronous_commit) {
- values[Anum_pg_subscription_subsynccommit - 1] = CStringGetTextDatum(synchronous_commit);
+ if (opts.synchronous_commit) {
+ values[Anum_pg_subscription_subsynccommit - 1] = CStringGetTextDatum(opts.synchronous_commit);
replaces[Anum_pg_subscription_subsynccommit - 1] = true;
}
- if (binary_given) {
- values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(binary);
+ if (IsSet(opts.specified_opts, SUBOPT_BINARY)) {
+ values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary);
replaces[Anum_pg_subscription_subbinary - 1] = true;
}
- if (publications != NIL) {
- values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(publications);
+ if (opts.publications != NIL) {
+ values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(opts.publications);
replaces[Anum_pg_subscription_subpublications - 1] = true;
} else {
- publications = sub->publications;
+ opts.publications = sub->publications;
+ }
+ if (IsSet(opts.specified_opts, SUBOPT_SKIPLSN)) {
+ values[Anum_pg_subscription_subskiplsn - 1] = LsnGetTextDatum(opts.skiplsn);
+ replaces[Anum_pg_subscription_subskiplsn - 1] = true;
}
tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, replaces);
@@ -938,8 +973,9 @@ ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
/* case 0: keep the subscription enabled, it's over here */
/* case 1: deactivating subscription */
- if (sub->enabled && !enabled) {
- ereport(ERROR, (errmsg("If you want to deactivate this subscription, use DROP SUBSCRIPTION.")));
+ if (sub->enabled && !opts.enabled) {
+ ereport(WARNING, (errmsg("The subscription will be disabled, take care of the xlog would be accumulate "
+ "because the slot of subscription wouldn't be advanced")));
}
/* case 2: keep the subscription active */
@@ -947,7 +983,7 @@ ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
if (!AttemptConnectPublisher(encryptConninfo, finalSlotName, true)) {
ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), errmsg( "Failed to connect to publisher.")));
}
- if (!CheckPublicationsExistOnPublisher(publications)) {
+ if (!CheckPublicationsExistOnPublisher(opts.publications)) {
(WalReceiverFuncTable[GET_FUNC_IDX]).walrcv_disconnect();
ereport(ERROR, (errmsg("There are some publications not exist on the publisher.")));
}
@@ -958,7 +994,7 @@ ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
* enabling subscription, but slot hasn't been created,
* then mark createSlot to true.
*/
- if (!sub->enabled && enabled && (!sub->slotname || !*(sub->slotname))) {
+ if (!sub->enabled && opts.enabled && (!sub->slotname || !*(sub->slotname))) {
createSlot = true;
}
@@ -968,13 +1004,13 @@ ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
checkConn ? "The new conninfo cannot connect to new publisher." : "Failed to connect to publisher.")));
}
- if (!CheckPublicationsExistOnPublisher(publications)) {
+ if (!CheckPublicationsExistOnPublisher(opts.publications)) {
(WalReceiverFuncTable[GET_FUNC_IDX]).walrcv_disconnect();
ereport(ERROR, (errmsg("There are some publications not exist on the publisher.")));
}
if (createSlot) {
- CreateSlotInPublisherAndInsertSubRel(finalSlotName, subid, publications, NULL, true);
+ CreateSlotInPublisherAndInsertSubRel(finalSlotName, subid, opts.publications, NULL, true);
}
/* no need to validate replication slot if the slot is created just by ourself */
@@ -984,6 +1020,8 @@ ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
(WalReceiverFuncTable[GET_FUNC_IDX]).walrcv_disconnect();
ApplyLauncherWakeupAtCommit();
+ } else if (!sub->enabled && opts.enabled) {
+ ApplyLauncherWakeupAtCommit();
}
if (stmt->refresh) {
@@ -991,7 +1029,7 @@ ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
}
- AlterSubscription_refresh(sub, copy_data);
+ AlterSubscription_refresh(sub, opts.copy_data);
}
if (needFreeConninfo) {
diff --git a/src/gausskernel/runtime/executor/execReplication.cpp b/src/gausskernel/runtime/executor/execReplication.cpp
index 0f53e32210..9011ccee35 100644
--- a/src/gausskernel/runtime/executor/execReplication.cpp
+++ b/src/gausskernel/runtime/executor/execReplication.cpp
@@ -53,7 +53,8 @@ static bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, Tuple
* This is not generic routine, it expects the idxrel to be replication
* identity of a rel and meet all limitations associated with that.
*/
-static bool build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel, TupleTableSlot *searchslot)
+static bool build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel, TupleTableSlot *searchslot,
+ EState *estate)
{
int attoff;
bool isnull;
@@ -66,6 +67,16 @@ static bool build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel
Assert(!isnull);
opclass = (oidvector *)DatumGetPointer(indclassDatum);
+ List* expressionsState = NIL;
+ ExprContext* econtext = GetPerTupleExprContext(estate);
+ econtext->ecxt_scantuple = searchslot;
+ ListCell* indexpr_item = NULL;
+
+ if (idxrel->rd_indexprs != NIL) {
+ expressionsState = ExecPrepareExprList(idxrel->rd_indexprs, estate);
+ indexpr_item = list_head(expressionsState);
+ }
+
/* Build scankey for every attribute in the index. */
for (attoff = 0; attoff < IndexRelationGetNumberOfKeyAttributes(idxrel); attoff++) {
Oid op;
@@ -92,14 +103,34 @@ static bool build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel
regop = get_opcode(op);
- /* Initialize the scankey. */
- ScanKeyInit(&skey[attoff], pkattno, BTEqualStrategyNumber, regop, searchslot->tts_values[mainattno - 1]);
- skey[attoff].sk_collation = idxrel->rd_indcollation[attoff];
+ if (mainattno != 0) {
+ /* Initialize the scankey. */
+ ScanKeyInit(&skey[attoff], pkattno, BTEqualStrategyNumber, regop, searchslot->tts_values[mainattno - 1]);
+ skey[attoff].sk_collation = idxrel->rd_indcollation[attoff];
- /* Check for null value. */
- if (searchslot->tts_isnull[mainattno - 1]) {
- hasnulls = true;
- skey[attoff].sk_flags |= SK_ISNULL;
+ /* Check for null value. */
+ if (searchslot->tts_isnull[mainattno - 1]) {
+ hasnulls = true;
+ skey[attoff].sk_flags |= SK_ISNULL;
+ }
+ } else {
+ if (idxrel->rd_indexprs == NIL) {
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("wrong number of index expressions")));
+ } else {
+ bool isNull = false;
+ Datum datum = ExecEvalExprSwitchContext((ExprState*)lfirst(indexpr_item), econtext, &isNull, NULL);
+ indexpr_item = lnext(indexpr_item);
+
+ ScanKeyInit(&skey[attoff], pkattno, BTEqualStrategyNumber, regop, datum);
+ skey[attoff].sk_collation = idxrel->rd_indcollation[attoff];
+
+ /* Check for null value. */
+ if (isNull) {
+ hasnulls = true;
+ skey[attoff].sk_flags |= SK_ISNULL;
+ }
+ }
}
}
@@ -307,7 +338,7 @@ static bool RelationFindReplTupleByIndex(EState *estate, Relation rel, Relation
scan->isUpsert = true;
/* Build scan key. */
- build_replindex_scan_key(skey, targetRel, idxrel, searchslot);
+ build_replindex_scan_key(skey, targetRel, idxrel, searchslot, estate);
while (true) {
found = false;
diff --git a/src/gausskernel/storage/replication/logical/parallel_decode.cpp b/src/gausskernel/storage/replication/logical/parallel_decode.cpp
index 2246bafbe9..f041823778 100644
--- a/src/gausskernel/storage/replication/logical/parallel_decode.cpp
+++ b/src/gausskernel/storage/replication/logical/parallel_decode.cpp
@@ -72,13 +72,23 @@ ParallelReorderBufferTXN *ParallelReorderBufferGetOldestTXN(ParallelReorderBuffe
return txn;
}
-void tuple_to_stringinfo(Relation relation, StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool isOld)
+void tuple_to_stringinfo(Relation relation, StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool isOld,
+ bool printOid)
{
if ((tuple->tupTableType == HEAP_TUPLE) && (HEAP_TUPLE_IS_COMPRESSED(tuple->t_data) ||
(int)HeapTupleHeaderGetNatts(tuple->t_data, tupdesc) > tupdesc->natts)) {
return;
}
+ if (printOid) {
+ Oid oid;
+
+ /* print oid of tuple, it's not included in the TupleDesc */
+ if ((oid = HeapTupleHeaderGetOid(tuple->t_data)) != InvalidOid) {
+ appendStringInfo(s, " oid[oid]:%u", oid);
+ }
+ }
+
/* print all columns individually */
for (int natt = 0; natt < tupdesc->natts; natt++) {
Form_pg_attribute attr; /* the attribute itself */
@@ -90,6 +100,12 @@ void tuple_to_stringinfo(Relation relation, StringInfo s, TupleDesc tupdesc, Hea
attr = &tupdesc->attrs[natt];
+ /*
+ * Don't print dropped columns, we can't be sure everything is
+ * available for them.
+ * Don't print system columns, oid will already have been printed if
+ * present.
+ */
if (attr->attisdropped || attr->attnum < 0 || (isOld && !IsRelationReplidentKey(relation, attr->attnum)))
continue;
diff --git a/src/gausskernel/storage/replication/logical/worker.cpp b/src/gausskernel/storage/replication/logical/worker.cpp
index 9e754cf009..1705754f87 100644
--- a/src/gausskernel/storage/replication/logical/worker.cpp
+++ b/src/gausskernel/storage/replication/logical/worker.cpp
@@ -115,6 +115,10 @@ static void ApplyWorkerProcessMsg(char type, StringInfo s, XLogRecPtr *lastRcv);
static void apply_dispatch(StringInfo s);
static void apply_handle_conninfo(StringInfo s);
static void UpdateConninfo(char* standbysInfo);
+static Oid find_conflict_tuple(EState *estate, TupleTableSlot *remoteslot, TupleTableSlot *localslot,
+ FakeRelationPartition *fakeRelInfo, TupleTableSlot *originslot = NULL);
+static void IsSkippingChanges(XLogRecPtr finish_lsn);
+static void StopSkippingChanges();
/*
* Should this worker apply changes for given relation.
@@ -478,6 +482,8 @@ static void apply_handle_begin(StringInfo s)
t_thrd.applyworker_cxt.remoteFinalLsn = begin_data.final_lsn;
t_thrd.applyworker_cxt.curRemoteCsn = begin_data.csn;
+ IsSkippingChanges(begin_data.final_lsn);
+
pgstat_report_activity(STATE_RUNNING, NULL);
}
@@ -510,6 +516,10 @@ static void apply_handle_commit(StringInfo s)
/* Process any tables that are being synchronized in parallel. */
process_syncing_tables(commit_data.end_lsn);
+ if (t_thrd.applyworker_cxt.isSkipTransaction) {
+ StopSkippingChanges();
+ }
+
pgstat_report_activity(STATE_IDLE, NULL);
}
@@ -570,6 +580,69 @@ static Oid GetRelationIdentityOrPK(Relation rel)
return idxoid;
}
+/*
+ * Find the tuple in a table using any unique index and returns the conflicting
+ * index's oid, if any conflict found.
+ *
+ * *originslot* contains the old tuple during UPDATE, if conflict with it which
+ * to be updated, ignore it.
+ */
+Oid find_conflict_tuple(EState *estate, TupleTableSlot *remoteslot, TupleTableSlot *localslot,
+ FakeRelationPartition *fakeRelInfo, TupleTableSlot *originslot)
+{
+ Oid replidxoid = InvalidOid;
+ bool found = false;
+ ResultRelInfo* relinfo = estate->es_result_relation_info;
+
+ /* Check the replica identity index first */
+ replidxoid = RelationGetReplicaIndex(relinfo->ri_RelationDesc);
+ found = RelationFindReplTuple(estate, relinfo->ri_RelationDesc, replidxoid, LockTupleExclusive, remoteslot,
+ localslot, fakeRelInfo);
+ if (found) {
+ if (originslot != NULL && ItemPointerCompare(tableam_tops_get_t_self(relinfo->ri_RelationDesc,
+ localslot->tts_tuple), tableam_tops_get_t_self(relinfo->ri_RelationDesc,
+ originslot->tts_tuple)) == 0) {
+ /* If conflict with the tuple to be updated, ignore it. */
+ found = false;
+ } else {
+ return replidxoid;
+ }
+ }
+
+ for (int i = 0; i < relinfo->ri_NumIndices; i++) {
+ IndexInfo *ii = relinfo->ri_IndexRelationInfo[i];
+ Relation idxrel;
+ Oid idxoid = InvalidOid;
+
+ if (!ii->ii_Unique) {
+ continue;
+ }
+
+ idxrel = relinfo->ri_IndexRelationDescs[i];
+ idxoid = RelationGetRelid(idxrel);
+
+ if (idxoid == replidxoid) {
+ continue;
+ }
+
+ found = RelationFindReplTuple(estate, relinfo->ri_RelationDesc, idxoid, LockTupleExclusive, remoteslot,
+ localslot, fakeRelInfo);
+
+ if (found) {
+ if (originslot != NULL && ItemPointerCompare(tableam_tops_get_t_self(relinfo->ri_RelationDesc,
+ localslot->tts_tuple), tableam_tops_get_t_self(relinfo->ri_RelationDesc,
+ originslot->tts_tuple)) == 0) {
+ /* If conflict with the tuple to be updated, ignore it. */
+ found = false;
+ } else {
+ return idxoid;
+ }
+ }
+ }
+
+ return InvalidOid;
+}
+
/*
* Handle INSERT message.
*/
@@ -582,6 +655,13 @@ static void apply_handle_insert(StringInfo s)
TupleTableSlot *remoteslot;
MemoryContext oldctx;
FakeRelationPartition fakeRelInfo;
+ TupleTableSlot *localslot;
+ EPQState epqstate;
+ Oid conflictIndexOid = InvalidOid;
+
+ if (t_thrd.applyworker_cxt.isSkipTransaction) {
+ return;
+ }
ensure_transaction();
@@ -600,6 +680,8 @@ static void apply_handle_insert(StringInfo s)
estate = create_estate_for_relation(rel);
remoteslot = ExecInitExtraTupleSlot(estate, rel->localrel->rd_tam_ops);
ExecSetSlotDescriptor(remoteslot, RelationGetDescr(rel->localrel));
+ localslot = ExecInitExtraTupleSlot(estate, rel->localrel->rd_tam_ops);
+ ExecSetSlotDescriptor(localslot, RelationGetDescr(rel->localrel));
/* Input functions may need an active snapshot, so get one */
PushActiveSnapshot(GetTransactionSnapshot());
@@ -611,11 +693,62 @@ static void apply_handle_insert(StringInfo s)
ExecOpenIndices(estate->es_result_relation_info, false);
- /* Get fake relation and partition for patitioned table */
- GetFakeRelAndPart(estate, rel->localrel, remoteslot, &fakeRelInfo);
+ if ((conflictIndexOid = find_conflict_tuple(estate, remoteslot, localslot, &fakeRelInfo)) != InvalidOid) {
+ StringInfoData localtup, remotetup;
+ initStringInfo(&localtup);
+ tuple_to_stringinfo(rel->localrel, &localtup, RelationGetDescr(rel->localrel),
+ (HeapTuple)localslot->tts_tuple, false);
+
+ initStringInfo(&remotetup);
+ tuple_to_stringinfo(rel->localrel, &remotetup, RelationGetDescr(rel->localrel),
+ (HeapTuple)tableam_tslot_get_tuple_from_slot(rel->localrel, remoteslot), false);
+
+ switch (u_sess->attr.attr_storage.subscription_conflict_resolution) {
+ case RESOLVE_ERROR:
+ ereport(ERROR, (errmsg("CONFLICT: remote insert on relation %s (local index %s). Resolution: error.",
+ RelationGetRelationName(rel->localrel), get_rel_name(conflictIndexOid)),
+ errdetail("local tuple: %s, remote tuple: %s, origin: pg_%u, commit_lsn: %X/%X", localtup.data,
+ remotetup.data, t_thrd.applyworker_cxt.curWorker->subid,
+ (uint32)(t_thrd.applyworker_cxt.remoteFinalLsn >> BITS_PER_INT),
+ (uint32)t_thrd.applyworker_cxt.remoteFinalLsn)));
+ break;
+ case RESOLVE_APPLY_REMOTE:
+ ereport(LOG, (errmsg("CONFLICT: remote insert on relation %s (local index %s). "
+ "Resolution: apply_remote.",
+ RelationGetRelationName(rel->localrel), get_rel_name(conflictIndexOid)),
+ errdetail("local tuple: %s, remote tuple: %s, origin: pg_%u, commit_lsn: %X/%X", localtup.data,
+ remotetup.data, t_thrd.applyworker_cxt.curWorker->subid,
+ (uint32)(t_thrd.applyworker_cxt.remoteFinalLsn >> BITS_PER_INT),
+ (uint32)t_thrd.applyworker_cxt.remoteFinalLsn)));
+ EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
+ EvalPlanQualSetSlot(&epqstate, remoteslot);
+ /* Do the actual update. */
+ ExecSimpleRelationUpdate(estate, &epqstate, localslot, remoteslot, &fakeRelInfo);
+
+ EvalPlanQualEnd(&epqstate);
+ break;
+ case RESOLVE_KEEP_LOCAL:
+ ereport(LOG, (errmsg("CONFLICT: remote insert on relation %s (local index %s). "
+ "Resolution: keep_local.",
+ RelationGetRelationName(rel->localrel), get_rel_name(conflictIndexOid)),
+ errdetail("local tuple: %s, remote tuple: %s, origin: pg_%u, commit_lsn: %X/%X", localtup.data,
+ remotetup.data, t_thrd.applyworker_cxt.curWorker->subid,
+ (uint32)(t_thrd.applyworker_cxt.remoteFinalLsn >> BITS_PER_INT),
+ (uint32)t_thrd.applyworker_cxt.remoteFinalLsn)));
+ break;
+ default:
+ ereport(ERROR, (errmsg("wrong parameter value for subscription_conflict_resolution")));
+ break;
+ }
+ FreeStringInfo(&localtup);
+ FreeStringInfo(&remotetup);
+ } else {
+ /* Get fake relation and partition for patitioned table */
+ GetFakeRelAndPart(estate, rel->localrel, remoteslot, &fakeRelInfo);
- /* Do the insert. */
- ExecSimpleRelationInsert(estate, remoteslot, &fakeRelInfo);
+ /* Do the insert. */
+ ExecSimpleRelationInsert(estate, remoteslot, &fakeRelInfo);
+ }
/* Cleanup. */
ExecCloseIndices(estate->es_result_relation_info);
@@ -701,10 +834,16 @@ static void apply_handle_update(StringInfo s)
bool has_oldtup;
TupleTableSlot *localslot;
TupleTableSlot *remoteslot;
+ TupleTableSlot *conflictLocalSlot;
RangeTblEntry *target_rte = NULL;
bool found = false;
MemoryContext oldctx;
FakeRelationPartition fakeRelInfo;
+ Oid conflictIndexOid = InvalidOid;
+
+ if (t_thrd.applyworker_cxt.isSkipTransaction) {
+ return;
+ }
ensure_transaction();
@@ -728,6 +867,8 @@ static void apply_handle_update(StringInfo s)
ExecSetSlotDescriptor(remoteslot, RelationGetDescr(rel->localrel));
localslot = ExecInitExtraTupleSlot(estate, rel->localrel->rd_tam_ops);
ExecSetSlotDescriptor(localslot, RelationGetDescr(rel->localrel));
+ conflictLocalSlot = ExecInitExtraTupleSlot(estate, rel->localrel->rd_tam_ops);
+ ExecSetSlotDescriptor(conflictLocalSlot, RelationGetDescr(rel->localrel));
EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
/*
@@ -783,10 +924,64 @@ static void apply_handle_update(StringInfo s)
slot_modify_data(remoteslot, localslot, rel, &newtup);
MemoryContextSwitchTo(oldctx);
- EvalPlanQualSetSlot(&epqstate, remoteslot);
+ if ((conflictIndexOid = find_conflict_tuple(estate, remoteslot, conflictLocalSlot, &fakeRelInfo, localslot))
+ != InvalidOid) {
+ StringInfoData localtup, remotetup;
+ initStringInfo(&localtup);
+ tuple_to_stringinfo(rel->localrel, &localtup, RelationGetDescr(rel->localrel),
+ (HeapTuple)conflictLocalSlot->tts_tuple, false);
+
+ initStringInfo(&remotetup);
+ tuple_to_stringinfo(rel->localrel, &remotetup, RelationGetDescr(rel->localrel),
+ (HeapTuple)tableam_tslot_get_tuple_from_slot(rel->localrel, remoteslot), false);
+
+ switch (u_sess->attr.attr_storage.subscription_conflict_resolution) {
+ case RESOLVE_ERROR:
+ ereport(ERROR, (errmsg("CONFLICT: remote update on relation %s (local index %s). "
+ "Resolution: error.",
+ RelationGetRelationName(rel->localrel), get_rel_name(conflictIndexOid)),
+ errdetail("local tuple: %s, remote tuple: %s, origin: pg_%u, commit_lsn: %X/%X", localtup.data,
+ remotetup.data, t_thrd.applyworker_cxt.curWorker->subid,
+ (uint32)(t_thrd.applyworker_cxt.remoteFinalLsn >> BITS_PER_INT),
+ (uint32)t_thrd.applyworker_cxt.remoteFinalLsn)));
+ break;
+ case RESOLVE_APPLY_REMOTE:
+ ereport(LOG, (errmsg("CONFLICT: remote update on relation %s (local index %s). "
+ "Resolution: apply_remote.",
+ RelationGetRelationName(rel->localrel), get_rel_name(conflictIndexOid)),
+ errdetail("local tuple: %s, remote tuple: %s, origin: pg_%u, commit_lsn: %X/%X", localtup.data,
+ remotetup.data, t_thrd.applyworker_cxt.curWorker->subid,
+ (uint32)(t_thrd.applyworker_cxt.remoteFinalLsn >> BITS_PER_INT),
+ (uint32)t_thrd.applyworker_cxt.remoteFinalLsn)));
+ /* first delete the conflict tuple */
+ EvalPlanQualSetSlot(&epqstate, conflictLocalSlot);
+ ExecSimpleRelationDelete(estate, &epqstate, conflictLocalSlot, &fakeRelInfo);
+
+ EvalPlanQualSetSlot(&epqstate, remoteslot);
+ /* Do the actual update. */
+ ExecSimpleRelationUpdate(estate, &epqstate, localslot, remoteslot, &fakeRelInfo);
+ break;
+ case RESOLVE_KEEP_LOCAL:
+ ereport(LOG, (errmsg("CONFLICT: remote update on relation %s (local index %s). "
+ "Resolution: keep_local.",
+ RelationGetRelationName(rel->localrel), get_rel_name(conflictIndexOid)),
+ errdetail("local tuple: %s, remote tuple: %s, origin: pg_%u, commit_lsn: %X/%X", localtup.data,
+ remotetup.data, t_thrd.applyworker_cxt.curWorker->subid,
+ (uint32)(t_thrd.applyworker_cxt.remoteFinalLsn >> BITS_PER_INT),
+ (uint32)t_thrd.applyworker_cxt.remoteFinalLsn)));
+ break;
+ default:
+ ereport(ERROR, (errmsg("wrong parameter value for subscription_conflict_resolution")));
+ break;
+ }
+ FreeStringInfo(&localtup);
+ FreeStringInfo(&remotetup);
+ } else {
+ EvalPlanQualSetSlot(&epqstate, remoteslot);
- /* Do the actual update. */
- ExecSimpleRelationUpdate(estate, &epqstate, localslot, remoteslot, &fakeRelInfo);
+ /* Do the actual update. */
+ ExecSimpleRelationUpdate(estate, &epqstate, localslot, remoteslot, &fakeRelInfo);
+ }
} else {
/*
* The tuple to be updated could not be found.
@@ -824,6 +1019,10 @@ static void apply_handle_delete(StringInfo s)
MemoryContext oldctx;
FakeRelationPartition fakeRelInfo;
+ if (t_thrd.applyworker_cxt.isSkipTransaction) {
+ return;
+ }
+
ensure_transaction();
relid = logicalrep_read_delete(s, &oldtup);
@@ -1828,3 +2027,78 @@ bool IsLogicalWorker(void)
{
return t_thrd.applyworker_cxt.curWorker != NULL;
}
+
+/*
+ * Start skipping changes of the transaction if the given LSN matches the
+ * LSN specified by subscription's skiplsn.
+ */
+static void IsSkippingChanges(XLogRecPtr finish_lsn)
+{
+ /*
+ * Quick return if it's not requested to skip this transaction. This
+ * function is called for every remote transaction and we assume that
+ * skipping the transaction is not used often.
+ */
+ if (likely(XLogRecPtrIsInvalid(t_thrd.applyworker_cxt.mySubscription->skiplsn) ||
+ t_thrd.applyworker_cxt.mySubscription->skiplsn != finish_lsn)) {
+ return;
+ }
+
+ t_thrd.applyworker_cxt.isSkipTransaction = true;
+}
+
+static void StopSkippingChanges()
+{
+ t_thrd.applyworker_cxt.isSkipTransaction = false;
+
+ /*
+ * Quick return if it's not requested to skip this transaction. This
+ * function is called for every remote transaction and we assume that
+ * skipping the transaction is not used often.
+ */
+ if (!IsTransactionState()) {
+ StartTransactionCommand();
+ }
+
+ HeapTuple tup;
+ Relation rel;
+ bool nulls[Natts_pg_subscription];
+ bool replaces[Natts_pg_subscription];
+ Datum values[Natts_pg_subscription];
+ errno_t rc = 0;
+
+ /*
+ * Protect subskiplsn of pg_subscription from being concurrently updated
+ * while clearing it.
+ */
+ LockSharedObject(SubscriptionRelationId, t_thrd.applyworker_cxt.mySubscription->oid, 0, AccessShareLock);
+ rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
+
+ /* Fetch the existing tuple. */
+ tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(t_thrd.applyworker_cxt.mySubscription->oid));
+
+ if (!HeapTupleIsValid(tup)) {
+ ereport(ERROR, (errmsg("subscription \"%s\" does not exist", t_thrd.applyworker_cxt.mySubscription->name)));
+ }
+
+ rc = memset_s(values, sizeof(values),0, sizeof(values));
+ securec_check_c(rc, "\0", "\0");
+ rc = memset_s(nulls, sizeof(nulls),false, sizeof(nulls));
+ securec_check_c(rc, "\0", "\0");
+ rc = memset_s(replaces, sizeof(replaces), false, sizeof(replaces));
+ securec_check_c(rc, "\0", "\0");
+
+ /* reset subskiplsn */
+ values[Anum_pg_subscription_subskiplsn - 1] = LsnGetTextDatum(InvalidXLogRecPtr);
+ replaces[Anum_pg_subscription_subskiplsn - 1] = true;
+
+ tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, replaces);
+ /* Update the catalog. */
+ simple_heap_update(rel, &tup->t_self, tup);
+ CatalogUpdateIndexes(rel, tup);
+
+ heap_freetuple(tup);
+ heap_close(rel, NoLock);
+
+ CommitTransactionCommand();
+}
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 6765cd38cc..8b79927fdf 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -52,13 +52,15 @@ CATALOG(pg_subscription,6126) BKI_SHARED_RELATION BKI_ROWTYPE_OID(6128) BKI_SCHE
text subpublications[1]; /* List of publications subscribed to */
bool subbinary; /* True if the subscription wants the
* publisher to send data in binary */
+ text subskiplsn; /* All changes finished at this LSN are
+ * skipped */
#endif
}
FormData_pg_subscription;
typedef FormData_pg_subscription *Form_pg_subscription;
-#define Natts_pg_subscription 9
+#define Natts_pg_subscription 10
#define Anum_pg_subscription_subdbid 1
#define Anum_pg_subscription_subname 2
#define Anum_pg_subscription_subowner 3
@@ -68,6 +70,7 @@ typedef FormData_pg_subscription *Form_pg_subscription;
#define Anum_pg_subscription_subsynccommit 7
#define Anum_pg_subscription_subpublications 8
#define Anum_pg_subscription_subbinary 9
+#define Anum_pg_subscription_subskiplsn 10
typedef struct Subscription {
@@ -81,6 +84,8 @@ typedef struct Subscription {
char *synccommit; /* Synchronous commit setting for worker */
List *publications; /* List of publication names to subscribe to */
bool binary; /* Indicates if the subscription wants data in binary format */
+ XLogRecPtr skiplsn; /* All changes finished at this LSN are
+ * skipped */
} Subscription;
@@ -91,6 +96,8 @@ extern char *get_subscription_name(Oid subid, bool missing_ok);
extern int CountDBSubscriptions(Oid dbid);
extern void ClearListContent(List *list);
+extern Datum LsnGetTextDatum(XLogRecPtr lsn);
+extern XLogRecPtr TextDatumGetLsn(Datum datum);
#endif /* PG_SUBSCRIPTION_H */
diff --git a/src/include/knl/knl_guc/knl_session_attr_storage.h b/src/include/knl/knl_guc/knl_session_attr_storage.h
index 96d0d72ffc..af7a7e4303 100755
--- a/src/include/knl/knl_guc/knl_session_attr_storage.h
+++ b/src/include/knl/knl_guc/knl_session_attr_storage.h
@@ -266,6 +266,7 @@ typedef struct knl_session_attr_storage {
int logical_sender_timeout;
int ignore_standby_lsn_window;
int ignore_feedback_xmin_window;
+ int subscription_conflict_resolution;
} knl_session_attr_storage;
#endif /* SRC_INCLUDE_KNL_KNL_SESSION_ATTR_STORAGE */
diff --git a/src/include/knl/knl_thread.h b/src/include/knl/knl_thread.h
index 61c629b060..3a5b4aebd3 100755
--- a/src/include/knl/knl_thread.h
+++ b/src/include/knl/knl_thread.h
@@ -3336,6 +3336,7 @@ typedef struct knl_t_apply_worker_context {
List *tableStates;
XLogRecPtr remoteFinalLsn;
CommitSeqNo curRemoteCsn;
+ bool isSkipTransaction;
} knl_t_apply_worker_context;
typedef struct knl_t_publication_context {
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index fb09647b07..015552c675 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -358,5 +358,7 @@ extern void FreeLogicalLog(ParallelReorderBuffer *rb, logicalLog *logChange, int
extern bool LogicalDecodeParseOptionsDefault(const char* defaultStr, void **options);
extern DecodeOptionsDefault* LogicalDecodeGetOptionsDefault();
template void LogicalDecodeReportLostChanges(const T *iterstate);
+extern void tuple_to_stringinfo(Relation relation, StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool isOld,
+ bool printOid = false);
#endif
diff --git a/src/include/replication/replicainternal.h b/src/include/replication/replicainternal.h
index b582b44e6f..924601b2b4 100755
--- a/src/include/replication/replicainternal.h
+++ b/src/include/replication/replicainternal.h
@@ -214,6 +214,13 @@ typedef enum replauthmode{
REPL_AUTH_UUID /* uuid auth */
} ReplAuthMode;
+typedef enum
+{
+ RESOLVE_ERROR,
+ RESOLVE_APPLY_REMOTE,
+ RESOLVE_KEEP_LOCAL
+} PGLogicalResolveOption;
+
extern bool data_catchup;
extern bool wal_catchup;
extern BuildMode build_mode;
diff --git a/src/test/regress/input/subscription.source b/src/test/regress/input/subscription.source
index 30cd7a96b2..3083dc5b6e 100644
--- a/src/test/regress/input/subscription.source
+++ b/src/test/regress/input/subscription.source
@@ -70,6 +70,14 @@ ALTER SUBSCRIPTION testsub owner to regress_subscription_user2;
-- alter subbinary to true
ALTER SUBSCRIPTION testsub SET (binary=true);
select subname, subbinary from pg_subscription where subname='testsub';
+-- set subskiplsn
+ALTER SUBSCRIPTION testsub SET (skiplsn = '0/ABCDEF');
+select subname, subskiplsn from pg_subscription where subname='testsub';
+ALTER SUBSCRIPTION testsub SET (skiplsn = '0/ABCDEFGH');
+ALTER SUBSCRIPTION testsub SET (skiplsn = 'none');
+select subname, subskiplsn from pg_subscription where subname='testsub';
+-- disable test
+ALTER SUBSCRIPTION testsub DISABLE;
--rename
ALTER SUBSCRIPTION testsub rename to testsub_rename;
--- inside a transaction block
diff --git a/src/test/regress/output/recovery_2pc_tools.source b/src/test/regress/output/recovery_2pc_tools.source
index a65da8a14e..21280f9c65 100644
--- a/src/test/regress/output/recovery_2pc_tools.source
+++ b/src/test/regress/output/recovery_2pc_tools.source
@@ -650,6 +650,7 @@ select name,vartype,unit,min_val,max_val from pg_settings where name <> 'qunit_c
stats_temp_directory | string | | |
stream_cluster_run_mode | enum | | |
string_hash_compatible | bool | | |
+ subscription_conflict_resolution | enum | | |
support_batch_bind | bool | | |
support_extended_features | bool | | |
sync_config_strategy | enum | | |
diff --git a/src/test/regress/output/subscription.source b/src/test/regress/output/subscription.source
index 7ef7842a12..24fe8ecaab 100644
--- a/src/test/regress/output/subscription.source
+++ b/src/test/regress/output/subscription.source
@@ -19,6 +19,7 @@ ALTER SUBSCRIPTION name CONNECTION 'conninfo'
ALTER SUBSCRIPTION name SET PUBLICATION publication_name [, ...]
ALTER SUBSCRIPTION name REFRESH PUBLICATION [ WITH ( refresh_option [= value] [, ... ] ) ]
ALTER SUBSCRIPTION name ENABLE
+ALTER SUBSCRIPTION name DISABLE
ALTER SUBSCRIPTION name SET ( subscription_parameter [= value] [, ... ] )
ALTER SUBSCRIPTION name OWNER TO new_owner
ALTER SUBSCRIPTION name RENAME TO new_name
@@ -153,6 +154,25 @@ select subname, subbinary from pg_subscription where subname='testsub';
testsub | t
(1 row)
+-- set subskiplsn
+ALTER SUBSCRIPTION testsub SET (skiplsn = '0/ABCDEF');
+select subname, subskiplsn from pg_subscription where subname='testsub';
+ subname | subskiplsn
+---------+------------
+ testsub | 0/ABCDEF
+(1 row)
+
+ALTER SUBSCRIPTION testsub SET (skiplsn = '0/ABCDEFGH');
+ERROR: invalid input syntax for type pg_lsn: "0/ABCDEFGH"
+ALTER SUBSCRIPTION testsub SET (skiplsn = 'none');
+select subname, subskiplsn from pg_subscription where subname='testsub';
+ subname | subskiplsn
+---------+------------
+ testsub | 0/0
+(1 row)
+
+-- disable test
+ALTER SUBSCRIPTION testsub DISABLE;
--rename
ALTER SUBSCRIPTION testsub rename to testsub_rename;
--- inside a transaction block
@@ -337,12 +357,15 @@ SELECT object_name,detail_info FROM pg_query_audit('2022-01-13 9:30:00', '2031-1
testsub | ALTER SUBSCRIPTION testsub SET (synchronous_commit=on);
testsub | ALTER SUBSCRIPTION testsub owner to regress_subscription_user2;
testsub | ALTER SUBSCRIPTION testsub SET (binary=true);
+ testsub | ALTER SUBSCRIPTION testsub SET (skiplsn = '0/ABCDEF');
+ testsub | ALTER SUBSCRIPTION testsub SET (skiplsn = 'none');
+ testsub | ALTER SUBSCRIPTION testsub DISABLE;
testsub | ALTER SUBSCRIPTION testsub rename to testsub_rename;
sub_len_999 | CREATE SUBSCRIPTION sub_len_999 CONNECTION *********************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************PUBLICATION insert_only WITH (connect = false);
testsub_rename | DROP SUBSCRIPTION IF EXISTS testsub_rename;
testsub_maskconninfo | DROP SUBSCRIPTION IF EXISTS testsub_maskconninfo;
sub_len_999 | DROP SUBSCRIPTION IF EXISTS sub_len_999;
-(17 rows)
+(20 rows)
--clear audit log
SELECT pg_delete_audit('1012-11-10', '3012-11-11');
diff --git a/src/test/subscription/schedule b/src/test/subscription/schedule
index 6913c6c8d4..52ee04318f 100644
--- a/src/test/subscription/schedule
+++ b/src/test/subscription/schedule
@@ -10,4 +10,6 @@ sync
encoding
ddl
matviews
-change_wal_level
\ No newline at end of file
+change_wal_level
+skiplsn
+disable
\ No newline at end of file
diff --git a/src/test/subscription/testcase/disable.sh b/src/test/subscription/testcase/disable.sh
new file mode 100644
index 0000000000..1797690029
--- /dev/null
+++ b/src/test/subscription/testcase/disable.sh
@@ -0,0 +1,65 @@
+#!/bin/sh
+
+source $1/env_utils.sh $1 $2
+
+case_db="disable_db"
+
+function test_1() {
+ echo "create database and tables."
+ exec_sql $db $pub_node1_port "CREATE DATABASE $case_db"
+ exec_sql $db $sub_node1_port "CREATE DATABASE $case_db"
+ # Create some preexisting content on publisher
+ exec_sql $case_db $pub_node1_port "CREATE TABLE tab_rep (a int primary key, b int)"
+
+ # Setup structure on subscriber
+ exec_sql $case_db $sub_node1_port "CREATE TABLE tab_rep (a int primary key, b int)"
+
+ # Setup logical replication
+ echo "create publication and subscription."
+ publisher_connstr="port=$pub_node1_port host=$g_local_ip dbname=$case_db user=$username password=$passwd"
+ exec_sql $case_db $pub_node1_port "CREATE PUBLICATION tap_pub FOR ALL TABLES"
+ exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
+
+ # Wait for initial table sync to finish
+ wait_for_subscription_sync $case_db $sub_node1_port
+
+ exec_sql $case_db $pub_node1_port "insert into tab_rep values (1,1)"
+
+ wait_for_catchup $case_db $pub_node1_port "tap_sub"
+
+ exec_sql $case_db $sub_node1_port "alter subscription tap_sub disable"
+
+ exec_sql $case_db $pub_node1_port "insert into tab_rep values (2,2)"
+
+ if [ "$(exec_sql $case_db $sub_node1_port "SELECT * FROM tab_rep")" = "1|1" ]; then
+ echo "check data not sync after disable subscription success"
+ else
+ echo "$failed_keyword when check data not sync after disable subscription"
+ exit 1
+ fi
+
+ exec_sql $case_db $sub_node1_port "alter subscription tap_sub enable"
+
+ wait_for_catchup $case_db $pub_node1_port "tap_sub"
+
+ if [ "$(exec_sql $case_db $sub_node1_port "SELECT * FROM tab_rep")" = "1|1
+2|2" ]; then
+ echo "check data not sync after enable subscription success"
+ else
+ echo "$failed_keyword when check data not sync after enable subscription"
+ exit 1
+ fi
+}
+
+function tear_down() {
+ exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION IF EXISTS tap_sub"
+ exec_sql $case_db $pub_node1_port "DROP PUBLICATION IF EXISTS tap_pub"
+
+ exec_sql $db $sub_node1_port "DROP DATABASE $case_db"
+ exec_sql $db $pub_node1_port "DROP DATABASE $case_db"
+
+ echo "tear down"
+}
+
+test_1
+tear_down
\ No newline at end of file
diff --git a/src/test/subscription/testcase/skiplsn.sh b/src/test/subscription/testcase/skiplsn.sh
new file mode 100644
index 0000000000..669e640fc7
--- /dev/null
+++ b/src/test/subscription/testcase/skiplsn.sh
@@ -0,0 +1,69 @@
+#!/bin/sh
+
+source $1/env_utils.sh $1 $2
+
+case_db="skiplsn_db"
+
+function test_1() {
+ echo "create database and tables."
+ exec_sql $db $pub_node1_port "CREATE DATABASE $case_db"
+ exec_sql $db $sub_node1_port "CREATE DATABASE $case_db"
+ # Create some preexisting content on publisher
+ exec_sql $case_db $pub_node1_port "CREATE TABLE tab_rep (a int primary key, b int)"
+
+ # Setup structure on subscriber
+ exec_sql $case_db $sub_node1_port "CREATE TABLE tab_rep (a int primary key, b int)"
+
+ # Setup logical replication
+ echo "create publication and subscription."
+ publisher_connstr="port=$pub_node1_port host=$g_local_ip dbname=$case_db user=$username password=$passwd"
+ exec_sql $case_db $pub_node1_port "CREATE PUBLICATION tap_pub FOR ALL TABLES"
+ exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
+
+ # Wait for initial table sync to finish
+ wait_for_subscription_sync $case_db $sub_node1_port
+
+ exec_sql $case_db $sub_node1_port "insert into tab_rep values (1,1)"
+
+ logfile=$(get_log_file "sub_datanode1")
+
+ location=$(awk 'END{print NR}' $logfile)
+
+ exec_sql $case_db $pub_node1_port "insert into tab_rep values (1,2)"
+
+ content=$(tail -n +$location $logfile)
+ commitlsn=$(expr "$content" : '.*commit_lsn:\s\([0-9|/|ABCDEF]*\).*')
+
+ while [ -z $commitlsn ]
+ do
+ content=$(tail -n +$location $logfile)
+ commitlsn=$(expr "$content" : '.*commit_lsn:\s\([0-9|/|ABCDEF]*\).*')
+ done
+
+ exec_sql $case_db $sub_node1_port "alter subscription tap_sub set (skiplsn = '$commitlsn')"
+
+ exec_sql $case_db $pub_node1_port "insert into tab_rep values (2, 2)"
+
+ wait_for_catchup $case_db $pub_node1_port "tap_sub"
+
+ if [ "$(exec_sql $case_db $sub_node1_port "SELECT * FROM tab_rep")" = "1|1
+2|2" ]; then
+ echo "check data sync after skip conflict success"
+ else
+ echo "$failed_keyword when check data sync after skip conflict"
+ exit 1
+ fi
+}
+
+function tear_down() {
+ exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION IF EXISTS tap_sub"
+ exec_sql $case_db $pub_node1_port "DROP PUBLICATION IF EXISTS tap_pub"
+
+ exec_sql $db $sub_node1_port "DROP DATABASE $case_db"
+ exec_sql $db $pub_node1_port "DROP DATABASE $case_db"
+
+ echo "tear down"
+}
+
+test_1
+tear_down
\ No newline at end of file
--
Gitee
From fb651b6de64529b2c2625355424461753368d6a6 Mon Sep 17 00:00:00 2001
From: he-shaoyu
Date: Thu, 17 Aug 2023 16:56:13 +0800
Subject: [PATCH 2/5] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E6=B5=8B=E8=AF=95?=
=?UTF-8?q?=E7=94=A8=E4=BE=8B?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
src/test/subscription/schedule | 3 +-
.../subscription/testcase/pub_subconflict.sh | 147 ++++++++++++++++++
2 files changed, 149 insertions(+), 1 deletion(-)
create mode 100644 src/test/subscription/testcase/pub_subconflict.sh
diff --git a/src/test/subscription/schedule b/src/test/subscription/schedule
index 52ee04318f..836ac0e58b 100644
--- a/src/test/subscription/schedule
+++ b/src/test/subscription/schedule
@@ -12,4 +12,5 @@ ddl
matviews
change_wal_level
skiplsn
-disable
\ No newline at end of file
+disable
+pub_subconflict
\ No newline at end of file
diff --git a/src/test/subscription/testcase/pub_subconflict.sh b/src/test/subscription/testcase/pub_subconflict.sh
new file mode 100644
index 0000000000..ee40c6b9d8
--- /dev/null
+++ b/src/test/subscription/testcase/pub_subconflict.sh
@@ -0,0 +1,147 @@
+
+#!/bin/sh
+
+source $1/env_utils.sh $1 $2
+
+case_db="conflict_db"
+
+function test_1() {
+ echo "create database and tables."
+ exec_sql $db $pub_node1_port "CREATE DATABASE $case_db"
+ exec_sql $db $sub_node1_port "CREATE DATABASE $case_db"
+ # Create some preexisting content on publisher
+ # => keep_local
+ exec_sql $case_db $sub_node1_port "ALTER SYSTEM SET subscription_conflict_resolution = keep_local"
+
+ # Setup structure on subscriber
+ exec_sql $case_db $pub_node1_port "CREATE TABLE tab_keep (a int primary key, b char)"
+ exec_sql $case_db $sub_node1_port "CREATE TABLE tab_keep (a int primary key, b char)"
+
+ # Setup logical replication
+ echo "create publication and subscription. (=> keep_local)"
+ publisher_connstr="port=$pub_node1_port host=$g_local_ip dbname=$case_db user=$username password=$passwd"
+
+ exec_sql $case_db $pub_node1_port "CREATE PUBLICATION tap_pub1 FOR TABLE tab_keep"
+ exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION tap_sub1 CONNECTION '$publisher_connstr' PUBLICATION tap_pub1"
+
+ # Wait for initial table sync to finish
+ wait_for_subscription_sync $case_db $sub_node1_port
+
+ exec_sql $case_db $sub_node1_port "INSERT INTO tab_keep VALUES(1, 'a')"
+ exec_sql $case_db $pub_node1_port "INSERT INTO tab_keep VALUES(1, 'b')"
+
+ # Wait for catchup
+ wait_for_catchup $case_db $pub_node1_port "tap_sub1"
+
+ if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM tab_keep where b = 'a'")" = "1" ]; then
+ echo "check pub_sub conflict for 1st sub success"
+ else
+ echo "$failed_keyword when check pub_sub conflict for 1st sub"
+ exit 1
+ fi
+
+ exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION tap_sub1"
+ exec_sql $case_db $pub_node1_port "DROP PUBLICATION tap_pub1"
+
+ # => apply_remote
+ echo "create publication and subscription. (=> apply_remote)"
+ exec_sql $case_db $sub_node1_port "ALTER SYSTEM SET subscription_conflict_resolution = apply_remote"
+
+ exec_sql $case_db $pub_node1_port "CREATE TABLE tab_apply (a int primary key, b char)"
+ exec_sql $case_db $sub_node1_port "CREATE TABLE tab_apply (a int primary key, b char)"
+
+ exec_sql $case_db $pub_node1_port "CREATE PUBLICATION tap_pub2 FOR TABLE tab_apply"
+ exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION tap_sub2 CONNECTION '$publisher_connstr' PUBLICATION tap_pub2"
+
+ # Wait for initial table sync to finish
+ wait_for_subscription_sync $case_db $sub_node1_port
+
+ exec_sql $case_db $sub_node1_port "INSERT INTO tab_apply VALUES(1, 'k')"
+ exec_sql $case_db $pub_node1_port "INSERT INTO tab_apply VALUES(1, 'a')"
+
+ # Wait for catchup
+ wait_for_catchup $case_db $pub_node1_port "tap_sub2"
+
+
+ if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM tab_apply where b = 'a'")" = "1" ]; then
+ echo "check pub_sub conflict for 2nd sub success"
+ else
+ echo "$failed_keyword when check pub_sub conflict for 2nd sub"
+ exit 1
+ fi
+
+ exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION tap_sub2"
+ exec_sql $case_db $pub_node1_port "DROP PUBLICATION tap_pub2"
+
+ # update apply_remote
+ echo "create publication and subscription. (update apply_remote)"
+ exec_sql $case_db $pub_node1_port "CREATE TABLE tab_updateApply (a int primary key, b char)"
+ exec_sql $case_db $sub_node1_port "CREATE TABLE tab_updateApply (a int primary key, b char)"
+
+ exec_sql $case_db $pub_node1_port "CREATE PUBLICATION tap_pub3 FOR TABLE tab_updateApply"
+ exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION tap_sub3 CONNECTION '$publisher_connstr' PUBLICATION tap_pub3"
+
+ exec_sql $case_db $pub_node1_port "INSERT INTO tab_updateApply VALUES(1, 'a')"
+
+ # Wait for catchup
+ wait_for_catchup $case_db $pub_node1_port "tap_sub3"
+
+ exec_sql $case_db $sub_node1_port "INSERT INTO tab_updateApply VALUES(2, 'b')"
+ exec_sql $case_db $pub_node1_port "UPDATE tab_updateApply SET a = 2 where a = 1"
+
+ # Wait for catchup
+ wait_for_catchup $case_db $pub_node1_port "tap_sub3"
+
+ if [ "$(exec_sql $case_db $sub_node1_port "SELECT * FROM tab_updateApply")" = "2|a" ]; then
+ echo "check pub_sub conflict for 3rd sub success"
+ else
+ echo "$failed_keyword when check pub_sub conflict for 3rd sub"
+ exit 1
+ fi
+
+ exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION tap_sub3"
+ exec_sql $case_db $pub_node1_port "DROP PUBLICATION tap_pub3"
+
+ #unique conflict
+ echo "create publication and subscription. (unique conflict)"
+ exec_sql $case_db $pub_node1_port "CREATE TABLE tab_unique (a int primary key, b char, c int unique)"
+ exec_sql $case_db $sub_node1_port "CREATE TABLE tab_unique (a int primary key, b char, c int unique)"
+
+ exec_sql $case_db $pub_node1_port "CREATE PUBLICATION tap_pub4 FOR TABLE tab_unique"
+ exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION tap_sub4 CONNECTION '$publisher_connstr' PUBLICATION tap_pub4"
+
+ exec_sql $case_db $sub_node1_port "INSERT INTO tab_unique VALUES (1, 'a', 1), (2, 'c', 2), (3, 'c', 3)"
+
+ # Wait for catchup
+ wait_for_catchup $case_db $pub_node1_port "tap_sub4"
+
+ exec_sql $case_db $pub_node1_port "INSERT INTO tab_unique VALUES(3, 'k', 4)"
+
+ wait_for_catchup $case_db $pub_node1_port "tap_sub4"
+
+ exec_sql $case_db $pub_node1_port "INSERT INTO tab_unique VALUES(1, 'b', 2)"
+
+ if [ "$(exec_sql $case_db $sub_node1_port "SELECT * FROM tab_unique")" = "1|a|1
+2|c|2
+3|k|4" ]; then
+ echo "check pub_sub conflict for 4th sub success"
+ else
+ echo "$failed_keyword when check pub_sub conflict for 4th sub"
+ exit 1
+ fi
+
+ exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION tap_sub4"
+ exec_sql $case_db $pub_node1_port "DROP PUBLICATION tap_pub4"
+}
+
+function tear_down(){
+ exec_sql $case_db $sub_node1_port "ALTER SYSTEM SET subscription_conflict_resolution = error"
+
+ exec_sql $db $sub_node1_port "DROP DATABASE $case_db"
+ exec_sql $db $pub_node1_port "DROP DATABASE $case_db"
+
+ echo "tear down"
+}
+
+test_1
+tear_down
\ No newline at end of file
--
Gitee
From b06cb282a6b87dea1d008a49ab45ac021827171a Mon Sep 17 00:00:00 2001
From: chenxiaobin19 <1025221611@qq.com>
Date: Fri, 18 Aug 2023 15:17:39 +0800
Subject: [PATCH 3/5] =?UTF-8?q?=E6=A3=80=E8=A7=86=E6=84=8F=E8=A7=81?=
=?UTF-8?q?=E5=A4=84=E7=90=86?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../optimizer/commands/subscriptioncmds.cpp | 4 +-
.../process/threadpool/knl_instance.cpp | 2 +
.../storage/replication/logical/launcher.cpp | 4 +
.../storage/replication/logical/worker.cpp | 94 +++++++++++++++----
src/include/knl/knl_instance.h | 3 +
src/include/replication/worker_internal.h | 2 +
6 files changed, 88 insertions(+), 21 deletions(-)
diff --git a/src/gausskernel/optimizer/commands/subscriptioncmds.cpp b/src/gausskernel/optimizer/commands/subscriptioncmds.cpp
index aa52f2ebe0..e4da27801b 100644
--- a/src/gausskernel/optimizer/commands/subscriptioncmds.cpp
+++ b/src/gausskernel/optimizer/commands/subscriptioncmds.cpp
@@ -195,7 +195,7 @@ static void parse_subscription_options(const List *stmt_options, bits32 supporte
errmsg("conflicting or redundant options")));
}
- opts->specified_opts = SUBOPT_CONNECT;
+ opts->specified_opts |= SUBOPT_CONNECT;
opts->connect = defGetBoolean(defel);
} else if (IsSet(supported_opts, SUBOPT_SKIPLSN) && strcmp(defel->defname, "skiplsn") == 0) {
if (IsSet(opts->specified_opts, SUBOPT_SKIPLSN)) {
@@ -205,7 +205,7 @@ static void parse_subscription_options(const List *stmt_options, bits32 supporte
}
char *lsn_str = defGetString(defel);
- opts->specified_opts = SUBOPT_SKIPLSN;
+ opts->specified_opts |= SUBOPT_SKIPLSN;
/* Setting lsn = 'NONE' is treated as resetting LSN */
if (strcmp(lsn_str, "none") == 0) {
opts->skiplsn = InvalidXLogRecPtr;
diff --git a/src/gausskernel/process/threadpool/knl_instance.cpp b/src/gausskernel/process/threadpool/knl_instance.cpp
index 1b010de53a..165f93de0b 100755
--- a/src/gausskernel/process/threadpool/knl_instance.cpp
+++ b/src/gausskernel/process/threadpool/knl_instance.cpp
@@ -991,6 +991,8 @@ void knl_instance_init()
for (int i = 0; i < DB_CMPT_MAX; i++) {
pthread_mutex_init(&g_instance.loadPluginLock[i], NULL);
}
+ g_instance.needCheckConflictSubIds = NIL;
+ pthread_mutex_init(&g_instance.subIdsLock, NULL);
#endif
knl_g_datadir_init(&g_instance.datadir_cxt);
diff --git a/src/gausskernel/storage/replication/logical/launcher.cpp b/src/gausskernel/storage/replication/logical/launcher.cpp
index 9bffe943b9..1f136a7fa6 100644
--- a/src/gausskernel/storage/replication/logical/launcher.cpp
+++ b/src/gausskernel/storage/replication/logical/launcher.cpp
@@ -286,6 +286,10 @@ void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid user
TIMESTAMP_NOBEGIN(worker->reply_time);
worker->workerLaunchTime = GetCurrentTimestamp();
+ pthread_mutex_lock(&g_instance.subIdsLock);
+ worker->needCheckConflict = list_member_oid(g_instance.needCheckConflictSubIds, subid);
+ pthread_mutex_unlock(&g_instance.subIdsLock);
+
t_thrd.applylauncher_cxt.applyLauncherShm->startingWorker = worker;
LWLockRelease(LogicalRepWorkerLock);
diff --git a/src/gausskernel/storage/replication/logical/worker.cpp b/src/gausskernel/storage/replication/logical/worker.cpp
index 1705754f87..796d6c63c3 100644
--- a/src/gausskernel/storage/replication/logical/worker.cpp
+++ b/src/gausskernel/storage/replication/logical/worker.cpp
@@ -516,6 +516,17 @@ static void apply_handle_commit(StringInfo s)
/* Process any tables that are being synchronized in parallel. */
process_syncing_tables(commit_data.end_lsn);
+
+ if (t_thrd.applyworker_cxt.curWorker->needCheckConflict) {
+ t_thrd.applyworker_cxt.curWorker->needCheckConflict = false;
+ MemoryContext oldctx = MemoryContextSwitchTo(INSTANCE_GET_MEM_CXT_GROUP(MEMORY_CONTEXT_DEFAULT));
+ pthread_mutex_lock(&g_instance.subIdsLock);
+ g_instance.needCheckConflictSubIds = list_delete_oid(g_instance.needCheckConflictSubIds,
+ t_thrd.applyworker_cxt.curWorker->subid);
+ pthread_mutex_unlock(&g_instance.subIdsLock);
+ MemoryContextSwitchTo(oldctx);
+ }
+
if (t_thrd.applyworker_cxt.isSkipTransaction) {
StopSkippingChanges();
}
@@ -596,16 +607,19 @@ Oid find_conflict_tuple(EState *estate, TupleTableSlot *remoteslot, TupleTableSl
/* Check the replica identity index first */
replidxoid = RelationGetReplicaIndex(relinfo->ri_RelationDesc);
- found = RelationFindReplTuple(estate, relinfo->ri_RelationDesc, replidxoid, LockTupleExclusive, remoteslot,
- localslot, fakeRelInfo);
- if (found) {
- if (originslot != NULL && ItemPointerCompare(tableam_tops_get_t_self(relinfo->ri_RelationDesc,
- localslot->tts_tuple), tableam_tops_get_t_self(relinfo->ri_RelationDesc,
- originslot->tts_tuple)) == 0) {
- /* If conflict with the tuple to be updated, ignore it. */
- found = false;
- } else {
- return replidxoid;
+ if (OidIsValid(replidxoid)) {
+ found = RelationFindReplTuple(estate, relinfo->ri_RelationDesc, replidxoid, LockTupleExclusive, remoteslot,
+ localslot, fakeRelInfo);
+
+ if (found) {
+ if (originslot != NULL && ItemPointerCompare(tableam_tops_get_t_self(relinfo->ri_RelationDesc,
+ localslot->tts_tuple), tableam_tops_get_t_self(relinfo->ri_RelationDesc,
+ originslot->tts_tuple)) == 0) {
+ /* If conflict with the tuple to be updated, ignore it. */
+ found = false;
+ } else {
+ return replidxoid;
+ }
}
}
@@ -693,7 +707,8 @@ static void apply_handle_insert(StringInfo s)
ExecOpenIndices(estate->es_result_relation_info, false);
- if ((conflictIndexOid = find_conflict_tuple(estate, remoteslot, localslot, &fakeRelInfo)) != InvalidOid) {
+ if (t_thrd.applyworker_cxt.curWorker->needCheckConflict &&
+ (conflictIndexOid = find_conflict_tuple(estate, remoteslot, localslot, &fakeRelInfo)) != InvalidOid) {
StringInfoData localtup, remotetup;
initStringInfo(&localtup);
tuple_to_stringinfo(rel->localrel, &localtup, RelationGetDescr(rel->localrel),
@@ -743,11 +758,31 @@ static void apply_handle_insert(StringInfo s)
FreeStringInfo(&localtup);
FreeStringInfo(&remotetup);
} else {
- /* Get fake relation and partition for patitioned table */
- GetFakeRelAndPart(estate, rel->localrel, remoteslot, &fakeRelInfo);
+ PG_TRY();
+ {
+ /* Get fake relation and partition for patitioned table */
+ GetFakeRelAndPart(estate, rel->localrel, remoteslot, &fakeRelInfo);
- /* Do the insert. */
- ExecSimpleRelationInsert(estate, remoteslot, &fakeRelInfo);
+ /* Do the insert. */
+ ExecSimpleRelationInsert(estate, remoteslot, &fakeRelInfo);
+ }
+ PG_CATCH();
+ {
+ ErrorData* errdata = NULL;
+
+ (void*)MemoryContextSwitchTo(oldctx);
+ errdata = CopyErrorData();
+ if (errdata->sqlerrcode == ERRCODE_UNIQUE_VIOLATION) {
+ (void*)MemoryContextSwitchTo(INSTANCE_GET_MEM_CXT_GROUP(MEMORY_CONTEXT_DEFAULT));
+ pthread_mutex_lock(&g_instance.subIdsLock);
+ g_instance.needCheckConflictSubIds = lappend_oid(g_instance.needCheckConflictSubIds,
+ t_thrd.applyworker_cxt.curWorker->subid);
+ pthread_mutex_unlock(&g_instance.subIdsLock);
+ (void*)MemoryContextSwitchTo(oldctx);
+ }
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
}
/* Cleanup. */
@@ -924,7 +959,8 @@ static void apply_handle_update(StringInfo s)
slot_modify_data(remoteslot, localslot, rel, &newtup);
MemoryContextSwitchTo(oldctx);
- if ((conflictIndexOid = find_conflict_tuple(estate, remoteslot, conflictLocalSlot, &fakeRelInfo, localslot))
+ if (t_thrd.applyworker_cxt.curWorker->needCheckConflict &&
+ (conflictIndexOid = find_conflict_tuple(estate, remoteslot, conflictLocalSlot, &fakeRelInfo, localslot))
!= InvalidOid) {
StringInfoData localtup, remotetup;
initStringInfo(&localtup);
@@ -977,10 +1013,30 @@ static void apply_handle_update(StringInfo s)
FreeStringInfo(&localtup);
FreeStringInfo(&remotetup);
} else {
- EvalPlanQualSetSlot(&epqstate, remoteslot);
+ PG_TRY();
+ {
+ EvalPlanQualSetSlot(&epqstate, remoteslot);
- /* Do the actual update. */
- ExecSimpleRelationUpdate(estate, &epqstate, localslot, remoteslot, &fakeRelInfo);
+ /* Do the actual update. */
+ ExecSimpleRelationUpdate(estate, &epqstate, localslot, remoteslot, &fakeRelInfo);
+ }
+ PG_CATCH();
+ {
+ ErrorData* errdata = NULL;
+
+ (void*)MemoryContextSwitchTo(oldctx);
+ errdata = CopyErrorData();
+ if (errdata->sqlerrcode == ERRCODE_UNIQUE_VIOLATION) {
+ (void*)MemoryContextSwitchTo(INSTANCE_GET_MEM_CXT_GROUP(MEMORY_CONTEXT_DEFAULT));
+ pthread_mutex_lock(&g_instance.subIdsLock);
+ g_instance.needCheckConflictSubIds = lappend_oid(g_instance.needCheckConflictSubIds,
+ t_thrd.applyworker_cxt.curWorker->subid);
+ pthread_mutex_unlock(&g_instance.subIdsLock);
+ (void*)MemoryContextSwitchTo(oldctx);
+ }
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
}
} else {
/*
diff --git a/src/include/knl/knl_instance.h b/src/include/knl/knl_instance.h
index c3096bb848..0aac937044 100755
--- a/src/include/knl/knl_instance.h
+++ b/src/include/knl/knl_instance.h
@@ -1341,6 +1341,9 @@ typedef struct knl_instance_context {
void *raw_parser_hook[DB_CMPT_MAX];
char *llvmIrFilePath[DB_CMPT_MAX];
pthread_mutex_t loadPluginLock[DB_CMPT_MAX];
+
+ List* needCheckConflictSubIds;
+ pthread_mutex_t subIdsLock;
#endif
pg_atomic_uint32 extensionNum;
knl_g_audit_context audit_cxt;
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 4642c343e3..7ac83f729d 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -46,6 +46,8 @@ typedef struct LogicalRepWorker
TimestampTz last_recv_time;
XLogRecPtr reply_lsn;
TimestampTz reply_time;
+
+ bool needCheckConflict;
} LogicalRepWorker;
typedef struct ApplyLauncherShmStruct {
--
Gitee
From 27b7a765d7958e0e3eaac93a411543123cb247d1 Mon Sep 17 00:00:00 2001
From: chenxiaobin19 <1025221611@qq.com>
Date: Sat, 26 Aug 2023 15:52:31 +0800
Subject: [PATCH 4/5] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=8F=91=E5=B8=83?=
=?UTF-8?q?=E8=AE=A2=E9=98=85=E5=9F=BA=E7=A1=80=E6=95=B0=E6=8D=AE=E5=A4=8D?=
=?UTF-8?q?=E5=88=B6=E6=97=B6=E5=BE=80=E9=9D=9E=E7=A9=BA=E5=88=97=E4=BC=A0?=
=?UTF-8?q?=E7=A9=BA=E5=80=BC=E7=9A=84core=E9=97=AE=E9=A2=98?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../storage/replication/logical/tablesync.cpp | 10 +++-
src/test/subscription/schedule | 3 +-
src/test/subscription/testcase/bugs.sh | 59 +++++++++++++++++++
3 files changed, 70 insertions(+), 2 deletions(-)
diff --git a/src/gausskernel/storage/replication/logical/tablesync.cpp b/src/gausskernel/storage/replication/logical/tablesync.cpp
index 11484ae5d2..5ea58c2e3a 100644
--- a/src/gausskernel/storage/replication/logical/tablesync.cpp
+++ b/src/gausskernel/storage/replication/logical/tablesync.cpp
@@ -776,7 +776,15 @@ static void copy_table(Relation rel)
/* Create CopyState for ingestion of the data from publisher. */
attnamelist = make_copy_attnamelist(relmapentry);
- cstate = BeginCopyFrom(rel, NULL, attnamelist, NIL, &mem_info, (const char*)cmd.data, copy_read_data);
+ cstate = BeginCopyFrom(rel, NULL, attnamelist, NIL, &mem_info, NULL, copy_read_data);
+
+ RangeTblEntry *rte = makeNode(RangeTblEntry);
+ rte->rtekind = RTE_RELATION;
+ rte->relid = RelationGetRelid(rel);
+ rte->relkind = rel->rd_rel->relkind;
+ rte->requiredPerms = ACL_SELECT;
+
+ cstate->range_table = list_make1(rte);
/* Do the copy */
(void)CopyFrom(cstate);
diff --git a/src/test/subscription/schedule b/src/test/subscription/schedule
index 836ac0e58b..0635e09413 100644
--- a/src/test/subscription/schedule
+++ b/src/test/subscription/schedule
@@ -13,4 +13,5 @@ matviews
change_wal_level
skiplsn
disable
-pub_subconflict
\ No newline at end of file
+pub_subconflict
+bugs
\ No newline at end of file
diff --git a/src/test/subscription/testcase/bugs.sh b/src/test/subscription/testcase/bugs.sh
index e69de29bb2..ec35e54733 100644
--- a/src/test/subscription/testcase/bugs.sh
+++ b/src/test/subscription/testcase/bugs.sh
@@ -0,0 +1,59 @@
+#!/bin/sh
+
+source $1/env_utils.sh $1 $2
+
+case_db="bugs_db"
+
+function test_1() {
+ echo "create database and tables."
+ exec_sql $db $pub_node1_port "CREATE DATABASE $case_db"
+ exec_sql $db $sub_node1_port "CREATE DATABASE $case_db"
+ # Create some preexisting content on publisher
+ exec_sql $case_db $pub_node1_port "CREATE TABLE tab_rep (a int primary key, b text)"
+ exec_sql $case_db $pub_node1_port "INSERT INTO tab_rep VALUES (1)"
+
+ # Setup structure on subscriber
+ exec_sql $case_db $sub_node1_port "CREATE TABLE tab_rep (a int primary key, b text not null default 0)"
+
+ # Setup logical replication
+ echo "create publication and subscription."
+ publisher_connstr="port=$pub_node1_port host=$g_local_ip dbname=$case_db user=$username password=$passwd"
+ exec_sql $case_db $pub_node1_port "CREATE PUBLICATION tap_pub FOR ALL TABLES"
+ exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
+
+ logfile=$(get_log_file "sub_datanode1")
+
+ location=$(awk 'END{print NR}' $logfile)
+
+ content=$(tail -n +$location $logfile)
+ targetstr=$(expr "$content" : '.*\(Failing row contains\).*')
+
+ attempt=0
+ while [ -z "$targetstr" ]
+ do
+ content=$(tail -n +$location $logfile)
+ targetstr=$(expr "$content" : '.*\(Failing row contains\).*')
+ attempt=`expr $attempt \+ 1`
+
+ sleep 1
+ if [ $attempt -eq 5 ]; then
+ echo "$failed_keyword when check failing row log"
+ exit 1
+ fi
+ done
+
+ echo "check failing row log success"
+}
+
+function tear_down() {
+ exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION IF EXISTS tap_sub"
+ exec_sql $case_db $pub_node1_port "DROP PUBLICATION IF EXISTS tap_pub"
+
+ exec_sql $db $sub_node1_port "DROP DATABASE $case_db"
+ exec_sql $db $pub_node1_port "DROP DATABASE $case_db"
+
+ echo "tear down"
+}
+
+test_1
+tear_down
\ No newline at end of file
--
Gitee
From 8a6b40b5a1c512be1a8589477429c1771ae1ce32 Mon Sep 17 00:00:00 2001
From: chenxiaobin19 <1025221611@qq.com>
Date: Fri, 1 Sep 2023 11:48:50 +0800
Subject: [PATCH 5/5] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dskiplsn=E5=81=B6=E7=8E=B0?=
=?UTF-8?q?=E5=A4=B1=E6=95=88&&=E5=88=86=E5=8C=BA=E8=A1=A8=E5=A4=84?=
=?UTF-8?q?=E7=90=86=E5=86=B2=E7=AA=81=E5=9C=BA=E6=99=AF=E5=AF=B9=E8=B1=A1?=
=?UTF-8?q?=E6=9C=AA=E9=87=8A=E6=94=BE=E9=97=AE=E9=A2=98?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../runtime/executor/execReplication.cpp | 2 +-
.../storage/replication/logical/worker.cpp | 48 ++++++----
src/test/subscription/testcase/bugs.sh | 90 +++++++++++++++++++
3 files changed, 123 insertions(+), 17 deletions(-)
diff --git a/src/gausskernel/runtime/executor/execReplication.cpp b/src/gausskernel/runtime/executor/execReplication.cpp
index 9011ccee35..f07a5d8a9e 100644
--- a/src/gausskernel/runtime/executor/execReplication.cpp
+++ b/src/gausskernel/runtime/executor/execReplication.cpp
@@ -73,7 +73,7 @@ static bool build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel
ListCell* indexpr_item = NULL;
if (idxrel->rd_indexprs != NIL) {
- expressionsState = ExecPrepareExprList(idxrel->rd_indexprs, estate);
+ expressionsState = (List*)ExecPrepareExpr((Expr*)idxrel->rd_indexprs, estate);
indexpr_item = list_head(expressionsState);
}
diff --git a/src/gausskernel/storage/replication/logical/worker.cpp b/src/gausskernel/storage/replication/logical/worker.cpp
index 796d6c63c3..d9f1f137ad 100644
--- a/src/gausskernel/storage/replication/logical/worker.cpp
+++ b/src/gausskernel/storage/replication/logical/worker.cpp
@@ -116,7 +116,7 @@ static void apply_dispatch(StringInfo s);
static void apply_handle_conninfo(StringInfo s);
static void UpdateConninfo(char* standbysInfo);
static Oid find_conflict_tuple(EState *estate, TupleTableSlot *remoteslot, TupleTableSlot *localslot,
- FakeRelationPartition *fakeRelInfo, TupleTableSlot *originslot = NULL);
+ TupleTableSlot *originslot = NULL);
static void IsSkippingChanges(XLogRecPtr finish_lsn);
static void StopSkippingChanges();
@@ -599,17 +599,26 @@ static Oid GetRelationIdentityOrPK(Relation rel)
* to be updated, ignore it.
*/
Oid find_conflict_tuple(EState *estate, TupleTableSlot *remoteslot, TupleTableSlot *localslot,
- FakeRelationPartition *fakeRelInfo, TupleTableSlot *originslot)
+ TupleTableSlot *originslot)
{
Oid replidxoid = InvalidOid;
bool found = false;
ResultRelInfo* relinfo = estate->es_result_relation_info;
+ FakeRelationPartition fakeRelInfo;
/* Check the replica identity index first */
replidxoid = RelationGetReplicaIndex(relinfo->ri_RelationDesc);
if (OidIsValid(replidxoid)) {
found = RelationFindReplTuple(estate, relinfo->ri_RelationDesc, replidxoid, LockTupleExclusive, remoteslot,
- localslot, fakeRelInfo);
+ localslot, &fakeRelInfo);
+
+ /* Cleanup. */
+ if (fakeRelInfo.needRleaseDummyRel && fakeRelInfo.partRel) {
+ releaseDummyRelation(&fakeRelInfo.partRel);
+ }
+ if (fakeRelInfo.partList) {
+ releasePartitionList(relinfo->ri_RelationDesc, &fakeRelInfo.partList, NoLock);
+ }
if (found) {
if (originslot != NULL && ItemPointerCompare(tableam_tops_get_t_self(relinfo->ri_RelationDesc,
@@ -640,7 +649,15 @@ Oid find_conflict_tuple(EState *estate, TupleTableSlot *remoteslot, TupleTableSl
}
found = RelationFindReplTuple(estate, relinfo->ri_RelationDesc, idxoid, LockTupleExclusive, remoteslot,
- localslot, fakeRelInfo);
+ localslot, &fakeRelInfo);
+
+ /* Cleanup. */
+ if (fakeRelInfo.needRleaseDummyRel && fakeRelInfo.partRel) {
+ releaseDummyRelation(&fakeRelInfo.partRel);
+ }
+ if (fakeRelInfo.partList) {
+ releasePartitionList(relinfo->ri_RelationDesc, &fakeRelInfo.partList, NoLock);
+ }
if (found) {
if (originslot != NULL && ItemPointerCompare(tableam_tops_get_t_self(relinfo->ri_RelationDesc,
@@ -673,12 +690,12 @@ static void apply_handle_insert(StringInfo s)
EPQState epqstate;
Oid conflictIndexOid = InvalidOid;
+ ensure_transaction();
+
if (t_thrd.applyworker_cxt.isSkipTransaction) {
return;
}
- ensure_transaction();
-
relid = logicalrep_read_insert(s, &newtup);
rel = logicalrep_rel_open(relid, RowExclusiveLock);
if (!should_apply_changes_for_rel(rel)) {
@@ -707,8 +724,11 @@ static void apply_handle_insert(StringInfo s)
ExecOpenIndices(estate->es_result_relation_info, false);
+ /* Get fake relation and partition for patitioned table */
+ GetFakeRelAndPart(estate, rel->localrel, remoteslot, &fakeRelInfo);
+
if (t_thrd.applyworker_cxt.curWorker->needCheckConflict &&
- (conflictIndexOid = find_conflict_tuple(estate, remoteslot, localslot, &fakeRelInfo)) != InvalidOid) {
+ (conflictIndexOid = find_conflict_tuple(estate, remoteslot, localslot)) != InvalidOid) {
StringInfoData localtup, remotetup;
initStringInfo(&localtup);
tuple_to_stringinfo(rel->localrel, &localtup, RelationGetDescr(rel->localrel),
@@ -760,9 +780,6 @@ static void apply_handle_insert(StringInfo s)
} else {
PG_TRY();
{
- /* Get fake relation and partition for patitioned table */
- GetFakeRelAndPart(estate, rel->localrel, remoteslot, &fakeRelInfo);
-
/* Do the insert. */
ExecSimpleRelationInsert(estate, remoteslot, &fakeRelInfo);
}
@@ -876,12 +893,12 @@ static void apply_handle_update(StringInfo s)
FakeRelationPartition fakeRelInfo;
Oid conflictIndexOid = InvalidOid;
+ ensure_transaction();
+
if (t_thrd.applyworker_cxt.isSkipTransaction) {
return;
}
- ensure_transaction();
-
relid = logicalrep_read_update(s, &has_oldtup, &oldtup, &newtup);
rel = logicalrep_rel_open(relid, RowExclusiveLock);
if (!should_apply_changes_for_rel(rel)) {
@@ -960,8 +977,7 @@ static void apply_handle_update(StringInfo s)
MemoryContextSwitchTo(oldctx);
if (t_thrd.applyworker_cxt.curWorker->needCheckConflict &&
- (conflictIndexOid = find_conflict_tuple(estate, remoteslot, conflictLocalSlot, &fakeRelInfo, localslot))
- != InvalidOid) {
+ (conflictIndexOid = find_conflict_tuple(estate, remoteslot, conflictLocalSlot, localslot)) != InvalidOid) {
StringInfoData localtup, remotetup;
initStringInfo(&localtup);
tuple_to_stringinfo(rel->localrel, &localtup, RelationGetDescr(rel->localrel),
@@ -1075,12 +1091,12 @@ static void apply_handle_delete(StringInfo s)
MemoryContext oldctx;
FakeRelationPartition fakeRelInfo;
+ ensure_transaction();
+
if (t_thrd.applyworker_cxt.isSkipTransaction) {
return;
}
- ensure_transaction();
-
relid = logicalrep_read_delete(s, &oldtup);
rel = logicalrep_rel_open(relid, RowExclusiveLock);
if (!should_apply_changes_for_rel(rel)) {
diff --git a/src/test/subscription/testcase/bugs.sh b/src/test/subscription/testcase/bugs.sh
index ec35e54733..5eaa094916 100644
--- a/src/test/subscription/testcase/bugs.sh
+++ b/src/test/subscription/testcase/bugs.sh
@@ -8,6 +8,8 @@ function test_1() {
echo "create database and tables."
exec_sql $db $pub_node1_port "CREATE DATABASE $case_db"
exec_sql $db $sub_node1_port "CREATE DATABASE $case_db"
+
+ # BUG1: coredump when apply null value into not null column.
# Create some preexisting content on publisher
exec_sql $case_db $pub_node1_port "CREATE TABLE tab_rep (a int primary key, b text)"
exec_sql $case_db $pub_node1_port "INSERT INTO tab_rep VALUES (1)"
@@ -43,6 +45,94 @@ function test_1() {
done
echo "check failing row log success"
+
+ exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION IF EXISTS tap_sub;TRUNCATE TABLE tab_rep"
+ exec_sql $case_db $pub_node1_port "DROP PUBLICATION IF EXISTS tap_pub;TRUNCATE TABLE tab_rep"
+
+ # BUG2: skiplsn does not work occasionally
+ # Setup logical replication
+ echo "create publication and subscription."
+ publisher_connstr="port=$pub_node1_port host=$g_local_ip dbname=$case_db user=$username password=$passwd"
+ exec_sql $case_db $pub_node1_port "CREATE PUBLICATION tap_pub FOR ALL TABLES"
+ exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub with (copy_data=false); ALTER SUBSCRIPTION tap_sub DISABLE"
+
+ exec_sql $case_db $pub_node1_port "INSERT INTO tab_rep VALUES (1, 'pub1'); INSERT INTO tab_rep VALUES (2, 'sub2');"
+ dumpfile=$(exec_sql $case_db $pub_node1_port "select gs_xlogdump_xid(xmin) from tab_rep where a = 1;")
+ skiplsn=$(grep 'start_lsn' $dumpfile | sed -n '5p' | awk '{print $2}')
+
+ exec_sql $case_db $sub_node1_port "alter subscription tap_sub set (skiplsn = '$skiplsn')"
+ exec_sql $case_db $sub_node1_port "alter subscription tap_sub enable"
+
+ wait_for_catchup $case_db $pub_node1_port "tap_sub"
+
+ if [ "$(exec_sql $case_db $sub_node1_port "SELECT * FROM tab_rep")" = "2|sub2" ]; then
+ echo "check data skip success"
+ else
+ echo "$failed_keyword when check data skip"
+ exit 1
+ fi
+
+ exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION IF EXISTS tap_sub;DROP TABLE tab_rep"
+ exec_sql $case_db $pub_node1_port "DROP PUBLICATION IF EXISTS tap_pub;DROP TABLE tab_rep"
+
+ # BUG3: partition relation not closed when handle conflict
+ # Create partition table
+ ddl="
+create table t_pubsub_0349(
+ id int primary key constraint id_nn not null,
+ use_filename varchar(20),
+ filename varchar2(255)
+)partition by range(id)(
+ partition p1 values less than(30),
+ partition p2 values less than(60),
+ partition p3 values less than(90),
+ partition p4 values less than(maxvalue));"
+ exec_sql $case_db $pub_node1_port "$ddl"
+ exec_sql $case_db $sub_node1_port "$ddl"
+
+ echo "create publication and subscription."
+ publisher_connstr="port=$pub_node1_port host=$g_local_ip dbname=$case_db user=$username password=$passwd"
+ exec_sql $case_db $pub_node1_port "CREATE PUBLICATION tap_pub FOR ALL TABLES"
+ exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
+
+ wait_for_subscription_sync $case_db $sub_node1_port
+
+ exec_sql $case_db $sub_node1_port "ALTER SYSTEM SET subscription_conflict_resolution = apply_remote"
+
+ exec_sql $case_db $sub_node1_port "INSERT INTO t_pubsub_0349 VALUES (1, 'a', 'a');"
+ exec_sql $case_db $pub_node1_port "INSERT INTO t_pubsub_0349 VALUES (1, 'a', 'c');"
+
+ wait_for_catchup $case_db $pub_node1_port "tap_sub"
+
+ if [ "$(exec_sql $case_db $sub_node1_port "SELECT * FROM t_pubsub_0349")" = "1|a|c" ]; then
+ echo "check insert conflict handle success"
+ else
+ echo "$failed_keyword when check insert conflict handle"
+ exit 1
+ fi
+
+ exec_sql $case_db $sub_node1_port "INSERT INTO t_pubsub_0349 VALUES (2, 'a', 'a');"
+ exec_sql $case_db $pub_node1_port "UPDATE t_pubsub_0349 SET id = 2 WHERE id = 1;"
+
+ wait_for_catchup $case_db $pub_node1_port "tap_sub"
+
+ if [ "$(exec_sql $case_db $sub_node1_port "SELECT * FROM t_pubsub_0349")" = "2|a|c" ]; then
+ echo "check update conflict handle success"
+ else
+ echo "$failed_keyword when check update conflict handle"
+ exit 1
+ fi
+
+ logfile=$(get_log_file "sub_datanode1")
+ leakstr=$(grep 'partcache reference leak' $logfile -m 1)
+ if [ -z "$leakstr" ]; then
+ echo "check relation close success"
+ else
+ echo "$failed_keyword when check relation close"
+ exit 1
+ fi
+
+ exec_sql $case_db $sub_node1_port "ALTER SYSTEM SET subscription_conflict_resolution = error"
}
function tear_down() {
--
Gitee