diff --git a/data-transform/src/main/java/com/clougence/cloudcanal/dataprocess/datatransform/TiDBSrcVarcharToDate.java b/data-transform/src/main/java/com/clougence/cloudcanal/dataprocess/datatransform/TiDBSrcVarcharDateFormatConvert.java similarity index 35% rename from data-transform/src/main/java/com/clougence/cloudcanal/dataprocess/datatransform/TiDBSrcVarcharToDate.java rename to data-transform/src/main/java/com/clougence/cloudcanal/dataprocess/datatransform/TiDBSrcVarcharDateFormatConvert.java index 451abee1e97059dce8ba35c88eb74d1965b26782..3cc8d1cd0cb135f561bc32243749b2e24bdd85e6 100644 --- a/data-transform/src/main/java/com/clougence/cloudcanal/dataprocess/datatransform/TiDBSrcVarcharToDate.java +++ b/data-transform/src/main/java/com/clougence/cloudcanal/dataprocess/datatransform/TiDBSrcVarcharDateFormatConvert.java @@ -6,44 +6,54 @@ import com.clougence.cloudcanal.sdk.api.modelv2.CustomData; import com.clougence.cloudcanal.sdk.api.modelv2.CustomFieldV2; import com.clougence.cloudcanal.sdk.api.modelv2.CustomRecordV2; import com.clougence.cloudcanal.sdk.api.modelv2.SchemaInfo; -import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.sql.JDBCType; import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.LinkedHashMap; -import java.util.List; +import java.util.*; /** * @author chunlin create time is 2025/7/3 */ -public class TiDBSrcVarcharToDate implements CloudCanalProcessorV2 { +public class TiDBSrcVarcharDateFormatConvert implements CloudCanalProcessorV2 { protected static final Logger customLogger = LoggerFactory.getLogger("custom_processor"); - private final SchemaInfo srcTable = new SchemaInfo(null, "test", "test14"); + private static final SimpleDateFormat DTF = new SimpleDateFormat("yyyy-MM-dd"); - private final SimpleDateFormat format = new SimpleDateFormat("dd-MM-yyyy"); + private final Map> tableCols = new HashMap<>(); + private final Map> formatMap = new HashMap<>(); @Override public void start(ProcessorContext processorContext) { + SchemaInfo test14 = new SchemaInfo(null, "test", "test14"); + // The source columns we need to convert + tableCols.put(test14, new HashSet<>(Arrays.asList("va", "va2"))); + // Format of each source column + formatMap.put(test14, new HashMap() {{ + // 23-07-2023 + put("va", new SimpleDateFormat("dd-MM-yyyy")); + // 23072023 + put("va2", new SimpleDateFormat("ddMMyyyy")); + }}); } @Override public List process(CustomData data) { - customLogger.info("{}.{}.{},equals:{}", data.getSchemaInfo().getCatalog(), data.getSchemaInfo().getSchema(), data.getSchemaInfo().getTable(), data.getSchemaInfo().equals(srcTable)); - if (data.getSchemaInfo().equals(srcTable)) { + SchemaInfo schemaInfo = data.getSchemaInfo(); + Set colsNeedConvert = tableCols.get(schemaInfo); + Map formats = formatMap.get(schemaInfo); + if (colsNeedConvert != null && !colsNeedConvert.isEmpty() && formats != null && !formats.isEmpty()) { for (CustomRecordV2 recordV2 : data.getRecords()) { switch (data.getEventType()) { case INSERT: { - changeColumn(recordV2.getAfterColumnMap()); + convertColumn(recordV2.getAfterColumnMap(), colsNeedConvert, formats); break; } case UPDATE: { - changeColumn(recordV2.getBeforeColumnMap()); - changeColumn(recordV2.getAfterColumnMap()); + convertColumn(recordV2.getBeforeColumnMap(), colsNeedConvert, formats); + convertColumn(recordV2.getAfterColumnMap(), colsNeedConvert, formats); break; } default: @@ -52,21 +62,26 @@ public class TiDBSrcVarcharToDate implements CloudCanalProcessorV2 { } } - List re = new ArrayList<>(); - re.add(data); - return re; + return Collections.singletonList(data); } - protected void changeColumn(LinkedHashMap columnMap) { - CustomFieldV2 col = columnMap.get("va"); - if (col.getSqlType() == JDBCType.VARCHAR.getVendorTypeNumber() && StringUtils.isNotBlank((String) col.getValue())) { - String val = col.getValue().toString(); - try { - col.setValue(format.parse(val)); - col.setSqlType(JDBCType.DATE.getVendorTypeNumber()); - col.setDbType("DATE"); - } catch (Exception e) { - customLogger.error("Failed to convert VARCHAR to DATE for value: " + val, e); + protected void convertColumn(LinkedHashMap columnMap, Set colsNeedConvert, Map formats) { + for (String colName : colsNeedConvert) { + CustomFieldV2 col = columnMap.get(colName); + Object value = col.getValue(); + SimpleDateFormat format = formats.get(colName); + if (formats.get(colName) == null) { + customLogger.info("Format has not been initialized yet, so ignore this column: {}", colName); + continue; + } + if (col.getSqlType() == JDBCType.VARCHAR.getVendorTypeNumber() && value != null) { + String val = value.toString(); + try { + Date date = format.parse(val); + col.setValue(DTF.format(date)); + } catch (Exception e) { + customLogger.error("Failed to convert VARCHAR to DATE for value: {}", val, e); + } } } } diff --git a/data-transform/src/main/resources/META-INF/cloudcanal/plugin.properties b/data-transform/src/main/resources/META-INF/cloudcanal/plugin.properties index 0e62a62d93a987f007cef66b858d96146d423564..8640502db69171220a7cb1b1dff34000d601ab1f 100644 --- a/data-transform/src/main/resources/META-INF/cloudcanal/plugin.properties +++ b/data-transform/src/main/resources/META-INF/cloudcanal/plugin.properties @@ -1 +1 @@ -loadClassName=com.clougence.cloudcanal.dataprocess.datatransform.TiDBSrcVarcharToDate +loadClassName=com.clougence.cloudcanal.dataprocess.datatransform.TiDBSrcVarcharDateFormatConvert