# Flink
**Repository Path**: naclnezn/flink
## Basic Information
- **Project Name**: Flink
- **Description**: Flink
- **Primary Language**: Unknown
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 0
- **Created**: 2024-08-12
- **Last Updated**: 2024-08-12
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# Flink 1.17
## 一、 Flink 特点
- 1.批流统
- 同一套代码,可以跑流也可以跑批
- 同一个SQL,可以跑流也可以跑批
- 2.性能卓越
- 高吞吐
- 低时延
- 3.规模计算
- 支持水平扩展架构
- 支持超大状态与增量检查点机制
- 4.生态兼容
- 支持与Yarn集成
- 支持与Kubernetes集成
- 支持单机模式运行
- 5.高容错
- 故障自动重试
- 一致性检查点
- 保证故障场景下精确一次的状态一致性
## 1.1 Flink
Apache Flink是一个框架和分布式处理引擎,
用于对无界和有界数据流进行有状态计算。

- **无界数据流**:
- 有定义流的开始,但没有定义流的结束:
- 它们会无休止的产生数据:
- 无界流的数据必须持续处理,即数据被摄取后需要立刻处理。
- 我们不能等到所有数据都到达再处理,因为输入是无限的。
- **有界数据流**:
- 有定义流的开始,也有定义流的结束:
- 有界流可以在摄取所有数据后再进行计算:
- 有界流所有数据可以被排序,所以并不需要有序摄取:
- 有界流处理通常被称为批处理。
## 1.2 Flink vs SparkStreaming
### 1.2.1 Spark
Spark以批处理为根本
Spark数据模型:Spark采用RDD模型,Spark Streaming的DStream实际上也就是 一组组小批数据RDD的集合
Spark运行时架构: Spark是批计算,将DAG划分为不同的stage,
一个完成后才可以计算下一个

### 1.2.2 Flink
Flink以流处理为根本。
Flink据模型:Flink基本数据摸型是数据流,以及事件(Event)序列
Flink运行时架构:Fik是标佳的流执行模式,一个事件在个节点处理完后可以直接发往下一个节点进行处理

| | Flink | Spark |
|-------|-----------|--------------------|
| 计算模型 | 流计算 | 微批处理 |
| 时间语义 | 事件时间、处理时间 | 处理时间 |
| 窗口 | 多、灵活 | 少、不灵活(窗口必须是批次的整数倍) |
| 状态 | 有 | 没有 |
| 流式SQL | 有 | 没有 |
## 二、 部署
### 2.1 集群角色
- 客户端(Client):代码由客户端获取并做转换
,之后提交给JobManger
- JobManager:就是Flink集群里的“管事人”,对作业进行中央调度管理;而它获取到要执行的作业
后,会进一步处理转换,然后分发任务给众多的TaskManager。
- TaskManager,就是真正“干活的人”,数据的处理操作都是它们来做的。
### 2.2 集群规划
| 服务器 | master | slave1 | slave1 |
|-----|------------------------------------------------------|--------------|--------------|
| ip: | 192.168.3.46 | 192.168.3.47 | 192.168.3.48 |
| 角色: | JobManagere
TaskManager | TaskManager | TaskManager |
### 2.3 安装
下载地址: https://mirrors.aliyun.com/apache/flink/
解压:
```shell
tar -zxvf flink-1.17.2-bin-scala_2.12.tgz
```
### 2.4 修改配置
进入conf路径,修改fink-conf.yaml文件
```shell
# JobManager节点地址(主节点)
jobmanager.rpc.address: master
jobmanager.bind-host: 0.0.0.0
rest.address: master
rest.bind-address: 0.0.0.0
# TaskManager节点地址.需要配置为当前机器名
taskmanager.bind-host: 0.0.0.0
taskmanager.host: master
```
修改workers文件,指定 master、slave1 和 slave2 为TaskManager
```shell
master
slave1
slave2
```
修改masters文件
```shell
master:8081
```
## 2.5 启动 / 结束
在 master 上 输入 启动命令
```shell
bin/start-cluster.sh
```
输入jps
如果master 出现 StandaloneSessionClusterEntrypoint + TaskManagerRunner
slave1 slave2 出现 TaskManagerRunner
说明启动成功
结束命令
```shell
bin/stop-cluster.sh
```
## 2.6 其他
在flink-conf.yaml文件中还可以对集群中的JobManager和TaskManager组件进行优化配置,主要配置项如下:
- jobmanager.memory.process.size:
- 对JobManager进程可使用到的全部内存进行配置,包括JVM元空间和其他开销,默认为1600M,可以根据集群规模进行适当调整。
- taskmanager.memory.process.size:
- 对TaskManager进程可使用到的全部内存进行配置,包括JVM元空间和其他开销,默认为1728M,可以根据集群规模进行适当调整。
- taskmanager.numberOfTaskSlots:
- 对每个TaskManager能够分配的Slot数量进行配置,默认为1,可根据TaskManager所在的机器能够提供给Flink的CPU数量决定。所谓Slot就是TaskManager中具体运行一个任务所分配的计算资源。
- parallelism.default:Flink任务执行的并行度,默认为1。
- 优先级低于代码中进行的并行度配置和任务提交时使用参数指定的并行度数量。
## 2.7 访问Web UI
http://master:8081/

