# Documentation

## Spark Streaming with Cassandra

Spark Streaming extends the core API to allow high-throughput, fault-tolerant stream processing of live data streams. Data can be ingested from many sources such as Akka, Kafka, Flume, Twitter, ZeroMQ, TCP sockets, etc. Results can be stored in Cassandra.

For our initial streaming release, due to time constraints, we started with the Akka integration of Spark Streaming. However, support for Kafka and ZeroMQ, and then Twitter streaming, is coming soon.

### The Basic Idea

#### Spark Streaming

Here is a basic Spark Streaming sample which writes to the console with `wordCounts.print()`:

Create a `StreamingContext` with a `SparkConf` configuration:

    val ssc = new StreamingContext(sparkConf, Seconds(1))

Create a `DStream` that will connect to `serverIP:serverPort`:

    val lines = ssc.socketTextStream(serverIP, serverPort)

Count each word in each batch:

    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)

Print a few of the counts to the console. Start the computation.

    wordCounts.print()
    ssc.start()  
    ssc.awaitTermination() // Wait for the computation to terminate
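
Pieced together, a minimal runnable version might look like the following sketch (the `local[2]` master, app name, and port are illustrative assumptions; two local threads are needed so the receiver does not starve the processing):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object NetworkWordCount {
      def main(args: Array[String]): Unit = {
        // Two local threads: one for the socket receiver, one for processing.
        val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
        val ssc = new StreamingContext(sparkConf, Seconds(1))

        val lines = ssc.socketTextStream("localhost", 9999)
        val words = lines.flatMap(_.split(" "))
        val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)

        wordCounts.print()
        ssc.start()
        ssc.awaitTermination()
      }
    }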

#### Spark Streaming With Cassandra

Now let's bring the Cassandra-specific functions on the `StreamingContext` and `RDD` into scope, and simply replace printing to the console with piping the output to Cassandra:

    import com.datastax.spark.connector.streaming._
    wordCounts.saveToCassandra("streaming_test", "words")
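
Note that the target keyspace and table must already exist. Here is a minimal sketch that creates them through the connector's `CassandraConnector`, assuming `sparkConf` carries `spark.cassandra.connection.host`; the schema shown is an assumption matching the word/count pairs saved here, so adjust types and replication for your cluster:

    import com.datastax.spark.connector.cql.CassandraConnector

    // The connector does not create schema for you; do it up front.
    CassandraConnector(sparkConf).withSessionDo { session =>
      session.execute(
        "CREATE KEYSPACE IF NOT EXISTS streaming_test " +
        "WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}")
      session.execute(
        "CREATE TABLE IF NOT EXISTS streaming_test.words (word TEXT PRIMARY KEY, count INT)")
    }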

### Setting up Streaming

Follow the directions for creating a `SparkConf`.

#### Create a StreamingContext

The second required parameter is the `batchDuration`, which sets the interval at which streaming data will be divided into batches. Note that the Spark API provides `Milliseconds`, `Seconds`, and `Minutes` helpers, all of which are accepted as this `Duration`. This `Duration` is not to be confused with `scala.concurrent.duration.Duration`.

    val ssc = new StreamingContext(conf, Seconds(n))
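
For illustration, the three helpers all construct the same underlying streaming `Duration`, so these batch intervals are equivalent:

    import org.apache.spark.streaming.{Milliseconds, Minutes, Seconds}

    // One minute expressed three ways; all compare equal.
    val byMillis  = Milliseconds(60000)
    val bySeconds = Seconds(60)
    val byMinutes = Minutes(1)
    assert(byMillis == bySeconds && bySeconds == byMinutes)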

#### Enable Saving to Cassandra

Enable Cassandra-specific functions on the `StreamingContext`, `DStream`, and `RDD`:

    import com.datastax.spark.connector.streaming._

#### Creating a Stream and Writing to Cassandra

Create any of the available or custom Spark streams. The connector currently supports Akka Actor streams, with many more to be supported in the next release. You can extend the provided `com.datastax.spark.connector.streaming.TypedStreamingActor`:

    import akka.actor.Props
    import org.apache.spark.storage.StorageLevel

    val stream = ssc.actorStream[String](Props[TypedStreamingActor[String]], "stream", StorageLevel.MEMORY_AND_DISK)
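
Here `Props[TypedStreamingActor[String]]` uses the provided actor directly. A custom subclass might look like the following sketch (hedged: it assumes `push` is the per-element hook that hands data to Spark, as in the connector's demos; `WordStreamActor` and its counter are purely illustrative):

    import com.datastax.spark.connector.streaming.TypedStreamingActor

    // Illustrative subclass: observe each received String before it is
    // handed to Spark, then forward it on unchanged.
    class WordStreamActor extends TypedStreamingActor[String] {
      private var received = 0L  // hypothetical bookkeeping, illustration only

      override def push(word: String): Unit = {
        received += 1
        super.push(word) // hand the element to the Spark receiver
      }
    }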

Configure and start the computation, where `streaming_test` is the keyspace name and `words` is the table name.

Saving data:

    import com.datastax.spark.connector._

    stream.flatMap(_.split("\\s+"))
        .map(word => (word, 1))
        .reduceByKey(_ + _)
        .saveToCassandra("streaming_test", "words", SomeColumns("word", "count"))

Start the computation:

    ssc.start()
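
Once the computation is running, the saved counts can be read back for a quick check. A sketch, assuming the import below (`cassandraTable` on the `SparkContext` comes from `com.datastax.spark.connector._`):

    import com.datastax.spark.connector._

    // Read the table back as an RDD of CassandraRow and print a few rows.
    val rows = ssc.sparkContext.cassandraTable("streaming_test", "words")
    rows.take(10).foreach(println)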

For a more detailed description as well as tuning writes, see Saving Data to Cassandra.

### Find out more

http://spark.apache.org/docs/latest/streaming-programming-guide.html
