代码拉取完成,页面将自动刷新
/*
* Copyright Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.confluent.examples.streams
import java.lang.{Long => JLong}
import java.util.Properties
import io.confluent.examples.streams.kafka.EmbeddedSingleNodeKafkaCluster
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization._
import org.apache.kafka.streams.kstream.{KStream, KTable, Produced}
import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsBuilder, StreamsConfig}
import org.apache.kafka.test.TestUtils
import org.assertj.core.api.Assertions.assertThat
import org.junit._
import org.scalatest.junit.AssertionsForJUnit
/**
* End-to-end integration test based on [[WordCountLambdaExample]], using an embedded Kafka cluster.
*
* See [[WordCountLambdaExample]] for further documentation.
*
* See [[WordCountLambdaIntegrationTest]] for the equivalent Java example.
*
* Note: We intentionally use JUnit4 (wrapped by ScalaTest) for implementing this Scala integration
* test so it is easier to compare this Scala code with the equivalent Java code at
* StreamToTableJoinIntegrationTest. One difference is that, to simplify the Scala/Junit integration, we
* switched from BeforeClass (which must be `static`) to Before as well as from @ClassRule (which
* must be `static` and `public`) to a workaround combination of `@Rule def` and a `private val`.
*/
class WordCountScalaIntegrationTest extends AssertionsForJUnit {
private val privateCluster: EmbeddedSingleNodeKafkaCluster = new EmbeddedSingleNodeKafkaCluster
@Rule def cluster: EmbeddedSingleNodeKafkaCluster = privateCluster
private val inputTopic = "inputTopic"
private val outputTopic = "output-topic"
@Before
def startKafkaCluster() {
cluster.createTopic(inputTopic)
cluster.createTopic(outputTopic)
}
@Test
def shouldCountWords() {
// To convert between Scala's `Tuple2` and Streams' `KeyValue`.
import KeyValueImplicits._
val inputTextLines: Seq[String] = Seq(
"Hello Kafka Streams",
"All streams lead to Kafka",
"Join Kafka Summit"
)
val expectedWordCounts: Seq[KeyValue[String, Long]] = Seq(
("hello", 1L),
("all", 1L),
("streams", 2L),
("lead", 1L),
("to", 1L),
("join", 1L),
("kafka", 3L),
("summit", 1L)
)
//
// Step 1: Configure and start the processor topology.
//
val streamsConfiguration: Properties = {
val p = new Properties()
p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-scala-integration-test")
p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers())
p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
// The commit interval for flushing records to state stores and downstream must be lower than
// this integration test's timeout (30 secs) to ensure we observe the expected processing results.
p.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "10000")
p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
// Use a temporary directory for storing state, which will be automatically removed after the test.
p.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory.getAbsolutePath)
p
}
val stringSerde: Serde[String] = Serdes.String()
val longSerde: Serde[JLong] = Serdes.Long()
val builder: StreamsBuilder = new StreamsBuilder()
// Construct a `KStream` from the input topic, where message values represent lines of text (for
// the sake of this example, we ignore whatever may be stored in the message keys).
val textLines: KStream[String, String] = builder.stream(inputTopic)
// Scala-Java interoperability: to convert `scala.collection.Iterable` to `java.util.Iterable`
// in `flatMapValues()` below.
import collection.JavaConverters.asJavaIterableConverter
val wordCounts: KTable[String, JLong] = textLines
.flatMapValues(value => value.toLowerCase.split("\\W+").toIterable.asJava)
// no need to specify explicit serdes because the resulting key and value types match our default serde settings
.groupBy((_, word) => word)
.count()
wordCounts.toStream.to(outputTopic, Produced.`with`(stringSerde, longSerde))
val streams: KafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration)
streams.start()
//
// Step 2: Publish some input text lines.
//
val producerConfig: Properties = {
val p = new Properties()
p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers())
p.put(ProducerConfig.ACKS_CONFIG, "all")
p.put(ProducerConfig.RETRIES_CONFIG, "0")
p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
p
}
import collection.JavaConverters._
IntegrationTestUtils.produceValuesSynchronously(inputTopic, inputTextLines.asJava, producerConfig)
//
// Step 3: Verify the application's output data.
//
val consumerConfig = {
val p = new Properties()
p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers())
p.put(ConsumerConfig.GROUP_ID_CONFIG, "wordcount-scala-integration-test-standard-consumer")
p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[LongDeserializer])
p
}
val actualWordCounts: java.util.List[KeyValue[String, Long]] =
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, outputTopic, expectedWordCounts.size)
streams.close()
assertThat(actualWordCounts).containsExactlyElementsOf(expectedWordCounts.asJava)
}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。