From e9337f303ee189eef45afc5880c9bdf0a3444d1d Mon Sep 17 00:00:00 2001 From: chunlin <3049478157@qq.com> Date: Thu, 3 Jul 2025 20:49:38 +0800 Subject: [PATCH 1/4] feat: TiDBSrcVarcharToDate --- .../datatransform/TiDBSrcVarcharToDate.java | 61 +++++++++++-------- 1 file changed, 36 insertions(+), 25 deletions(-) diff --git a/data-transform/src/main/java/com/clougence/cloudcanal/dataprocess/datatransform/TiDBSrcVarcharToDate.java b/data-transform/src/main/java/com/clougence/cloudcanal/dataprocess/datatransform/TiDBSrcVarcharToDate.java index 451abee..729b05c 100644 --- a/data-transform/src/main/java/com/clougence/cloudcanal/dataprocess/datatransform/TiDBSrcVarcharToDate.java +++ b/data-transform/src/main/java/com/clougence/cloudcanal/dataprocess/datatransform/TiDBSrcVarcharToDate.java @@ -6,15 +6,12 @@ import com.clougence.cloudcanal.sdk.api.modelv2.CustomData; import com.clougence.cloudcanal.sdk.api.modelv2.CustomFieldV2; import com.clougence.cloudcanal.sdk.api.modelv2.CustomRecordV2; import com.clougence.cloudcanal.sdk.api.modelv2.SchemaInfo; -import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.sql.JDBCType; import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.LinkedHashMap; -import java.util.List; +import java.util.*; /** * @author chunlin create time is 2025/7/3 @@ -23,27 +20,35 @@ public class TiDBSrcVarcharToDate implements CloudCanalProcessorV2 { protected static final Logger customLogger = LoggerFactory.getLogger("custom_processor"); - private final SchemaInfo srcTable = new SchemaInfo(null, "test", "test14"); - - private final SimpleDateFormat format = new SimpleDateFormat("dd-MM-yyyy"); + private final Map> tableCols = new HashMap<>(); + private final Map> formatMap = new HashMap<>(); @Override public void start(ProcessorContext processorContext) { + SchemaInfo test14 = new SchemaInfo(null, "test", "test14"); + // The columns we need to convert + tableCols.put(test14, new HashSet<>(Arrays.asList("va"))); + // Format of each column + formatMap.put(test14, new HashMap() {{ + put("va", new SimpleDateFormat("dd-MM-yyyy")); + }}); } @Override public List process(CustomData data) { - customLogger.info("{}.{}.{},equals:{}", data.getSchemaInfo().getCatalog(), data.getSchemaInfo().getSchema(), data.getSchemaInfo().getTable(), data.getSchemaInfo().equals(srcTable)); - if (data.getSchemaInfo().equals(srcTable)) { + SchemaInfo schemaInfo = data.getSchemaInfo(); + Set colsNeedConvert = tableCols.get(schemaInfo); + Map formats = formatMap.get(schemaInfo); + if (colsNeedConvert != null && !colsNeedConvert.isEmpty() && formats != null && !formats.isEmpty()) { for (CustomRecordV2 recordV2 : data.getRecords()) { switch (data.getEventType()) { case INSERT: { - changeColumn(recordV2.getAfterColumnMap()); + convertColumn(recordV2.getAfterColumnMap(), colsNeedConvert, formats); break; } case UPDATE: { - changeColumn(recordV2.getBeforeColumnMap()); - changeColumn(recordV2.getAfterColumnMap()); + convertColumn(recordV2.getBeforeColumnMap(), colsNeedConvert, formats); + convertColumn(recordV2.getAfterColumnMap(), colsNeedConvert, formats); break; } default: @@ -52,21 +57,27 @@ public class TiDBSrcVarcharToDate implements CloudCanalProcessorV2 { } } - List re = new ArrayList<>(); - re.add(data); - return re; + return Collections.singletonList(data); } - protected void changeColumn(LinkedHashMap columnMap) { - CustomFieldV2 col = columnMap.get("va"); - if (col.getSqlType() == JDBCType.VARCHAR.getVendorTypeNumber() && StringUtils.isNotBlank((String) col.getValue())) { - String val = col.getValue().toString(); - try { - col.setValue(format.parse(val)); - col.setSqlType(JDBCType.DATE.getVendorTypeNumber()); - col.setDbType("DATE"); - } catch (Exception e) { - customLogger.error("Failed to convert VARCHAR to DATE for value: " + val, e); + protected void convertColumn(LinkedHashMap columnMap, Set colsNeedConvert, Map formats) { + for (String colName : colsNeedConvert) { + CustomFieldV2 col = columnMap.get(colName); + Object value = col.getValue(); + SimpleDateFormat format = formats.get(colName); + if (formats.get(colName) == null) { + customLogger.info("Format has not been initialized yet, so ignore this column: {}", colName); + continue; + } + if (col.getSqlType() == JDBCType.VARCHAR.getVendorTypeNumber() && value != null) { + String val = value.toString(); + try { + col.setValue(format.parse(val)); + col.setSqlType(JDBCType.DATE.getVendorTypeNumber()); + col.setDbType("DATE"); + } catch (Exception e) { + customLogger.error("Failed to convert VARCHAR to DATE for value: {}", val, e); + } } } } -- Gitee From 6c3e4c5eea0c25c75ee86e88d8087c34ae85be7c Mon Sep 17 00:00:00 2001 From: chunlin <3049478157@qq.com> Date: Thu, 3 Jul 2025 22:38:24 +0800 Subject: [PATCH 2/4] feat: TiDBSrcVarcharToDate --- ...e.java => TiDBSrcVarcharDateFormatConvert.java} | 14 +++++++++----- .../META-INF/cloudcanal/plugin.properties | 2 +- 2 files changed, 10 insertions(+), 6 deletions(-) rename data-transform/src/main/java/com/clougence/cloudcanal/dataprocess/datatransform/{TiDBSrcVarcharToDate.java => TiDBSrcVarcharDateFormatConvert.java} (86%) diff --git a/data-transform/src/main/java/com/clougence/cloudcanal/dataprocess/datatransform/TiDBSrcVarcharToDate.java b/data-transform/src/main/java/com/clougence/cloudcanal/dataprocess/datatransform/TiDBSrcVarcharDateFormatConvert.java similarity index 86% rename from data-transform/src/main/java/com/clougence/cloudcanal/dataprocess/datatransform/TiDBSrcVarcharToDate.java rename to data-transform/src/main/java/com/clougence/cloudcanal/dataprocess/datatransform/TiDBSrcVarcharDateFormatConvert.java index 729b05c..3188f7e 100644 --- a/data-transform/src/main/java/com/clougence/cloudcanal/dataprocess/datatransform/TiDBSrcVarcharToDate.java +++ b/data-transform/src/main/java/com/clougence/cloudcanal/dataprocess/datatransform/TiDBSrcVarcharDateFormatConvert.java @@ -16,10 +16,12 @@ import java.util.*; /** * @author chunlin create time is 2025/7/3 */ -public class TiDBSrcVarcharToDate implements CloudCanalProcessorV2 { +public class TiDBSrcVarcharDateFormatConvert implements CloudCanalProcessorV2 { protected static final Logger customLogger = LoggerFactory.getLogger("custom_processor"); + private static final SimpleDateFormat DTF = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); + private final Map> tableCols = new HashMap<>(); private final Map> formatMap = new HashMap<>(); @@ -27,10 +29,13 @@ public class TiDBSrcVarcharToDate implements CloudCanalProcessorV2 { public void start(ProcessorContext processorContext) { SchemaInfo test14 = new SchemaInfo(null, "test", "test14"); // The columns we need to convert - tableCols.put(test14, new HashSet<>(Arrays.asList("va"))); + tableCols.put(test14, new HashSet<>(Arrays.asList("va", "va2"))); // Format of each column formatMap.put(test14, new HashMap() {{ + // target datatype is DATE put("va", new SimpleDateFormat("dd-MM-yyyy")); + // target datatype is DATETIME + put("va2", new SimpleDateFormat("dd-MM-yyyy HH:mm:ss")); }}); } @@ -72,9 +77,8 @@ public class TiDBSrcVarcharToDate implements CloudCanalProcessorV2 { if (col.getSqlType() == JDBCType.VARCHAR.getVendorTypeNumber() && value != null) { String val = value.toString(); try { - col.setValue(format.parse(val)); - col.setSqlType(JDBCType.DATE.getVendorTypeNumber()); - col.setDbType("DATE"); + Date date = format.parse(val); + col.setValue(DTF.format(date)); } catch (Exception e) { customLogger.error("Failed to convert VARCHAR to DATE for value: {}", val, e); } diff --git a/data-transform/src/main/resources/META-INF/cloudcanal/plugin.properties b/data-transform/src/main/resources/META-INF/cloudcanal/plugin.properties index 0e62a62..8640502 100644 --- a/data-transform/src/main/resources/META-INF/cloudcanal/plugin.properties +++ b/data-transform/src/main/resources/META-INF/cloudcanal/plugin.properties @@ -1 +1 @@ -loadClassName=com.clougence.cloudcanal.dataprocess.datatransform.TiDBSrcVarcharToDate +loadClassName=com.clougence.cloudcanal.dataprocess.datatransform.TiDBSrcVarcharDateFormatConvert -- Gitee From 966e18a419254136e113719b4c92ed62ea4aba77 Mon Sep 17 00:00:00 2001 From: chunlin <3049478157@qq.com> Date: Thu, 3 Jul 2025 22:51:27 +0800 Subject: [PATCH 3/4] feat: review --- .../datatransform/TiDBSrcVarcharDateFormatConvert.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/data-transform/src/main/java/com/clougence/cloudcanal/dataprocess/datatransform/TiDBSrcVarcharDateFormatConvert.java b/data-transform/src/main/java/com/clougence/cloudcanal/dataprocess/datatransform/TiDBSrcVarcharDateFormatConvert.java index 3188f7e..737f408 100644 --- a/data-transform/src/main/java/com/clougence/cloudcanal/dataprocess/datatransform/TiDBSrcVarcharDateFormatConvert.java +++ b/data-transform/src/main/java/com/clougence/cloudcanal/dataprocess/datatransform/TiDBSrcVarcharDateFormatConvert.java @@ -28,13 +28,11 @@ public class TiDBSrcVarcharDateFormatConvert implements CloudCanalProcessorV2 { @Override public void start(ProcessorContext processorContext) { SchemaInfo test14 = new SchemaInfo(null, "test", "test14"); - // The columns we need to convert + // The source columns we need to convert tableCols.put(test14, new HashSet<>(Arrays.asList("va", "va2"))); - // Format of each column + // Format of each source column formatMap.put(test14, new HashMap() {{ - // target datatype is DATE put("va", new SimpleDateFormat("dd-MM-yyyy")); - // target datatype is DATETIME put("va2", new SimpleDateFormat("dd-MM-yyyy HH:mm:ss")); }}); } -- Gitee From 7e01e345920701c21df9287fd24027b908a3d3c5 Mon Sep 17 00:00:00 2001 From: chunlin <3049478157@qq.com> Date: Fri, 4 Jul 2025 10:58:19 +0800 Subject: [PATCH 4/4] feat: ddMMyyyy --- .../datatransform/TiDBSrcVarcharDateFormatConvert.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/data-transform/src/main/java/com/clougence/cloudcanal/dataprocess/datatransform/TiDBSrcVarcharDateFormatConvert.java b/data-transform/src/main/java/com/clougence/cloudcanal/dataprocess/datatransform/TiDBSrcVarcharDateFormatConvert.java index 737f408..3cc8d1c 100644 --- a/data-transform/src/main/java/com/clougence/cloudcanal/dataprocess/datatransform/TiDBSrcVarcharDateFormatConvert.java +++ b/data-transform/src/main/java/com/clougence/cloudcanal/dataprocess/datatransform/TiDBSrcVarcharDateFormatConvert.java @@ -20,7 +20,7 @@ public class TiDBSrcVarcharDateFormatConvert implements CloudCanalProcessorV2 { protected static final Logger customLogger = LoggerFactory.getLogger("custom_processor"); - private static final SimpleDateFormat DTF = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); + private static final SimpleDateFormat DTF = new SimpleDateFormat("yyyy-MM-dd"); private final Map> tableCols = new HashMap<>(); private final Map> formatMap = new HashMap<>(); @@ -32,8 +32,10 @@ public class TiDBSrcVarcharDateFormatConvert implements CloudCanalProcessorV2 { tableCols.put(test14, new HashSet<>(Arrays.asList("va", "va2"))); // Format of each source column formatMap.put(test14, new HashMap() {{ + // 23-07-2023 put("va", new SimpleDateFormat("dd-MM-yyyy")); - put("va2", new SimpleDateFormat("dd-MM-yyyy HH:mm:ss")); + // 23072023 + put("va2", new SimpleDateFormat("ddMMyyyy")); }}); } -- Gitee