# 3 提交作业
## 3.1 WebUI提交
将写好的本地运行的wordCount[SocketStreamWordCount.java](src%2Fmain%2Fjava%2Fcom%2Fnacl%2FwordCount%2FSocketStreamWordCount.java)
提交到Flink
官方推荐使用shade 打包 pom.xml已替换
1. 先在master启动netcat
```shell
# 7777 是端口
nc -lk 7777
```
2. 打包项目
3. 上传jar包

4. 配置运行信息

5. 查看作业

6. 看结果


## 3.2 命令行提交
将jar上传到服务器后
```shell
# -m指定了提交到的JobManager -c指定了入口类
bin/flink run -m hadoop102:8081 -c com.nacl.wordCount.SocketStreamWordCount ./Flink1.17-1.0.0.jar
```
之后和3.1一样
## 3.3 部署模式
在一些应用场景中,对于集群资源分配和占用的方式,可能会有特定的需求。Flink为各种场景提供了不同的部署模式,主要有以下三种:
- 会话模式(Session Mode)
- 单作业模式(Per-Job Mode)
- 应用模式(Application Mode)
它们的区别主要在于:集群的生命周期以及资源的分配方式;以及应用的main方法到底在哪里执行——客户端(Client)还是JobManager。
### 3.3.1 会话模式(Session Mode)
会话模式其实最符合常规思维。我们需要先启动一个集群,保持一个会话
,在这个会话中通过客
户端提交作业。集群启动时所有资源就都已经确定,所以所有提交的作业会竞争集群中的资源。

会话模式比较适合于单个规模小、执行时间短的大量作业。
### 3.3.2 单作业模式(Per-Job Mode)
会话模式因为资源共享会导致很多问题,所以为了更好地隔离资源,我们可以考虑为每个提交的
作业启动一个集群(现提交作业现启动集群),这就是所谓的单作业(Per-Job)模式。

作业完成后,集群就会关闭,所有资源也会释放。
这些特性使得单作业模式在生产环境运行更加稳定,所以是实际应用的首选模式。
需要注意的是,Flink本身无法直接这样运行,所以单作业模式一般需要借助一些资源管理框架来启动集群
,比如YARN、Kubernetes(K8S)
### 3.3.3 应用模式(Application Mode)
前面提到的两种模式下,应用代码都是在客户端上执行,然后由客户端提交给JobManager的。但是这种方式
客户端需要占用大量网络带宽,去下载依赖和把二进制数据发送给JobManager;加上很多情况下我们提交作业用的是
同一个客户端,就会加重客户端所在节点的资源消耗。
所以解决办法就是,我们不要客户端了,直接把应用提交到JobManger上运行。而这也就代表着,我们需要为
每一个提交的应用单独启动一个JobManager,也就是创建一个集群。这个JobManager只为执行这一个应用而存在,
执行结束之后JobManager也就关闭了,这就是所谓的应用模式。

应用模式与单作业模式,都是提交作业之后才创建集群:单作业模式是通过客户端来提交的,客户端解析出的
每一个作业对应一个集群;而应用模式下,是直接由JobManager执行应用程序的。
## 3.4 运行模式
### 3.4.1 Standalone运行模式
独立模式是独立运行的,不依赖任何外部的资源管理平台;当然独立也是有代价的:如果
资源不足,或者出现故障,没有自动扩展或重分配资源的保证,必须手动处理。所以独立
模式一般只用在开发测试或作业非常少的场景下。
### 3.4.2 YARN运行模式
YARN上部署的过程是:客户端把Flink应用提交给Yarm的ResourceManager,
Yarm的 ResourceManager会向Yarn的NodeManager申请容器。
在这些容器上,Flink会部署 JobManager和TaskManager的实例,从而启动集群。
Flink会根据运行在JobManger上的作业 所需要的Slot数量动态分配TaskManager资源。
#### 3.4.2.1 配置
/etc/profile.d/
增加环境变量配置如下(在 master 上即可)
```shell
HADOOP_HOME=/opt/module/hadoop-3.3.4
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export HADOOP_CLASSPATH=`hadoop classpath`
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
```
```shell
source /etc/profile
```
#### 3.4.2.2 启动
```shell
cd /nacl/flink/flink-1.17.2
```
分离模式启动
```shell
bin/yarn-session.sh -nm test -d
```

