48 Star 500 Fork 228

GVP袋鼠云 / chunjun

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
binlogreader.md 9.68 KB
一键复制 编辑 原始数据 按行查看 历史
tudou 提交于 2021-02-05 18:05 . 1、新增Oracle LogMiner实时采集插件

MySQL Binlog Reader


一、插件名称

名称:binlogreader


二、支持的数据源版本

MySQL5.1.5及以上


三、数据库配置

1.修改配置文件

binlog_format需要修改为 ROW 格式,在/etc/my.cnf文件里[mysqld]下添加下列配置

server_id=109
log_bin = /var/lib/mysql/mysql-bin
binlog_format = ROW
expire_logs_days = 30

2.添加权限

mysql binlog权限需要三个权限 SELECT, REPLICATION SLAVE, REPLICATION CLIENT

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';
  • 缺乏SELECT权限时,报错为
com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException:
Access denied for user 'canal'@'%' to database 'binlog'
  • 缺乏REPLICATION SLAVE权限时,报错为
java.io.IOException: 
Error When doing Register slave:ErrorPacket [errorNumber=1045, fieldCount=-1, message=Access denied for user 'canal'@'%'
  • 缺乏REPLICATION CLIENT权限时,报错为
 com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException:
        Access denied; you need (at least one of) the SUPER, REPLICATION CLIENT privilege(s) for this operation

binlog为什么需要这些权限:

  • Select权限代表允许从表中查看数据
  • Replication client权限代表允许执行show master status,show slave status,show binary logs命令
  • Replication slave权限代表允许slave主机通过此用户连接master以便建立主从 复制关系

四、参数说明

  • jdbcUrl
    • 描述:MySQL数据库的jdbc连接字符串,参考文档:Mysql官方文档
    • 必选:是
    • 字段类型:string
    • 默认值:无

  • username
    • 描述:数据源的用户名
    • 必选:是
    • 字段类型:string
    • 默认值:无

  • password
    • 描述:数据源指定用户名的密码
    • 必选:是
    • 字段类型:string
    • 默认值:无

  • host
    • 描述:启动MySQL slave的机器ip
    • 必选:是
    • 字段类型:string
    • 默认值:无

  • port
    • 描述:启动MySQL slave的端口
    • 必选:否
    • 字段类型:int
    • 默认值:3306

  • table
    • 描述:需要解析的数据表。
    • 注意:指定此参数后filter参数将无效,table和filter都为空,监听jdbcUrl里的schema下所有表
    • 必选:否
    • 字段类型:list
    • 默认值:无

  • filter
    • 描述:过滤表名的Perl正则表达式
    • 注意:table和filter都为空,监听jdbcUrl里的schema下所有表
    • 必选:否
    • 字段类型:string
    • 默认值:无
    • 例子:
      • 所有表:_.*_
      • canal schema下所有表: canal\..*
      • canal下的以canal打头的表:canal\.canal.*
      • canal schema下的一张表:canal\.test1

  • cat
    • 描述:需要解析的数据更新类型,包括insert、update、delete三种
    • 注意:以英文逗号分割的格式填写。如果为空,解析所有数据更新类型
    • 必选:否
    • 字段类型:string
    • 默认值:无

  • start
    • 描述:要读取的binlog文件的开始位置
    • 注意:为空,则从当前position处消费,timestamp的优先级高于 journalName+position
    • 参数:
      • timestamp:时间戳,采集起点从指定的时间戳处消费;
      • journalName:文件名,采集起点从指定文件的起始处消费;
      • position:文件的指定位置,采集起点从指定文件的指定位置处消费
    • 字段类型:map
    • 默认值:无

  • pavingData
    • 描述:是否将解析出的json数据拍平
    • 必选:否
    • 字段类型:boolean
    • 默认值:true
    • 示例:假设解析的表为tb1,数据库为test,对tb1中的id字段做update操作,id原来的值为1,更新后为2,则pavingData为true时,数据格式为:
{
    "type":"update",
    "schema":"test",
    "table":"tb1",
    "ts":6760525407742726144,
    "before_id":1,
    "after_id":2
}

pavingData为false时:

{
    "message":{
         "type":"update",
         "schema":"test",
         "table":"tb1",
         "ts":6760525407742726144,
         "before":{
             "id":1
         },
         "after":{
             "id":2
         }
    }
}
  • type:变更类型,INSERT,UPDATE、DELETE
  • ts:自增ID,不重复,可用于排序,解码后为FlinkX的事件时间,解码规则如下:
    long id = Long.parseLong("6760525407742726144");
    long res = id >> 22;
    DateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    System.out.println(sdf.format(res));		//2021-01-28 19:54:21

  • slaveId
    • 描述:从服务器的ID
    • 注意:同一个MYSQL复制组内不能重复
    • 必选:否
    • 字段类型:long
    • 默认值:new Object().hashCode()

  • connectionCharset
    • 描述:编码信息
    • 必选:否
    • 字段类型:string
    • 默认值:UTF-8

  • detectingEnable
    • 描述:是否开启心跳
    • 必选:否
    • 字段类型:boolean
    • 默认值:true

  • detectingSQL
    • 描述:心跳SQL
    • 必选:否
    • 字段类型:string
    • 默认值:SELECT CURRENT_DATE

  • enableTsdb
    • 描述:是否开启时序表结构能力
    • 必选:否
    • 字段类型:boolean
    • 默认值:true

  • bufferSize
    • 描述:并发缓存大小
    • 注意:必须为2的幂
    • 必选:否
    • 默认值:1024

  • parallel
    • 描述:是否开启并行解析binlog日志
    • 必选:否
    • 字段类型:boolean
    • 默认值:true

  • parallelThreadSize
    • 描述:并行解析binlog日志线程数
    • 注意:只有 paraller 设置为true才生效
    • 必选:否
    • 字段类型:int
    • 默认值:2

  • isGTIDMode
    • 描述:是否开启gtid模式
    • 必选:否
    • 字段类型:boolean
    • 默认值:false

五、配置示例

1、单表监听

{
  "job": {
    "content": [
      {
        "reader": {
          "parameter": {
            "table": ["table"],
            "password": "passwd",
            "database": "database",
            "port": 3306,
            "cat": "DELETE,INSERT,UPDATE",
            "host": "host",
            "jdbcUrl": "jdbc:mysql://host:port/schema",
            "pavingData": true,
            "username": "name"
          },
          "name": "binlogreader"
        },
        "writer": {
          "parameter": {
            "print": true
          },
          "name": "streamwriter"
        }
      }
    ],
    "setting": {
      "restore": {
        "isStream": true
      },
      "speed": {
        "channel": 1
      }
    }
  }
}

2、多表监听

{
  "job": {
    "content": [
      {
        "reader": {
          "parameter": {
            "table": ["table1","table2"],
            "password": "passwd",
            "database": "database",
            "port": 3306,
            "cat": "DELETE,INSERT,UPDATE",
            "host": "host",
            "jdbcUrl": "jdbc:mysql://host:port/schema",
            "pavingData": true,
            "username": "name"
          },
          "name": "binlogreader"
        },
        "writer": {
          "parameter": {
            "print": true
          },
          "name": "streamwriter"
        }
      }
    ],
    "setting": {
      "restore": {
        "isStream": true
      },
      "speed": {
        "channel": 1
      }
    }
  }
}

3、正则监听

{
  "job": {
    "content": [
      {
        "reader": {
          "parameter": {
            "filter": "schema\\..*",
            "password": "passwd",
            "database": "database",
            "port": 3306,
            "cat": "DELETE,INSERT,UPDATE",
            "host": "host",
            "jdbcUrl": "jdbc:mysql://host:port/schema",
            "pavingData": true,
            "username": "name"
          },
          "name": "binlogreader"
        },
        "writer": {
          "parameter": {
            "print": true
          },
          "name": "streamwriter"
        }
      }
    ],
    "setting": {
      "restore": {
        "isStream": true
      },
      "speed": {
        "channel": 1
      }
    }
  }
}

4、指定起始位置

{
  "job": {
    "content": [
      {
        "reader": {
          "parameter": {
            "filter": "schema\\..*",
            "password": "passwd",
            "database": "database",
            "port": 3306,
             "start" : {
              "journalName": "binlog.000031",
              "position": 4
            },
            "cat": "DELETE,INSERT,UPDATE",
            "host": "host",
            "jdbcUrl": "jdbc:mysql://host:port/schema",
            "pavingData": true,
            "username": "name"
          },
          "name": "binlogreader"
        },
        "writer": {
          "parameter": {
            "print": true
          },
          "name": "streamwriter"
        }
      }
    ],
    "setting": {
      "restore": {
        "isStream": true
      },
      "speed": {
        "channel": 1
      }
    }
  }
}
Java
1
https://gitee.com/dtstack_dev_0/chunjun.git
git@gitee.com:dtstack_dev_0/chunjun.git
dtstack_dev_0
chunjun
chunjun
1.10_release

搜索帮助