1 Star 0 Fork 1

damon/raft-demo

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
main.go 9.55 KB
一键复制 编辑 原始数据 按行查看 历史
feipeng 提交于 2017-02-27 06:59 . add send append
package main
import (
"bytes"
"encoding/binary"
"encoding/json"
"errors"
"flag"
"fmt"
"io"
"net"
"strconv"
"strings"
"sync"
"time"
)
// raft协议
// http://www.thinkingyu.com/articles/Raft/
// http://www.infoq.com/cn/articles/raft-paper
var (
Name string // server对应的名称
Port int // server对应会监听的端口(内部之间的)
Servers []int // 其他服务对应的端口
ServePort int // 提供外部服务的端口
)
// 服务器角色
type Role int
const (
LeaderRole Role = iota // 领导者角色
CandidatedRole // 候选人
FollowerRole // 跟随者
)
// 日志条目信息
type LogItem struct {
Command string // 要执行的命令
Term int // 从领导人中收到的任期号
}
type Message struct {
Type int // 消息类型,0:选举消息,1:复制日志,2:回复选举,3:回复日志
Data interface{} // 消息数据
}
func logInfo(ty string, msg interface{}) {
t := time.Now()
v := t.Format("Mon Jan 2 15:04:05 -0700 MST 2006")
fmt.Printf("[%s] - %s - %+v\n", v, ty, msg)
}
type Client struct {
Conn net.Conn
}
func (c *Client) Close() {
c.Conn.Close()
}
// 发送选举消息
func (c *Client) SendVote(vote RequestVote, respChan chan RequestVoteResponse) (err error) {
data, err := json.Marshal(vote)
if err != nil {
return
}
var VoteType int = 0
err = binary.Write(c.Conn, binary.LittleEndian, &VoteType)
if err != nil {
return
}
dataLength := len(data)
err = binary.Write(c.Conn, binary.LittleEndian, &dataLength)
buffer := bytes.NewBuffer(data)
buffer.WriteTo(c.Conn)
go func(respChan chan RequestVoteResponse) {
err := binary.Read(c.Conn, binary.LittleEndian, &VoteType)
if err != nil {
logInfo("error", err.Error())
return
}
err = binary.Read(c.Conn, binary.LittleEndian, &dataLength)
if err != nil {
logInfo("error", err.Error())
return
}
respData := make([]byte, dataLength)
_, err = io.ReadFull(c.Conn, respData)
if err != nil {
logInfo("error", err.Error())
return
}
resp := RequestVoteResponse{}
err = json.Unmarshal(respData, &resp)
if err != nil {
logInfo("error", err.Error())
return
}
respChan <- resp
}(respChan)
return
}
func (c *Client) SendAppendEntry(entris AppendEntries, respChan chan AppendEntriesResponse) error {
var VoteType int = 1
err = binary.Write(c.Conn, binary.LittleEndian, &VoteType)
if err != nil {
return
}
data, err := json.Marshal(entris)
if err != nil {
return
}
dataLength := len(data)
err = binary.Write(c.Conn, binary.LittleEndian, &dataLength)
if err != nil {
return
}
buffer := bytes.NewBuffer(data)
buffer.WriteTo(c.Conn)
go func(c *Client, respChan chan AppendEntriesResponse) {
err := binary.Read(c.Conn, binary.LittleEndian, &VoteType)
if err != nil {
logInfo("error", err.Error())
return
}
err = binary.Read(c.Conn, binary.LittleEndian, &dataLength)
if err != nil {
logInfo("error", err.Error())
return
}
respData := make([]byte, dataLength)
_, err = io.ReadFull(c.Conn, respData)
if err != nil {
logInfo("error", err.Error())
return
}
resp := AppendEntriesResponse{}
err = json.Unmarshal(respData, &resp)
if err != nil {
logInfo("error", err.Error())
return
}
}(c, respChan)
return nil
}
// 服务器
type Server struct {
// 服务器基本信息
ServerId string // 服务器id
Role Role // 角色,leader, candidate, follow
OtherPorts []int // 其他端口号
Port int // 监听端口号
// 服务器上持久存在的
CurrentTerm int // 服务器最后知道的任期号(从0开始递增)
VotedFor string // 在当前任期内收到选票的候选人 id(如果没有就为 null) -> 此处设置为字符串"null",作为一个特殊值来处理
Log []LogItem // 日志条目;每个条目包含状态机的要执行命令和从领导人处收到时的任期号
// 服务器上不稳定存在的
CommitIndex int // 已知的被提交的最大日志条目的索引值(从0开始递增)
LastApplied int // 被状态机执行的最大日志条目的索引值(从0开始递增)
// 在领导人服务器上不稳定存在的
NextIndex map[string]int // 对于每一个服务器,记录需要发给它的下一个日志条目的索引(初始化为领导人上一条日志的索引值+1)
MatchIndex map[string]int // 对于每一个服务器,记录已经复制到该服务器的日志的最高索引值(从0开始递增)
ClientMutex sync.Mutex // clients链接进来时的锁操作
Clients []*Client // 其他相关连接进来的服务器
// raft的时间分为:election, normal operation, no emerging leader, 称为terms
// 选举时间
ElectionTime time.Duration // 默认为3s, 如果某server没有超时的情况下收到来自leader或者candidate的任何RPC,定时器重启,
// 如果超时,它就开始一次选举
}
func NewServer(Name string, Port int, Ports []int) *Server {
srv := &Server{
ServerId: Name,
Port: Port,
OtherPorts: Ports,
Role: FollowerRole,
CurrentTerm: 0, // 最后任期号
VotedFor: "null",
CommitIndex: 0,
LastApplied: 0,
ElectionTime: time.Duration(3),
}
return srv
}
// 开始启动服务
func (srv *Server) Start() {
fmt.Printf("start listen port %d\n", srv.Port)
ln, err := net.Listen("tcp4", fmt.Sprintf("0.0.0.0:%d", srv.Port))
if err != nil {
logInfo("error", err.Error())
return
}
defer ln.Close()
for {
c, err := ln.Accept()
if err != nil {
logInfo("error", err.Error())
}
go srv.ServeConnect(c)
}
}
// 服务链接
func (srv *Server) ServeConnect(conn net.Conn) {
client := &Client{
Conn: conn,
}
defer client.Close()
srv.ClientMutex.Lock()
srv.Clients = append(srv.Clients, client)
srv.ClientMutex.Unlock()
// 设定时间判定
}
func (srv *Server) Vote() error {
// 只有Follower需要选举
if srv.Role != FollowerRole {
return errors.New("not correct role for vote")
}
// 选举的时候,将自己设置为候选人
srv.Role = CandidatedRole
logTerm := srv.Log[srv.CommitIndex]
respChan := make(chan RequestVoteResponse)
defer close(respChan)
// 发送消息给其他机器的选举消息
vote := RequestVote{
Term: srv.CurrentTerm,
CandidateId: srv.ServerId,
LastLogIndex: srv.CommitIndex, // 需要当选为候选人,必须要保证将候选人的日志至少和服务器上的日志一样新
LastLogTerm: logTerm.Term,
}
// 给所有关联的机器发送选举消息
for _, client := range srv.Clients {
go func(client *Client, respChan chan RequestVoteResponse) {
err := client.SendVote(vote, respChan)
if err != nil {
return
}
}(client, respChan)
}
// 等待选举结果
for {
select {
case resp := <-respChan:
logInfo("info", resp)
case <-time.After(time.Second * srv.ElectionTime): // 选举超时
logInfo("info", "vote timeout")
break
}
}
return nil
}
// 领导人来调用复制日志,心跳日志也是用这个消息格式
type AppendEntries struct {
Term int // 领导人的任期号
LeaderId string // 领导人的 id,为了其他服务器能重定向到客户端
PrevLogIndex int // 最新日志之前的日志的索引值
PrevLogTerm int // 最新日志之前的日志的领导人任期号
Entries []LogItem // 将要存储的日志条目(表示 heartbeat 时为空,有时会为了效率发送超过一条)
LeaderCommit int // 领导人提交的日志条目索引值
}
type AppendEntriesResponse struct {
Term int // 当前的任期号,用于领导人更新自己的任期号
Success bool // 如果其它服务器包含能够匹配上 prevLogIndex 和 prevLogTerm 的日志时为真
ServerId string // ServerId
}
// 请求选举
type RequestVote struct {
Term int // 候选人的任期号
CandidateId string // 请求投票的候选人 id
LastLogIndex int // 候选人最新日志条目的索引值
LastLogTerm int // 候选人最新日志条目对应的任期号
}
// 选举响应
type RequestVoteResponse struct {
Term int // 目前的任期号,用于候选人更新自己
VoteGranted int // 如果候选人收到选票为 true
}
func init() {
flag.StringVar(&Name, "name", "", "-name=serverName")
flag.IntVar(&Port, "port", 1200, "-port=1200")
var servers string
flag.StringVar(&servers, "servers", "", "-servers=192,120")
flag.Parse()
for _, srv := range strings.Split(servers, ",") {
v, _ := strconv.Atoi(srv)
Servers = append(Servers, v)
}
}
func main() {
fmt.Println(Name, Port, Servers)
srv := NewServer(Name, Port, Servers)
srv.Start()
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/damon/raft-demo.git
git@gitee.com:damon/raft-demo.git
damon
raft-demo
raft-demo
master

搜索帮助