48 Star 502 Fork 228

GVP袋鼠云 / chunjun

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
dmwriter.md 5.43 KB
一键复制 编辑 原始数据 按行查看 历史
tudou 提交于 2021-02-05 18:05 . 1、新增Oracle LogMiner实时采集插件

DM Writer

一、插件名称

名称:dmwriter

二、支持的数据源版本

DM7、DM8

三、参数说明

  • connection
    • 描述:数据库连接参数,包含jdbcUrl、schema、table等参数
    • 必选:是
    • 字段类型:List
      • 示例:指定jdbcUrl、schema、table
    "connection": [{
         "jdbcUrl": "jdbc:dm://localhost:5236",
         "table": ["table"],
         "schema":"public"
        }] 
    • 默认值:无

  • jdbcUrl
    • 描述:针对关系型数据库的jdbc连接字符串
    • 必选:是
    • 字段类型:String
    • 默认值:无

  • schema
    • 描述:数据库schema名
    • 必选:否
    • 字段类型:String
    • 默认值:无

  • table

    • 描述:目的表的表名称。目前只支持配置单个表,后续会支持多表
    • 必选:是
    • 字段类型:List
    • 默认值:无

  • username
    • 描述:数据源的用户名
    • 必选:是
    • 字段类型:String
    • 默认值:无

  • password
    • 描述:数据源指定用户名的密码
    • 必选:是
    • 字段类型:String
    • 默认值:无

  • column
    • 描述:目的表需要写入数据的字段,字段之间用英文逗号分隔。例如: "column": ["id","name","age"]
    • 必选:是
    • 默认值:否
    • 字段类型:List
    • 默认值:无

  • fullcolumn
    • 描述:目的表中的所有字段,字段之间用英文逗号分隔。例如: "column": ["id","name","age","hobby"],如果不配置,将在系统表中获取
    • 必选:否
    • 字段类型:List
    • 默认值:无

  • preSql
    • 描述:写入数据到目的表前,会先执行这里的一组标准语句
    • 必选:否
    • 字段类型:String
    • 默认值:无

  • postSql
    • 描述:写入数据到目的表后,会执行这里的一组标准语句
    • 必选:否
    • 字段类型:String
    • 默认值:无

  • writeMode
    • 描述:控制写入数据到目标表采用 insert into 或者 merge into 语句
    • 必选:是
    • 所有选项:insert/update
    • 字段类型:String
    • 默认值:insert

  • batchSize
    • 描述:一次性批量提交的记录数大小,该值可以极大减少FlinkX与数据库的网络交互次数,并提升整体吞吐量。但是该值设置过大可能会造成FlinkX运行进程OOM情况
    • 必选:否
    • 字段类型:int
    • 默认值:1024

  • updateKey
    • 描述:当写入模式为update时,需要指定此参数的值为唯一索引字段
    • 注意:
      • 采用merge into语法,对目标表进行匹配查询,匹配成功时更新,不成功时插入;
    • 必选:否
    • 字段类型:Map<String,List>
      • 示例:"updateKey": {"key": ["id"]}
    • 默认值:无

四、配置示例

1、insert

{
  "job": {
    "content": [
      {
        "reader": {
          "parameter": {
            "sliceRecordCount": ["100"],
            "column": [
              {
                "name": "id",
                "type": "int"
              },
              {
                "name": "age",
                "type": "int"
              }
            ]
          },
          "name": "streamreader"
        },
        "writer": {
          "name": "dmwriter",
          "parameter": {
            "username": "username",
            "password": "password",
            "connection": [
              {
                "jdbcUrl": "jdbc:dm://localhost:5236",
                "table": ["table"]
              }
            ],
            "preSql": [],
            "postSql": [],
            "mode": "insert",
            "column": ["ID","AGE"]
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 1,
        "bytes": 0
      },
      "errorLimit": {
        "record": 100
      },
      "restore": {
        "maxRowNumForCheckpoint": 0,
        "isRestore": false,
        "restoreColumnName": "",
        "restoreColumnIndex": 0
      }
    }
  }
}

2、update

{
  "job": {
    "content": [
      {
        "reader": {
          "parameter": {
            "sliceRecordCount": ["1"],
            "column": [
              {
                "name": "int",
                "type": "int",
                "value": "3"
              },
              {
                "name": "age",
                "type": "int",
                "value": "3"
              }
            ]
          },
          "name": "streamreader"
        },
        "writer": {
          "name": "dmwriter",
          "parameter": {
            "username": "username",
            "password": "password",
            "connection": [
              {
                "jdbcUrl": "jdbc:dm://localhost:5236",
                "table": ["table"]
              }
            ],
            "preSql": [],
            "postSql": [],
            "mode": "update",
            "updateKey": {"key": ["ID"]},
            "column": ["ID","AGE"]
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 1,
        "bytes": 0
      },
      "errorLimit": {
        "record": 100
      },
      "restore": {
        "maxRowNumForCheckpoint": 0,
        "isRestore": false,
        "restoreColumnName": "",
        "restoreColumnIndex": 0
      }
    }
  }
}
Java
1
https://gitee.com/dtstack_dev_0/chunjun.git
git@gitee.com:dtstack_dev_0/chunjun.git
dtstack_dev_0
chunjun
chunjun
1.10_release

搜索帮助