migrate-mysql-data

(1) Introduction

A reliable sharded data-migration solution for the problems that hit a traditional single-database, single-table setup as the business grows: surging traffic and data volume, excessive database concurrency, connection exhaustion, and degraded storage and query performance. It supports reliable migration from a single database/table to multiple databases/tables, and from one multi-database, multi-table layout to another. It relies on scrolling full-table scans for full synchronization, and on canal + RocketMQ ordered messages for incremental synchronization, so that no record is migrated twice or lost.

(2) Software Architecture

SpringBoot, SpringBoot Starter, MySQL, RocketMQ, MyBatis, ShardingSphere

(3) Usage

1. Basic environment setup

0. Configure the etl_process_config table by inserting rows manually:

   | id (PK) | logic_model (table) | record_key | record_type (match type) | deleted (0 = no, 1 = yes) |
   | ------- | ------------------- | ---------- | ------------------------ | ------------------------- |
   | id      | order_info          | orderNo    | 0                        | 0                         |
   | id      | order_info          | orderNo    | 1                        | 0                         |
   | id      | order_item_detail   | orderNo    | 0                        | 0                         |
   | id      | order_item_detail   | orderNo    | 1                        | 0                         |

logic_model is the logical table name and record_key is the table's shard key; record_key must be camelCase, because the keys used during migration are camelCase. record_type (the migration field match type) is then set to 0.

1. If `java --version` still shows the old version after changing the environment variables:
find the java executables under Program Files and Program Files (x86) and replace them all with the desired version.

2. The "Public Key Retrieval is not allowed" error when connecting over JDBC:
add the allowPublicKeyRetrieval=true parameter to the database connection URL; this has been verified to fix the problem.
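For reference, a minimal sketch of what such a URL might look like (the host, port, and database name are placeholders, not taken from this project's configuration):

```java
// Illustrative only: a JDBC URL carrying allowPublicKeyRetrieval=true.
// Host/port/database here are hypothetical placeholders.
public class JdbcUrlSketch {
    public static final String URL =
            "jdbc:mysql://localhost:3306/monomer_order"
            + "?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC";
}
```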

3. Starting the project
3.1. Configure the source, target, and migration data sources.
3.2. Configure RocketMQ.
3.3. Download canal admin and deployer. Run the admin module's SQL script against the database and adjust its yml configuration. canal.deployer is the canal server; for it you only need to change the IP in canal_local.properties. If you run canal server 1.5 on JDK 1.8 or above, startup fails because of the PermGen JVM flags; remove the PermGen options from the startup parameters.
Start canal.admin: startup.bat
Start canal.deployer: startup.bat local

4. Common errors
4.1. canal.server fails to start with:
requestGet for canal config error: auth :admin is failed (search the GitHub issues)
Fix: canal.admin.passwd in canal.deployer must match the password set by the canal.admin database script.
4.2. Topic mapping used: order-forward:monomer_order\\.order_info,order.forward:monomer_order\\.order_item_detail
4.3. Error after adding an instance: copy the jars from the plugin directory into lib.
GitHub issue: https://github.com/alibaba/canal/issues/3199
java.lang.IllegalStateException: Extension instance
(name: rocketmq, class: interface com.alibaba.otter.canal.connector.core.spi.CanalMQProducer) could not be instantiated: class could not be found
2. Addresses
Data migration platform: http://localhost:8081/migrate/toIndex
RocketMQ console: http://localhost:8888
Folder holding the generated test data: D:\Softwares\MySQLData\Data\monomer_order
canal-admin console: http://localhost:8089/

In this project, DB01 is the source database, DB02 the target database, and migrate the migration-config database.

The data model is a food-delivery order model. Database and table routing are both custom algorithms based on order_no and userId, not a plain modulo: the same order number and the same userId always route to the same table. The database routing algorithm is identical to the table routing: try orderNo first, then userId; whichever is present, take its last three digits modulo dbSize to get a suffix 0-7, then prepend the db/table prefix.
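The routing just described can be sketched roughly like this (class and method names are illustrative, not taken from the project's source):

```java
// Sketch of the routing rule: prefer orderNo over userId, take the last
// three digits of the chosen key, mod by the db/table count (8 here),
// and append the suffix to the prefix. Names are hypothetical.
public class ShardRouteSketch {

    // Keep only the digits of the key, take the last three, mod by size.
    public static int routeSuffix(String key, int size) {
        String digits = key.replaceAll("\\D", "");
        String lastThree = digits.length() <= 3
                ? digits
                : digits.substring(digits.length() - 3);
        return Integer.parseInt(lastThree) % size;
    }

    // orderNo wins if present; otherwise fall back to userId.
    public static String buildTarget(String prefix, String orderNo,
                                     String userId, int size) {
        String key = (orderNo != null && !orderNo.isEmpty()) ? orderNo : userId;
        return prefix + "_" + routeSuffix(key, size);
    }
}
```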

(4) Walkthrough

1. Full synchronization

First acquire the scroll lock. Before migrating, initialize and persist the migration-progress record (EtlProgress), then perform the scrolling full migration:
0. If an error is caught during migration, set this run's status to FAIL and set pageSize to 0.
1. Query the source database using the batch's starting scroll value startScrollId and page size pageSize, producing a List of Maps, one Map per row.
2. Iterate the batch's List<Map>:
	2.1. Instantiate MergeBinlogWrite.
	2.2. Filter out rows outside the target time range.
	2.3. Convert each Map into a BinLog model and store it in binLogDataMap (key: orderNo&tableName, value: BinLog).
	2.4. Look up this batch's keys in the target database and filter; surviving records go into binlogDataList
	(not in the target db: an ADD goes straight into the list, an UPDATE becomes an INSERT, a DELETE is dropped; already in the target db: a non-DELETE whose data is identical to, or older than, the target row is dropped, everything else goes into binlogDataList).
	2.5. Group binlogDataList by table name and write in batches (full sync done); if the write succeeds in incremental mode, update the message status to consumed (incremental-sync logic). (Steps 2.1-2.5 live in one method.)
	2.6. After each batch, if the batch wrote zero rows, check whether the finished count in EtlProgress is >= the cached total for the table; if so, the migration succeeded and we return, otherwise continue.
	2.7. Persist this batch's migration-detail record.
	2.8. Update the migration record's last scroll value, batch number, and finished count, and mark the detail record successful.
	2.9. Query the next batch and repeat.
3. When all batches have run, mark this migration run successful.
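The filtering decision in step 2.4 can be sketched as follows (all names here are illustrative, and `version` stands in for whatever recency field the real code compares):

```java
// Sketch of the step-2.4 filter: decide what to do with an incoming
// binlog row given the row already in the target db (or null).
class FullSyncFilterSketch {
    enum Op { ADD, UPDATE, DELETE }

    static class Row {
        Op op; long version; String payload;
        Row(Op op, long version, String payload) {
            this.op = op; this.version = version; this.payload = payload;
        }
    }

    // Returns the operation to apply, or null when the record is skipped.
    static Op decide(Row incoming, Row target) {
        if (target == null) {
            if (incoming.op == Op.DELETE) return null; // nothing to delete
            return Op.ADD;                             // UPDATE becomes insert
        }
        // Target exists: drop non-deletes that are identical or not newer.
        if (incoming.op != Op.DELETE
                && (incoming.payload.equals(target.payload)
                    || incoming.version <= target.version)) {
            return null;
        }
        return incoming.op;
    }
}
```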

2. Incremental synchronization

 1. CanalConsumeTask starts a thread pool with two threads: one for the binlog pull task and one for the binlog commit task.
 The binlog pull task pulls messages from RocketMQ: a message not yet recorded is stored in MySQL as not-consumed and written to the queue; if the message already exists in MySQL and is committed, it is manually committed, otherwise it is written to the queue.
 The binlog commit thread scans the binlog table for consumed-but-uncommitted messages and commits them manually.

 2. IncrementTask then writes data every 15 s. It iterates the messages in localQueue and, after merging, validating, and filtering the binlogs
 (each binlog is compared against the new database and useless records are dropped:
 a DELETE with no matching row in the new db is skipped; an UPDATE with no matching row in the new db becomes an INSERT; when the old and new db both have the row, the concatenated field strings are equal, and the op is not DELETE, the record is skipped),
 it performs the grouped write. Afterwards the read queue is cleared; if the write returns true and we are in incremental-sync mode, the consumption statuses in the database are batch-updated to success.

 In detail:
 1. Each incremental read first takes the in-memory blocking cache queue, swaps the read and write queues, and iterates the read queue. Every record goes through a merge: a MergeBinlogWrite object is instantiated, the MQ message is appended to its member list to await the batch write, and the record is deduplicated against its member map binlogDataMap; a missing key is inserted directly, while an existing key is only overwritten when the incoming record is newer.
(The queue object holds the read queue, the write queue, and a CAS lock; the double queue exists to raise read and write throughput.)
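The read/write double queue guarded by a CAS lock might look roughly like this minimal sketch (assumed names, simplified to two lists plus an AtomicBoolean spin lock):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of the double-queue idea: producers append to the write list;
// the consumer swaps the lists under a CAS spin lock and drains the
// previously written items. Names are hypothetical.
class DoubleQueueSketch<T> {
    private List<T> writeQueue = new ArrayList<>();
    private List<T> readQueue = new ArrayList<>();
    private final AtomicBoolean lock = new AtomicBoolean(false);

    void offer(T item) {
        while (!lock.compareAndSet(false, true)) { /* spin */ }
        try { writeQueue.add(item); } finally { lock.set(false); }
    }

    // Swap queues and return everything written since the last drain.
    List<T> drain() {
        while (!lock.compareAndSet(false, true)) { /* spin */ }
        try {
            List<T> tmp = readQueue;
            readQueue = writeQueue;
            writeQueue = tmp;
            List<T> out = new ArrayList<>(readQueue);
            readQueue.clear();   // "clear the read queue" after consuming
            return out;
        } finally { lock.set(false); }
    }
}
```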

 2. Then the data is validated: the new database is queried by binlogDataMap and useless records are filtered out (checking whether the old/new rows exist and which is newer); the records that genuinely need to be written are kept in binlogDataList.

 3. Finally binlogDataList is grouped by table name and written in batches: inserts that can be merged are batched, and the remaining updates and deletes run one by one. If the batch write succeeds and we are in incremental-sync mode, the consumption statuses in the binlog consume-record table are also batch-updated to success.

 4. If an incremental batch write fails, the consumption status is not updated to success and stays not_consumed; an exception is thrown and the task run fails. In other words, everything taken from the read queue stays in the not_consumed state. No transaction is used here, so within one batch some rows may have been written and some not, yet at the MQ level the whole batch counts as not_consumed. How do we recover? Because auto-commit is disabled, RocketMQ receives no acknowledgement: the offset is never committed, and after we stay silent long enough RocketMQ redelivers the messages. The consumer's poll() receives them again and finds that consume records already exist. At this point CanalPullRunner distinguishes the cases: the batch update may have left everything not_consumed or turned some records consumed, but anything that is not committed is re-enqueued, while a committed message is located and manually committed (i.e. the already-consumed offset is committed by hand).
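The redelivery decision described above can be sketched as follows (hypothetical names; the real code keys the consume-record lookup by queueId and offset):

```java
// Sketch of the on-redelivery decision: re-enqueue anything not yet
// committed, and only ack (commit) messages already marked COMMITTED.
// A null state means no consume record exists yet (first delivery),
// in which case the real code also inserts a NOT_CONSUMED record.
class RedeliverySketch {
    enum MsgState { NOT_CONSUMED, CONSUMED, COMMITTED }
    enum Action { ENQUEUE, COMMIT }

    static Action onPulled(MsgState recorded) {
        if (recorded == MsgState.COMMITTED) {
            return Action.COMMIT;   // already applied: just ack the offset
        }
        return Action.ENQUEUE;      // first delivery or not committed: retry
    }
}
```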

 5. By default, canal listens to the database change log and writes it into the RocketMQ cluster. Our binlog pull thread starts automatically with the application (it implements ApplicationRunner) and connects to the RocketMQ cluster to pull binlogs. Commits here are manual; auto-commit is avoided because pulling binlogs and consuming them into the database happens asynchronously.
 When a message is pulled, it is deduplicated against the binlog consume-record table by queueId and offset, with two branches:
 if no record exists, the message is stored as not-consumed, and the message plus its binlog payload are written to the in-memory write queue for asynchronous consumption;
 if a record exists and its status is committed, the message is located and manually committed; otherwise it is put on the local write queue.

  Offset commits must be precise: only once a binlog has actually been applied to the target data source may its offset be committed. In the RocketMQ client, consumer offsets are normally auto-committed, first to a local cache and then to RocketMQ, which makes offset commits imprecise.

3. Day-granularity statistics

1. The statistics table etl_statistical: when a start date is specified for the migration, every day from that date up to the current date is counted and stored in etl_statistical. Initially the table is empty, so the minimum date is read from the source table (e.g. the order table) and each day from that date to today is counted. The scheduled task runs once an hour; once the table has data, subsequent runs only need the most recent (maximum) date already in etl_statistical: if it is within two days of today, only those two days are recounted, to catch rows that cross midnight; otherwise counting resumes from the last recorded date up to today. Each run also clears the in-memory cache via statisticalCountMap.clear().
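The day-by-day counting walk can be sketched like this (assumed names; countRowsForDay stands in for the real per-day SQL count against the source table):

```java
import java.time.LocalDate;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Sketch: walk each day from start to end inclusive and record that
// day's row count, as the hourly statistics task does. Hypothetical names.
class DailyStatsSketch {
    static Map<LocalDate, Long> collect(LocalDate start, LocalDate end,
                                        Function<LocalDate, Long> countRowsForDay) {
        Map<LocalDate, Long> stats = new LinkedHashMap<>();
        for (LocalDate d = start; !d.isAfter(end); d = d.plusDays(1)) {
            stats.put(d, countRowsForDay.apply(d));  // one count per day
        }
        return stats;
    }
}
```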

2. The per-batch total cache: a map whose key is a migration batch (the unique identifier of etl_process) and whose value is that batch's cached total row count, refreshed once an hour. Why hourly? Because new rows keep arriving and days roll over.

3. During full synchronization, when a batch writes zero rows we check whether the sync may be finished: the batch's cached total is fetched from statisticalCountMap (if present; otherwise it is summed per day from etl_statistical) and compared with the finished count. If finished >= cached total, the sync is considered complete and the etl_process status flag is set to 2, migration finished. statisticalCountMap is cleared on every recount precisely to keep the cached totals consistent with the data in MySQL.

4. Data verification

A scheduled task scans etl_dirty_record for batches with status 2 (migration finished), at most 100 at a time, then loads the corresponding rows from the old and new databases and verifies them. First, each old row is checked for existence in the new database; missing rows go into the insert list. Otherwise the strings built by concatenating fieldKey and fieldValue are compared (I think concatenating the full row would be better): equal rows are skipped, and for unequal rows we check whether the old row's timestamp is newer than the new row's and record it. Corrections are then inserted or updated, the etl_dirty_record status is set to 4 (verification complete), and the finishRecord count in the corresponding etl_process row is updated. Once these up-to-100 dirty records are verified, finishRecord in etl_process is compared with the total counted in etl_statistical; if finishRecord >= the source table's total, verification is considered fully complete and the verification record's status is set to 4.
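The comparison step can be sketched as follows (assumed names; `fingerprint` stands in for the real fieldKey/fieldValue concatenation):

```java
import java.util.Map;
import java.util.TreeMap;

// Sketch of the reconciliation decision: build a stable string fingerprint
// of a record's fields and compare old-db vs new-db rows. Names are
// hypothetical, not from the project's source.
class CheckDataSketch {
    static String fingerprint(Map<String, String> record) {
        StringBuilder sb = new StringBuilder();
        // TreeMap gives a deterministic field order for the concatenation.
        for (Map.Entry<String, String> e : new TreeMap<>(record).entrySet()) {
            sb.append(e.getKey()).append('=').append(e.getValue()).append('|');
        }
        return sb.toString();
    }

    enum Fix { NONE, INSERT, UPDATE }

    static Fix compare(Map<String, String> oldRec, Map<String, String> newRec) {
        if (newRec == null) return Fix.INSERT;   // missing in the new db
        if (fingerprint(oldRec).equals(fingerprint(newRec))) return Fix.NONE;
        return Fix.UPDATE;                       // rows differ: correct new db
    }
}
```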

5. Resuming an interrupted full migration

If the full migration is interrupted, the scheduled ScrollTask scans etl_process for tasks in the fail or init state and restarts them, recording the retry count and failure reason in retryTimes and errMsg, and keeps retrying.

(5) Known Issues

0. The initial code base is full of bugs, though the design is sound. The SQL in the v2 xml files does not match the database at all! v1 matches, but its database is empty, so implementing features directly is genuinely hard. It took a day to understand the overall flow and another day to fix most of the bugs. Key points:
1. Manually insert rows into etl_process_config: order_info orderNo 0 0 and order_info orderNo 1 0 (note that record_key must be camelCase). Add one row for each table you want to migrate; these rows tell the code what the shard key is during migration and what to concatenate when comparing old and new records.
2. Manually add to MergeConfig:
FIELD_MAP.put("statisticalKey", new ArrayList<>(Arrays.asList("order_item_detail", "order_item")));
This is needed because CountCacheTask uses this key and, via @PostConstruct, runs right after startup to write each day's row count into the statistics table. Add whichever tables you want to migrate. This could actually be moved into the database; v2 does that, but v2 is also more complex, so I fixed the bugs on v1 directly.
3. During the scrolling inserts, after the last insert into etl_dirty_record you must re-query the database for the newly inserted dirty_record's primary key and manually add an xml statement that updates the dirty_record's attributes by id; otherwise every record's synced-count becomes the size of the last batch? Point 3 is unverified; the code is old and I have forgotten the details.

(6) Optimizations

1. Migrating 2.5 million rows from a single database/table into 8 databases x 8 tables, running locally, took from 22:56 to 23:04. Decent speed; in theory about 30 min for ten million rows. But after the migration finished, the verification count got stuck at one value and stopped moving: why?

2. How does the project handle multiple data sources?

3. When adding a migration record from the console, the case where the chosen time range contains no data needs extra handling.

4. checkData hit a DuplicateKey error; investigation showed the database contained rows with the same key, which should be prevented on the business side.

5. SQL optimization
2024-04-07:
When kicking off full synchronization, the admin page blocked. The MySQL slow-query log showed a query taking over 10 s. Adding a composite index fixed the slow query during migration and the frontend stall after adding a full-migration task.

# Without the index this takes about 10 s, using the default index
select order_no from order_item_detail where create_time >= '2024-02-14 00:00:00' order by order_no asc LIMIT 1;
explain select order_no from order_item_detail where create_time >= '2024-02-14 00:00:00' order by order_no asc LIMIT 1;

explain select count(*) from order_item_detail;

# Creating the index takes 16 s; with it the query runs in 0.794 s (filtered 50%),
# using the composite index idx_order_no_create_time: the index is ordered by
# order_no, so ORDER BY order_no LIMIT 1 can walk the index and stop at the
# first entry that passes the create_time filter
create index idx_order_no_create_time on order_item_detail(order_no,create_time);
select order_no from order_item_detail where create_time >= '2024-02-14 00:00:00' order by order_no asc LIMIT 1;
explain select order_no from order_item_detail where create_time >= '2024-02-14 00:00:00' order by order_no asc LIMIT 1;
drop index idx_order_no_create_time on order_item_detail;

# With the column order reversed, (create_time, order_no), the query still takes
# 10+ s: the optimizer actually uses the order_no index inx_item_order_no, Using where
create index idx_order_no_create_time on order_item_detail(create_time,order_no);
select order_no from order_item_detail where create_time >= '2024-02-14 00:00:00' order by order_no asc LIMIT 1;
EXPLAIN select order_no from order_item_detail where create_time >= '2024-02-14 00:00:00' order by order_no asc LIMIT 1;
drop index idx_order_no_create_time on order_item_detail;