Apache Kafka 是一款开源的消息引擎系统,也是一个分布式流计算平台,此外,还可以作为数据存储。
Apache Kafka 是一款开源的消息引擎系统,也是一个分布式流计算平台,此外,还可以作为数据存储。
Kafka 的核心功能如下:
Kafka 的设计目标:
Kafka 主要有以下发行版本:
Kafka 有以下重大版本:
下载最新的 Kafka 版本并解压到本地。
$ tar -xzf kafka_2.13-2.7.0.tgz
$ cd kafka_2.13-2.7.0
注意:本地必须已安装 Java8
执行以下指令,保证所有服务按照正确的顺序启动:
# Start the ZooKeeper service
# Note: Soon, ZooKeeper will no longer be required by Apache Kafka.
$ bin/zookeeper-server-start.sh config/zookeeper.properties
打开另一个终端会话,并执行:
# Start the Kafka broker service
$ bin/kafka-server-start.sh config/server.properties
一旦所有服务成功启动,您就已经成功运行了一个基本的 kafka 环境。
Kafka 是一个分布式事件流处理平台,它可以让您通过各种机制读、写、存储并处理事件(events,也被称为记录或消息)
示例事件包括付款交易,手机的地理位置更新,运输订单,物联网设备或医疗设备的传感器测量等等。 这些事件被组织并存储在主题中(topics)。 简单来说,主题类似于文件系统中的文件夹,而事件是该文件夹中的文件。
因此,在您写入第一个事件之前,您必须先创建一个 Topic。执行以下指令:
$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
所有的 Kafka 命令行工具都有附加可选项:不加任何参数,运行 kafka-topics.sh
命令会显示使用信息。例如,会显示新 Topic 的分区数等细节。
$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Topic:quickstart-events PartitionCount:1 ReplicationFactor:1 Configs:
Topic: quickstart-events Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Kafka 客户端和 Kafka Broker 的通信是通过网络读写 Event。一旦收到信息,Broker 会将其以您需要的时间(甚至永久化)、容错化的方式存储。
执行 kafka-console-producer.sh
命令将 Event 写入 Topic。默认,您输入的任意行会作为独立 Event 写入 Topic:
$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
This is my first event
This is my second event
您可以通过
Ctrl-C
在任何时候中断kafka-console-producer.sh
执行 kafka-console-consumer.sh 以读取写入 Topic 中的 Event
$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event
This is my second event
您可以通过
Ctrl-C
在任何时候中断kafka-console-consumer.sh
由于 Event 被持久化存储在 Kafka 中,因此您可以根据需要任意多次地读取它们。 您可以通过打开另一个终端会话并再次重新运行上一个命令来轻松地验证这一点。
您可能有大量数据,存储在传统的关系数据库或消息队列系统中,并且有许多使用这些系统的应用程序。 通过 Kafka Connect,您可以将来自外部系统的数据持续地导入到 Kafka 中,反之亦然。 因此,将已有系统与 Kafka 集成非常容易。为了使此过程更加容易,有数百种此类连接器可供使用。
需要了解有关如何将数据导入和导出 Kafka 的更多信息,可以参考:Kafka Connect section 章节。
一旦将数据作为 Event 存储在 Kafka 中,就可以使用 Kafka Streams 的 Java / Scala 客户端。它允许您实现关键任务的实时应用程序和微服务,其中输入(和/或)输出数据存储在 Kafka Topic 中。
Kafka Streams 结合了 Kafka 客户端编写和部署标准 Java 和 Scala 应用程序的简便性,以及 Kafka 服务器集群技术的优势,使这些应用程序具有高度的可伸缩性、弹性、容错性和分布式。该库支持一次性处理,有状态的操作,以及聚合、窗口化化操作、join、基于事件时间的处理等等。
KStream<String, String> textLines = builder.stream("quickstart-events");
KTable<String, Long> wordCounts = textLines
.flatMapValues(line -> Arrays.asList(line.toLowerCase().split(" ")))
.groupBy((keyIgnored, word) -> word)
.count();
wordCounts.toStream().to("output-topic"), Produced.with(Serdes.String(), Serdes.Long()));
Kafka Streams demo 和 app development tutorial 展示了如何从头到尾的编码并运行一个流式应用。
Ctrl-C
停止生产者和消费者客户端。Ctrl-C
停止 Kafka 代理。Ctrl-C
停止 ZooKeeper 服务器。如果您还想删除本地 Kafka 环境的所有数据,包括您在此过程中创建的所有事件,请执行以下命令:
$ rm -rf /tmp/kafka-logs /tmp/zookeeper
Stream API 的 maven 依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>1.1.0</version>
</dependency>
其他 API 的 maven 依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.1.0</version>
</dependency>
Kafka 有 5 个核心 API
代码如下,直接通过 send
方法来发送
ProducerRecord<String, String> record =
new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try {
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
}
代码如下,与“发送并忘记”的方式区别在于多了一个 get
方法,会一直阻塞等待 Broker
返回结果:
ProducerRecord<String, String> record =
new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try {
producer.send(record).get();
} catch (Exception e) {
e.printStackTrace();
}
代码如下,异步方式相对于“发送并忽略返回”的方式的不同在于:在异步返回时可以执行一些操作,如记录错误或者成功日志。
首先,定义一个 callback
private class DemoProducerCallback implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
e.printStackTrace();
}
}
}
然后,使用这个 callback
ProducerRecord<String, String> record =
new ProducerRecord<>("CustomerCountry", "Biomedical Materials", "USA");
producer.send(record, new DemoProducerCallback());
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
/**
* Kafka 生产者生产消息示例 生产者配置参考:https://kafka.apache.org/documentation/#producerconfigs
*/
public class ProducerDemo {
private static final String HOST = "localhost:9092";
public static void main(String[] args) {
// 1. 指定生产者的配置
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, HOST);
properties.put(ProducerConfig.ACKS_CONFIG, "all");
properties.put(ProducerConfig.RETRIES_CONFIG, 0);
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
// 2. 使用配置初始化 Kafka 生产者
Producer<String, String> producer = new KafkaProducer<>(properties);
try {
// 3. 使用 send 方法发送异步消息
for (int i = 0; i < 100; i++) {
String msg = "Message " + i;
producer.send(new ProducerRecord<>("HelloWorld", msg));
System.out.println("Sent:" + msg);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 4. 关闭生产者
producer.close();
}
}
}
具体步骤如下:
// 1.构建KafkaCustomer
Consumer consumer = buildCustomer();
// 2.设置主题
consumer.subscribe(Arrays.asList(topic));
// 3.接受消息
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(500);
System.out.println("customer Message---");
for (ConsumerRecord<String, String> record : records)
// print the offset,key and value for the consumer records.
System.out.printf("offset = %d, key = %s, value = %s\n",
record.offset(), record.key(), record.value());
}
} finally {
// 4.关闭消息
consumer.close();
}
创建消费者的代码如下:
public Consumer buildCustomer() {
Properties props = new Properties();
// bootstrap.servers是Kafka集群的IP地址。多个时,使用逗号隔开
props.put("bootstrap.servers", "localhost:9092");
// 消费者群组
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer
<String, String>(props);
return consumer;
}
分为订阅主题和指定分组两种方式:
Partition
。(1)订阅主题方式
consumer.subscribe(Arrays.asList(topic));
(2)独立消费者模式
通过 consumer 的 assign(Collection<TopicPartition> partitions)
方法来为消费者指定分区。
public void consumeMessageForIndependentConsumer(String topic){
// 1.构建KafkaCustomer
Consumer consumer = buildCustomer();
// 2.指定分区
// 2.1获取可用分区
List<PartitionInfo> partitionInfoList = buildCustomer().partitionsFor(topic);
// 2.2指定分区,这里是指定了所有分区,也可以指定个别的分区
if(null != partitionInfoList){
List<TopicPartition> partitions = Lists.newArrayList();
for(PartitionInfo partitionInfo : partitionInfoList){
partitions.add(new TopicPartition(partitionInfo.topic(),partitionInfo.partition()));
}
consumer.assign(partitions);
}
// 3.接受消息
while (true) {
ConsumerRecords<String, String> records = consumer.poll(500);
System.out.println("consume Message---");
for (ConsumerRecord<String, String> record : records) {
// print the offset,key and value for the consumer records.
System.out.printf("offset = %d, key = %s, value = %s\n",
record.offset(), record.key(), record.value());
// 异步提交
consumer.commitAsync();
}
}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。