# flink-jobs **Repository Path**: wyspgbj/flink-jobs ## Basic Information - **Project Name**: flink-jobs - **Description**: Flink流批一体数据处理快速集成开发框架,玩转flink - **Primary Language**: Java - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 159 - **Created**: 2021-10-12 - **Last Updated**: 2021-10-12 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # flink-jobs ### 介绍 flink-jobs为基于Flink的Java应用程序提供快速集成的能力,可通过继承FlinkJobsRunner快速构建基于Java的Flink流批一体应用程序。还可以通过使用[flink-jobs-launcher](https://gitee.com/tenmg/flink-jobs-launcher),实现基于Java API启动flink-jobs应用程序。 ### 起步 以基于SpringBoot的Maven项目为例 1. pom.xml添加依赖(Flink等其他相关依赖此处省略),${flink-jobs.version}为版本号,可定义属性或直接使用版本号替换 ``` cn.tenmg flink-jobs ${flink-jobs.version} ``` 2. 配置文件application.properties ``` bootstrap.servers=192.168.10.40:9092,192.168.10.78:9092,192.168.10.153:9092 topics=topic1,topic2 auto.offset.reset=latest group.id=flink-jobs ``` 3. 编写配置类 ``` @Configuration @PropertySource(value = "application.properties") public class Context { @Bean public Properties kafkaProperties(@Value("${bootstrap.servers}") String servers, @Value("${auto.offset.reset}") String autoOffsetReset, @Value("${group.id}") String groupId) { Properties kafkaProperties = new Properties(); kafkaProperties.put("bootstrap.servers", servers); kafkaProperties.put("auto.offset.reset", autoOffsetReset); kafkaProperties.put("group.id", groupId); return kafkaProperties; } } ``` 4. 编写应用入口类 ``` @ComponentScan("com.sinochem.flink.jobs") public class App extends FlinkJobsRunner implements CommandLineRunner { @Autowired private ApplicationContext springContext; @Override protected StreamService getStreamService(String serviceName) { return (StreamService) springContext.getBean(serviceName); } public static void main(String[] args) { SpringApplication.run(App.class, args); } } ``` 5. 编写Flink流批一体服务 ``` @Service public class HelloWorldService implements StreamService { /** * */ private static final long serialVersionUID = -6651233695630282701L; @Autowired private Properties kafkaProperties; @Value("${topics}") private String topics; @Override public void run(StreamExecutionEnvironment env, Arguments arguments) throws Exception { DataStream stream; if (RuntimeExecutionMode.STREAMING.equals(arguments.getRuntimeMode())) { stream = env.addSource(new FlinkKafkaConsumer(Arrays.asList(topics.split(",")), new SimpleStringSchema(), kafkaProperties)); } else { stream = env.fromElements("Hello, World!"); } stream.print(); } } ``` 6. 到此,一个flink-jobs应用程序已完成,他可以通过各种方式运行。 - 在IDE环境中,可直接运行App类启动flink-jobs应用程序; - 也可打包后,通过命令行提交给flink集群执行(通常在pom.xml配置org.apache.maven.plugins.shade.resource.ManifestResourceTransformer的mainClass为App这个类,请注意是完整类名):`flink run /yourpath/yourfile.jar "{\"serviceName\":\"yourServiceName\"}"`,更多运行参数详见[运行参数](https://gitee.com/tenmg/flink-jobs#%E8%BF%90%E8%A1%8C%E5%8F%82%E6%95%B0arguments)。 - 此外,通过使用[flink-jobs-launcher](https://gitee.com/tenmg/flink-jobs-launcher)可以通过Java API的方式启动flink-jobs应用程序,这样启动操作就可以轻松集成到其他系统中(例如Java Web程序)。 ### 快速入门 详见https://gitee.com/tenmg/flink-jobs-quickstart ### 运行参数(arguments) flink-jobs应用程序的运行参数通过JSON格式的字符串(注意,如果是命令行运行,JSON格式字符串前后需加上双引号或单引号,JSON格式字符串内部的双引号或单引号则需要转义)或者一个.json文件提供,结构如下: ``` { "serviceName": "specifyName", "runtimeMode": "BATCH"/"STREAMING"/"AUTOMATIC", "params": { "key1": "value1", "key2": "value2", … }, "operates": [{ "script": "specifySQL", "type": "ExecuteSql" }, { "dataSource": "kafka", "script": "specifySQL", "type": "ExecuteSql" }, { "saveAs": "specifyTemporaryTableName", "catalog": "specifyCatalog", "script": "specifySQL", "type": "SqlQuery" }, … ] } ``` 属性 | 类型 | 必需 | 说明 ------------|--------------------|----|-------- serviceName | `String` | 否 | 运行的服务名称。该名称由用户定义并实现根据服务名称获取服务的方法,flink-jobs则在运行时调用并确定运行的实际服务。在运行SQL任务时,通常指定operates,而无需指定serviceName。 runtimeMode | `String` | 否 | 运行模式。可选值:"BATCH"/"STREAMING"/"AUTOMATIC",相关含义详见[Flink](https://flink.apache.org)官方文档。 params | `Map` | 否 | 参数查找表。通常可用于SQL中,也可以在自定义服务中通过arguments参数获取。 operates | `List` | 否 | 操作列表。目前支持类型为[Bsh](#bsh%E6%93%8D%E4%BD%9C)、[ExecuteSql](#executesql%E6%93%8D%E4%BD%9C)、[SqlQuery](#sqlquery%E6%93%8D%E4%BD%9C)和[Jdbc](#jdbc%E6%93%8D%E4%BD%9C)四种操作。 #### Bsh操作 Bsh操作的作用是运行基于Beanshell的java代码,支持版本:1.1.0+,相关属性及说明如下: 属性 | 类型 | 必需 | 说明 -------|-------------|----|-------- type | `String` | 是 | 操作类型。这里是"Bsh"。 saveAs | `String` | 否 | 操作结果另存为一个新的变量的名称。变量的值是基于Beanshell的java代码的返回值(通过`return xxx;`表示)。 vars | `List` | 否 | 参数声明列表。 java | `String` | 是 | java代码。注意:使用泛型时,不能使用尖括号声明泛型。例如,使用Map不能使用“Map map = new HashMap();”,但可以使用“Map map = new HashMap();”。 ##### Var 属性 | 类型 | 必需 | 说明 ------|----------|----|-------- name | `String` | 是 | Beanshell中使用的变量名称 value | `String` | 否 | 变量对应的值的名称。默认与name相同。flink-jobs会从参数查找表中查找名称为value值的参数值,如果指定参数存在且不是null,则该值作为该参数的值;否则,使用value值作为该变量的值。 #### ExecuteSql操作 ExecuteSql操作的作用是运行基于[DSL](https://gitee.com/tenmg/dsl)的SQL代码,支持版本:1.1.0+,相关属性及说明如下: 属性 | 类型 | 必需 | 说明 -----------|----------|----|-------- type | `String` | 是 | 操作类型。这里是"ExecuteSql"。 saveAs | `String` | 否 | 操作结果另存为一个新的变量的名称。变量的值是flink的`tableEnv.executeSql(statement);`的返回值。 dataSource | `String` | 否 | 使用的数据源名称。 catalog | `String` | 否 | 执行SQL使用的Flink SQL的catalog名称。 script | `String` | 是 | 基于[DSL](https://gitee.com/tenmg/dsl)的SQL脚本。由于Flink SQL不支持DELETE、UPDATE语句,因此如果配置的SQL脚本是DELETE或者UPDATE语句,该语句将在程序main函数中采用JDBC执行。 #### SqlQuery操作 SqlQuery操作的作用是运行基于[DSL](https://gitee.com/tenmg/dsl)的SQL查询代码,支持版本:1.1.0+,相关属性及说明如下: 属性 | 类型 | 必需 | 说明 -----------|--------|----|-------- saveAs | `String` | 否 | 查询结果另存为临时表的表名及操作结果另存为一个新的变量的名称。变量的值是flink的`tableEnv.executeSql(statement);`的返回值。 catalog | `String` | 否 | 执行SQL使用的Flink SQL的catalog名称。 script | `String` | 是 | 基于[DSL](https://gitee.com/tenmg/dsl)的SQL脚本。 #### Jdbc操作 Jdbc操作的作用是运行基于[DSL](https://gitee.com/tenmg/dsl)的JDBC SQL代码,支持版本:1.1.1+,相关属性及说明如下: 属性 | 类型 | 必需 | 说明 -----------|----------|----|-------- type | `String` | 是 | 操作类型。这里是"Jdbc"。 saveAs | `String` | 否 | 执行结果另存为一个新的变量的名称。变量的值是执行JDBC指定方法的返回值。 dataSource | `String` | 是 | 使用的数据源名称。 method | `String` | 否 | 调用的JDBC方法。默认是"executeLargeUpdate"。 script | `String` | 是 | 基于[DSL](https://gitee.com/tenmg/dsl)的SQL脚本。 目标JDBC SQL代码是在flink-jobs应用程序的main函数中运行的。 #### DataSync操作 DataSync操作的作用是运行基于Flink SQL的流式任务实现数据同步,支持版本:1.1.2+,相关属性及说明如下: 属性 | 类型 | 必需 | 说明 -----------|----------------|----|-------- type | `String` | 是 | 操作类型。这里是"DataSync"。 saveAs | `String` | 否 | 执行结果另存为一个新的变量的名称。变量的值是执行`INSERT`语句返回的`org.apache.flink.table.api.TableResult`对象。 from | `String` | 是 | 来源数据源名称。目前仅支持Kafka数据源。 topic | `String` | 否 | Kafka主题。也可在fromConfig中配置`topic=xxx`。 fromConfig | `String` | 否 | 来源配置。例如:`properties.group.id=flink-jobs`。 to | `String` | 是 | 目标数据源名称,目前仅支持JDBC数据源。 toConfig | `String` | 是 | 目标配置。例如:`sink.buffer-flush.max-rows = 0`。 table | `String` | 是 | 同步数据表名。 columns | `List` | 否 | 同步数据列。当开启智能模式时,会自动获取列信息。 primaryKey | `String` | 否 | 主键,多个列名以“,”分隔。当开启智能模式时,会自动获取主键信息。 smart | `Boolean` | 否 | 是否开启智能模式。不设置时,根据全局配置确定是否开启智能模式,全局默认配置为`data.sync.smart=true`。 ##### Column 属性 | 类型 | 必需 | 说明 ---------|----------|----|-------- fromName | `String` | 是 | 来源列名。 fromType | `String` | 否 | 来源数据类型。如果缺省,则如果开启智能模式会自动获取目标数据类型作为来源数据类型,如果关闭智能模式则必填。 toName | `String` | 否 | 目标列名。默认为来源列名。 toType | `String` | 否 | 目标列数据类型。如果缺省,则如果开启智能模式会自动获取,如果关闭智能模式则默认为来源列数据类型。 script | `String` | 否 | 自定义脚本。通常是需要进行函数转换时使用。 ### 配置文件 默认的配置文件为flink-jobs.properties(注意:需在classpath下),可通过flink-jobs-context-loader.properties配置文件的`config.location`修改配置文件路径和名称。配置项的值允许通过占位符`${}`引用,例如`key=${anotherKey}`。 #### 数据源配置 每个数据源有一个唯一的命名,数据源配置以“datasource”为前缀,以“.”作为分隔符,格式为`datasource.${name}.${key}=${value}`。其中,第一和第二个“.”符号之间的是数据源名称,第二个“.”符号之后和“=”之前的是该数据源具体的配置项,“=”之后的是该配置项的值。数据源的配置项与[Flink](https://flink.apache.org)保持一致,具体配置项详见[Flink官方文档](https://flink.apache.org)。以下给出部分常用数据源配置示例: ``` #FlinkSQL数据源配置 #Debezium #配置名称为kafka的数据源 datasource.kafka.connector=kafka datasource.kafka.properties.bootstrap.servers=192.168.1.101:9092,192.168.1.102:9092,192.168.1.103:9092 datasource.kafka.properties.group.id=flink-jobs datasource.kafka.scan.startup.mode=earliest-offset datasource.kafka.format=debezium-json datasource.kafka.debezium-json.schema-include=true #PostgreSQL #配置名称为bidb的数据源 datasource.bidb.connector=jdbc datasource.bidb.driver=org.postgresql.Driver datasource.bidb.url=jdbc:postgresql://192.168.1.104:5432/bidb datasource.bidb.username=your_name datasource.bidb.password=your_password #引用配置文件内的另一个配置 #配置名称为syndb的数据源 datasource.syndb.connector=${datasource.bidb.connector} datasource.syndb.driver=${datasource.bidb.driver} datasource.syndb.url=${datasource.bidb.url}?currentSchema=syndb datasource.syndb.username=${datasource.bidb.username} datasource.syndb.password=${datasource.bidb.password} #MySQL #配置名称为kaorder的数据源 datasource.kaorder.connector=jdbc datasource.kaorder.driver=com.mysql.cj.jdbc.Driver datasource.kaorder.url=jdbc:mysql://192.168.1.105:3306/kaorder?useSSL=false&serverTimezone=Asia/Shanghai datasource.kaorder.username=your_name datasource.kaorder.password=your_password #SQLServer #配置名称为sqltool的数据源 datasource.sqltool.connector=jdbc datasource.sqltool.driver=org.postgresql.Driver datasource.sqltool.url=jdbc:sqlserver://192.168.1.106:1433;DatabaseName=sqltool; datasource.sqltool.username=your_name datasource.sqltool.password=your_password #Hive #配置名称为hivedb的数据源 datasource.hivedb.type=hive datasource.hivedb.default-database=default datasource.hivedb.hive-conf-dir=/etc/hive/conf ``` #### Table API & SQL [Flink](http://)的Table API & SQL配置除了在Flink配置文件中指定之外,也可以在flink-jobs的配置文件中指定。例如: `table.exec.sink.not-null-enforcer=drop` 注意:如果是在flink-jobs的配置文件中配置这些参数,当执行自定义Java服务时,只有通过`FlinkJobsContext.getOrCreateStreamTableEnvironment()`或`FlinkJobsContext.getOrCreateStreamTableEnvironment(env)`方法获取的`StreamTableEnvironment`执行Table API & SQL,这些配置才会生效。 ### 系统集成 [flink-jobs-launcher](https://gitee.com/tenmg/flink-jobs-launcher)实现了使用XML配置文件来管理flink-jobs任务,这样开发Flink SQL任务会显得非常简单;同时,用户自定义的flink-jobs服务也可以被更轻松得集成到其他系统中。XML文件具有良好的可读性,并且在IDE环境下能够对配置进行自动提示。具体使用方法详见[flink-jobs-launcher开发文档](https://gitee.com/tenmg/flink-jobs-launcher),以下介绍几种通过XML管理的flink-jobs任务: #### 运行自定义服务 以下为一个自定义服务任务XML配置文件: ``` ``` #### 运行批处理SQL 以下为一个简单订单量统计SQL批处理任务XML配置文件: ``` 2021-01-01 2021-07-01 = :beginDate and o.business_date < :endDate group by cast(to_date(o.business_date) as date) ]]> = :beginDate and stats_date < :endDate ]]> ``` #### 运行流处理SQL 以下为通过Debezium实现异构数据库同步任务XML配置文件: ``` ``` ### 发布计划 计划将在1.1.2中发布以下功能: 标签 | 功能 | 说明 -----------|---------|-------- `DataSync` | 数据同步 | 实现基于Debezuim的数据同步,以便简化通过`ExecuteSql`实现的数据同步功能。 组件升级 | DSL升级 | 升级dsl到1.2.0,增强注释支持,注释中的所有内容(包括参数表达式、动态片段)原样保留。 ### 参与贡献 1. Fork 本仓库 2. 新建 Feat_xxx 分支 3. 提交代码 4. 新建 Pull Request ### 相关链接 flink-jobs-launcher开源地址:https://gitee.com/tenmg/flink-jobs-launcher DSL开源地址:https://gitee.com/tenmg/dsl Flink官网:https://flink.apache.org Debezuim官网:https://debezium.io