# mysql2td_gitee **Repository Path**: rubygreat/mysql2td ## Basic Information - **Project Name**: mysql2td_gitee - **Description**: mysql到tdengine数据迁移脚本 - **Primary Language**: Python - **License**: CC-BY-4.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 1 - **Created**: 2022-09-30 - **Last Updated**: 2025-03-28 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README --- title: 数据迁移脚本案例 date: 2022-09-30 08:26:11 tags: - ETL - 数据迁移 categories: - ETL工具 --- 使用Python脚本,将MySQL数据迁移至TDengine 项目背景: 公司调研过程时序数据库,做技术储备。最开始调研TDengine2.4,随时间推进官方升级2.6,在2.6上做了数据迁移,一批手机上传上来的定位数据,使用DATAX,使用mysqlreader加淘思官方提供的tdenginewriter插件,做好字段对应关系,写入数据,按阿里DATAX效率,开3个管道速度也并不快,走内网平均迁移速递大概在5000条/秒;到最近,涛思升级TDengine为3.0版本,很多附属工具基本重构,支持了流式计算,增加了社区版的数据删除等功能,这次使用DATAX适配失败,可能是改的太多了,索性尝试先使用python脚本迁移下。 说干就干,官方提供了[快速迁移方案](https://docs.taosdata.com/develop/insert-data/high-volume/),但是数据源是虚拟出来的,并没有时间主键,而且拆分子表是按量拆分然后平均分配,索性直接放弃,但是里面的多线程写入还是要参考的。 由于是python初学者,类和一些库函数使用还不是很熟,简简单单一把梭,一条流水从头撸到尾。迁移也是尝试了很多方法,遇到了很多问题这里逐一介绍。 ## TDengine连接建立 [官方提供文档](https://docs.taosdata.com/develop/connect/),两种连接方式,原生客户端,restful方式。 ### taosc 本地客户端方式 #### 安装驱动 安装包见官网 [Linux](https://www.taosdata.com/assets-download/3.0/TDengine-client-3.0.1.4-Linux-x64.tar.gz)|[Windows](https://www.taosdata.com/assets-download/3.0/TDengine-client-3.0.1.1.4-Windows-x64.exe) win安装,linux解压,执行 `chmod+x install_client.sh && install_client.sh` #### 配置客户端 > 前提是与服务端的6030端口,开启TCP和UDP的放行, 如果是集群服务端,所有FQDN都需要添加端口两个协议的放行 - 修改cfg配置文件(win: C:\TDengine\cfg\taos.cfg linux:/etc/taos/taos.cfg) 修改 firstEP 为TDengine服务端的FirstEP ``` # The end point of the first dnode in the cluster to be connected to when `taosd` or `taos` is started firstEp tdengine1:6030 ``` - 修改host文件,添加FQDN解析映射 ``` vim /etc/hosts 192.168.1.xxx tdengine1 192.168.1.xxx tdengine2 192.168.1.xxx tdengine3 ``` - 验证连接 ``` ping tdengine1 ping tdengine2 ping tdengine3 telnet tdengine1 6030 telnet tdengine2 6030 telnet tdengine3 6030 ``` - taosc验证 ``` $ taos -h tdengine1 -u root -p taos > show databases; ``` #### 安装python库 ``` pip uninstall taos taospy pip install taospy ``` 此种安装方法最高版本2.4.x,使用`pip_search`查看最高2.5.1(2022.09.30),还是使用git源直接安装靠谱: ``` pip install git+https://github.com/taosdata/taos-connector-python.git ``` 但是吧,github方式国内连接不畅,可以[下载包](https://github.com/taosdata/taos-connector-python/releases/download/v2.6.4/taospy-2.6.4-py3-none-any.whl)后来本地安装。 > 注意:要求python版本3.8以上,非官方文档上的3.6以上。 #### [测试连接](https://github.com/taosdata/TDengine/blob/main/docs/examples/python/connect_example.py) ``` import taos def test_connection(): # all parameters are optional. # if database is specified, # then it must exist. conn = taos.connect(host="localhost", port=6030, user="root", password="taosdata", database="log") print('client info:', conn.client_info) print('server info:', conn.server_info) conn.close() if __name__ == "__main__": test_connection() ``` ### restful 方式 万能连接大法,能发送GET、POST请求就行。 > 与原生连接器的一个区别是,RESTful 接口是无状态的,因此 USE db_name 指令没有效果,所有 > 对表名、超级表名的引用都需要指定数据库名前缀。支持在 RESTful URL 中指定 db_name,这时 > 如果 SQL 语句中没有指定数据库名前缀的话,会使用 URL 中指定的这个 db_name。 - 确认服务端开启 taosadapter firstEP服务端运行 ``` systemctl status taosadapter ``` 确保开放6041 ``` lsof -nP -iTCP -sTCP:LISTEN ``` - 测试连接 ``` curl -u root:password http://192.168.1.xxx:6041/rest/sql -d "select server_version()" ``` > HTTP 响应码 > response code 说明 > 200 正确返回和 C 接口错误返回 > 400 参数错误返回 > 401 鉴权失败 > 404 接口不存在 > 500 内部错误 > 503 系统资源不足 - 安装python包 同上一章 - [测试连接](https://github.com/taosdata/TDengine/blob/3.0/docs/examples/python/connect_rest_examples.py) ``` from taosrest import connect, TaosRestConnection, TaosRestCursor conn: TaosRestConnection = connect(url="http://localhost:6041", user="root", password="taosdata", timeout=30) ``` ## 源数据batch ### 拆分、循环方法 因源数据有上亿级别数据,不可能一个`SELECT`撸到低,首先源表必须加上自增主键,或者带索引的可以作为分片键的字段(可以现加),然后取个最大值,用for循环分批次去读取,如果数据处理模块、函数是要分多线程来取数据,需要有个全局变量记录不同线程之间取完数据后的增量值。 ## 子表名称拼接 这里会考虑将源表代表设备、人等可分子表的关键键值,作为分组条件,取出各个唯一值,全量再去循环每个子表的固定batch,分批拼接`INSERT`语句后写入。这里结合上面的分批取源表数据,会有嵌套循环的场景, - 先从全量数据取出固定分组,然后循环每个分组内的Id做for循环,这里好处是可以保证子循环里Id可以连续,不会按自增来跑空; - 先取全量Id的连续值,在子循环里再找分组,这里好处是分组的压力对源库压力小,不至于直接给上亿条数据直接`GROUP BY `; - 还有一种是先取全量Id连续值,子循环里拆分组,这个分组之后,由于有设备、人等键的限制,Id是不连续的,那就再次来个for循环遍历分组内的结果,直接拼出来`INSERT`语句。 这里各种循环,没得用函数,也没得分文件,最开始需求就是要迁移数据,有思路更清晰的应该能更好的给模块化下。 ## TDengine语句拼接 TDengine对标签数据和子表内数据写入,和正常的SQL标准内的还稍微有些区别,和其他时序类的应该很相似,这里在[官方文档](https://docs.taosdata.com/taos-sql/insert/)里有段 ``` INSERT INTO d21001 USING meters TAGS ('California.SanFrancisco', 2) VALUES ('2021-07-13 14:06:34.630', 10.2, 219, 0.32) ('2021-07-13 14:06:35.779', 10.15, 217, 0.33) d21002 USING meters (groupId) TAGS (2) VALUES ('2021-07-13 14:06:34.255', 10.15, 217, 0.33) d21003 USING meters (groupId) TAGS (2) (ts, current, phase) VALUES ('2021-07-13 14:06:34.255', 10.27, 0.31); ``` 需要拿出标签数据写在`INSERT`段里,然后`VALUES`值按行写在括号内。 ## 简单日志 上面讲到有实际已经到了三个循环,可以用全局变量累计每个子循环的时间、返回行数的累计,计算出平均迁移速度,返回给前台。 ## 后台运行 ``` nohup python3.9 mysql2tdnative.py & ``` ## 实际脚本 放托管平台了 [GitHub](https://github.com/redgreat/mysql2td) [Gitee](https://gitee.com/rubygreat/mysql2td) [Codeberg](https://codeberg.org/wangcw/mysql2td_codeberg) 本来想写一堆,写着写着忘词了~