Fetch the repository succeeded.
kafka日志保留策略:
时间和大小
kafka消息发送可靠性:
acks=0:
producer不会等待broker发动ack,消息可能丢失也可能重发
acks=1:
当leader接收到消息之后发送ack,丢失会重发,丢的概率小
acks=2:
当所有的follower都同步消息成功后发送ack,丢失消息可能性比较低
消息可靠性:
发送可靠性
存储可靠性
Partition的规则:
副本机制
ISR副本同步队列:
维护的是有资格的follower节点:
1.副本的所有节点都必须要和zookeeper保持连接状态
2.副本的最后一条消息的offset和leader副本的最后一条消息的offset之间的差值不能超过指定的阈值,这个阈值是可以设置的
HW&LEO
hw:highwatermark
ieo offset
副本数过多时一定会影响到性能
提交:
自动提交
手动提交
手动同步
手动异步
消息的消费原理:
老版本的kafka的offset进度维护在zk上,新版本kafka把consumer的offset维护保存在kafka内部的topic
kafka的分区分配策略:
同一个分区只能由同一个消费组内的consumer来消费
同一个consumer group内的consumer应该怎么去分配消费哪个分区的消息
两种:
通过partition.assignment.stategy指定
consumer的rebalance
当一个consumer group新增了消费者
当订阅的主题新增了分区(分区数量发生了变化)
消费者主动取消了某个订阅
某个分区或者某个broker carsh
Range策略(默认)
0,1,2,3,4,5,6,7,8,9
c0,c1,c2
partition num/consumer num
roundrobin策略
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 10 --topic replica_demo
redis:
集合
set跟list不一样的点,集合类型不能存在重复的数据,而且是无序的
sdiff 对多个集合执行差集运算
sunion 对多个集合执行并集操作
有序集合
zadd key score member
zrange key start stop [withscores]去获得元素,withscores是可以获取元素的分数
如果两个元素的score一样的话会根据0-9,a-z来排序
网站访问前十名
多进程架构:
1.资源共享竞争问题
2.数据的安全性
分布式锁的解决方法:
lua:
lua动态类型的语言
a = 1;
local a = 11;
逻辑表达式
+ - * / % -
关系运算符
a==b是否相等
~= 不等于
不会自动转换
print(1==1) true
print(1=='1') false
逻辑运算符
and / or
local a = "hello";
local b = "world";
print(a..b);//连接a和b
redis.call()
EVAL script numkeys key [key ...] arg [arg ...]
eval "return redis.call('set','name','mic')" 0
eval "return redis.call('set',KEYS[1],ARGV[1])" 1 hello world
KEYS和AVGV是全局变量
分库分表:
垂直切分,水平切分
垂直拆分:解决的是表过多的问题
水平切分:解决的是单表列过多的问题
大数据拆成小表
常见的拆分策略
垂直拆分(er分片)
水平拆分
一致性hash
范围切分
拆分之后带来的问题:
1.跨库join的问题
设计时要考虑到
在服务层去调用
2.全局表
Mycat的核心概念:
1.逻辑数据库
Balance:
0:读操作都发到writeHost节点上
1:所有的读操作都随机发送到readHost节点上
2:所有读操作往writeHost和readHost上发送
writeType:
0:所有写操作都发送到writeHost节点上
1:所有写操作都随机发送到readHost节点上
2:所有写操作都随机往writeHost和readHost上发
readHost是从属于writeHost的,即意味着它从那个writeHost获取同步数据,因此,当它所属的writeHost宕机了
则它也不会再参与到读写分离中来,即“不工作了”,这是因为此时,它的数据已经“不可靠”了.基于这个考虑,目前mycat
1.3和1.4版本中,若想支持MySQL一主一从的标准配置,并且在主节点宕机的情况下,从节点还能读取数据,则需要在Mycat里
配置为两个writeHost并设置banlance=1
swithType:
-1:不自动切换
1:默认值,自动切换
2:基于mysql主从状况来决定是否切换
单库大表拆分:
<!--逻辑数据库-->
<schema name="TESTDB" checkSQLschema="false" sqlMaxLimit="100">
<table name="company" subTables="company$1-3" dataNode="dn1" rule="mod-long" />
</schema>
<dataNode name="dn1" dataHost="localhost1" database="DB" />
<dataHost name="localhost1" maxCon="1000" minCon="10" balance="0"
writeType="0" dbType="mysql" dbDriver="native" switchType="1" slaveThreshold="100">
<heartbeat>select user()</heartbeat>
<writeHost host="hostS1" url="localhost:3306" user="root"
password="root" />
</dataHost>
跨库:
<!--逻辑数据库-->
<schema name="TESTDB" checkSQLschema="false" sqlMaxLimit="100">
<table name="company" subTables="company$1-3" dataNode="dn1" rule="mod-long" />
<table name="record" dataNode="dn1,dn2,dn3" rule="mod-long" />
</schema>
<dataNode name="dn1" dataHost="localhost1" database="db1" />
<dataNode name="dn2" dataHost="localhost1" database="db2" />
<dataNode name="dn3" dataHost="localhost1" database="db3" />
<dataHost name="localhost1" maxCon="1000" minCon="10" balance="0"
writeType="0" dbType="mysql" dbDriver="native" switchType="1" slaveThreshold="100">
<heartbeat>select user()</heartbeat>
<writeHost host="hostS1" url="localhost:3306" user="root"
password="root" />
</dataHost>
读写分离:
<!--逻辑数据库-->
<schema name="TESTDB" checkSQLschema="false" sqlMaxLimit="100"/>
<dataNode name="dn1" dataHost="localhost1" database="DB" />
<dataHost name="localhost1" maxCon="1000" minCon="10" balance="0"
writeType="0" dbType="mysql" dbDriver="native" switchType="1" slaveThreshold="100">
<heartbeat>show slave master</heartbeat>
<writeHost host="hostM1" url="localhost:3306" user="root" password="root" >
<readHost host="hostS1" url="localhost:3306" password="root" user="root"/>
</writeHost>
</dataHost>
Kafka
分布式消息和订阅系统,高性能,高吞吐量,scala语言
内置分区,实现集群
行为跟踪,日志收集
集群环境:
ProducerConfig.ACKS_CONFIG(acks),"-1"
0:消息发送给broker以后,不需要确认
1:只需要kafka集群中leader节点确认即可返回
all(-1):需要ISR中所有的Replica进行确认
batch.size(16kb):producer对于同一个分区来说,会按照batch.size的大小进行统一收集进行批量发送
linger.ms(1000):5条消息/s,delay以后再次批量发送到broker
max.request.size(1M):发送请求的大小
消息的同步发送和异步发送
kafka1.0之后默认的client都是用异步发送
GROUP_ID_CONFIG,"KafkaConsumerDemo"
组内竞争,不同组不会进行竞争
AUTO_OFFSET_RESET_CONFIG,"earleast"
对于新的groupid来说,如果设置为earleast,那么他会从最早的消息开始消费
earleast和none
latest最新
增减consumer.broker.partition会导致Rebalance
分区分配策略:
1.Range,默认
假设十个分区对应三个消费者
0,1,2,3,4,...9
C1/C2/C3
10/3
C1:0,1,2,3
C2:4,5,6
C3:7,8,9
2.roundBobin,轮询
3.partition.assignment.strategy
offset在哪里维护?
__consumer_offset的topic,有50个
用来保存消费者消费的offset的一个位置
定位当前consumer_group存储在哪个分区上
groupid.hashCode%groupMetadataTopicPartitionCount(50)
消息的写入性能
顺序写入:
零拷贝
消息的存储机制:
LogSegment分段保存
index->log
kafka的日志是分段的
000000.indx
000000.log
000000.timeindex
日志的清理和日志的压缩
根据时间来保存
根据大小
压缩:根据key的最新值
Partition的副本的概念:
./kafka-topics.sh --create --zookeeper 192.168.1.1:2181 --replication-factor 3 --partition 3 --topic secondTopic
replication-factor 副本
分为leader副本和follower副本
第i个分区的第j个副本(i+j) mod n
{"controller_epoch":20,"leader":1,"version":1,"leader_epoch":14,"isr":[1,0,2]}
leader等于1代表isr中broker为1的为leader副本
如果有一个broker挂掉了其他的broker都有副本可用
ISR维护当前分区所有的副本集 in sync replicas,前提是follower副本集必须和leader副本的数据在阈值范围内保持一致
如果follower的其中一个副本跟leader副本的内容有延迟会踢出有延迟的follower副本
leader副本负责接收客户端的消息写入和消息读取请求
follower负责从leader副本去读取数据,不接受任何客户端的请求
LEO:log end offset
HW:高水位的值
一.初始状态:
leader:
LEO:0
HW:0
REMOTE LEO:0
follower:
LEO:0
HW:0
1.把消息写入到对应分区的log文件中,同时更新leader副本的leo
2.尝试去更新leader的hw的值,leader比较自己本身的LEO和remote LEO的值,取最小的值作为HW
HW只有follower去fetchleader的值之后才会发生变化
二.第二个阶段:
leader:
LEO:1
HW:0
REMOTE LEO:0
follower:
LEO:1
HW:0
follower会fetchleader的值,此时会发送一个请求(fetch:offset->0)
1.leader副本读取log消息,并且更新remote LEO(根据follower的fetch传递过来的offset)
2.更新HW,HW还是0
3.把读取到值发送给follower副本
但是HW现在还是0,只有HW标记的值才可以消费,现在0这个数据无法消费
三.第三个阶段
leader:
LEO:1
HW:1
REMOTE LEO:1
follower:
HW:1
LEO:1
读取1这个数据,follower的LEO=1,此时leader的remote LEO值为1了,leader比较自己本身的LEO和remote LEO的值,取最小的值作为HW
HW=1,这是leader的0和1都可以读取了,到这一个阶段LEO和HW都进入了一个一致的阶段
在这个前提下,min.insync.replicas=1的时候,并且acks=-1的情况下
0.11版本之后做了修复,leader epoch,每一轮选举epoch加1,从0开始
在follower重启后会发起一个请求:offsetForLeaderEpochRequest获得LEO的值,不做日志截断
如果isr中follower都被踢出去怎么办?leader也挂了
1.等待ISR中的任意一个replica活过来
2.选择一个活过来的replica作为leader
设置50000个分区,批量发送batch.size,linger.ms这两个参数,针对同一个partition而言
50000个分区,多线程
Kafka客户端API类型
AdminClient API:允许管理和检测Topic,broker以及其他Kafka对象
Product API:发布消息到1个或多个topic
Consumer API:订阅一个或多个topic,并处理产生的消息
Streams API:高效地将输入流转换到输出流
Connector API:从一些源系统或应用程序中拉取数据到kafka
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。