Sisyphus / flink-batch-stream

Sisyphus committed on 2020-12-31 17:34 . flink

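Flink Execution Flow

Architecture level

By cluster role

Flink has three roles: Client, JobManager, and TaskManager.

  1. The Client submits the job to the JobManager.
  2. The JobManager distributes the job to the TaskManagers.
  3. The TaskManagers execute the job.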

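By execution graph

A Flink job passes through four graph layers: StreamGraph, JobGraph, ExecutionGraph, and the physical execution graph.

  1. When the Client program calls execute(), a StreamGraph is generated; the Client converts the StreamGraph into a JobGraph and submits it to the JobManager.
  2. The JobManager splits the JobGraph into an ExecutionGraph (one node per parallel subtask), converts the ExecutionGraph into the physical execution graph, and sends it to the TaskManagers.
  3. The TaskManagers start tasks to execute it.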
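
The graph layers above can be illustrated with a small toy model. This is only a sketch under simplifying assumptions: the classes `GraphLayersSketch` and `Op` are hypothetical and are not Flink's real graph classes, the pipeline is assumed to be linear, and the chaining rule is reduced to "forward edge and equal parallelism" (Flink's actual chaining conditions are stricter).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model only: hypothetical classes, not Flink's StreamGraph/JobGraph/ExecutionGraph.
class GraphLayersSketch {

    // One operator of the toy StreamGraph, listed in pipeline order.
    static final class Op {
        final String name;
        final int parallelism;
        final boolean forwardToNext; // true if the edge to the next operator needs no repartitioning

        Op(String name, int parallelism, boolean forwardToNext) {
            this.name = name;
            this.parallelism = parallelism;
            this.forwardToNext = forwardToNext;
        }
    }

    // StreamGraph -> JobGraph: merge neighbouring operators into one job vertex when the
    // connecting edge is a forward edge and both sides have the same parallelism.
    static List<List<Op>> toJobGraph(List<Op> streamGraph) {
        List<List<Op>> jobVertices = new ArrayList<>();
        List<Op> chain = new ArrayList<>();
        for (int i = 0; i < streamGraph.size(); i++) {
            Op op = streamGraph.get(i);
            chain.add(op);
            boolean last = i == streamGraph.size() - 1;
            boolean chainable = !last
                    && op.forwardToNext
                    && op.parallelism == streamGraph.get(i + 1).parallelism;
            if (!chainable) {
                jobVertices.add(chain);
                chain = new ArrayList<>();
            }
        }
        return jobVertices;
    }

    // JobGraph -> ExecutionGraph: each job vertex is expanded into `parallelism` subtasks.
    static int countSubtasks(List<List<Op>> jobGraph) {
        int total = 0;
        for (List<Op> vertex : jobGraph) {
            total += vertex.get(0).parallelism;
        }
        return total;
    }

    static List<Op> exampleStreamGraph() {
        return Arrays.asList(
                new Op("source", 2, true),
                new Op("map", 2, false),    // a repartitioning edge follows, so the chain ends here
                new Op("window", 4, true),
                new Op("sink", 4, false));
    }

    public static void main(String[] args) {
        List<List<Op>> jobGraph = toJobGraph(exampleStreamGraph());
        System.out.println("job vertices: " + jobGraph.size());
        System.out.println("subtasks: " + countSubtasks(jobGraph));
    }
}
```

In this toy example four operators collapse into two job vertices (source+map, window+sink), which then expand into 2 + 4 = 6 subtasks.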

Source code level

Local

When LocalExecutor runs a job, it builds a MiniCluster to execute it. The job flows roughly as follows:

# Client submits the job
env.execute("<job name>")
  --> StreamGraphGenerator.generate(env)
  --> PipelineExecutorUtils.getJobGraph(streamGraph)
  --> miniCluster.start()
    --> startTaskManagers()
    --> resourceManagerLeaderRetriever.start(resourceManagerGatewayRetriever)
    --> dispatcherLeaderRetriever.start(dispatcherGatewayRetriever)
  --> dispatcherGateway.submitJob(jobGraph)
# JobManager runs the job
  --> MiniCluster.runJobBlocking(jobGraph)
  --> MiniClusterDispatcher.runJobBlocking(jobGraph)
  --> MiniClusterDispatcher.startJobRunners
    --> JobManagerRunner.start
    --> JobMaster.<init> (build ExecutionGraph)
  --> JobMaster.start
    --> JobMaster.startJobExecution (execution has not actually started yet)
    --> resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
  --> ResourceManagerLeaderListener.notifyLeaderAddress
    --> JobMaster.notifyOfNewResourceManagerLeader
    --> ResourceManagerConnection.start
    --> ResourceManagerConnection.onRegistrationSuccess (callback, invoked by the Flink RPC framework)
    --> JobMaster.onResourceManagerRegistrationSuccess
  --> createDeploymentDescriptor
# TaskManager executes the job
  --> TaskExecutor.submitTask
    --> new Task()
    --> task.startTaskThread()
    --> Task.run()
    --> StreamTask.invoke()
    --> OneInputStreamTask.init()
    --> StreamTask.processInput()
    --> StreamOneInputProcessor.processInput()
    --> StreamTaskNetworkInput.emitNext()
    --> OneInputStreamTask.emitRecord()

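  1. Generate the JobGraph:
    • The user program configures the StreamExecutionEnvironment through env.
    • StreamExecutionEnvironment configures and invokes StreamGraphGenerator to generate the StreamGraph.
    • The StreamGraph, passed along as a Pipeline, is converted into a JobGraph.
  2. Start the MiniCluster: the MiniCluster starts three roles: TaskManager, ResourceManager, and Dispatcher.
  3. Submit the JobGraph: the Client submits the JobGraph through the Dispatcher.
  4. Receive the JobGraph: on the JobManager side, the JobGraph is received through the Dispatcher.
  5. Start the JobMaster: running the JobGraph first creates a JobManagerRunner and a JobMaster; the ExecutionGraph is built while the JobMaster is constructed; the JobManagerRunner and JobMaster are then started.
  6. Convert the ExecutionGraph into the physical execution graph: createDeploymentDescriptor() converts the ExecutionGraph into the physical execution graph, which is sent to the TaskManager over RPC.
  7. Execute the job: TaskExecutor.submitTask receives the physical execution graph, creates a Task, and starts it; the task then calls StreamTaskNetworkInput's processElement in a loop to keep processing records.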
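
The record-processing loop at the end of the trace can be sketched as a toy model. The classes below (`TaskLoopSketch`, `Operator`, `Input`) are hypothetical stand-ins whose method names merely echo the trace; they are not Flink's real classes, and the input here is a finite list rather than an unbounded network stream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Toy model only: hypothetical classes that mimic the call chain in the trace.
class TaskLoopSketch {

    // Stands in for an operator that wraps the user function.
    static final class Operator {
        final Function<String, String> userFunction;
        final List<String> output = new ArrayList<>();

        Operator(Function<String, String> userFunction) {
            this.userFunction = userFunction;
        }

        void processElement(String record) {
            output.add(userFunction.apply(record));
        }
    }

    // Stands in for the task input: hands one record at a time to the operator.
    static final class Input {
        final Iterator<String> records;

        Input(List<String> records) {
            this.records = records.iterator();
        }

        // Returns false once the input is exhausted.
        boolean emitNext(Operator operator) {
            if (!records.hasNext()) {
                return false;
            }
            operator.processElement(records.next());
            return true;
        }
    }

    // Stands in for the task's main loop: keep pulling input until nothing is left.
    static List<String> invoke(List<String> records, Function<String, String> userFunction) {
        Operator operator = new Operator(userFunction);
        Input input = new Input(records);
        while (input.emitNext(operator)) {
            // each iteration handles exactly one record
        }
        return operator.output;
    }

    public static void main(String[] args) {
        System.out.println(invoke(Arrays.asList("a", "b", "c"), String::toUpperCase));
    }
}
```

The point of the sketch is the shape of the loop: the task does not push data itself, it repeatedly asks its input for the next record and forwards it to the operator that holds the user logic.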

References:

Execution process on the TaskManager side: https://developer.aliyun.com/article/225618

How the TaskManager executes user logic: https://www.cnblogs.com/ljygz/p/11504220.html

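Standalone

A standalone deployment starts a Client, a JobManager, and TaskManagers; the launch scripts are flink, jobmanager.sh, and taskmanager.sh respectively.

Clients

According to the flink script, the client entry class is org.apache.flink.client.cli.CliFrontend.

The main classes involved are:

  • CliFrontend in the cli package of flink-clients
  • ClusterClient in the program package of flink-clients
  • StreamGraphGenerator in the graph package of flink-streaming-java
  • StreamContextEnvironment in the environment package of flink-streaming-java
  • JobGraphGenerator in the plantranslate package of flink-optimizer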

Execution flow

  1. CliFrontend.run(), CliFrontend.runProgram(), and CliFrontend.executeProgram() run in turn and call ClusterClient.run().
  2. ClusterClient.run() calls PackagedProgram.invokeInteractiveModeForExecution(), which uses reflection to invoke the main method of the user's program.
  3. StreamExecutionEnvironment.execute() calls StreamGraphGenerator.generate() to generate the StreamGraph.
  4. The call to StreamExecutionEnvironment.execute() is actually dispatched to StreamContextEnvironment.execute(), which calls ClusterClient.run and assigns ctx, the ContextEnvironment instance.
  5. ClusterClient.run() calls JobGraphGenerator.compileJobGraph() to generate the JobGraph.
  6. ClusterClient.run() calls RestClusterClient.submitJob(), which submits the JobGraph to the JobManager as a REST request. At this point the client-side submission is complete.
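
Step 2 relies on reflection to enter the user's program. The following is a minimal sketch of that technique using only the standard `java.lang.reflect` API; it is not Flink's PackagedProgram code, and `ReflectiveMainSketch`, `UserJob`, and `callMain` are hypothetical names introduced for illustration.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

// Sketch only: reflective invocation of a main method, not Flink's actual implementation.
class ReflectiveMainSketch {

    // Hypothetical stand-in for the entry class of a user's job jar.
    public static class UserJob {
        static String lastJobName = null;

        public static void main(String[] args) {
            // A real job would build its pipeline and call env.execute(...) here.
            lastJobName = args.length > 0 ? args[0] : "unnamed";
        }
    }

    // Look up `public static void main(String[])` on the entry class and call it.
    static void callMain(Class<?> entryClass, String[] args) {
        try {
            Method main = entryClass.getMethod("main", String[].class);
            if (!Modifier.isStatic(main.getModifiers())) {
                throw new IllegalArgumentException("main must be static");
            }
            // Cast to Object so the array is passed as a single argument, not expanded as varargs.
            main.invoke(null, (Object) args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("could not invoke main of " + entryClass.getName(), e);
        }
    }

    public static void main(String[] args) {
        callMain(UserJob.class, new String[] {"word-count"});
        System.out.println(UserJob.lastJobName);
    }
}
```

Because the user's main method runs inside the client process, any execute() call it makes lands in whatever execution environment the client has prepared, which is how the client gets hold of the job graph instead of running the job locally.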

JobManager

jobmanager.sh actually runs flink-daemon.sh.

The JobManager entry class is org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint.

The main classes involved are:

  • StandaloneSessionClusterEntrypoint in the entrypoint package of flink-runtime
  • ClusterEntrypoint in the entrypoint package of flink-runtime

The JobManager source code lives in flink-runtime; the packages to focus on are entrypoint, dispatcher, and resourcemanager.

Execution flow

  1. ClusterEntrypoint.runClusterEntrypoint(entrypoint) calls clusterEntrypoint.startCluster() to start the cluster.
  2. runCluster(configuration) calls initializeServices(configuration) and dispatcherResourceManagerComponentFactory.create() for initialization.
  3. AbstractDispatcherResourceManagerComponentFactory.create() calls resourceManagerFactory.createResourceManager() to create the ResourceManager.
  4. StandaloneResourceManagerFactory.createResourceManager() calls ResourceManagerRuntimeServices.fromConfiguration() and new StandaloneResourceManager() for initialization.
  5. AbstractDispatcherResourceManagerComponentFactory.create() calls resourceManager.start() and dispatcher.start() to start the ResourceManager and the Dispatcher. dispatcher.start() brings up the rpcService that was passed in, which then waits for a JobGraph submitted by the Client.
  6. LegacyScheduler.createAndRestoreExecutionGraph() calls ExecutionGraph newExecutionGraph = createExecutionGraph() to build the ExecutionGraph.
  7. Execution.deploy() calls final TaskDeploymentDescriptor deployment = TaskDeploymentDescriptorFactory to build the TaskDeploymentDescriptor (TDD), the object used to schedule tasks on the TaskManager side.
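
The last step, producing one deployment descriptor per parallel subtask, can be sketched as follows. This is a toy model under stated assumptions: `DeploymentSketch` and `Descriptor` are hypothetical classes that carry only a vertex name and a subtask index, whereas Flink's real TaskDeploymentDescriptor holds far more (serialized job and task information, input and output descriptions, slot allocation, and so on).

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: hypothetical classes, not Flink's ExecutionGraph or TaskDeploymentDescriptor.
class DeploymentSketch {

    // Minimal stand-in for the data a TaskManager needs to start one subtask.
    static final class Descriptor {
        final String vertexName;
        final int subtaskIndex;
        final int parallelism;

        Descriptor(String vertexName, int subtaskIndex, int parallelism) {
            this.vertexName = vertexName;
            this.subtaskIndex = subtaskIndex;
            this.parallelism = parallelism;
        }

        @Override
        public String toString() {
            return vertexName + " (" + (subtaskIndex + 1) + "/" + parallelism + ")";
        }
    }

    // Expand one job vertex into one descriptor per parallel subtask.
    static List<Descriptor> deploy(String vertexName, int parallelism) {
        List<Descriptor> descriptors = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            descriptors.add(new Descriptor(vertexName, i, parallelism));
        }
        return descriptors;
    }

    public static void main(String[] args) {
        for (Descriptor descriptor : deploy("map", 3)) {
            System.out.println(descriptor);
        }
    }
}
```

Each descriptor is self-describing, so a TaskManager that receives it needs no further lookup in the JobManager's graph to know which subtask it is supposed to run.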

TaskManager

taskmanager.sh actually runs flink-daemon.sh.

The TaskManager entry class is org.apache.flink.runtime.taskexecutor.TaskManagerRunner.

Execution flow

  1. TaskManagerRunner's runTaskManager() calls taskManagerRunner.start() to start the TaskManager.
  2. The TaskManager is implemented by TaskExecutor, whose submitTask method receives the TDD.
  3. Task task = new Task() creates the Task.
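
How a submitted task ends up running on its own thread can be sketched with plain Java threads. The classes below are hypothetical stand-ins (`TaskThreadSketch`, a simplified `Task`, and a `submitTask` that takes a name and a Runnable instead of a TDD); the sketch only illustrates the create, start, run sequence and is not Flink's implementation.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Toy model only: hypothetical classes, not Flink's TaskExecutor or Task.
class TaskThreadSketch {

    // A task owns the thread that will execute it.
    static final class Task implements Runnable {
        final Runnable invokable;            // stands in for the user logic to invoke
        final Thread executingThread;
        final AtomicBoolean finished = new AtomicBoolean(false);

        Task(String name, Runnable invokable) {
            this.invokable = invokable;
            this.executingThread = new Thread(this, name); // created here, started later
        }

        void startTaskThread() {
            executingThread.start();
        }

        @Override
        public void run() {
            invokable.run();
            finished.set(true);
        }
    }

    // Stands in for the submit call: build the task, then start its thread and return at once.
    static Task submitTask(String name, Runnable invokable) {
        Task task = new Task(name, invokable);
        task.startTaskThread();
        return task;
    }

    // Helper for the demo: submit a no-op task and wait for its thread to end.
    static boolean runAndWait(String name) {
        Task task = submitTask(name, () -> { });
        try {
            task.executingThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return task.finished.get();
    }

    public static void main(String[] args) {
        System.out.println("finished: " + runAndWait("demo-task (1/1)"));
    }
}
```

Running each task on a dedicated thread keeps the submit call non-blocking: the caller gets control back immediately while the task body executes in the background.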

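Summary

  • The Client converts the user code into a StreamGraph, converts that into a JobGraph, and submits it to the JobManager.
  • After the JobManager starts, the Dispatcher (Dispatcher.java) exposes the RPC method submitJob(jobGraph) to receive the JobGraph produced by the Client; the JobGraph is then converted into an ExecutionGraph and sent to the TaskManagers.
  • The TaskManager started by TaskManagerRunner receives the TDD through submitTask and executes the task.

References: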

https://blog.csdn.net/weixin_43161811/article/details/103152867

https://blog.csdn.net/qq475781638/article/details/90707260

https://www.cnblogs.com/ljygz/tag/flink/
