# raft-java-demo **Repository Path**: summerXiaXiao/raft-java-demo ## Basic Information - **Project Name**: raft-java-demo - **Description**: 用java实现的一个简易版Raft共识算法,实现了一个简易版KV存储,实现了选举、心跳、日志拉取、日志同步等,结构分明,注释完善,提供测试类启动,不完全和Raft论文一致,自己做了很多修改 - **Primary Language**: Java - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 8 - **Created**: 2024-09-04 - **Last Updated**: 2024-09-04 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # raft-java-demo ## **一、Raft前置简介** Raft目前是最著名的分布式共识性算法,被广泛的应用在各种分布式框架、组件中,如Redis、RocketMq、Kafka、Nacos(CP)等 根据Raft论文,可将Raft拆分为如下**4个功能模块**: - 领导者选举 - 日志同步、心跳 - 持久化 - 日志压缩,快照(本文未实现) 这4个模块彼此并不完全独立,如日志的同步情况左右着领导者选举,快照也影响着日志同步等等;为了前后的递进性,对于一些功能的实现,可能会出现改动和优化,比如日志同步实现后,在数据持久化部分又会对同步做一些优化,提高主、从节点日志冲突解决的性能。 这里就不再过多的介绍了,看本文之前请先简单了解一下Raft算法,提供如下资料: - [图解分布式共识算法Paxos协议](https://returnac.cn/pages/algorithm/raft/%E5%9B%BE%E8%A7%A3%E5%88%86%E5%B8%83%E5%BC%8F%E5%85%B1%E8%AF%86%E7%AE%97%E6%B3%95Paxos%E5%8D%8F%E8%AE%AE.html "图解分布式共识算法Paxos协议") - [浅谈分布式一致性:Raft 与 SOFAJRaft](https://returnac.cn/pages/algorithm/raft/%E6%B5%85%E8%B0%88%E5%88%86%E5%B8%83%E5%BC%8F%E4%B8%80%E8%87%B4%E6%80%A7%EF%BC%9ARaft%E4%B8%8ESOFAJRaft.html "浅谈分布式一致性:Raft 与 SOFAJRaft") - [深入剖析共识性算法 Raft](https://returnac.cn/pages/algorithm/raft/%E6%B7%B1%E5%85%A5%E5%89%96%E6%9E%90%E5%85%B1%E8%AF%86%E6%80%A7%E7%AE%97%E6%B3%95Raft.html "深入剖析共识性算法 Raft") - [深入解读Raft算法与etcd工程实现](https://returnac.cn/pages/algorithm/raft/%E6%B7%B1%E5%85%A5%E8%A7%A3%E8%AF%BBRaft%E7%AE%97%E6%B3%95%E4%B8%8Eetcd%E5%B7%A5%E7%A8%8B%E5%AE%9E%E7%8E%B0.html "深入解读Raft算法与etcd工程实现") - [Raft一致性算法论文-译文](https://github.com/maemual/raft-zh_cn/blob/master/raft-zh_cn.md "Raft一致性算法论文-译文") - [SOFA-JRaft:蚂蚁金服的Raft算法实现库(JAVA版)](https://github.com/sofastack/sofa-jraft "SOFA-JRaft:蚂蚁金服的Raft算法实现库(JAVA版)") **本文实现不完全和Raft论文一致,做了不少改动,核心思想不变,请悉知!!** ## 二、快速开始 提供了测试类,供用户快速体验和测试 ### 集群启动 每一个测试方法代表了一个节点,每启动一个则会生成一个日志文件,因为并没有接注册中心所以不支持动态节点接入,只能在启动时设置好集群节点,目前提供了5集群节点 ![](./images/image_G3RuKmnSG3.png) 设置了大量DEBUG日志,以便观察状态,如果发生如下异常,是正常的(主要是心跳检测任务被中断导致的): ![](./images/image_4cqFZUA1DU.png) ### 客户端测试 同时提供了向集群发送命令请求的测试方法,注意目前只支持SET,GET命令,自带重定向处理,会自动请求到leader节点 ![](./images/image_HmNapfisHK.png) 会得到像这样的结果: ![](./images/image_rxMaq_HQZ1.png) ## 三、功能流程简介 你看完上述资料,应该对Raft有一个基本了解了,本文我们实现了一个Raft算法下的简易版的KV存储,我将它拆分成一下几个角色: **RPC模块**:复制各节点间的信息传递,如心跳、日志、选举等等 **节点模块**:节点有三种状态leader、follow、candidate,每种状态下所要做的事是不一样的 **状态机**:负责节点状态的变更,日志持久化一致性处理,投票一致性处理 **定时任务**:leader需要定时发送心跳,follow需要定时检测leader是否存活等 **日志模块**:日志需要持久化在本地文件,还需要给其他节点同步 以上几个角色相互配合,实现以下几个主要功能流程: ### 1.选举流程 实现细节下面深究,这里暂不过多介绍,简单了解一下大致流程,大体就是: 1. Follow节点发现Leader节点挂了,则升级为Candidate节点发起投票 2. 其他Follow节点收到投票请求后,根据条件判断是否投票给它,True或者False 3. Candidate一旦收到的投票通过请求过半,则升级为Leader 4. 升级Leader后发送心跳,阻止其他Follow变成Candidate ![](./images/image_LrgRK2aI8j.png) ### 2.心跳流程 > **注意**:这里和原文有区别,我将心跳和日志做了拆分,不再耦合了,因为我觉得在没有客户端请求的情况下,记录这些心跳日志没有意义,在没有数据日志或者说数据日志水平都是一样的情况下,谁做Leader我觉得都OK 实现细节下面深究,这里暂不过多介绍,简单了解一下大致流程,大体就是: 1. Leader会定时发送心跳请求给Follow,告诉它我还活着,防止它篡位 2. Follow收到心跳后返回一个心跳响应 3. Leader收到的心跳响应没有过半则自动降级成为Follow停止对外服务 (为什么要心跳响应,还要自动降级?后面咱们细说) ![](./images/image_85yUz_2oOA.png) ### 3.KV客户端请求流程 因为我们要做的是一个简易版KV嘛,那肯定有客户端发送命令嘛对不对: 1. 客户端发送SET或者GET命令,集群返回成功或者数据 2. 发送SET命令,只有Leader会处理,同步给其他Follow,然后根据结果返回成功还是失败 3. 发送GET命令,目前也只有Leader会处理,返回对应数据,没有就null(GET没有日志产生) > 节点间日志的同步持久化后面细说,这里也看的出来为什么分布式体系下CAP不能共存了,你想要高可用,性能好,就必须在请求leader刷盘成功后返回甚至异步刷盘,这就必然导致可能存在数据丢失或者主从数据不一致的情况,如果你想要一致性,就必须在节点日志都同步完成后才返回(下面我们将日志同步流程) ![](./images/image_erHc5iRNHs.png) ### 4.日志同步流程 > 上面说过了,我们将心跳和日志做了拆分,只有客户端请求SET命令才会产生日志 1. Leader收到客户端请求后,先预提交到内存中,后发送预提交命令给所有Follow 2. Follow收到Leader的预提交命令同样先提交到内存,然后响应Leader 3. Leader一旦收到超过半数的Follow响应则执行刷盘持久化,否则给客户端响应失败 4. Leader刷盘成功后,给所有Follow发送刷盘请求,然后给客户端响应成功(无需关心Follow刷盘结果) > 这就是很典型的CP流程,保持了一致性和数据不丢失,但大大降低了性能(发现没有尽管这样做,依旧可能存在Follow数据丢失的情况,比如:我是新加入的Follow节点、Follow节点刷盘失败等等情况,那该怎么办呢,我们下面接着来补充) ![](./images/image_u-4gTXWa1A.png) ### 5.日志校验流程 正如上所说,日志依旧存在丢失的风险,我们需要做一个日志校验定时任务,定时校验日志是否丢失,由于这个和日志的设计息息相关,所以我们后面在细说,这里简单过一遍流程 1. follow会有一个定时任务,定时Check日志文件,寻找缺失的日志 2. 如果有则拿到缺失的日志发送拉取请求到Leader,获取对应的日志 3. 然后填充进日志文件,这样就一定保持了和Leader日志数据对齐了 > 难道每次都要从头到尾扫描一次文件吗?当然不是,扫描过的不需要扫描,有checkPoint,每次只是从checkPoint扫描到lastLogIndex ![](./images/image_-Rp0JDVzhL.png) ## 四、模块简介 ### 1.RPC模块 这里我们采用Netty框架来做,每个节点即是Client又是Server 按原Raft算法来说,一共有以下几种RPC类型的通信: > RequestVote RPC - 请求投票 RPC,由 Candidate 在选举期间发起。 > AppendEntries RPC - 附加条目 RPC,由 Leader 发起,用来复制日志和提供一种心跳机制。 但是我将它进行了一个拆分,拆分的更细了: - **RequestVoteRPC**-请求投票 RPC,由 Candidate 在选举期间发起。 - **RequestVoteResult**-投票响应RPC,由follow投票 - **HeartBeatRequest**-心跳RPC,由leader定时不间断发起 - **HeartBeatResult**-心跳响应RPC,由follow响应 - **AppendEntriesPreCommit**-日志预提交RPC,由leader发起预提交 - **AppendEntriesPreCommitResult**-日志预提交响应RPC,由follow响应 - **AppendEntriesCommit**-日志提交RPC,预提交成功后,leader会发起真正提交的命令 - **LogIndexPull**-日志拉取RPC,follow定时检测发现自身存在日志丢失,向leader主动拉取日志 - **LogIndexPullResult**-日志拉取响应RPC,leader发现follow存在日志缺失,把日志发给follow - **ClientRequest**-客户端请求RPC,KV存储的客户端,向集群发出的命令 - **ClientResponse**-客户端请求响应RPC,对客户端的响应 分别对应着一个实体类: ![](./images/image_FD_caLDv8k.png) > RPC整体的编解码设计,序列化等等,都和我之前写的RPC框架差不多,这里就不在过多的介绍了,有兴趣可以看看我的: [如何从0-1手写一个RPC框架](https://returnac.cn/pages/wheel/Easy-RPC.html) 这里只介绍一下相比原来做出的调整,原来RPC框架传输的实体是固定的,而现在多了很多,而且大量涉及到同步请求返回,所以相比原来新增了泛型的处理,如下示例,两行代码就搞定了一次请求: ```java RpcSession rpcSession = RpcSessionFactory.openSession(serverConfig, clientRequest); ClientResponse clientResponse = rpcSession.syncSend(4000L); ``` 同时支持:同步等待、超时等待、异步三种请求方式: ```java public interface RpcSession{ R syncSend(); R syncSend(long timeout); void asyncSend(); } ``` 感兴趣的建议自己看看,RPC所在目录和Netty所有Handler如下: ![](./images/image_LCw7Owsir7.png) ### 2.节点模块 节点有三种类型,leader、candidate、follow,所以我这抽象出一个节点接口,三种实现,一个统一对外服务,一个全局节点信息类 #### 一个节点接口 ```java public interface RaftNode { /** 客户端的请求,会产生日志 : 只有leader才会处理,follow返回leader地址,candidate拒绝 */ ClientResponse clientRequestHandler(ClientRequest command,List serverConfigs) throws ExecutionException, InterruptedException; /** leader发来的log预处理:会先缓存 */ AppendEntriesPreCommitResult logPreCommitHandler(AppendEntriesPreCommit appendEntriesPreCommit); /** leader发来的log提交请求 */ void logCommitHandler(AppendEntriesCommit appendEntriesCommit); /** follow发来的log拉取请求 */ LogIndexPullResult sendLogPullRequest(List pullLogIndex); /** leader要处理follow的拉取请求 */ LogIndexPullResult logPullRequestHandler(LogIndexPull logIndexPull); /** 发起投票 : 只有候选者 才会发起 */ boolean callVoteRequest(List serverConfigs) throws ExecutionException, InterruptedException; /** 投票请求处理 */ RequestVoteResult voteRequestHandler(RequestVoteRPC voteRPC); /** 发起心跳 : 只有领导才会发起心跳 阻止其他节点成为候选人*/ boolean callHeartBeatRequest(ListserverConfigs) throws ExecutionException, InterruptedException; /** 心跳请求处理 : 只有追随者/候选人才会处理*/ HeartBeatResult heartBeatHandler(HeartBeatRequest heartBeatRequest); } ``` #### 三种实现 ![](./images/image_zX9dA2rAK0.png) #### 一个对外服务 ```java public class RaftNodeService { private static final Logger log = LoggerFactory.getLogger(RaftNodeService.class); // 心跳间隔时间 private final static long INTERVAL_TIME = 1500L; private static Map raftNodeMap = new ConcurrentHashMap<>(8); static { raftNodeMap.put(NodeStatusEnums.LEADER, new LeaderRaftNode()); raftNodeMap.put(NodeStatusEnums.CANDIDATE, new CandidateRaftNode()); raftNodeMap.put(NodeStatusEnums.FOLLOW, new FollowRaftNode()); } /** * 节点信息初始化 */ public static void raftNodeInit(ServerConfig self, List clusterConfig) { RaftNodeInfo.getInstance().setSelf(self); RaftNodeInfo.getInstance().setClusterConfig(clusterConfig); RaftNodeInfo.getInstance().setCurrentNodeStatus(NodeStatusEnums.FOLLOW); createElectionTask(); } /** * 发送心跳 */ public synchronized static void sendHeartBeat() { RaftNode raftNode = raftNodeMap.get(RaftNodeInfo.getInstance().getCurrentNodeStatus()); boolean result = false; try { result = raftNode.callHeartBeatRequest(RaftNodeInfo.getInstance().getClusterConfig()); } catch (ExecutionException e) { log.debug(" {}: 完了,作为leader发送心跳失败了:{}", RaftNodeInfo.getInstance().getSelf().toString(), e.getMessage(), e); } catch (InterruptedException e) { log.debug(" {}: 完了,作为leader发送心跳失败了:{}", RaftNodeInfo.getInstance().getSelf().toString(), e.getMessage(), e); } if (!result) { // 代表心跳失败了,状态已经变更了 // 需要停止心跳,开启心跳检测 heartBeatTestDestroy(); createElectionTask(); } } /** * 心跳处理 */ public static void heartBeatHandler(HeartBeatRequest request, Channel channel) { ThreadPoolUtils.nettyServerAsyncPool.execute(() -> { RaftNode raftNode = raftNodeMap.get(RaftNodeInfo.getInstance().getCurrentNodeStatus()); channel.writeAndFlush(new RpcRemoteMsg(raftNode.heartBeatHandler(request))); // 收到了心跳,所以就要停止当前心跳的检测,然后重新开启一个检测任务 createElectionTask(); }); } // 命令合规性校验 目前就get set 随便校验一下 public static boolean commandCheck(String command) { if (command == null || !"SET_GET".contains(command.split(" ")[0]) || command.split(" ").length < 2) { return false; } return true; } /** * 客户端的请求,以KV为例 就是set命令 , 这里请求返回就简陋一点 */ public static void clientRequestHandler(ClientRequest request, Channel channel) { ThreadPoolUtils.nettyServerAsyncPool.execute(() -> { ClientResponse clientResponse = ClientResponse.builder().build(); clientResponse.setRequestId(request.getRequestId()); if (!commandCheck(request.getCommand())) { clientResponse.setCode(401); clientResponse.setMsg("命令格式不正确"); channel.writeAndFlush(new RpcRemoteMsg(clientResponse)); return; } // 只有set命令才需要发送日志,get命令直接取数据就行了 String[] command = request.getCommand().split(" "); if (command[0].equals("SET")) { RaftNode raftNode = raftNodeMap.get(RaftNodeInfo.getInstance().getCurrentNodeStatus()); try { channel.writeAndFlush(new RpcRemoteMsg(raftNode.clientRequestHandler(request, RaftNodeInfo.getInstance().getClusterConfig()))); return; } catch (ExecutionException e) { log.debug(" {}: 日志提交失败了:{}", request.getCommand(), e.getMessage(), e); clientResponse.setCode(500); clientResponse.setMsg(e.getMessage()); channel.writeAndFlush(new RpcRemoteMsg(clientResponse)); } catch (InterruptedException e) { log.debug(" {}: 日志提交失败了:{}", RaftNodeInfo.getInstance().getSelf().toString(), e.getMessage(), e); clientResponse.setCode(500); clientResponse.setMsg(e.getMessage()); channel.writeAndFlush(new RpcRemoteMsg(clientResponse)); } } else { // get命令直接取值 clientResponse.setCode(200); clientResponse.setData(RaftNodeInfo.getInstance().getLogManage().getDataByKey(command[1])); channel.writeAndFlush(new RpcRemoteMsg(clientResponse)); } }); } /** * Log预提交请求 */ public static void logPreCommitHandler(AppendEntriesPreCommit request, Channel channel) { ThreadPoolUtils.nettyServerAsyncPool.execute(() -> { RaftNode raftNode = raftNodeMap.get(RaftNodeInfo.getInstance().getCurrentNodeStatus()); channel.writeAndFlush(new RpcRemoteMsg(raftNode.logPreCommitHandler(request))); }); } /** * Log提交请求 */ public static void logCommitHandler(AppendEntriesCommit request) { ThreadPoolUtils.nettyServerAsyncPool.execute(() -> { RaftNode raftNode = raftNodeMap.get(RaftNodeInfo.getInstance().getCurrentNodeStatus()); raftNode.logCommitHandler(request); // 收到了日志,所以就要停止当前心跳的检测,然后重新开启一个检测任务 createElectionTask(); }); } /** * 发起投票 */ public synchronized static void sendCallVote() { RaftNode raftNode = raftNodeMap.get(RaftNodeInfo.getInstance().getCurrentNodeStatus()); boolean result = false; try { result = raftNode.callVoteRequest(RaftNodeInfo.getInstance().getClusterConfig()); } catch (ExecutionException e) { StateMachines.becomeFollow(RaftNodeInfo.getInstance().getCurrentTerm(), null, null); log.debug(" {}: 完了,作为candidate发起投票失败了:{}", RaftNodeInfo.getInstance().getSelf().toString(), e.getMessage(), e); } catch (InterruptedException e) { StateMachines.becomeFollow(RaftNodeInfo.getInstance().getCurrentTerm(), null, null); log.debug(" {}: 完了,作为candidate发起投票失败了:{}", RaftNodeInfo.getInstance().getSelf().toString(), e.getMessage(), e); } if (!result) { // 代表发起投票失败了,状态已经变更了 // 需要重新开启一个检测任务 createElectionTask(); return; } // 投票成功了 需要开启心跳任务 createHearBeatTask(); } /** * 发起投票请求处理 */ public synchronized static void callVoteHandler(RequestVoteRPC requestVoteRPC, Channel channel) { ThreadPoolUtils.nettyServerAsyncPool.execute(() -> { RaftNode raftNode = raftNodeMap.get(RaftNodeInfo.getInstance().getCurrentNodeStatus()); channel.writeAndFlush(new RpcRemoteMsg(raftNode.voteRequestHandler(requestVoteRPC))); }); } /** * 发起Log拉取请求 */ public synchronized static LogIndexPullResult sendLogPullRequest(List pullLogIndex) { if (CollectionUtil.isNotEmpty(pullLogIndex)) { RaftNode raftNode = raftNodeMap.get(RaftNodeInfo.getInstance().getCurrentNodeStatus()); return raftNode.sendLogPullRequest(pullLogIndex); } return null; } /** * 发起Log拉取请求处理 */ public synchronized static void logPullRequestHandler(LogIndexPull logIndexPull, Channel channel) { ThreadPoolUtils.nettyServerAsyncPool.execute(() -> { RaftNode raftNode = raftNodeMap.get(RaftNodeInfo.getInstance().getCurrentNodeStatus()); log.debug("{}:拉取日志:{}",channel.remoteAddress(),JSONObject.toJSON(logIndexPull)); channel.writeAndFlush(new RpcRemoteMsg(raftNode.logPullRequestHandler(logIndexPull))); }); } /** * 销毁并创建心跳检测任务 */ public static void createElectionTask() { long randomTime = getRandomTime(); final long intervalTime = INTERVAL_TIME + randomTime; // 先销毁之前的 electionTaskDestroy(); //开启新的 ScheduledFuture schedule = ThreadPoolUtils.hearBeatAsyncPool.schedule(new ElectionTask(intervalTime), intervalTime, TimeUnit.MILLISECONDS); RaftNodeInfo.getInstance().setElectionTask(schedule); } /** * 销毁并创建心跳任务 */ public static void createHearBeatTask() { // 先销毁之前的 heartBeatTestDestroy(); //开启新的 ScheduledFuture schedule = ThreadPoolUtils.hearBeatAsyncPool.scheduleAtFixedRate(new HeartBeatTask(), 0L, INTERVAL_TIME, TimeUnit.MILLISECONDS); RaftNodeInfo.getInstance().setElectionTask(schedule); } public static long getRandomTime() { // 要比心跳慢一点 return RandomUtil.randomLong(250L, 1000L); } /** * 销毁心跳检测任务 */ public static void electionTaskDestroy() { if (null != RaftNodeInfo.getInstance().getElectionTask()) { RaftNodeInfo.getInstance().getElectionTask().cancel(true); RaftNodeInfo.getInstance().setElectionTask(null); } } /** * 销毁心跳任务 */ public static void heartBeatTestDestroy() { if (null != RaftNodeInfo.getInstance().getHeartBeatTask()) { RaftNodeInfo.getInstance().getHeartBeatTask().cancel(true); RaftNodeInfo.getInstance().setHeartBeatTask(null); } } ``` #### 一个全局节点信息类 ```java public class RaftNodeInfo { /** * 自己 */ private ServerConfig self; /** * 集群其他节点信息 */ private List clusterConfig; /** * 当前节点状态 默认FOLLOW */ private volatile NodeStatusEnums currentNodeStatus = NodeStatusEnums.FOLLOW; /** * 当前节点任期 */ private volatile long currentTerm = 0L; /** * 当前leader */ private volatile String currentLeaderId; /** * 最后日志索引 已提交的 */ private volatile long lastLogIndex = 0L; /** * 最后的日志任期 这我这没用到 */ private volatile long lastLogTerm = 0L; /** * 当前任期给谁投过票 */ private volatile String voteFor; /** * 最近更新时间 心跳或者日志更新 **/ private volatile long lastUpdateTime = 0L; /** * 心跳任务 **/ private ScheduledFuture heartBeatTask; /** * 心跳检测任务 **/ private ScheduledFuture electionTask; /** * 日志管理 **/ private LogManage logManage; /** * 日志文件 **/ private String logPath; } ``` ### 3.状态机 提供节点状态变更、心跳结果处理、投票结果处理、日志一致性处理 ```java public class StateMachines { private static final Logger log = LoggerFactory.getLogger(StateMachines.class); /** 候选人-》leader */ public static void becomeLeader(){ // 变为leader RaftNodeInfo.getInstance().setCurrentNodeStatus(NodeStatusEnums.LEADER); // leader设置为自己 RaftNodeInfo.getInstance().setCurrentLeader(RaftNodeInfo.getInstance().getSelf().toString()); // 票清了 RaftNodeInfo.getInstance().setVoteFor(null); } /** follow-》候选人 */ public static void becomeCandidate(){ // 变为候选人 RaftNodeInfo.getInstance().setCurrentNodeStatus(NodeStatusEnums.CANDIDATE); // 任期+1 RaftNodeInfo.getInstance().setCallVoteTerm(); // 给自己投一票 RaftNodeInfo.getInstance().setVoteFor(RaftNodeInfo.getInstance().getSelf().toString()); } /** 候选人、leader->follow */ public static void becomeFollow(long term,String leaderId,String voteFor){ RaftNodeInfo.getInstance().setCurrentNodeStatus(NodeStatusEnums.FOLLOW); RaftNodeInfo.getInstance().setCurrentLeader(leaderId); RaftNodeInfo.getInstance().setCurrentTerm(term); RaftNodeInfo.getInstance().setVoteFor(voteFor); RaftNodeInfo.getInstance().setLastUpdateTime(System.currentTimeMillis()); } /** 投票结果一致性处理 */ public static boolean voteResultHandler(List> taskList,Integer nodeNum) throws ExecutionException, InterruptedException { int voteNum = 0; for (Future future : taskList) { RequestVoteResult voteResult = future.get(); // 判断leader是否还存活 存活的话肯定要把我给否了呀 if (leaderIsLive(voteResult)) { return false; } if(voteResult!=null){ log.debug("投票结果,我的term:{} ,结果:{}",RaftNodeInfo.getInstance().getCurrentTerm(), JSONObject.toJSON(voteResult)); } if (null != voteResult && voteResult.isVoteGranted()) { voteNum++; } } if (voteNum != 0 && voteNum >= (nodeNum / 2)) { // 投票通过 升级为leader StateMachines.becomeLeader(); log.debug(" {}: 哈哈哈,我升级为leader啦", RaftNodeInfo.getInstance().getSelf().toString()); return true; } else { // 投票不通过,退成follow 继续苟着 StateMachines.becomeFollow(RaftNodeInfo.getInstance().getCurrentTerm(), null, null); log.debug(" {}: 完了,这帮人不支持我,等待机会再试", RaftNodeInfo.getInstance().getSelf().toString()); return false; } } // 判断leader是否还存活 存活的话肯定要把我给否了呀 private static boolean leaderIsLive(RequestVoteResult voteResult) { if (null != voteResult && StrUtil.isNotEmpty(voteResult.getLeaderId())) { // 被leader一票否决,退成follow 继续苟着 StateMachines.becomeFollow(voteResult.getTerm(), voteResult.getLeaderId(), null); return true; } return false; } /** 心跳结果一致性处理 */ public static boolean heartBeatResultHandler(List> taskList,Integer nodeNum) throws ExecutionException, InterruptedException { int responseNum = 0; for (Future future : taskList) { HeartBeatResult heartBeatResult = future.get(); if (null != heartBeatResult) { responseNum++; } } if (responseNum != 0 && responseNum >= (nodeNum / 2)) { log.debug("{}: 万众一心,我再接再厉", RaftNodeInfo.getInstance().getSelf().toString()); return true; } else { // 没有应答或者应答数量小于一半 就退化为候选者,并停止对外提供服务 // 状态变更 StateMachines.becomeFollow(RaftNodeInfo.getInstance().getCurrentTerm(), null, null); log.debug("{}: 我找不到追随者了,我暂时停止对外服务", RaftNodeInfo.getInstance().getSelf().toString()); return false; } } /** 日志预提交结果 */ public static boolean logPreCommitHandler(List> taskList, Integer nodeNum) throws ExecutionException, InterruptedException { int responseNum = 0; for (Future future : taskList) { AppendEntriesPreCommitResult preCommitResult = future.get(); if (null != preCommitResult && preCommitResult.isSuccess()) { responseNum++; } } return responseNum != 0 && responseNum >= (nodeNum / 2); } } ``` ### 4.日志模块 ```java public interface LogManage extends ResourceLifeCycle{ /** leader预提交 */ long preCommitLog(LogEntity logEntity); /** follow预提交 */ void preCommitLog(long preCommitLogId,LogEntity logEntity); /** 缓存移除 */ void cacheLogRemove(long cacheLogId); /** leader日志提交 */ long commitLog(long cacheLogId); /** follow日志提交 */ void commitLog(long cacheLogId,long logIndex); /** follow日志Check */ void logIndexCheck(); /** 根据日志索引获取日志内容 */ LogEntity getLogEntityByIndex(long logIndex, RandomAccessFile file); /** 命令数据处理 */ void dataHandler(String command); /** 根据Key获取数据 */ String getDataByKey(String key); } ``` ### 5.定时任务 ![](./images/image_DWkZrLe3c3.png) - **ElectionTask**:心跳检测任务,不通过则升级为Candidate - **HeartBeatTask**:心跳任务,不断给Follow发送心跳,阻止其成为Candidate - **LogIndexCheckTask**:Follow日志Check定时任务 ## 五、核心流程介绍 其实流程图已经很清楚了,这里挑部分来聊聊 ### 1.选举 目前心跳设置的时间为1500ms,心跳检测的时间为1750ms+0-750ms随机数(**之前随机数设置的很短,算上网络延迟等因素,导致两个Candidate同任期的几率非常之高**),follow收到心跳会更新lastUpdateTIme,而心跳检测则会检测这个时间到当前时间是否超过检测时间间隔,超过了则会变成candidate发起选举 ![](./images/image_LrgRK2aI8j.png) **CandidateRaftNode**:发起选举RPC **选举RPC实体类**: ```java public class RequestVoteRPC extends RpcMsgId implements Serializable { /** 候选人的任期号 */ private long term; /** 请求选票的候选人的 Id(ip:selfPort) */ private String candidateId; /** 候选人的最后日志条目的索引值 */ private long lastLogIndex; /** 候选人最后日志条目的任期号 */ private long lastLogTerm; } ``` **选举方法**: ```java public boolean callVoteRequest(List serverConfigs) throws ExecutionException, InterruptedException { if (CollectionUtil.isEmpty(serverConfigs)) { StateMachines.becomeFollow(RaftNodeInfo.getInstance().getCurrentTerm(), null, null); log.error("只有一个节点,还发起什么投票?"); return false; } // candidate 会发起投票请求 RaftNodeInfo instance = RaftNodeInfo.getInstance(); // 投票过程中 可能又收到了心跳或者日志,状态已经变为follow if (!NODE_TYPE.equals(RaftNodeInfo.getInstance().getCurrentNodeStatus())) { return false; } log.debug(" {}: 哈哈哈,我发起了投票", RaftNodeInfo.getInstance().getSelf().toString()); List> taskList = new ArrayList<>(serverConfigs.size()); // 加上自己的一票 需要 大于= n/2+1 // 所以直接 >= n/2 就算通过了 // 但是注意此时如果已经存在leader,日志数又不比当前leader大,所以leader还是leader 具有一票否决权 for (ServerConfig serverConfig : serverConfigs) { Future voteResultFuture = ThreadPoolUtils.sendAsyncMsgPool.submit(() -> { // 构建投票 RequestVoteRPC voteRPC = RequestVoteRPC.builder().candidateId(instance.getSelf().toString()) .term(instance.getCurrentTerm()) // 成为候选 的时候任期就+1了 .lastLogIndex(instance.getLastLogIndex()).build(); RpcSession voteRPCRpcSession = RpcSessionFactory.openSession(serverConfig, voteRPC); return voteRPCRpcSession == null ? null : voteRPCRpcSession.syncSend(1000L); }); taskList.add(voteResultFuture); } // 投票过程中 可能状态又已经变为follow if (!NODE_TYPE.equals(RaftNodeInfo.getInstance().getCurrentNodeStatus())) { return false; } return StateMachines.voteResultHandler(taskList, serverConfigs.size()); } ``` **Follow选举响应**: 1. 任期比我大我就同意 2. 任期跟我一样,记录的日志比我多而且我没有投过票我也同意 (Follow同一个任期内只能投一票) ```java public RequestVoteResult voteRequestHandler(RequestVoteRPC voteRPC) { // follow 需要处理投票请求 RaftNodeInfo instance = RaftNodeInfo.getInstance(); RequestVoteResult voteResult = RequestVoteResult.builder().term(instance.getCurrentTerm()).build(); voteResult.setRequestId(voteRPC.getRequestId()); // 1.任期比我大,我直接就同意 if (voteRPC.getTerm() > instance.getCurrentTerm()) { return agreeVote(voteResult, voteRPC); } // 2.任期跟我一样,记录的日志比我多 而且 我没有投过票 // 我只能投一票 if ((voteRPC.getTerm() == instance.getCurrentTerm() && voteRPC.getLastLogIndex() >= instance.getLastLogIndex()) && (instance.getVoteFor() == null || instance.getVoteFor().equals(voteRPC.getCandidateId()))) { return agreeVote(voteResult, voteRPC); } voteResult.setTerm(instance.getCurrentTerm()); voteResult.setVoteGranted(false); log.info(" {}: 我身为现任Follow,我不认可你的实力,我不能给你投票:{}", instance.getSelf().toString(), voteRPC.getCandidateId()); return voteResult; } private RequestVoteResult agreeVote(RequestVoteResult voteResult, RequestVoteRPC voteRPC) { voteResult.setTerm(RaftNodeInfo.getInstance().getCurrentTerm()); voteResult.setVoteGranted(true); RaftNodeInfo.getInstance().setCurrentTerm(voteRPC.getTerm()); RaftNodeInfo.getInstance().setVoteFor(voteRPC.getCandidateId()); log.info(" {}: 我身为现任Follow,我认可你的实力,我给你投票:{}", RaftNodeInfo.getInstance().getSelf().toString(), voteRPC.getCandidateId()); return voteResult; } ``` **Leader响应**: leader有没有可能收到投票?有可能!假设某一个Follow延迟收到心跳或者没有收到心跳就会发起,那leader就会收到它发起的投票,那怎么办?**判断任期和日志,任期和日志都比Leader大则Leader需要退位,否则Leader应该具有一票否决权**(这样就防止了某个follow无限发起投票,任期无限+1这种情况) > 一个candidate任期非常大的时候,其他follow必然会给他投票,那这样就升为leader就导致了同时存在两个leader的情况,所以这时候的当期leader应该具有一票否决权 ```java public RequestVoteResult voteRequestHandler(RequestVoteRPC voteRPC) { // leader 有可能收到 候选者的投票申请 RaftNodeInfo instance = RaftNodeInfo.getInstance(); RequestVoteResult requestVoteResult = RequestVoteResult.builder().build(); requestVoteResult.setRequestId(voteRPC.getRequestId()); // 候选人的任期比我大 而且日志还比我大 说明我已经out了,我需要退位 if (voteRPC.getTerm() >= instance.getCurrentTerm() && voteRPC.getLastLogIndex() > instance.getLastLogIndex()) { // 状态变更 StateMachines.becomeFollow(voteRPC.getTerm(), voteRPC.getCandidateId(), null); requestVoteResult.setTerm(voteRPC.getTerm()); requestVoteResult.setVoteGranted(true); log.info(" {}: 我身为现任leader,我认可你的实力,我下位让贤:{}", instance.getSelf().toString(), voteRPC.getCandidateId()); return requestVoteResult; } log.info(" {}: 我身为现任leader,不同你的上任请求:{}", instance.getSelf().toString(), voteRPC.getCandidateId()); // 否则就不同意,而且你还得给我老实点 requestVoteResult.setTerm(instance.getCurrentTerm()); requestVoteResult.setVoteGranted(false); requestVoteResult.setLeaderId(instance.getSelf().toString()); return requestVoteResult; } ``` ### 2.心跳 ![](./images/image__t3rroPm6H.png) 心跳这里我做了一个响应降级的操作,其实正常是不需要的,我这的目的是防止**网络分区!** 假设原本是这样: ![](./images/image_TGbhe5jfNL.png) 一旦网络分区则会变成这样,导致两个leader的出现,所以这时候心跳的响应就至关重要,一旦响应少于半数,则leader应该自动降级 ![](./images/image_wehoUGWuM_.png) **LeaderRaftNode**:发起心跳 ```java public boolean callHeartBeatRequest(List serverConfigs) throws ExecutionException, InterruptedException { if (CollectionUtil.isEmpty(serverConfigs)) { StateMachines.becomeFollow(RaftNodeInfo.getInstance().getCurrentTerm(), null, null); log.debug(" {}: 只有一个leader,还发什么心跳?", RaftNodeInfo.getInstance().getSelf().toString()); return false; } List> taskList = new ArrayList<>(serverConfigs.size()); // leader 需要发送心跳 防止网络分区,一旦心跳返回不足 n/2 则自动降级 for (ServerConfig serverConfig : serverConfigs) { Future heartBeatResultFuture = ThreadPoolUtils.sendAsyncMsgPool.submit(() -> { HeartBeatRequest build = HeartBeatRequest.builder() .leaderId(RaftNodeInfo.getInstance().getSelf().toString()) .leaderLastCommitIndex(RaftNodeInfo.getInstance().getLastLogIndex()) .term(RaftNodeInfo.getInstance().getCurrentTerm()).build(); RpcSession heartBeatRequestRpcSession = RpcSessionFactory.openSession(serverConfig, build); return heartBeatRequestRpcSession == null ? null : heartBeatRequestRpcSession.syncSend(200L); }); taskList.add(heartBeatResultFuture); } // 响应结果处理 return StateMachines.heartBeatResultHandler(taskList, serverConfigs.size()); } ``` ### 3.日志 日志设计的非常之简陋,就不做过多的介绍了,本文目的还是以实现Raft为主,性能问题暂不考虑,不过还是说一下测试结果,因为KV存储,项目启动需要读取数据放入内存,目前读取50m左右文件10w条日志需要8s左右,肯定是不合理的,**目前并没有做日志压缩和快照,也没有用零拷贝技术**,因为不想搞的太过复杂 ![](./images/image_96m5l3bxPO.png) **关于日志check,这里放上两种测试常见的结果** ![](./images/image_-Rp0JDVzhL.png) 1.新的节点加入,需要拉取一次所有数据 ![](./images/image_QjTkwz-TbM.png) 2.日志中间缺失 ![](./images/image_z7R1sbHD7I.png) 两种情况都是没问题的! ## 六、遗留的问题 **注意**:尽管这样还是有几率导致数据丢失的!!!! **再次强调**:本文不完全和Raft论文对标,加了不少个人的想法进去,所以在这个过程中都是**遇到问题、思考问题、解决问题**,这本就是一个学习的过程,目前最大的一个问题就是: > **新加入的节点已经收到了Leader的数据,更新的lastCommitIndex,但是还没来得及向Leader同步以前的数据,而这时Leader挂了,所以这时候这个节点就有几率通过投票成为Leader,这时候数据就有几率丢失**,**文章中可能看不太出来,具体得看看代码,这算是一个很严重的BUG,各位想想可以怎么解决,而Raft又是怎么解决的?** 当然可能还有其他问题,各位大佬如果知道的也可以提出来 ## 七、总结 只有深入本质才能顺应发展,在分布式体系下,共识算法是必不可少的,光看不实践就容易眼高手低,当初我看Raft的时候也感觉挺简单的,不就是三种状态做不同的事,然后状态变更嘛,真正一做起来就发现好多细节都需要考虑,这还只是个demo,回头想想RocketMq和kafka的存储设计是真的厉害,做完这个又收获不少