1 Star 0 Fork 0

csh/java笔记

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
spark 21.86 KB
一键复制 编辑 原始数据 按行查看 历史
csh 提交于 2年前 . add spark.
SparkCore
1.概述:Spark是一种基于scala开发的快速、通用、可扩展的大数据分析引擎
Sparkcore提供了spark中最基础与最核心的功能;Saprk SQL是用来操作结构化数据的组件;Spark Streaming 是Spark平台上针对实时数据进行流式计算的组件
2.MR和spark:
MR和Spark框架都是Hadoop的数据处理框架;
MR在多并行运行的数据可复用场景中存在者计算效率问题,spark利用其计算过程的优化,大大加快了数据分析挖掘的运行和读写速度,并将计算单元缩小到更适合并行计算和重复使用的RDD计算模型;
Spark基于的scala语言擅长函数的处理;
spark的核心技术是弹性分布式数据集,提供了比MR丰富的模型,可以快速的在内存中对数据集进行多次迭代,来支持复杂的数据挖掘和图形计算算法;
两者直接的根本差距在于:spark多个作业之间数据通信是基于内存的,而MR是基于磁盘的;
spark只有在shuffle的时候将数据写入磁盘,而多个MR作业之间的数据交互都要依赖于磁盘;
spark是基于内存的,但是由于内存的限制,可能会导致任务失败,所以spark并不能完全替代MR。
3.spark核心模块
Spark core:提供了最基础和最核心的功能
Spark sql:用来操作结构化的数据,通过spark sql可以使用sql来查询数据
Spark streaming:针对实时数据进行流式计算的组件
Spark Mllib:一个机器学习算法库
Spark GraphX:面向图计算提供的框架和算法库
4.wordcount代码
5.spark运行环境
Local模式:不需要再其他任何节点资源就可以再本地执行spark代码的环境
Standalone模式:独立部署模式,只使用spark自身节点运行的集群模式
Yarn模式:spark用来计算,yarn用作资源调度
K8S&Mesos模式:容器化部署
Windows模式:可以在windows系统下启动本地集群
6.spark运行架构:Driver+Executor;Master+worker
Driver:用于执行spark任务中的main方法,负责实际代码的执行工作
将用户程序转化为作业;
在Executor之间调度任务并跟踪executor的执行情况
通过UI查询运行情况。
Executor:是集群中工作节点的一个JVM进程,负责与逆行具体的任务
运行组成spark应用的任务,并将结果返回给驱动器进程
通过自身的块管理器为用户程序中要求缓存的RDD提供内存式存储
Master+worker:独立部署环境中,不需要依赖其他的资源调度框架,Master是一个进程主要负责资源的调度和分配并进行集群的监控;worker也是进程,一个worker运行在集群中的一台服务器上,由master分配资源对数据进行并行的处理和计算。
ApplicationMaster:hadoop向yarn集群提交应用程序时,包含ApplicationMaster,用于向资源调度器申请执行任务的资源容器container,运行用户自己的程序任务job,监控整个任务的执行,跟踪整个任务的状态,处理任务失败的情况。
7.一些核心概念
并行度:整个集群并性执行任务的数量
有向无环图(DAG)
8.提交流程图
Yarn client模式:
一般用于测试
Driver在任务提交的本地机器上运行
Driver启动后会和RM通讯申请启动ApplicationMaster
RM分配container,再合适的NM上启动ApplicationMaster,负责向RM申请Executor内存
RM接到ApplicationMaster的资源申请后会分配container,然后ApplicationMaster在资源分配指定的NM上启动Executor进程
Executor进程启动后会向Driver反向注册,Executor全部注册完成后Driver开始执行main函数
之后执行到Action算子,触发一个job,并依赖宽依赖开始划分stage,每个stage生成对应的TaskSet,之后将task分发都各个Executor上执行
Yarn cluster模式:
一般用于实际的生产环境
向RM申请ApplicationMaster
RM分配container,在合适的NM上启动AM,此时的AM就是Driver
Driver启动后向RM申请Executer内存,RM接到后会分配container,在合适的NM上启动Executor进程
Executor进程启动后会向Driver反向注册,Executor全部注册完毕后Driver开始执行
之后执行到Action算子,触发一个job,并依赖宽依赖开始划分stage,每个stage生成对应的TaskSet,之后将task分发都各个Executor上执行
9.Spark核心编程:
RDD:弹性分布式数据集
累加器:分布式共享只写变量
广播变量:分布式共享只读变量
RDD
1.概述:是一个抽象类,代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合
弹性:存储的弹性,内存与磁盘自动切换;容错的弹性,数据丢失可以自动恢复;计算的弹性,计算出错重试机制;分片的弹性,可根据需要重新分片。
分布式:数据存储在大数据集群不同节点上
数据集:RDD封装了计算逻辑,并不保存数据
数据抽象:RDD是一个抽象类,需要子类具体实现
不可变:RDD封装了计算逻辑,是不可以改变的,要是想改变只能产生新的RDD,在新的里面封装逻辑
可分区、并行计算
2.执行原理
Spark框架在执行时,先申请资源,然后将应用程序的数据处理逻辑分解成一个一个的计算任务。然后将任务发到已经分配资源的计算节点上,按照指定的模型进行数据计算,最后得到计算结果。
RDD在整个流程中的作用是主要用于将逻辑进行封装,并生成Task发送给Executor节点执行计算
3.RDD的创建
从内存中创建RDD:makeRDD方法和parallelize方法
从文件中创建RDD:
从其他RDD创建:通过一个RDD运算完成后,再产生新的RDD
直接创建RDD:使用new的方式直接构造RDD,一般由spark框架自身使用
4.RDD并行度与分区
并行度:spark可以将一个作业切分成多个任务后,发送给Executor节点并行九四u按,而能够并行计算的任务数量我们称之为并行度。
分区:读取内存数据时,数据可以按照并行度的设定进行数据的分区操作;读取文件数据时,数据是按照hadoop文件读取的规则进行切片分区。
5.RDD转换算子
(1)Value类型
Map:将处理的数据逐条进行映射转换,可以转换类型也可以转换值
Val dataRDD:RDD[Int] =sparkContext.makeRDD(List(1,2,3,4))
Val dataRDD1:RDD[Int] =dataRDD.map(
Num=>{
Num*2
}
)
Val dataRDD2:RDD[Int] =dataRDD.map(
Num=>{
“”+num
}
)
mapPartitions:将待处理的数据以分区为单位发送到计算节点进行处理
mapPartionsWithIndex:将待处理的数据以分区为单位发送到节点进行处理,在处理的同时可以获取当前分区索引
faltMap:将处理的数据进行扁平化在进行映射处理
groupBy:将数据根据指定的规则进行分组,分区不变但是数据会被打乱重新组合。极限情况下,数据可能会被分配在同一个分区中(一个组的数据在一个分区中,但是并不是说一个分区中只有一个组)
Filter:将数据根据指定的规则进行过滤,符合规则的数据保留,不符合的数据丢弃。过滤后,分区后的数据可能不均衡,会出现数据倾斜的情况。
Sample:根据指定的规则从数据集中抽取数据。
Distinct:将数据集中重复的数据去重。
Coalese:根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率
Repartition:将分区数多的RDD转换成少的RDD
sortBy:该操作用于排序数据
(2)双value类型
Intersection:源RDD和参数RDD求交集后返回一个新的RDD
Union:并集
Subtract:以一个RDD为主,去除两个RDD中的重复元素,将其他元素保留下来,求差集
Zip:将两个RDD中的元素以键值对的形式进行合并
(3)Key-Value类型
partitionBy:将数据按照指定的partitioner重新进行分区
reduceByKey:将数据按照相同的key对value进行聚合
groupByKey:根据key对value进行分组
reduceByKey和groupByKey的区别:
从shuffle的角度:两者都存在shuffle的操作,但是reduceByKey可以在shuffle前对分区内相同的key的数据进行预聚合(combine)功能,这样会减少落盘的数据量,而groupByKey只是分组,不存在数据量减少的问题;reduceByKey性能比较高。
从功能的角度:reduceByKey包含分组和聚合,而groupByKey只有聚合
AggregateByKey:将数据根据不同的规则进行分区内计算和分区间计算
foldByKey:当分区内和分区间的规则相同时
combineByKey:允许用户返回值的类型预输入不一致
reduceByKey、foldByKey、aggregateByKey、combineByKey 的区别:
reduceByKey:相同key的第一个数据不进行任何计算,分区内和分区间计算规则相同
foldByKey:相同key的第一个数据和初始值及逆行分区内计算,分区内和分区间计算规则相同
aggregateByKey:相同key的第一个数据和初始值进行分区内计算,分区内和分区间计算规则可以不相同
combineByKey:当计算时,发现数据结构不满足要求时,可以让第一个数据转换结构,分区内和分区间计算规则不相同
sortByKey:返回一个按照key进行排序的
join:返回一个相同的key对应的所有元素连接在一起的RDD- (K,(V,W))的
leftOuterJoin:类似于SQL语句的左外连接
cogroup: (K,(Iterable<V>,Iterable<W>))
6.RDD行动算子
Reduce:聚集RDD中的所有元素,先聚合分区内数据,再聚合分区间数据
Collect:以数组Array的形式返回数据集的所有元素
Count:返回RDD中元素的个数
First:返回RDD中的第一个元素
Take:返回一个由RDD的前n个元素组成的数组
takeOrdered:返回该RDD排序后的前n个元素组成的数组
aggregate:分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合
fold:折叠操作,aggregate的简化版操作
countByKey:统计每种key的个数
save:savaAsTextFile,savaAsObjectFile,
foreach:分布式遍历RDD中的每一个元素
7.RDD序列化
闭包检查:算子以外的代码都是再Driver端执行,算子里面的代码都是在Executor端执行。则会导致算子内经常会用到算子外的数据,这样就形成了闭包的效果。需要在执行任务计算前,检测闭包内的对象是否可以进行序列化,这个操作叫作闭包检测
序列化方法:Serializable,Kryo序列化框架
8.RDD依赖关系
RDD的血缘关系会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,她可以根据这些信息来重新运算和恢复丢失的数据分区。
RDD的依赖关系就是两个相邻RDD之间的关系
RDD窄依赖:每一个父RDD的partit最多被子RDD的一个partition使用
RDD宽依赖:可以被多个子RDD的partition依赖,会引起shuffle
9.
RDD阶段划分
RDD任务划分:Application、job、stage、task
初始化一个sparkContext即生成一个Application
一个action算子就会生成一个job
Stage等于宽依赖的个数加一
一个stage阶段中,最后一个RDD的分区个数就是Task的个数
RDD持久化:通过cache或者persist方法将前面的计算结果缓存,默认情况下会把数据以缓存在JVM的堆内存中
RDD CheckPoint检查点:将RDD的中间结果写入到磁盘中
缓存和检查点的区别:
Cache缓存只是将数据保存起来,不切断血缘依赖。checkPoint检查点切断血缘依赖
Cache缓存的数据通常保存在磁盘、内存等地方,可靠性低;checkpoint数据通常保存在HDFS等容错、高可用的文件系统,可靠性高。
建议堆checkPoint的RDD使用cache缓存,这样checkPoint的job只需从cache缓存中读取数据即可,否则需要从头计算一次RDD
RDD分区器:分区器直接决定了RDD中分区的个数、RDD中每条数据经过shuffle后进入哪个分区,进而决定了reduce的个数。只有KV类型的RDD才有分区器,非KV类型的RDD分区的值时None。每个RDD的分区ID决定了这个值是属于哪个分区的
Hash分区:对于给定的key,计算器hashCode,并除以分区个数取余
Range分区:将一定范围i的数据映射到一个分区中,尽量保证每个分区数据均匀,而且分区间有序
10.RDD文件读取与保存
文件格式分为:text文件、csv文件、sequence文件,Object文件
文件系统分为:本地文件系统、HDFS、HBASE、数据库
Sequence文件:hadoop用来存储二进制形式的KV对而设计的一种平面文件
Object对象文件:将对象序列化后保存的文件
累加器
用来把executor端变量信息聚合到driver端。在driver程序中定义的变量,在executor端的每个task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回driver端进行merge
广播变量
广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个或多个spark操作使用。
SparkSQL
1.简介
sparkSQL是spark用于结构化数据处理的spark模块
hive的缺点是在MapReduce计算过程中大量的中间磁盘落地过程消耗了大量的IO,降低了运行效率
两个分支:sparkSQL和hive on spark。
SparkSQL不再受限于Hive,只是兼容Hive;hive on spark是hive的一个发展计划,计划将spark作为hive的底层引擎之一。
实际工作中采用的是sparkSQL,因为他可以简化RDD的开发,提高开发效率,且执行效率非常快。
sparkSQL提供了两个编程抽象:DataFrame和Dataset
2.sparkSQL的特点
易整合:无缝的整合了SQL查询和spark编程
统一的数据访问:可以使用相同的方式连接不同的数据源
兼容Hive:基本的原理类似,可以在已有的仓库上直接运行SQL或者HiveSQL
标准的数据连接:通过JDBC或者ODBC来连接
3.DataFrame
是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格
DataFrame与RDD最大的区别在于:前者带有schema元信息,也就是所表示的二维数据集的每一列都带有名称和类型。RDD无从得知所存数据元素的具体内部结构。
DataFrame也支持嵌套数据类型(struct、array、map)
DataFrame也是懒执行的,性能上比RDD要高,原因在于:优化的执行计划,即查询计划通过spark catalyst optimizer进行优化。逻辑查询优化就是一个利用基于关系代数的等价变换,将高成本的操作替换为低成本操作的过程。
4.DataSet
是分布式的数据集合,是DataFrame的一个扩展。提供了RDD的优势(强类型,使用强大lambda函数的能力)以及sparkSQL优化执行引擎的优点。DataSet可以使用功能性的转换(Map、flatMap、filter)
5.sparkSQL核心编程
(1)概述sparkSQL可以理解成是对spark core的一种封装,不仅在模型上进行了封装,上下文环境对象也进行了封装。
sparkSession是spark最新的SQL查询起始点,实质上是SQLContext和HiveContext的组合。sparkSession内部封装了SaprkContext,所以计算实际上是由sparkSession完成的。
(2)DataFrame
创建的三种形式:通过saprk数据源进行创建;从一个存在的RDD进行转换;从hive table进行查询返回。
第一种:val df=spark.read.json(“data/user.json”)
(3)语法风格
1)SQL语法
查询数据的时候使用SQL语句来查询,这种风格的查询必须要有临时视图或者全局视图来辅助。
读取json文件创建DataFrame
Val df=spark.read.json(“data/user.json”)
对DataFrame创建一个临时表
Df.createOrReplaceTempView(“people”)
通过SQL语句实现全表查询
Val sqlDF=spark.sql(“select * from people”)
结果展示
SqlDF.show
其中,普通临时表是session范围内的,如果想应用范围内有效,可以使用全局临时表。使用全局临时表时时需要全路径访问的。
Df.createGlobalTempView(“people”)
Spark.sql(“select * from global_temp.people”).show()
2)DSL语法
使用DSL语法不必去创建临时视图
创建一个DataFrame
Val df=spark.read.json(“data/user.json”)
查看DataFrame 的Schema信息
Df.printSchema
查看username列数据
Df.select(“username”).show()
查看username列数据以及age+1数据
设计到运算的时候可以使用$或者采用引号表达式:单引号+字段名
Df.select($”username”,$”age”+1).show
Df.select(‘username,’age+1).show()
(4)转换操作
在idea开发程序中,如果需要RDD与DF或者DS之间相互操作,需要引入import spark.implicits._(saprk是创建的sparkSession对象的变量名称)
RDD<->DF:
Val idRDD=sc.textFile(“data/id.txt”)
Val df=idRDD.toDF(“id”).show
Val rdd=df.rdd
创建DataSet:
使用样例类:
Case calss Person(name:String,age:Long)
Val caseClassDS=Seq(Person(“zhangsan”,2)).toDS()
使用基本类型的序列创建DataSet
Val ds=Seq(1,2,3,4,5).toDS
RDD<->DataSet
Val ds=rdd.toDS
Val rdd=ds.rdd
DataFrame<->DataSet
Case class User(name:String,age:Int)
Val df=sc.makeRDD(List((“zhangsan”,30),(“list”,49))).toDF(“name”,”age”)
Val ds=df.as[User]
Val df=ds.toDF
6. RDD,DF,DS三者之间的关系
共性:
全都是spark平台下的分布式弹性数据集,为处理大数据提供便利;
三者都具有惰性机制,只有在遇到action算子时才会开始遍历运算;
三者有许多共通的函数,如filter、排序等;
在对DF和DS进行操作时都需要导入import spark.implicits._;
三者会根据spark的内存情况自动进行缓存运算,即使数据量很大,也不用担心内存溢出;
三者都有partiton的概念;
DF和DS均可以使用模式匹配获得各个字段的值和类型
区别:
RDD一般和spark mlib同时使用;RDD不支持SaprkSQL操作
DF每一行的类型固定为row,每一列的值没法直接访问,只有通过解析次啊能获取各个字段的值
DF和DS一般不和spark mlib同时使用,两者均支持sparkSQL操作,均支持一些特别方便的保存方式,比如保存成CSV,可以带上表头
DS每一行的数据类型不同,DF是DS的一个特例
DF也可以叫作DataSet[ROW],每一行的类型是ROW,不解析,每一行究竟有哪些字段,各个字段又是什么类型也无从得知,只能用上面的getAS方法或者模式匹配拿出特定字段;DS中每一行是什么类型是不一定的,在自定义了case class之后可以很自由的获得每一行的信息。
7.用户自定义函数
用户可以通过spark.udf功能调价自定义函数,实现自定义功能。
UDF
UDAF
8.数据的加载和保存
(1)sparkSQL默认读取和保存的文件格式为parquet,是一种能够有效存储嵌套数据的列式存储格式
(2)JSON格式:spark SQL能够自动推测JSON数据集的结构,并将它加载为一个DataSet[Row],可以通过spark.read.json()去加载json文件。
(3)CSV格式:spark SQL可以配置CSV文件的列表信息,读取CSV文件,CSV文件的第一行设置为数据列
(4)MySQL:可以通过JDBC从关系型数据库中读取数据的方式创建DF,通过对DF一系列的计算后,可以将数据在写回关系型数据库中
(5)Hive:包含hive支持的Spark SQL可以支持Hive表访问,UDF以及hive查询语言
内嵌的HIVE:使用spark内嵌的hive,则什么都不用做,直接使用即可;
外部的HIVE:需要将hive-site.xml拷贝到conf目录下;把mysql的驱动copy到jars目录下;如果访问不到hdfs,需要把core-site.xml和hdfs-site.xml拷贝到conf目录下;重启spark-shell
运行spark SQL CLI:可以很方便的在本地运行Hive元数据服务以及从命令行执行查询任务
运行spark beeline:spark thrift server是spark社区基于HiveServer2实现的一个Thrift服务。
加载数据:spark.read.load
保存数据:spark.write.save
sparkStreaming
1.概述
用于流式数据的处理。准实时,微批次的数据处理框架。
特点:易用;容错;易整合到spark体系;
2.架构图
背压机制backpressure:saprk1.5以前的版本,限制Receiver的数据接收速率,可以通过设置静态配置参数spark.streaming.receiver.maxRate来实现,这样可以通过限制接收速率来适配当前的处理能力,防止内存溢出,但是也会引入其他问题。1.5版本之后可以动态控制数据接收速率来适配集群数据处理能力。背压机制指的是根据jobScheduler反馈作业的执行信息来动态调整Receiver数据接收率。
3.Kafka数据源
版本选型:
ReceiverAPI:需要一个专门的Executor去接收数据,然后发送给其他的executor做计算。接收数据的executor和计算的executor速度会有所不同,有可能会导致数据的节点内存溢出。
DirectAPI:由计算的Executor开主动消费kafka的数据,速度由自身控制
4.DStream转换
有transformation和output operations两种
无状态转换操作:把简单的RDD操作应用到每个批次上,转化DStream中的每一个RDD。
Transform:允许DStream上执行任意的RDD-to-RDD函数。
Join:两个流之间的join需要两个流的批次大小一致,这样才能做到同时触发计算。
有状态转化操作
updateStateByKey:提供了一个对状态变量的访问,用于键值对形式的DStream。使得我们可以再用新信息进行更新时保持任意的状态。
5.DStream输出
输出操作指定了对流数据经转换操作得到数据所要执行的操作。
Print:在运行流程序的驱动节点上打印DStream中每一批次数据的最开始10个元素
saveAsTextFiles:以text文件形式存储这个DStream的内容
saveAsObjectFiles:以java对象序列化的方式将stream中数据保存为SequenceFiles
saveAsHadoopFiles:将stream中的数据保存为Hadoop Files
foreachRDD(func):最通用的输出操作,将函数func用于产生于stream的每一个RDD
6.优雅关闭
使用外部文件系统来控制内部程序关闭。
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/ccsshh/java-notes.git
git@gitee.com:ccsshh/java-notes.git
ccsshh
java-notes
java笔记
master

搜索帮助