From 7e60efddf4a5db65822b103ac19c18192c5bbb52 Mon Sep 17 00:00:00 2001 From: "arcoalien@qq.com" Date: Fri, 7 Jul 2023 12:16:51 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dswitchover=E6=88=96failover?= =?UTF-8?q?=E8=BF=87=E7=A8=8B=E4=B8=AD=E9=87=8D=E5=BB=BA=E5=A4=8D=E5=88=B6?= =?UTF-8?q?=E6=A7=BDcoredump=E7=9A=84=E9=97=AE=E9=A2=98=EF=BC=8C=E6=94=AF?= =?UTF-8?q?=E6=8C=81=E9=80=BB=E8=BE=91=E5=A4=8D=E5=88=B6=EF=BC=8C=E6=94=AF?= =?UTF-8?q?=E6=8C=81=20=E9=99=A4=E5=A4=8D=E5=88=B6=E6=A7=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/bin/initdb/ss_initdb.cpp | 4 +- .../process/postmaster/postmaster.cpp | 9 +- .../storage/access/transam/xlog.cpp | 4 +- .../storage/replication/logical/logical.cpp | 13 +- .../replication/logical/logicalfuncs.cpp | 6 +- .../logical/parallel_reorderbuffer.cpp | 13 +- .../replication/logical/reorderbuffer.cpp | 27 ++- src/gausskernel/storage/replication/slot.cpp | 168 +++++++++++++++--- .../storage/replication/slotfuncs.cpp | 19 +- .../storage/replication/walsender.cpp | 4 + src/include/access/xlog.h | 3 +- src/include/replication/logical.h | 2 +- src/include/replication/slot.h | 2 + 13 files changed, 217 insertions(+), 57 deletions(-) mode change 100755 => 100644 src/gausskernel/storage/replication/slot.cpp diff --git a/src/bin/initdb/ss_initdb.cpp b/src/bin/initdb/ss_initdb.cpp index 03c8534a53..29ca9be4aa 100644 --- a/src/bin/initdb/ss_initdb.cpp +++ b/src/bin/initdb/ss_initdb.cpp @@ -48,7 +48,8 @@ static const char* ss_clusterdirs[] = {"+global", "+pg_multixact/members", "+pg_multixact/offsets", "+pg_twophase", - "+pg_serial"}; + "+pg_serial", + "+pg_replslot"}; static const char* ss_instancedirs[] = {"+pg_xlog", "+pg_doublewrite" @@ -60,7 +61,6 @@ static const char* ss_instanceowndirs[] = {"base", "pg_xlog", "pg_xlog/archive_status", "undo", - "pg_replslot", "pg_stat_tmp", "pg_errorinfo", "pg_logical", diff --git a/src/gausskernel/process/postmaster/postmaster.cpp 
b/src/gausskernel/process/postmaster/postmaster.cpp index 54dee94705..f4ccbb4ee5 100644 --- a/src/gausskernel/process/postmaster/postmaster.cpp +++ b/src/gausskernel/process/postmaster/postmaster.cpp @@ -4054,7 +4054,7 @@ static int ServerLoop(void) if ((u_sess->attr.attr_common.upgrade_mode == 0 || pg_atomic_read_u32(&WorkingGrandVersionNum) >= PUBLICATION_VERSION_NUM) && g_instance.pid_cxt.ApplyLauncerPID == 0 && - pmState == PM_RUN && !dummyStandbyMode && !SS_IN_REFORM) { + pmState == PM_RUN && !dummyStandbyMode && !ENABLE_DMS) { g_instance.pid_cxt.ApplyLauncerPID = initialize_util_thread(APPLY_LAUNCHER); } #endif @@ -6893,7 +6893,7 @@ static void reaper(SIGNAL_ARGS) #ifndef ENABLE_MULTIPLE_NODES if ((u_sess->attr.attr_common.upgrade_mode == 0 || pg_atomic_read_u32(&WorkingGrandVersionNum) >= PUBLICATION_VERSION_NUM) && - g_instance.pid_cxt.ApplyLauncerPID == 0 && !dummyStandbyMode && !SS_IN_REFORM) { + g_instance.pid_cxt.ApplyLauncerPID == 0 && !dummyStandbyMode && !ENABLE_DMS) { g_instance.pid_cxt.ApplyLauncerPID = initialize_util_thread(APPLY_LAUNCHER); } #endif @@ -10270,6 +10270,11 @@ static void sigusr1_handler(SIGNAL_ARGS) (errmsg("update gaussdb state file: db state(NORMAL_STATE), server mode(%s)", wal_get_role_string(get_cur_mode())))); + /* Clear replication slot info of SS standby */ + if (SS_NORMAL_STANDBY) { + ResetReplicationSlotsShmem(); + } + /* if enable remote execute, refesh conninfo */ if (SS_NORMAL_STANDBY && g_instance.attr.attr_sql.enableRemoteExcute) { SSStandbySetLibpqswConninfo(); diff --git a/src/gausskernel/storage/access/transam/xlog.cpp b/src/gausskernel/storage/access/transam/xlog.cpp index 3d6dc1d242..8ad8c6be1b 100755 --- a/src/gausskernel/storage/access/transam/xlog.cpp +++ b/src/gausskernel/storage/access/transam/xlog.cpp @@ -9386,7 +9386,9 @@ void StartupXLOG(void) t_thrd.xact_cxt.ShmemVariableCache->xlogMaxCSN = t_thrd.xact_cxt.ShmemVariableCache->nextCommitSeqNo - 1; init_instance_slot(); init_instance_slot_thread(); - 
StartupReplicationSlots(); + if (!ENABLE_DMS || (SS_REFORM_REFORMER && !SS_PRIMARY_DEMOTED)) { + StartupReplicationSlots(); + } /* * Startup logical state, needs to be setup now so we have proper data diff --git a/src/gausskernel/storage/replication/logical/logical.cpp b/src/gausskernel/storage/replication/logical/logical.cpp index fe159643a5..e025143cb9 100644 --- a/src/gausskernel/storage/replication/logical/logical.cpp +++ b/src/gausskernel/storage/replication/logical/logical.cpp @@ -52,6 +52,7 @@ #include "utils/builtins.h" #include "utils/memutils.h" +#include "storage/file/fio_device.h" /* data for errcontext callback */ typedef struct LogicalErrorCallbackState { LogicalDecodingContext *ctx; @@ -173,7 +174,7 @@ static LogicalDecodingContext *StartupDecodingContext(List *output_plugin_option ctx->out = makeStringInfo(); ctx->prepare_write = prepare_write; - ctx->write = do_write; + ctx->do_write = do_write; ctx->output_plugin_options = output_plugin_options; ctx->fast_forward = fast_forward; @@ -220,7 +221,7 @@ static LogicalDecodingContext *StartupDecodingContextForArea(List *output_plugin ctx->out = makeStringInfo(); ctx->prepare_write = prepare_write; - ctx->write = do_write; + ctx->do_write = do_write; ctx->output_plugin_options = output_plugin_options; ctx->fast_forward = fast_forward; @@ -599,7 +600,7 @@ void DecodingContextFindStartpoint(LogicalDecodingContext *ctx) char *err = NULL; /* the read_page callback waits for new WAL */ - record = XLogReadRecord(ctx->reader, startptr, &err); + record = XLogReadRecord(ctx->reader, startptr, &err, true, SS_XLOGDIR); if (err != NULL) ereport(ERROR, (errmodule(MOD_LOGICAL_DECODE), errcode(ERRCODE_LOGICAL_DECODE_ERROR), errmsg("Stopped to parse any valid XLog Record at %X/%X: %s.", @@ -657,7 +658,7 @@ void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write) ereport(ERROR, (errmodule(MOD_LOGICAL_DECODE), errcode(ERRCODE_LOGICAL_DECODE_ERROR), errmsg("OutputPluginPrepareWrite needs to be called 
before OutputPluginWrite"))); - ctx->write(ctx, ctx->write_location, ctx->write_xid, last_write); + ctx->do_write(ctx, ctx->write_location, ctx->write_xid, last_write); ctx->prepared_write = false; } @@ -1562,9 +1563,11 @@ void LogicalCleanSnapDirectory(bool rebuild) struct stat st; Assert(t_thrd.slot_cxt.MyReplicationSlot != NULL); + char replslot_path[MAXPGPATH]; + GetReplslotPath(replslot_path); int rc = snprintf_s(snappath, MAXPGPATH, MAXPGPATH - 1, - "pg_replslot/%s/snap", NameStr(t_thrd.slot_cxt.MyReplicationSlot->data.name)); + "%s/%s/snap", replslot_path, NameStr(t_thrd.slot_cxt.MyReplicationSlot->data.name)); securec_check_ss(rc, "\0", "\0"); if (stat(snappath, &st) == 0 && S_ISDIR(st.st_mode)) { diff --git a/src/gausskernel/storage/replication/logical/logicalfuncs.cpp b/src/gausskernel/storage/replication/logical/logicalfuncs.cpp index 56f38335da..97d3f9b295 100755 --- a/src/gausskernel/storage/replication/logical/logicalfuncs.cpp +++ b/src/gausskernel/storage/replication/logical/logicalfuncs.cpp @@ -45,6 +45,7 @@ #include "access/xlog_internal.h" #include "storage/smgr/fd.h" +#include "storage/file/fio_device.h" #define MAXPG_LSNCOMPONENT 8 #define str_lsn_len 128 @@ -449,7 +450,10 @@ static Datum pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool conf CheckLogicalDecodingRequirements(u_sess->proc_cxt.MyDatabaseId); ReplicationSlotAcquire(NameStr(*name), false); - rc = sprintf_s(path, sizeof(path), "pg_replslot/%s/snap", NameStr(t_thrd.slot_cxt.MyReplicationSlot->data.name)); + + char replslot_path[MAXPGPATH]; + GetReplslotPath(replslot_path); + rc = sprintf_s(path, sizeof(path), "%s/%s/snap", replslot_path, NameStr(t_thrd.slot_cxt.MyReplicationSlot->data.name)); securec_check_ss(rc, "", ""); if (stat(path, &st) == 0 && S_ISDIR(st.st_mode)) { if (!rmtree(path, true)) diff --git a/src/gausskernel/storage/replication/logical/parallel_reorderbuffer.cpp b/src/gausskernel/storage/replication/logical/parallel_reorderbuffer.cpp index 
ac096a89cc..84682111d8 100644 --- a/src/gausskernel/storage/replication/logical/parallel_reorderbuffer.cpp +++ b/src/gausskernel/storage/replication/logical/parallel_reorderbuffer.cpp @@ -64,6 +64,7 @@ #include "utils/relcache.h" #include "utils/relfilenodemap.h" +#include "storage/file/fio_device.h" static void ParallelReorderBufferSerializeReserve(ParallelReorderBuffer *rb, Size sz); static void ParallelReorderBufferCheckSerializeTXN(ParallelReorderBuffer *rb, ParallelReorderBufferTXN *txn, @@ -762,12 +763,14 @@ static void ParallelReorderBufferRestoreCleanup(ParallelReorderBufferTXN *txn, X XLogSegNo first = (txn->first_lsn) / XLogSegSize; XLogSegNo last = (txn->final_lsn) / XLogSegSize; + char replslot_path[MAXPGPATH]; + GetReplslotPath(replslot_path); /* iterate over all possible filenames, and delete them */ for (XLogSegNo cur = first; cur <= last; cur++) { char path[MAXPGPATH]; XLogRecPtr recptr; recptr = (cur * XLogSegSize); - errno_t rc = sprintf_s(path, sizeof(path), "pg_replslot/%s/snap/xid-%lu-lsn-%X-%X.snap", + errno_t rc = sprintf_s(path, sizeof(path), "%s/%s/snap/xid-%lu-lsn-%X-%X.snap", replslot_path, t_thrd.walsender_cxt.slotname, txn->xid, (uint32)(recptr >> 32), uint32(recptr)); securec_check_ss(rc, "", ""); if (unlink(path) != 0 && errno != ENOENT) { @@ -890,6 +893,8 @@ static void ParallelReorderBufferSerializeTXN(ParallelReorderBuffer *prb, Parall ParallelReorderBufferSerializeTXN(prb, subtxn, slotId); } + char replslot_path[MAXPGPATH]; + GetReplslotPath(replslot_path); /* serialize changestream */ dlist_foreach_modify(change_i, &txn->changes) { @@ -915,7 +920,7 @@ static void ParallelReorderBufferSerializeTXN(ParallelReorderBuffer *prb, Parall * so each LSN only maps to a specific WAL record. 
*/ - nRet = sprintf_s(path, MAXPGPATH, "pg_replslot/%s/snap/xid-%lu-lsn-%X-%X.snap", + nRet = sprintf_s(path, MAXPGPATH, "%s/%s/snap/xid-%lu-lsn-%X-%X.snap", replslot_path, t_thrd.walsender_cxt.slotname, txn->xid, (uint32)(recptr >> 32), (uint32)recptr); @@ -1016,6 +1021,8 @@ static Size ParallelReorderBufferRestoreChanges(ParallelReorderBuffer *prb, Para txn->nentries_mem = 0; Assert(dlist_is_empty(&txn->changes)); + char replslot_path[MAXPGPATH]; + GetReplslotPath(replslot_path); last_segno = (txn->final_lsn) / XLogSegSize; while (restored < (unsigned)g_instance.attr.attr_common.max_changes_in_memory && *segno <= last_segno) { if (restored > 0 && g_Logicaldispatcher[slotId].pOptions.max_reorderbuffer_in_memory > 0 && @@ -1042,7 +1049,7 @@ static Size ParallelReorderBufferRestoreChanges(ParallelReorderBuffer *prb, Para * so each LSN only maps to a specific WAL record. */ - rc = sprintf_s(path, sizeof(path), "pg_replslot/%s/snap/xid-%lu-lsn-%X-%X.snap", + rc = sprintf_s(path, sizeof(path), "%s/%s/snap/xid-%lu-lsn-%X-%X.snap", replslot_path, t_thrd.walsender_cxt.slotname, txn->xid, (uint32)(recptr >> 32), (uint32)recptr); diff --git a/src/gausskernel/storage/replication/logical/reorderbuffer.cpp b/src/gausskernel/storage/replication/logical/reorderbuffer.cpp index ea84de910d..dae4adc28a 100644 --- a/src/gausskernel/storage/replication/logical/reorderbuffer.cpp +++ b/src/gausskernel/storage/replication/logical/reorderbuffer.cpp @@ -84,6 +84,7 @@ #include "utils/memutils.h" #include "utils/relcache.h" #include "utils/relfilenodemap.h" +#include "storage/file/fio_device.h" /* * We use a very simple form of a slab allocator for frequently allocated @@ -2055,6 +2056,8 @@ static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) ReorderBufferSerializeTXN(rb, subtxn); } + char replslot_path[MAXPGPATH]; + GetReplslotPath(replslot_path); /* serialize changestream */ dlist_foreach_modify(change_i, &txn->changes) { @@ -2079,7 +2082,7 @@ static void 
ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) * No need to care about TLIs here, only used during a single run, * so each LSN only maps to a specific WAL record. */ - nRet = sprintf_s(path, MAXPGPATH, "pg_replslot/%s/snap/xid-%lu-lsn-%X-%X.snap", + nRet = sprintf_s(path, MAXPGPATH, "%s/%s/snap/xid-%lu-lsn-%X-%X.snap", replslot_path, NameStr(t_thrd.slot_cxt.MyReplicationSlot->data.name), txn->xid, (uint32)(recptr >> 32), (uint32)recptr); securec_check_ss(nRet, "", ""); @@ -2263,6 +2266,8 @@ static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn txn->nentries_mem = 0; Assert(dlist_is_empty(&txn->changes)); + char replslot_path[MAXPGPATH]; + GetReplslotPath(replslot_path); last_segno = (txn->final_lsn) / XLogSegSize; while (restored < (unsigned)g_instance.attr.attr_common.max_changes_in_memory && *segno <= last_segno) { int readBytes; @@ -2285,7 +2290,7 @@ static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn * No need to care about TLIs here, only used during a single run, * so each LSN only maps to a specific WAL record. 
*/ - rc = sprintf_s(path, sizeof(path), "pg_replslot/%s/snap/xid-%lu-lsn-%X-%X.snap", + rc = sprintf_s(path, sizeof(path), "%s/%s/snap/xid-%lu-lsn-%X-%X.snap", replslot_path, NameStr(t_thrd.slot_cxt.MyReplicationSlot->data.name), txn->xid, (uint32)(recptr >> 32), (uint32)recptr); securec_check_ss(rc, "", ""); @@ -2499,13 +2504,15 @@ static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn } else { slot = ((LogicalDecodingContext *)rb->private_data)->slot; } + char replslot_path[MAXPGPATH]; + GetReplslotPath(replslot_path); /* iterate over all possible filenames, and delete them */ for (cur = first; cur <= last; cur++) { char path[MAXPGPATH]; XLogRecPtr recptr; recptr = (cur * XLogSegSize); - rc = sprintf_s(path, sizeof(path), "pg_replslot/%s/snap/xid-%lu-lsn-%X-%X.snap", NameStr(slot->data.name), - txn->xid, (uint32)(recptr >> 32), uint32(recptr)); + rc = sprintf_s(path, sizeof(path), "%s/%s/snap/xid-%lu-lsn-%X-%X.snap", replslot_path, NameStr(slot->data.name), + txn->xid, (uint32)(recptr >> 32), uint32(recptr)); securec_check_ss(rc, "", ""); if (unlink(path) != 0 && errno != ENOENT) ereport(ERROR, (errmodule(MOD_LOGICAL_DECODE), @@ -2523,7 +2530,9 @@ void ReorderBufferClear(const char *slotname) struct stat statbuf; char path[MAXPGPATH] = {0}; char path_xid[MAXPGPATH] = {0}; - errno_t rc = sprintf_s(path, sizeof(path), "pg_replslot/%s/snap", slotname); + char replslot_path[MAXPGPATH]; + GetReplslotPath(replslot_path); + errno_t rc = sprintf_s(path, sizeof(path), "%s/%s/snap", replslot_path, slotname); securec_check_ss(rc, "", ""); if (lstat(path, &statbuf) != 0 || !S_ISDIR(statbuf.st_mode)) { @@ -2534,7 +2543,7 @@ void ReorderBufferClear(const char *slotname) while ((spill_de = ReadDirExtended(spill_dir, path, DEBUG2)) != NULL) { /* only look at names that can be ours */ if (strncmp(spill_de->d_name, "xid", strlen("xid")) == 0) { - rc = sprintf_s(path_xid, sizeof(path_xid), "pg_replslot/%s/snap/%s", slotname, spill_de->d_name); + rc = 
sprintf_s(path_xid, sizeof(path_xid), "%s/%s/snap/%s", replslot_path, slotname, spill_de->d_name); securec_check_ss(rc, "", ""); if (unlink(path_xid) != 0) { ereport(ERROR, (errmodule(MOD_LOGICAL_DECODE), @@ -2556,8 +2565,10 @@ void StartupReorderBuffer(void) DIR *logical_dir = NULL; struct dirent *logical_de = NULL; - logical_dir = AllocateDir("pg_replslot"); - while ((logical_de = ReadDirExtended(logical_dir, "pg_replslot", DEBUG2)) != NULL) { + char replslot_path[MAXPGPATH]; + GetReplslotPath(replslot_path); + logical_dir = AllocateDir(replslot_path); + while ((logical_de = ReadDirExtended(logical_dir, replslot_path, DEBUG2)) != NULL) { if (strncmp(logical_de->d_name, ".", strlen(".")) == 0 || strncmp(logical_de->d_name, "..", strlen("..")) == 0) continue; diff --git a/src/gausskernel/storage/replication/slot.cpp b/src/gausskernel/storage/replication/slot.cpp old mode 100755 new mode 100644 index c6f5fad0f1..d3d31edcd5 --- a/src/gausskernel/storage/replication/slot.cpp +++ b/src/gausskernel/storage/replication/slot.cpp @@ -48,6 +48,7 @@ #include "replication/syncrep.h" #include "storage/copydir.h" #include "storage/smgr/fd.h" +#include "storage/file/fio_device.h" #include "storage/proc.h" #include "storage/procarray.h" #include "postmaster/postmaster.h" @@ -66,6 +67,8 @@ static char *trim_str(char *str, int str_len, char sep); static char *get_application_name(void); static void ReleaseArchiveSlotInfo(ReplicationSlot *slot); static int cmp_slot_lsn(const void *a, const void *b); +static int RenameReplslotPath(char *path1, char *path2); +static bool CheckExistReplslotPath(char *path); /* * Report shared-memory space needed by ReplicationSlotShmemInit. 
@@ -745,6 +748,9 @@ static void ReplicationSlotDropAcquired(void) /* slot isn't acquired anymore */ t_thrd.slot_cxt.MyReplicationSlot = NULL; + char replslot_path[MAXPGPATH]; + GetReplslotPath(replslot_path); + /* * If some other backend ran this code concurrently with us, we might try * to delete a slot with a certain name while someone else was trying to @@ -753,10 +759,10 @@ static void ReplicationSlotDropAcquired(void) LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE); /* Generate pathnames. */ - int nRet = snprintf_s(path, sizeof(path), MAXPGPATH - 1, "pg_replslot/%s", NameStr(slot->data.name)); + int nRet = snprintf_s(path, sizeof(path), MAXPGPATH - 1, "%s/%s", replslot_path, NameStr(slot->data.name)); securec_check_ss(nRet, "\0", "\0"); - nRet = snprintf_s(tmppath, sizeof(tmppath), MAXPGPATH - 1, "pg_replslot/%s.tmp", NameStr(slot->data.name)); + nRet = snprintf_s(tmppath, sizeof(tmppath), MAXPGPATH - 1, "%s/%s.tmp", replslot_path, NameStr(slot->data.name)); securec_check_ss(nRet, "\0", "\0"); /* @@ -767,7 +773,7 @@ static void ReplicationSlotDropAcquired(void) * survive and this might get called during error handling. */ - if (rename(path, tmppath) == 0) { + if (RenameReplslotPath(path, tmppath) == 0) { /* * We need to fsync() the directory we just renamed and its parent to * make sure that our changes are on disk in a crash-safe fashion. 
If @@ -778,7 +784,7 @@ static void ReplicationSlotDropAcquired(void) */ START_CRIT_SECTION(); fsync_fname(tmppath, true); - fsync_fname("pg_replslot", true); + fsync_fname(replslot_path, true); END_CRIT_SECTION(); } else { volatile ReplicationSlot *vslot = slot; @@ -853,8 +859,10 @@ void ReplicationSlotSave(void) Assert(t_thrd.slot_cxt.MyReplicationSlot != NULL); - nRet = snprintf_s(path, MAXPGPATH, MAXPGPATH - 1, "pg_replslot/%s", - NameStr(t_thrd.slot_cxt.MyReplicationSlot->data.name)); + char replslot_path[MAXPGPATH]; + GetReplslotPath(replslot_path); + nRet = snprintf_s(path, MAXPGPATH, MAXPGPATH - 1, "%s/%s", replslot_path, + NameStr(t_thrd.slot_cxt.MyReplicationSlot->data.name)); securec_check_ss(nRet, "\0", "\0"); if (unlikely(CheckFileExists(path) == FILE_NOT_EXIST)) { CreateSlotOnDisk(t_thrd.slot_cxt.MyReplicationSlot); @@ -1301,6 +1309,9 @@ void CheckPointReplicationSlots(void) ereport(DEBUG1, (errmsg("performing replication slot checkpoint"))); + char replslot_path[MAXPGPATH]; + GetReplslotPath(replslot_path); + /* * Prevent any slot from being created/dropped while we're active. 
As we * explicitly do *not* want to block iterating over replication_slots or @@ -1318,7 +1329,7 @@ void CheckPointReplicationSlots(void) continue; /* save the slot to disk, locking is handled in SaveSlotToPath() */ - nRet = snprintf_s(path, MAXPGPATH, MAXPGPATH - 1, "pg_replslot/%s", NameStr(s->data.name)); + nRet = snprintf_s(path, MAXPGPATH, MAXPGPATH - 1, "%s/%s", replslot_path, NameStr(s->data.name)); securec_check_ss(nRet, "\0", "\0"); if (unlikely(CheckFileExists(path) == FILE_NOT_EXIST)) { @@ -1341,12 +1352,15 @@ void StartupReplicationSlots() ereport(DEBUG1, (errmsg("starting up replication slots"))); + char replslot_path[MAXPGPATH]; + GetReplslotPath(replslot_path); + /* restore all slots by iterating over all on-disk entries */ - replication_dir = AllocateDir("pg_replslot"); + replication_dir = AllocateDir(replslot_path); if (replication_dir == NULL) { char tmppath[MAXPGPATH]; - nRet = snprintf_s(tmppath, sizeof(tmppath), MAXPGPATH - 1, "%s", "pg_replslot"); + nRet = snprintf_s(tmppath, sizeof(tmppath), MAXPGPATH - 1, "%s", replslot_path); securec_check_ss(nRet, "\0", "\0"); if (mkdir(tmppath, S_IRWXU) < 0) @@ -1354,14 +1368,14 @@ void StartupReplicationSlots() fsync_fname(tmppath, true); return; } - while ((replication_de = ReadDir(replication_dir, "pg_replslot")) != NULL) { + while ((replication_de = ReadDir(replication_dir, replslot_path)) != NULL) { struct stat statbuf; char path[MAXPGPATH]; if (strcmp(replication_de->d_name, ".") == 0 || strcmp(replication_de->d_name, "..") == 0) continue; - nRet = snprintf_s(path, sizeof(path), MAXPGPATH - 1, "pg_replslot/%s", replication_de->d_name); + nRet = snprintf_s(path, sizeof(path), MAXPGPATH - 1, "%s/%s", replslot_path, replication_de->d_name); securec_check_ss(nRet, "\0", "\0"); /* we're only creating directories here, skip if it's not our's */ @@ -1370,11 +1384,22 @@ void StartupReplicationSlots() /* we crashed while a slot was being setup or deleted, clean up */ if 
(string_endswith(replication_de->d_name, ".tmp")) { + if (ENABLE_DSS && CheckExistReplslotPath(path)) { + RestoreSlotFromDisk(replication_de->d_name); + continue; + } if (!rmtree(path, true)) { ereport(WARNING, (errcode_for_file_access(), errmsg("could not remove directory \"%s\"", path))); continue; } - fsync_fname("pg_replslot", true); + fsync_fname(replslot_path, true); + continue; + } + + if (ENABLE_DSS && !CheckExistReplslotPath(path)) { + if (unlink(path) != 0) { /* unlink() returns 0 on success; warn only when the removal failed */ + ereport(WARNING, (errcode_for_file_access(), errmsg("could not remove directory \"%s\"", path))); + } continue; } @@ -1407,15 +1432,18 @@ void CreateSlotOnDisk(ReplicationSlot *slot) struct stat st; int nRet = 0; + char replslot_path[MAXPGPATH]; + GetReplslotPath(replslot_path); + /* * No need to take out the io_in_progress_lock, nobody else can see this * slot yet, so nobody else will write. We're reusing SaveSlotToPath which * takes out the lock, if we'd take the lock here, we'd deadlock. */ - nRet = snprintf_s(path, sizeof(path), MAXPGPATH - 1, "pg_replslot/%s", NameStr(slot->data.name)); + nRet = snprintf_s(path, sizeof(path), MAXPGPATH - 1, "%s/%s", replslot_path, NameStr(slot->data.name)); securec_check_ss(nRet, "\0", "\0"); - nRet = snprintf_s(tmppath, sizeof(tmppath), MAXPGPATH - 1, "pg_replslot/%s.tmp", NameStr(slot->data.name)); + nRet = snprintf_s(tmppath, sizeof(tmppath), MAXPGPATH - 1, "%s/%s.tmp", replslot_path, NameStr(slot->data.name)); securec_check_ss(nRet, "\0", "\0"); /* @@ -1441,7 +1469,7 @@ void CreateSlotOnDisk(ReplicationSlot *slot) SaveSlotToPath(slot, tmppath, ERROR); /* Rename the directory into place.
*/ - if (rename(tmppath, path) != 0) + if (RenameReplslotPath(tmppath, path) != 0) ereport(ERROR, (errcode_for_file_access(), errmsg("could not rename file \"%s\" to \"%s\": %m", tmppath, path))); @@ -1453,7 +1481,7 @@ void CreateSlotOnDisk(ReplicationSlot *slot) START_CRIT_SECTION(); fsync_fname(path, true); - fsync_fname("pg_replslot", true); + fsync_fname(replslot_path, true); END_CRIT_SECTION(); @@ -1608,7 +1636,9 @@ static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel) fsync_fname(path, false); fsync_fname(dir, true); - fsync_fname("pg_replslot", true); + if (!ENABLE_DSS) { + fsync_fname("pg_replslot", true); + } END_CRIT_SECTION(); @@ -1646,11 +1676,15 @@ static void RestoreSlotFromDisk(const char *name) bool retry = false; char *extra_content = NULL; ArchiveConfig *archive_cfg = NULL; + + char replslot_path[MAXPGPATH]; + GetReplslotPath(replslot_path); + /* no need to lock here, no concurrent access allowed yet * * delete temp file if it exists */ - rc = snprintf_s(path, sizeof(path), MAXPGPATH - 1, "pg_replslot/%s/state.tmp", name); + rc = snprintf_s(path, sizeof(path), MAXPGPATH - 1, "%s/%s/state.tmp", replslot_path, name); securec_check_ss(rc, "\0", "\0"); ret = unlink(path); @@ -1659,14 +1693,14 @@ static void RestoreSlotFromDisk(const char *name) /* unlink backup file if rename failed */ if (ret == 0) { - rc = snprintf_s(path, sizeof(path), MAXPGPATH - 1, "pg_replslot/%s/state.backup", name); + rc = snprintf_s(path, sizeof(path), MAXPGPATH - 1, "%s/%s/state.backup", replslot_path, name); securec_check_ss(rc, "\0", "\0"); if (unlink(path) < 0 && errno != ENOENT) ereport(PANIC, (errcode_for_file_access(), errmsg("could not unlink file \"%s\": %m", path))); ignore_bak = true; } - rc = snprintf_s(path, sizeof(path), MAXPGPATH - 1, "pg_replslot/%s/state", name); + rc = snprintf_s(path, sizeof(path), MAXPGPATH - 1, "%s/%s/state", replslot_path, name); securec_check_ss(rc, "\0", "\0"); elog(DEBUG1, "restoring replication slot from 
\"%s\"", path); @@ -1747,7 +1781,7 @@ loop: ereport(WARNING, (errmsg("replication slot file %s: checksum mismatch, is %u, should be %u, try backup file", path, checksum, cp.checksum))); - rc = snprintf_s(path, sizeof(path), MAXPGPATH - 1, "pg_replslot/%s/state.backup", name); + rc = snprintf_s(path, sizeof(path), MAXPGPATH - 1, "%s/%s/state.backup", replslot_path, name); securec_check_ss(rc, "\0", "\0"); ignore_bak = true; retry = true; @@ -1763,7 +1797,7 @@ loop: ereport(WARNING, (errcode_for_file_access(), errmsg("replication slot file \"%s\" has wrong magic %u instead of %d, try backup file", path, cp.magic, SLOT_MAGIC))); - rc = snprintf_s(path, sizeof(path), MAXPGPATH - 1, "pg_replslot/%s/state.backup", name); + rc = snprintf_s(path, sizeof(path), MAXPGPATH - 1, "%s/%s/state.backup", replslot_path, name); securec_check_ss(rc, "\0", "\0"); ignore_bak = true; retry = true; @@ -1780,7 +1814,7 @@ loop: ereport(WARNING, (errcode_for_file_access(), errmsg("replication slot file \"%s\" has corrupted length %u, try backup file", path, cp.length))); - rc = snprintf_s(path, sizeof(path), MAXPGPATH - 1, "pg_replslot/%s/state.backup", name); + rc = snprintf_s(path, sizeof(path), MAXPGPATH - 1, "%s/%s/state.backup", replslot_path, name); securec_check_ss(rc, "\0", "\0"); ignore_bak = true; retry = true; @@ -1795,12 +1829,12 @@ loop: * If we crashed with an ephemeral slot active, don't restore but delete it. 
*/ if (GET_SLOT_PERSISTENCY(cp.slotdata) != RS_PERSISTENT && GET_SLOT_PERSISTENCY(cp.slotdata) != RS_BACKUP) { - rc = snprintf_s(path, MAXPGPATH, MAXPGPATH - 1, "pg_replslot/%s", name); + rc = snprintf_s(path, MAXPGPATH, MAXPGPATH - 1, "%s/%s", replslot_path, name); securec_check_ss(rc, "\0", "\0"); if (!rmtree(path, true)) { ereport(WARNING, (errcode_for_file_access(), errmsg("could not remove directory \"%s\"", path))); } - fsync_fname("pg_replslot", true); + fsync_fname(replslot_path, true); return; } @@ -1870,7 +1904,10 @@ static void RecoverReplSlotFile(const ReplicationSlotOnDisk &cp, const char *nam char path[MAXPGPATH]; errno_t rc = EOK; - rc = snprintf_s(path, sizeof(path), MAXPGPATH - 1, "pg_replslot/%s/state", name); + char replslot_path[MAXPGPATH]; + GetReplslotPath(replslot_path); + + rc = snprintf_s(path, sizeof(path), MAXPGPATH - 1, "%s/%s/state", replslot_path, name); securec_check_ss(rc, "\0", "\0"); ereport(WARNING, (errmsg("recover the replication slot file %s", name))); @@ -2556,3 +2593,118 @@ void get_hadr_cn_info(char* keyCn, bool* isExitKey, char* deleteCn, bool* isExit } } #endif
+
+/*
+ * GetReplslotPath
+ *     Store the replication slot root directory in "path".
+ *
+ * With ENABLE_DSS the root is "<ss_dss_vg_name>/pg_replslot", otherwise it is the
+ * relative directory "pg_replslot". Both writes are bounded by MAXPGPATH, so the
+ * caller has to pass a buffer of at least MAXPGPATH bytes.
+ */
+void GetReplslotPath(char *path)
+{
+    if (ENABLE_DSS) {
+        int nRet = snprintf_s(path, MAXPGPATH, MAXPGPATH - 1, "%s/pg_replslot",
+            g_instance.attr.attr_storage.dss_attr.ss_dss_vg_name);
+        securec_check_ss(nRet, "\0", "\0");
+    } else {
+        /* strcpy_s hands back an errno_t, checked like the memset_s result further down */
+        errno_t rc = strcpy_s(path, MAXPGPATH, "pg_replslot");
+        securec_check(rc, "\0", "\0");
+    }
+}
+
+/* Return true when "path" ends with ".tmp". */
+static bool ReplslotPathIsTmp(const char *path)
+{
+    const char *suffix = ".tmp";
+    size_t pathLen = strlen(path);
+    size_t suffixLen = strlen(suffix);
+
+    return pathLen >= suffixLen && strcmp(path + pathLen - suffixLen, suffix) == 0;
+}
+
+/*
+ * RenameReplslotPath
+ *     Move a slot directory from path1 to path2.
+ *
+ * Without DSS this is a plain rename(). With DSS, a path1 ending in ".tmp" is
+ * published by creating path2 as a symlink to it; any other path1 is removed
+ * with unlink(). Returns the result of the call that was made.
+ */
+static int RenameReplslotPath(char *path1, char *path2)
+{
+    if (!ENABLE_DSS) {
+        return rename(path1, path2);
+    }
+    if (ReplslotPathIsTmp(path1)) {
+        return symlink(path1, path2);
+    }
+    return unlink(path1);
+}
+
+/*
+ * CheckExistReplslotPath
+ *     Report whether the counterpart of "path" exists according to dss_exist_dir().
+ *
+ * The counterpart of "<dir>.tmp" is "<dir>"; the counterpart of any other path
+ * is "<path>.tmp".
+ */
+static bool CheckExistReplslotPath(char *path)
+{
+    char path_for_check[MAXPGPATH] = {0};
+
+    if (ReplslotPathIsTmp(path)) {
+        /* drop the trailing ".tmp" */
+        errno_t rc = strncpy_s(path_for_check, MAXPGPATH, path, strlen(path) - strlen(".tmp"));
+        securec_check(rc, "\0", "\0");
+    } else {
+        /* format from path itself; appending to the empty buffer would lose it */
+        int nRet = snprintf_s(path_for_check, MAXPGPATH, MAXPGPATH - 1, "%s.tmp", path);
+        securec_check_ss(nRet, "\0", "\0");
+    }
+
+    return dss_exist_dir(path_for_check);
+}
+
+/*
+ * ResetReplicationSlotsShmem
+ *     Clear the in-memory state of every named replication slot.
+ *
+ * Nothing is done when max_replication_slots is 0 or ReplicationSlotCtl is NULL;
+ * entries whose name is empty are skipped.
+ * NOTE(review): no lock is taken here - confirm callers have exclusive access.
+ */
+void ResetReplicationSlotsShmem()
+{
+    int maxSlots = g_instance.attr.attr_storage.max_replication_slots;
+
+    if (maxSlots == 0 || t_thrd.slot_cxt.ReplicationSlotCtl == NULL) {
+        return;
+    }
+
+    for (int i = 0; i < maxSlots; i++) {
+        ReplicationSlot *slot = &t_thrd.slot_cxt.ReplicationSlotCtl->replication_slots[i];
+
+        if (strlen(slot->data.name.data) == 0) {
+            continue;
+        }
+
+        slot->in_use = false;
+        slot->active = false;
+        slot->just_dirtied = false;
+        slot->dirty = false;
+        slot->effective_xmin = 0;
+        slot->effective_catalog_xmin = 0;
+        errno_t rc = memset_s(&slot->data, sizeof(ReplicationSlotPersistentData), 0,
+            sizeof(ReplicationSlotPersistentData));
+        securec_check(rc, "\0", "\0");
+        slot->candidate_catalog_xmin = 0;
+        slot->candidate_xmin_lsn = 0;
+        slot->candidate_restart_valid = 0;
+        slot->candidate_restart_lsn = 0;
+        slot->is_recovery = false;
+        slot->last_xmin_change_time = 0;
+    }
+}
diff --git a/src/gausskernel/storage/replication/slotfuncs.cpp b/src/gausskernel/storage/replication/slotfuncs.cpp index c821e0355b..e265180540 100755 --- a/src/gausskernel/storage/replication/slotfuncs.cpp +++ b/src/gausskernel/storage/replication/slotfuncs.cpp @@ -239,6 +239,11 @@ Datum pg_create_physical_replication_slot(PG_FUNCTION_ARGS) if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) { ereport(ERROR, (errcode(ERRCODE_DATATYPE_MISMATCH), errmsg("return type must be a row type"))); } + + if (SS_IN_REFORM || SS_NORMAL_STANDBY) { + ereport(ERROR, (errmsg("Operation can't be excuted during reform or on DMS standby node!"))); + } + /* acquire replication slot, this will check for conflicting names */
ReplicationSlotCreate(NameStr(*name), RS_PERSISTENT, isDummyStandby, InvalidOid, InvalidXLogRecPtr); @@ -298,6 +303,10 @@ Datum pg_create_physical_replication_slot_extern(PG_FUNCTION_ARGS) ereport(ERROR, (errcode(ERRCODE_DATATYPE_MISMATCH), errmsg("return type must be a row type"))); } + if (SS_IN_REFORM || SS_NORMAL_STANDBY) { + ereport(ERROR, (errmsg("Operation can't be excuted during reform or on DMS standby node!"))); + } + if (for_backup) { restart_lsn = create_physical_replication_slot_for_backup(NameStr(*name), isDummyStandby, extra_content); } else { @@ -439,9 +448,8 @@ void redo_slot_create(const ReplicationSlotPersistentData *slotInfo, char* extra */ Datum pg_create_logical_replication_slot(PG_FUNCTION_ARGS) { - if (ENABLE_DMS) { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("Not support create logical replication slot while DMS and DSS enabled"))); + if (SS_IN_REFORM || SS_NORMAL_STANDBY) { + ereport(ERROR, (errmsg("Operation can't be excuted during reform or on DMS standby node!"))); } Name name = PG_GETARG_NAME(0); @@ -494,9 +502,8 @@ Datum pg_create_logical_replication_slot(PG_FUNCTION_ARGS) */ Datum pg_drop_replication_slot(PG_FUNCTION_ARGS) { - if (ENABLE_DMS) { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("Not support drop replication slot while DMS and DSS enabled"))); + if (SS_IN_REFORM || SS_NORMAL_STANDBY) { + ereport(ERROR, (errmsg("Operation can't be excuted during reform or on DMS standby node!"))); } Name name = PG_GETARG_NAME(0); diff --git a/src/gausskernel/storage/replication/walsender.cpp b/src/gausskernel/storage/replication/walsender.cpp index 07aa144827..0da1ea0d9e 100755 --- a/src/gausskernel/storage/replication/walsender.cpp +++ b/src/gausskernel/storage/replication/walsender.cpp @@ -397,6 +397,10 @@ int WalSenderMain(void) /* Unblock signals (they were blocked when the postmaster forked us) */ gs_signal_setmask(&t_thrd.libpq_cxt.UnBlockSig, NULL); + if (SS_IN_REFORM || SS_NORMAL_STANDBY) 
{ + ereport(ERROR, (errmsg("Can't start replication during reform or on DMS standby mode!"))); + } + /* * Use the recovery target timeline ID during recovery */ diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index fce6913099..864e31ca7a 100755 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -150,8 +150,7 @@ typedef enum WalLevel { /* Do we need to WAL-log information required only for Hot Standby and logical replication? */ #define XLogStandbyInfoActive() \ - (g_instance.attr.attr_storage.wal_level >= WAL_LEVEL_HOT_STANDBY && \ - (!g_instance.attr.attr_storage.dms_attr.enable_dms || SS_PRIMARY_STANDBY_CLUSTER_NORMAL)) + (g_instance.attr.attr_storage.wal_level >= WAL_LEVEL_HOT_STANDBY) /* Do we need to WAL-log information required only for logical replication? */ #define XLogLogicalInfoActive() (g_instance.attr.attr_storage.wal_level >= WAL_LEVEL_LOGICAL) extern const char* DemoteModeDescs[]; diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index ed8704f10b..b8baace4b6 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -77,7 +77,7 @@ typedef struct LogicalDecodingContext { * User-Provided callback for writing/streaming out data. */ LogicalOutputPluginWriterPrepareWrite prepare_write; - LogicalOutputPluginWriterWrite write; + LogicalOutputPluginWriterWrite do_write; /* * Output buffer. diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index dd8ca4378e..c95bf81161 100755 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -343,5 +343,7 @@ extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslo extern void ReplicationSlotDropAtPubNode(char *slotname, bool missing_ok); extern void LogicalCleanSnapDirectory(bool rebuild); extern void CleanMyReplicationSlot(); +void GetReplslotPath(char *path); +void ResetReplicationSlotsShmem(); #endif /* SLOT_H */ -- Gitee