9 Star 0 Fork 0

杭州开云集致科技有限公司/cloudcanal-tunnel-sdk

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
TunnelDataSyncApplyTest.java 7.21 KB
一键复制 编辑 原始数据 按行查看 历史
bucketli 提交于 2023-09-18 17:40 . fix(ds):add tunnel api
package com.clougence.cloudcanal.tunnel.sdk.biz.writer;
import java.math.BigDecimal;
import java.sql.Types;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.RandomUtils;
import org.junit.Test;
import com.clougence.cloudcanal.tunnel.sdk.model.schema.TunnelColumnDef;
import com.clougence.cloudcanal.tunnel.sdk.model.schema.TunnelTableDef;
/**
* @author bucketli 2023/9/18 16:24:36
*/
public class TunnelDataSyncApplyTest {
static TunnelTableDef t1Def;
static TunnelTableDef t2Def;
static {
//table simple_tab
t1Def = new TunnelTableDef();
t1Def.setDbName("cc_virtual_db");
t1Def.setSchemaName("cc_virtual_schema");
t1Def.setTableName("simple_tab");
t1Def.getPkNameList().add("id");
TunnelColumnDef t1c1 = new TunnelColumnDef("id", Types.BIGINT, "bigint");
TunnelColumnDef t1c2 = new TunnelColumnDef("worker_id", Types.BIGINT, "bigint");
TunnelColumnDef t1c3 = new TunnelColumnDef("col_new", Types.VARCHAR, "varchar");
t1Def.getColumnDefMap().put(t1c1.getColumnName(), t1c1);
t1Def.getColumnDefMap().put(t1c2.getColumnName(), t1c2);
t1Def.getColumnDefMap().put(t1c3.getColumnName(), t1c3);
//table worker_stats
t2Def = new TunnelTableDef();
t2Def.setDbName("cc_virtual_db");
t2Def.setSchemaName("cc_virtual_schema");
t2Def.setTableName("worker_stats");
t2Def.getPkNameList().add("id");
TunnelColumnDef t2c1 = new TunnelColumnDef("id", Types.BIGINT, "bigint");
TunnelColumnDef t2c2 = new TunnelColumnDef("gmt_create", Types.TIMESTAMP, "datetime");
TunnelColumnDef t2c3 = new TunnelColumnDef("aucrdt", Types.TIMESTAMP, "timestamp");
TunnelColumnDef t2c4 = new TunnelColumnDef("worker_id", Types.BIGINT, "bigint");
TunnelColumnDef t2c5 = new TunnelColumnDef("cpu_stat", Types.VARCHAR, "text");
TunnelColumnDef t2c6 = new TunnelColumnDef("mem_stat", Types.VARCHAR, "text");
TunnelColumnDef t2c7 = new TunnelColumnDef("disk_stat", Types.VARCHAR, "text");
TunnelColumnDef t2c8 = new TunnelColumnDef("col_new", Types.VARCHAR, "varchar");
TunnelColumnDef t2c9 = new TunnelColumnDef("unsigned_col", Types.BIGINT, "bigint");
TunnelColumnDef t2c10 = new TunnelColumnDef("new_col_col", Types.VARCHAR, "varchar");
TunnelColumnDef t2c11 = new TunnelColumnDef("timestamp_col", Types.TIMESTAMP, "timestamp");
TunnelColumnDef t2c12 = new TunnelColumnDef("abc", Types.VARCHAR, "varchar");
TunnelColumnDef t2c13 = new TunnelColumnDef("changed_col", Types.VARCHAR, "varchar");
TunnelColumnDef t2c14 = new TunnelColumnDef("new_add_msg", Types.VARCHAR, "varchar");
TunnelColumnDef t2c15 = new TunnelColumnDef("haha", Types.BIGINT, "bigint");
TunnelColumnDef t2c16 = new TunnelColumnDef("ob_add_1", Types.VARCHAR, "varchar");
TunnelColumnDef t2c17 = new TunnelColumnDef("decimal_col", Types.DECIMAL, "decimal");
t2Def.getColumnDefMap().put(t2c1.getColumnName(), t2c1);
t2Def.getColumnDefMap().put(t2c2.getColumnName(), t2c2);
t2Def.getColumnDefMap().put(t2c3.getColumnName(), t2c3);
t2Def.getColumnDefMap().put(t2c4.getColumnName(), t2c4);
t2Def.getColumnDefMap().put(t2c5.getColumnName(), t2c5);
t2Def.getColumnDefMap().put(t2c6.getColumnName(), t2c6);
t2Def.getColumnDefMap().put(t2c7.getColumnName(), t2c7);
t2Def.getColumnDefMap().put(t2c8.getColumnName(), t2c8);
t2Def.getColumnDefMap().put(t2c9.getColumnName(), t2c9);
t2Def.getColumnDefMap().put(t2c10.getColumnName(), t2c10);
t2Def.getColumnDefMap().put(t2c11.getColumnName(), t2c11);
t2Def.getColumnDefMap().put(t2c12.getColumnName(), t2c12);
t2Def.getColumnDefMap().put(t2c13.getColumnName(), t2c13);
t2Def.getColumnDefMap().put(t2c14.getColumnName(), t2c14);
t2Def.getColumnDefMap().put(t2c15.getColumnName(), t2c15);
t2Def.getColumnDefMap().put(t2c16.getColumnName(), t2c16);
t2Def.getColumnDefMap().put(t2c17.getColumnName(), t2c17);
}
private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
@Test
public void testWrite() {
TunnelDataApplier applier = null;
try {
String clientUuid = "testClientThread1";
applier = new TunnelDataSyncApplier(clientUuid, "127.0.0.1:18443", null, "my_test", "ssadf23123sdaf", 200);
applier.start();
TunnelDataApplier finalApplier = applier;
Thread t1 = new Thread(() -> {
List<Map<String, Object>> t1Records = new ArrayList<>();
for (int i = 0; i < 100000; i++) {
Map<String, Object> r = genRecordFromDef(t1Def, t1Def.getPkNameList().get(0), i);
t1Records.add(r);
}
finalApplier.apply(t1Records, t1Def);
});
Thread t2 = new Thread(() -> {
List<Map<String, Object>> t2Records = new ArrayList<>();
for (int i = 0; i < 100000; i++) {
Map<String, Object> r = genRecordFromDef(t2Def, t2Def.getPkNameList().get(0), i);
t2Records.add(r);
}
finalApplier.apply(t2Records, t2Def);
});
t1.start();
t2.start();
t1.join();
t2.join();
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
if (applier != null) {
applier.stop();
}
}
}
protected Map<String, Object> genRecordFromDef(TunnelTableDef def, String pkName, long pkVal) {
Map<String, Object> r = new HashMap<>();
for (Map.Entry<String, TunnelColumnDef> colEntry : def.getColumnDefMap().entrySet()) {
if (colEntry.getKey().equals(pkName)) {
r.put(pkName, pkVal);
} else {
switch (colEntry.getValue().getSqlType()) {
case Types.BIGINT: {
Long v = RandomUtils.nextLong();
r.put(colEntry.getKey(), v);
break;
}
case Types.VARCHAR: {
String v = "random varchar " + RandomUtils.nextLong();
r.put(colEntry.getKey(), v);
break;
}
case Types.TIMESTAMP: {
String nowStr = LocalDateTime.now().format(formatter);
r.put(colEntry.getKey(), nowStr);
break;
}
case Types.DECIMAL: {
BigDecimal v = BigDecimal.valueOf(RandomUtils.nextDouble());
r.put(colEntry.getKey(), v);
break;
}
default:
throw new UnsupportedOperationException("Unsupported column (" + colEntry.getKey() + ") type:" + colEntry.getValue().getSqlType());
}
}
}
return r;
}
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Java
1
https://gitee.com/clougence/cloudcanal-tunnel-sdk.git
git@gitee.com:clougence/cloudcanal-tunnel-sdk.git
clougence
cloudcanal-tunnel-sdk
cloudcanal-tunnel-sdk
master

搜索帮助

Cb406eda 1850385 E526c682 1850385