package com.newFlink.submit.remote;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import java.util.Optional;
/**
 * @Author gcwel
 * @Description Submits a Flink SQL job (Kafka source -> MySQL/JDBC sink) to a remote Flink cluster
 * @Date 2021/11/17
 */
public class Submit2Remote {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("gcw1", 8081);
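        // Note: the connector/format dependencies used below (kafka, jdbc, csv) have to be
        // available on the remote cluster. One common approach, sketched here with a
        // hypothetical jar path, is the createRemoteEnvironment overload that ships job jars:
        // StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
        //         "gcw1", 8081, "/path/to/job-with-connectors.jar");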
        StreamTableEnvironment stEnv = StreamTableEnvironment.create(env);
        String kafkaFK = "CREATE TABLE test_fk ( " +
                " `id` BIGINT, " +
                " `num` INT, " +
                " `ts` TIMESTAMP(3) METADATA FROM 'timestamp' " +
                ") WITH ( " +
                " 'connector' = 'kafka', " +
                " 'topic' = 'TEST_FK', " +
                " 'properties.bootstrap.servers' = 'gcw1:9092', " +
                " 'scan.startup.mode' = 'earliest-offset', " +
                " 'format' = 'csv' " +
                ")";
        String mysqlFK = "CREATE TABLE test_demo ( " +
                " id BIGINT, " +
                " ct_num BIGINT, " +
                " submit_time TIMESTAMP(3), " +
                " PRIMARY KEY (id) NOT ENFORCED " +
                ") WITH ( " +
                " 'connector' = 'jdbc', " +
                " 'url' = 'jdbc:mysql://gcw3:3306/test', " +
                " 'table-name' = 'test_demo', " +
                " 'username' = 'root', " +
                " 'password' = '123456' " +
                ")";
        // Register the Kafka source table and the MySQL (JDBC) sink table, then submit the INSERT job.
        stEnv.executeSql(kafkaFK);
        stEnv.executeSql(mysqlFK);
        TableResult tableResult = stEnv.executeSql(
                "insert into test_demo select id, sum(num), max(ts) from test_fk group by id");
        // Get the job ID of the submitted job; getJobClient() returns an Optional,
        // so check for presence instead of calling get() unconditionally.
        Optional<JobClient> jobClient = tableResult.getJobClient();
        if (jobClient.isPresent()) {
            JobID jobID = jobClient.get().getJobID();
            System.out.println(jobID);
        }
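        // The JobClient can also manage the submitted job from this process; a minimal
        // sketch (commented out, since cancel() would stop the job immediately):
        // jobClient.ifPresent(client -> {
        //     client.getJobStatus().thenAccept(status -> System.out.println("status: " + status));
        //     // client.cancel();
        // });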
        // For multiple INSERT statements, a StatementSet can be used so they are submitted as one job.
        // stEnv.createStatementSet();
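        // A minimal sketch of the StatementSet approach (commented out so the single INSERT
        // above remains the submitted job; it would also require importing
        // org.apache.flink.table.api.StatementSet). The second sink table `another_sink`
        // is a hypothetical name used only for illustration:
        // StatementSet statementSet = stEnv.createStatementSet();
        // statementSet.addInsertSql("insert into test_demo select id, sum(num), max(ts) from test_fk group by id");
        // statementSet.addInsertSql("insert into another_sink select id, num from test_fk");
        // statementSet.execute();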
    }
}