From ace8d5cd40a8f52ad707b2fae58be79824cde265 Mon Sep 17 00:00:00 2001 From: yisai <869018811@qq.com> Date: Thu, 23 Dec 2021 17:34:27 +0800 Subject: [PATCH 1/2] =?UTF-8?q?mysql=E5=90=8C=E6=AD=A5=E5=88=B0clickHouse?= =?UTF-8?q?=E9=98=B2=E6=AD=A2=E4=BF=AE=E6=94=B9=E9=87=8D=E5=A4=8D=E6=95=B0?= =?UTF-8?q?=E6=8D=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../MysqlToClickHouseUpdateNoRepeat.java | 76 +++++++++++++++++++ 1 file changed, 76 insertions(+) create mode 100644 wide-table/src/main/java/com/clougence/cloudcanal/dataprocess/widetable/MysqlToClickHouseUpdateNoRepeat.java diff --git a/wide-table/src/main/java/com/clougence/cloudcanal/dataprocess/widetable/MysqlToClickHouseUpdateNoRepeat.java b/wide-table/src/main/java/com/clougence/cloudcanal/dataprocess/widetable/MysqlToClickHouseUpdateNoRepeat.java new file mode 100644 index 0000000..68ac2af --- /dev/null +++ b/wide-table/src/main/java/com/clougence/cloudcanal/dataprocess/widetable/MysqlToClickHouseUpdateNoRepeat.java @@ -0,0 +1,76 @@ +package com.clougence.cloudcanal.dataprocess.widetable; + +import com.clougence.cloudcanal.sdk.api.CloudCanalProcessor; +import com.clougence.cloudcanal.sdk.api.constant.rdb.RecordAction; +import com.clougence.cloudcanal.sdk.api.contextkey.RdbContextKey; +import com.clougence.cloudcanal.sdk.api.metakey.RdbMetaKey; +import com.clougence.cloudcanal.sdk.api.model.CustomField; +import com.clougence.cloudcanal.sdk.api.model.CustomProcessorContext; +import com.clougence.cloudcanal.sdk.api.model.CustomRecord; + +import javax.sql.DataSource; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.util.List; + +/** + * 功能描述:mysql同步到clickHouse防止修改重复数据 + * @param + * @return + * @author yisai + * @Date 2021-12-23 17:23 + */ +public class MysqlToClickHouseUpdateNoRepeat implements CloudCanalProcessor { + /** + * 库 + */ + private String schemaName; + /** + * 表 + */ + private String tableName; + /** + * 主键 + */ + private Long primaryKey; + /** + * 动作 + */ + private String actionName; + + @Override + public List process(List customRecordList, CustomProcessorContext customProcessorContext) { + //修改同步前先删除对端库老数据 + updateRecord(customRecordList,(DataSource) customProcessorContext.getProcessorContextMap().get(RdbContextKey.TARGET_DATASOURCE)); + return customRecordList; + } + + /** + * 功能描述:修改同步前先删除对端库老数据 + * 注意主键要放在表第一个字段 + * @param + * @return + * @author yisai + * @Date 2021-12-23 17:00 + */ + private void updateRecord(List customRecordList, DataSource dataSource) { + try { + Connection connection = dataSource.getConnection(); + for (CustomRecord customRecord : customRecordList) { + actionName = customRecord.getRecordMetaMap().get(RdbMetaKey.ACTION_NAME).toString(); + if (RecordAction.UPDATE.name().equals(actionName)) { + schemaName = customRecord.getRecordMetaMap().get(RdbMetaKey.SCHEMA_NAME).toString(); + tableName = customRecord.getRecordMetaMap().get(RdbMetaKey.TABLE_NAME).toString(); + int endIndex = customRecord.getFieldMapBefore().toString().indexOf("="); + String primaryKeyName = customRecord.getFieldMapBefore().toString().substring(1, endIndex); + CustomField primaryKeyCustomField = (CustomField) customRecord.getFieldMapBefore().get(primaryKeyName); + primaryKey = Long.parseLong(primaryKeyCustomField.getValue().toString()); + PreparedStatement ps = connection.prepareStatement("ALTER TABLE " + schemaName + "." + tableName + " DELETE WHERE " + primaryKeyName + " = " + primaryKey); + ps.execute(); + } + } + }catch (Exception e){ + e.printStackTrace(); + } + } +} -- Gitee From 1a3004da8076a9dc803efa40433f34ded3ab50fb Mon Sep 17 00:00:00 2001 From: yisai <869018811@qq.com> Date: Thu, 23 Dec 2021 17:46:06 +0800 Subject: [PATCH 2/2] =?UTF-8?q?mysql=E5=90=8C=E6=AD=A5=E5=88=B0clickHouse?= =?UTF-8?q?=E9=98=B2=E6=AD=A2=E4=BF=AE=E6=94=B9=E9=87=8D=E5=A4=8D=E6=95=B0?= =?UTF-8?q?=E6=8D=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../widetable/MysqlToClickHouseUpdateNoRepeat.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/wide-table/src/main/java/com/clougence/cloudcanal/dataprocess/widetable/MysqlToClickHouseUpdateNoRepeat.java b/wide-table/src/main/java/com/clougence/cloudcanal/dataprocess/widetable/MysqlToClickHouseUpdateNoRepeat.java index 68ac2af..6122010 100644 --- a/wide-table/src/main/java/com/clougence/cloudcanal/dataprocess/widetable/MysqlToClickHouseUpdateNoRepeat.java +++ b/wide-table/src/main/java/com/clougence/cloudcanal/dataprocess/widetable/MysqlToClickHouseUpdateNoRepeat.java @@ -12,6 +12,8 @@ import javax.sql.DataSource; import java.sql.Connection; import java.sql.PreparedStatement; import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * 功能描述:mysql同步到clickHouse防止修改重复数据 @@ -21,6 +23,7 @@ import java.util.List; * @Date 2021-12-23 17:23 */ public class MysqlToClickHouseUpdateNoRepeat implements CloudCanalProcessor { + protected static final Logger customLogger = LoggerFactory.getLogger("custom_processor"); /** * 库 */ @@ -70,7 +73,7 @@ public class MysqlToClickHouseUpdateNoRepeat implements CloudCanalProcessor { } } }catch (Exception e){ - e.printStackTrace(); + customLogger.error(e.getMessage()); } } } -- Gitee