# Spark-Streaming **Repository Path**: fengze7758/Spark-Streaming ## Basic Information - **Project Name**: Spark-Streaming - **Description**: Spark Streaming,其实就是一种Spark提供的,对于大数据,进行实时计算的一种框架。它的底层,其实,也是基于我们之前讲解的Spark Core的。基本的计算模型,还是基于内存的大数据实时计算模型。而且,它的底层的组件或者叫做概念,其实还是最核心的RDD。 - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 1 - **Forks**: 0 - **Created**: 2018-10-09 - **Last Updated**: 2023-06-03 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # Spark-Steaming #### 项目介绍 Spark-Steaming学习记录 #### 学习目录 * 1.Spark Streaming:大数据实时计算介绍 * 2.Spark Streaming:DStream以及基本工作原理 * 3.Spark Streaming:与Storm的对比分析 * 4.Spark Streaming:实时wordcount程序开发 * 5.Spark Streaming:StreamingContext详解 * 6.Spark Streaming:输入DStream和Receiver详解 * 7.Spark Streaming:输入DStream之基础数据源以及基于HDFS的实时wordcount程序 * 8.Spark Streaming:输入DStream之Kafka数据源实战(基于Receiver的方式) * 9.Spark Streaming:输入DStream之Kafka数据源实战(基于Direct的方式) * 10.Spark Streaming:DStream的transformation操作概览 * 11.Spark Streaming:updateStateByKey以及基于缓存的实时wordcount程序 * 12.Spark Streaming:transform以及广告计费日志实时黑名单过滤案例实战 * 13.Spark Streaming:window滑动窗口以及热点搜索词滑动统计案例实战 * 14.Spark Streaming:DStream的output操作以及foreachRDD详解 * 15.Spark Streaming:与Spark SQL结合使用之top3热门商品实时统计案例实战 * 16.Spark Streaming:缓存与持久化机制 * 17.Spark Streaming:Checkpoint机制 * 18.Spark Streaming:部署、升级和监控应用程序 * 19.Spark Streaming:容错机制以及事务语义详解 * 20.Spark Streaming:架构原理深度剖析 * 21.Spark Streaming:StreamingContext初始化与Receiver启动原理剖析与源码分析 * 22.Spark Streaming:数据接收原理剖析与源码分析 * 23.Spark Streaming:数据处理原理剖析与源码分析(block与batch关系透彻解析) * 23.Spark Streaming:性能调优 #### Spark Streaming:大数据实时计算介绍 * Spark Streaming,其实就是一种Spark提供的,对于大数据,进行实时计算的一种框架。它的底层,其实,也是基于我们之前讲解的Spark Core的。基本的计算模型,还是基于内存的大数据实时计算模型。而且,它的底层的组件或者叫做概念,其实还是最核心的RDD。 * 只不多,针对实时计算的特点,在RDD之上,进行了一层封装,叫做DStream。其实,学过了Spark SQL之后,你理解这种封装就容易了。之前学习Spark SQL是不是也是发现,它针对数据查询这种应用,提供了一种基于RDD之上的全新概念,DataFrame,但是,其底层还是基于RDD的。所以,RDD是整个Spark技术生态中的核心。要学好Spark在交互式查询、实时计算上的应用技术和框架,首先必须学好Spark核心编程,也就是Spark Core。 #### Spark Streaming:DStream以及基本工作原理 * Spark Core API的一种扩展,它可以用于进行大规模、高吞吐量、容错的实时数据流的处理。它支持从很多种数据源中读取数据,比如Kafka、Flume、Twitter、ZeroMQ、Kinesis或者是TCP Socket。并且能够使用类似高阶函数的复杂算法来进行数据处理,比如map、reduce、join和window。处理后的数据可以被保存到文件系统、数据库、Dashboard等存储中。 * 基本工作原理如下:接收实时输入数据流,然后将数据拆分成多个batch,比如每收集1秒的数据封装为一个batch,然后将每个batch交给Spark的计算引擎进行处理,最后会生产出一个结果数据流,其中的数据,也是由一个一个的batch所组成的。 * Spark Streaming提供了一种高级的抽象,叫做DStream,英文全称为Discretized Stream,中文翻译为“离散流”,它代表了一个持续不断的数据流。DStream可以通过输入数据源来创建,比如Kafka、Flume和Kinesis;也可以通过对其他DStream应用高阶函数来创建,比如map、reduce、join、window。 > ##### DStream * DStream的内部,其实一系列持续不断产生的RDD。RDD是Spark Core的核心抽象,即,不可变的,分布式的数据集。DStream中的每个RDD都包含了一个时间段内的数据。 * 对DStream应用的算子,比如map,其实在底层会被翻译为对DStream中每个RDD的操作。比如对一个DStream执行一个map操作,会产生一个新的DStream。但是,在底层,其实其原理为,对输入DStream中每个时间段的RDD,都应用一遍map操作,然后生成的新的RDD,即作为新的DStream中的那个时间段的一个RDD。底层的RDD的transformation操作,其实,还是由Spark Core的计算引擎来实现的。Spark Streaming对Spark Core进行了一层封装,隐藏了细节,然后对开发人员提供了方便易用的高层次的API #### Spark Streaming:与Storm的对比分析 >对于Storm来说: * 1、建议在那种需要纯实时,不能忍受1秒以上延迟的场景下使用,比如实时金融系统,要求纯实时进行金融交易和分析 * 2、此外,如果对于实时计算的功能中,要求可靠的事务机制和可靠性机制,即数据的处理完全精准,一条也不能多,一条也不能少,也可以考虑使用Storm * 3、如果还需要针对高峰低峰时间段,动态调整实时计算程序的并行度,以最大限度利用集群资源(通常是在小型公司,集群资源紧张的情况),也可以考虑用Storm * 4、如果一个大数据应用系统,它就是纯粹的实时计算,不需要在中间执行SQL交互式查询、复杂的transformation算子等,那么用Storm是比较好的选择 ``` * 对于Spark Streaming来说: ``` * 1、如果对上述适用于Storm的三点,一条都不满足的实时场景,即,不要求纯实时,不要求强大可靠的事务机制,不要求动态调整并行度,那么可以考虑使用Spark Streaming * 2、考虑使用Spark Streaming最主要的一个因素,应该是针对整个项目进行宏观的考虑,即,如果一个项目除了实时计算之外,还包括了离线批处理、交互式查询等业务功能,而且实时计算中,可能还会牵扯到高延迟批处理、交互式查询等功能,那么就应该首选Spark生态,用Spark Core开发离线批处理,用Spark SQL开发交互式查询,用Spark Streaming开发实时计算,三者可以无缝整合,给系统提供非常高的可扩展性。 ``` #### Spark Streaming:实时wordcount程序开发 >1、安装nc工具:yum install nc
* 给端口发送数据 nc -lk 9999 >2、开发实时wordcount程序 运行脚本wordcount.sh ``` /usr/local/spark-1.5.1-bin-hadoop2.4/bin/spark-submit \ --class cn.spark.study.streaming.WordCount \ --num-executors 3 \在· --driver-memory 100m \ --executor-memory 100m \ --executor-cores 3 \ /usr/local/spark-study/scala/streaming/spark-study-scala.jar \ ``` #### Spark Streaming:StreamingContext详解 * 有两种创建StreamingContext的方式: ``` 第一种: val conf = new SparkConf().setAppName(appName).setMaster(master); val ssc = new StreamingContext(conf, Seconds(1)); ``` ``` 第二种: StreamingContext,还可以使用已有的SparkContext来创建 val sc = new SparkContext(conf) val ssc = new StreamingContext(sc, Seconds(1)); ``` ``` appName,是用来在Spark UI上显示的应用名称。master,是一个Spark、Mesos或者Yarn集群的URL,或者是local[*]。 batch interval可以根据你的应用程序的延迟要求以及可用的集群资源情况来设置。 ``` * 一个StreamingContext定义之后,必须做以下几件事情: ``` 1、通过创建输入DStream来创建输入数据源。 2、通过对DStream定义transformation和output算子操作,来定义实时计算逻辑。 3、调用StreamingContext的start()方法,来开始实时处理数据。 4、调用StreamingContext的awaitTermination()方法,来等待应用程序的终止。可以使用CTRL+C手动停止,或者就是让它持续不断的运行进行计算。 5、也可以通过调用StreamingContext的stop()方法,来停止应用程序。 ``` * 需要注意的要点: ```1、只要一个StreamingContext启动之后,就不能再往其中添加任何计算逻辑了。比如执行start()方法之后,还给某个DStream执行一个算子。 2、一个StreamingContext停止之后,是肯定不能够重启的。调用stop()之后,不能再调用start() 3、一个JVM同时只能有一个StreamingContext启动。在你的应用程序中,不能创建两个StreamingContext。 4、调用stop()方法时,会同时停止内部的SparkContext,如果不希望如此,还希望后面继续使用SparkContext创建其他类型的Context,比如SQLContext,那么就用stop(false)。 5、一个SparkContext可以创建多个StreamingContext,只要上一个先用stop(false)停止,再创建下一个即可。 ``` #### Spark Streaming:输入DStream和Receiver详解 * 输入DStream代表了来自数据源的输入数据流。在之前的wordcount例子中,lines就是一个输入DStream(JavaReceiverInputDStream),代表了从netcat(nc)服务接收到的数据流。除了文件数据流之外,所有的输入DStream都会绑定一个Receiver对象,该对象是一个关键的组件,用来从数据源接收数据,并将其存储在Spark的内存中,以供后续处理。 > Spark Streaming提供了两种内置的数据源支持: * 1、基础数据源:StreamingContext API中直接提供了对这些数据源的支持,比如文件、socket、Akka Actor等。 * 2、高级数据源:诸如Kafka、Flume、Kinesis、Twitter等数据源,通过第三方工具类提供支持。这些数据源的使用,需要引用其依赖。 * 3、自定义数据源:我们可以自己定义数据源,来决定如何接受和存储数据。 * 要注意的是,如果你想要在实时计算应用中并行接收多条数据流,可以创建多个输入DStream。这样就会创建多个Receiver,从而并行地接收多个数据流。但是要注意的是,一个Spark Streaming Application的Executor,是一个长时间运行的任务,因此,它会独占分配给Spark Streaming Application的cpu core。从而只要Spark Streaming运行起来以后,这个节点上的cpu core,就没法给其他应用使用了。 >使用本地模式 * 运行程序时,绝对不能用local或者local[1],因为那样的话,只会给执行输入DStream的executor分配一个线程。而Spark Streaming底层的原理是,至少要有两条线程,一条线程用来分配给Receiver接收数据,一条线程用来处理接收到的数据。因此必须使用local[n],n>=2的模式。 * 如果不设置Master,也就是直接将Spark Streaming应用提交到集群上运行,那么首先,必须要求集群节点上,有>1个cpu core,其次,给Spark Streaming的每个executor分配的core,必须>1,这样,才能保证分配到executor上运行的输入DStream,两条线程并行,一条运行Receiver,接收数据;一条处理数据。否则的话,只会接收数据,不会处理数据。 * 因此,基于此,特此声明,我们本系列课程所有的练习,都是基于local[2]的本地模式,因为我们的虚拟机上都只有一个1个cpu core。但是大家在实际企业工作中,机器肯定是不只一个cpu core的,现在都至少4核了。到时记得给每个executor的cpu core,设置为超过1个即可。 #### Spark Streaming:输基于Receiver的方式 ##### 基于Receiver的方式 * 这种方式使用Receiver来获取数据。Receiver是使用Kafka的高层次Consumer API来实现的。receiver从Kafka中获取的数据都是存储在Spark Executor的内存中的,然后Spark Streaming启动的job会去处理那些数据。 * 然而,在默认的配置下,这种方式可能会因为底层的失败而丢失数据。如果要启用高可靠机制,让数据零丢失,就必须启用Spark Streaming的预写日志机制(Write Ahead Log,WAL)。该机制会同步地将接收到的Kafka数据写入分布式文件系统(比如HDFS)上的预写日志中。所以,即使底层节点出现了失败,也可以使用预写日志中的数据进行恢复。 ##### 如何进行Kafka数据源连接 * 1、在maven添加依赖 groupId = org.apache.spark artifactId = spark-streaming-kafka_2.10 version = 1.5.1 * 2、使用第三方工具类创建输入DStream JavaPairReceiverInputDStream kafkaStream = KafkaUtils.createStream(streamingContext, [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume]); ##### 需要注意的要点 * 1、Kafka中的topic的partition,与Spark中的RDD的partition是没有关系的。所以,在KafkaUtils.createStream()中,提高partition的数量,只会增加一个Receiver中,读取partition的线程的数量。不会增加Spark处理数据的并行度。 * 2、可以创建多个Kafka输入DStream,使用不同的consumer group和topic,来通过多个receiver并行接收数据。 * 3、如果基于容错的文件系统,比如HDFS,启用了预写日志机制,接收到的数据都会被复制一份到预写日志中。因此,在KafkaUtils.createStream()中,设置的持久化级别是StorageLevel.MEMORY_AND_DISK_SER。 ##### Kafka命令 > 创建topic bin/kafka-topics.sh --zookeeper 192.168.1.107:2181,192.168.1.108:2181,192.168.1.109:2181 --topic TestTopic --replication-factor 1 --partitions 1 --create > 生产者 bin/kafka-console-producer.sh --broker-list 192.168.1.107:9092,192.168.1.108:9092,192.168.1.109:9092 --topic TestTopic #### Spark Streaming:输入DStream之Kafka数据源实战(基于Direct的方式) ##### 基于Direct的方式 * 这种新的不基于Receiver的直接方式,是在Spark 1.3中引入的,从而能够确保更加健壮的机制。替代掉使用Receiver来接收数据后,这种方式会周期性地查询Kafka,来获得每个topic+partition的最新的offset,从而定义每个batch的offset的范围。当处理数据的job启动时,就会使用Kafka的简单consumer api来获取Kafka指定offset范围的数据。 ##### 这种方式有如下优点: * 1、简化并行读取:如果要读取多个partition,不需要创建多个输入DStream然后对它们进行union操作。Spark会创建跟Kafka partition一样多的RDD partition,并且会并行从Kafka中读取数据。所以在Kafka partition和RDD partition之间,有一个一对一的映射关系。 * 2、高性能:如果要保证零数据丢失,在基于receiver的方式中,需要开启WAL机制。这种方式其实效率低下,因为数据实际上被复制了两份,Kafka自己本身就有高可靠的机制,会对数据复制一份,而这里又会复制一份到WAL中。而基于direct的方式,不依赖Receiver,不需要开启WAL机制,只要Kafka中作了数据的复制,那么就可以通过Kafka的副本进行恢复。 * 3、一次且仅一次的事务机制: 基于receiver的方式,是使用Kafka的高阶API来在ZooKeeper中保存消费过的offset的。这是消费Kafka数据的传统方式。这种方式配合着WAL机制可以保证数据零丢失的高可靠性,但是却无法保证数据被处理一次且仅一次,可能会处理两次。因为Spark和ZooKeeper之间可能是不同步的。 基于direct的方式,使用kafka的简单api,Spark Streaming自己就负责追踪消费的offset,并保存在checkpoint中。Spark自己一定是同步的,因此可以保证数据是消费一次且仅消费一次。 #### Spark Streaming:DStream的transformation操作概览 1、RDD提供了两种类型的操作:transformation和action > ##### transformation操作 * map(func):对调用map的RDD数据集中的每个element都使用func,然后返回一个新的RDD,这个返回的数据集是分布式的数据集 * flatMap(func):和map差不多,但是flatMap生成的是多个结果 * filter:对传入的元素返回true和false,返回的false是的元素被过滤掉 * union(otherDataset):返回一个新的dataset,包含源dataset和给定dataset的元素的集合 * count:返回元素个数 * reduceByKey:对所有的vlues进行聚合 * groupByKey(numTasks):返回(K,Seq[V]),也就是hadoop中reduce函数接受的key-valuelist * reduceByKey(func,[numTasks]):就是用一个给定的reducefunc再作用在groupByKey产生的(K,Seq[V]),比如求和,求平均数 * cogroup(otherDataset,[numTasks]):当有两个KV的dataset(K,V)和(K,W),返回的是(K,Seq[V],Seq[W])的dataset,numTasks为并发的任务数 * join:对两个DStream进行join操作,每个连接起来的pair,作为新DStream的RDD的一个元素 * transform:对数据进行转换操作 * updateStateByKey:为每个key维护一份state,并进行更新(这个,我认为,是在普通的实时计算中,最有用的一种操作) * window:对滑动窗口数据执行操作(实时计算中最有特色的一种操作) #### Spark Streaming:updateStateByKey以及基于缓存的实时wordcount程序 >updateStateByKey操作,可以让我们为每个key维护一份state,并持续不断的更新该state。 * 1、首先,要定义一个state,可以是任意的数据类型; * 2、其次,要定义state更新函数——指定一个函数如何使用之前的state和新值来更新state。 * 对于每个batch,Spark都会为每个之前已经存在的key去应用一次state更新函数,无论这个key在batch中是否有新的数据。如果state更新函数返回none,那么key对应的state就会被删除。 * 当然,对于每个新出现的key,也会执行state更新函数。 * 注意,updateStateByKey操作,要求必须开启Checkpoint机制。 * 案例:基于缓存的实时wordcount程序(在实际业务场景中,这个是非常有用的) #### Spark Streaming:transform以及广告计费日志实时黑名单过滤案例实战 * transform操作,应用在DStream上时,可以用于执行任意的RDD到RDD的转换操作。它可以用于实现,DStream API中所没有提供的操作。比如说,DStream API中,并没有提供将一个DStream中的每个batch,与一个特定的RDD进行join的操作。但是我们自己就可以使用transform操作来实现该功能。 * DStream.join(),只能join其他DStream。在DStream每个batch的RDD计算出来之后,会去跟其他DStream的RDD进行join。 * 案例:广告计费日志实时黑名单过滤 #### Spark Streaming:window滑动窗口以及热点搜索词滑动统计案例实战(重点) * ##### Spark Streaming提供了滑动窗口操作的支持,从而让我们可以对一个滑动窗口内的数据执行计算操作。每次掉落在窗口内的RDD的数据,会被聚合起来执行计算操作,然后生成的RDD,会作为window DStream的一个RDD。比如下图中,就是对每三秒钟的数据执行一次滑动窗口计算,这3秒内的3个RDD会被聚合起来进行处理,然后过了两秒钟,又会对最近三秒内的数据执行滑动窗口计算。所以每个滑动窗口操作,都必须指定两个参数,窗口长度以及滑动间隔,而且这两个参数值都必须是batch间隔的整数倍。( Spark Streaming对滑动窗口的支持,是比Storm更加完善和强大的) ![输入图片说明](https://images.gitee.com/uploads/images/2018/0829/104843_cfd5a70d_1656611.png "图片1.png") * window滑动窗口操作 ![输入图片说明](https://images.gitee.com/uploads/images/2018/0829/104951_d84b888c_1656611.png "TIM截图20180829104944.png") 案例:热点搜索词滑动统计,每隔10秒钟,统计最近60秒钟的搜索词的搜索频次,并打印出排名最靠前的3个搜索词以及出现次数 #### Spark Streaming:DStream的output操作以及foreachRDD详解 > output操作 *DStream中的所有计算,都是由output操作触发的,比如print()。如果没有任何output操作,那么,压根儿就不会执行定义的计算逻辑。 *此外,即使你使用了foreachRDD output操作,也必须在里面对RDD执行action操作,才能触发对每一个batch的计算逻辑。否则,光有foreachRDD output操作,在里面没有对RDD执行action操作,也不会触发任何逻辑。 ![输入图片说明](https://images.gitee.com/uploads/images/2018/0829/105422_c9e43941_1656611.png "TIM截图20180829105245.png") > foreachRDD详解 * 误区二:在RDD的foreach操作内部,创建Connection * 这种方式是可以的,但是效率低下。因为它会导致对于RDD中的每一条数据,都创建一个Connection对象。而通常来说,Connection的创建,是很消耗性能的。 ``` dstream.foreachRDD { rdd => rdd.foreach { record => val connection = createNewConnection() connection.send(record) connection.close() } } ``` * 合理方式一:使用RDD的foreachPartition操作,并且在该操作内部,创建Connection对象,这样就相当于是,为RDD的每个partition创建一个Connection对象,节省资源的多了。 ``` dstream.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords => val connection = createNewConnection() partitionOfRecords.foreach(record => connection.send(record)) connection.close() } } ``` * 合理方式二:自己手动封装一个静态连接池,使用RDD的foreachPartition操作,并且在该操作内部,从静态连接池中,通过静态方法,获取到一个连接,使用之后再还回去。这样的话,甚至在多个RDD的partition之间,也可以复用连接了。而且可以让连接池采取懒创建的策略,并且空闲一段时间后,将其释放掉。 ``` dstream.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords => val connection = ConnectionPool.getConnection() partitionOfRecords.foreach(record => connection.send(record)) ConnectionPool.returnConnection(connection) } } ``` * foreachRDD实战 ##### 案例:改写UpdateStateByKeyWordCount,将每次统计出来的全局的单词计数,写入一份,到MySQL数据库中。 #### Spark Streaming:与Spark SQL结合使用之top3热门商品实时统计案例实战 >与Spark SQL结合使用 * Spark Streaming最强大的地方在于,可以与Spark Core、Spark SQL整合使用,之前已经通过transform、foreachRDD等算子看到,如何将DStream中的RDD使用Spark Core执行批处理操作。现在就来看看,如何将DStream中的RDD与Spark SQL结合起来使用。 * 案例:每隔10秒,统计最近60秒的,每个种类的每个商品的点击次数,然后统计出每个种类top3热门的商品。 #### 缓存与持久化机制 * 与RDD类似,Spark Streaming也可以让开发人员手动控制,将数据流中的数据持久化到内存中。对DStream调用persist()方法,就可以让Spark Streaming自动将该数据流中的所有产生的RDD,都持久化到内存中。如果要对一个DStream多次执行操作,那么,对DStream持久化是非常有用的。因为多次操作,可以共享使用内存中的一份缓存数据。 * 对于基于窗口的操作,比如reduceByWindow、reduceByKeyAndWindow,以及基于状态的操作,比如updateStateByKey,默认就隐式开启了持久化机制。即Spark Streaming默认就会将上述操作产生的Dstream中的数据,缓存到内存中,不需要开发人员手动调用persist()方法。 * 对于通过网络接收数据的输入流,比如socket、Kafka、Flume等,默认的持久化级别,是将数据复制一份,以便于容错。相当于是,用的是类似MEMORY_ONLY_SER_2。 * 与RDD不同的是,默认的持久化级别,统一都是要序列化的。 ####Spark Streaming:Checkpoint机制 >Checkpoint机制概述 * 每一个Spark Streaming应用,正常来说,都是要7 * 24小时运转的,这就是实时计算程序的特点。因为要持续不断的对数据进行计算。因此,对实时计算应用的要求,应该是必须要能够对与应用程序逻辑无关的失败,进行容错。 * 如果要实现这个目标,Spark Streaming程序就必须将足够的信息checkpoint到容错的存储系统上,从而让它能够从失败中进行恢复。有两种数据需要被进行checkpoint: >1、元数据checkpoint——将定义了流式计算逻辑的信息,保存到容错的存储系统上,比如HDFS。当运行Spark Streaming应用程序的Driver进程所在节点失败时,该信息可以用于进行恢复。元数据信息包括了: * 1.1 配置信息——创建Spark Streaming应用程序的配置信息,比如SparkConf中的信息。 * 1.2 DStream的操作信息——定义了Spark Stream应用程序的计算逻辑的DStream操作信息。 * 1.3 未处理的batch信息——那些job正在排队,还没处理的batch信息。 >2、数据checkpoint——将实时计算过程中产生的RDD的数据保存到可靠的存储系统中。 对于一些将多个batch的数据进行聚合的,有状态的transformation操作,这是非常有用的。在这种transformation操作中,生成的RDD是依赖于之前的batch的RDD的,这会导致随着时间的推移,RDD的依赖链条变得越来越长。 要避免由于依赖链条越来越长,导致的一起变得越来越长的失败恢复时间,有状态的transformation操作执行过程中间产生的RDD,会定期地被checkpoint到可靠的存储系统上,比如HDFS。从而削减RDD的依赖链条,进而缩短失败恢复时,RDD的恢复时间。 一句话概括,元数据checkpoint主要是为了从driver失败中进行恢复;而RDD checkpoint主要是为了,使用到有状态的transformation操作时,能够在其生产出的数据丢失时,进行快速的失败恢复。 >何时启用Checkpoint机制? * 1、使用了有状态的transformation操作——比如updateStateByKey,或者reduceByKeyAndWindow操作,被使用了,那么checkpoint目录要求是必须提供的,也就是必须开启checkpoint机制,从而进行周期性的RDD checkpoint。 * 2、要保证可以从Driver失败中进行恢复——元数据checkpoint需要启用,来进行这种情况的恢复。 * 要注意的是,并不是说,所有的Spark Streaming应用程序,都要启用checkpoint机制,如果即不强制要求从Driver失败中自动进行恢复,又没使用有状态的transformation操作,那么就不需要启用checkpoint。事实上,这么做反而是有助于提升性能的。 * 1、对于有状态的transformation操作,启用checkpoint机制,定期将其生产的RDD数据checkpoint,是比较简单的。 可以通过配置一个容错的、可靠的文件系统(比如HDFS)的目录,来启用checkpoint机制,checkpoint数据就会写入该目录。使用StreamingContext的checkpoint()方法即可。然后,你就可以放心使用有状态的transformation操作了。 * 2、如果为了要从Driver失败中进行恢复,那么启用checkpoint机制,是比较复杂的。需要改写Spark Streaming应用程序。 当应用程序第一次启动的时候,需要创建一个新的StreamingContext,并且调用其start()方法,进行启动。当Driver从失败中恢复过来时,需要从checkpoint目录中记录的元数据中,恢复出来一个StreamingContext。