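// NOTE: stray web-page banner text, not part of the source: "Code fetch complete; the page will refresh automatically."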
/*
* Copyright (c) Huawei Technologies Co., Ltd. 2012-2019. All rights reserved.
*/
import bin.common.Logger;
import bin.common.driver.DriverInfoManager;
import bin.common.driver.IDriverInfo;
import bin.common.driver.OpenGaussDriverInfo;
import common.testcase.BaseJdbcTestCase;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.opengauss.PGProperty;
import org.opengauss.jdbc.PgConnection;
import org.opengauss.replication.LogSequenceNumber;
import org.opengauss.replication.PGReplicationStream;
import org.opengauss.util.PGobject;
import replication.MppdbDecodingPlugin;
import replication.OpenGaussLogSequenceNumber;
import replication.OpenGaussTimestampUtils;
import replication.event.AbstractWalEvent;
import replication.event.WriteRowEvent;
import java.nio.ByteBuffer;
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import static org.junit.Assert.assertEquals;
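/**
* Title: the ReplicationTest class
* <p>
* Description: JUnit tests that create a logical replication slot with the "mppdb_decoding" plugin,
* insert a row through JDBC and check the events decoded from the replication stream.
* <p>
* Copyright (c) Huawei Technologies Co., Ltd. 2012-2019.
*
* @author z00588921
* @version [DataStudio 1.0.0, 2021/1/28]
* @since 2021/1/28
*/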
public class ReplicationTest extends BaseJdbcTestCase {
/** Prefix of the replication slot name built by {@link #getUniqueSlotName(Connection)}. */
public static final String SLOT_NAME_PREFIX = "test_slot";
/** Driver information looked up under the key "db"; used to open every connection in this class. */
public static IDriverInfo info = DriverInfoManager.getInfo("db");
// NOTE(review): convert and sRs are not referenced anywhere in this class as shown;
// possibly used by other tests -- confirm before removing.
public static IntConvert convert = new IntConvert();
public static SimpleParseRs sRs = new SimpleParseRs();
/** Regular connection, opened in setUp and closed in tearDown; used for all SQL issued by this class. */
private Connection conn;
/** Replication connection, opened in setUp and closed in tearDown. */
private Connection replicationConn;
/** Decoder that startSlot uses to turn raw replication buffers into {@link AbstractWalEvent}s. */
private MppdbDecodingPlugin plugin;
/**
 * Prepares one test: opens the regular connection, recreates the test table through the
 * helpers defined outside this class, builds the decoding plugin from the connection's
 * timestamp utilities and finally opens the replication connection.
 *
 * @throws SQLException if a connection cannot be opened or a preparation statement fails
 */
@Before
public void setUp() throws SQLException {
    conn = DriverInfoManager.getConnection(info);

    // Start from a clean table; the order of these three calls is kept as in the original.
    dropTable();
    createTable();
    initAutoId();

    PgConnection pgConn = conn.unwrap(PgConnection.class);
    plugin = new MppdbDecodingPlugin(new OpenGaussTimestampUtils(pgConn.getTimestampUtils()));

    replicationConn = getReplicationConn();
}
/**
 * Opens a new connection configured for logical replication.
 * <p>
 * The settings are applied to a copy of the shared driver properties, so that the
 * replication-only options set here never end up in {@link #info} and therefore never
 * affect the regular connections opened from it.
 *
 * @return a freshly opened replication connection; the caller is responsible for closing it
 * @throws SQLException if the connection cannot be opened or unwrapped
 */
private PgConnection getReplicationConn() throws SQLException {
    // Defensive copy: info is static and shared by every test in this class.
    Properties prop = new Properties();
    prop.putAll(info.getProperties());

    // NOTE(review): user and password are hard-coded test credentials -- consider moving
    // them to the test configuration.
    PGProperty.USER.set(prop, "test");
    PGProperty.PG_DBNAME.set(prop, "postgres");
    PGProperty.PASSWORD.set(prop, "Huawei@123");
    PGProperty.ASSUME_MIN_SERVER_VERSION.set(prop, "9.4");
    PGProperty.REPLICATION.set(prop, "database");
    PGProperty.PREFER_QUERY_MODE.set(prop, "simple");

    prop.setProperty("hosts", info.getHosts());
    prop.setProperty("database", info.getDatabase());
    prop.setProperty("driver", info.getDriver());

    OpenGaussDriverInfo tmpInfo = new OpenGaussDriverInfo(prop, new Properties());
    return DriverInfoManager.getConnection(tmpInfo).unwrap(PgConnection.class);
}
/**
 * Cleans up after one test: drops the replication slot (best effort), drops the test table
 * and closes both connections.
 * <p>
 * Each connection is closed in a {@code finally} block so that a failure while dropping the
 * table or closing the first connection cannot leak the other one. The fields are
 * null-checked because this method can run after a failed setUp, where they were never
 * assigned.
 *
 * @throws SQLException if dropping the table or closing a connection fails
 */
@After
public void tearDown() throws SQLException {
    try {
        if (conn != null) {
            try {
                dropSlot();
            } catch (SQLException dropExp) {
                // Best effort: the slot may not exist or may still be in use.
                Logger.info("drop slot with exp:" + dropExp.getMessage());
            }
            dropTable();
        }
    } finally {
        try {
            if (conn != null) {
                conn.close();
            }
        } finally {
            conn = null;
            try {
                if (replicationConn != null) {
                    replicationConn.close();
                }
            } finally {
                replicationConn = null;
            }
        }
    }
}
/**
 * Selects a single {@code reltime} literal and logs the object the driver returns for it.
 *
 * @throws SQLException if the query fails or the result cannot be read
 */
@Test
public void testSelectType() throws SQLException {
    try (PreparedStatement query = conn.prepareStatement("select '60'::reltime");
         ResultSet row = query.executeQuery()) {
        row.next();
        Object value = row.getObject(1);
        Logger.info("obj:" + value);
    }
}
// String input = "12-10-2010";
// String input = "12-10-2010 00:00:00.0";
// String input = "2010-12-10 00:00:00.0";
// String input = "E\\'\\\\xDEADBEEF\\'";
// String sql = String.format("insert into %s values (?, ?)", getTableName());
//PGobject obj = new PGobject();
// obj.setType("bytea");
// obj.setValue("DEADBEEF");
// obj.setValue("DEADBEEf");
// Object input = obj;
// Object[] params = {getNextId(), "21:21:21","21:21:21 pst","2010-12-12","2013-12-11 pst","2003-04-12 04:05:06"};
// public static final List<String> TYPES = Stream.of("tinyint", "smallint", "integer", "binary_integer", "bigint").collect(Collectors.toList());
// public static final List<String> VALUES = Stream.of("46", "30000", "2147483645", "2147483646", "9223372036854775806").collect(Collectors.toList());
// public static final List<String> TYPES = Stream.of("money").collect(Collectors.toList());
// public static final List<String> VALUES = Stream.of("1.08").collect(Collectors.toList());
// public static final List<String> TYPES = Stream.of("boolean").collect(Collectors.toList());
// public static final List<String> VALUES = Stream.of("true").collect(Collectors.toList());
/**
 * Column types of the test table, in column order (the columns after the {@code id} key).
 * The entry at index {@code i} is paired with {@code VALUES.get(i)}. Unmodifiable, so the
 * shared constant cannot be altered by accident.
 */
public static final List<String> TYPES = java.util.Collections.unmodifiableList(Arrays.asList(
    "time without time zone", "time with time zone", "timestamp without time zone",
    "timestamp with time zone", "smalldatetime", "date",
    "interval day (3) to second (4)", "reltime"
));
/**
 * Literal inserted into each column, positionally matching {@code TYPES}. Unmodifiable.
 */
public static final List<String> VALUES = java.util.Collections.unmodifiableList(Arrays.asList(
    "21:21:21", "21:21:21 pst", "2010-12-12",
    "2013-12-11 pst", "2003-04-12 04:05:06", "10-10-2021",
    "3 days", "60"
));
/**
 * Inserts one row holding every value of {@code VALUES} and checks that the replication
 * stream delivers three events, the second of which is a {@link WriteRowEvent} whose
 * after-row matches the inserted parameters when compared via {@code toString()}.
 * <p>
 * The dedicated replication connection is opened in try-with-resources so it is closed even
 * when an assertion fails.
 *
 * @throws Exception if any database or decoding step fails
 */
@Test
public void testReplication() throws Exception {
    createSlot();
    LogSequenceNumber startPos = getStartPos();
    try (PgConnection pg1 = getReplicationConn()) {
        // One placeholder for the id plus one per tested column.
        String typeQuestionSql = IntStream.range(0, TYPES.size() + 1)
            .mapToObj(idx -> "?")
            .collect(Collectors.joining(", "));
        String sql = String.format("insert into %s values (%s)", getTableName(), typeQuestionSql);
        List<Object> params = new ArrayList<>(TYPES.size() + 1);
        params.add(getNextId());
        params.addAll(VALUES);

        runInNewThread(sql, params);
        List<AbstractWalEvent> events = startSlot(pg1, startPos);
        assertEquals(3, events.size());

        List<Object> converted = ((WriteRowEvent) events.get(1)).getAfterRow();
        // A shorter decoded row would otherwise pass unnoticed, a longer one would fail
        // with an index error instead of a readable assertion.
        assertEquals("decoded column count", params.size(), converted.size());

        // Index 0 is the id column, which is not compared.
        List<String> mismatches = new ArrayList<>();
        for (int i = 1; i < converted.size(); i++) {
            Object actual = converted.get(i);
            if (actual == null || !params.get(i).equals(actual.toString())) {
                mismatches.add("column " + i + ": expected <" + params.get(i) + "> but was <" + actual + ">");
            }
        }
        assertEquals("mismatched columns: " + mismatches, 0, mismatches.size());
    }
}
/**
 * Executes the given statement on a new thread using the shared connection {@code conn}.
 * The thread is started and not waited for; an {@link SQLException} raised by the
 * statement is printed and not propagated.
 *
 * @param sql    statement text with one {@code ?} placeholder per entry of {@code params}
 * @param params values bound to the placeholders, in order
 */
private void runInNewThread(String sql, List<Object> params) {
    Runnable insertTask = () -> {
        try (PreparedStatement statement = conn.prepareStatement(sql)) {
            int position = 1;
            for (Object value : params) {
                statement.setObject(position, value);
                position++;
            }
            statement.execute();
        } catch (SQLException throwables) {
            throwables.printStackTrace();
        }
    };
    Thread worker = new Thread(insertTask);
    worker.start();
}
/**
 * Creates the logical replication slot for this test unless it already exists.
 *
 * @throws SQLException if the existence check or the creation statement fails
 */
private void createSlot() throws SQLException {
    createSlot(null);
}

/**
 * Creates the logical replication slot named by {@link #getUniqueSlotName(Connection)} with
 * the "mppdb_decoding" plugin, using the regular connection {@code conn}. Does nothing when
 * a slot of that name is already present.
 *
 * @param relConn not used by this method
 * @throws SQLException if the existence check or the creation statement fails
 */
private void createSlot(PgConnection relConn) throws SQLException {
    if (!isSlotNameExist()) {
        String slotName = getUniqueSlotName(conn);
        String createSql = String.format(
            "select * from pg_create_logical_replication_slot('%s', '%s')", slotName, "mppdb_decoding");
        try (PreparedStatement statement = conn.prepareStatement(createSql)) {
            statement.execute();
        }
    }
}
/**
 * Drops the replication slot named by {@link #getUniqueSlotName(Connection)} through the
 * regular connection {@code conn}.
 *
 * @throws SQLException if the drop statement fails
 */
private void dropSlot() throws SQLException {
    String slotName = getUniqueSlotName(conn);
    String dropSql = String.format("select * from pg_drop_replication_slot('%s')", slotName);
    try (CallableStatement call = conn.prepareCall(dropSql)) {
        call.execute();
    }
}
/**
 * Reads the server's current WAL location via {@code pg_current_xlog_location()}.
 *
 * @return the location parsed into a {@link LogSequenceNumber}; never {@code null}
 * @throws SQLException if the query fails or, unexpectedly, yields no row (previously this
 *                      case returned {@code null}, which only failed later when the stream
 *                      was started)
 */
private LogSequenceNumber getStartPos() throws SQLException {
    try (PreparedStatement ps = conn.prepareStatement("select pg_current_xlog_location()");
         ResultSet rs = ps.executeQuery()) {
        if (!rs.next()) {
            throw new SQLException("pg_current_xlog_location() returned no row");
        }
        return LogSequenceNumber.valueOf(rs.getString(1));
    }
}
/**
 * Starts a logical replication stream on the test slot and collects decoded events until no
 * message has arrived for two seconds (or the thread is interrupted).
 * <p>
 * NOTE(review): the stream is not closed here; presumably it is released when the caller
 * closes the supplied replication connection -- confirm.
 *
 * @param conn     replication connection the stream is opened on (the slot name itself is
 *                 derived from the regular connection {@code this.conn})
 * @param startPos WAL position to start streaming from
 * @return the events decoded so far, in arrival order
 * @throws Exception if starting the stream, reading from it or decoding a message fails
 */
private List<AbstractWalEvent> startSlot(PgConnection conn, LogSequenceNumber startPos) throws Exception {
    final long idleTimeoutMillis = 2 * 1000L;
    final long pollIntervalMillis = 10L;

    List<AbstractWalEvent> events = new ArrayList<>();
    PGReplicationStream stream = conn.getReplicationAPI()
        .replicationStream()
        .logical()
        .withSlotName(getUniqueSlotName(this.conn))
        .withSlotOption("include-xids", true)
        .withSlotOption("skip-empty-xacts", true)
        .withStartPosition(startPos)
        .start();

    long lastActivity = System.currentTimeMillis();
    while (true) {
        ByteBuffer byteBuffer = stream.readPending();
        if (byteBuffer != null) {
            // Every received message restarts the idle timer.
            lastActivity = System.currentTimeMillis();
            AbstractWalEvent event = plugin.decode(byteBuffer,
                new OpenGaussLogSequenceNumber(stream.getLastReceiveLSN()));
            events.add(event);
            continue;
        }
        try {
            TimeUnit.MILLISECONDS.sleep(pollIntervalMillis);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of swallowing it.
            Thread.currentThread().interrupt();
            Logger.info("interrupted while waiting for replication data");
            break;
        }
        if (System.currentTimeMillis() - lastActivity > idleTimeoutMillis) {
            break;
        }
    }
    return events;
}
/**
 * Checks {@code pg_replication_slots} for a slot named by
 * {@link #getUniqueSlotName(Connection)}.
 *
 * @return {@code true} when the count of matching slots is not zero
 * @throws SQLException if the query fails or its result cannot be read
 */
private boolean isSlotNameExist() throws SQLException {
    final String countSql = "select count(*) as num from pg_replication_slots where slot_name=?";
    try (PreparedStatement statement = conn.prepareStatement(countSql)) {
        String slotName = getUniqueSlotName(conn);
        statement.setString(1, slotName);
        try (ResultSet result = statement.executeQuery()) {
            result.next();
            int matches = result.getInt(1);
            return matches != 0;
        }
    }
}
/**
 * Returns the regular connection held by this test.
 *
 * @return the value of the {@code conn} field; {@code null} when no connection is open
 */
@Override
public Connection getConn() {
return conn;
}
/**
 * Returns the name of the table this test works on.
 *
 * @return the fixed table name "replicate_test"
 */
@Override
public String getTableName() {
return "replicate_test";
}
/**
 * Builds the column list used to create the test table: an {@code id int primary key}
 * column followed by one column per entry of {@code TYPES}, named {@code data0},
 * {@code data1}, ... in order.
 * <p>
 * Earlier notes in this method listed other candidate column types (text, char, character,
 * nchar, varchar2, nvarchar2, date, timestamp, time, boolean, raw, bytea, smalldatetime).
 *
 * @return the parenthesised column definition, with a leading and a trailing space
 */
@Override
public String getTableColumns4Create() {
    StringBuilder columns = new StringBuilder(" (id int primary key, ");
    for (int idx = 0; idx < TYPES.size(); idx++) {
        if (idx > 0) {
            columns.append(", ");
        }
        columns.append("data").append(idx).append(" ").append(TYPES.get(idx));
    }
    columns.append(" ) ");
    return columns.toString();
}
/**
 * Builds the replication slot name for a connection: {@code SLOT_NAME_PREFIX}, an
 * underscore, and the connection's catalog name.
 *
 * @param conn connection whose catalog name is appended
 * @return the slot name
 * @throws SQLException if the catalog name cannot be read from the connection
 */
public static String getUniqueSlotName(Connection conn) throws SQLException {
    String catalog = conn.getCatalog();
    return SLOT_NAME_PREFIX + "_" + catalog;
}
}
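// NOTE: stray web-page text, not part of the source: "This section may contain content unsuitable for display and is not shown. You can review and revise it with the editing tools."
// "If you confirm the content contains no inappropriate language, pure advertising, violence, vulgar or pornographic material, infringement, piracy, false or worthless content, or content violating national laws and regulations, click submit to appeal and we will handle it as soon as possible."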