# migrate-mysql-data **Repository Path**: pious68/migrate-mysql-data ## Basic Information - **Project Name**: migrate-mysql-data - **Description**: 针对于传统单库单表存在的因业务发展带来的访问量激增、存储数据量激增等客观事实而导致的数据库并发量过大、可用链接数不足、存储和查询性能下降等问题的可靠的分库分表数据迁移方案,实现完成单库单表到多库多表、多库多表到多库多表的可靠的数据迁移方案,依托于滚动扫表全量同步、canal+rocketmq顺序消息增量同步实现数据迁移的不重不漏。 - **Primary Language**: Unknown - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2024-03-06 - **Last Updated**: 2024-04-11 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # migrate-mysql-data #### (一)介绍 ``` 针对于传统单库单表存在的因业务发展带来的访问量激增、存储数据量激增等客观事实而导致的数据库并发量过大、可用链接数不足、 存储和查询性能下降等问题的可靠的分库分表数据迁移方案,实现完成单库单表到多库多表、多库多表到多库多表的可靠的数据迁移方案, 依托于滚动扫表全量同步、canal+rocketmq顺序消息增量同步实现数据迁移的不重不漏。 ``` #### (二)软件架构 SpringBoot、SpringBoot Starter、Mysql、RocketMQ、MyBatis、ShardingSphere #### (三)使用说明 ##### 1.基础环境搭建 ``` 0. 配置etl_process_config表,手动向etl_process_config表添加数据 | 主键 | 表名 | 表的key | 迁移字段匹配类型 | 0未删除1已删除 | | id | order_info | orderNo | 0 | 0 | | id | order_info | orderNo | 1 | 0 | | id | order_item_tetail | orderNo | 0 | 0 | | id | order_item_detail | orderNo | 1 | 0 | logic_model即逻辑表名,record_key即表中分片键,需要用驼峰命名,因为迁移过程中的key是驼峰的,然后record_type设置为0。 1.修改环境变量后java --version不生效问题: 找到Program Files和Program Files(x86)下的文件,全部修改为想要的版本即可 2.关于JDBC连接数据库时出现的Public Key Retrieval is not allowed错误 连接数据库的url中,加上allowPublicKeyRetrieval=true参数,经过验证解决该问题。 3.启动项目 3.1.修改源库数据源、目标数据源、迁移数据源配置 3.2.修改rocketmq配置 3.3.下载canal的admin和deployer,在数据库执行admin下的数据库脚本并更改yml配置文件,canal.deployer是canalServer, 修改配置文件canal_local.properties修改ip即可,如果使用的是canal.server是1.5而本地JDK1.8以上会因为永久代参数的问题报错, 去掉启动参数中的永久代配置信息即可。 启动canal.admin:startup.bat 启动canal.deployer:startup.bat local 4.常见错误 4.1.canal.server启动报错: requestGet for canal config error: auth :admin is failed--搜索github--issues 解决:canal.deployer的canal.admin.passwd需要配置和canal.admin数据库脚本的密码一致 4.2.order-forward:monomer_order\\.order_info,order.forward:monomer_order\\.order_item_detail 4.3.添加了instance后报错:把plugin目录里的jar包拷贝到lib就行了 github问题地址:https://github.com/alibaba/canal/issues/3199 java.lang.IllegalStateException: Extension instance (name: rocketmq, class: interface com.alibaba.otter.canal.connector.core.spi.CanalMQProducer) could not be instantiated: class could not be found ``` ##### 2.访问地址 ``` 数据迁移平台:http://localhost:8081/migrate/toIndex rocketmq控制台:http://localhost:8888 生成的测试数据所在文件夹:D:\Softwares\MySQLData\Data\monomer_order canal-admin控制台:http://localhost:8089/ 项目中的DB01:源库;DB02:目标库;migraet:迁移配置库。 外卖订单数据模型,库和表的路由算法都是根据order_no和userId进行自定义的,不是简单的说%取模操作, 同一个订单号同一个userId路由到1个表中.库路由算法和表路由算法一样,先orderId后userId, 谁存在就取其后3位对dbSize取模得到后缀0-7然后拼接前缀确定db/table ``` #### (四)过程详解 #### 1.全量同步 ``` 首先开启滚动锁加锁,迁移之前对迁移任务进度计算组件/迁移表EtlProgress初始化入库,然后做滚动数据全量数据迁移: 0.如果迁移过程中捕捉到错误,则设置本次迁移数据的状态为失败FAIL并将pageSize置为0 1.根据数据抽取批次中的起始查询字段值startScrollId和每页捞取数量pageSize查询源数据库,得到map组成的list,一条数据对应一个map 2.遍历指定批次的List数据: 2.1.实例化MergeBinlogWrite 2.2.过滤掉非需要匹配的时间段的数据 2.3.对数据进行数据模型的转换,map转换为BinLog模型,存储到binLogDataMap中,value为BinLog,key:orderNo&tableName 2.4.根据这一批次数据binLogDataMap查询迁移目标库中数据,进行过滤,过滤后的数据存储到binlogDataList中 (目标库中不存在:add直接加入list;update改insert;delete直接pass;目标库中存在:非delete操作 && (数据相同/当前数据比目标库旧)则直接pass,其他加入binlogDataList) 2.5.将binlogDataList数据按照表名分组批量写入(全量同步完成),如果写入成功且为增量,需要更新表消息状态为consumed(增量同步逻辑)(2.1-2.5一个方法中) 2.6.执行完1个批次进行核对是否批量写入的数据为0,判断任务进度计算组件EtlProgress中完成数量是否>=缓存指定表的总数量,满足条件则迁移成功直接返回,否则继续, 2.7.将本批次的迁移明细表数据入库 2.8.更新迁移表上一次滚动最后记录的滚动字段值信息、迁移批次、已完成记录数 + 更新迁移明细表状态为成功 2.9.继续查下一批次数据重复执行 3.全量数据迁移完成,所有批次跑完,更新本次迁移数据的状态为成功 ``` #### 2.增量同步 ``` 1.CanalConsumeTask开启2个线程的线程池:、binLog消息提交任务的线程交给线程池执行。 binLog消息拉取任务从rocketMq中拉取消息:新增记录未消费binlog到mysql + 写队列 | 如果mysql已存在消息且为commited则进行一个手动commit + 写队列。 binLog消息提交线程:扫描binlog表中已消费未提交的消息,手动做提交。 2.而我们的IncrementTask会每隔15s执行数据的写入,遍历localQueue中消息, 经过binLog的合并、校验、过滤, (对binLog数据与新库中的数据比对,过滤掉无用数据) (binLog为delete而新库中没有记录则跳过/binLog为update而新库没有数据则改为insert/旧库有数据并且新库也有并且拼接字符串相等且不为delete则跳过)) 执行数据的分组写入,写入完成后clear读队列,如果写入成功返回true,且为增量同步模式,将会批量更新数据库中消费状态为success。 具体来讲: 1.首先会获取内存中的数据缓存阻塞队列,每次进行增量同步数据读取的时候都会交换读写队列从读队列中拿到数据做遍历, 对每条数据进行merge合并,其中会先【实例化一个MergeBinLogWriter对象】,将mq消息写入这个对象成员变量list中等待批量写入数据库, 同时对每条数据从这个对象成员变量binlogDataMap中查重,map中没有则直接插入,有的话只会在当前数据比较新的时候做覆盖。 (这个队列中存放的是实现数据同步的读队列、写队列、CAS锁,双队列的目的是为了提高我们数据的写入和读取效率) 2.然后对数据做校验,根据binlogDataMap去查询新库中数据,过滤掉无用数据(老库、新库的数据有无问题+数据新旧问题), 将真正需要落库的数据保存到 binlogDataList中 3.最后就是将binlogDataList按照表名做分组,进行批量数据写入,对能合并的数据做批量新增, 剩余的更新和删除就逐条进行,判断如果批量写入成功且为增量同步则还需要批量更新数据库中binlog消息同步消费记录表中消息消费状态为成功 4.一旦我们增量同步的数据批量操作失败,不会更新表中消费进度为success,仍旧为not_consumed,这时候会抛出异常导致本次任务的失败, 也就是说从读队列中拿出数据的写入全失败了-->停留在not_consumed状态。这里我们并没有使用事务,也就有可能说,一批binlog里面, 部分数据写入成功了,部分数据写入库失败了,但整个批次在mq层面都会认为not_consumed,如何故障恢复呢? --->因为关闭了自动commit提交,rocketmq来说,拉到了一批消息,我没任何返回值,自动offset提交也关闭了,因此offset不会自动去提交, rocketmq来说,是收不到任何的反馈和通知的,在这种情况之下,rocketmq来说,我们长时间没有给rocketMQ反馈,会自动的把消息重试投递给 -->rocketmq会重新投递给我们,再次通过消费者他的poll()拿到消息,然后去做判断:消息的消费记录已经存在, 这时候就分情况,我们的canalPullRunner判断我们批量更新过程中可能都是not_consumed也可能将部分更新成为了consumed, 但只要不是committed都去向队列投递,否则就定位这条消息进行commit提交,即【手动对已经消费的偏移量做提交】 5.默认情况下会基于Canal做数据库变更日志的监听写入rocketMQ集群中去, 然后我们的binLog拉取线程会在我们应用启动后自动执行(实现了ApplicationRunner接口), 连接RocketMQ集群去做binLog日志的拉取工作,这里的话我们实现的手动Commit, 之所以避免auto_commit是因为我们这个拉取binLog并消费入库的过程是【异步执行】的 -->我们拉取到消息后会根据queueId和offset去数据库binlog消息同步消费记录表去查重,根据是否存在做不同的业务逻辑处理 -->如果不存在则即把这条mq消息记录标记为【未消费】入库,然后去把这条mq消息以及binlog信息写入内存写队列等待做异步地消费; 存在的话判断数据库中这条mq消息记录状态如果是已提交那么我们去定位这条消息,做手动commit,否则地话向本地写队列投递。 精准的去控制offset提交,我们要确保每一条binlog都已经被应用到你的目标数据源里去了,此时对这条offset才能去做一个提交, rocketmq模块,consumer提交offset都是自动提交,先提交到本次缓存里,再提交到rocketmq里去,这会导致offset提交不是太精准 ``` #### 3.天粒度的数据量统计 ``` 1.迁移数据统计表etl_statistical,当我们指定某一天作为迁移的起始日期, 那么会从那一天起始到当前日期,查表统计每一天生成的数据量保存在当前统计表etl_statistical中. 一开始统计表中无数据,会从表中(eg:订单表)查询订单记录的最小日期,从最小日期开始到当前日期, 遍历每一天进行统计每天数据量,然后我们定时任务会1h执行1次,后续执行发现表中有数据了, 再次执行的时候,只需要etl_statistical中距离当前日期最近(最大日期)的时间, 如果相差2天则只统计2天内的数据防止跨天数据产生,否则就从已统计的最大日期-到当天日期统计, 统计的同时会清空statisticalCountMap.clear()内存中缓存的数据量。 2.滚动批次总数据量缓存,key:迁移批次,value:缓存数据量 map存放的是指定【迁移表etl_process的唯一标识批次】与它的【缓存总数据量】的映射关系. 1h更新一次,考虑每小时的更新原因?==>新数据入库跨天数据产生 3.然后我们在全量数据同步过程中,如果发现写入数据量为0,那么会去检查是否同步完成, 发现如果批量写入数据量为0,判断可能同步完成,因此从statisticalCountMap缓存中获取指定批次它的缓存数据量 (有则直接获取;否则查etl_statistical表汇总每天数据量),与已完成额数据量对比, 已完成 >= 缓存总数据量则同步完成设置,设置etl_process表标志位为2即迁移完成。 同时考虑之所以每次统计最新数据量都清空statisticalCountMap,是为了保证缓存数据量和mysql数据量的一致性, ``` #### 4.数据核对教程 ``` 定时任务,扫描etl_dirty_record中status为2即完成迁移的批次,一次最多扫描100条,然后分别从旧库和新库中 查询出来对应的记录进行校验,先比对旧记录是否存在于新库,不存在即添加到insert对应的list中; 然后比对filedKey和fieldValue拼接后字符串是否相等,我认为直接全量拼接比较好,相等则跳过, 不相等则判断旧记录时间是否>新记录时间,然后记录;然后做修正数据插入/更新,然后更新对应的 etl_dirty_record的值为4数据校对完成,【与此同时更新etl_process中对应的finishRecord的记录数】, 等这最多100条diry_record校对完成,查询etl_process中对应的finishRecord的记录数, 与etl_statistical中统计的总数据量作比较,finishRecord>=源表统计的总数据量则认为校对全部完成, 即更新【校对抽数】的status为4校对完成! ``` #### 5.全量数据迁移任务中断 ``` 全量迁移过程中如果任务中断,我们通过定时任务ScrollTask扫描etl_process中迁移fail或者init的任务, 然后重新开启我们的数据迁移任务,并配置了retryTimes和errMsg记录重试次数和失败原因,不断进行尝试。 ``` #### (五)问题剖析 ``` 0.项目初始化代码bug巨多,但思想还是不错的,v2版本的项目xml文件里的sql语句和数据库根本就对不起来!!! v1能对起来但是数据库仍然是空的,直接入手实现功能的话确实困难! 整体步骤搞了1天才搞明白,改了一天把bug都修复的差不多了!几个点! 1.手动向etl_process_config表添加数据,order_info orderNo 0 0 和order_info orderNo 1 0(注意record_key需要驼峰命名), 这里是你想要迁移哪张表你就添加哪条记录,核心作用是让代码知道迁移过程中的分片键是什么、然后比对新旧记录的时候做拼接的核心点 2.在MergeConfig中手动添加:FIELD_MAP.put("statisticalKey", new ArrayList<>(Collections.singletonList("order_item_detail","order_item"))); 要添加的原因在于CountCacheTask需要用到这个key,搭配@PostConstruct启动后立马执行,将每一天表的数据量写入统计表。 要迁移哪个表就添加那个,其实可以拓展数据库的,在v2版本实现了,但是v2版本又复杂了点,我直接在v1版本修复的bug。 3.在滚动插入数据时候,需要在最后一次插入etl_dirty_record之后,切记需要再查库获取最新插入的dirty_record的主键ID, 然后手动添加xml实现根据id更新dirty_Record的属性值,否则的话将会导致所有记录的同步数量大小变为最后一次同步的数量? 3有待验证,代码好久了都给忘了! ``` #### (六)项目优化 ``` 1.单库单表250w数据时,迁移到8库8表,部署在本地进行迁移,22:56开始迁移,23:04分完成迁移,迁移速度还可以, 理论上千万级数据30min左右,整体不错,但迁移完成后发现存在一个很大的问题就是核对抽数停留在一个数上不动了是什么原因? 2.项目中如何处理多数据源的问题? 3.控制台添加迁移记录时候,如果添加的时间段内没有数据需要额外处理 4.checkData的时候出现DuplicateKey经查询发现是由于数据库中插入了相同key的记录,这个应该在业务侧避免 5.SQL优化 2024-04-07: 发起全量数据同步时候发现可视化页面存在一个阻塞现象,直接查mysql慢查询日志,发现sql耗时10s多: 针对于数据迁移过程中的slow-query进行优化,添加联合索引解决添加全量迁移任务后的前端卡顿问题。 # 未添加索引前执行需要10s,默认走index索引 select order_no from order_item_detail where create_time >= '2024-02-14 00:00:00' order by order_no asc LIMIT 1; explain select order_no from order_item_detail where create_time >= '2024-02-14 00:00:00' order by order_no asc LIMIT 1; explain select count(*) from order_item_detail; # 添加索引耗时16s,加了索引然后查询执行0.794s,filtered 50%,走联合索引idx_order_no_create_time create index idx_order_no_create_time on order_item_detail(order_no,create_time); select order_no from order_item_detail where create_time >= '2024-02-14 00:00:00' order by order_no asc LIMIT 1; explain select order_no from order_item_detail where create_time >= '2024-02-14 00:00:00' order by order_no asc LIMIT 1; drop index idx_order_no_create_time on order_item_detail; # 添加了索引仍然耗时10s多,实际走的是order_no索引inx_item_order_no,using where create index idx_order_no_create_time on order_item_detail(create_time,order_no); select order_no from order_item_detail where create_time >= '2024-02-14 00:00:00' order by order_no asc LIMIT 1; EXPLAIN select order_no from order_item_detail where create_time >= '2024-02-14 00:00:00' order by order_no asc LIMIT 1; drop index idx_order_no_create_time on order_item_detail; ```