1 Star 0 Fork 3

wukong/datax-script

forked from 龙章铭/datax-script 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
贡献代码
同步代码
取消
提示: 由于 Git 不支持空文件夾,创建文件夹后会生成空的 .keep 文件
Loading...
README

datax-script

DataX非最佳实践。

一、写在前面

这里主要是说一下为什么要写这个项目。

刚学习完DataX,想用DataX解决一些实际项目中遇到到问题。先说说需求吧。业务系统大概有10多个不同的数据库,要做的就是从10多个不同的数据库中,每个数据库抽取部分表到数据分析系统中(以下简称A数据库),大概每个数据库需要抽100张表左右。架构如下。

各个业务系统    	  -->     数据分析系统
databaseA (100张表)----|
                      |
databaseB (100张表)----|	
                      |
databaseC (100张表)----|
                      |
databaseD (100张表)----| =====> A数据库
                      |
databaseE (100张表)----|
                      |
databaseF (100张表)----|
                      |
databaseG (100张表)----|

问题一:

DataX实现数据同步的方式是通过调用 python {Datax_PATH}/bin/datax.py {Job_PATH}/job/xxx.json 命令来执行作业的,那么如果要实现上述的需求,假设每个数据库抽取100张表,总共就是700张表。如果一两个手动配就完事了,但是如果有上百个,手动配的话可太麻烦了。最愚蠢的做法,你需要为此创建700个json文件。并为每个json文件写一条调用命令,而后你可以把这些命令都塞到一个.sh文件里面,再通过系统定时的执行实现定时抽取。类似的.sh文件写法可能如下:

python {Datax_PATH}/bin/datax.py  {Job_PATH}/job/tab_1.json
python {Datax_PATH}/bin/datax.py  {Job_PATH}/job/tab_2.json
python {Datax_PATH}/bin/datax.py  {Job_PATH}/job/tab_3.json
python {Datax_PATH}/bin/datax.py  {Job_PATH}/job/tab_4.json
python {Datax_PATH}/bin/datax.py  {Job_PATH}/job/tab_5.json
python {Datax_PATH}/bin/datax.py  {Job_PATH}/job/tab_6.json
python {Datax_PATH}/bin/datax.py  {Job_PATH}/job/tab_7.json
python {Datax_PATH}/bin/datax.py  {Job_PATH}/job/tab_8.json
......

问题二:

日志问题:DataX执行的时候会在控制台打印执行日志,并把日志写入到datax/log目录下面。但是在实际项目使用中,是不允许随时登录到机器上去查看日志文件的,且日志文件也不利于日常的抽取监控,所以需要把执行的结果写入到数据库中,通过前端查询数据库表来监控抽取情况。但DataX并未提供类似接口来实现此操作。

问题三:

数据迁移问题:针对于上面的两个问题,现在成熟的解决方案目测是 DataX-Web ,应该能解决上述问题,但有一个问题,所有的日志及配置都是存在于MySQL数据库中,我们现有项目使用的是DB2数据库,如果要满足现有需求(把日志及配置写入DB2数据库),铁定是需要二次开发的。PS:要二开别人的项目,难度还是很大的,不知道要踩多少坑。虽然此项目也是也是使用MySQL来存储配置文件,但是程序逻辑简单,二开很容易。

基于以上的问题,想着自己写一个小程序,来解决这些问题。才有了此项目。

二、基本介绍

能做什么

  • 根据配置+模板,批量生成DataX执行所需要的JSON文件。
  • 根据配置,批量执行DataX命令。
  • 详细的日志记录,写入到数据库保存。方便查看、迁移。
  • 列自动同步,第一次执行只需要配置“*”,即可自动用目标端的列信息更新配置。

三、软件架构

  • 执行流程图如下(因为我没有图床,本地图片你们看不到,就用这种方式画图了,git上可能渲染不了流程图,建议用Typora打开即可查看流程图)。
graph TD pz[配置数据] & tep[模板文件] --> json[使用FreeMarker生JSON文件] json --> run[DataX调用json] --> log[记录日志]
  • 再画一个吧
配置数据 =\
          ==> 使用FreeMarker生JSON文件 ==> DataX调用JSON ==> 记录日志
模板文件 =/

