# jraft **Repository Path**: 399601829/jraft ## Basic Information - **Project Name**: jraft - **Description**: java实现raft协议 - **Primary Language**: Java - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 3 - **Created**: 2023-06-08 - **Last Updated**: 2023-06-08 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # jraft ## 项目地址 https://gitee.com/XDsun-yu/jraft ## 语雀文档 https://www.yuque.com/yuqueyonghuofhbn2/okk7bs ## 具体功能 - Leader选举 - 日志复制 - 成员变更:已实现单节点变更,正在完善ing,等待同步到Git - ~~日志压缩:未实现~~ - 分布式KV存储:采用RocksDB作为状态机,正在完善ing,等待同步到Git 日志压缩更加依赖于具体状态机的实现,可能会在以后的版本中完善 ## 运行方式 运行jraft-stack-example的ClientTest ## 技术选型 ### 日志存储 由于需要将日志进行持久化,如果我们直接对磁盘IO,效率会十分低下,因此选用RocksDB来实现日志存储的功能。RocksDB是基于LSM的存储引擎,LSM是一种数据结构,能够将写入操作顺序化,大大提高了写数据的性能。 ### RPC 由于节点之间的通信较为频繁,因此我们选用长连接。对于长连接,一般都使用netty或者其他基于netty的框架。在此我选用SOFAbolt,蚂蚁金融开源的一套基于netty的网络通信框架。 ## 模块划分 - LogModule:负责日志条目的读写 - ConsensusModule:处理来自其他节点的追加日志以及投票请求 - StateMachine:状态机的接口,不同状态机可以有不同的实现 - RPC:实现不同节点之间的通信 - Node:上述功能的聚合体,作为系统中的单个节点 - LifeCycle:管理各个模块的生命周期 ## Model ![img](https://git-1307839558.cos.ap-beijing.myqcloud.com/Raft/imgs/model.png) - **DTO** - Request - AppendEntriesRequest: 追加日志请求的参数的封装 - VoteRequest: 投票请求的参数的封装 - ClientRequest: 客户端请求的参数的封装 - Response - AppendEntriesResponse: 追加日志响应的结果的封装 - VoteReponse: 请求投票响应的结果的封装 - ClientReponse: 对客户端响应的结果的封装 - **State:** 对应原论文中的State - PersistentState - VolatileState - LeaderState - **LogEntry:** 日志条目的封装 - **Peer:** 单个节点的配置 - **PeerGroup:** 节点的集群配置 ## 基本逻辑 ### 处理client请求 ![img](https://git-1307839558.cos.ap-beijing.myqcloud.com/Raft/imgs/%E5%A4%84%E7%90%86client%E8%AF%B7%E6%B1%82.png) 1. 判断Node状态,如果是Leader,继续进行处理,否则直接将请求转发给Leader(这里与论文中稍有区别,论文中应是直接返回false以及leader的地址,由client再次请求leader节点,这里为了减少一轮RPC,直接在节点出重定向) 2. 根据请求生成日志条目并写入到日志模块 3. 向其他节点发送追加日志请求,这里需要利用线程池并发调用RPC,同时,还需要给线程池分配获取RPC结果的任务,除此之外,main线程中还需要设置一定的等待时间来等待线程池中的任务执行完成 4. 为了保证整个系统重启之后能够恢复原来的状态,因此在获取到结果之后,先对commitIndex进行一次更新,对应论文中的5.3和5.4,规则为:假设存在N满足N>commitIndex,使得大多数的matchIndex[i]≥N以及log[N].term == currentTerm成立,则令commitIndex == N。更新完毕之后,将commitIndex提交到commitChannel中,应用到状态机。 5. 判断是否满足了超过半数的节点确认,如果是,再次更新commitIndex为当前logEntry的index,并进行提交,否则直接返回false 6. 等待状态机应用后返回结果 ### Leader的追加日志实现 ![img](https://git-1307839558.cos.ap-beijing.myqcloud.com/Raft/imgs/Leader%E7%9A%84%E8%BF%BD%E5%8A%A0%E6%97%A5%E5%BF%97%E5%AE%9E%E7%8E%B0.png) 1. 判断运行时间是否超出0.5s,未超时则继续 2. 初始化任期,leaderId,leaderCommit 3. 从日志模块读取从nextIndex到curLogEntry' s Index的日志 4. 初始化prevLogIndex和prevLogTerm 5. 调用RPC模块的send,向目标节点发送请求 6. 同步等待RPC结果,如果成功直接返回true,如果失败还需判断目标节点的任期是否高于当前节点的任期,如果比当前节点高,则当前节点转变为follower,否则递减nextIndex回到步骤1继续重试 ### 接收者追加日志的实现 ![img](https://git-1307839558.cos.ap-beijing.myqcloud.com/Raft/imgs/%E6%8E%A5%E6%94%B6%E8%80%85%E8%BF%BD%E5%8A%A0%E6%97%A5%E5%BF%97%E7%9A%84%E5%AE%9E%E7%8E%B0.png) 1. 尝试获取锁 2. 比较leader的任期和当前节点的任期,如果当前节点的任期较高,则直接返回false 3. 更新收到心跳的时间,当前的任期,节点的状态以及leaderId 4. 如果日志为空,则为心跳包,直接返回即可 5. 判断即将写入的位置是否有日志,并且是否和需要写入的日志发生冲突,如果发生冲突,则删除冲突位置及其之后的所有日志 6. 追加日志 7. 根据请求参数中的leaderCommit重设接收者的commitIndex 8. 根据commitIndex进行提交 9. 最终返回true并解锁 ### Follower超时选举的实现 ![img](https://git-1307839558.cos.ap-beijing.myqcloud.com/Raft/imgs/Follower%E8%B6%85%E6%97%B6%E9%80%89%E4%B8%BE%E7%9A%84%E5%AE%9E%E7%8E%B0.png) 1. 线程池定时执行该任务 2. 当前节点是否为leader,如果是,直接返回 3. 当前时间-上一次收到心跳的时间是否超时(超时时间随机生成),如果不超时,直接返回 4. 任期号自增,转变为candidate,投票给自己 5. 调用RPC,向其他节点请求投票 6. 收到结果后,再次判断当前节点是否处于candidate,如果不是,直接返回 7. 如果大多数节点确认了投票请求,当前节点转变为leader,清除自己的投票状态,更新matchIndex和nextIndex,否则,清除自己的投票状态并返回,等待下一次选举 ### 接收者请求投票的实现 ![img](https://git-1307839558.cos.ap-beijing.myqcloud.com/Raft/imgs/%E6%8E%A5%E6%94%B6%E8%80%85%E8%AF%B7%E6%B1%82%E6%8A%95%E7%A5%A8%E7%9A%84%E5%AE%9E%E7%8E%B0.png) 1. 尝试获取锁,这里是为了保证先来先服务,如果当前节点的锁已经被获取,则证明当前节点已经为别人投过票了 2. 比较candidate的任期和当前节点的任期,如果当前节点的任期较高,则直接返回false 3. 如果votedFor为空或者为candidateId,并且candidate的日志比当前节点的日志新,那么当前节点就把票投给该candidate 4. 最终释放锁 ## 多线程 ### 为什么程序中很少使用锁,而基本都使用volatile? Raft具有很强的容错机制,由于Quorum和term的设定,就算出现了leader在追加日志时变为了follower的情况,接收者也能通过比较任期来拒绝追加,所以Raft对线程安全的要求性不高,只需要保证共享资源在多个线程中的可见性即可。 ### 异常处理 在多线程环境下,无法对单个线程进行try catch处理,正确的做法是定义异常处理器,即实现UncaughtExceptionHandler接口,重写其中的uncaughtException方法 ## 栈计算器 栈计算器在原框架的基础上实现了状态机,在状态机内部维护一个Stack,通过来自client的请求进行字符串解析,从而实现不同的操作。 **支持的指令:** - create:创建一个栈计算器实例(即分布式系统中的一个节点) - delete:删除对应id的节点 - push:向对应节点的栈状态机中压入值 - pop:对应节点的栈状态机弹栈,并返回该值 - get:返回栈顶元素 - add:将对应节点的栈状态机的两个栈顶元素出栈并作为操作数相加,结果再次入栈 - sub:将对应节点的栈状态机的两个栈顶元素出栈并作为操作数相减,结果再次入栈 - mul:将对应节点的栈状态机的两个栈顶元素出栈并作为操作数相乘,结果再次入栈 - div:将对应节点的栈状态机的两个栈顶元素出栈并作为操作数相除,结果再次入栈 - inc:将对应节点的栈状态机的栈顶整数弹栈,自增后再次入栈 - dec:将对应节点的栈状态机的栈顶整数弹栈,自减后再次入栈 ## 需要改进的地方 - RPC的等待时间,超时时间都是硬编码,用户无法自定义 - 心跳包无法更新follower的commitIndex,只能等待下一次追加日志时才能把上一次的追加的日志进行提交 - 当candidate竞选成功时,无法保证立马发送心跳包,只能等待下一次定时心跳任务的触发 - 每次只能处理单个用户的一条请求,当QPS到达一定数量级时,可以先将用户请求放置于队列中,每隔一定时间再取出,进行批量追加 ## 引用 在刚开始实现Raft协议时,查找了许多开源Raft框架的底层实现,本框架借鉴了其中的一些内容,推荐几个比较完整的框架: **SOFAJRaft:** 蚂蚁金融开源的一套Raft框架,其中还实现了一套分布式存储框架 官网:https://www.sofastack.tech/projects/sofa-jraft/overview/ github地址:https://github.com/sofastack/sofa-jraft **TiDB:** PingCAP的一款分布式关系型数据库,底层同样采用Raft协议 官网:https://pingcap.com/zh/product/ github地址:https://github.com/pingcap/tidb **lu-raft-kv:** GitHub上开源的一套基于Raft的分布式存储实现,比较贴合论文 github地址:https://github.com/stateIs0/lu-raft-kv