# datachanged **Repository Path**: tmluwei/datachanged ## Basic Information - **Project Name**: datachanged - **Description**: 支持Oracle/MySQL/PostgreSQL/SQLServer的数据库表变化量计算、数据映射转换、数据同步模块化组件 - **Primary Language**: Java - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2020-04-21 - **Last Updated**: 2020-12-19 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # 数据库表数据变化量计算 博客文章:https://blog.csdn.net/inrgihc/article/details/103932231 ## 目录 - [数据库表数据变化量计算](#数据库表数据变化量计算) - [目录](#目录) - [一、功能简述](#一功能简述) - [二、功能点说明](#二功能点说明) - [1、提供PostgreSQL/Greenplum的copy/insert数据批量写入功能](#1提供postgresqlgreenplum的copyinsert数据批量写入功能) - [2、提供MySQL/Oracle/SqlServer的insert数据批量写入功能](#2提供mysqloraclesqlserver的insert数据批量写入功能) - [3、提供基于前后两次的全量计算变化量](#3提供基于前后两次的全量计算变化量) - [4、提供数据映射转换功能](#4提供数据映射转换功能) - [5、提供变化量数据同步(批量Insert/Update/Delete)功能](#5提供变化量数据同步批量insertupdatedelete功能) - [三、结构部署](#三结构部署) - [1、目录结构](#1目录结构) - [2、打包部署](#2打包部署) - [四、使用教程](#四使用教程) - [1、数据批量写入dbwriter模块](#1数据批量写入dbwriter模块) - [2、变化量计算dbchange模块](#2变化量计算dbchange模块) - [3、映射转换dbmapper模块](#3映射转换dbmapper模块) - [4、数据同步dbsynch模块](#4数据同步dbsynch模块) - [五、FAQ](#五faq) ## 一、功能简述 - 提供PostgreSQL/Greenplum的copy/insert数据批量写入、MySQL/Oracle/SqlServer的insert数据批量写入功能 - 提供Oralce/MySQL/SqlServer/PostgreSQL/Greenplum数据库表数据库表前后变化量计算功能 - 提供Oralce/MySQL/SqlServer/PostgreSQL/Greenplum映射转换功能(字段映射转换/取值映射转换) - 提供Oracle/MySQL/SqlServer/PostgreSQL/Greenplum变化量数据同步(批量Insert/Update/Delete)功能 ## 二、功能点说明 ### 1、提供PostgreSQL/Greenplum的copy/insert数据批量写入功能 基于PostgreSQL的jdbc驱动包中的copy接口提供copy数据写入功能,要求如下: - 支持写入多种数据类型的数据(整型、浮点型、时间类型、文本型、二进制类型等),**已支持二进制数据类型**; - 使用的jdbc驱动包必须为PostgreSQL的驱动包(因为Greenplum的驱动包中不提供copy的API接口); - 数据最好按批次写入,**每批次数据写入的数量不宜过大(否则可能会出现OOM)**,建议设置为100~20000之间; - 每一写入批次为一个事务操作; ### 2、提供MySQL/Oracle/SqlServer的insert数据批量写入功能 基于数据库的insert语句提供数据写入功能,支持的功能如下: - 写入的数据类型无限制(整型、浮点型、时间类型、文本型、二进制类型等),**可以为二进制数据类型***; - 内部基于事务操作; - 数据最好按批次写入,**每批次数据写入的数量不宜过大(否则可能会出现OOM)**,建议设置为100~20000之间; ### 3、提供基于前后两次的全量计算变化量 - 根据同一张表(有主键)的前后两次全量数据计算变化量(insert/update/delete); ### 4、提供数据映射转换功能 - 支持可配置SQL的一对一映射转换 - 支持提供查询SQL的多对一映射转换 - 支持代码值的一个是转换( boy->M ) - **支持含有二进制字段的表参与变化量计算**; ### 5、提供变化量数据同步(批量Insert/Update/Delete)功能 - 支持Oracle/MySQL/SqlServer/PostgreSQL等数据库间的数据变化量同步insert/update/delete功能 ## 三、结构部署 ### 1、目录结构 ``` \- |-smartdm-common 通用查询select的封装模块 | |-smartdm-dbwriter 数据批量写入insert的封装模块 | |-smartdm-dbchange 变化量计算的封装模块 | |-smartdm-dbmapper 映射转换的封装模块 | |-smartdm-dbsynch 数据批量同步的封装模块 | |-smartdm-example 提供的示例demo | ``` ### 2、打包部署 ``` git clone https://gitee.com/inrgihc/datachanged.git cd datachanged/ mvn clean mvn package mvn install ``` ## 四、使用教程 ### 1、数据批量写入dbwriter模块 - 接口定义: ``` public interface IDatabaseWriter { /** * 获取数据源对象 * * @return DataSource数据源对象 */ public DataSource getDataSource(); /** * 批量写入预处理 * * @param schemaName schema名称 * @param tableName table名称 */ public void prepareWrite(String schemaName, String tableName); /** * 批量数据写入 * * @param fieldNames 字段名称列表 * @param recordValues 数据记录 * @return 返回实际写入的数据记录条数 */ public long write(List fieldNames, List recordValues); } ``` - 调用示例: ``` // 数据源 DataSource dataSource; // 构造数据写入器 IDatabaseWriter writer = DatabaseWriterFactory.createDatabaseWriter(dataSource); //做预处理准备 writer.prepareWrite("test_schema_name", "test_name_name"); //执行实际的批量数据写入功能 while(true){ //构造批量写入的数据后执行实际的数据写入 long ret = writer.write(fieldNames, recordValues); } ``` **特别说明** - 写入数据的类型必须符合字段的类型要求,**不负责类型的转换**; - 各个接口均**非线程安全**,请勿多线程穿插调用; ### 2、变化量计算dbchange模块 - 结果处理接口定义 ``` public interface IDatabaseRowHandler { /** * 行数据处理 * * @param fields 字段名称列表,该列表只读 * @param record 一条数据记实录 * @param flag 数据变化状态 */ public void handle(List fields, Object[] record, RecordChangeType flag); /** * 计算结束通知 * * @param fields 字段名称列表,该列表只读 */ public void destroy(List fields); } ``` - 变化量计算接口定义 ``` public interface IDatabaseChangeCaculator { /** * 是否记录无变化的数据 * * @return */ public boolean isRecordIdentical(); /** * 设置是否记录无变化的数据 * * @param recordOrNot 是否记录无变化的数据 */ public void setRecordIdentical(boolean recordOrNot); /** * 获取JDBC驱动批量读取数据的行数大小 * * @return 批量行数大小 */ public int getFetchSize(); /** * 设置JDBC驱动批量读取数据的行数大小 * * @param size 批量行数大小 */ public void setFetchSize(int size); /** * 执行变化量计算任务 * * @param task 任务描述对象 * @param handler 计算结果回调处理器 */ public void executeCalculate(TaskParamBean task, IDatabaseRowHandler handler); } ``` 其中TaskEntity的定义: ``` @Data @Builder @AllArgsConstructor public class TaskParamBean { // 老表的数据源 @NonNull DataSource oldDataSource; // 老表的schema名 @NonNull private String oldSchemaName; // 老表的table名 @NonNull private String oldTableName; // 新表的数据源 @NonNull DataSource newDataSource; // 新表的schema名 @NonNull private String newSchemaName; // 新表的table名 @NonNull private String newTableName; } ``` - 调用示例: ``` // 数据源 DataSource dataSource; // 准备数据参数 String outSchemaName = "private"; String outTableName = "ofuser_diff"; String flagFieldName = "diff_type"; // 构造计算任务参数 TaskParamBean.TaskParamBeanBuilder taskBuilder = TaskParamBean.builder(); taskBuilder.oldDataSource(dataSource); taskBuilder.oldSchemaName("private"); taskBuilder.oldTableName("ofuser_old"); taskBuilder.newDataSource(dataSource); taskBuilder.newSchemaName("private"); taskBuilder.newTableName("ofuser_new"); // 构造计算结果写入数据库对象 IDatabaseWriter writer = DatabaseWriterFactory.createDatabaseWriter(dataSource); writer.prepareWrite(outSchemaName, outTableName); StopWatch watch = new StopWatch(); IDatabaseChangeCaculator changeCaculator = new ChangeCaculatorService(); // 这里设置批量查询操作的fetch-size的大小(大于100的整型数) changeCaculator.setFetchSize(5000); watch.start("watcher"); // 执行数据变化量计算过程 changeCaculator.executeCalculate(taskBuilder.build(), new IDatabaseRowHandler() { private List cache = new LinkedList(); @Override public void handle(List fields, Object[] record, RecordChangeType flag) { //这里处理输出的每一条变化量记录数据:处理的方法为: //(1)向对记录结果数组record扩展一个元素,来存储变化状态 //(2)然后将该条记录数据缓存到内存cache对象中, //(3)当内存cache缓冲满一个批次大小后,将其批量写入结果表中。 Object[] item = Arrays.copyOf(record, record.length + 1); if (flag == RecordChangeType.VALUE_INSERT) { item[item.length - 1] = "new"; } else if (flag == RecordChangeType.VALUE_CHANGED) { item[item.length - 1] = "changed"; } else if (flag == RecordChangeType.VALUE_DELETED) { item[item.length - 1] = "deleted"; } else { return; } cache.add(item); if (cache.size() >= BATCH_SIZE) { doSave(fields); } } @Override public void destroy(List fields) { if (cache.size() > 0) { doSave(fields); } } private void doSave(List fields) { List fieldNames = new ArrayList(fields); fieldNames.add(flagFieldName); long ret = writer.write(fieldNames, cache); System.out.println("handle result count: " + ret); cache.clear(); } }); watch.stop(); System.out.println("Total elipse :" + watch.getTotalTimeSeconds() + " s"); System.out.println(watch.prettyPrint()); ``` ### 3、映射转换dbmapper模块 - 结果处理接口定义 ``` public interface IResultRowHandler { /** * 映射结果行处理函数 *

