This project contains code examples that demonstrate how to implement real-time applications and event-driven microservices using the Streams API of Apache Kafka aka Kafka Streams.
For more information take a look at the latest Confluent documentation on the Kafka Streams API, notably the Developer Guide.
This repository has several branches to help you find the correct code examples for the version of Apache Kafka and/or Confluent Platform that you are using. See Version Compatibility Matrix below for details.
There are two kinds of examples:
Note: We use the label "Lambda" to denote examples that make use of lambda expressions and thus require Java 8+.
- ... `reduce`, using the Kafka Streams DSL
- ... a `KStream` and a `KTable`, i.e. an example of a stateful computation
- ... `KStream#transform()` and `KStream#process()`, which allow you to include custom `Transformer` and `Processor` implementations, respectively, within topologies defined via the DSL
- ... (`bin/kafka-streams-application-reset`)
- ... `KStream` and `GlobalKTable`.

We also provide several integration tests, which demonstrate end-to-end data pipelines. Here, we spawn embedded Kafka clusters and the Confluent Schema Registry, feed input data to them (using the standard Kafka producer client), process the data using Kafka Streams, and finally read and verify the output results (using the standard Kafka consumer client).
Tip: Run `mvn test` to launch the integration tests.
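To make the stateful `KStream`/`KTable` combination from the example list above more concrete, here is a plain-Java sketch of stream-table join semantics. It does not use the Kafka Streams API: the table is modeled as a `HashMap` holding the latest value per key (as a `KTable` does for its changelog), and each stream record is enriched with whatever the table holds at the moment the record is processed. The class name, method names, and the user/region sample data are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of KStream-KTable join semantics. This is NOT the Kafka
// Streams API; the class, method names, and sample data are hypothetical.
class StreamTableJoinSketch {

    // Table side: the latest value per key, like a KTable materialized from a changelog.
    private final Map<String, String> table = new HashMap<>();

    // Upsert into the table; a null value deletes the key (like a tombstone record).
    void updateTable(String key, String value) {
        if (value == null) {
            table.remove(key);
        } else {
            table.put(key, value);
        }
    }

    // Stream side: each record is looked up against the table's *current* state.
    // Inner-join semantics: returns null (record dropped) when the key is absent.
    String joinStreamRecord(String key, String streamValue) {
        String tableValue = table.get(key);
        return tableValue == null ? null : streamValue + "@" + tableValue;
    }

    // Replays a hypothetical interleaving of table updates and stream records.
    static List<String> demo() {
        StreamTableJoinSketch join = new StreamTableJoinSketch();
        List<String> results = new ArrayList<>();
        join.updateTable("alice", "europe");
        addIfJoined(results, join.joinStreamRecord("alice", "/index.html"));
        addIfJoined(results, join.joinStreamRecord("bob", "/about.html")); // no table entry yet
        join.updateTable("alice", "asia"); // affects only records processed afterwards
        addIfJoined(results, join.joinStreamRecord("alice", "/cart.html"));
        return results;
    }

    private static void addIfJoined(List<String> results, String joined) {
        if (joined != null) {
            results.add(joined);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [/index.html@europe, /cart.html@asia]
    }
}
```

Because each lookup uses the table's current state, the later update of `alice` to `asia` only changes records processed after it, and `bob`'s record is dropped under inner-join semantics because no table entry exists for it at that point.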
The code in this repository requires Apache Kafka 0.10+ because Kafka Streams has been included in Apache Kafka since version 0.10. See Version Compatibility Matrix for further details, as different branches of this repository may have different Kafka requirements.
For the `master` branch: To build a development version, you typically need the latest `trunk` version of Apache Kafka (cf. `kafka.version` in `pom.xml` for details). The following instructions will build and locally install the latest `trunk` Kafka version:
$ git clone git@github.com:apache/kafka.git
$ cd kafka
$ git checkout trunk

# Bootstrap gradle wrapper
$ gradle

# Now build and install Kafka locally
$ ./gradlew clean installAll
The code in this repository requires Confluent Schema Registry. To build the development version of Confluent Schema Registry, you also need further Confluent Platform dependencies (e.g. Confluent Common and Confluent Rest Utils; please read Schema Registry's own README file for details). See Version Compatibility Matrix for further details, as different branches of this repository may have different Confluent Platform requirements.
For the `master` branch: To build a development version, you typically need the latest `master` version of Confluent Platform's Schema Registry (cf. `confluent.version` in `pom.xml` for details). The following instructions will build and locally install the latest `master` Schema Registry version:
$ git clone https://github.com/confluentinc/common.git
$ cd common
$ git checkout master

# Build and install common locally
$ mvn -DskipTests=true clean install
$ cd ..

$ git clone https://github.com/confluentinc/rest-utils.git
$ cd rest-utils
$ git checkout master

# Build and install rest-utils locally
$ mvn -DskipTests=true clean install
$ cd ..

$ git clone https://github.com/confluentinc/schema-registry.git
$ cd schema-registry
$ git checkout master

# Now build and install schema-registry locally
$ mvn -DskipTests=true clean install
Also, each example states its exact requirements at the very top.
Some code examples require Java 8, primarily because of the usage of lambda expressions.
IntelliJ IDEA users:
Scala is required only for the Scala examples in this repository. If you are a Java developer you can safely ignore this section.
If you want to experiment with the Scala examples in this repository, you need a version of Scala that supports Java 8 and SAM / Java lambda (e.g. Scala 2.11 with the `-Xexperimental` compiler flag, or 2.12).
Tip: If you only want to run the integration tests (`mvn test`), then you do not need to package or install anything; just run `mvn test`. The instructions below are only needed if you want to interactively test-drive the examples under `src/main/`.
The first step is to install and run a Kafka cluster, which must consist of at least one Kafka broker as well as at least one ZooKeeper instance. Some examples may also require a running instance of Confluent Schema Registry. The Confluent Platform Quickstart guide provides the full details.
In a nutshell:
# Start ZooKeeper
$ ./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties
# In a separate terminal, start Kafka broker
$ ./bin/kafka-server-start ./etc/kafka/server.properties
# In a separate terminal, start Confluent schema registry
$ ./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties
# Again, please refer to the Confluent Platform Quickstart for details such as
# how to download Confluent Platform, how to stop the above three services, etc.
Tip: You can also run `mvn test`, which executes the included integration tests. These tests spawn embedded Kafka clusters to showcase the Kafka Streams functionality end-to-end. The benefit of the integration tests is that you don't need to install and run a Kafka cluster yourself.
If you want to run the examples against a Kafka cluster, you may want to create a standalone jar ("fat jar") of the Kafka Streams examples via:
# Create a standalone jar
#
# Tip: You can also disable the test suite (e.g. to speed up the packaging
# or to lower JVM memory usage) if needed:
#
# $ mvn -DskipTests=true clean package
#
$ mvn clean package
# >>> Creates target/kafka-streams-examples-4.0.0-SNAPSHOT-standalone.jar
You can now run the example applications as follows:
# Run an example application from the standalone jar.
# Here: `WordCountLambdaExample`
$ java -cp target/kafka-streams-examples-4.0.0-SNAPSHOT-standalone.jar \
io.confluent.examples.streams.WordCountLambdaExample
The application will try to read from the specified input topic (in the above example it is `TextLinesTopic`), execute the processing logic, and then try to write back to the specified output topic (in the above example it is `WordsWithCountsTopic`).
In order to observe the expected output stream, you will need to start a console producer to send messages into the input topic and start a console consumer to continuously read from the output topic. More details on how to run the examples can be found in the Javadocs of each example.
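The processing logic itself can be sketched without Kafka at all. The following plain-Java class is a hypothetical illustration, not the code of `WordCountLambdaExample`: it lowercases each input line, splits it into words, and keeps a running count per word, returning the updated `word=count` pairs that a new line produces. That is roughly the shape of the count updates an application like this writes to its output topic. In the Kafka Streams DSL, this kind of logic is typically expressed with operators such as `flatMapValues`, `groupBy`, and `count`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// Plain-Java sketch of word-count logic (hypothetical class; NOT the Kafka Streams API).
class WordCountSketch {

    // Split on runs of non-word characters.
    private static final Pattern WORD_SPLITTER = Pattern.compile("\\W+");

    // Running counts per word: the state a word-count application maintains.
    private final Map<String, Long> counts = new HashMap<>();

    // Processes one input line and returns the updated "word=count" records it produces,
    // mimicking how each new line yields count updates downstream.
    List<String> processLine(String line) {
        List<String> updates = new ArrayList<>();
        for (String word : WORD_SPLITTER.split(line.toLowerCase())) {
            if (word.isEmpty()) {
                continue; // a delimiter at the start of the line yields an empty first token
            }
            long updated = counts.merge(word, 1L, Long::sum);
            updates.add(word + "=" + updated);
        }
        return updates;
    }

    long countOf(String word) {
        return counts.getOrDefault(word, 0L);
    }

    public static void main(String[] args) {
        WordCountSketch wc = new WordCountSketch();
        System.out.println(wc.processLine("Hello Kafka Streams")); // [hello=1, kafka=1, streams=1]
        System.out.println(wc.processLine("hello world"));         // [hello=2, world=1]
    }
}
```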
If you want to enable log4j logging while running your example application, you can edit the `log4j.properties` file and then execute as follows:
# Run an example application from the standalone jar.
# Here: `WordCountLambdaExample`
$ java -cp target/kafka-streams-examples-4.0.0-SNAPSHOT-standalone.jar \
-Dlog4j.configuration=file:src/main/resources/log4j.properties \
io.confluent.examples.streams.WordCountLambdaExample
Keep in mind that the machine on which you run the command above must have access to the Kafka/ZooKeeper clusters you configured in the code examples. By default, the code examples assume the Kafka cluster is accessible via `localhost:9092` (aka Kafka's `bootstrap.servers` parameter) and the ZooKeeper ensemble via `localhost:2181`.

You can override the default `bootstrap.servers` parameter through a command line argument.
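A minimal sketch of that override pattern, assuming the application reads the bootstrap servers from its first command-line argument and falls back to `localhost:9092` otherwise. The class and method names are hypothetical. The property key `bootstrap.servers` is the same string as the `StreamsConfig.BOOTSTRAP_SERVERS_CONFIG` constant; a real Kafka Streams application also needs other settings (e.g. `application.id`) that are omitted here.

```java
import java.util.Properties;

// Hypothetical helper: the first command-line argument, if present, overrides the
// default bootstrap.servers value. Uses only java.util.Properties (no Kafka classes).
class BootstrapServersConfigSketch {

    static final String DEFAULT_BOOTSTRAP_SERVERS = "localhost:9092";

    static Properties buildConfig(String[] args) {
        // Use the first argument if present, otherwise fall back to the local default.
        String bootstrapServers = args.length > 0 ? args[0] : DEFAULT_BOOTSTRAP_SERVERS;
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildConfig(args).getProperty("bootstrap.servers"));
    }
}
```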
This project uses the standard maven lifecycle and commands such as:
$ mvn compile # This also generates Java classes from the Avro schemas
$ mvn test # Runs unit and integration tests
Branch (this repo) | Apache Kafka | Confluent Platform | Notes |
---|---|---|---|
master | 1.0.0-SNAPSHOT | 4.0.0-SNAPSHOT | You must manually build the trunk version of Apache Kafka and the master version of Confluent Platform. See instructions above. |
3.3.x | 0.11.0.1-SNAPSHOT | 3.3.1-SNAPSHOT | You must manually build the 0.11.0 version of Apache Kafka and the 3.3.x version of Confluent Platform. See instructions above. |
3.3.0-post | 0.11.0.0(-cp1) | 3.3.0 | Works out of the box |
The `master` branch of this repository represents active development and may require additional steps on your side to make it compile. Check this README as well as `pom.xml` for any such information.
This example launches:
The Kafka Music application demonstrates how to build a simple music charts application that continuously computes, in real-time, the latest charts such as Top 5 songs per music genre. It exposes its latest processing results (the latest charts) via Kafka's Interactive Queries feature and a REST API. The application's input data is in Avro format and comes from two sources: a stream of play events (think: "song X was played") and a stream of song metadata ("song X was written by artist Y").
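The core of such a chart (counting plays per song and ranking songs within a genre) can be sketched in plain Java. This is a hypothetical illustration under simplifying assumptions (in-memory maps, string song IDs, no Avro, no windowing) and is not the Kafka Music application's code: the song metadata becomes a `songId -> genre` map, the play events become per-song counts, and `topSongs` ranks the songs of one genre by descending play count.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Plain-Java sketch of "Top N songs per genre" (hypothetical names; NOT the Kafka Music
// application's code and NOT the Kafka Streams API).
class TopSongsPerGenreSketch {

    // songId -> genre, standing in for the song-metadata stream.
    private final Map<String, String> genreBySong = new HashMap<>();
    // songId -> play count, aggregated from the play-event stream.
    private final Map<String, Long> playsBySong = new HashMap<>();

    void addSong(String songId, String genre) {
        genreBySong.put(songId, genre);
    }

    void recordPlay(String songId) {
        playsBySong.merge(songId, 1L, Long::sum);
    }

    // Current top-n song IDs for a genre, by descending play count (ties broken by ID).
    // Plays of songs without metadata are ignored because their genre is unknown.
    List<String> topSongs(String genre, int n) {
        return playsBySong.entrySet().stream()
            .filter(e -> genre.equals(genreBySong.get(e.getKey())))
            .sorted((a, b) -> {
                int byPlays = Long.compare(b.getValue(), a.getValue());
                return byPlays != 0 ? byPlays : a.getKey().compareTo(b.getKey());
            })
            .limit(n)
            .map(Map.Entry::getKey)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        TopSongsPerGenreSketch charts = new TopSongsPerGenreSketch();
        charts.addSong("s1", "punk");
        charts.addSong("s2", "punk");
        charts.addSong("s3", "jazz");
        charts.recordPlay("s1");
        charts.recordPlay("s2");
        charts.recordPlay("s2");
        charts.recordPlay("s3");
        System.out.println(charts.topSongs("punk", 5)); // [s2, s1]
    }
}
```

In Kafka Streams, per-key state like these counts would live in state stores, which are what Interactive Queries read from.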
More specifically, we will run the following services:
You can find detailed documentation at http://docs.confluent.io/current/streams/kafka-streams-examples/docs/index.html