Spark Streaming extends the core API to allow high-throughput, fault-tolerant stream processing of live data streams. Data can be ingested from many sources such as Akka, Kafka, Flume, Twitter, ZeroMQ, and TCP sockets, and results can be stored in Cassandra.
For our initial streaming release, due to time constraints, we started with Akka integration for Spark Streaming. Kafka, ZeroMQ, and Twitter streaming support are coming soon.
Here is a basic Spark Streaming sample which writes to the console with `wordCounts.print()`:

```scala
// Create a StreamingContext with a SparkConf configuration
val ssc = new StreamingContext(sparkConf, Seconds(1))

// Create a DStream that will connect to serverIP:serverPort
val lines = ssc.socketTextStream(serverIP, serverPort)

// Count each word in each batch
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// Print a few of the counts to the console, then start the computation
wordCounts.print()
ssc.start()
ssc.awaitTermination() // Wait for the computation to terminate
```
Now let's bring the Cassandra-specific functions on the `StreamingContext` and `RDD` into scope, and simply replace printing to the console with piping the output to Cassandra:

```scala
import com.datastax.spark.connector.streaming._

wordCounts.saveToCassandra("streaming_test", "words")
```
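The target keyspace and table must exist before the stream starts writing. Below is a minimal sketch of creating them from the driver with the connector's `CassandraConnector`; the schema (a plain `word text / count int` table) and replication settings are illustrative assumptions, and `sparkConf` is the same configuration used to build the `StreamingContext` above:

```scala
import com.datastax.spark.connector.cql.CassandraConnector

// Illustrative schema; adjust the keyspace replication and column types for your cluster.
CassandraConnector(sparkConf).withSessionDo { session =>
  session.execute("CREATE KEYSPACE IF NOT EXISTS streaming_test " +
    "WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}")
  session.execute("CREATE TABLE IF NOT EXISTS streaming_test.words " +
    "(word text PRIMARY KEY, count int)")
}
```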
Follow the directions for creating a `SparkConf`, then create the `StreamingContext`. The second required parameter is the `batchDuration`, which sets the interval at which streaming data will be divided into batches. Note that the Spark API provides `Milliseconds`, `Seconds`, and `Minutes`, all of which are accepted as this `Duration`. This `Duration` is not to be confused with `scala.concurrent.duration.Duration`.

```scala
val ssc = new StreamingContext(conf, Seconds(n))
```
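For completeness, here is a minimal sketch of what that `conf` might look like for a job writing to Cassandra. The application name, master URL, and host are placeholder values; `spark.cassandra.connection.host` is the connector setting that tells it which Cassandra node to contact:

```scala
import org.apache.spark.SparkConf

// Placeholder values; adjust for your environment.
val conf = new SparkConf()
  .setAppName("streaming-word-count")
  .setMaster("local[2]") // a receiver occupies one core, so give the local master at least two
  .set("spark.cassandra.connection.host", "127.0.0.1")
```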
Enable Cassandra-specific functions on the `StreamingContext`, `DStream` and `RDD`:

```scala
import com.datastax.spark.connector.streaming._
```
Create any of the available or custom Spark streams. The connector supports Akka Actor streams so far, and will support many more in the next release. You can extend the provided `com.datastax.spark.connector.streaming.TypedStreamingActor`:

```scala
val stream = ssc.actorStream[String](Props[TypedStreamingActor[String]], "stream", StorageLevel.MEMORY_AND_DISK)
```
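If you would rather not extend the connector's trait, Spark 1.x's generic actor-receiver mechanism can back the same kind of stream. A minimal sketch, assuming Spark 1.x (where `ActorHelper` and `actorStream` are available); the actor class and stream name here are hypothetical:

```scala
import akka.actor.{Actor, Props}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.ActorHelper

// Hypothetical receiver actor: every String message it receives is handed to Spark via store().
class SimpleWordActor extends Actor with ActorHelper {
  def receive: Actor.Receive = {
    case s: String => store(s)
  }
}

// Wired up the same way as the TypedStreamingActor above.
val altStream = ssc.actorStream[String](Props[SimpleWordActor], "simple-words", StorageLevel.MEMORY_AND_DISK)
```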
Saving data, where `streaming_test` is the keyspace name and `words` is the table name:

```scala
val wc = stream.flatMap(_.split("\\s+"))
  .map(x => (x, 1))
  .reduceByKey(_ + _)
  .saveToCassandra("streaming_test", "words", SomeColumns("word", "count"))
```
Start the computation:

```scala
ssc.start()
```
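To sanity-check what the stream has written, the saved rows can be read back with the connector's `cassandraTable` on the underlying `SparkContext`. A small sketch, assuming the table schema above and that some batches have already completed:

```scala
import com.datastax.spark.connector._

// Read back whatever the stream has written so far as (word, count) tuples.
val saved = ssc.sparkContext
  .cassandraTable[(String, Int)]("streaming_test", "words")
  .select("word", "count")

saved.collect().foreach { case (word, count) => println(s"$word -> $count") }
```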
For a more detailed description as well as tuning writes, see Saving Data to Cassandra.
Spark Streaming Programming Guide: http://spark.apache.org/docs/latest/streaming-programming-guide.html