# flink-sql-connector-sftp **Repository Path**: chneki/flink-sql-connector-sftp ## Basic Information - **Project Name**: flink-sql-connector-sftp - **Description**: flink sql 自定义 table source 实现 sftp读取压缩文件 - **Primary Language**: Java - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 8 - **Forks**: 5 - **Created**: 2022-06-12 - **Last Updated**: 2025-05-29 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README ## 参数说明 | 参数 | 属性 | 默认值 | 说明 | 必填 | |---------------------------| ------- |----------|:---------------------------------------------------------------------------| ---- | | hostname | String | 无 | ip地址 | 是 | | port | Int | 22 | 端口号 | 否 | | username | String | 无 | 用户名 | 是 | | password | String | "" | 密码 | 是 | | path | String | 无 | 文件路径 1. 可填写xxx/xxx.txt 2. xxx/xxx1.txt,xxx/xxx2.txt 3.xxx 4.xxx/`[正则表达式]` | 是 | | encoding | String | UTF-8 | 文件编码类型 | 否 | | first-line-header | Boolean | false | 是否跳过文件首行 | 否 | | timeout | Int | 10000 | ftp连接超时时间,毫秒 |否| | is-metric | boolean | false | 是否开启监控, 若开启监控需要配置此jar包中的conf.properties文件 |否| | check-ok-file | boolean | false | 是否检查ok文件 |否| | read-mode | String | stream | stream 支持多并发的持续读,streamSingle单线程的顺序读 |否| | delete-read | boolean | false | 是否将已读文件删除 |否| | byte-delimiter-length | int| 1 | 文件换行符的长度,\n,\r\n分别对应1,2 |否| | format | String | sftp-csv | 自定义的format |是| | sftp-csv.column-delimiter | String | 003 | 003便是分割符号为 \003 ,多个分隔符用`&&`连接 例如\003与\007为`003&&007` |否| | sftp-csv.line-delimiter | String | 无 | 换行符理解为这一行文件的终止符,读取文件到此终止符即为一行数据在提交执行 多换行符用`&&`连接 eg:003&&004 |否| | sftp-csv.ignore-parse-errors | boolean | false | 忽略错误行,错误行信息会打印到日志中。。。 |否| ## 数据源说明 | 值 | 说明 | | ------------ | ------------------------------------------------------------ | | path | 文件路径 1. 可填写xxx/xxx.txt 2. xxx/xxx1.txt,xxx/xxx2.txt 3.xxx 4.xxx/`[正则表达式]` | 1. 根据文件名来匹配固定文件。 2. 根据文件名 逗号分隔来匹配多个文件。 3. 根据文件夹来递归匹配该目录下全部文件及其子目录下的全部文件。 4. 根据文件夹+正则法则,来匹配该文件夹及其子目录下的文件。 5. 针对,stream和singleStream模式,会根据stream-interval来发现新增的文件,以及有追加操作的文件。 6. 针对,stream和singleStream模式,可以设置checkpoint相关参数来记录文件读取的信息,其中包含文件的全路径名称,文件读取的字节数。 7. 针对,stream和singleStream模式,可以通过checkpoint或savepoint记录点来实现断点恢复,有效的避免了任务故障后,需要重新启动的问题,同时可以随时的暂停任务。 ## 标识性文件检查 | 值 | 说明 | | ------------ | ------------------------- | | check-ok-file | 是否检查ok文件 | 1. 此处标志性文件为ok文件,以判断ok是否存在进行数据文件的读取。 2. 标志性文件文件名须与数据文件名保持一致 eg: 标志性文件:DM_T_SYS_USER_01_20220508.ok , 数据文件:DM_T_SYS_USER_01_20220508.tar.gz。 3. 标志性文件需要与数据文件在同一个目录下。 ## 流说明 | 值 | 说明 | | ------------ | ------------------------------------------------------------ | | stream | 并行的流式处理,会按照文件绝对路径的hash%并行度,将文件分配给不同的线程来进行读取collect操作 | | streamSingle | 单线程读取文件,按照文件的修改时间来顺序读取 | 1. 如果一次性读取一个或者多个文件建议使用stream。 2. 如果一次性读取一个可以使用streamSingle,但建议使用stream 。 ## 监控说明: | 值 | 说明 | | ------------ | ------------------------------------------------------------ | | is-metric | 是否开启监控, 若开启监控需要配置此jar包中的conf.properties文件 | **sql为监控的建表语句** ```sql create table flink_metric_count( id int(11) primary key not null auto_increment, status varchar(10) comment "状态:ok代表任务执行成功, err代表任务执行异常" , file_name varchar(255), thread_num int(2), count bigint, `create_time` timestamp NULL default CURRENT_TIMESTAMP ) ``` **jar包中`conf.properties`的配置信息** ``` jar包中conf.properties的配置 driver=com.mysql.cj.jdbc.Driver url=jdbc:mysql://mysqldb:3306/flink?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC name=root password=root:CK:123 table=flinksql_metric_count ``` ## 案例1: 读取tar.gz 文件 1. 测试数据(DM_T_TEST_01_20210806.bin ) ```text 1111111liuyi21Time past can’t be called back again2021-08-01 2222222chener21The only thing we have to fear is fear itself2021-08-02 3333333zhangsan21Have you somewhat to do tomorrow, do it today2021-08-03 4444444lisi 21Experience is the best teacher2021-08-04 5555555wangwu21To the world you may be one person, but to one person you may be the world.2021-08-05 6666666zhaoliu21Don't try so hard, the best things come when you least expect them to.2021-08-06 ``` ![image-20220524222532117](image/image-20220524222532117.png) 测试数据说明: 共6条测试数据,其中 **行分隔符号为\003\004\n** **列分隔符号为\007\003** 2. 编写flinksql ```sql CREATE TABLE ftp_source ( dm_id STRING, dm_name STRING, dm_age STRING, dm_motto STRING, dm_date STRING ) WITH ( 'connector' = 'sftp', 'hostname' = 'mysqldb', 'username' = 'root', 'password' = 'root', 'path' = '/root/kkk/DM_T_TEST_01_20210806.tar.gz', 'encoding'='GBK', 'is-metric'='true', 'format'='sftp-csv', 'sftp-csv.column-delimiter'='007&&003', 'sftp-csv.line-delimiter' = '003&&004', 'sftp-csv.ignore-parse-errors'='true' ); CREATE TABLE print ( dm_id STRING, dm_name STRING, dm_age STRING, dm_motto STRING, dm_date STRING ) WITH ( 'connector' = 'print' ); insert into print select * from ftp_source; ``` 提交此flinksql ```shell bin/flink run \ --yarnapplicationId application_1653398079103_0001 \ -c com.chinaums.Submit \ ./bin/flinksqlsubmit-1.0.jar \ -sql /home/chneki/documents/flinksql/DM_T_TEST.sql \ -yarnSessionId application_1653398079103_0001 ``` 查看结果: 6条数据 ![image-20220524212929852](image/image-20220524212929852.png) 查看监控: 6条数据 ![image-20220524213502796](image/image-20220524213502796.png) ## 案例2: 读取 zip 文件 ```sql ------ DM_T_SYS_USER -- CREATE TABLE ftp_source ( seq_no string, runtime_id string, uuid string ) WITH ( 'connector' = 'sftp', 'hostname' = 'mysqldb', 'username' = 'root', 'password' = 'root', 'path' = '/root/kkk/`TEST_TEST_2022[0-9]{4}.txt.zip`', 'encoding'='GBK', 'is-metric'='false', 'format'='sftp-csv', 'sftp-csv.column-delimiter'='`\|`', 'sftp-csv.ignore-parse-errors'='true' ); CREATE TABLE sink ( seq_no string, runtime_id string, uuid string ) WITH ( 'connector' = 'print' ); insert into sink select * from ftp_source; ```