Flink Execution Flow
Architecture Perspective
Cluster roles
Flink has three roles: Client, JobManager, and TaskManager.
- The Client submits the job to the JobManager
- The JobManager distributes the job to the TaskManagers
- The TaskManagers execute the job's tasks
Execution graphs
A Flink job passes through four graph layers: StreamGraph, JobGraph, ExecutionGraph, and the physical execution graph.
- When the Client program calls execute(), it generates the StreamGraph; the Client then converts the StreamGraph into a JobGraph and submits it to the JobManager
- The JobManager expands the JobGraph into an ExecutionGraph, converts the ExecutionGraph into the physical execution graph, and sends it to the TaskManagers
- The TaskManagers start the tasks and execute them
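The first two translations above can be sketched as a toy model: consecutive chainable operators are merged into JobGraph vertices, and each vertex is then expanded by its parallelism into ExecutionGraph subtasks. StreamNode, JobVertex, and ExecutionVertex below are simplified stand-ins for illustration, not the real Flink classes.

```java
import java.util.ArrayList;
import java.util.List;

public class GraphLayers {
    record StreamNode(String name, boolean chainable) {}
    record JobVertex(String name) {}                  // one or more chained StreamNodes
    record ExecutionVertex(String name, int subtask) {}

    // StreamGraph -> JobGraph: merge consecutive chainable operators into one vertex.
    static List<JobVertex> toJobGraph(List<StreamNode> streamGraph) {
        List<JobVertex> vertices = new ArrayList<>();
        StringBuilder chain = null;
        for (StreamNode node : streamGraph) {
            if (chain != null && node.chainable()) {
                chain.append("->").append(node.name());   // extend the current chain
            } else {
                if (chain != null) vertices.add(new JobVertex(chain.toString()));
                chain = new StringBuilder(node.name());   // start a new chain
            }
        }
        if (chain != null) vertices.add(new JobVertex(chain.toString()));
        return vertices;
    }

    // JobGraph -> ExecutionGraph: expand each vertex into one subtask per parallel instance.
    static List<ExecutionVertex> toExecutionGraph(List<JobVertex> jobGraph, int parallelism) {
        List<ExecutionVertex> tasks = new ArrayList<>();
        for (JobVertex v : jobGraph)
            for (int i = 0; i < parallelism; i++)
                tasks.add(new ExecutionVertex(v.name(), i));
        return tasks;
    }

    public static void main(String[] args) {
        // source -> map (chainable) -> sink (not chainable, e.g. after a keyBy boundary)
        List<StreamNode> streamGraph = List.of(
            new StreamNode("source", false),
            new StreamNode("map", true),
            new StreamNode("sink", false));
        List<JobVertex> jobGraph = toJobGraph(streamGraph);        // [source->map, sink]
        List<ExecutionVertex> executionGraph = toExecutionGraph(jobGraph, 2);
        System.out.println(jobGraph.size() + " vertices, " + executionGraph.size() + " tasks");
        // prints "2 vertices, 4 tasks"
    }
}
```

The physical execution graph is not a data structure of its own in Flink; it is the result of actually deploying these subtasks to TaskManager slots.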
Source Code Perspective
Local
When LocalExecutor executes a job, it builds a MiniCluster to run it. The job flows roughly as follows:
# Client submits the job
env.execute('<job name>')
--> StreamGraphGenerator.generate(env)
--> PipelineExecutorUtils.getJobGraph(streamGraph)
--> miniCluster.start()
--> startTaskManagers()
--> resourceManagerLeaderRetriever.start(resourceManagerGatewayRetriever)
--> dispatcherLeaderRetriever.start(dispatcherGatewayRetriever)
--> dispatcherGateway.submitJob(jobGraph)
# JobManager runs the job
--> MiniCluster.runJobBlocking(jobGraph)
--> MiniClusterDispatcher.runJobBlocking(jobGraph)
--> MiniClusterDispatcher.startJobRunners
--> JobManagerRunner.start
--> JobMaster.<init> (build ExecutionGraph)
--> JobMaster.start
--> JobMaster.startJobExecution (execution has not actually started here yet)
--> resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
--> ResourceManagerLeaderListener.notifyLeaderAddress
--> JobMaster.notifyOfNewResourceManagerLeader
--> ResourceManagerConnection.start
--> ResourceManagerConnection.onRegistrationSuccess (callback, sent and invoked by the Flink RPC framework)
--> JobMaster.onResourceManagerRegistrationSuccess
--> createDeploymentDescriptor
# TaskManager executes the job
--> TaskExecutor.submitTask
--> new Task()
--> task.startTaskThread()
--> Task.run()
--> StreamTask.invoke()
--> OneInputStreamTask.init()
--> StreamTask.processInput()
--> StreamOneInputProcessor.processInput()
--> StreamTaskNetworkInput.emitNext()
--> OneInputStreamTask.emitRecord()
- Generate the JobGraph:
- The user program populates a StreamExecutionEnvironment via env
- The StreamExecutionEnvironment constructs and invokes a StreamGraphGenerator, which produces the StreamGraph
- The pipeline executor converts the StreamGraph into a JobGraph
- Start the MiniCluster: the MiniCluster starts three roles in total: TaskManager, ResourceManager, and Dispatcher
- Submit the JobGraph: the Client submits the JobGraph through the Dispatcher
- Receive the JobGraph: the Dispatcher receives the submitted JobGraph
- Start the JobMaster: running the JobGraph first creates a JobManagerRunner and a JobMaster; creating the JobMaster builds the ExecutionGraph; the JobManagerRunner and JobMaster are then started
- Convert the ExecutionGraph into the physical execution graph: createDeploymentDescriptor() converts the ExecutionGraph into the physical execution graph and sends it to the TaskManager via RPC
- Execute the job: TaskExecutor.submitTask receives the physical execution graph, creates a Task and starts it; StreamTaskNetworkInput's processElement is then called in a loop to process records continuously.
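The final record-processing loop can be sketched as a toy model: the task repeatedly asks its input to emit the next record downstream, the way StreamOneInputProcessor and StreamTaskNetworkInput drive emitNext()/processElement() in the trace above. TaskInput and the inline map logic below are simplified stand-ins, not Flink classes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class RecordLoop {
    enum Status { MORE_AVAILABLE, END_OF_INPUT }

    static class TaskInput {
        private final Deque<Integer> buffered;
        TaskInput(List<Integer> records) { this.buffered = new ArrayDeque<>(records); }

        // Analogue of StreamTaskNetworkInput.emitNext(): push one record downstream.
        Status emitNext(List<Integer> output) {
            Integer record = buffered.poll();
            if (record == null) return Status.END_OF_INPUT;
            output.add(record * 2);               // "operator" logic: a map(x -> 2x)
            return Status.MORE_AVAILABLE;
        }
    }

    // Analogue of the task loop calling processInput() until the input is drained.
    static List<Integer> runTask(TaskInput input) {
        List<Integer> output = new ArrayList<>();
        while (input.emitNext(output) == Status.MORE_AVAILABLE) { /* keep draining */ }
        return output;
    }

    public static void main(String[] args) {
        System.out.println(runTask(new TaskInput(List.of(1, 2, 3)))); // prints "[2, 4, 6]"
    }
}
```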
References:
The execution process on the TaskManager side: https://developer.aliyun.com/article/225618
How the TaskManager executes user logic: https://www.cnblogs.com/ljygz/p/11504220.html
Standalone
A standalone cluster starts the Client, JobManager, and TaskManager through the flink, jobmanager.sh, and taskmanager.sh scripts respectively.
Client
Looking at the flink script, the client entry class is org.apache.flink.client.cli.CliFrontend.
The main classes involved are
- CliFrontend in the cli package of flink-clients
- ClusterClient in the program package of flink-clients
- StreamGraphGenerator in the graph package of flink-streaming-java
- StreamContextEnvironment in the environment package of flink-streaming-java
- JobGraphGenerator in the plantranslate package of flink-optimizer
Execution flow
- CliFrontend.run(), CliFrontend.runProgram(), and CliFrontend.executeProgram() run in turn, ending in a call to ClusterClient.run()
- ClusterClient.run() calls PackagedProgram.invokeInteractiveModeForExecution(), which invokes the main() of the user-written program via reflection
- StreamExecutionEnvironment.execute() calls StreamGraphGenerator.generate() to build the StreamGraph
- StreamExecutionEnvironment.execute() actually dispatches to StreamContextEnvironment.execute(), which calls ClusterClient.run through the previously assigned ContextEnvironment instance ctx
- ClusterClient.run() calls JobGraphGenerator.compileJobGraph() to generate the JobGraph
- ClusterClient.run() calls RestClusterClient.submitJob(), which submits the jobGraph to the JobManager as a REST request; at this point the client-side submission is complete.
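The reflective invocation in the second step is plain Java reflection. A minimal sketch of locating and calling an entry class's static main(String[]), assuming a hypothetical UserJob class standing in for a packaged user program (not a Flink class):

```java
import java.lang.reflect.Method;

// Hypothetical stand-in for a user job's entry class.
class UserJob {
    static String lastArg;
    public static void main(String[] args) {
        lastArg = args.length > 0 ? args[0] : "none";
    }
}

public class ReflectiveInvoke {
    // Mimics how invokeInteractiveModeForExecution() reaches the user's main(String[]).
    static void invokeMain(Class<?> entryClass, String[] args) throws Exception {
        Method main = entryClass.getMethod("main", String[].class);
        main.invoke(null, (Object) args); // static method, so the receiver is null
    }

    public static void main(String[] args) throws Exception {
        invokeMain(UserJob.class, new String[] {"wordcount"});
        System.out.println(UserJob.lastArg); // prints "wordcount"
    }
}
```

Because the user's main() runs inside the client process, the env.execute() it contains executes on the client too, which is why StreamGraph and JobGraph generation happen client-side.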
JobManager
Looking at jobmanager.sh, it actually executes flink-daemon.sh.
The JobManager entry class is org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint.
The main classes involved are
- StandaloneSessionClusterEntrypoint in the entrypoint package of flink-runtime
- ClusterEntrypoint in the entrypoint package of flink-runtime
The JobManager source lives in the flink-runtime package; the entrypoint, dispatcher, and resourcemanager packages are the main ones to read.
Execution flow
- ClusterEntrypoint.runClusterEntrypoint(entrypoint) calls clusterEntrypoint.startCluster() to start the cluster
- runCluster(configuration) calls initializeServices(configuration) and dispatcherResourceManagerComponentFactory.create() for initialization
- AbstractDispatcherResourceManagerComponentFactory.create() calls resourceManagerFactory.createResourceManager() to create the ResourceManager
- StandaloneResourceManagerFactory.createResourceManager() calls ResourceManagerRuntimeServices.fromConfiguration() and new StandaloneResourceManager() for initialization
- AbstractDispatcherResourceManagerComponentFactory.create() calls resourceManager.start() and dispatcher.start() to start the ResourceManager and the Dispatcher; dispatcher.start() brings up the given rpcService, which then waits to receive JobGraphs submitted from the client side
- LegacyScheduler.createAndRestoreExecutionGraph() calls ExecutionGraph newExecutionGraph = createExecutionGraph() to build the ExecutionGraph
- Execution.deploy() uses TaskDeploymentDescriptorFactory to build the TaskDeploymentDescriptor (tdd), the object used to schedule the task on the TaskManager side
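The last step, producing one deployment descriptor per parallel subtask and shipping it to a slot, can be sketched as a toy model. Slot, DeploymentDescriptor, and the round-robin slot choice below are simplified assumptions for illustration, not Flink's actual slot allocation or TaskDeploymentDescriptor.

```java
import java.util.ArrayList;
import java.util.List;

public class DeploySketch {
    record Slot(String taskManagerId) {}
    record DeploymentDescriptor(String vertexName, int subtaskIndex, String taskManagerId) {}

    // One descriptor per subtask; subtasks are spread round-robin over the slots here,
    // a deliberate simplification of real slot scheduling.
    static List<DeploymentDescriptor> deploy(String vertexName, int parallelism, List<Slot> slots) {
        List<DeploymentDescriptor> descriptors = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            Slot slot = slots.get(i % slots.size());
            descriptors.add(new DeploymentDescriptor(vertexName, i, slot.taskManagerId()));
        }
        return descriptors;
    }

    public static void main(String[] args) {
        List<Slot> slots = List.of(new Slot("tm-1"), new Slot("tm-2"));
        for (DeploymentDescriptor d : deploy("map", 3, slots))
            System.out.println(d); // subtasks 0..2 spread over tm-1, tm-2, tm-1
    }
}
```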
TaskManager
Looking at the taskmanager.sh script, it actually executes flink-daemon.sh.
The TaskManager entry class is org.apache.flink.runtime.taskexecutor.TaskManagerRunner.
Execution flow
- TaskManagerRunner's runTaskManager() calls taskManagerRunner.start() to start the TaskManager
- The TaskManager logic lives in TaskExecutor, whose submitTask method receives the tdd
- Task task = new Task() creates the Task, whose task thread is then started to run it
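The create-then-start pattern can be sketched as a toy model: each task is a Runnable run on a dedicated thread, as Task.startTaskThread() does in the Local trace above. The Task class below is a simplified stand-in, not Flink's Task.

```java
public class TaskThreadSketch {
    static class Task implements Runnable {
        private final String name;
        volatile boolean finished = false;    // volatile: read from the submitting thread

        Task(String name) { this.name = name; }

        @Override
        public void run() {                   // analogue of Task.run() driving invoke()
            System.out.println("running " + name);
            finished = true;
        }

        Thread startTaskThread() {            // analogue of Task.startTaskThread()
            Thread t = new Thread(this, "Task: " + name);
            t.start();
            return t;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Task task = new Task("Source: collection -> Map");
        task.startTaskThread().join();        // wait for the task thread to finish
        System.out.println(task.finished);    // prints "true"
    }
}
```

Running each task on its own thread is what lets one TaskManager execute many subtasks concurrently while submitTask returns immediately.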
Summary
- The Client turns the user code into a StreamGraph, converts it into a JobGraph, and submits it to the JobManager.
- After startup, the JobManager exposes the RPC method submitJob(jobGraph) in Dispatcher.java; it receives the JobGraph produced by the Client, converts it into an ExecutionGraph, and sends the resulting deployment to the TaskManagers
- TaskManagerRunner receives the tdd via submitTask and executes it
References:
https://blog.csdn.net/weixin_43161811/article/details/103152867
https://blog.csdn.net/qq475781638/article/details/90707260
https://www.cnblogs.com/ljygz/tag/flink/