单作业模式
将打包好的jar包放到lib下
```shell
bin/flink run -d -t yarn-per-job -c com.nacl.wordCount.SocketStreamWordCount lib/Flink1.17-1.0.0.jar
```
应用模式部署
```shell
bin/flink run-application -t yarn-application -c com.nacl.wordCount.SocketStreamWordCount lib/Flink1.17-1.0.0.jar
```
HDFS 上传jar运行
1)上传flink的lib和plugins到HDFS上(每次上面的方式单独运行一次都会去加载这些,不如直接放进去)
```shell
hadoop fs -mkdir /flink-dist
hadoop fs -put lib/ /flink-dist
hadoop fs -put plugins/ /flink-dist
```
2)上传自己的jar包到HDFS
```shell
hadoop fs -mkdir /flink-jars
hadoop fs -put lib/Flink1.17-1.0.0.jar /flink-jars
```
3)提交作业
master:9000 这个9000 得看hadoop的 core-site.xml 中的 fs.defaultFS 是怎么设置的
```shell
bin/flink run-application -t yarn-application -Dyarn.provided.lib.dirs="hdfs://master:9000/flink-dist" -c com.nacl.wordCount.SocketStreamWordCount hdfs://master:9000/flink-jars/Flink1.17-1.0.0.jar
```
#### 3.5 历史服务器
运行 Flink job 的集群一旦停止,只能去 yarn 或本地磁盘上查看日志,
不再可以查看作业挂掉之前的运行的 Web UI,很难清楚知道作业在挂的那一刻到底发生了什么。
如果我们还没有 Metrics 监控的话,那么完全就只能通过日志去分析和定位问题了,
所以如果能还原之前的 Web UI,我们可以通过 UI 发现和定位一些问题。
Flink提供了历史服务器,用来在相应的 Flink 集群关闭后查询已完成作业的统计信息。此外,它对外提供了 REST API,
它接受 HTTP 请求并使用 JSON 数据进行响应。Flink 任务停止后,
JobManager 会将已经完成任务的统计信息进行存档,
History Server 进程则在任务停止后可以对任务统计信息进行查询。
比如:最后一次的 Checkpoint、任务运行时的相关配置
1. 创建存储目录
```shell
hadoop fs -mkdir -p /logs/flink-job
```
2. 在 flink-config.yaml中添加如下配置
```yaml
jobmanager.archive.fs.dir: hdfs://master:9000/logs/flink-job
historyserver.web.address: master
historyserver.web.port: 8082
historyserver.archive.fs.dir: hdfs://master:9000/logs/flink-job
historyserver.archive.fs.refresh-interval: 5000
```
3. 启动 / 暂停
```shell
bin/historyserver.sh start
bin/historyserver.sh stop
```
# 4 Flink 运行时架构

## 4.1 作业管理器(JobManager)
### 4.1.1 JobMaster
JobMaster是JobManager中最核心的组件,负责处理单独的作业(Job)。所以JobMaster和具体的Job是一一对应的,多个Job可以同时运行在一个Flink集群中,
每个Job都有一个自己的JobMaster。
### 4.1.2 资源管理器(ResourceManager)
ResourceManager主要负责资源的分配和管理,在Flink 集群中只有一个。所谓“资源”,主要是指TaskManager的任务槽(task
slots)。任务槽就是Flink集群中的资源调配单元,包含了机器用来执行计算的一组CPU和内存资源。每一个任务(Task)都需要分配到一个slot上执行。
### 4.1.3 分发器(Dispatcher)
Dispatcher主要负责提供一个REST接口,用来提交应用,并且负责为每一个新提交的作业启动一个新的JobMaster
组件。Dispatcher也会启动一个Web
UI,用来方便地展示和监控作业执行的信息。Dispatcher在架构中并不是必需的,在不同的部署模式下可能会被忽略掉
## 4.2 核心概念
### 4.2.1 并行度(Parallelism)
当要处理的数据量非常大时,我们可以把一个算子操作,“复制”多份到多个节点,数据来了之后就可以到其中任意一个执行。这样一来,一个算子任务就被拆分成了多个并行的“子任务”(subtasks),再将它们分发到不同节点,就真正实现了并行计算。
在Flink执行过程中,每一个算子(operator)可以包含一个或多个子任务(operator subtask),这些子任务在不同的线程、不同的物理机或不同的容器中完全独立地执行。

