# kafkaStream **Repository Path**: cy_bupt/kafka-stream ## Basic Information - **Project Name**: kafkaStream - **Description**: 利用protobuf和kafka实现行情回放功能 - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 1 - **Created**: 2022-08-16 - **Last Updated**: 2025-04-28 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # kafka stream 官方实战及踩坑 1. 安装kafka及maven环境 ``` wget https://downloads.apache.org/kafka/3.2.1/kafka_2.12-3.2.1.tgz tar -xzf kafka_2.12-3.2.1.tgz wget https://dlcdn.apache.org/maven/maven-3/3.8.6/binaries/apache-maven-3.8.6-bin.tar.gz tar xzvf apache-maven-3.8.6-bin.tar.gz PATH=/home/apache-maven-3.8.6/bin:$PATH ``` 2. 使用maven构建项目 ``` mvn archetype:generate \ -DarchetypeGroupId=org.apache.kafka \ -DarchetypeArtifactId=streams-quickstart-java \ -DarchetypeVersion=3.2.1 \ -DgroupId=streams.examples \ -DartifactId=streams.examples \ -Dversion=0.1 \ -Dpackage=myapps ``` 3. 编译项目 ``` mvn compile # mvn package ``` # 环境搭建 因为没有环境,因此使用docker搭建,但是docker在windows上的网络非常操蛋,还是直接裸机搭建 ``` bin/zookeeper-server-start.sh config/zookeeper.properties bin/kafka-server-start.sh config/server.properties ``` # 安装protobuf ```shell wget https://github.com/protocolbuffers/protobuf/releases/download/v21.5/protobuf-all-21.5.tar.gz tar -xzf protobuf-all-21.5.tar.gz cd protobuf-21.0 ./configure make ``` # 测试流程 ``` # topic创建 bin/kafka-topics.sh --create \ --bootstrap-server localhost:9092 \ --replication-factor 1 \ --partitions 1 \ --topic streams-plaintext-input bin/kafka-topics.sh --create \ --bootstrap-server localhost:9092 \ --replication-factor 1 \ --partitions 1 \ --topic streams-wordcount-output \ --config cleanup.policy=compact # 开启生产者客户端 bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input # 开启消费者客户端 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic streams-wordcount-output --from-beginning --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer # 开启word计数stream bin/kafka-run-class.sh org.apache.kafka.streams.examples.myapps.WordCountDemo # 通过客户端输入输出测试即可 ``` 不同证券的数据 一定时间内的平均价格和涨跌幅