# sparksql **Repository Path**: ahst/sparksql ## Basic Information - **Project Name**: sparksql - **Description**: Spark SQL实验 2026.05.28 - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2026-05-28 - **Last Updated**: 2026-05-28 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # 实验4:Spark SQL实验 ## 1 Spark SQL的前身--Shark > Shark即Hive on Spark,为了实现与Hive兼容,Shark在HiveQL方面重用了Hive中的HiveQL解析、逻辑执行计划翻译、执行计划优化等逻辑,可以近似认为仅将物理执行计划从MapReduce作业替换成了Spark作业,通过Hive的HiveQL解析,把HiveQL翻译成Spark上的RDD操作。 > Shark的设计导致了两个问题:一是执行计划优化完全依赖于Hive,不方便添加新的优化策略;二是因为Spark是线程级并行,而MapReduce是进程级并行,因此,Spark在兼容Hive的实现上存在线程安全问题,导致Shark不得不使用另外一套独立维护的打了补丁的Hive源码分支。 > Shark的实现继承了大量的Hive代码,因而给优化和维护带来了大量的麻烦,特别是基于MapReduce设计的部分,成为整个项目的瓶颈。因此,在2014年的时候,Shark项目中止,并转向Spark SQL的开发。 ## 2 Spark SQL的简介 > Spark SQL的架构如图所示,在Shark原有的架构上重写了逻辑执行计划的优化部分,解决了Shark存在的问题。Spark SQL在Hive兼容层面仅依赖HiveQL解析和Hive元数据,也就是说,从HQL被解析成抽象语法树(AST)起,就全部由Spark SQL接管了。Spark SQL执行计划生成和优化都由Catalyst(函数式关系查询优化框架)负责。 ![image-20260528162445299](https://gitee.com/ahst/pic26/raw/master/images/20260528162458733.png) Spark SQL 主要提供了以下三个功能: 1. Spark SQL可以从各种结构化数据源(如JSON、Hive、Parquet等)中读取数据,进行数据分析。 2. Spark SQL包含行业标准的JDBC和ODBC连接方式,因此它不局限于Spark程序内使用SQL语句进行查询。 3. Spark SQL可以无缝的将SQL查询与Spark程序进行结合,它能够将结构化数据作为Spark中的分布式数据集(RDD)进行查询,在Python,Scala和Java中均集成了相关API,这种紧密的集成方式能够轻松的运行SQL查询以及复杂的分析算法。 总体来说,Spark SQL支持多数据源的查询和加载,兼容Hive可以使用JDBC/ODBC的连接方式执行SQL语句,它为Spark框架在结构化数据分析方面提供重要的技术支持。 ## 3 DataFrame ### 3.1 DataFrame简介 Spark SQL使用的数据抽象并非是RDD,而是DataFrame。在Spark 1.3.0版本之前,DataFrame被称为SchemaRDD。DataFrame使Spark具备了处理结构化数据的能力。在Spark中,DataFrame是一种以RDD为基础的分布式数据集,因此DataFrame可以完成RDD的绝大多数功能,在开发使用时,也可以调用方法将RDD和DataFrame进行相互转换。DataFrame的结构类似于传统数据库的二维表格,并且可以从很多数据源中创建,如结构化文件、外部数据库、Hive表等数据源。下面来了解DataFrame与RDD在结构上的区别。 ![image-20260528162556836](https://gitee.com/ahst/pic26/raw/master/images/20260528162602070.png) 在图中左侧为RDD[Person]数据集。右侧是DataFrame数据集。DataFrame可以看做是分布式的Row对象的集合,在二维表数据集的每一列都带有名称和类型,这就是Schema元信息,这使得Spark框架可以获取更多的数据结构信息。从而对在DataFrame背后的数据源以及作用于DataFrame上数据变换进行针对性的优化。最终达到大幅提升计算效率的目的;同时DataFrame与Hive类似,支持嵌套数据类型(如Struct、Array、Map)。 RDD是分布式的JAVA对象的集合,在图中的RDD[person]数据集。虽然它以person为类型参数,但是对象内部之间的结构相对于Spark框架本身是无法得知的,这样在转换数据形式时效率相对较低。 总的来说,DataFrame除了提供比RDD更丰富的算子以外,更重要的特点是提升Spark框架执行效率、减少数据读取时间以及优化执行计划。有了DataFrame这个高层次的抽象后,处理数据就更加简单了,甚至可以直接用SQL来处理数据,这对于开发者来说,易用性有了很大的提升。不仅如此,通过DataFrame API或 SQL处理数据时,Spark优化器(Catalyst)会自动优化代码,即使写的程序或SQL不高效,程序也可以高效地执行。 ### 3.2 DataFrame的创建 在Spark2.0版本之前,Spark SQL中的SQLContext是创建的和执行SQL的入口。可以利用HiveContext接口,通过HiveQL语句操作Hive表数据,实现数据查询功能。而在Spark2.0之后,Spark使用全新的SparkSession接口替代SQLContext即HiveContext接口,完成数据的加载,转换,处理等功能。 创建SparkSession对象可以通过`SparkSession.builder().getOrCreate()`方法获取,但使用Spark-Shell编写程序时,Spark-Shell客户端会默认提供了一个名为sc的SparkContext对象和一个名为spark的SparkSession对象,因此可以直接使用这两个对象,不需要自行创建。启动Spark-Shell的命令如下: ```shell bin/spark-shell --master local[2] ``` ![QQ_1779957059683](https://gitee.com/ahst/pic26/raw/master/images/20260528163112895.png) 从图中可以看出,SparkContext、SparkSession对象已创建完成。创建DataFrame有多种方式,最基本的方式是从一个已经存在的RDD调用toDF()方法进行转换得到DataFrame,或者通过Spark读取数据源直接创建。 在创建DataFrame之前,为了支持RDD转换成DataFrame及后续的SQL操作,需要导入spark.implicits._包启用隐式转换。若使用SparkSession方式创建DataFrame,可以使用spark.read操作,从不同类型的文件中加载数据创建DataFrame,具体的操作API见下表: | **代码示例** | **描述** | |--------------------------------------|------------------------------| | spark.read.text("people.txt") | 读取txt格式的文本文件,创建DataFrame | | spark.read.csv("people.csv") | 读取csv格式的文本文件,创建DataFrame | | spark.read.json("people.json") | 读取json格式的文本文件,创建DataFrame | | spark.read.parquet("people.parquet") | 读取parquet格式的文本文件,创建DataFrame | 下面介绍一下,采用这些API具体创建DataFrame的方式。 #### 3.2.1 数据准备 在HDFS文件系统的/spark目录中创建一个person.txt文件,文件内容如下: ```text 1 zhangsan 20 2 lisi 29 3 wangwu 25 4 zhaoliu 30 5 tianqi 35 6 jerry 40 ``` #### 3.2.2 通过文件直接创建DataFrame 启动Spark-Shell后,在Spark-Shell中输入下列代码: ```shell import spark.implicits._ val personDF = spark.read.text("hdfs://hadoop01:9000/spark/person.txt") personDF.printSchema() ``` 从上述返回结果personDF的属性可以看出,DataFrame对象创建完成,之后调用DataFrame的printSchema()方法可以打印当前对象的Schema元数据信息。从返回结果可以看出,当前value字段是String数据类型,并且还可以为Null。 使用DataFrame的show()方法可以查看当前DataFrame的结果数据,具体代码和返回结果如下: ```shell personDF.show() ``` 从上述返回结果可以看出,当前personDF对象中的6条记录就对应了person.txt文本文件中的数据。 #### 3.2.3 RDD转换DataFrame 调用RDD的toDF()方法,可以将RDD转换为DataFrame对象,具体代码如下: ```shell val lineRDD = sc.textFile("hdfs://hadoop01:9000/spark/person.txt").map(_.split(" ")) case class Person(id:Int,name:String,age:Int) val personRDD = lineRDD.map(x=>Person(x(0).toInt,x(1),x(2).toInt)) val personDF=personRDD.toDF() personDF.show() ``` 在上述代码中,第1行代码将文本文件转成RDD;第2行定义一个Person样例类,相当于定义表的Schema元数据信息;第3行表示使RDD中的数组数据与样例类进行关联,最终会将RDD[Array[String]]更改为RDD[Person];第4行表示调用RDD的toDF()方法,就可以把RDD转换成DataFrame。 ### 3.3 DataFrame的操作 #### 3.3.1 show() ```shell personDF.show() ``` #### 3.3.2 printSchema() ```shell personDF.printSchema() ``` #### 3.3.3 select() ```shell personDF.select(personDF.col("name")).show() ``` 同样,我们可以省略`.col`方法,直接使用如下的代码: ```shell personDF.select(personDF("name")).show() ``` #### 3.3.4 filter() ```shell personDF.filter(personDF("age")>=25).show() ``` #### 3.3.5 groupBy() ```shell personDF.groupBy("age").count().show() ``` #### 3.3.6 sort() ```shell personDF.sort(personDF("age").desc).show() ``` 这里需要注意的是,默认是以升序排序,如果需要降序排列,这里加上`.desc`即可。当然并不是只有Int类型的数据才能排序,一般的数据类型都能够进行排序。例如String类型排序: ```shell personDF.sort(personDF("name")).show() ``` #### 3.3.7 SQL形式的操作 DataFrame的强大之处就是可以将它看作是一个关系型数据表,然后可以在程序中直接使用spark.sql()的方法执行SQL查询,结果将作为一个DataFrame返回。使用SQL形式操作的前提是需要将DataFrame注册成一个临时表,代码如下: ```shell personDF.registerTempTable("t_person") ``` 注册过表之后,就可以使用这个临时表进行表查询了。 ```shell // 查询年龄最大的两个人的信息 spark.sql("select * from t_person order by age desc limit 2").show() // 查询年龄大于25岁的人的信息 spark.sql("select * from t_person where age > 25").show() ``` 有了SQL形式的操作,就可以操作几乎所有数据库能操作的内容了,熟悉SQL的开发者也能够快速的掌握DataFrame的使用。 ## 4 Dataset ### 4.1 Dataset的简介 DataSet是Spark1.6添加的分布式数据集合,Spark2.0合并DataSet和DataFrame数据集合API,DataFrame变成DataSet的子集。 Dataset相对于RDD,Dataset提供了强类型⽀持,也是在RDD的每⾏数据加了类型约束,每一行数据类型都可以自己定义,一旦定义后没救具有严格的错误检查机制。 ### 4.2 Dataset对象的创建 创建Dataset可以通过SparkSession中的createDataset来创建: ```shell val personDs=spark.createDataset(sc.textFile("hdfs://hadoop01:9000/spark/person.txt")) personDs.show() ``` 上述返回结果personDs的属性可以看出,Dataset从已存在的RDD中构建成功,并且赋予value为String类型,Dataset和DataFrame拥有完全相同的成员函数,通过show()方法可以展示personDs中数据的具体内容。 Dataset不仅能从RDD中构建,它与DataFrame也可以相互转换,DataFrame可以通过`as[ElementType]`方法转换为Dataset,同样Dataset也可以使用toDF()方法转换为DataFrame,具体代码如下: ```shell spark.read.text("hdfs://hadoop01:9000/spark/person.txt").as[String] spark.read.text("hdfs://hadoop01:9000/spark/person.txt").as[String].toDF() ``` Dataset操作与DataFrame大致相同,这里就不赘述。 ## 5 RDD转换为DataFrame Spark官方提供了两种方法实现RDD转换到DataFrame。第一种方法是利用反射机制来推断包含特定类型对象的Schema,这种方式适用于对已知数据结构的RDD转换;第二种方法通过编程接口构造一个Schema,并将其应用在一直的RDD数据中。 ### 5.1 反射机制推断Schema 开始之前,需要在IDEA开发工具里创建一个名为`SparkProject`的Maven工程。 #### 5.1.1 添加Spark SQL依赖 在pom.xml文件中添加Spark SQL依赖,代码片段如下: ```xml org.apache.spark spark-sql_2.12 3.3.0 ``` #### 5.1.1 编写代码 实现反射机制推断Schema需要定义一个case class 样例类,定义字段和属性,样例类的参数名称会被反射机制利用作为列明,编写代码文件如下: 文件名:`CaseClassSchema.scala` ```scala import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Row, SparkSession} case class Person(id:Int,name:String,age:Int) object CaseClassSchema { def main(args: Array[String]): Unit = { //1.构建SparkSession val spark : SparkSession = SparkSession.builder() .appName("CaseClassSchema") .master("local[2]") .getOrCreate() //2.获取SparkContext val sc : SparkContext =spark.sparkContext //设置日志打印级别 sc.setLogLevel("WARN") //3.读取文件 val data: RDD[Array[String]] = sc.textFile("hdfs://hadoop01:9000/spark/person.txt").map(x=>x.split(" ")) //4.将RDD与样例类关联 val personRdd: RDD[Person] = data.map(x=>Person(x(0).toInt,x(1),x(2).toInt)) //5.获取DF //手动导入隐式转换 import spark.implicits._ val personDF: DataFrame = personRdd.toDF //------------DSL语法操作开始------------- //1、显示DataFrame的数据,默认显示20行 personDF.show() //2、显示DataFrame的schema信息 personDF.printSchema() //3、显示DataFrame记录数 println(personDF.count()) //4、显示DataFrame的所有字段 personDF.columns.foreach(println) //5、取出DataFrame的第一行记录 println(personDF.head()) //6、显示DataFrame中name字段的所有值 personDF.select("name").show() //7、过滤出DataFrame中年龄大于30的记录 personDF.filter($"age" > 30).show() //8、统计DataFrame中年龄大于30的人数 println(personDF.filter($"age">30).count()) //9、统计DataFrame中按照年龄进行分组,求每个组的人数 personDF.groupBy("age").count().show() //-----------DSL语法操作结束------------- //-----------SQL操作风格开始------------- //将DataFrame注册成表 personDF.createOrReplaceTempView("t_person") //传入sql语句,进行操作 spark.sql("select * from t_person").show() spark.sql("select * from t_person where name='zhangsan'").show() spark.sql("select * from t_person order by age desc").show() //-----------SQL操作风格结束------------- //关闭操作 sc.stop() spark.stop() } } ``` 第5行定义了一个Person的case类,这是因为再利用反射推断RDD模式时,首先需要定义一个case类,因为Spark SQL能够自动将包含case类的RDD隐式转换成DataFrame,case类定义了Table的结构,case类的属性通过反射机制变成表的列明。其余部分与Spark-Shell操作大致相同,这里不再展示执行效果。 ### 5.2 编程方式定义Schema 当case类不能提前定义的时候,就需要采用编程方式定义Schema信息,定义DataFrame主要包含3个步骤,具体如下: 1. 创建一个Row对象结构的RDD; 2. 基于StructType类型创建Schema; 3. 通过SparkSession提供的`createDataFrame()`方法来拼接Schema。 根据上述步骤,创建`SparkSqlSchema.scala`文件,使用编程方式定义Schema信息的具体代码如下: 文件名:`SparkSqlSchema.scala` ```scala import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} import org.apache.spark.sql.{DataFrame, Row, SparkSession} object SparkSqlSchema { def main(args: Array[String]): Unit = { //1.创建SparkSession val spark: SparkSession = SparkSession.builder() .appName("SparkSqlSchema") .master("local[2]") .getOrCreate() //2.获取sparkContext对象 val sc: SparkContext = spark.sparkContext //设置日志打印级别 sc.setLogLevel("WARN") //3.加载数据 val dataRDD: RDD[String] = sc.textFile("hdfs://hadoop01:9000/spark/person.txt") //4.切分每一行 val dataArrayRDD: RDD[Array[String]] = dataRDD.map(_.split(" ")) //5.加载数据到Row对象中 val personRDD: RDD[Row] = dataArrayRDD.map(x=>Row(x(0).toInt,x(1),x(2).toInt)) //6.创建Schema val schema:StructType= StructType(Seq( StructField("id", IntegerType, false), StructField("name", StringType, false), StructField("age", IntegerType, false) )) //7.利用personRDD与Schema创建DataFrame val personDF: DataFrame = spark.createDataFrame(personRDD,schema) //8.DSL操作显示DataFrame的数据结果 personDF.show() //9.将DataFrame注册成表 personDF.createOrReplaceTempView("t_person") //10.sql语句操作 spark.sql("select * from t_person").show() //11.关闭资源 sc.stop() spark.stop() } } ``` 第9\~23行代码表示将文件转换成RDD的基本步骤,第25\~29行代码即为编程方式定义Schema的核心代码,Spark SQL提供了`Class StructType(val fields:Array[StructField])`类来表示模式的信息,生成一个StructType对象,需要提供fields作为输入参数,fields是一个集合类型,`StructField(name,dataType,nullable)`参数分别表示为字段名称、字段数据类型、字段值是否允许为空值,根据person.txt文本数据文件分别设置id,name,age字段作为Schema,第31行代码表示通过调用spark.createDataFrame()方法将RDD和Schema进行合并转换为DataFrame,第33\~40行代码即为操作DataFrame进行数据查询。 ## 6 Spark SQL 操作数据源 Spark SQL可以支持Parquet、JSON、Hive等数据源,并且可以通过JDBC连接外部数据源。 ### 6.1 操作MySQL #### 6.1.1 读取MySQL数据库 首先在hadoop01节点创建一个名为`spark`的数据库,并创建名称为person的数据表,向表中添加如下数据。(这个步骤不再赘述,详细过程请查阅MySQL的相关资料) | id | name | age | |-----|----------|-----| | 1 | zhangsan | 18 | | 2 | lisi | 20 | 数据库和数据表创建成功后,如果想通过Spark SQL API方式访问MySQL数据库,需要再pom.xml配置文件中添加MySQL驱动连接包,依赖参数如下: ```xml mysql mysql-connector-java 8.0.31 ``` 当所需依赖添加完毕后,就可以编写代码读取MySQL数据库中的数据,具体代码如下: 文件名:`DataFromMysql.scala` ```scala import java.util.Properties import org.apache.spark.sql.{DataFrame, SparkSession} //需要MySQL连接驱动包 object DataFromMysql { def main(args: Array[String]): Unit = { //1、创建sparkSession对象 val spark: SparkSession = SparkSession.builder() .appName("DataFromMysql") .master("local[2]") .getOrCreate() //2、创建Properties对象,设置连接mysql的用户名和密码 val properties: Properties =new Properties() properties.setProperty("user","root") properties.setProperty("password","你数据库的密码") val mysqlDF : DataFrame = spark.read.jdbc("jdbc:mysql://hadoop01:3306/spark","person",properties) var str = "" mysqlDF.collect().foreach(rdd => { str = str+rdd.get(0).toString+":"+rdd.get(1).toString val id = rdd }) print(str) mysqlDF.show() spark.stop() } } ``` #### 6.1.2 向MySQL数据库写数据 SparkSQL不进能够查询MySQL数据库中的数据,还可以向表中插入新的数据,实现方式的具体代码如下: 文件名:`SparkSqlToMysql.scala` ```scala import java.util.Properties import org.apache.calcite.avatica.ColumnMetaData.StructType import org.apache.spark.rdd.RDD import org.apache.spark.sql.types.{IntegerType, StructField} import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} //创建样例类Student case class Student(id:Int,name:String,age:Int) object SparkSqlToMysql { def main(args: Array[String]): Unit = { //1.创建sparkSession对象 val spark: SparkSession = SparkSession.builder() .appName("SparkSqlToMysql") .master("local[2]") .getOrCreate() //2.读取数据 val data: RDD[String] = spark.sparkContext.textFile("hdfs://hadoop01:9000/spark/person.txt") //3.切分每一行, val arrRDD: RDD[Array[String]] = data.map(_.split(" ")) //4.RDD关联Student val studentRDD: RDD[Student] = arrRDD.map(x=>Student(x(0).toInt,x(1),x(2).toInt)) //导入隐式转换 import spark.implicits._ //5.将RDD转换成DataFrame val studentDF: DataFrame = studentRDD.toDF() //6.将DataFrame注册成表 studentDF.createOrReplaceTempView("student") //7.操作student表 ,按照年龄进行降序排列 val resultDF: DataFrame = spark.sql("select * from student order by age asc") resultDF.show() //8.把结果保存在mysql表中 //创建Properties对象,配置连接mysql的用户名和密码 val prop =new Properties() prop.setProperty("user","root") prop.setProperty("password","你数据库的密码") //resultDF.write.jdbc("jdbc:mysql://192.168.121.134:3306/spark","student",prop) //写入mysql时,可以配置插入mode,overwrite覆盖,append追加,ignore忽略,error默认表存在报错 resultDF.write.mode(SaveMode.Overwrite).jdbc("jdbc:mysql://hadoop01:3306/spark?useUnicode=true&characterEncoding=utf8","student",prop) spark.stop() } } ``` ### 6.2 Spark SQL操作Hive #### 6.2.1 同步配置文件 #### 6.2.2 启动MetaStore服务 #### 6.2.3 参考课本 ## 7 参考文献 [1] [Spark入门: Spark SQL简介_厦大数据库实验室博客](http://dblab.xmu.edu.cn/blog/1025-2/) 这里特别感谢厦门大学数据库实验室的林子雨老师授权我摘录部分内容在此实验中。