diff --git a/src/bin/pg_ctl/CMakeLists.txt b/src/bin/pg_ctl/CMakeLists.txt index 4e8e92fbee6c81d3e6c837c2e6d6e68a73810944..1099a5e08ab4ead355d8b51f99c03e10423c0f87 100755 --- a/src/bin/pg_ctl/CMakeLists.txt +++ b/src/bin/pg_ctl/CMakeLists.txt @@ -2,6 +2,7 @@ execute_process( COMMAND ln -fs ${PROJECT_SRC_DIR}/gausskernel/storage/access/transam/xlogreader.cpp ${CMAKE_CURRENT_SOURCE_DIR}/xlogreader.cpp COMMAND ln -fs ${PROJECT_SRC_DIR}/gausskernel/storage/access/redo/xlogreader_common.cpp ${CMAKE_CURRENT_SOURCE_DIR}/xlogreader_common.cpp + COMMAND ln -fs ${PROJECT_SRC_DIR}/gausskernel/storage/dss/dss_adaptor.cpp ${CMAKE_CURRENT_SOURCE_DIR}/dss_adaptor.cpp ) AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} TGT_gsctl_SRC) diff --git a/src/bin/pg_ctl/Makefile b/src/bin/pg_ctl/Makefile index c72eb61e4f12d5fdfd793386257c2cdbee4424ea..1fc02d92d6ac2980cc9a99162e5bd7f32cc81ffe 100644 --- a/src/bin/pg_ctl/Makefile +++ b/src/bin/pg_ctl/Makefile @@ -40,6 +40,8 @@ OBJS= pg_ctl.o pg_build.o backup.o receivelog.o streamutil.o xlogreader.o xlogr $(top_builddir)/src/lib/hotpatch/client/libhotpatchclient.a endif +OBJS += $(top_builddir)/src/gausskernel/storage/dss/dss_adaptor.o + all: submake-pagecompression gs_ctl gs_ctl: $(OBJS) | submake-libpq submake-libpgport diff --git a/src/bin/pg_ctl/backup.cpp b/src/bin/pg_ctl/backup.cpp index 754dfb1f186b9afca96fda05b4b694d5a8c09068..f031e92ac0b2aa2361ba68e91d90c9670f146775 100755 --- a/src/bin/pg_ctl/backup.cpp +++ b/src/bin/pg_ctl/backup.cpp @@ -48,6 +48,7 @@ #include "port/pg_crc32c.h" #include "replication/dcf_data.h" #include "PageCompression.h" +#include "storage/file/fio_device.h" #ifdef ENABLE_MOT #include "fetchmot.h" @@ -136,6 +137,7 @@ static XLogRecPtr read_full_backup_label( const char* dirname, char* sysid, uint32 sysid_len, char* tline, uint32 tline_len); static int replace_node_name(char* sSrc, const char* sMatchStr, const char* sReplaceStr); static void show_full_build_process(const char* errmg); +static bool 
ss_backup_dw_file(const char* target_dir); static bool backup_dw_file(const char* target_dir); void get_xlog_location(char (&xlog_location)[MAXPGPATH]); static bool UpdatePaxosIndexFile(unsigned long long paxosIndex); @@ -870,7 +872,12 @@ static bool ReceiveAndUnpackTarFile(PGconn* conn, PGresult* res, int rownum) */ if (NULL != conn_str) (void)replace_node_name(copybuf, (const char*)remotenodename, (const char*)pgxcnodename); - nRet = snprintf_s(filename, MAXPGPATH, sizeof(filename) - 1, "%s/%s", current_path, copybuf); + + if (is_dss_file(copybuf)) { + nRet = snprintf_s(filename, MAXPGPATH, sizeof(filename) - 1, "%s", copybuf); + } else { + nRet = snprintf_s(filename, MAXPGPATH, sizeof(filename) - 1, "%s/%s", current_path, copybuf); + } securec_check_ss_c(nRet, "\0", "\0"); forbid_write = (IS_CROSS_CLUSTER_BUILD && strcmp(copybuf, "pg_hba.conf") == 0); @@ -916,6 +923,9 @@ static bool ReceiveAndUnpackTarFile(PGconn* conn, PGresult* res, int rownum) * description: we need refactor the communication protocol for well maintaining code */ filename[strlen(filename) - 1] = '\0'; /* Remove trailing slash */ + if (is_dss_file(filename)) { + continue; + } if (symlink(©buf[bufOffset + 1], filename) != 0) { if (!streamwal || strcmp(filename + strlen(filename) - len, "/pg_xlog") != 0) { pg_log(PG_WARNING, _("could not create symbolic link from \"%s\" to \"%s\": %s\n"), @@ -1002,6 +1012,11 @@ static bool ReceiveAndUnpackTarFile(PGconn* conn, PGresult* res, int rownum) totaldone += r; continue; } + + if (instance_config.dss.enable_dss && strcmp(filename, "+data/pg_control") == 0) { + pg_log(PG_WARNING, _("file size %d. 
\n"), r); + } + if (forbid_write == false) { if (fwrite(copybuf, r, 1, file) != 1) { pg_log(PG_WARNING, _("could not write to file \"%s\": %s\n"), filename, strerror(errno)); @@ -1161,6 +1176,7 @@ static bool BaseBackup(const char* dirname, uint32 term) errno_t rc = EOK; int nRet = 0; struct stat st; + char *dssdir = instance_config.dss.vgdata; pqsignal(SIGCHLD, BuildReaper); /* handle child termination */ /* concat file and path */ @@ -1205,6 +1221,11 @@ static bool BaseBackup(const char* dirname, uint32 term) /* delete data/ and pg_tblspc/, but keep .config */ delete_datadir(dirname); + + /* delete data/ and pg_tblspc/ in dss, but keep .config */ + if (instance_config.dss.enable_dss) { + delete_datadir(dssdir); + } show_full_build_process("clear old target dir success"); /* create build tag file */ @@ -1737,7 +1758,11 @@ static bool BaseBackup(const char* dirname, uint32 term) /* fsync all data come from source */ if (!no_need_fsync) { show_full_build_process("starting fsync all files come from source."); - (void) fsync_pgdata(basedir); + if (instance_config.dss.enable_dss) { + (void) fsync_pgdata(dssdir); + } else { + (void) fsync_pgdata(basedir); + } show_full_build_process("finish fsync all files."); } @@ -1746,7 +1771,11 @@ static bool BaseBackup(const char* dirname, uint32 term) if (g_is_obsmode) { backupDWFileSuccess = backup_dw_file(basedir); } else { - backupDWFileSuccess = backup_dw_file(dirname); + if (instance_config.dss.enable_dss) { + backupDWFileSuccess = ss_backup_dw_file(dssdir); + } else { + backupDWFileSuccess = backup_dw_file(dirname); + } } if (!backupDWFileSuccess) { @@ -1768,9 +1797,13 @@ static bool BaseBackup(const char* dirname, uint32 term) if (!deleteFilsSuccess) { return false; } - - nRet = snprintf_s(tblspcPath, MAXPGPATH, MAXPGPATH, "%s/pg_tblspc", dirname); - securec_check_ss_c(nRet, "\0", "\0"); + + if (instance_config.dss.enable_dss) { + nRet = snprintf_s(tblspcPath, MAXPGPATH, MAXPGPATH, "%s/pg_tblspc", dssdir); + } else { + nRet 
= snprintf_s(tblspcPath, MAXPGPATH, MAXPGPATH, "%s/pg_tblspc", dirname); + } + securec_check_ss_c(nRet, "\0", "\0"); deleteFilsSuccess = DeleteAlreadyDropedFile(tblspcPath, true); if (!deleteFilsSuccess) { return false; @@ -2331,6 +2364,73 @@ static void show_full_build_process(const char* errmg) pg_log(PG_PROGRESS, _("%s\n"), errmg); } +/** + * delete existing double write file if existed in dss, recreate it and write one page of zero + * @param target_dir dss vgdata + * @return true on success, false on any failure (mkdir, open, out of memory, write) + */ +static bool ss_backup_dw_file(const char* target_dir) +{ + int rc; + int fd = -1; + char dw_file_path[PATH_MAX]; + char dw_path[PATH_MAX]; + char* buf = NULL; + char* unaligned_buf = NULL; + + /* Build the path of this instance's doublewrite directory. */ + rc = snprintf_s(dw_path, PATH_MAX, PATH_MAX - 1, "%s/pg_doublewrite%d", target_dir, + instance_config.dss.instance_id); + securec_check_ss_c(rc, "\0", "\0"); + + /* create the directory when it is not accessible for reading and writing */ + if (-1 == access(dw_path, R_OK | W_OK)) { + if (mkdir(dw_path, S_IRWXU) != 0) { + pg_log(PG_WARNING, _("failed to make dir %s.\n"), dw_path); + return false; + } + } + + /* Delete the dw file, if it exists. */ + rc = snprintf_s(dw_file_path, PATH_MAX, PATH_MAX - 1, "%s", T_OLD_DW_FILE_NAME); + securec_check_ss_c(rc, "\0", "\0"); + delete_target_file(dw_file_path); + + /* Delete the dw build file, if it exists. */ + rc = snprintf_s(dw_file_path, PATH_MAX, PATH_MAX - 1, "%s", T_DW_BUILD_FILE_NAME); + securec_check_ss_c(rc, "\0", "\0"); + + delete_target_file(dw_file_path); + + /* Create the dw build file. 
*/ + if ((fd = open(dw_file_path, (DW_FILE_FLAG | O_CREAT), DW_FILE_PERM)) < 0) { + pg_log(PG_WARNING, _("could not create file %s: %s\n"), dw_file_path, gs_strerror(errno)); + return false; + } + + unaligned_buf = (char*)malloc(BLCKSZ + BLCKSZ); + if (unaligned_buf == NULL) { + pg_log(PG_WARNING, _("out of memory")); + close(fd); + return false; + } + + buf = (char*)TYPEALIGN(BLCKSZ, unaligned_buf); + rc = memset_s(buf, BLCKSZ, 0, BLCKSZ); + securec_check_c(rc, "\0", "\0"); + + bool written = (write(fd, buf, BLCKSZ) == BLCKSZ); + if (!written) { + pg_log(PG_WARNING, _("could not write data to file %s: %s\n"), dw_file_path, gs_strerror(errno)); + } + + /* release the buffer and the descriptor on the failure path as well as on success */ + free(unaligned_buf); + close(fd); + + return written; +} + /** * delete existing double write file if existed, recreate it and write one page of zero * @param target_dir data base root dir @@ -2411,7 +2511,15 @@ void get_xlog_location(char (&xlog_location)[MAXPGPATH]) char linkpath[MAXPGPATH] = {0}; errno_t rc = EOK; struct stat stbuf; - int nRet = snprintf_s(xlog_location, MAXPGPATH, MAXPGPATH - 1, "%s/pg_xlog", basedir); + int nRet = 0; + + if (instance_config.dss.enable_dss) { + char *dssdir = instance_config.dss.vgdata; + nRet = snprintf_s(xlog_location, MAXPGPATH, MAXPGPATH - 1, "%s/pg_xlog%d", dssdir, + instance_config.dss.instance_id); + } else { + nRet = snprintf_s(xlog_location, MAXPGPATH, MAXPGPATH - 1, "%s/pg_xlog", basedir); + } securec_check_ss_c(nRet, "", ""); if (lstat(xlog_location, &stbuf) == 0) { diff --git a/src/bin/pg_ctl/pg_build.cpp b/src/bin/pg_ctl/pg_build.cpp index 7702c0682f17707081ef13db03b27ee7cf6e9cd3..52011bac10d5ca50d2b3ffc49d9fd16ef6462bce 100755 --- a/src/bin/pg_ctl/pg_build.cpp +++ b/src/bin/pg_ctl/pg_build.cpp @@ -22,6 +22,7 @@ #include "pg_build.h" #include "streamutil.h" #include "logging.h" +#include "tool_common.h" #include "bin/elog.h" #include "nodes/pg_list.h" @@ -32,6 +33,7 @@ #include "common/fe_memutils.h" #include "libpq/libpq-fe.h" #include "libpq/libpq-int.h" +#include 
"storage/file/fio_device.h" /* global variables for con */ char conninfo_global[MAX_REPLNODE_NUM][MAX_VALUE_LEN] = {{0}, {0}, {0}, {0}, {0}, {0}, {0}, {0}, {0}}; @@ -70,6 +72,7 @@ char g_repl_uuid[MAX_VALUE_LEN] = {0}; int g_replconn_idx = -1; int g_replication_type = -1; bool is_cross_region_build = false; +SSInstanceConfig instance_config; #define RT_WITH_DUMMY_STANDBY 0 #define RT_WITH_MULTI_STANDBY 1 @@ -1210,6 +1213,74 @@ static bool GetDCFKeyValue(const char *filename, const char *key, char *value) return true; } +int IsBeginWith(const char *str1, char *str2) +{ + if (str1 == NULL || str2 == NULL) { + return -1; + } + int len1 = strlen(str1); + int len2 = strlen(str2); + if ((len1 < len2) || (len1 == 0 || len2 == 0)) { + return -1; + } + + char *p = str2; + int i = 0; + while (*p != '\0') { + if (*p != str1[i]) { + return 0; + } + p++; + i++; + } + return 1; +} + +/* Tell whether directory entry dirname is skipped (true) or handled (false); always false when dss is disabled. */ +bool SsIsSkipPath(const char* dirname, bool needskipall) +{ + if (!instance_config.dss.enable_dss) { + return false; + } + + if (strcmp(dirname, ".recycle") == 0) { + return true; + } + + /* skip doublewrite of all instances*/ + if (IsBeginWith(dirname, "pg_doublewrite") > 0) { + return true; + } + + /* skip pg_control file when dss enable, only copy pg_control of main standby, + * we need to retain pg_control of other nodes, so pg_control is not deleted directly. 
+ */ + if (strcmp(dirname, "pg_control") == 0) { + return true; + } + + /* skip directory which not belong to primary in dss */ + if (needskipall) { + /* skip pg_xlog and doublewrite of all instances*/ + if (IsBeginWith(dirname, "pg_xlog") > 0) { + return true; + } + } else { + /* skip other node pg_xlog except primary */ + if (IsBeginWith(dirname, "pg_xlog") > 0) { + size_t dirNameLen = strlen("pg_xlog"); + char instanceId[MAX_INSTANCEID_LEN] = {0}; + errno_t rc = snprintf_s(instanceId, sizeof(instanceId), sizeof(instanceId) - 1, "%d", + instance_config.dss.instance_id); + securec_check_ss_c(rc, "\0", "\0"); + /* not skip pg_xlog directory in file system */ + if (strlen(dirname) > dirNameLen && strcmp(dirname + dirNameLen, instanceId) != 0) + return true; + } + } + return false; /* no skip rule matched: a bool function must not fall off its end */ +} + static void DeleteSubDataDir(const char* dirname) { DIR* dir = NULL; @@ -1257,6 +1328,9 @@ static void DeleteSubDataDir(const char* dirname) continue; if (g_is_obsmode && (strcmp(de->d_name, "pg_replslot") == 0)) continue; + if (is_dss_file(dirname) && SsIsSkipPath(de->d_name, true)) + continue; + rc = memset_s(fullpath, MAXPGPATH, 0, MAXPGPATH); securec_check_c(rc, "", ""); /* others */ @@ -1372,6 +1446,7 @@ static void DeleteSubDataDir(const char* dirname) (IS_CROSS_CLUSTER_BUILD && strcmp(de->d_name, "pg_hba.conf") == 0) || strcmp(de->d_name, "pg_hba.conf.old") == 0) continue; + /* Skip paxos index files for building process will write them */ if (enableDCF && ((strcmp(de->d_name, "paxosindex") == 0) || (strcmp(de->d_name, "paxosindex.backup") == 0))) @@ -1413,6 +1488,7 @@ void delete_datadir(const char* dirname) pg_log(PG_WARNING, _("input parameter is NULL.\n")); exit(1); } + nRet = snprintf_s(fullpath, MAXPGPATH, sizeof(fullpath) - 1, "%s/pg_tblspc", dirname); securec_check_ss_c(nRet, "", ""); @@ -1496,7 +1572,12 @@ void delete_datadir(const char* dirname) * this is to keep the basedir/pg_xlog, and delete all files and * directories under it. 
*/ - nRet = snprintf_s(xlogpath, MAXPGPATH, sizeof(xlogpath) - 1, "%s/pg_xlog", dirname); + if (strncmp(dirname, "+", 1) == 0 ) { + nRet = snprintf_s(xlogpath, MAXPGPATH, sizeof(xlogpath) - 1, "%s/pg_xlog%d", dirname, + instance_config.dss.instance_id); + } else { + nRet = snprintf_s(xlogpath, MAXPGPATH, sizeof(xlogpath) - 1, "%s/pg_xlog", dirname); + } securec_check_ss_c(nRet, "", ""); if (lstat(xlogpath, &stbuf) == 0) { @@ -1704,7 +1785,12 @@ void fsync_pgdata(const char *pg_data) char pg_tblspc[MAXPGPATH] = {0}; errno_t errorno = EOK; - errorno = snprintf_s(pg_xlog, MAXPGPATH, MAXPGPATH - 1, "%s/pg_xlog", pg_data); + if (is_dss_file(pg_data)) { + errorno = snprintf_s(pg_xlog, MAXPGPATH, MAXPGPATH - 1, "%s/pg_xlog%d", pg_data, + instance_config.dss.instance_id); + } else { + errorno = snprintf_s(pg_xlog, MAXPGPATH, MAXPGPATH - 1, "%s/pg_xlog", pg_data); + } securec_check_ss_c(errorno, "\0", "\0"); errorno = snprintf_s(pg_tblspc, MAXPGPATH, MAXPGPATH - 1, "%s/pg_tblspc", pg_data); securec_check_ss_c(errorno, "\0", "\0"); diff --git a/src/bin/pg_ctl/pg_build.h b/src/bin/pg_ctl/pg_build.h index 497001123ece104f62b18170a0236c81f6b3e2d9..f45848cca59cd3fc5db80799f19a7ee9d4a4cb87 100755 --- a/src/bin/pg_ctl/pg_build.h +++ b/src/bin/pg_ctl/pg_build.h @@ -12,6 +12,7 @@ #include #include #include "replication/replicainternal.h" +#include "tool_common.h" #define CONFIGRURE_FILE "postgresql.conf" #define CONFIGRURE_FILE_BAK "postgresql.conf.bak" @@ -43,6 +44,7 @@ extern char conninfo_global[MAX_REPLNODE_NUM][MAX_VALUE_LEN]; extern int standby_recv_timeout; extern int standby_connect_timeout; /* 120 sec = default */ extern char gaussdb_state_file[MAXPGPATH]; +extern SSInstanceConfig instance_config; void delete_datadir(const char* dirname); diff --git a/src/bin/pg_ctl/pg_ctl.cpp b/src/bin/pg_ctl/pg_ctl.cpp index 9129bdf6d36bdd9b027945e0dbd6b2311ac436a0..c66bd5ba64c78713f11185f776f3c8a0c7090410 100755 --- a/src/bin/pg_ctl/pg_ctl.cpp +++ b/src/bin/pg_ctl/pg_ctl.cpp @@ -55,6 
+55,9 @@ #include "common/fe_memutils.h" #include "logging.h" #include "tool_common.h" +#include "catalog/pg_control.h" +#include "storage/dss/dss_adaptor.h" +#include "storage/file/fio_device.h" #ifdef ENABLE_MOT #include "fetchmot.h" @@ -239,6 +242,14 @@ BuildFailReason g_inc_fail_reason = DEFAULT_REASON; bool g_is_obsmode = false; +/* dss parameter */ +static char* vgname = NULL; +static char* vgdata = NULL; +static char* vglog = NULL; +static char* socketpath = NULL; +static bool enable_dss = false; +static char* ss_nodedatainfo = NULL; + #ifndef FREE_AND_RESET #define FREE_AND_RESET(ptr) \ do { \ @@ -3824,6 +3835,12 @@ static void do_help(void) printf(_("\nBuild connection option:\n")); printf(_(" -r, --recvtimeout=INTERVAL time that receiver waits for communication from server (in seconds)\n")); printf(_(" -C, connector CN/DN connect to specified CN/DN for build\n")); +#ifndef ENABLE_LITE_MODE + printf(_(" --enable-dss enable dss function\n")); + printf(_(" --instance-id=instance_id id number of instance when dss and dms are enabled\n")); + printf(_(" --vgname vg name in dss when dss is enabled\n")); + printf(_(" --socketpath=socketpath \n")); +#endif #if ((defined(ENABLE_MULTIPLE_NODES)) || (defined(ENABLE_PRIVATEGAUSS))) printf("\nReport bugs to GaussDB support.\n"); @@ -6146,6 +6163,37 @@ void SetConfigFilePath() } } } +#ifndef ENABLE_LITE_MODE +/* + * Parse the vgname value "data[,log]" into the file-scope vgname, vgdata and vglog and set enable_dss; exit(1) on '/' or on a third name. + * vgname and vgdata are separate xstrdup copies so each can be freed; vglog points into vgdata or at "" and is not owned. + */ +static void parse_vgname_args(char* args) +{ + vgname = xstrdup(args); + enable_dss = true; + if (strstr(vgname, "/") != NULL) { + fprintf(stderr, "invalid token \"/\" in vgname"); + exit(1); + } + + char *comma = strstr(vgname, ","); + if (comma == NULL) { + vgdata = xstrdup(vgname); + vglog = (char *)""; + return; + } + + vgdata = xstrdup(vgname); + comma = strstr(vgdata, ","); + comma[0] = '\0'; + vglog = comma + 1; + if (strstr(vglog, ",") != NULL) { + fprintf(stderr, 
"invalid vgname args, should be two volume group names, example: \"+data,+log\""); + exit(1); + } +} +#endif int main(int argc, char** argv) { @@ -6177,6 +6225,10 @@ int main(int argc, char** argv) {"keycn", required_argument, NULL, 'k'}, {"slotname", required_argument, NULL, 'K'}, {"taskid", required_argument, NULL, 'I'}, + {"vgname", required_argument, NULL, 5}, + {"socketpath", required_argument, NULL, 6}, + {"enable-dss", no_argument, NULL, 7}, + {"dms_url", required_argument, NULL, 8}, {NULL, 0, NULL, 0}}; int option_index; @@ -6248,14 +6300,14 @@ int main(int argc, char** argv) pgxcCommand = xstrdup("--single_node"); #ifdef ENABLE_PRIVATEGAUSS #ifndef ENABLE_LITE_MODE - while ((c = getopt_long(argc, argv, "a:b:cD:e:fi:G:l:m:M:N:n:o:O:p:P:r:R:v:x:sS:t:u:U:wWZ:C:dqL:I:T:Q:", + while ((c = getopt_long(argc, argv, "a:b:cD:e:fi:G:l:m:M:N:n:o:O:p:P:r:R:v:x:sS:t:u:U:wWZ:C:dqL:I:T:Q:g:", long_options, &option_index)) != -1) #else while ((c = getopt_long(argc, argv, "b:cD:e:fi:G:l:m:M:N:o:O:p:P:r:R:v:x:sS:t:u:U:wWZ:C:dqL:I:T:Q:", long_options, &option_index)) != -1) #endif #else - while ((c = getopt_long(argc, argv, "b:cD:e:fi:G:l:m:M:N:o:O:p:P:r:R:v:x:sS:t:u:U:wWZ:C:dqL:I:T:Q:", + while ((c = getopt_long(argc, argv, "b:cD:e:fi:G:l:m:M:N:o:O:p:P:r:R:v:x:sS:t:u:U:wWZ:C:dqL:I:T:Q:g:", long_options, &option_index)) != -1) #endif #endif @@ -6606,6 +6658,15 @@ int main(int argc, char** argv) } break; } + case 'g': + check_input_for_security(optarg); + if (atoi(optarg) < MIN_INSTANCEID || atoi(optarg) > MAX_INSTANCEID) { + pg_log(PG_WARNING, _("unexpected node id specified, valid range is %d - %d.\n"), + MIN_INSTANCEID, MAX_INSTANCEID ); + goto Error; + } + instance_config.dss.instance_id = atoi(optarg); + break; case 1: clear_backup_dir = true; break; @@ -6625,6 +6686,47 @@ int main(int argc, char** argv) } break; } + case 5:{ + check_input_for_security(optarg); + if (strlen(optarg) > MAX_PATH_LEN) { + pg_log(PG_WARNING, _("max path length is exceeded\n")); + goto Error; + 
} + + FREE_AND_RESET(vgname); + FREE_AND_RESET(vgdata); + FREE_AND_RESET(vgdata); + parse_vgname_args(optarg); + instance_config.dss.vgname = xstrdup(vgname); + instance_config.dss.vgdata = xstrdup(vgdata); + instance_config.dss.vglog = xstrdup(vglog); + break; + } + case 6:{ + check_input_for_security(optarg); + if (strlen(optarg) > MAX_PATH_LEN) { + pg_log(PG_WARNING, _("max path length is exceeded\n")); + goto Error; + } + socketpath = xstrdup(optarg); + instance_config.dss.socketpath = xstrdup(optarg); + break; + } + case 7: + instance_config.dss.enable_dss = true; + break; + case 8:{ + check_input_for_security(optarg); + if (strlen(optarg) > MAX_PATH_LEN) { + pg_log(PG_WARNING, _("max path length is exceeded\n")); + goto Error; + } + + FREE_AND_RESET(ss_nodedatainfo); + securec_check_c(ret, ss_nodedatainfo, "\0"); + ss_nodedatainfo = xstrdup(optarg); + break; + } default: /* getopt_long already issued a suitable error message */ do_advice(); @@ -6787,7 +6889,30 @@ int main(int argc, char** argv) do_wait = false; } - initDataPathStruct(false); + if (instance_config.dss.enable_dss) { + // dss device init + if (dss_device_init(instance_config.dss.socketpath, + instance_config.dss.enable_dss) != DSS_SUCCESS) { + pg_log(PG_WARNING, _("failed to init dss device\n")); + goto Error; + } + + /* Prepare some g_datadir parameters */ + g_datadir.instance_id = instance_config.dss.instance_id; + + errno_t rc = strcpy_s(g_datadir.dss_data, strlen(instance_config.dss.vgdata) + 1, instance_config.dss.vgdata); + securec_check_c(rc, "\0", "\0"); + + rc = strcpy_s(g_datadir.dss_log, strlen(instance_config.dss.vglog) + 1, instance_config.dss.vglog); + securec_check_c(rc, "\0", "\0"); + + /* The default of XLogSegmentSize was set 16M during configure, we reassign 1G to XLogSegmentSize + when dss enable */ + XLogSegmentSize = DSS_XLOG_SEG_SIZE; + } + + initDataPathStruct(instance_config.dss.enable_dss); + SetConfigFilePath(); pg_host = getenv("PGHOST"); diff --git 
a/src/bin/pg_ctl/receivelog.cpp b/src/bin/pg_ctl/receivelog.cpp index b6460e4cbec928b07c6b9b8bf5bce13d18a033ed..14b62847cc0f234547b3154c1422acb14844035a 100644 --- a/src/bin/pg_ctl/receivelog.cpp +++ b/src/bin/pg_ctl/receivelog.cpp @@ -38,6 +38,7 @@ #include #include #include +#include "storage/file/fio_device.h" /* Size of the streaming replication protocol headers */ #define STREAMING_HEADER_SIZE (1 + sizeof(WalDataMessageHeader)) @@ -98,12 +99,19 @@ static int open_walfile(XLogRecPtr startpoint, uint32 timeline, const char* base nRet = snprintf_s(fn, sizeof(fn), sizeof(fn) - 1, "%s/%s.partial", basedir, namebuf); securec_check_ss_c(nRet, "", ""); - - retVal = realpath(fn, Lrealpath); - if (retVal == NULL && '\0' == Lrealpath[0]) { - pg_log(PG_PRINT, _("%s: realpath WAL segment path %s failed : %s\n"), progname, Lrealpath, strerror(errno)); - } - + + /* This basedir is real path when dss enabled,we need to not transform to realpath */ + if (is_dss_file(fn)) { + nRet = snprintf_s(Lrealpath, sizeof(Lrealpath), sizeof(Lrealpath) - 1, "%s", fn); + securec_check_ss_c(nRet, "", ""); + pg_log(PG_DEBUG, _("%s: WAL segment path that will open is %s \n"), progname, fn); + } else { + retVal = realpath(fn, Lrealpath); + if (retVal == NULL && '\0' == Lrealpath[0]) { + pg_log(PG_PRINT, _("%s: realpath WAL segment path %s failed : %s\n"), progname, Lrealpath, strerror(errno)); + } + } + int f = open(Lrealpath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR); if (f == -1) { pg_log(PG_PRINT, _("%s: Could not open WAL segment %s: %s\n"), progname, Lrealpath, strerror(errno)); @@ -137,12 +145,16 @@ static int open_walfile(XLogRecPtr startpoint, uint32 timeline, const char* base f = -1; return -1; } - - /* New, empty, file. 
So pad it to 16Mb with zeroes */ - zerobuf = (char*)xmalloc0(XLOG_BLCKSZ); - for (bytes = 0; bytes < (int)XLogSegSize; bytes += XLOG_BLCKSZ) { - if (write(f, zerobuf, XLOG_BLCKSZ) != XLOG_BLCKSZ) { - pg_log(PG_PRINT, _("%s: could not pad WAL segment %s: %s\n"), progname, Lrealpath, strerror(errno)); + + + if (is_dss_fd(f)) { + /* extend file and fill space at once to avoid performance issue */ + errno = 0; + if (ftruncate(f, XLogSegSize) != 0) { + int save_errno = errno; + /* if write didn't set errno, assume problem is no disk space */ + errno = save_errno ? save_errno : ENOSPC; + pg_log(PG_PRINT, _("%s: could not write to file %s: %s\n"), progname, Lrealpath, strerror(errno)); if (close(f) != 0) { pg_log(PG_PRINT, _("%s: close file failed %s: %s\n"), progname, Lrealpath, strerror(errno)); } @@ -150,13 +162,29 @@ static int open_walfile(XLogRecPtr startpoint, uint32 timeline, const char* base if (unlink(Lrealpath) != 0) { pg_log(PG_PRINT, _("%s: unlink file failed %s: %s\n"), progname, Lrealpath, strerror(errno)); } - free(zerobuf); - zerobuf = NULL; return -1; } + } else { + /* New, empty, file. 
So pad it to 16Mb with zeroes */ + zerobuf = (char*)xmalloc0(XLOG_BLCKSZ); + for (bytes = 0; bytes < (int)XLogSegSize; bytes += XLOG_BLCKSZ) { + if (write(f, zerobuf, XLOG_BLCKSZ) != XLOG_BLCKSZ) { + pg_log(PG_PRINT, _("%s: could not pad WAL segment %s: %s\n"), progname, Lrealpath, strerror(errno)); + if (close(f) != 0) { + pg_log(PG_PRINT, _("%s: close file failed %s: %s\n"), progname, Lrealpath, strerror(errno)); + } + f = -1; + if (unlink(Lrealpath) != 0) { + pg_log(PG_PRINT, _("%s: unlink file failed %s: %s\n"), progname, Lrealpath, strerror(errno)); + } + free(zerobuf); + zerobuf = NULL; + return -1; + } + } + free(zerobuf); + zerobuf = NULL; } - free(zerobuf); - zerobuf = NULL; if (lseek(f, SEEK_SET, 0) != 0) { pg_log(PG_PRINT, diff --git a/src/bin/pg_probackup/pg_probackupb.h b/src/bin/pg_probackup/pg_probackupb.h index 54ddc6fe7f327ae95d176259e380c981d750f67d..3f36b73fa247f502c33f201cfbf294572a94afb0 100644 --- a/src/bin/pg_probackup/pg_probackupb.h +++ b/src/bin/pg_probackup/pg_probackupb.h @@ -11,6 +11,8 @@ #ifndef PG_PROBACKUPB_H #define PG_PROBACKUPB_H +#include "tool_common.h" + /* Information about single file (or dir) in backup */ typedef struct pgFile_t { @@ -129,16 +131,6 @@ typedef struct ArchiveOptions const char *user; } ArchiveOptions; -typedef struct DssOptions -{ - bool enable_dss; - int instance_id; - const char *vgname; - char *vglog; - char *vgdata; - char *socketpath; -} DssOptions; - /* * An instance configuration. It can be stored in a configuration file or passed * from command line. 
diff --git a/src/bin/pg_resetxlog/pg_resetxlog.cpp b/src/bin/pg_resetxlog/pg_resetxlog.cpp index 005424237a885e35f86418c4a18eeb18602cd06a..5959b29157ee59edf3927a1a00281804605affc1 100644 --- a/src/bin/pg_resetxlog/pg_resetxlog.cpp +++ b/src/bin/pg_resetxlog/pg_resetxlog.cpp @@ -86,14 +86,6 @@ static void usage(void); #define MAX_STRING_LENGTH 1024 const uint64 FREEZE_MAX_AGE = 2000000000; -typedef struct DssOptions -{ - bool enable_dss; - char *vgname; - char *socketpath; - int primaryInstId; -} DssOptions; - /* DSS connect parameters */ static DssOptions dss; diff --git a/src/common/port/tool_common.cpp b/src/common/port/tool_common.cpp index 888d80888b5554ff031febbee2a9f05fb80ba265..aa424b43709ad982b3c6b2efdf1c017188cd09ba 100644 --- a/src/common/port/tool_common.cpp +++ b/src/common/port/tool_common.cpp @@ -168,34 +168,34 @@ static void initDSSDataPathStruct(datadir_t *dataDir) // Unix file directory (instance owner) rc = snprintf_s(dataDir->dwDir.dwOldPath, MAXPGPATH, MAXPGPATH - 1, "%s/pg_doublewrite%d/pg_dw", - dataDir->pg_data, dataDir->instance_id); + dataDir->dss_data, dataDir->instance_id); securec_check_ss_c(rc, "", ""); rc = snprintf_s(dataDir->dwDir.dwPathPrefix, MAXPGPATH, MAXPGPATH - 1, "%s/pg_doublewrite%d/pg_dw_", - dataDir->pg_data, dataDir->instance_id); + dataDir->dss_data, dataDir->instance_id); securec_check_ss_c(rc, "", ""); rc = snprintf_s(dataDir->dwDir.dwSinglePath, MAXPGPATH, MAXPGPATH - 1, "%s/pg_doublewrite%d/pg_dw_single", - dataDir->pg_data, dataDir->instance_id); + dataDir->dss_data, dataDir->instance_id); securec_check_ss_c(rc, "", ""); rc = snprintf_s(dataDir->dwDir.dwBuildPath, MAXPGPATH, MAXPGPATH - 1, "%s/pg_doublewrite%d/pg_dw.build", - dataDir->pg_data, dataDir->instance_id); + dataDir->dss_data, dataDir->instance_id); securec_check_ss_c(rc, "", ""); rc = snprintf_s(dataDir->dwDir.dwUpgradePath, MAXPGPATH, MAXPGPATH - 1, "%s/pg_doublewrite%d/dw_upgrade", - dataDir->pg_data, dataDir->instance_id); + dataDir->dss_data, 
dataDir->instance_id); securec_check_ss_c(rc, "", ""); rc = snprintf_s(dataDir->dwDir.dwMetaPath, MAXPGPATH, MAXPGPATH - 1, "%s/pg_doublewrite%d/pg_dw_meta", - dataDir->pg_data, dataDir->instance_id); + dataDir->dss_data, dataDir->instance_id); securec_check_ss_c(rc, "", ""); rc = snprintf_s(dataDir->dwDir.dwBatchUpgradeMetaPath, MAXPGPATH, MAXPGPATH - 1, - "%s/pg_doublewrite%d/dw_batch_upgrade_meta", dataDir->pg_data, dataDir->instance_id); + "%s/pg_doublewrite%d/dw_batch_upgrade_meta", dataDir->dss_data, dataDir->instance_id); securec_check_ss_c(rc, "", ""); rc = snprintf_s(dataDir->dwDir.dwBatchUpgradeFilePath, MAXPGPATH, MAXPGPATH - 1, - "%s/pg_doublewrite%d/dw_batch_upgrade_files", dataDir->pg_data, dataDir->instance_id); + "%s/pg_doublewrite%d/dw_batch_upgrade_files", dataDir->dss_data, dataDir->instance_id); securec_check_ss_c(rc, "", ""); } diff --git a/src/gausskernel/ddes/adapter/ss_dms_callback.cpp b/src/gausskernel/ddes/adapter/ss_dms_callback.cpp index d70cb290e4a5322d80e008c91045e46ea02a1098..83a5336155178a74ad2669916ff122c38825a624 100644 --- a/src/gausskernel/ddes/adapter/ss_dms_callback.cpp +++ b/src/gausskernel/ddes/adapter/ss_dms_callback.cpp @@ -184,8 +184,9 @@ static int CBGetTxnCSN(void *db_handle, dms_opengauss_xid_csn_t *csn_req, dms_op } static int CBGetSnapshotData(void *db_handle, dms_opengauss_txn_snapshot_t *txn_snapshot) -{ - if (RecoveryInProgress()) { +{ + /* SS_STANDBY_CLUSTER_NORMAL_MAIN_STANDBY always is in recovery progress, but it can acquire snapshot*/ + if (RecoveryInProgress() && !SS_STANDBY_CLUSTER_NORMAL_MAIN_STANDBY) { return DMS_ERROR; } diff --git a/src/gausskernel/ddes/adapter/ss_reform_common.cpp b/src/gausskernel/ddes/adapter/ss_reform_common.cpp index 49a0e07e009ddbd8df052fde0cc671e4041df13b..b32e1c03043c9e8640561ce7bca63c733b45920f 100644 --- a/src/gausskernel/ddes/adapter/ss_reform_common.cpp +++ b/src/gausskernel/ddes/adapter/ss_reform_common.cpp @@ -35,6 +35,7 @@ #include "ddes/dms/ss_reform_common.h" #include 
"storage/file/fio_device.h" #include "storage/smgr/segment_internal.h" +#include "replication/walreceiver.h" /* * Add xlog reader private structure for page read. @@ -111,7 +112,7 @@ static int emode_for_corrupt_record(int emode, XLogRecPtr RecPtr) } static int SSReadXLog(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int expectReadLen, - char *buf, TimeLineID *readTLI, char* xlog_path) + XLogRecPtr targetRecPtr, char *buf, TimeLineID *readTLI, char* xlog_path) { /* Load reader private data */ XLogPageReadPrivate *readprivate = (XLogPageReadPrivate *)xlogreader->private_data; @@ -173,7 +174,7 @@ static int SSReadXLog(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int /* Read the requested page */ t_thrd.xlog_cxt.readOff = targetPageOff; - bool ret = SSReadXlogInternal(xlogreader, targetPagePtr, buf); + bool ret = SSReadXlogInternal(xlogreader, targetPagePtr, targetRecPtr, buf); if (!ret) { ereport(LOG, (errcode_for_file_access(), errmsg("read xlog(start:%X/%X, pos:%u len:%d) failed : %m", static_cast(targetPagePtr >> BIT_NUM_INT32), @@ -211,22 +212,44 @@ next_record_is_invalid: int SSXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *readBuf, TimeLineID *readTLI, char* xlog_path) { - int read_len = SSReadXLog(xlogreader, targetPagePtr, Max(XLOG_BLCKSZ, reqLen), readBuf, - readTLI, g_instance.dms_cxt.SSRecoveryInfo.recovery_xlogDir); + int read_len = SSReadXLog(xlogreader, targetPagePtr, Max(XLOG_BLCKSZ, reqLen), targetRecPtr, + readBuf, readTLI, g_instance.dms_cxt.SSRecoveryInfo.recovery_xlogDir); return read_len; } -bool SSReadXlogInternal(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, char *buf) +bool SSReadXlogInternal(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, XLogRecPtr targetRecPtr, char *buf) { uint32 preReadOff; + XLogRecPtr xlogFlushPtrForPerRead = xlogreader->xlogFlushPtrForPerRead; + bool isReadFile = true; do { - if (XLByteInPreReadBuf(targetPagePtr, 
xlogreader->preReadStartPtr)) { + /* + * That source is XLOG_FROM_STREAM indicate that walreceiver receive xlog and walrecwriter have wrriten xlog + * into pg_xlog segment file in dss. There exists a condition which preReadBuf possibly is zero for some xlog + * record just writing into pg_xlog file when source is XLOG_FROM_STREAM and dms and dss are enabled. So we + * need to reread xlog from dss to preReadBuf. + */ + if (SS_STANDBY_CLUSTER_NORMAL_MAIN_STANDBY) { + volatile XLogCtlData *xlogctl = t_thrd.shemem_ptr_cxt.XLogCtl; + if (XLByteInPreReadBuf(targetPagePtr, xlogreader->preReadStartPtr) && + ((targetRecPtr < xlogFlushPtrForPerRead && t_thrd.xlog_cxt.readSource == XLOG_FROM_STREAM) || + (!xlogctl->IsRecoveryDone))) { + isReadFile = false; + } + } + + if ((XLByteInPreReadBuf(targetPagePtr, xlogreader->preReadStartPtr) && + !SS_STANDBY_CLUSTER_NORMAL_MAIN_STANDBY) || (!isReadFile)) { preReadOff = targetPagePtr % XLogPreReadSize; int err = memcpy_s(buf, XLOG_BLCKSZ, xlogreader->preReadBuf + preReadOff, XLOG_BLCKSZ); securec_check(err, "\0", "\0"); break; } else { + if (SS_STANDBY_CLUSTER_NORMAL_MAIN_STANDBY) { + xlogreader->xlogFlushPtrForPerRead = GetWalRcvWriteRecPtr(NULL); + xlogFlushPtrForPerRead = xlogreader->xlogFlushPtrForPerRead; + } // pre-reading for dss uint32 targetPageOff = targetPagePtr % XLogSegSize; preReadOff = targetPageOff - targetPageOff % XLogPreReadSize; @@ -266,6 +289,8 @@ XLogReaderState *SSXLogReaderAllocate(XLogPageReadCB pagereadfunc, void *private } else { state->preReadBuf = (char *)TYPEALIGN(alignedSize, state->preReadBufOrigin); } + + state->xlogFlushPtrForPerRead = InvalidXLogRecPtr; } return state; diff --git a/src/gausskernel/process/postmaster/postmaster.cpp b/src/gausskernel/process/postmaster/postmaster.cpp index 8380d4b994d5b0b7a7c40514a6472b31a8c35c7b..2c3157baedb1db17bab3bb6ec36af44ee5d61ac9 100644 --- a/src/gausskernel/process/postmaster/postmaster.cpp +++ b/src/gausskernel/process/postmaster/postmaster.cpp @@ -9951,10 
+9951,12 @@ static void sigusr1_handler(SIGNAL_ARGS) /* the parent process, return 0 if the fork failed, return the PID if fork succeed. */ StartPgjobWorker(); } - + + /* If xlog_file_path is not equal to zero and dms is enabled, the main standby needs to initialize walreceiver + * and walrecwrite. Other modes do not need them when dms is enabled. */ if (CheckPostmasterSignal(PMSIGNAL_START_WALRECEIVER) && g_instance.pid_cxt.WalReceiverPID == 0 && (pmState == PM_STARTUP || pmState == PM_RECOVERY || pmState == PM_HOT_STANDBY || pmState == PM_WAIT_READONLY) && - g_instance.status == NoShutdown && !ENABLE_DMS) { + g_instance.status == NoShutdown && (!ENABLE_DMS || SS_STANDBY_CLUSTER_NORMAL_MAIN_STANDBY)) { if (g_instance.pid_cxt.WalRcvWriterPID == 0) { g_instance.pid_cxt.WalRcvWriterPID = initialize_util_thread(WALRECWRITE); SetWalRcvWriterPID(g_instance.pid_cxt.WalRcvWriterPID); @@ -14728,14 +14730,29 @@ void InitShmemForDmsCallBack() InitProcessAndShareMemory(); } -const char *GetSSServerMode() +const char *GetSSServerMode(ServerMode mode) { - if (!SS_OFFICIAL_PRIMARY) { - return "Standby"; - } - - if (SS_OFFICIAL_PRIMARY) { - return "Primary"; + if (g_instance.attr.attr_storage.xlog_file_path != 0) { + if (SS_OFFICIAL_PRIMARY && mode == PRIMARY_MODE) { + return "Primary"; + } + + /* main standby in standby cluster */ + if (SS_OFFICIAL_PRIMARY && mode == STANDBY_MODE) { + return "Standby"; + } + + if (!SS_OFFICIAL_PRIMARY) { + return "Standby"; + } + } else { + if (!SS_OFFICIAL_PRIMARY) { + return "Standby"; + } + + if (SS_OFFICIAL_PRIMARY) { + return "Primary"; + } } return "Unknown"; diff --git a/src/gausskernel/storage/access/transam/xlog.cpp b/src/gausskernel/storage/access/transam/xlog.cpp index a912b63f24b845129a459a811a2abf1f4f71c659..b05dcfc7c2d4fc8bdcd34e2976f43b1b644a69e6 100755 --- a/src/gausskernel/storage/access/transam/xlog.cpp +++ b/src/gausskernel/storage/access/transam/xlog.cpp @@ -6844,6 +6844,7 @@ void XLOGShmemInit(void)
t_thrd.shemem_ptr_cxt.XLogCtl->IsRecoveryDone = false; t_thrd.shemem_ptr_cxt.XLogCtl->SharedHotStandbyActive = false; t_thrd.shemem_ptr_cxt.XLogCtl->WalWriterSleeping = false; + t_thrd.shemem_ptr_cxt.XLogCtl->xlogFlushPtrForPerRead = InvalidXLogRecPtr; if (!IsInitdb && !dummyStandbyMode) { t_thrd.shemem_ptr_cxt.XLogCtl->lastRemovedSegNo = GetOldestXLOGSegNo(t_thrd.proc_cxt.DataDir); } @@ -17898,7 +17899,7 @@ retry: t_thrd.xlog_cxt.readOff = targetPageOff; if (ENABLE_DSS && ENABLE_DMS) { - bool ss_ret = SSReadXlogInternal(xlogreader, targetPagePtr, readBuf); + bool ss_ret = SSReadXlogInternal(xlogreader, targetPagePtr, targetRecPtr, readBuf); if (!ss_ret) { ereport(emode_for_corrupt_record(emode, RecPtr), (errcode_for_file_access(), @@ -18574,7 +18575,14 @@ pg_crc32 GetXlogRecordCrc(XLogRecPtr RecPtr, bool &crcvalid, XLogPageReadCB page /* Set up XLOG reader facility */ rc = memset_s(&readprivate, sizeof(XLogPageReadPrivate), 0, sizeof(XLogPageReadPrivate)); securec_check(rc, "\0", "\0"); - xlogreader = XLogReaderAllocate(pagereadfunc, &readprivate, bufAlignSize); + + /* we need to read xlog from dss when dms and dss enabled */ + if (ENABLE_DMS && ENABLE_DSS) { + xlogreader = SSXLogReaderAllocate(pagereadfunc, &readprivate, ALIGNOF_BUFFER); + } else { + xlogreader = XLogReaderAllocate(pagereadfunc, &readprivate, bufAlignSize); + } + if (xlogreader == NULL) { ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of memory"), errdetail("Failed while allocating an XLog reading processor"))); diff --git a/src/gausskernel/storage/replication/basebackup.cpp b/src/gausskernel/storage/replication/basebackup.cpp index 5d3c85bf4b53db993b31a8e0db9ab7fd950801e3..211c1c112edf8458c97cdacd70e0a8ace8fcc915 100755 --- a/src/gausskernel/storage/replication/basebackup.cpp +++ b/src/gausskernel/storage/replication/basebackup.cpp @@ -39,6 +39,7 @@ #include "storage/page_compression.h" #include "storage/pmsignal.h" #include "storage/checksum.h" +#include "storage/file/fio_device.h" 
#ifdef ENABLE_MOT #include "storage/mot/mot_fdw.h" #endif @@ -187,8 +188,14 @@ static void send_xlog_location() char fullpath[MAXPGPATH] = {0}; struct stat statbuf; int rc = 0; - - rc = snprintf_s(fullpath, sizeof(fullpath), sizeof(fullpath) - 1, "%s/pg_xlog", t_thrd.proc_cxt.DataDir); + + if (ENABLE_DSS) { + char *dssdir = g_instance.attr.attr_storage.dss_attr.ss_dss_vg_name; + rc = snprintf_s(fullpath, sizeof(fullpath), sizeof(fullpath) - 1, "%s/pg_xlog%d", dssdir, + g_instance.attr.attr_storage.dms_attr.instance_id); + } else { + rc = snprintf_s(fullpath, sizeof(fullpath), sizeof(fullpath) - 1, "%s/pg_xlog", t_thrd.proc_cxt.DataDir); + } securec_check_ss(rc, "", ""); if (lstat(fullpath, &statbuf) != 0) { @@ -849,9 +856,20 @@ void SendBaseBackup(BaseBackupCmd *cmd) set_ps_display(activitymsg, false); } + + if (ENABLE_DSS) { + int rc = 0; + char fullpath[MAXPGPATH] = {0}; + char *dssdir = g_instance.attr.attr_storage.dss_attr.ss_dss_vg_name; + + rc = snprintf_s(fullpath, MAXPGPATH, MAXPGPATH - 1, "%s/pg_tblspc", dssdir); + securec_check_ss(rc, "", ""); - /* Make sure we can open the directory with tablespaces in it */ - dir = AllocateDir("pg_tblspc"); + dir = AllocateDir(fullpath); + } else { + /* Make sure we can open the directory with tablespaces in it */ + dir = AllocateDir("pg_tblspc"); + } if (dir == NULL) { ereport(ERROR, (errcode_for_file_access(), errmsg("could not open directory \"%s\": %m", "pg_tblspc"))); return; @@ -1164,6 +1182,28 @@ int64 sendTablespace(const char *path, bool sizeonly) return size; } +int IsBeginWith(const char *str1, char *str2) +{ + if (str1 == NULL || str2 == NULL) + return -1; + int len1 = strlen(str1); + int len2 = strlen(str2); + if ((len1 < len2) || (len1 == 0 || len2 == 0)) { + return -1; + } + + char *p = str2; + int i = 0; + while (*p != '\0') { + if (*p != str1[i]) { + return 0; + } + p++; + i++; + } + return 1; +} + bool IsSkipDir(const char * dirName) { if (strcmp(dirName, ".") == 0 || strcmp(dirName, "..") == 0) @@ 
-1185,30 +1225,33 @@ bool IsSkipDir(const char * dirName) return true; if (strcmp(dirName, DISABLE_CONN_FILE) == 0) return true; - - return false; -} - -int IsBeginWith(const char *str1, char *str2) -{ - if (str1 == NULL || str2 == NULL) - return -1; - int len1 = strlen(str1); - int len2 = strlen(str2); - if ((len1 < len2) || (len1 == 0 || len2 == 0)) { - return -1; - } - - char *p = str2; - int i = 0; - while (*p != '\0') { - if (*p != str1[i]) { - return 0; + + /* skip .recycle in dss */ + if (ENABLE_DSS && strcmp(dirName, ".recycle") == 0) + return true; + + /* skip directories which do not belong to the primary in dss */ + if (ENABLE_DSS) { + /* skip primary doublewrite and other node doublewrite */ + if (IsBeginWith(dirName, "pg_doublewrite") > 0) { + return true; + } + + /* skip other node pg_xlog except primary */ + if (IsBeginWith(dirName, "pg_xlog") > 0) { + int dirNameLen = strlen("pg_xlog"); + char instance_id[MAX_INSTANCEID_LEN]; + errno_t rc = EOK; + rc = snprintf_s(instance_id, sizeof(instance_id), sizeof(instance_id) - 1, "%d", + g_instance.attr.attr_storage.dms_attr.instance_id); + securec_check_ss_c(rc, "\0", "\0"); + /* do not skip the pg_xlog directory in the file system */ + if (strlen(dirName) > dirNameLen && strcmp(dirName + dirNameLen, instance_id) != 0) + return true; } - p++; - i++; } - return 1; + + return false; } bool IsSkipPath(const char * pathName) @@ -1250,6 +1293,11 @@ bool IsSkipPath(const char * pathName) if (t_thrd.walsender_cxt.is_obsmode == true && strcmp(pathName, "./pg_replslot") == 0) return true; + /* skip pg_control in dss */ + if (ENABLE_DSS && strcmp(pathName, "+data/pg_control") == 0) { + return true; + } + return false; } @@ -1309,7 +1357,12 @@ static int64 SendRealFile(bool sizeOnly, char* pathbuf, int basepathlen, struct } else { bool sent = false; if (!sizeOnly) { - sent = sendFile(pathbuf, pathbuf + basepathlen + 1, statbuf, true); + /* dss files are sent to the other node with their entire path */ + if (ENABLE_DSS && is_dss_file(pathbuf)) { + sent =
sendFile(pathbuf, pathbuf, statbuf, true); + } else { + sent = sendFile(pathbuf, pathbuf + basepathlen + 1, statbuf, true); + } } if (sent || sizeOnly) { /* Add size, rounded up to 512byte block */ @@ -1465,7 +1518,8 @@ static int64 sendDir(const char *path, int basepathlen, bool sizeonly, List *tab * WAL archive anyway. But include it as an empty directory anyway, so * we get permissions right. */ - if (strcmp(pathbuf, "./pg_xlog") == 0) { + int pathNameLen = strlen("+data/pg_xlog"); + if (strcmp(pathbuf, "./pg_xlog") == 0 || strncmp(pathbuf, "+data/pg_xlog", pathNameLen) == 0) { if (!sizeonly) { /* If pg_xlog is a symlink, write it as a directory anyway */ #ifndef WIN32 @@ -1486,7 +1540,11 @@ static int64 sendDir(const char *path, int basepathlen, bool sizeonly, List *tab linkpath[MAXPGPATH - 1] = '\0'; if (!sizeonly) - _tarWriteHeader(pathbuf + basepathlen + 1, linkpath, &statbuf); + if (ENABLE_DSS && is_dss_file(pathbuf)) { + _tarWriteHeader(pathbuf, linkpath, &statbuf); + } else { + _tarWriteHeader(pathbuf + basepathlen + 1, linkpath, &statbuf); + } #else /* @@ -1500,7 +1558,12 @@ static int64 sendDir(const char *path, int basepathlen, bool sizeonly, List *tab #endif /* HAVE_READLINK */ } else if (S_ISDIR(statbuf.st_mode)) { statbuf.st_mode = S_IFDIR | S_IRWXU; - _tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf); + /* dss directory send to other node in entire path */ + if (ENABLE_DSS && is_dss_file(pathbuf)) { + _tarWriteHeader(pathbuf, NULL, &statbuf); + } else { + _tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf); + } } } size += BUILD_PATH_LEN; /* Size of the header just added */ @@ -1521,7 +1584,11 @@ static int64 sendDir(const char *path, int basepathlen, bool sizeonly, List *tab ereport(ERROR, (errcode(ERRCODE_NAME_TOO_LONG), errmsg("symbolic link \"%s\" target is too long", pathbuf))); linkpath[MAXPGPATH - 1] = '\0'; - _tarWriteHeader(pathbuf + basepathlen + 1, linkpath, &statbuf); + if (ENABLE_DSS && is_dss_file(pathbuf)) { + 
_tarWriteHeader(pathbuf, linkpath, &statbuf); + } else { + _tarWriteHeader(pathbuf + basepathlen + 1, linkpath, &statbuf); + } #else /* * If the platform does not have symbolic links, it should not be @@ -1539,7 +1606,11 @@ static int64 sendDir(const char *path, int basepathlen, bool sizeonly, List *tab * statbuf from above ...). */ statbuf.st_mode = S_IFDIR | S_IRWXU; - _tarWriteHeader("pg_xlog/archive_status", NULL, &statbuf); + if (ENABLE_DSS && is_dss_file(pathbuf)) { + _tarWriteHeader(pathbuf, NULL, &statbuf); + } else { + _tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf); + } } } size += BUILD_PATH_LEN; /* Size of the header just added */ @@ -1565,7 +1636,11 @@ static int64 sendDir(const char *path, int basepathlen, bool sizeonly, List *tab (errcode(ERRCODE_NAME_TOO_LONG), errmsg("symbolic link \"%s\" target is too long", pathbuf))); linkpath[rllen] = '\0'; if (!sizeonly) - _tarWriteHeader(pathbuf + basepathlen + 1, linkpath, &statbuf); + if (ENABLE_DSS && is_dss_file(pathbuf)) { + _tarWriteHeader(pathbuf, linkpath, &statbuf); + } else { + _tarWriteHeader(pathbuf + basepathlen + 1, linkpath, &statbuf); + } size += BUILD_PATH_LEN; /* Size of the header just added */ #else @@ -1587,7 +1662,11 @@ static int64 sendDir(const char *path, int basepathlen, bool sizeonly, List *tab * permissions right. 
*/ if (!sizeonly) - _tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf); + if (ENABLE_DSS && is_dss_file(pathbuf)) { + _tarWriteHeader(pathbuf, NULL, &statbuf); + } else { + _tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf); + } size += BUILD_PATH_LEN; /* Size of the header just added */ /* @@ -1801,9 +1880,19 @@ bool is_row_data_file(const char *path, int *segNo, UndoFileType *undoFileType) static void SendTableSpaceForBackup(basebackup_options* opt, List* tablespaces, char* labelfile, char* tblspc_map_file) { ListCell *lc = NULL; + int64 asize = 0; + char *dssdir = g_instance.attr.attr_storage.dss_attr.ss_dss_vg_name; + + if (ENABLE_DSS) { + /* Add a node for all directory in dss*/ + asize = sendDir(".", 1, true, tablespaces, true) + sendDir(dssdir, 1, true, tablespaces, true); + } else { + asize = sendDir(".", 1, true, tablespaces, true); + } + /* Add a node for the base directory at the end */ tablespaceinfo *ti = (tablespaceinfo *)palloc0(sizeof(tablespaceinfo)); - ti->size = opt->progress ? sendDir(".", 1, true, tablespaces, true) : -1; + ti->size = opt->progress ? asize : -1; tablespaces = (List *)lappend(tablespaces, ti); /* Send tablespace header */ @@ -1839,8 +1928,12 @@ static void SendTableSpaceForBackup(basebackup_options* opt, List* tablespaces, sendDir(".", 1, false, tablespaces, false); } else sendDir(".", 1, false, tablespaces, true); + /* send file in dss*/ + if (ENABLE_DSS) { + sendDir(dssdir, 1, false, tablespaces, true); + } } - + /* In the main tar, include pg_control last. 
*/ if (iterti->path == NULL) { struct stat statbuf; @@ -2092,7 +2185,12 @@ static bool sendFile(char *readfilename, char *tarfilename, struct stat *statbuf /* send the pkg header containing msg like file size */ _tarWriteHeader(tarfilename, NULL, statbuf); - + + if (ENABLE_DSS && strcmp(tarfilename, XLOG_CONTROL_FILE) == 0) { + int read_size = BUFFERALIGN(sizeof(ControlFileData)); + statbuf->st_size = read_size; + } + while ((cnt = fread(t_thrd.basebackup_cxt.buf_block, 1, Min(TAR_SEND_SIZE, statbuf->st_size - len), fp)) > 0) { if (t_thrd.walsender_cxt.walsender_ready_to_stop) ereport(ERROR, (errcode_for_file_access(), errmsg("base backup receive stop message, aborting backup"))); diff --git a/src/gausskernel/storage/replication/walrcvwriter.cpp b/src/gausskernel/storage/replication/walrcvwriter.cpp index 42c361d7d9d2921187d8f3a9d20a7a2f6a13caf9..daf343379e1cbfbe89b7634923beaafad39c5e50 100755 --- a/src/gausskernel/storage/replication/walrcvwriter.cpp +++ b/src/gausskernel/storage/replication/walrcvwriter.cpp @@ -29,6 +29,7 @@ #include "storage/lmgr.h" #include "storage/proc.h" #include "storage/smgr/smgr.h" +#include "storage/file/fio_device.h" #include "utils/guc.h" #include "access/xlog.h" #include "access/multi_redo_api.h" diff --git a/src/gausskernel/storage/replication/walreceiver.cpp b/src/gausskernel/storage/replication/walreceiver.cpp index 18f4d4030c2a2ea57d5ccf6750edf812ec5e02f3..7079e8b7e77747bf1121c036bb2a7760a952db29 100755 --- a/src/gausskernel/storage/replication/walreceiver.cpp +++ b/src/gausskernel/storage/replication/walreceiver.cpp @@ -453,7 +453,14 @@ void WalReceiverMain(void) int nRet = 0; errno_t rc = 0; - Assert(ENABLE_DSS == false); + if (ENABLE_DSS && t_thrd.postmaster_cxt.HaShmData->current_mode == STANDBY_MODE && + g_instance.attr.attr_common.cluster_run_mode == RUN_MODE_STANDBY && + g_instance.attr.attr_storage.xlog_file_path != 0) { + ereport(LOG, (errmsg("walreceiver thread started for main standby"))); + } else { + Assert(ENABLE_DSS 
== false); + } + t_thrd.walreceiver_cxt.last_sendfilereply_timestamp = GetCurrentTimestamp(); t_thrd.walreceiver_cxt.standby_config_modify_time = time(NULL); @@ -2436,7 +2443,7 @@ Datum pg_stat_get_stream_replications(PG_FUNCTION_ARGS) /* local role */ if (g_instance.attr.attr_storage.dms_attr.enable_dms) { - values[0] = CStringGetTextDatum(GetSSServerMode()); + values[0] = CStringGetTextDatum(GetSSServerMode(local_role)); } else { values[0] = CStringGetTextDatum(wal_get_role_string(local_role)); } diff --git a/src/gausskernel/storage/smgr/segstore.cpp b/src/gausskernel/storage/smgr/segstore.cpp index b857872e379366aafc3e35074fd5ca17324678f1..6309edc25c2447f0ae5ed41b4e0f14b79121a4fe 100755 --- a/src/gausskernel/storage/smgr/segstore.cpp +++ b/src/gausskernel/storage/smgr/segstore.cpp @@ -369,9 +369,15 @@ SegPageLocation seg_logic_to_physic_mapping(SMgrRelation reln, SegmentHead *seg_ BlockNumber blocknum; /* Recovery thread should use physical location to read data directly. */ - if (RecoveryInProgress() && !CurrentThreadIsWorker() && !SS_IN_FLUSHCOPY) { - ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("recovery is in progress"), - errhint("cannot do segment address translation during recovery"))); + if (ENABLE_DMS && t_thrd.postmaster_cxt.HaShmData->current_mode == STANDBY_MODE && + g_instance.attr.attr_common.cluster_run_mode == RUN_MODE_STANDBY && + g_instance.attr.attr_storage.xlog_file_path != 0) { + ereport(DEBUG1, (errmsg("can segment address translation when role is SS_STANDBY_CLUSTER_NORMAL_MAIN_STANDBY"))); + } else { + if (RecoveryInProgress() && !CurrentThreadIsWorker() && !SS_IN_FLUSHCOPY) { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("recovery is in progress"), + errhint("cannot do segment address translation during recovery"))); + } } SegLogicPageIdToExtentId(logic_id, &extent_id, &offset, &extent_size); diff --git a/src/gausskernel/storage/xlog_share_storage/xlog_share_storage.cpp 
b/src/gausskernel/storage/xlog_share_storage/xlog_share_storage.cpp index be07e183fe43cfb98780f66673f72779b05b0c0f..17750e84dc0e1693b0ab02cf1a483a9bf6139360 100644 --- a/src/gausskernel/storage/xlog_share_storage/xlog_share_storage.cpp +++ b/src/gausskernel/storage/xlog_share_storage/xlog_share_storage.cpp @@ -39,6 +39,7 @@ #include "replication/syncrep_gramparse.h" #include "replication/walsender_private.h" #include "storage/ipc.h" +#include "storage/file/fio_device.h" #include "storage/dorado_operation/dorado_fd.h" #include "storage/xlog_share_storage/xlog_share_storage.h" #include "replication/shared_storage_walreceiver.h" diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index db5903d524b4e5f34c8b6de2c82c9d2528645fbd..3ec4ad7899de02172a94438446c6a2e9896db680 100755 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -134,7 +134,7 @@ typedef enum WalLevel { /* Do we need to WAL-log information required only for Hot Standby and logical replication? */ #define XLogStandbyInfoActive() \ (g_instance.attr.attr_storage.wal_level >= WAL_LEVEL_HOT_STANDBY && \ - !g_instance.attr.attr_storage.dms_attr.enable_dms) + (!g_instance.attr.attr_storage.dms_attr.enable_dms || SS_PRIMARY_STANDBY_CLUSTER_NORMAL)) /* Do we need to WAL-log information required only for logical replication? 
*/ #define XLogLogicalInfoActive() (g_instance.attr.attr_storage.wal_level >= WAL_LEVEL_LOGICAL) extern const char* DemoteModeDescs[]; @@ -612,6 +612,9 @@ typedef struct XLogCtlData { bool is_need_log_remain_segs; XLogRecPtr remainCommitLsn; + /* streaming replication during pre-reading for dss */ + XLogRecPtr xlogFlushPtrForPerRead; + slock_t info_lck; /* locks shared variables shown above */ } XLogCtlData; diff --git a/src/include/access/xlog_basic.h b/src/include/access/xlog_basic.h index d714fa160843ca7a9cddc59b4b9b79eb5fe332b0..31b526292d967dae9e62d0efd185f0a0cea262de 100644 --- a/src/include/access/xlog_basic.h +++ b/src/include/access/xlog_basic.h @@ -314,6 +314,9 @@ struct XLogReaderState { XLogRecPtr preReadStartPtr; char* preReadBuf; char* preReadBufOrigin; + + /* streaming replication during pre-reading for dss */ + XLogRecPtr xlogFlushPtrForPerRead; /* last read segment, segment offset, TLI for data currently in readBuf */ XLogSegNo readSegNo; diff --git a/src/include/ddes/dms/ss_common_attr.h b/src/include/ddes/dms/ss_common_attr.h index 61966fb60dfe76e2fd373885d4633259f4e6ff3f..8a87c7d7f2d67481c000017190784af0755d986c 100644 --- a/src/include/ddes/dms/ss_common_attr.h +++ b/src/include/ddes/dms/ss_common_attr.h @@ -89,6 +89,27 @@ (ENABLE_DMS && (g_instance.dms_cxt.SSClusterState == NODESTATE_STANDBY_WAITING || \ g_instance.dms_cxt.SSClusterState == NODESTATE_STANDBY_REDIRECT)) +/* Modes used when dorado hyperreplication and dms are enabled, as follows */ + +/* primary of the primary cluster which is running normally, not in intermediate state */ +#define SS_PRIMARY_CLUSTER_NORMAL_PRIMARY \ + (ENABLE_DMS && (t_thrd.xlog_cxt.server_mode == PRIMARY_MODE) && \ + (g_instance.attr.attr_common.cluster_run_mode == RUN_MODE_PRIMARY) && \ + (g_instance.attr.attr_storage.xlog_file_path != 0)) + +/* main standby which is running normally, not in intermediate state */ +#define SS_STANDBY_CLUSTER_NORMAL_MAIN_STANDBY \ + (ENABLE_DMS && (t_thrd.xlog_cxt.server_mode == STANDBY_MODE || \ +
t_thrd.postmaster_cxt.HaShmData->current_mode == STANDBY_MODE) && \ + (g_instance.attr.attr_common.cluster_run_mode == RUN_MODE_STANDBY) && \ + (g_instance.attr.attr_storage.xlog_file_path != 0)) + +/* node of the primary cluster or the standby cluster which is running normally, not in intermediate state */ +#define SS_PRIMARY_STANDBY_CLUSTER_NORMAL \ + (ENABLE_DMS && ((g_instance.attr.attr_common.cluster_run_mode == RUN_MODE_PRIMARY) || \ + (g_instance.attr.attr_common.cluster_run_mode == RUN_MODE_STANDBY)) && \ + (g_instance.attr.attr_storage.xlog_file_path != 0)) + /* DMS_BUF_NEED_LOAD */ #define BUF_NEED_LOAD 0x1 /* DMS_BUF_IS_LOADED */ diff --git a/src/include/ddes/dms/ss_reform_common.h b/src/include/ddes/dms/ss_reform_common.h index 3b4a893c60b6a9c2189aec66a386ff28caf12f5e..b3b4539c7e0cd5ea1ac8bf2c41fb21f4d4d45a90 100644 --- a/src/include/ddes/dms/ss_reform_common.h +++ b/src/include/ddes/dms/ss_reform_common.h @@ -37,7 +37,7 @@ typedef struct SSBroadcastCancelTrx { int SSXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *readBuf, TimeLineID *readTLI, char* xlog_path); -bool SSReadXlogInternal(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, char *buf); +bool SSReadXlogInternal(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, XLogRecPtr targetRecPtr, char *buf); XLogReaderState *SSXLogReaderAllocate(XLogPageReadCB pagereadfunc, void *private_data, Size alignedSize); void SSGetXlogPath(); void SSSaveReformerCtrl(); diff --git a/src/include/postmaster/postmaster.h b/src/include/postmaster/postmaster.h index 46199be40f6e9dae9200d86ef4029dc1f94186ad..82e7ad1d25b79a45d0a0705ec564bd955389e486 100755 --- a/src/include/postmaster/postmaster.h +++ b/src/include/postmaster/postmaster.h @@ -220,7 +220,7 @@ extern bool get_addr_from_socket(int sock, struct sockaddr *saddr); extern int get_ip_port_from_addr(char* sock_ip, int* port, struct sockaddr saddr); #endif -const char *GetSSServerMode(); +const char *GetSSServerMode(ServerMode
mode); bool SSIsServerModeReadOnly(); bool IsFromLocalAddr(Port* port); extern bool IsMatchSocketAddr(const struct sockaddr* sock_addr, int compare_port); diff --git a/src/include/tool_common.h b/src/include/tool_common.h index a7e84ba4c8dc6dd6c0ac993d156888b6f03c3f53..bf75bc0e6b63b25e3e67cf0f688af69f5bae864a 100644 --- a/src/include/tool_common.h +++ b/src/include/tool_common.h @@ -105,8 +105,25 @@ typedef struct st_datadir_t { dw_subdatadir_t dwDir; } datadir_t; -void initDataPathStruct(bool enable_dss); +typedef struct DssOptions +{ + bool enable_dss; + int instance_id; + const char *vgname; + char *vglog; + char *vgdata; + char *socketpath; + int primaryInstId; +} DssOptions; + +typedef struct SSInstanceConfig +{ + /* DSS connect parameters */ + DssOptions dss; +} SSInstanceConfig; extern datadir_t g_datadir; +void initDataPathStruct(bool enable_dss); + #endif