一个特定算子的子任务(subtask)的个数被称之为其并行度(parallelism)
。这样,包含并行子任务的数据流,就是并行数据流,它需要多个分区(stream
partition)来分配并行任务。一般情况下,一个流程序的并行度
,可以认为就是其所有算子中最大的并行度。一个程序中,不同的算子可能具有不同的并行度。
### 4.2.2 设置并行度
优先级:
算子 > env > 提交时指定 > 配置文件
#### 4.2.2.1 代码中设置
指定某个算子的并行度
```java
stream.map(word ->Tuple2.
of(word, 1L)).
setParallelism(2);
```
我们也可以直接调用执行环境的setParallelism()方法,全局设定并行度
```java
env.setParallelism(2);
```
#### 4.2.2.2 提交应用时设置
可以增加-p参数来指定当前应用程序执行的并行度,它的作用类似于执行环境的全局设置:
```shell
bin/flink run –p 2 -c com.nacl.wordCount.SocketStreamWordCount ./FlinkTutorial-1.0-SNAPSHOT.jar
```
### 4.2.3 算子链(Operator Chain)
(1)一对一(One-to-one,forwarding)
这种模式下,数据流维护着分区以及元素的顺序。

比如图中的source和map算子,source 算子读取数据之后,
可以直接发送给ap算子做处理,它们之间不需要重新分区,也
不需要调整数据的顺序。
这就意味着map算子的子任务,看到的元素个数和顺序跟source算子的子任务产生的完全一样,保证着“一对一”的关系。
map、filter、flatMap等算子都是这种one-to-one的对应关系。这种关系类似于Spark中的窄依赖。
(2)重分区(Redistributing)
在这种模式下,数据流的分区会发生改变。比如图中的map和后面的keyBy/window算子
之间,以及keyBy/window算子和Sink算子之间,都是这样的关系。
每一个算子的子任务,会根据数据传输的策略,把数据发送到不同的下游目标任务。这
些传输方式都会引起重分区的过程,这一过程类似于Spark中的shuffle。
2)合并算子链

在Flink中,并行度相同的一对(one to one)算子操作,可以直接链接在一起形成一个 “大”的任务(task)
这样原来的算子就成为了真正任务里的一部分,如下图所示。每个 task会被一个线程执行。
这样的技术被称为“算子链”(Operator Chain)。
可以使用代码进行禁用
```java
//禁用算子链
map(word -Tuple2.of(word,1L)).
disableChaining()
//从当前算子开始新链
map(word -Tuple2.of(word,1L)).
startNewChain()
```
### 4.2.4 任务槽(Task Slots)
1)任务槽(Task Slots)
Flink中每一个TaskManager都是一个JVM进程,它可以启动多个独立的线程,来并行执 行多个子任务(subtask)。
很显然,TaskManager的计算资源是有限的,并行的任务越多,每个线程的资源就会越少。
那一个TaskManager到底能并行处理多少个任务呢?为了控制并发量,我们需要在TaskManager上对每个任务运行所占用的资源做出明确的划分,
这就是所谓的任务槽(task slots)。
> 每个任务槽(task slot)其实表示了TaskManager拥有计算资源的一个固定大小的子集。这些资源就是用来独立执行一个子任务的。
> 假如一个TaskManager有三个slot 那么它会将管理的内平均分成三份,每个slot独自占据一份
> 这样一来,我们在slot上执行一个子任务时,相当于划定了一块内存“专款专用”,就不需要跟来自其
> 他作业的任务去竞争内存资源了。
2)任务槽数量的设置
在Flink的```flink-1.17.0/conf/flink-conf.yaml```配置文件中,可以设置TaskManager 的slot数量,默认是1个slot。
taskmanager.numberofTaskslots:8
需要注意的是,slot目前仅仅用来隔离内存,不会涉及CPU的隔离。在具体应用时,可
以将slot数量配置为机器的CPU核心数,尽量避免不同任务之间对CPU的竞争。这也是开发
环境默认并行度设为机器CPU数量的原因。
3)任务对任务槽的共享

### 4.2.5任务槽和并行度的关系
任务槽和并行度都跟程序的并行执行有关,但两者是完全不同的概念。
简单来说任务槽 是静态的概念,是指TaskManager具有的并发执行能力,
可以通过参数 askmanager.numberOfTaskSlots进行配置;
而并行度是动态概念,也就是TaskManager运行程序时实际使用的并发能力,
可以通过参数parallelism.default进行配置。
举例说明:假设一共有3个TaskManager,每一个TaskManager中的slot数量设置为3个,
那么一共有9个task slot,表示集群最多能并行执行9个算子的子任务。
而我们定义word count程序的处理操作是四个转换算子:
source→flatmap→reduce→sink
当所有算子并行度相同时,容易看出source和flatmap可以合并算子链,于是最终有三个
当资源不够时 alone模式会报错 资源不够 yarn模式会再去申请tm