# Efficiency Comparison of MapReduce and Spark Word Segmentation and Word Count in Python, Java, and Scala

**Repository Path**: lovetoeatmeat/platform-technology-of-big-data

## Basic Information

- **Project Name**: Efficiency Comparison of MapReduce and Spark Word Segmentation and Word Count in Python, Java, and Scala
- **Description**: Platform Technology of Big Data
- **Primary Language**: Unknown
- **License**: Apache-2.0
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No

## Statistics

- **Stars**: 0
- **Forks**: 0
- **Created**: 2022-11-24
- **Last Updated**: 2024-02-17

## Categories & Tags

**Categories**: Uncategorized

**Tags**: Java, Python, Scala, IDEA

## README

# Efficiency Comparison of MapReduce and Spark Word Segmentation and Word Count in Python, Java, and Scala

#### Introduction

Word segmentation and word count programs with identical logic are written in three different languages to compare, in a big-data setting, how well MapReduce and Spark suit each language and how efficient the implementations are relative to one another. All modules are built with IDEA + Maven, and the required dependencies are given in each module's pom.xml.

#### Software Architecture

The project is split into three modules that implement the same word segmentation and word count logic in Java, Python, and Scala, so that coding difficulty and runtime efficiency can be compared. The three modules are:

1. wordCountJava
2. wordCountPython
3. wordCountScala

#### Key Function Excerpts

1. wordCountJava

```java
import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

public class WordCountJava {
    // main method
    public static void main(String[] args) {
        // Create a SparkConf object and set the application's configuration
        SparkConf conf = new SparkConf()
                .setAppName("WordCountJava") // application name
                .setMaster("local");         // run in local mode
        // Create the JavaSparkContext, passing in the configuration set above
        JavaSparkContext sc = new JavaSparkContext(conf);
        // In Java a plain RDD is a JavaRDD; pass the path of the input file when creating it
        JavaRDD<String> lines = sc.textFile("hdfs://localhost:9000/dataset/example.txt");
        // Pass flatMap an anonymous inner class instance that splits each line into words
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            public Iterator<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" ")).iterator(); // the words produced by the split
            }
        });
        // Pass mapToPair an anonymous inner class instance that maps each word to (word, 1)
        JavaPairRDD<String, Integer> pairs = words.mapToPair(
                new PairFunction<String, String, Integer>() {
                    public Tuple2<String, Integer> call(String word) throws Exception {
                        return new Tuple2<String, Integer>(word, 1); // the (key, value) pair for this word
                    }
                });
        // Pass reduceByKey an anonymous inner class instance that adds the two values of the same key
        JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey(
                new Function2<Integer, Integer, Integer>() {
                    public Integer call(Integer v1, Integer v2) throws Exception {
                        return v1 + v2; // the accumulated sum, i.e. the word's frequency
                    }
                });
        // Pass foreach an anonymous inner class instance that prints the results
        wordCounts.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            public void call(Tuple2<String, Integer> wordCount) throws Exception {
                // Tuple2 is the Scala tuple type
                System.out.println(wordCount._1 + " " + wordCount._2); // print each word and its count
            }
        });
        sc.close();
    }
}
```

![img](https://treathy.com/wp-content/uploads/2023/12/Platform-Technology-of-Big-Data-1.1.jpg)

![img](https://treathy.com/wp-content/uploads/2023/12/Platform-Technology-of-Big-Data-1.2.jpg)
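For comparison with the Java version above, the same four transformations are much more compact in PySpark. The block below is not taken from the repository; it is a minimal hypothetical sketch that mirrors the Java pipeline, assuming the same local master and the same HDFS input path:

```python
from pyspark import SparkConf, SparkContext

# Hypothetical PySpark equivalent of WordCountJava above;
# the input path is assumed to match the Java version's.
conf = SparkConf().setAppName("wordCountPython").setMaster("local")
sc = SparkContext(conf=conf)

counts = sc.textFile("hdfs://localhost:9000/dataset/example.txt") \
    .flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b)

# collect() triggers the job and brings the results back to the driver
for word, count in counts.collect():
    print(word, count)

sc.stop()
```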
2. wordCountPython

The excerpt below is the TF-IDF stage of the Python module. It calls several helper functions (`word_contains`, `computeIDF`, `computeTF`, `computeTFIDF`, `nomoralize`) that are defined elsewhere in the module; a hypothetical sketch of them is given after the screenshots below.

```python
from pyspark import SparkConf, SparkContext

if __name__ == "__main__":
    conf = SparkConf().setAppName("tfidf")
    sc = SparkContext(conf=conf)
    # Example documents; each document is a list of words
    documents_list = [["hello", "world", "china", "good", "spark", "good"],
                      ["hello", "china", "china", "great", "love", "china"],
                      ["love", "spark", "spark", "good", "hello", "spark"]]
    # Create the RDD and cache it
    tokenized_document_rdd = sc.parallelize(documents_list).cache()

    print("******************** compute idf ********************")
    # This stage computes the idf value of every word.
    # Get the number of documents, needed for the inverse document frequency
    num_document = tokenized_document_rdd.count()
    # Compute each word's document frequency. For each document, convert the
    # word list to a set to get the distinct words it contains, then flatMap
    # all the per-document sets into one collection. In that collection, the
    # number of occurrences of a word equals its document frequency, so a map
    # and reduceByKey after the flatMap count each word's document frequency.
    words_df_rdd = tokenized_document_rdd.flatMap(lambda words_list: word_contains(words_list)) \
                                         .map(lambda word: (word, 1)) \
                                         .reduceByKey(lambda a, b: a + b)
    # Compute each word's idf from its document frequency and the total number
    # of documents; the computeIDF function does the actual idf calculation
    words_idf_rdd = words_df_rdd.map(lambda word_df_tuple:
                                     computeIDF(word_df_tuple, num_document))

    print("******************** compute tf ********************")
    # Count how often each word occurs in each document, then compute its tf.
    # Build the list of all words: flatMap merges the words of all documents
    # into one large list, and distinct removes the duplicates
    all_words_list = tokenized_document_rdd.flatMap(lambda words_list: words_list) \
                                           .distinct() \
                                           .collect()
    # Since there may be many words, broadcast all_words_list so that the
    # tasks running on one executor can share a single copy of the variable
    all_words_broadcast = sc.broadcast(all_words_list)
    # Compute the words' tf values, yielding one tf vector per document
    document_tf_rdd = tokenized_document_rdd.map(lambda words_list:
                                                 computeTF(words_list, all_words_broadcast.value))

    print("******************** compute tfidf ********************")
    # Collect each word's idf from the RDD, convert the list into a dict, and
    # broadcast it so every executor can compute the tfidf of each word in
    # each document
    words_idf_list = words_idf_rdd.collect()
    words_idf_dic = {}
    for item in words_idf_list:  # a dict makes each word's idf easy to look up
        words_idf_dic[item[0]] = item[1]
    words_idf_broadcast = sc.broadcast(words_idf_dic)
    # Compute the tfidf of every word in every document
    document_tfidf_rdd = document_tf_rdd.map(lambda words_tf_list:
                                             computeTFIDF(words_tf_list,
                                                          words_idf_broadcast.value,
                                                          all_words_broadcast.value))
    # Normalize each document's tfidf vector
    normalized_document_tfidf_rdd = document_tfidf_rdd.map(lambda tfidf_vector:
                                                           nomoralize(tfidf_vector))

    print("******************** print tfidf vectors ********************")
    # Print every tfidf vector
    tfidf_vectors = normalized_document_tfidf_rdd.collect()
    for item in tfidf_vectors:
        print(item)
```

![img](https://treathy.com/wp-content/uploads/2023/12/Platform-Technology-of-Big-Data-2.1.jpg)

![img](https://treathy.com/wp-content/uploads/2023/12/Platform-Technology-of-Big-Data-2.2.jpg)

![img](https://treathy.com/wp-content/uploads/2023/12/Platform-Technology-of-Big-Data-2.3.jpg)
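The TF-IDF excerpt above calls five helpers that the README does not show. The sketch below is a hypothetical reconstruction based only on how the excerpt uses them and on the standard TF-IDF definitions; the repository's actual implementations (including the exact idf formula and normalization) may differ:

```python
import math

def word_contains(words_list):
    # Distinct words of one document, so each word is counted once per document
    return list(set(words_list))

def computeIDF(word_df_tuple, num_document):
    # Standard idf = log(N / df); the repo's exact formula (e.g. smoothing) is unknown
    word, df = word_df_tuple
    return (word, math.log(float(num_document) / df))

def computeTF(words_list, all_words_list):
    # Term frequency of every vocabulary word within one document
    total = float(len(words_list))
    return [words_list.count(w) / total for w in all_words_list]

def computeTFIDF(words_tf_list, words_idf_dic, all_words_list):
    # Multiply each tf entry by the idf of the corresponding vocabulary word
    return [tf * words_idf_dic[w] for tf, w in zip(words_tf_list, all_words_list)]

def nomoralize(tfidf_vector):
    # L2-normalize the vector (function name kept as spelled in the excerpt)
    norm = math.sqrt(sum(v * v for v in tfidf_vector))
    return [v / norm for v in tfidf_vector] if norm else tfidf_vector
```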
3. wordCountScala

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Build the singleton object wordCountScala
object wordCountScala {
  def main(args: Array[String]): Unit = {
    // Configure the application
    val conf = new SparkConf()
      .setMaster("local")           // run in local mode
      .setAppName("wordCountScala") // application name
    val sc = new SparkContext(conf) // create the SparkContext
    // Read the input file "week2Input.txt" from HDFS and create the RDD
    val textFile = sc.textFile("hdfs://localhost:9000/user/hadoop/localReceive/week2/week2Input.txt")
    val wordCount = textFile
      .flatMap(line => line.split(" ")) // split each line and flatten the per-line arrays into one RDD of words
      .map((_, 1))                      // map each word to a (word, 1) pair
      .reduceByKey((a, b) => a + b)     // sum the values of each key, i.e. each word's frequency
      .collect()                        // gather the results computed on the worker nodes back to the driver
      .foreach(println)                 // iterate over the collected array and print each (word, count) pair
  }
}
```

![img](https://treathy.com/wp-content/uploads/2023/12/Platform-Technology-of-Big-Data-3.1.jpg)

![img](https://treathy.com/wp-content/uploads/2023/12/Platform-Technology-of-Big-Data-3.2.jpg)

#### Comparison Results

1. Of the three languages, Scala is the best fit for this big-data workload and achieves the highest runtime efficiency, followed by Java; Python runs slightly slower than the other two, which this project attributes to its less mature Spark tooling.

2. Scala module output

![img](https://treathy.com/wp-content/uploads/2023/12/Platform-Technology-of-Big-Data-3.3.jpg)

![img](https://treathy.com/wp-content/uploads/2023/12/Platform-Technology-of-Big-Data-3.4.jpg)

![img](https://treathy.com/wp-content/uploads/2023/12/Platform-Technology-of-Big-Data-3.5.jpg)

3. Java module output

![img](https://treathy.com/wp-content/uploads/2023/12/Platform-Technology-of-Big-Data-1.3.jpg)

![img](https://treathy.com/wp-content/uploads/2023/12/Platform-Technology-of-Big-Data-1.4.jpg)

![img](https://treathy.com/wp-content/uploads/2023/12/Platform-Technology-of-Big-Data-1.5.jpg)

4. Python module output

![img](https://treathy.com/wp-content/uploads/2023/12/Platform-Technology-of-Big-Data-2.4.jpg)

![img](https://treathy.com/wp-content/uploads/2023/12/Platform-Technology-of-Big-Data-2.5.jpg)
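The README reports the relative rankings but does not show how the running times were measured. A minimal, hypothetical way to time a job from the driver side (illustrated here for the Python word count, with an assumed input path) is to wrap the triggering action in wall-clock timestamps:

```python
import time

from pyspark import SparkConf, SparkContext

# Hypothetical timing harness; the job and input path mirror the word count above.
conf = SparkConf().setAppName("wordCountTiming").setMaster("local")
sc = SparkContext(conf=conf)

start = time.time()
counts = sc.textFile("hdfs://localhost:9000/dataset/example.txt") \
    .flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .collect()  # the action triggers execution of the whole job
elapsed = time.time() - start

print("word count finished in %.2f s (%d distinct words)" % (elapsed, len(counts)))
sc.stop()
```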