概观
Spark Streaming是核心Spark API的扩展,可实现实时数据流的可扩展,高吞吐量,容错流处理。数据可以从许多来源(如Kafka,Flume,Kinesis或TCP套接字)中提取,并且可以使用以高级函数表示的复杂算法进行处理map
,例如reduce
,join
和window
。最后,处理后的数据可以推送到文件系统,数据库和实时仪表。
在内部,它的工作原理如下。SparkStreaming 接收实时输入数据流并将数据分成批处理,然后由 Spark 引擎处理以批量生成最终结果流。
Spark Streaming 提供成为离散流或DStream的高级抽象,表示连续的数据流。DStream可以来自于 Kafka, Flume等源的输入数据流的创建,也可以通过在其他DStream上应用高级操作来创建。在内部,DStream表示一系列RDD
实例 – WordCount
首先,我们将Spark Streaming类的名称和StreamingContext中的一些隐式转换导入到我们的环境中,以便将有用的方法添加到我们需要的其他类(如DStream)。StreamingContext是所有流功能的主要入口点。我们使用两个执行线程创建一个本地StreamingContext,批处理间隔为1秒。
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Create a local StreamingContext with two working thread and batch interval of 1 second.
// The master requires 2 cores to prevent a starvation scenario.
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
使用此上下文,我们可以创建一个DStream来表示来自TCP源的流数据,指定为主机名(例如localhost
)和端口(例如8080
)。
// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("hadoop111", 8080)
此lines
DStream表示将从数据服务器接收的数据流。此DStream中的每条记录都是一行文本。接下来,我们希望将空格字符分割为单词。
// Split each line into words
val words = lines.flatMap(_.split(" "))
//val words = lines.flatMap(a => a.split(" "))
flatMap
是一对多DStream操作,它通过从源DStream中的每个记录生成多个新记录来创建新的DStream。在这种情况下,每行将被分成多个单词,单词流表示为 words
DStream。接下来,我们要计算这些单词。
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
所述words
DSTREAM被进一步映射(一到一个变换)到一个DSTREAM (word, 1)
对,然后将其还原得到的单词的频率数据中的每一批。最后,wordCounts.print()
将打印每秒生成的一些计数。
请注意,执行这些行时,Spark Streaming仅设置它在启动时将执行的计算,并且尚未启动实际处理。要在设置完所有转换后开始处理,我们最终调用
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
$ nc -lk 9999
完整代码
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
/**
* 创建 StreamingContext
* 1:conf
* 2:时间间隔
*/
val ssc = new StreamingContext(conf,Seconds(2))
/**
* 读取 socket 信息
*/
val inputDStream = ssc.socketTextStream("hadoop111",8080)
val words = inputDStream.flatMap(a => a.split(" "))
val word = words.map(a => (a,1))
val count = word.reduceByKey((a,b) => a+ b)
count.print()
ssc.start() // 开始
ssc.awaitTermination() // 等待结束
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。