* 说明:可使用dbwrite模块写入数据 *

* * @param columns 列名称,该列表只读 * @param record 对应的列值 */ void handle(List columns, Object[] record); /** * 计算结束通知 * * @param columns 字段名称列表,该列表只读 */ public void destroy(List columns); } ``` - 映射转换接口定义 ``` public interface IDatabaseMapperEngine { /** * 代码映射异常时是否抛出异常 * * @return 是为true,否则为false */ boolean isQuietWhenCoderMapperException(); /** * 设置代码映射异常时是否抛出异常 * * @param quiet 是否抛异常 */ void setQuietWhenCoderMapperException(boolean quiet); /** * 设置数据映射引擎的查询端数据源 * * @param sourceDataSource 查询端数据源 */ void setMapperEngineDataSource(DataSource dataSource); /** * 执行一个数据映射任务 * * @param task 任务参数实例 * @param handler 结果处理器 */ void runMapperTransfer(TaskParamEntity task, IResultRowHandler handler); } ``` - 调用示例: ``` //批量处理大小 int BATCH_SIZE = 10000; // 数据源 DataSource dataSource; String sourceSchemaName="tang"; String sourceTableName="ws_user,ws_org"; String targetSchemaName = "tang"; String taregetTableName = "new_user"; IDatabaseMapperEngine mapper = new DefaultMapperEngine(); mapper.setMapperEngineDataSource(dataSource); mapper.setQuietWhenCoderMapperException(true); IDatabaseWriter writer = DatabaseWriterFactory.createDatabaseWriter(dataSource); writer.prepareWrite(targetSchemaName, taregetTableName); Map fieldsMapper=new HashMap(); // key为目的表的字段名称,value为源表(查询)的字段名称 fieldsMapper.put("number", "xgh"); fieldsMapper.put("name", "xm"); fieldsMapper.put("sex", "xbm"); fieldsMapper.put("xbm", "xbm"); fieldsMapper.put("sfzjh", "sfzjh"); fieldsMapper.put("rysf", "rysf"); fieldsMapper.put("sznj", "sznj"); fieldsMapper.put("bmdm", "bmdm"); fieldsMapper.put("parent", "pbmdm"); Map> valuesMapper=new HashMap>(); // key为目的表字段名称 valuesMapper.put("sex", new HashMap() { { put("1", "男"); put("2", "女"); } }); TaskParamEntity.TaskParamEntityBuilder taskBuilder = TaskParamEntity.builder(); //taskBuilder.mapType(TaskMapType.OneByOne);//这里标识为一对一映射 //taskBuilder.viewSql(null);//如果一对一映射自己写了SQL语句,那模块将会使用这个SQL语句,这里只为增量映射设计使用 taskBuilder.mapType(TaskMapType.MultiByOne);//这里标识为多对一映射 taskBuilder.viewSql("SELECT u.xgh,u.xm,u.xbm,u.sfzjh,u.rysf,u.sznj,u.rxnf,o.bmdm,o.bmmc,o.pbmdm from tang.ws_user u LEFT JOIN tang.ws_org o on u.bmdm=o.bmdm"); taskBuilder.schemaName(sourceSchemaName); taskBuilder.tableName(sourceTableName); taskBuilder.fetchSize(BATCH_SIZE); taskBuilder.fieldsMapper(fieldsMapper); taskBuilder.valuesMapper(valuesMapper); StopWatch watch = new StopWatch(); watch.start("watcher"); mapper.runMapperTransfer(taskBuilder.build(), new IResultRowHandler() { private long count=0; private List cache = new LinkedList(); @Override public void handle(List columns, Object[] record) { cache.add(record); count++; if (cache.size() >= BATCH_SIZE) { doSave(columns); } } @Override public void destroy(List columns) { if (cache.size() > 0) { doSave(columns); } System.out.println("total handle count: " + this.count); } private void doSave(List columns) { long ret = writer.write(columns, cache); cache.clear(); System.out.println("handle count=" + ret); } }); watch.stop(); System.out.println("Total elipse :" + watch.getTotalTimeSeconds() + " s"); System.out.println(watch.prettyPrint()); ``` ### 4、数据同步dbsynch模块 - 结果处理接口定义 ``` /** * 数据同步接口定义 * * @author tang * */ public interface IDatabaseSynchronize { /** * 获取数据源对象 * * @return DataSource数据源对象 */ DataSource getDataSource(); /** * 批量Insert/Update/Delete预处理 * * @param schemaName schema名称 * @param tableName table名称 * @param fieldNames 字段列表 * @param pks 主键字段列表 */ void prepare(String schemaName, String tableName, List fieldNames, List pks); /** * 批量数据Insert * * @param records 数据记录 * @return 返回实际影响的记录条数 */ long executeInsert(List records); /** * 批量数据Update * * @param records 数据记录 * @return 返回实际影响的记录条数 */ long executeUpdate(List records); /** * 批量数据Delete * * @param records 数据记录 * @return 返回实际影响的记录条数 */ long executeDelete(List records); } ``` - 调用示例: 该模块需要结合dbchange模块共同使用,示例代码如下: ``` DataSource sds; // source datasource configuration DataSource tds; // tareget datasource configuration //将old_*表里的数据同步为new_*表里的数据 TaskParamBean.TaskParamBeanBuilder taskBuilder = TaskParamBean.builder(); taskBuilder.oldDataSource(tds); taskBuilder.oldSchemaName("test"); taskBuilder.oldTableName("old_user"); taskBuilder.newDataSource(sds); taskBuilder.newSchemaName("tang"); taskBuilder.newTableName("new_user"); TaskParamBean task = taskBuilder.build(); IDatabaseSynchronize synch = DatabaseSynchronizeFactory.createDatabaseWriter(tds); JdbcMetaDataUtils mdt = new JdbcMetaDataUtils(tds); List pks = mdt.queryTablePrimaryKeys(task.getOldSchemaName(), task.getOldTableName()); List fieldNames = mdt.queryTableColumnName(task.getOldSchemaName(), task.getOldTableName()); synch.prepare(task.getOldSchemaName(), task.getOldTableName(), fieldNames, pks); IDatabaseChangeCaculator changeCaculator = new ChangeCaculatorService(); changeCaculator.setFetchSize(BATCH_SIZE); changeCaculator.setRecordIdentical(false); changeCaculator.executeCalculate(task, new IDatabaseRowHandler() { private long count = 0; private List cacheInsert = new LinkedList(); private List cacheUpdate = new LinkedList(); private List cacheDelete = new LinkedList(); @Override public void handle(List fields, Object[] record, RecordChangeType flag) { if (flag == RecordChangeType.VALUE_INSERT) { cacheInsert.add(record); } else if (flag == RecordChangeType.VALUE_CHANGED) { cacheUpdate.add(record); } else { cacheDelete.add(record); } count++; checkFull(fields); } private void checkFull(List fields) { if (cacheInsert.size() >= BATCH_SIZE) { doInsert(fields); } if (cacheUpdate.size() >= BATCH_SIZE) { doUpdate(fields); } if (cacheDelete.size() >= BATCH_SIZE) { doDelete(fields); } } @Override public void destroy(List fields) { if (cacheInsert.size() > 0) { doInsert(fields); } if (cacheUpdate.size() > 0) { doUpdate(fields); } if (cacheDelete.size() > 0) { doDelete(fields); } System.out.println("处理的数据记录总数为: " + count); } private void doInsert(List fields) { long ret = synch.executeInsert(cacheInsert); System.out.println("处理Insert记录总数为:" + ret); cacheInsert.clear(); } private void doUpdate(List fields) { long ret = synch.executeUpdate(cacheUpdate); System.out.println("处理Update记录总数为:" + ret); cacheUpdate.clear(); } private void doDelete(List fields) { long ret = synch.executeDelete(cacheDelete); System.out.println("处理Delete记录总数为:" + ret); cacheDelete.clear(); } }); ``` ## 五、FAQ - 1、调用示例参考:smartdm-example模块代码 ``` └── example ├── DbwriterExampleApplication.java //数据批量写入模块dbwrite使用DEMO ├── DbchangeExampleApplication.java //变化量计算模块dbchange使用DEMO ├── DbmapperExampleApplication.java //数据映射转换模块dbmapper使用DEMO └── DbsynchExampleApplication.java //数据数据同步模块dbsynch使用DEMO ```