Fetch the repository succeeded.
This action will force synchronization from sxfad/porter, which will overwrite any changes that you have made since you forked the repository, and can not be recovered!!!
Synchronous operation will process in the background and will refresh the page when finishing processing. Please be patient.
业务需求: 同步mysql数据表c_role、c_department到oracle数据表DATA_ROLE、DATA_DEPARTMENT中;c_role表中id字段数值在同步中+1000.
数据准备:
来源数据信息:
mysql数据源
url:jdbc:mysql://127.0.0.1:3306/data_test?useUnicode=true&characterEncoding=utf8
用户名:user
密码:123456
同步数据来源(canal)
地址:127.0.0.1:3306
数据库:data_test
用户:user
密码:123456
过滤器:data_test\.(c_role|c_department)
目标数据信息
oracle数据源
url:jdbc:oracle:thin:@127.0.0.1:1521:test
用户名:test234
密码:123456
自定义代码:
public class CustomKeyProcessor implements EventProcessor {
private Logger log = LoggerFactory.getLogger(CustomKeyProcessor.class);
@Override
public void process(ETLBucket etlBucket) {
List<ETLRow> rows = etlBucket.getRows();
for (ETLRow row : rows) {
log.info("自定义逻辑Schema:[{}] table:[{}]",row.getFinalSchema(),row.getFinalTable());
if (row.getFinalTable().equalsIgnoreCase("c_role")) {
for (ETLColumn eTLColumn : row.getColumns()) {
log.info("自定义逻辑Column:[{}]",eTLColumn.getFinalName());
if(eTLColumn.getFinalName().equalsIgnoreCase("ID")){
log.info("自定义逻辑value:[{}]",eTLColumn.getFinalValue());
eTLColumn.setFinalValue(Long.valueOf(eTLColumn.getFinalValue())+1000l+"");;
}
}
}
}
}
}
业务需求: 同步oracle数据表DATA_USER到mysql数据表c_user中.
数据准备:
来源数据信息:
oracle数据源
url:jdbc:oracle:thin:@127.0.0.1:1521:test
用户名:test234
密码:123456
同步数据来源(kafka)
服务器列表:127.0.0.2:9092,127.0.0.1:9092
主题:test234
查询超时时间:5000
单次查询数量:1000
目标数据信息
mysql数据源
url:jdbc:mysql://127.0.0.1:3306/data_test?useUnicode=true&characterEncoding=utf8
用户名:user
密码:123456
* 请注意,oracle作为来源数据,必须选择器kafka相关插件、ogg格式;
* 请注意,oracle作为来源数据,同步数据来源现阶段只能选择kafka类型。
* 请注意,载入插件影响的是最终sql拼装方式。
* 请注意,表映射关系只能新建、删除,不提供修改功能;
* 请注意,表映射关系在任务第二三步变更后,进行清空操作;
* 请注意,表映射关系现阶段支持一对一、多对一、一对多操作;
* 请注意,表映射关系不能进行重复保存;
* 请注意,字段映射现阶段仅支持一对一严格映射;
Sign in for post a comment
Comment ( 0 )