1 Star 1 Fork 0

洁悫. / SparkStreamingDemo

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
贡献代码
同步代码
取消
提示: 由于 Git 不支持空文件夾,创建文件夹后会生成空的 .keep 文件
Loading...
README

Spark Streaming

  1. 概观

    Spark Streaming是核心Spark API的扩展,可实现实时数据流的可扩展,高吞吐量,容错流处理。数据可以从许多来源(如Kafka,Flume,Kinesis或TCP套接字)中提取,并且可以使用以高级函数表示的复杂算法进行处理map,例如reducejoinwindow。最后,处理后的数据可以推送到文件系统,数据库和实时仪表。

    1565163824(1)

    在内部,它的工作原理如下。SparkStreaming 接收实时输入数据流并将数据分成批处理,然后由 Spark 引擎处理以批量生成最终结果流。

    1565163837(1)

    Spark Streaming 提供成为离散流或DStream的高级抽象,表示连续的数据流。DStream可以来自于 Kafka, Flume等源的输入数据流的创建,也可以通过在其他DStream上应用高级操作来创建。在内部,DStream表示一系列RDD

  2. 实例 – WordCount

    首先,我们将Spark Streaming类的名称和StreamingContext中的一些隐式转换导入到我们的环境中,以便将有用的方法添加到我们需要的其他类(如DStream)。StreamingContext是所有流功能的主要入口点。我们使用两个执行线程创建一个本地StreamingContext,批处理间隔为1秒。

    import org.apache.spark._
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
    
    // Create a local StreamingContext with two working thread and batch interval of 1 second.
    // The master requires 2 cores to prevent a starvation scenario.
    
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(1))

    使用此上下文,我们可以创建一个DStream来表示来自TCP源的流数据,指定为主机名(例如localhost)和端口(例如8080)。

    // Create a DStream that will connect to hostname:port, like localhost:9999
    val lines = ssc.socketTextStream("hadoop111", 8080)

    linesDStream表示将从数据服务器接收的数据流。此DStream中的每条记录都是一行文本。接下来,我们希望将空格字符分割为单词。

    // Split each line into words
    val words = lines.flatMap(_.split(" "))
    //val words = lines.flatMap(a => a.split(" "))

    flatMap是一对多DStream操作,它通过从源DStream中的每个记录生成多个新记录来创建新的DStream。在这种情况下,每行将被分成多个单词,单词流表示为 wordsDStream。接下来,我们要计算这些单词。

    import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
    // Count each word in each batch
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    
    // Print the first ten elements of each RDD generated in this DStream to the console
    wordCounts.print()

    所述wordsDSTREAM被进一步映射(一到一个变换)到一个DSTREAM (word, 1)对,然后将其还原得到的单词的频率数据中的每一批。最后,wordCounts.print()将打印每秒生成的一些计数。

    请注意,执行这些行时,Spark Streaming仅设置它在启动时将执行的计算,并且尚未启动实际处理。要在设置完所有转换后开始处理,我们最终调用

    ssc.start()             // Start the computation
    ssc.awaitTermination()  // Wait for the computation to terminate
    $ nc -lk 9999

    完整代码

    def main(args: Array[String]): Unit = {
    
        val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
    
        /**
          *  创建 StreamingContext
          *  1:conf
          *  2:时间间隔
          */
        val ssc = new StreamingContext(conf,Seconds(2))
        /**
          *  读取 socket 信息
          */
        val inputDStream = ssc.socketTextStream("hadoop111",8080)
        val words = inputDStream.flatMap(a => a.split(" "))
        val word = words.map(a => (a,1))
        val count = word.reduceByKey((a,b) => a+ b)
        count.print()
        ssc.start() // 开始
        ssc.awaitTermination() // 等待结束
      }

空文件

简介

Spark Streaming学习 展开 收起
Scala
取消

发行版

暂无发行版

贡献者

全部

近期动态

加载更多
不能加载更多了
Scala
1
https://gitee.com/stg20150529/SparkStreamingDemo.git
git@gitee.com:stg20150529/SparkStreamingDemo.git
stg20150529
SparkStreamingDemo
SparkStreamingDemo
master

搜索帮助