From 453bd38fa408d13061f7d22f6ec1c4fee764d764 Mon Sep 17 00:00:00 2001 From: liu-tong-8848 Date: Tue, 13 Jun 2023 10:11:20 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=AF=E6=8C=81=E5=8D=95=E7=8B=AC=E5=90=AF?= =?UTF-8?q?=E5=8A=A8=E5=8F=8D=E5=90=91=EF=BC=8C=E5=A2=9E=E5=8A=A0=E5=8F=82?= =?UTF-8?q?=E6=95=B0=EF=BC=8C=E5=90=AF=E5=8A=A8=E5=8F=8D=E5=90=91=E6=97=B6?= =?UTF-8?q?=E5=8F=AF=E4=BB=A5=E9=80=89=E6=8B=A9=E4=B8=8D=E5=88=A0=E9=99=A4?= =?UTF-8?q?=E9=80=BB=E8=BE=91=E5=A4=8D=E5=88=B6=E6=A7=BD=EF=BC=8C=E5=8F=AF?= =?UTF-8?q?=E4=BB=A5=E4=BD=BF=E7=94=A8=E8=87=AA=E5=AE=9A=E4=B9=89=E7=9A=84?= =?UTF-8?q?=E9=80=BB=E8=BE=91=E5=A4=8D=E5=88=B6=E6=A7=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- portal/config/migrationConfig.properties | 6 ++ .../opengauss/portalcontroller/JdbcTools.java | 61 +++++++++++++++---- .../org/opengauss/portalcontroller/Plan.java | 17 +++++- .../check/CheckTaskReverseMigration.java | 1 + .../portalcontroller/constant/Check.java | 3 + 5 files changed, 74 insertions(+), 14 deletions(-) diff --git a/portal/config/migrationConfig.properties b/portal/config/migrationConfig.properties index 9a1004b..368b91f 100644 --- a/portal/config/migrationConfig.properties +++ b/portal/config/migrationConfig.properties @@ -47,3 +47,9 @@ zookeeper.port=127.0.0.1:2181 kafka.port=127.0.0.1:9092 confluent.port=127.0.0.1:8081 + +drop.logical.slot.on.stop=false + +use.specified.logical.slot=false + +specified.slot.name=slot_name diff --git a/src/main/java/org/opengauss/portalcontroller/JdbcTools.java b/src/main/java/org/opengauss/portalcontroller/JdbcTools.java index f96c4e3..4553301 100644 --- a/src/main/java/org/opengauss/portalcontroller/JdbcTools.java +++ b/src/main/java/org/opengauss/portalcontroller/JdbcTools.java @@ -16,6 +16,7 @@ package org.opengauss.portalcontroller; import org.opengauss.jdbc.PgConnection; +import org.opengauss.portalcontroller.constant.Check; import org.opengauss.portalcontroller.constant.Mysql; import org.opengauss.portalcontroller.constant.Opengauss; import org.opengauss.portalcontroller.exception.PortalException; @@ -146,9 +147,9 @@ public class JdbcTools { if (value.equals(defaultValue)) { flag = true; } else { - String reason = "If you want to use reverse migration," + - "please alter system set " + columnName + " to " + defaultValue + " " + - "and restart openGauss to make it work."; + String reason = "If you want to use reverse migration," + + "please alter system set " + columnName + " to " + defaultValue + " " + + "and restart openGauss to make it work."; PortalControl.refuseReverseMigrationReason = reason; LOGGER.error(reason); } @@ -233,19 +234,14 @@ public class JdbcTools { * @param connection the connection */ public static void createLogicalReplicationSlot(PgConnection connection) { - String slotName = "slot_" + Plan.workspaceId; if (connection != null) { try (Statement statement = connection.createStatement()) { - String selectSlotSql = "SELECT * FROM pg_get_replication_slots()"; - String columnName = "slot_name"; - boolean isReplicationSlotExists = isSpecifiedNameExist(statement, selectSlotSql, slotName, columnName); - if (isReplicationSlotExists) { - slotName += "_" + System.currentTimeMillis(); - } - String createSlotSql = "SELECT * FROM pg_create_logical_replication_slot('" + slotName + "', 'pgoutput')"; + generateSlotName(); + checkSlotName(); + String createSlotSql = "SELECT * FROM pg_create_logical_replication_slot('" + Plan.slotName + + "', 'pgoutput')"; statement.execute(createSlotSql); - Plan.slotName = slotName; - LOGGER.info("Create logical replication slot " + slotName + " finished."); + LOGGER.info("Create logical replication slot {} finished.", Plan.slotName); String selectPublicationSql = "SELECT pubname from pg_publication"; String publicationName = "dbz_publication"; String pubName = "pubname"; @@ -324,4 +320,43 @@ public class JdbcTools { } } } + + /** + * Get slot name string. + * + * @return the string + */ + public static String generateSlotName() { + String slotName = "slot_" + Plan.workspaceId; + try (PgConnection conn = JdbcTools.getPgConnection()) { + try (Statement statement = conn.createStatement()) { + String selectSlotSql = "SELECT * FROM pg_get_replication_slots()"; + String columnName = "slot_name"; + while (JdbcTools.isSpecifiedNameExist(statement, selectSlotSql, slotName, columnName)) { + slotName = "slot_" + Plan.workspaceId + "_" + System.currentTimeMillis(); + } + } catch (SQLException e) { + LOGGER.error(e.getMessage()); + } + } catch (SQLException e) { + PortalException portalException = new PortalException("SQL exception", "get slot name", e.getMessage()); + portalException.setRequestInformation("Get last slot name failed."); + PortalControl.refuseReverseMigrationReason = portalException.getMessage(); + LOGGER.error(portalException.toString()); + } + Plan.changeSlotName(slotName); + return slotName; + } + + /** + * Use last slot name. + */ + public static void checkSlotName() { + String slotName = Plan.slotName; + String useLastSlot = PortalControl.toolsMigrationParametersTable.get(Check.USE_SPECIFIED_LOGICAL_SLOT); + if (useLastSlot.equals("true")) { + slotName = PortalControl.toolsMigrationParametersTable.get(Check.SPECIFIED_SLOT_NAME); + } + Plan.changeSlotName(slotName); + } } diff --git a/src/main/java/org/opengauss/portalcontroller/Plan.java b/src/main/java/org/opengauss/portalcontroller/Plan.java index a1f9988..b23575c 100644 --- a/src/main/java/org/opengauss/portalcontroller/Plan.java +++ b/src/main/java/org/opengauss/portalcontroller/Plan.java @@ -148,6 +148,20 @@ public final class Plan { return plan; } + + /** + * Change slot name. + * + * @param str the str + */ + public static void changeSlotName(String str) { + slotName = str; + LOGGER.info("Current slot name is {}", str); + String slotNameStr = "Slot name:" + str; + String path = PortalControl.portalWorkSpacePath + "lastTaskInformation.txt"; + LogView.writeFile(slotNameStr, path, false); + } + /** * Check full datacheck running. */ @@ -374,7 +388,8 @@ public final class Plan { CheckTaskMysqlFullMigration checkTaskMysqlFullMigration = new CheckTaskMysqlFullMigration(); checkTaskMysqlFullMigration.cleanData(workspaceId); } - if (PortalControl.taskList.contains(Command.Start.Mysql.REVERSE)) { + if (PortalControl.taskList.contains(Command.Start.Mysql.REVERSE) + && PortalControl.toolsMigrationParametersTable.get(Check.DROP_LOGICAL_SLOT).equals("true")) { try (PgConnection conn = JdbcTools.getPgConnection()) { JdbcTools.changeAllTable(conn); JdbcTools.dropLogicalReplicationSlot(conn); diff --git a/src/main/java/org/opengauss/portalcontroller/check/CheckTaskReverseMigration.java b/src/main/java/org/opengauss/portalcontroller/check/CheckTaskReverseMigration.java index 5c491c1..bc76e36 100644 --- a/src/main/java/org/opengauss/portalcontroller/check/CheckTaskReverseMigration.java +++ b/src/main/java/org/opengauss/portalcontroller/check/CheckTaskReverseMigration.java @@ -61,6 +61,7 @@ public class CheckTaskReverseMigration implements CheckTask { hashtable1.put("transforms.route.replacement", "opengauss_server_" + workspaceId + "_topic"); hashtable1.put("source.process.file.path", hashtable.get(Status.REVERSE_FOLDER)); hashtable1.put("create.count.info.path", hashtable.get(Status.REVERSE_FOLDER)); + JdbcTools.checkSlotName(); hashtable1.put("slot.name", Plan.slotName); Tools.changePropertiesParameters(hashtable1, sourceConfigPath); Hashtable hashtable2 = new Hashtable<>(); diff --git a/src/main/java/org/opengauss/portalcontroller/constant/Check.java b/src/main/java/org/opengauss/portalcontroller/constant/Check.java index 11d010a..4d6d6bb 100644 --- a/src/main/java/org/opengauss/portalcontroller/constant/Check.java +++ b/src/main/java/org/opengauss/portalcontroller/constant/Check.java @@ -19,6 +19,9 @@ public interface Check { String INCREMENTAL_EXTRACT_SOURCE_JVM = "incremental.check.extract.source.jvm"; String INCREMENTAL_EXTRACT_SINK_JVM = "incremental.check.extract.sink.jvm"; String INCREMENTAL_CHECK_JVM = "incremental.check.jvm"; + String DROP_LOGICAL_SLOT = "drop.logical.slot.on.stop"; + String USE_SPECIFIED_LOGICAL_SLOT = "use.specified.logical.slot"; + String SPECIFIED_SLOT_NAME = "specified.slot.name"; interface Parameters { String SCHEMA = "spring.extract.schema"; -- Gitee