diff --git a/mysql2openGauss/src/main/java/org/opengauss/migration/replay/TransactionDispatcher.java b/mysql2openGauss/src/main/java/org/opengauss/migration/replay/TransactionDispatcher.java index 1c4e5540b6b46b9a997f841d91a4358512ff8cda..98a820f4c958f03cd0689502b4f389faba65c841 100644 --- a/mysql2openGauss/src/main/java/org/opengauss/migration/replay/TransactionDispatcher.java +++ b/mysql2openGauss/src/main/java/org/opengauss/migration/replay/TransactionDispatcher.java @@ -18,9 +18,14 @@ package org.opengauss.migration.replay; import org.opengauss.migration.vo.ConnectionInfo; import org.opengauss.migration.vo.Transaction; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; import java.text.SimpleDateFormat; -import java.util.*; -import java.util.concurrent.*; +import java.util.ArrayList; +import java.util.Date; +import java.util.concurrent.ConcurrentLinkedDeque; /** * Description: TransactionDispatcher class @@ -33,12 +38,13 @@ public class TransactionDispatcher { */ public static final int MAX_THREAD_COUNT = 50; + private static boolean isExit = false; + private int threadCount; private ConnectionInfo connectionInfo; private Transaction selectedTransaction = null; private ArrayList threadList = new ArrayList<>(); private ConcurrentLinkedDeque trxQueue; - private BlockingDeque queue = new LinkedBlockingDeque<>(); private int count = 0; private final SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); @@ -71,11 +77,12 @@ public class TransactionDispatcher { * Dispatcher */ public void dispatcher() { + Runtime.getRuntime().addShutdownHook(new ExitHandler()); createThreads(); statTask(); Transaction transaction = null; int freeThreadIndex = -1; - while (true) { + while (!isExit) { if (selectedTransaction == null) { transaction = trxQueue.poll(); if (transaction != null) { @@ -145,4 +152,22 @@ public class TransactionDispatcher { } return freeThreadIndex; } + + private class ExitHandler extends Thread { + @Override + public void run() { + try (Connection connection = connectionInfo.createOpenGaussConnection(); + Statement statement = connection.createStatement(); + ResultSet rs = statement.executeQuery("select pg_current_xlog_location();")) { + while (rs.next()) { + String xlogPosition = rs.getString(1); + System.out.println("Online migration from mysql to openGauss has stopped and the current xlog " + + "position in openGauss is " + xlogPosition); + } + } catch (SQLException exp) { + exp.printStackTrace(); + } + isExit = true; + } + } }