执行流程为:程序先查询数据库的配置数据,使用FreeMarker根据配置数据+模板,生成供DataX执行的JSON文件,并使用DataX执行此JSON,执行作业,作业完成之后写入日志到日志表中。如果有多条配置,按照顺序重复此操作。

四、快速启动

  • 将带你从零开始搭建一个环境,从而让你快速熟悉DataX-Script的使用姿势。
  • liunx:入门示例

五、使用说明

  • 调用方式
# 执行所有,执行所有配置表中有效的配置。
# 即:valid_flag='1'
$ java -jar datax-script-1.0.0.jar 1

# 执行批次,执行所有配置表中整个批次下面的配置。
# 即:valid_flag='1' AND module_type='ZZ1'
$ java -jar datax-script-1.0.0.jar 1 ZZ1

# 执行单表,不知道怎么描述了。
# 即:valid_flag='1' AND module_type='ZZ1' AND src_object_name='test' 
$ java -jar datax-script-1.0.0.jar 1 ZZ1 test

# 对于DB2等表名为Schema.table的可以写成如下
$ java -jar datax-script-1.0.0.jar 1 ZZ1 schema.test

5.1 配置表信息详解

5.1.1 表字段列表

列名 注释
job_id 主键id
job_seq job顺序号
job_type job类型
reader_type reader类型
writer_type writer类型
src_system_id 源系统Schema
des_system_id 目标系统Schema
src_object_name 源系统对象名称
des_object_name 目标系统对象名称
src_object_desc 源对象描述
split_pk 切分键
where where条件
cols 加载列
frequency 加载频率
valid_flag job是否有效标志
last_update 最近更新时刻
job_name job名称
module_type job模块类型
job_script_template_name 生成json文件的模板名称
job_script_run_name 要执行的json文件名称
is_create_script 是否自动生成脚本::0:每次自动生成,1:job_script_run_name为空时生成
ddl_auto_sync 是否自动更新cols,0:不更新,1:当cols列为空或*时更新
ddl_specific ddl修饰
cols_cal_def 计算/转换列定义
cols_cal_exp 计算/转换列表达式
core_byte core_byte限速(单channel字节)
job_byte job_byte限速(全局字节)
core_record core_record限速(单channel条数)
job_record job_record限速(全局channel条数)
job_channel 全局并发
job_jvm_xms JVM堆内存初始大小(G)
job_jvm_xmx JVM堆内存最大大小(G)

