From 5d2dd6bb2b7171a9fc3898a55a271919e4e72de7 Mon Sep 17 00:00:00 2001 From: chunlin <3049478157@qq.com> Date: Thu, 3 Jul 2025 19:07:04 +0800 Subject: [PATCH] feat: TiDBSrcVarcharToDate --- .../datatransform/TiDBSrcVarcharToDate.java | 78 +++++++++++++++++++ .../META-INF/cloudcanal/plugin.properties | 2 +- 2 files changed, 79 insertions(+), 1 deletion(-) create mode 100644 data-transform/src/main/java/com/clougence/cloudcanal/dataprocess/datatransform/TiDBSrcVarcharToDate.java diff --git a/data-transform/src/main/java/com/clougence/cloudcanal/dataprocess/datatransform/TiDBSrcVarcharToDate.java b/data-transform/src/main/java/com/clougence/cloudcanal/dataprocess/datatransform/TiDBSrcVarcharToDate.java new file mode 100644 index 0000000..451abee --- /dev/null +++ b/data-transform/src/main/java/com/clougence/cloudcanal/dataprocess/datatransform/TiDBSrcVarcharToDate.java @@ -0,0 +1,78 @@ +package com.clougence.cloudcanal.dataprocess.datatransform; + +import com.clougence.cloudcanal.sdk.api.CloudCanalProcessorV2; +import com.clougence.cloudcanal.sdk.api.ProcessorContext; +import com.clougence.cloudcanal.sdk.api.modelv2.CustomData; +import com.clougence.cloudcanal.sdk.api.modelv2.CustomFieldV2; +import com.clougence.cloudcanal.sdk.api.modelv2.CustomRecordV2; +import com.clougence.cloudcanal.sdk.api.modelv2.SchemaInfo; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.JDBCType; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; + +/** + * @author chunlin create time is 2025/7/3 + */ +public class TiDBSrcVarcharToDate implements CloudCanalProcessorV2 { + + protected static final Logger customLogger = LoggerFactory.getLogger("custom_processor"); + + private final SchemaInfo srcTable = new SchemaInfo(null, "test", "test14"); + + private final SimpleDateFormat format = new SimpleDateFormat("dd-MM-yyyy"); + + @Override + public void start(ProcessorContext processorContext) { + } + + @Override + public List process(CustomData data) { + customLogger.info("{}.{}.{},equals:{}", data.getSchemaInfo().getCatalog(), data.getSchemaInfo().getSchema(), data.getSchemaInfo().getTable(), data.getSchemaInfo().equals(srcTable)); + if (data.getSchemaInfo().equals(srcTable)) { + for (CustomRecordV2 recordV2 : data.getRecords()) { + switch (data.getEventType()) { + case INSERT: { + changeColumn(recordV2.getAfterColumnMap()); + break; + } + case UPDATE: { + changeColumn(recordV2.getBeforeColumnMap()); + changeColumn(recordV2.getAfterColumnMap()); + break; + } + default: + break; + } + } + } + + List re = new ArrayList<>(); + re.add(data); + return re; + } + + protected void changeColumn(LinkedHashMap columnMap) { + CustomFieldV2 col = columnMap.get("va"); + if (col.getSqlType() == JDBCType.VARCHAR.getVendorTypeNumber() && StringUtils.isNotBlank((String) col.getValue())) { + String val = col.getValue().toString(); + try { + col.setValue(format.parse(val)); + col.setSqlType(JDBCType.DATE.getVendorTypeNumber()); + col.setDbType("DATE"); + } catch (Exception e) { + customLogger.error("Failed to convert VARCHAR to DATE for value: " + val, e); + } + } + } + + @Override + public void stop() { + + } +} diff --git a/data-transform/src/main/resources/META-INF/cloudcanal/plugin.properties b/data-transform/src/main/resources/META-INF/cloudcanal/plugin.properties index 8f5c504..0e62a62 100644 --- a/data-transform/src/main/resources/META-INF/cloudcanal/plugin.properties +++ b/data-transform/src/main/resources/META-INF/cloudcanal/plugin.properties @@ -1 +1 @@ -loadClassName=com.clougence.cloudcanal.dataprocess.datatransform.SQLServerToOracleDeleteTransform +loadClassName=com.clougence.cloudcanal.dataprocess.datatransform.TiDBSrcVarcharToDate -- Gitee