# k8s-install-kafka

**Repository Path**: worm1/k8s-install-kafka

## Basic Information

- **Project Name**: k8s-install-kafka
- **Description**: No description available
- **Primary Language**: Unknown
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No

## Statistics

- **Stars**: 0
- **Forks**: 0
- **Created**: 2022-01-13
- **Last Updated**: 2022-01-13

## Categories & Tags

**Categories**: Uncategorized
**Tags**: None

## README

# K8s - Deploying a Kafka and ZooKeeper Cluster (with Access from Outside K8s)

This article shows how to deploy a Kafka cluster on Kubernetes so that, once it is running, the Kafka service can be reached both from inside the K8s cluster and from outside it. A clustered service is usually deployed in one of two ways: as a StatefulSet, or as a Service & Deployment. Here we use a StatefulSet for the ZooKeeper cluster and a Service & Deployment for the Kafka cluster.

Original article from www.hangge.com. Please keep the original link when reposting: https://www.hangge.com/blog/cache/detail_3091.html

## 一、Creating the NFS Storage

NFS provides stable backing storage for Kafka and ZooKeeper: if a Kafka or ZooKeeper Pod is restarted or rescheduled after a failure, it still gets its original data back.

1. Install NFS

Here I host the NFS share on the master node. First run the following commands to install NFS:

```
yum -y install nfs-utils
yum -y install rpcbind
```

2. Create the shared directories

(1) Run the following commands to create 6 directories:

```
mkdir -p /usr/local/k8s/zookeeper/pv{1..3}
mkdir -p /usr/local/k8s/kafka/pv{1..3}
```

(2) Edit the /etc/exports file:

```
vi /etc/exports
```

(3) Add the following entries:

```
/usr/local/k8s/kafka/pv1 *(rw,sync,no_root_squash)
/usr/local/k8s/kafka/pv2 *(rw,sync,no_root_squash)
/usr/local/k8s/kafka/pv3 *(rw,sync,no_root_squash)
/usr/local/k8s/zookeeper/pv1 *(rw,sync,no_root_squash)
/usr/local/k8s/zookeeper/pv2 *(rw,sync,no_root_squash)
/usr/local/k8s/zookeeper/pv3 *(rw,sync,no_root_squash)
```

(4) Save and exit, then restart the services:

```
systemctl restart rpcbind
systemctl restart nfs
systemctl enable nfs
```

Note: if `systemctl restart nfs` fails with "Failed to restart nfs.service: Unit nfs.service not found.", try this command instead:

```
sudo service nfs-server start
```

(5) Run `exportfs -v` to list all shared directories:

```
[root@node1 ~]# exportfs -v
/usr/local/k8s/kafka/pv1      (sync,wdelay,hide,no_subtree_check,sec=sys,rw,secure,no_root_squash,no_all_squash)
/usr/local/k8s/kafka/pv2      (sync,wdelay,hide,no_subtree_check,sec=sys,rw,secure,no_root_squash,no_all_squash)
/usr/local/k8s/kafka/pv3      (sync,wdelay,hide,no_subtree_check,sec=sys,rw,secure,no_root_squash,no_all_squash)
/usr/local/k8s/zookeeper/pv1  (sync,wdelay,hide,no_subtree_check,sec=sys,rw,secure,no_root_squash,no_all_squash)
/usr/local/k8s/zookeeper/pv2  (sync,wdelay,hide,no_subtree_check,sec=sys,rw,secure,no_root_squash,no_all_squash)
/usr/local/k8s/zookeeper/pv3  (sync,wdelay,hide,no_subtree_check,sec=sys,rw,secure,no_root_squash,no_all_squash)
```

(6) On each of the other Node machines, install the nfs-utils client:

```
yum -y install nfs-utils
```

If it cannot be installed ("no package available"), add a yum mirror first. Any one of the following works.

Aliyun mirror:

```
# download the Aliyun repo file
wget -O /etc/yum.repos.d/CentOS-Base-epel.repo http://mirrors.aliyun.com/repo/Centos-7.repo
# clear the yum cache
yum clean all
# rebuild the cache
yum makecache
# update
yum update -y
```

163 (NetEase) mirror:

```
# download the 163 repo file
wget -P /etc/yum.repos.d http://mirrors.163.com/.help/CentOS7-Base-163.repo
# clear the yum cache
yum clean all
# rebuild the cache
yum makecache
# update
yum update
```

EPEL repo:

```
# install the EPEL repo
sudo yum install epel-release
# clear the yum cache
yum clean all
# rebuild the cache
yum makecache
# update
yum update
```

(7) Then, on the other Node machines, run the following command (the IP is the Master node's IP) to list the directories shared by the Master node:

```
showmount -e 10.130.58.11
```

Output:

```
Export list for 10.130.58.11:
/usr/local/k8s/zookeeper/pv3 *
/usr/local/k8s/zookeeper/pv2 *
/usr/local/k8s/zookeeper/pv1 *
/usr/local/k8s/kafka/pv3     *
/usr/local/k8s/kafka/pv2     *
/usr/local/k8s/kafka/pv1     *
```
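Before moving on it can be worth confirming that a worker node can actually read and write the share, not just see it in `showmount`. The following is a minimal sketch, not part of the original article; it assumes the NFS server is 10.130.58.11 as above and that nfs-utils is already installed on the node.

```
# temporarily mount one of the exports on a worker node
mkdir -p /mnt/nfs-test
mount -t nfs 10.130.58.11:/usr/local/k8s/kafka/pv1 /mnt/nfs-test

# write and read back a test file, then clean up
echo "nfs ok" > /mnt/nfs-test/test.txt
cat /mnt/nfs-test/test.txt
rm -f /mnt/nfs-test/test.txt
umount /mnt/nfs-test
```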
## 二、Creating the ZooKeeper Cluster

1. Create the ZooKeeper PVs

(1) First create a file named zookeeper-pv.yaml with the following content.

Note: replace 10.130.58.11 with your actual NFS server address.

```
apiVersion: v1
kind: PersistentVolume
metadata:
  name: k8s-pv-zk01
  labels:
    app: zk
  annotations:
    volume.beta.kubernetes.io/storage-class: "anything"
spec:
  capacity:
    storage: 1Gi
  accessModes:
    - ReadWriteOnce
  nfs:
    server: 10.130.58.11
    path: "/usr/local/k8s/zookeeper/pv1"
  persistentVolumeReclaimPolicy: Recycle
---
apiVersion: v1
kind: PersistentVolume
metadata:
  name: k8s-pv-zk02
  labels:
    app: zk
  annotations:
    volume.beta.kubernetes.io/storage-class: "anything"
spec:
  capacity:
    storage: 1Gi
  accessModes:
    - ReadWriteOnce
  nfs:
    server: 10.130.58.11
    path: "/usr/local/k8s/zookeeper/pv2"
  persistentVolumeReclaimPolicy: Recycle
---
apiVersion: v1
kind: PersistentVolume
metadata:
  name: k8s-pv-zk03
  labels:
    app: zk
  annotations:
    volume.beta.kubernetes.io/storage-class: "anything"
spec:
  capacity:
    storage: 1Gi
  accessModes:
    - ReadWriteOnce
  nfs:
    server: 10.130.58.11
    path: "/usr/local/k8s/zookeeper/pv3"
  persistentVolumeReclaimPolicy: Recycle
```

(2) Then run the following command to create the PVs:

```
kubectl apply -f zookeeper-pv.yaml
```

(3) Run the following command to check that they were created:

```
kubectl get pv
```

2. Create the ZooKeeper cluster

(1) We will build a ZooKeeper cluster with 3 nodes. First create a file named zookeeper.yaml with the following content:

```
apiVersion: v1
kind: Service
metadata:
  name: zk-hs
  labels:
    app: zk
spec:
  selector:
    app: zk
  clusterIP: None
  ports:
    - name: server
      port: 2888
    - name: leader-election
      port: 3888
---
apiVersion: v1
kind: Service
metadata:
  name: zk-cs
  labels:
    app: zk
spec:
  selector:
    app: zk
  type: NodePort
  ports:
    - name: client
      port: 2181
      nodePort: 31811
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: zk
spec:
  serviceName: "zk-hs"
  replicas: 3 # by default is 1
  selector:
    matchLabels:
      app: zk # has to match .spec.template.metadata.labels
  updateStrategy:
    type: RollingUpdate
  podManagementPolicy: Parallel
  template:
    metadata:
      labels:
        app: zk # has to match .spec.selector.matchLabels
    spec:
      containers:
        - name: zk
          imagePullPolicy: Always
          image: leolee32/kubernetes-library:kubernetes-zookeeper1.0-3.4.10
          ports:
            - containerPort: 2181
              name: client
            - containerPort: 2888
              name: server
            - containerPort: 3888
              name: leader-election
          command:
            - sh
            - -c
            - "start-zookeeper \
              --servers=3 \
              --data_dir=/var/lib/zookeeper/data \
              --data_log_dir=/var/lib/zookeeper/data/log \
              --conf_dir=/opt/zookeeper/conf \
              --client_port=2181 \
              --election_port=3888 \
              --server_port=2888 \
              --tick_time=2000 \
              --init_limit=10 \
              --sync_limit=5 \
              --heap=4G \
              --max_client_cnxns=60 \
              --snap_retain_count=3 \
              --purge_interval=12 \
              --max_session_timeout=40000 \
              --min_session_timeout=4000 \
              --log_level=INFO"
          readinessProbe:
            exec:
              command:
                - sh
                - -c
                - "zookeeper-ready 2181"
            initialDelaySeconds: 10
            timeoutSeconds: 5
          livenessProbe:
            exec:
              command:
                - sh
                - -c
                - "zookeeper-ready 2181"
            initialDelaySeconds: 10
            timeoutSeconds: 5
          volumeMounts:
            - name: datadir
              mountPath: /var/lib/zookeeper
  volumeClaimTemplates:
    - metadata:
        name: datadir
        annotations:
          volume.beta.kubernetes.io/storage-class: "anything"
      spec:
        accessModes: [ "ReadWriteOnce" ]
        resources:
          requests:
            storage: 1Gi
```

(2) Then run the following command to create it:

```
kubectl apply -f zookeeper.yaml
```

(3) Run the following commands to check that everything came up:

```
kubectl get pods
kubectl get svc
```

Output:

```
[root@node1 k8s-install-kafka]# kubectl get pods
NAME   READY   STATUS    RESTARTS   AGE
zk-0   1/1     Running   0          3m51s
zk-1   1/1     Running   0          3m51s
zk-2   1/1     Running   0          3m51s
[root@node1 k8s-install-kafka]# kubectl get svc
NAME         TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)             AGE
kubernetes   ClusterIP   10.233.0.1                    443/TCP             74d
zk-cs        NodePort    10.233.32.158                 2181:31811/TCP      4m1s
zk-hs        ClusterIP   None                          2888/TCP,3888/TCP   4m1s
```
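At this point it is easy to sanity-check the ensemble. The sketch below is not from the original article; it assumes the image lays out its data directory like the upstream kubernetes-zookeeper image (myid under /var/lib/zookeeper/data) and that the ZooKeeper 3.4 four-letter command `ruok` is reachable through the zk-cs NodePort on the master IP 10.130.58.11 used throughout this article.

```
# each pod should report a unique myid (1..3)
for i in 0 1 2; do
  echo "zk-$i:"
  kubectl exec zk-$i -- cat /var/lib/zookeeper/data/myid
done

# from outside the K8s cluster, probe the NodePort client service;
# a healthy server answers "imok"
echo ruok | nc 10.130.58.11 31811
```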
## 三、Creating the Kafka Cluster

(1) We will build a Kafka cluster with 3 nodes. First create a file named kafka.yaml with the following content.

Note: the nfs server address must be changed to your actual NFS server address. `status.hostIP` is the IP of the host machine, i.e. the Node the Pod is ultimately scheduled on (in this article I deploy directly onto the Master node). Setting KAFKA_ADVERTISED_HOST_NAME to the host IP is what makes Kafka reachable from outside the K8s cluster.

```
apiVersion: v1
kind: Service
metadata:
  name: kafka-service-1
  labels:
    app: kafka-service-1
spec:
  type: NodePort
  ports:
    - port: 9092
      name: kafka-service-1
      targetPort: 9092
      nodePort: 30901
      protocol: TCP
  selector:
    app: kafka-1
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-service-2
  labels:
    app: kafka-service-2
spec:
  type: NodePort
  ports:
    - port: 9092
      name: kafka-service-2
      targetPort: 9092
      nodePort: 30902
      protocol: TCP
  selector:
    app: kafka-2
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-service-3
  labels:
    app: kafka-service-3
spec:
  type: NodePort
  ports:
    - port: 9092
      name: kafka-service-3
      targetPort: 9092
      nodePort: 30903
      protocol: TCP
  selector:
    app: kafka-3
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-deployment-1
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-1
  template:
    metadata:
      labels:
        app: kafka-1
    spec:
      containers:
        - name: kafka-1
          image: wurstmeister/kafka
          imagePullPolicy: IfNotPresent
          ports:
            - containerPort: 9092
          env:
            - name: KAFKA_ZOOKEEPER_CONNECT
              value: zk-0.zk-hs.default.svc.cluster.local:2181,zk-1.zk-hs.default.svc.cluster.local:2181,zk-2.zk-hs.default.svc.cluster.local:2181
            - name: KAFKA_BROKER_ID
              value: "1"
            - name: KAFKA_CREATE_TOPICS
              value: mytopic:2:1
            - name: KAFKA_LISTENERS
              value: PLAINTEXT://0.0.0.0:9092
            - name: KAFKA_ADVERTISED_PORT
              value: "30901"
            - name: KAFKA_ADVERTISED_HOST_NAME
              valueFrom:
                fieldRef:
                  fieldPath: status.hostIP
          volumeMounts:
            - name: datadir
              mountPath: /var/lib/kafka
      volumes:
        - name: datadir
          nfs:
            server: 10.130.58.11
            path: "/usr/local/k8s/kafka/pv1"
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-deployment-2
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-2
  template:
    metadata:
      labels:
        app: kafka-2
    spec:
      containers:
        - name: kafka-2
          image: wurstmeister/kafka
          imagePullPolicy: IfNotPresent
          ports:
            - containerPort: 9092
          env:
            - name: KAFKA_ZOOKEEPER_CONNECT
              value: zk-0.zk-hs.default.svc.cluster.local:2181,zk-1.zk-hs.default.svc.cluster.local:2181,zk-2.zk-hs.default.svc.cluster.local:2181
            - name: KAFKA_BROKER_ID
              value: "2"
            - name: KAFKA_LISTENERS
              value: PLAINTEXT://0.0.0.0:9092
            - name: KAFKA_ADVERTISED_PORT
              value: "30902"
            - name: KAFKA_ADVERTISED_HOST_NAME
              valueFrom:
                fieldRef:
                  fieldPath: status.hostIP
          volumeMounts:
            - name: datadir
              mountPath: /var/lib/kafka
      volumes:
        - name: datadir
          nfs:
            server: 10.130.58.11
            path: "/usr/local/k8s/kafka/pv2"
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-deployment-3
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-3
  template:
    metadata:
      labels:
        app: kafka-3
    spec:
      containers:
        - name: kafka-3
          image: wurstmeister/kafka
          imagePullPolicy: IfNotPresent
          ports:
            - containerPort: 9092
          env:
            - name: KAFKA_ZOOKEEPER_CONNECT
              value: zk-0.zk-hs.default.svc.cluster.local:2181,zk-1.zk-hs.default.svc.cluster.local:2181,zk-2.zk-hs.default.svc.cluster.local:2181
            - name: KAFKA_BROKER_ID
              value: "3"
            - name: KAFKA_LISTENERS
              value: PLAINTEXT://0.0.0.0:9092
            - name: KAFKA_ADVERTISED_PORT
              value: "30903"
            - name: KAFKA_ADVERTISED_HOST_NAME
              valueFrom:
                fieldRef:
                  fieldPath: status.hostIP
          volumeMounts:
            - name: datadir
              mountPath: /var/lib/kafka
      volumes:
        - name: datadir
          nfs:
            server: 10.130.58.11
            path: "/usr/local/k8s/kafka/pv3"
```

(2) Then run the following command to create it:

```
kubectl apply -f kafka.yaml
```

(3) Run the following commands to check that everything came up:

```
kubectl get pods
kubectl get service
```

Output:

```
[root@node1 k8s-install-kafka]# kubectl get pods
NAME                                  READY   STATUS    RESTARTS   AGE
kafka-deployment-1-56db97b867-qns52   1/1     Running   0          11m
kafka-deployment-2-859d6dbdc-j9dc6    1/1     Running   0          2m7s
kafka-deployment-3-7495f68bcb-9d9sr   1/1     Running   0          2m7s
zk-0                                  1/1     Running   0          106m
zk-1                                  1/1     Running   0          106m
zk-2                                  1/1     Running   0          106m
[root@node1 k8s-install-kafka]# kubectl get svc
NAME              TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)             AGE
kafka-service-1   NodePort    10.233.52.53                  9092:30901/TCP      19m
kafka-service-2   NodePort    10.233.43.204                 9092:30902/TCP      19m
kafka-service-3   NodePort    10.233.18.35                  9092:30903/TCP      19m
kubernetes        ClusterIP   10.233.0.1                    443/TCP             74d
zk-cs             NodePort    10.233.32.158                 2181:31811/TCP      106m
zk-hs             ClusterIP   None                          2888/TCP,3888/TCP   106m
```
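You can also check that each broker registered the advertised host IP and NodePort it was given. The sketch below is not from the original article: the first command assumes the ZooKeeper image has zkCli.sh on its PATH (true for the standard ZooKeeper 3.4 distribution the image is based on), and the second assumes kcat (formerly kafkacat) is installed on a machine outside the cluster and that the node IP is 10.130.58.11 as used throughout.

```
# inside the cluster: broker 1's registration znode, which includes its advertised endpoint
kubectl exec zk-0 -- zkCli.sh get /brokers/ids/1

# outside the cluster: ask one broker for the full cluster metadata
kcat -b 10.130.58.11:30901 -L
```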
## 四、Testing

1. Testing from inside the K8s cluster

(1) First run the following command to get a shell inside one of the Kafka containers:

```
kubectl exec -it kafka-deployment-1-56db97b867-qns52 -- /bin/bash
```

(2) Then run the following command to create a topic named test_topic:

```
kafka-topics.sh --create --topic test_topic --zookeeper zk-0.zk-hs.default.svc.cluster.local:2181,zk-1.zk-hs.default.svc.cluster.local:2181,zk-2.zk-hs.default.svc.cluster.local:2181 --partitions 1 --replication-factor 1
```

(3) After the topic is created, run the following command to start a console producer. Once it is running you can type messages directly into the console; each line of input is sent as one message.

```
kafka-console-producer.sh --broker-list kafka-service-1:9092,kafka-service-2:9092,kafka-service-3:9092 --topic test_topic
```

(4) Open another terminal, connect to the server, enter the container again, and run the following command to start a console consumer:

```
kafka-console-consumer.sh --bootstrap-server kafka-service-1:9092,kafka-service-2:9092,kafka-service-3:9092 --topic test_topic
```

(5) Go back to the producer console, send a few messages, and watch them appear on the consumer side to see Kafka's basic message handling in action.
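Since each broker advertises the host IP and a NodePort, the same test can be repeated from outside Kubernetes. The following is a minimal sketch, not part of the original article; it assumes a local Kafka distribution with the console tools on the PATH of a machine outside the cluster, and that the node IP is 10.130.58.11 with NodePorts 30901-30903 as configured above.

```
# producer, run on a machine outside the K8s cluster
kafka-console-producer.sh --broker-list 10.130.58.11:30901,10.130.58.11:30902,10.130.58.11:30903 --topic test_topic

# consumer, run in a second terminal outside the cluster
kafka-console-consumer.sh --bootstrap-server 10.130.58.11:30901,10.130.58.11:30902,10.130.58.11:30903 --topic test_topic --from-beginning
```

Messages typed into the external producer should show up in the external consumer, confirming that the advertised host IP/NodePort pairs make the cluster usable from outside K8s.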