5.1.2 字段详解

  • job_id
    • 描述:配置的表自增组件
    • 必选:是
    • 默认值:无
  • ☆job_seq
    • 描述:作业执行的顺序,升序。如两条作业,1跟2,先执行1,再执行2。
    • 必选:是
    • 默认值:无
  • job_type
    • 描述:暂无实际意义,可做注释信息。
    • 必选:否
    • 默认值:无
  • reader_type
    • 描述:配合模板使用。使用方式:${reader_type}。DataX脚本中reader部分的name。
    • 必选:否
    • 默认值:无
    • 可选值:oraclereadermysqlreaderpostgresqlreaderrdbmsreader
  • writer_type
    • 描述:配合模板使用。使用方式:${writer_type}。DataX脚本中writer部分的name。
    • 必选:否
    • 默认值:无
    • 可选值:oraclewritermysqlwriterpostgresqlwriterrdbmswriter
  • src_system_id
    • 描述:源表的Schema,对于DB2等数据库,表名的表示方式为schema.table,此字段可用来存放schema,对于mysql等,空着就行。
    • 必选:否
    • 默认值:无
  • des_system_id
    • 描述:目标表的Schema,对于DB2等数据库,表名的表示方式为schema.table,此字段可用来存放schema,对于mysql等,空着就行。
    • 必选:否
    • 默认值:无
  • ☆src_object_name
    • 描述:配合模板使用,使用方式${src_object_name}。源表的table名。如果上面的src_system_id不为空,程序会自动拼凑成des_system_id.src_object_name。为空就是src_object_name。所以对于DB2数据库来说,可以schama写在src_system_id里,table写在这里。也可也直接在这里写schema.table
    • 必选:否
    • 默认值:无
  • ☆des_object_name
    • 描述:配合模板使用,使用方式${des_object_name}。目标表的table名。如果上面的des_system_id不为空,程序会自动拼凑成des_system_id.des_object_name。为空就是des_object_name。所以对于DB2数据库来说,可以schama写在des_system_id里,table写在这里。也可也直接在这里写schema.table
    • 必选:否
    • 默认值:无
  • src_object_desc
    • 描述:要抽取的表的表名描述,暂无实际意义,可做注释信息。
    • 必选:否
    • 默认值:无
  • split_pk
    • 描述:配合模板使用,使用方式${split_pk}。DataX脚本中reader部分的splitPk。
    • 必选:否
    • 默认值:无
  • ☆where
    • 描述:配合模板使用,使用方式${where}。DataX脚本中reader部分的where。
    • 必选:否
    • 默认值:1=1
  • ☆cols
    • 描述:配合模板使用,使用方式${cols}。DataX脚本中reader部分的column。注意事项①:配置成:id,name,sex,程序会转换成:id","name","sex。所以用的时候要注意。注意事项②:当ddl_auto_sync='1' AND cols IN ('', '*')的时候,可以自动用目标表的列信息更新配置。
    • 必选:否
    • 默认值:*
  • frequency
    • 描述:暂无实际意义
    • 必选:否
    • 默认值:无
  • ☆valid_flag
    • 描述:配置是否有效标志,对应启动命令java -jar datax-script.jar 1 ZZ1中的1,启动命令传入中的第一个参数传入1,就会查询所有此字段为1的配置,然后顺序执行。
    • 必选:否
    • 默认值:1
  • last_update
    • 描述:配置的执行时间,调用一次更新一次,可查看此配置最近一次调用的时刻。
    • 必选:否
    • 默认值:无
  • job_name
    • 描述:job名称,暂无实际意义,注释信息。
    • 必选:否
    • 默认值:无
  • ☆module_type
    • 描述:job模块类型,对应启动命令java -jar datax-script.jar 1 ZZ1中的ZZ1,启动命令传入中的第二个参数传入ZZ1,就会查询所有此字段为ZZ1的配置,然后顺序执行。
    • 必选:否
    • 默认值:无
  • ☆job_script_template_name
    • 描述:生成json文件的模板名称。对应template目录下的模板文件名。如测试示例配置的是mysqlTomysql.ftl
    • 必选:否
    • 默认值:无
  • ☆job_script_run_name
    • 描述:要执行的JSON文件名称。此名称一般由job_script_template_name生成而来。也可以自己指定。
    • 必选:是
    • 默认值:无
  • ☆is_create_script
    • 描述:是否自动生成脚本。0:每次都去请求模板并生成新的JSON文件,1:job_script_run_name为空时生成。推荐设置为1,生成一次后续就一直用它了。如果更改配置,把job_script_run_name字段的内容置为空,就能重新生成。
    • 必选:否
    • 默认值:无
  • ☆ddl_auto_sync
    • 描述:是否自动更新cols字段,0:不自动更新,1:当cols字段为‘ ’ 或‘*’时更新。
    • 必选:是
    • 默认值:无
  • ddl_specific
    • 描述:ddl修饰,暂无实际意义。
    • 必选:否
    • 默认值:无
  • cols_cal_def
    • 描述:计算/转换列定义,暂无实际意义。
    • 必选:否
    • 默认值:无
  • cols_cal_exp
    • 描述:计算/转换列表达式,暂无实际意义。
    • 必选:否
    • 默认值:无
  • core_byte
    • 描述:core_byte限速(单channel字节)。配合模板使用,使用方式${core_byte}
    • 必选:否
    • 默认值:无
  • job_byte
    • 描述:job_byte限速(全局字节)。配合模板使用,使用方式${job_byte}
    • 必选:否
    • 默认值:无
  • core_record
    • 描述:core_record限速(单channel条数)。配合模板使用,使用方式${core_record}
    • 必选:否
    • 默认值:无
  • job_record
    • 描述:job_record限速(全局channel条数)。配合模板使用,使用方式${job_record}
    • 必选:否
    • 默认值:无
  • job_channel
    • 描述:全局并发
    • 必选:否
    • 默认值:无
  • job_jvm_xms
    • 描述:JVM堆内存初始大小(G),如指定为3,那么执行JSON时生成的命令如:python .../bin/datax3.py --jvm="-Xms3G -Xmx?G" job/xxx.json
    • 必选:否
    • 默认值:无
  • job_jvm_xmx
    • 描述:JVM堆内存最大大小(G),如指定为3,那么执行JSON时生成的命令如:python .../bin/datax3.py --jvm="-Xms?G -Xmx3G" job/xxx.json
    • 必选:否
    • 默认值:无

