diff --git a/data-transform/src/main/java/com/clougence/cloudcanal/dataprocess/datatransform/TiDBSrcVarcharToDate.java b/data-transform/src/main/java/com/clougence/cloudcanal/dataprocess/datatransform/TiDBSrcVarcharToDate.java index 451abee1e97059dce8ba35c88eb74d1965b26782..729b05c722767817a5a97391664218de584963ac 100644 --- a/data-transform/src/main/java/com/clougence/cloudcanal/dataprocess/datatransform/TiDBSrcVarcharToDate.java +++ b/data-transform/src/main/java/com/clougence/cloudcanal/dataprocess/datatransform/TiDBSrcVarcharToDate.java @@ -6,15 +6,12 @@ import com.clougence.cloudcanal.sdk.api.modelv2.CustomData; import com.clougence.cloudcanal.sdk.api.modelv2.CustomFieldV2; import com.clougence.cloudcanal.sdk.api.modelv2.CustomRecordV2; import com.clougence.cloudcanal.sdk.api.modelv2.SchemaInfo; -import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.sql.JDBCType; import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.LinkedHashMap; -import java.util.List; +import java.util.*; /** * @author chunlin create time is 2025/7/3 @@ -23,27 +20,35 @@ public class TiDBSrcVarcharToDate implements CloudCanalProcessorV2 { protected static final Logger customLogger = LoggerFactory.getLogger("custom_processor"); - private final SchemaInfo srcTable = new SchemaInfo(null, "test", "test14"); - - private final SimpleDateFormat format = new SimpleDateFormat("dd-MM-yyyy"); + private final Map> tableCols = new HashMap<>(); + private final Map> formatMap = new HashMap<>(); @Override public void start(ProcessorContext processorContext) { + SchemaInfo test14 = new SchemaInfo(null, "test", "test14"); + // The columns we need to convert + tableCols.put(test14, new HashSet<>(Arrays.asList("va"))); + // Format of each column + formatMap.put(test14, new HashMap() {{ + put("va", new SimpleDateFormat("dd-MM-yyyy")); + }}); } @Override public List process(CustomData data) { - customLogger.info("{}.{}.{},equals:{}", data.getSchemaInfo().getCatalog(), data.getSchemaInfo().getSchema(), data.getSchemaInfo().getTable(), data.getSchemaInfo().equals(srcTable)); - if (data.getSchemaInfo().equals(srcTable)) { + SchemaInfo schemaInfo = data.getSchemaInfo(); + Set colsNeedConvert = tableCols.get(schemaInfo); + Map formats = formatMap.get(schemaInfo); + if (colsNeedConvert != null && !colsNeedConvert.isEmpty() && formats != null && !formats.isEmpty()) { for (CustomRecordV2 recordV2 : data.getRecords()) { switch (data.getEventType()) { case INSERT: { - changeColumn(recordV2.getAfterColumnMap()); + convertColumn(recordV2.getAfterColumnMap(), colsNeedConvert, formats); break; } case UPDATE: { - changeColumn(recordV2.getBeforeColumnMap()); - changeColumn(recordV2.getAfterColumnMap()); + convertColumn(recordV2.getBeforeColumnMap(), colsNeedConvert, formats); + convertColumn(recordV2.getAfterColumnMap(), colsNeedConvert, formats); break; } default: @@ -52,21 +57,27 @@ public class TiDBSrcVarcharToDate implements CloudCanalProcessorV2 { } } - List re = new ArrayList<>(); - re.add(data); - return re; + return Collections.singletonList(data); } - protected void changeColumn(LinkedHashMap columnMap) { - CustomFieldV2 col = columnMap.get("va"); - if (col.getSqlType() == JDBCType.VARCHAR.getVendorTypeNumber() && StringUtils.isNotBlank((String) col.getValue())) { - String val = col.getValue().toString(); - try { - col.setValue(format.parse(val)); - col.setSqlType(JDBCType.DATE.getVendorTypeNumber()); - col.setDbType("DATE"); - } catch (Exception e) { - customLogger.error("Failed to convert VARCHAR to DATE for value: " + val, e); + protected void convertColumn(LinkedHashMap columnMap, Set colsNeedConvert, Map formats) { + for (String colName : colsNeedConvert) { + CustomFieldV2 col = columnMap.get(colName); + Object value = col.getValue(); + SimpleDateFormat format = formats.get(colName); + if (formats.get(colName) == null) { + customLogger.info("Format has not been initialized yet, so ignore this column: {}", colName); + continue; + } + if (col.getSqlType() == JDBCType.VARCHAR.getVendorTypeNumber() && value != null) { + String val = value.toString(); + try { + col.setValue(format.parse(val)); + col.setSqlType(JDBCType.DATE.getVendorTypeNumber()); + col.setDbType("DATE"); + } catch (Exception e) { + customLogger.error("Failed to convert VARCHAR to DATE for value: {}", val, e); + } } } }