Steven / flink-sprint-boot_1 (forked from imomoda / flink-sprint-boot)

This repository declares no open-source license (LICENSE) file; check the project description and its upstream dependencies before use.
Flink Framework: Integrating Flink with Spring Boot

First, why integrate Spring Boot into Flink at all? Spring Boot gives us better framework integration, and with Spring's DI and IoC we can work with beans much more conveniently. Of course, integrating plain Spring directly works just as well.

How It Works

Plain Spring is usually started with AnnotationConfigApplicationContext ac = new AnnotationConfigApplicationContext(AppConfig.class);, which brings up the Spring container. What about Spring Boot? Anyone who has read its source knows that SpringApplication.run(args); does the same job.

So all we need to do is start Spring Boot before starting Flink.
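The "start it once, before Flink" idea boils down to a synchronized run-once guard, which is exactly what the utility class below implements. It can be sketched without any Spring dependency (the class and method names here are illustrative, not from the repository):

```java
// Sketch of a run-once bootstrap guard: the first caller boots the container,
// every later caller sees it is already up and returns immediately.
public class BootOnce {
    private static Object context = null;   // stands in for the Spring application context

    // Returns true only for the call that actually performed the boot.
    public static synchronized boolean run(String[] args) {
        if (context != null) {
            return false;                   // already started, e.g. by main()
        }
        context = new Object();             // stands in for SpringApplication.run(args)
        return true;
    }

    public static void main(String[] args) {
        System.out.println(run(args));      // first call performs the boot
        System.out.println(run(args));      // second call is a no-op
    }
}
```

Because the method is synchronized and re-entrant callers bail out early, it is safe to call both from the job's main() and again from an operator's open().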

Code

Flink integrated with Spring Boot and Redisson, writing each event's id into Redis. Repository: https://gitee.com/imomoda/flink-sprint-boot

  • Spring Boot bootstrap utility class

    import cn.hutool.extra.spring.SpringUtil;
    import lombok.extern.slf4j.Slf4j;
    import org.redisson.spring.starter.RedissonProperties;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.WebApplicationType;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
    import org.springframework.boot.builder.SpringApplicationBuilder;
    import org.springframework.boot.context.properties.EnableConfigurationProperties;
    import org.springframework.context.annotation.Import;
    import org.springframework.core.env.MapPropertySource;
    import org.springframework.core.env.MutablePropertySources;
    import org.springframework.core.env.SimpleCommandLinePropertySource;
    import org.springframework.core.env.StandardEnvironment;
    import org.springframework.core.env.SystemEnvironmentPropertySource;

    @SpringBootApplication(scanBasePackages = {"io.github.jeesk.flink"})
    @Import(SpringUtil.class)
    @Slf4j
    @EnableConfigurationProperties({RedissonProperties.class, RedisProperties.class})
    public class SpringBootApplicationUtil {

        static SpringApplication springBootApplication = null;
        static SpringApplicationBuilder springApplicationBuilder = null;

        // Boots the Spring container exactly once; safe to call from main() and from open()
        public static synchronized void run(String[] args) {
            if (springBootApplication == null) {
                StandardEnvironment standardEnvironment = new StandardEnvironment();
                MutablePropertySources propertySources = standardEnvironment.getPropertySources();
                propertySources.addFirst(new SimpleCommandLinePropertySource(args));
                propertySources.addLast(new MapPropertySource("systemProperties", standardEnvironment.getSystemProperties()));
                propertySources.addLast(new SystemEnvironmentPropertySource("systemEnvironment", standardEnvironment.getSystemEnvironment()));
                springApplicationBuilder = new SpringApplicationBuilder(SpringBootApplicationUtil.class);
                // The active profile could also be passed in on the command line
                springApplicationBuilder.profiles("dev");
                springApplicationBuilder.sources(SpringBootApplicationUtil.class).web(WebApplicationType.NONE);
                // Attach the environment carrying the command-line property source
                springApplicationBuilder.environment(standardEnvironment);
                springBootApplication = springApplicationBuilder.build();
                springBootApplication.run(args);
            }
        }
    }
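The builder above hardcodes the "dev" profile, with a note that it could come from the command line instead; SimpleCommandLinePropertySource, already registered in run(...), treats arguments of the form --key=value as properties. A minimal sketch of that parsing rule in plain Java (class and method names are hypothetical, no Spring dependency):

```java
// Sketch of the "--key=value" option-argument rule that
// SimpleCommandLinePropertySource applies to the args handed to run(...).
public class OptionArgs {

    // Returns the value of "--name=value" among args, or null if absent.
    public static String option(String[] args, String name) {
        String prefix = "--" + name + "=";
        for (String arg : args) {
            if (arg.startsWith(prefix)) {
                return arg.substring(prefix.length());
            }
        }
        return null;
    }

    public static void main(String[] args) {
        String[] demo = {"--spring.profiles.active=prod", "job.jar"};
        System.out.println(option(demo, "spring.profiles.active"));
    }
}
```

With that in place, launching the job with --spring.profiles.active=prod would let the builder read the profile from the environment instead of hardcoding "dev".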
  • Flink job

    package io.github.jeesk.flink;
    
    import cn.hutool.extra.spring.SpringUtil;
    import io.github.jeesk.flink.config.SpringBootApplicationUtil;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;
    import org.apache.flink.walkthrough.common.entity.Alert;
    import org.apache.flink.walkthrough.common.entity.Transaction;
    import org.apache.flink.walkthrough.common.sink.AlertSink;
    import org.apache.flink.walkthrough.common.source.TransactionSource;
    import org.springframework.data.redis.core.StringRedisTemplate;
    
    public class FraudDetectionJob {
        public static void main(String[] args) throws Exception {
    
            // Record the raw CLI args in a Flink Configuration so they can be rebuilt in open()
            Configuration configuration = new Configuration();
            if (args != null) {
                configuration.setString("args", String.join(" ", args));
            }
            SpringBootApplicationUtil.run(args);
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            DataStream<Transaction> transactions = env
                    .addSource(new TransactionSource())
                    .name("transactions");
    
            DataStream<Alert> alerts = transactions
                    .keyBy(Transaction::getAccountId)
                    .process(new FraudDetector())
                    .name("fraud-detector");
    
            alerts
                    .addSink(new AlertSink())
                    .name("send-alerts");
    
            env.execute("Fraud Detection");
        }
    
        public static class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
    
            private StringRedisTemplate redisTemplate = null;
    
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                // Boot the Spring context on this TaskManager (a no-op if it is already up),
                // then look up the bean we need
                SpringBootApplicationUtil.run(parameters.getString("args", "").split(" "));
                redisTemplate = SpringUtil.getBean(StringRedisTemplate.class);
            }
    
            @Override
            public void processElement(
                    Transaction transaction,
                    Context context,
                    Collector<Alert> collector) throws Exception {
    
                Alert alert = new Alert();
                alert.setId(transaction.getAccountId());
                // Put the id into Redis
                redisTemplate.opsForSet().add("tmpKey", String.valueOf(alert.getId()));
                collector.collect(alert);
            }
        }
    }
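To ship the CLI arguments to the operators, main() packs them into a single string with String.join(" ", args) and open() splits them back apart. A plain-Java sketch of that round-trip (the class name is hypothetical); note that an argument which itself contains a space would not survive it:

```java
import java.util.Arrays;

// Sketch of how the job ships its CLI args to the operators: main() joins them
// into one Configuration string, open() splits them back into an array.
public class ArgsRoundTrip {

    public static String pack(String[] args) {
        return String.join(" ", args);   // what main() stores in the Configuration
    }

    public static String[] unpack(String packed) {
        return packed.split(" ");        // what open() does before calling run(...)
    }

    public static void main(String[] args) {
        String[] original = {"--spring.profiles.active=dev", "--some.flag=on"};
        String[] restored = unpack(pack(original));
        // Round-trips cleanly because no single argument contains a space
        System.out.println(Arrays.equals(original, restored));
    }
}
```

If arguments with embedded spaces ever need to survive this trip, they would have to be escaped or stored as separate Configuration keys instead.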
    
  • Flink can log through either Logback or Log4j; this demo uses Logback, which needs the following setup:

    1. On the server: copy the Logback jars into Flink's lib directory: log4j-over-slf4j-1.7.15.jar, logback-classic-1.2.3.jar, logback-core-1.2.3.jar.
    2. Then delete the Log4j jars from lib (log4j-1.2.17.jar and slf4j-log4j12-1.7.15.jar). If you are unsure what these jars do, read up on the details of handling common Java logging dependencies.
    3. Exclude the Log4j artifacts in the project's pom.xml:

       <dependency>
           <groupId>org.apache.flink</groupId>
           <artifactId>flink-java</artifactId>
           <version>1.13.1</version>
           <!-- exclude log4j -->
           <exclusions>
               <exclusion>
                   <groupId>log4j</groupId>
                   <artifactId>*</artifactId>
               </exclusion>
               <exclusion>
                   <groupId>org.slf4j</groupId>
                   <artifactId>slf4j-log4j12</artifactId>
               </exclusion>
           </exclusions>
           <!--<scope>provided</scope>-->
       </dependency>
       <dependency>
           <groupId>org.apache.flink</groupId>
           <artifactId>flink-streaming-java_2.12</artifactId>
           <version>1.13.1</version>
           <!-- exclude log4j -->
           <exclusions>
               <exclusion>
                   <groupId>log4j</groupId>
                   <artifactId>*</artifactId>
               </exclusion>
               <exclusion>
                   <groupId>org.slf4j</groupId>
                   <artifactId>slf4j-log4j12</artifactId>
               </exclusion>
           </exclusions>
           <!--<scope>provided</scope>-->
       </dependency>
    
  • To change Flink's Logback logging configuration, edit these three files in Flink's conf directory:

      logback-console.xml
      logback-session.xml
      logback.xml
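For reference, a minimal configuration in the shape those files use — a sketch only; the appender name and pattern below are illustrative, not copied from the Flink distribution:

```xml
<configuration>
    <!-- Console appender; Flink's own files add rolling-file appenders as well -->
    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss} %-5level %logger{60} - %msg%n</pattern>
        </encoder>
    </appender>
    <root level="INFO">
        <appender-ref ref="console"/>
    </root>
</configuration>
```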