5.1.3 template示例

  • datax-script的架构很简单,FreeMarker的模板+配置表里面的数据=要执行JSON文件,如果你了解FreeMarker,那么将非常容易理解,并解锁多种用法,如果不了解,也非常简单,就简单了解成在模板里写${split_pk},就会用配置表里面split_pk字段的值对这个字符串做替换。就仅仅只是一个字符串替换而已。下面给出一个示例模板。所有用${}表示的特殊符号,都会用配置表同名的字段做字符串替换,从而生成可执行的JSON文件。
{
  "job": {
    "setting": {
      "speed": {
        "channel": ${job_channel}
      }
    },
    "content": [
      {
        "reader": {
          "name": "${reader_type}",
          "parameter": {
            "username": "root",
            "password": "LZM123456",
            "column": [
              "${cols}"
            ],
            "splitPk": "${split_pk}",
            "connection": [
              {
                "table": [
                  "${src_object_name}"
                ],
                "jdbcUrl": [
                  "jdbc:mysql://127.0.0.1:3306/al_test1"
                ]
              }
            ],
            "where": "${where}"
          }
        },
        "writer": {
          "name": "${writer_type}",
          "parameter": {
            "writeMode": "insert",
            "username": "root",
            "password": "LZM123456",
            "column": [
              "${cols}"
            ],
            "session": [
              "set session sql_mode='ANSI'"
            ],
            "preSql": [
              "delete from ${des_object_name} where ${where}"
            ],
            "connection": [
              {
                "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/al_test2?useUnicode=true&characterEncoding=utf8",
                "table": [
                  "${des_object_name}"
                ]
              }
            ]
          }
        }
      }
    ]
  }
}

5.2 扩展

  • 如上所示,配置表里面大部分字段都是为给RDBMS 关系型数据库 使用的,如果要使用其他数据库直接的抽取怎么使用模板自动生成呢?

  • 对于其他类型的数据库的模板文件书写方式解决方案,如下是alibaba提供的hdfs的读示例JSON,你还是可以直接把此文件的内容当做一个FreeMarker的模板,并同样的用${xxx}的方式用配置表里面的字符来替换模板里面的内容,如果配置表的字段语义不够使用,可以随意的增加字段。比如给配置表增加一个abc字段,那么在如下的内容中,使用${abc}就能用abc字段的内容替换模板生成新的JSON

{
    "job": {
        "setting": {
            "speed": {
                "channel": ${abc}
            }
        },
        "content": [
            {
                "reader": {
                    "name": "hdfsreader",
                    "parameter": {
                        "path": "/user/hive/warehouse/mytable01/*",
                        "defaultFS": "hdfs://xxx:port",
                        "column": [
                               {
                                "index": 0,
                                "type": "long"
                               },
                               {
                                "index": 1,
                                "type": "boolean"
                               },
                               {
                                "type": "string",
                                "value": "hello"
                               },
                               {
                                "index": 2,
                                "type": "double"
                               }
                        ],
                        "fileType": "orc",
                        "encoding": "UTF-8",
                        "fieldDelimiter": ","
                    }

                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "print": true
                    }
                }
            }
        ]
    }
}

六、参与贡献

  1. Fork 本仓库
  2. 新建 Feat_xxx 分支
  3. 提交代码
  4. 新建 Pull Request

空文件

简介

DataX非最佳实践,对DataX封装,批量生成JSON文件,批量调用执行。 展开 收起
取消

发行版

暂无发行版

贡献者

全部

近期动态

不能加载更多了
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Java
1
https://gitee.com/ahuachen/datax-script.git
git@gitee.com:ahuachen/datax-script.git
ahuachen
datax-script
datax-script
master

搜索帮助