# action-kafka **Repository Path**: forward-seen/action-kafka ## Basic Information - **Project Name**: action-kafka - **Description**: Kafka入门使用与实践代码整理,docker部署、kafka集群搭建 - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2025-03-06 - **Last Updated**: 2025-04-18 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # Kafka说明文档 ## 基础 ### 资料 **精简讲解视频介绍** https://www.douyin.com/user/self/search/kafka?aid=97cb914c-ce67-4d32-9bc2-6357429112ff&modal_id=7363997422499941666&type=general https://www.douyin.com/user/self/search/kafka?aid=97cb914c-ce67-4d32-9bc2-6357429112ff&modal_id=7398695641389501736&type=general **官方文档** https://kafka.apache.org/39/documentation.html **开源地址** https://github.com/apache/kafka **参考文章** https://lionli.blog.csdn.net/article/details/125743132 https://plus-doc.dromara.org/#/ruoyi-cloud-plus/extend-function/kafka https://lionli.blog.csdn.net/article/details/125855550 ### 介绍 Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 Kafka 有如下特性: - 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间复杂度的访问性能。 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条以上消息的传输。 - 支持Kafka Server间的消息分区,及分布式消费,同时保证每个Partition内的消息顺序传输。 - 同时支持离线数据处理和实时数据处理。 - Scale out:支持在线水平扩展。 ### 术语 - Broker:Kafka集群包含一个或多个服务器,这种服务器被称为broker。 - Topic:每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic 即可生产或消费数据而不必关心数据存于何处) - Partition:Partition是物理上的概念,每个Topic包含一个或多个Partition。 - Producer:负责发布消息到Kafka broker。 - Consumer:消息消费者,向Kafka broker读取消息的客户端。 - Consumer Group:每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。 ### 核心API kafka有四个核心API - 应用程序使用producer API发布消息到1个或多个topic中。 - 应用程序使用consumer API来订阅一个或多个topic,并处理产生的消息。 - 应用程序使用streams API充当一个流处理器,从1个或多个topic消费输入流,并产生一个输出流到1个或多个topic,有效地将输入流转换到输出流。 - connector API允许构建或运行可重复使用的生产者或消费者,将topic链接到现有的应用程序或数据系统。 示例图如下: ![img_1.png](static/img/img_1.png) ### 应用场景 - 构建可在系统或应用程序之间可靠获取数据的实时流数据管道。 - 构建实时流应用程序,可以转换或响应数据流。 ## 部署 ### 基于Zookeeper Zookeeper 服务通过 bitnami/zookeeper 镜像提供。 配置了与 Zookeeper 连接的设置(KAFKA_CFG_ZOOKEEPER_CONNECT: 127.0.0.1:2181)。 Kafka Manager 用来管理 Kafka 集群,也依赖于 Zookeeper 连接。 ```bash mkdir -p /docker/kafka/data chmod -R 777 /docker/kafka docker-compose up -d zookeeper kafka kafka-manager ``` ### KRaft 模式(推荐) Kafka的Kraft模式简单来说就是基于raft协议重新实现了zookeeper的功能。传统的zookeeper集群已经被标记为弃用,将在kafka4.0中完全移除。由于去掉了zk组件,部署也简化了不少。我们基于Kraft模式和Docker Compose同时采用最新版Kafka v3.9.0来搭建集群。 基于KRaft 模式搭建集群环境 ```bash # 创建数据目录 mkdir -p /docker/kafka/{data1,data2,data3} chmod -R 777 /docker/kafka # 创建局域网 docker network create kafka docker network ls # 将docker-compose文件放到该目录 /docker/kafka/kraft/docker-compose.yml # 根据网络环境修改IP配置 # 运行 cd /docker/kafka/kraft docker-compose up -d ``` ### KRaft 模式2 #### Kafka集群 Kraft模式集群由两种角色的节点组成,分别是broker和controller角色。角色类型在节点启动时通过process.roles配置参数指定,允许指定broker或controller,或者同时指定二者,可就是一个节点兼顾两种角色。 1. broker类型节点的职责是为客户端提供服务,即接收生产者消息,并推送给消费者。还可以与其它broker之间同步副本消息。broker内就是我们熟悉的topic分区副本等结构了。 2. controller类型的节点可以参与Leader选举,有投票权和被投票权,只有被选举称为Leader节点时才能为集群提供服务。一个集群中只能有一个Leader节点,如果出现多个Leader 的异常情况通常被称为脑裂,这不是我们关注的主题。Leader节点负责维护整个集群的元数据与调度broker协同工作,例如创建删除topic、分区重分配、preferred leader选举、topic分区拓展、broker上线下线等。 Kafka集群带来的好处是允许横向扩展broker节点,并且在线就可以完成扩展。 #### 集群拓扑图 我们在宿主机上搭建5个节点的Kafka集群,集群由两个[broker]节点,两个[broker,controller]节点和一个[controller]节点组成,并且使用Docker容器来部署这五个节点。 ![img.png](static/img/img.png) 我们规划了三种流量路径: 1. 外部流量 EXTERNAL://:19092,允许外部客户端(生产者、消费者、集群管理工具都算是外部客户端)连接到集群。我们将容器中的端口映射出来,由于集群部署在单机单网卡电脑上,所有端口不允许重复,我们暴漏出4各端口:19092 、29092、39092、49092 2. 控制流量 CONTROLLER://:9093,我们为控制节点间的通信设置了单独的端口9093,以防数据流量过载造成控制信息下发延迟或失败。 3. 内部流量 INTERNAL://:9092,broker之间同步副本数据走内部端口9092。 #### 实验软件与版本说明 kafka: 3.9.0 docker: 25.0.1 docker compose: v2.24.2 ```bash # 创建网络 docker network create kfk-network # 拉取镜像 docker compose pull # 运行容器 docker compose up -d ``` 这个 docker-compose.yaml 文件定义了一个 Kafka 集群,并且包括了 控制器节点(controller nodes)和 代理节点(broker nodes)的配置。 #### 架构说明 Kafka 集群的架构通常包括以下几个主要角色: - Controller(控制器节点): 控制 Kafka 集群的元数据,负责管理分区的领导选举、数据分配等任务。 - Broker(代理节点): 实际存储数据并提供消息发布和消费功能的节点。 - Kafdrop: 一个 Kafka 集群的监控和可视化工具,帮助查看消息和主题的状态。 这个 docker-compose.yaml 配置的集群包括: - 3 个控制器节点(controller-1, controller-2, controller-3) - 2 个代理节点(broker-1, broker-2) - 1 个监控服务 #### 配置项解读 控制器节点 (controller-1, controller-2, controller-3) 控制器节点负责 Kafka 集群的元数据管理、分区领导者的选举等工作。Kafka 需要一个控制器来保证集群的一致性。 - KAFKA_NODE_ID: Kafka 节点的 ID,用来区分不同的节点。控制器节点的 ID 分别为 1、2、3。 - KAFKA_PROCESS_ROLES: 定义节点的角色。在控制器节点中,这个值设置为 controller,表示该节点是控制器角色。 - KAFKA_LISTENERS: 定义节点的监听器。这里设置为 CONTROLLER://:9093,表示该节点监听控制器通信的端口为 9093。 - KAFKA_CONTROLLER_LISTENER_NAMES: 定义用于控制器通信的监听器名称,通常是 CONTROLLER。 - KAFKA_INTER_BROKER_LISTENER_NAME: 定义代理节点之间通信使用的监听器名称。通常设为 INTERNAL,用于内部节点间的通信。 - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 配置不同类型监听器使用的安全协议。这里使用 PLAINTEXT,表示不加密通信。 - KAFKA_CONTROLLER_QUORUM_VOTERS: 这项配置用于控制器选举,指定了参与选举的控制器节点。三个控制器节点都在此列出,确保集群的高可用性和容错性。 - KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 设置初始消费者群组重新平衡的延迟时间,这里设置为 0,表示没有延迟。 代理节点 (broker-1, broker-2) 代理节点是实际存储消息数据并处理客户端请求的节点。 - KAFKA_NODE_ID: 设置代理节点的 ID,分别为 4 和 5。 - KAFKA_PROCESS_ROLES: 代理节点的角色设置为 broker,表示该节点是一个数据存储和消息处理节点。 - KAFKA_LISTENERS: 配置该节点的监听器,包括 INTERNAL 和 EXTERNAL 两种监听器。INTERNAL 用于集群内部通信,EXTERNAL 用于客户端的连接。 - KAFKA_ADVERTISED_LISTENERS: 配置向外部客户端通告的地址。这是客户端连接 Kafka 时所使用的地址。这里设置为 broker-1:9092(内部)和 192.168.221.128:39092(外部)。 - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 配置监听器的安全协议,同样设置为 PLAINTEXT。 - KAFKA_CONTROLLER_QUORUM_VOTERS: 配置代理节点连接的控制器节点,确保代理节点能够找到控制器节点进行元数据管理。 - KAFKA_CONTROLLER_LISTENER_NAMES: 代理节点连接控制器的监听器名称,通常是 CONTROLLER。 - KAFKA_INTER_BROKER_LISTENER_NAME: 设置代理节点之间的通信使用的监听器名称,通常设置为 INTERNAL。 Kafdrop (kafdrop) Kafdrop 是一个 Kafka 集群的可视化工具,用于监控 Kafka 集群的状态,查看各个主题、分区和消息。 - KAFKA_BROKERCONNECT: 设置 Kafdrop 连接到哪个 Kafka broker,这里连接了 broker-1 和 broker-2。 - ports: 映射了端口 9000,可以通过浏览器访问 Kafdrop 的 Web 界面。 我们不但要把集群运行起来,还要理解这些参数的含义。上面的compose文件中使用环境变量作为启动参数传递给Kafka,与直接修改配置文件效果一样。在Kraft模式下,Kafka有如下三个配置文件可以配置。 - config/kraft/broker.properties 当节点作为[broker]角色运行时,会去读该文件中的配置项启动。 - config/kraft/controller.properties 当节点作为[controller]角色运行时,会去读该文件中的配置项启动。 - config/kraft/server.properties 当节点同时作为[broker, controller]角色运行时,会去读该文件中的配置项启动。 接下来我们来看看这些文件内常用的配置项: - KAFKA_KRAFT_CLUSTER_ID:配置集群ID,这个配置项比较特殊,并未在上面列举的配置文件中,而是位于一个自动生成的配置文件meta.properties中,位置不固定,具体位置由bin/kafka-storage. sh命令执行完打印的参数决定。该配置文件中定义了cluster.id,即同一个集群中的所有节点都应该指定相同的集群ID,这个参数有默认值可以不配置。 - KAFKA_NODE_ID:配置节点ID,同一集群内节点ID不允许重复。 - KAFKA_PROCESS_ROLES:指定节点的角色,允许指定broker或controller,或者同时指定二者,可就是一个节点兼顾两种角色。 - KAFKA_LISTENERS:列出节点监听器,通常不同的角色会监听不同的端口,它的格式是{LISTENER_NAME}://{hostname}:{port} ,要搞懂它的配置策略你可以阅读一文搞懂Kafka中的listeners配置策略 - KAFKA_ADVERTISED_LISTENERS:listeners选项配置了以怎样的协议监听某个端口。这个配置项类似防火墙的作用,对外公开允许它们使用怎样的IP 和端口访问集群,这里外部指的是客户端(包括生产者、消费者、管理脚本等)或其它broker节点。 - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP:配置监听器对应的安全协议。 - KAFKA_CONTROLLER_QUORUM_VOTERS:指定集群内所有的controller节点,格式为:nodeId@ip:port。 - KAFKA_CONTROLLER_LISTENER_NAMES:指定controller监听器名称列表。 - KAFKA_INTER_BROKER_LISTENER_NAME:指定其它broker节点与本节点通信的监听器。 凡是以KAFKA_开头的环境变量,在配置kraft配置文件中都有对应的配置项。例如KAFKA_CFG_NODE_ID在配置文件中就是node.id。 更多变量 查看文档 https://github.com/bitnami/bitnami-docker-kafka/blob/master/README.md 网络配置 networks: 所有的服务都连接到 kfk-network 网络中。设置 external: true 表明该网络是外部网络。 ### 为什么这样设计? 控制器高可用性:设计了多个控制器节点(controller-1, controller-2, controller-3),确保 Kafka 集群能够在一个控制器节点故障时继续正常工作。这是 Kafka 中一个很重要的高可用性设计。 分离角色:控制器和代理节点被明确分离,控制器只负责元数据和集群管理,代理节点负责消息存储和客户端交互,这样能够提升 Kafka 集群的可扩展性和性能。 外部和内部通信分离:通过配置不同的监听器(INTERNAL 和 EXTERNAL),实现了 Kafka 内部和外部客户端之间的通信隔离。内部通信通常只涉及集群内部的节点,而外部通信则是客户端与 Kafka 集群的交互。 简单监控:通过 Kafdrop 监控工具,集群管理员能够方便地查看集群状态、分区信息和消息流动。 ### 注意事项 控制器节点数:控制器节点数通常是奇数,至少 3 个节点,以便确保能够进行选举并避免脑裂现象。这里使用了 3 个控制器节点,符合最佳实践。 集群的扩展性:如果集群需要扩展,可以增加更多的 broker 服务来处理更多的消息和更大的负载。 监听器安全性:当前配置使用了 PLAINTEXT(明文通信),这对于生产环境来说可能存在安全风险。在生产环境中,应该考虑使用 SSL 或 SASL 来加密通信。 Kafdrop 可视化工具:Kafdrop 仅用于监控和查看 Kafka 集群状态,不建议在生产环境中作为唯一的监控工具,特别是对于大规模的 Kafka 集群。 总结来说,这个配置文件设计了一个高可用的 Kafka 集群架构,包括控制器节点、代理节点和监控工具,适用于需要处理较大负载并且需要一定容错能力的应用场景。 # Kafka Manager 的使用方法 Kafka Manager 是一个方便的 Web 工具,可以帮助你管理 Kafka 集群。它主要提供以下功能: - 集群管理:可以查看 Kafka 集群的状态,监控节点。 - 主题管理:创建、删除、查看 Kafka 主题。 - 分区管理:查看分区、分配分区。 - 消费者管理:查看消费者组、消费进度等信息。 启动容器: 运行以下命令来启动所有服务: ```bash mkdir -p /docker/kafka/{data1,data2} docker-compose up -d ``` 问 Kafka Manager: 访问 http://:19092(假设 192.168.31.165 是你的主机 IP)。登录时使用以下凭据: 用户名:temc 密码:temc.518 添加 Kafka 集群: Kafka Manager 启动后,你可以添加你的 Kafka 集群。由于是 KRaft 模式,Kafka Manager 会自动检测集群的两个节点。按照以下步骤操作: 登录后,点击 "Cluster" 菜单。 选择 "Add Cluster"。 输入 Kafka 的集群信息(如节点地址,假设你输入 192.168.31.165:9093 和 192.168.31.165:9094)。 点击 "Save" 添加集群。 查看和管理集群: 监控:查看 Kafka 节点的健康状态、分区情况等。 主题管理:创建、删除、查看主题。 消费者管理:查看消费者组信息,了解消费者的消费情况。