# kafka **Repository Path**: taoshumin/kafka ## Basic Information - **Project Name**: kafka - **Description**: kafka 使用 - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2020-09-25 - **Last Updated**: 2020-12-19 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README ## 关闭防火墙 查看防火墙状态 `firewall-cmd --state` 关闭防火墙 `systemctl stop firewalld.service` 禁用防火墙 `systemctl disable firewalld.servic` 重启docker `service docker restart` ``` 执行关闭命令: systemctl stop firewalld.service 再次执行查看防火墙命令:systemctl status firewalld.service 执行开机禁用防火墙自启命令 : systemctl disable firewalld.service ``` ## 网络设置 `sudo vi /etc/hosts` ```shell 127.0.0.1 localhost # Your_Local_IP kafka-1 kafka-2 kafka-3 192.168.31.215 kafka-1 kafka-2 kafka-3 # The following lines are desirable for IPv6 capable hosts ::1 ip6-localhost ip6-loopback ``` ## 可能报错 `INFO org.apache.zookeeper.ClientCnxn - EventThread shut down for session: 0x0` 解决: 禁用防火墙,重启docker ## 配置文件 #### 进入kafka容器 ``` # kafka docker docker exec -it eaac8c2bd713 /bin/bash whereis kafka kafka-topics --help ``` #### 查看服务配置文件 ``` cd /etc/kafka/ vi server.properties ``` #### 配置介绍 ``` # The id of the broker. This must be set to a unique integer for each broker. # broker 每一个服务器的broker id 是唯一的 broker.id=0 # 是否可以删除topic,默认是fase(不可删) delete.topic.enable=true # A comma separated list of directories under which to store log files # kafka日志文件夹 (log.dirs=/opt/kafka/logs) log.dirs=/var/lib/kafka # The minimum age of a log file to be eligible for deletion due to age # 日志在kafka存储的时间 log.retention.hours=168 # A size-based retention policy for logs. Segments are pruned from the log unless the remaining # segments drop below log.retention.bytes. Functions independently of log.retention.hours. # 日至在kafka存储的大小,默认1G #log.retention.bytes=1073741824 # The maximum size of a log segment file. When this size is reached a new log segment will be created. # 日志达到多少存储就开始分割 log.segment.bytes=1073741824 # Zookeeper connection string (see zookeeper docs for details). # This is a comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". # You can also append an optional chroot string to the urls to specify the # root directory for all kafka znodes. # zookeeper集群 (zookeeper.connect=zk101:2181,zk102:2181,zk103:2181) zookeeper.connect=localhost:2181 ``` ## 中文注释 ``` # broker id,多个broker服务器的话,每个broker id必须不同 broker.id=1 # kafka broker所在节点的hostname hostname=10.45.10.31:9092 # 处理网络请求的线程数 num.network.threads= 8 # 执行磁盘IO的线程数 num.io.threads=8 # server使用的send buffer大小。 socket.send.buffer.bytes=1048576 # server使用的recive buffer大小。 socket.receive.buffer.bytes=1048576 # 接受的最大请求大小(防止OOM)执行关闭命令: systemctl stop firewalld.service 再次执行查看防火墙命令:systemctl status firewalld.service 执行开机禁用防火墙自启命令 : systemctl disable firewalld.service #-------------added by Kaim --------------- # 加入队列的最大请求数(超过该值,network thread阻塞) queued.max.requests=16 # purgatory(炼狱)是个容器,用来存放不能马上答复的网络请求。如果能答复请求则从炼狱删除。这个是fetch炼狱保存的最大请求数。设置的比默认值小是据说因为这里有个BUG,不知道0.10.x中解决没 # BUG说明见:http://blog.csdn.net/stark_summer/article/details/50203133 fetch.purgatory.purge.interval.requests=100 # 生产者炼狱保存的最大请求数 producer.purgatory.purge.interval.requests=100 ############################# 日志配置############################# # 可以设置多个日志存放的路径 log.dirs=~/kafka-logs # 默认每个主题的分区数,生产环境建议值:8 num.partitions= 8 # 启停时做日志恢复每个目录所需的线程数,采用RAID的时候可以增大该值 num.recovery.threads.per.data.dir=1 # 写入磁盘的消息批大小 log.flush.interval.messages=10000 # 强制刷新消息到磁盘的时间阈值 log.flush.interval.ms=10000 # 日志保留的最少时间 由于做压测,防止占用磁盘太多,保留时间为1ms # log.retention.hours=168执行关闭命令: systemctl stop firewalld.service 再次执行查看防火墙命令:systemctl status firewalld.service 执行开机禁用防火墙自启命令 : systemctl disable firewalld.service # 每个日志段大小,超过该值会生成新日志段 log.segment.bytes=1073741824 # 检查日志分段文件的间隔时间,以确定是否文件属性是否到达删除要求。 log.retention.check.interval.ms=300000 # --------------added by kami-------------- # 自动创建主题 auto.create.topics.enable=true # 当执行一次fetch后,需要一定的空间扫描最近的offset,设置的越大越好,一般使用默认值就可以 log.index.interval.bytes=4096 # 每个log segment的最大尺寸。注意,如果log尺寸达到这个数值,即使尺寸没有超过log.segment.bytes限制,也需要产生新的log segment。 log.index.size.max.bytes=10485760 # 检查是否需要fsync的时间间隔 log.flush.scheduler.interval.ms=2000 # 即使文件没有到达log.segment.bytes,只要文件创建时间到达此属性,就会创建新文件。 log.roll.hours=168 # server可以接收的消息最大尺寸。重要的是,consumer和producer有关这个属性的设置必须同步,否则producer发布的消息对consumer来说太大。默认值均为一百万 message.max.bytes=1000000 ############################# Zookeeper ############################# # zookeeper server地址,如果有多个则用逗号分隔 zookeeper.connect= zookeeper-uat-01.800best.com # Timeout in ms for connecting to zookeeper # zk连接的超时时间 zookeeper.connection.timeout.ms=6000 # zk follower的同步延迟时间 zookeeper.sync.time.ms = 2000 ############################ replication configuration added by KamiWan############## # 从leader备份数据的线程数 num.replica.fetchers=4 # 备份时每次fetch的最大值 replica.fetch.max.bytes=1048576 # follwer执行fetcher请求时的最大等待时间 replica.fetch.wait.max.ms=500 # 默认的replication数量,可以根据所需要的可靠性要求来配置 default.replication.factor=2 ``` ## 吞吐量计算公式 `throughput_Avg(平均吞吐量) ~= Request_Rate_Avg (平均请求速率)* Request_Size_Avg(平均请求大小) / Compression_Rate_Avg (压缩率)` 估算的实际值会比实际值大一些,因为会有一些request overhead没有考虑进去 ## 操作 查看主题 `kafka-topics --list --zookeeper 192.168.31.215:42181` 创建主题 `kafka-topics --create --zookeeper 192.168.31.215:42181 --replication-factor 2 --partitions 3 --topic kafka-go-mq` 删除主题 `kafka-topics --delete --topic test --zookeeper 192.168.31.215:42181` 查看主题详细信息 `kafka-topics --describe --zookeeper 192.168.31.215:42181 --topic kafka-go-mq` 创建生产者 `kafka-console-producer --broker-list 192.168.31.215:19092 --topic kafka-go-mq` 创建消费者 (from-beginning从第一条开始消费) `kafka-console-consumer --bootstrap-server 192.168.31.215:19092 --topic kafka-go-mq --from-beginning` 增加分区数 `kafka-topics --alter --zookeeper 192.168.31.215:42181 --topic kafka-go-mq --partitions 1` ## kafka日志 `cd /var/lib/kafka` ``` .log 是消息集文件 .timeindex 时间戳索引文件 .index 偏移量索引文件 00000000000000000000.index 00000000000000000000.log 00000000000000000000.timeindex ``` ## 常用参数配置说明 版本校验 ``` version, err := sarama.ParseKafkaVersion(option.Version) if err != nil { panic(fmt.Errorf("parse kafka version error:%s\n",err)) } ``` 参考文献 https://www.cnblogs.com/hongjijun/p/13652983.html 设置消息的幂等性 ``` config.Producer.Idempotent = true // 设置消息幂等性 config.Net.MaxOpenRequests = 1 // 只能设置1 保证消息有序 ``` 消费者与消费者组 ``` 消费者组,也可以认为是一种更加合理分配资源,进行负载均衡的设计。 假设有5个消费者属于同一个消费者组,这个消费者组订阅了一个具有10个分区的主题,那么组内的每一个消费者,都会负责处理2个分区的消息 这样,能够保证当一条消息发送到主题中,只会被一个消费者所消费,不会造成重复消费的情况。 ``` 消费者参数 config.Consumer.Offsets.Initial = sarama.OffsetNewest `` 上面这个参数是控制消费组初始消费的位置,默认是OffsetNewest,即从最新的位置开始消费。如果把参数改为如下: config.Consumer.Offsets.Initial = sarama.OffsetOldest ``` 消费者分区策略 sarama.BalanceStrategyRange ``` config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange 上面这个参数是控制rebalance时的分区策略,默认是按照范围来分区,即BalanceStrategyRange config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin 执行关闭命令: systemctl stop firewalld.service 再次执行查看防火墙命令:systemctl status firewalld.service 执行开机禁用防火墙自启命令 : systemctl disable firewalld.service 注意点: `默认不设置消费者祖,创建的消费者在 默认的Consumer Groups组里` `一个消费者组中,每一个分区只能由组内的一消费者订阅` `多个消费者可以订阅同一个topic` kafka消费的本质: kafka消息可以一次生产,多次消费,这是从业务层面来说的 。每个消费场景就是一个consumer-group. kafka消息存储,分partition.每个partition消息只记录一个消费者的offset.只能有一个consumer能连接。 同一个consumer-group的消费者处理逻辑应该是一样的,只是处理的数据分区了,互不相同,并行处理,提升了整体吞吐量,仅此而已。 #### 此人博客写的比较好 `https://www.jianshu.com/p/02e260db5efa` `https://www.jianshu.com/p/e9d29ce1a463`