1 Star 0 Fork 2

who7708/etcd

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
network.go 3.05 KB
一键复制 编辑 原始数据 按行查看 历史
Xiang Li 提交于 2015-02-06 10:03 . raftest: wait for network sending
package rafttest
import (
"math/rand"
"sync"
"time"
"github.com/coreos/etcd/raft/raftpb"
)
// iface is the network interface handed to a single node: it lets the node
// send and receive raft messages and attach to or detach from the network.
type iface interface {
	// send hands the message m to the network for delivery.
	send(m raftpb.Message)
	// recv returns the channel on which this node receives messages.
	// The nodeNetwork implementation returns nil while the node is
	// disconnected.
	recv() chan raftpb.Message
	// disconnect detaches this node from the network.
	disconnect()
	// connect attaches this node to the network.
	connect()
}
// network is the set of controls used to degrade and repair the links of a
// network.
type network interface {
	// drop drops messages sent from `from` to `to` at the given rate
	// (1.0 drops all messages).
	drop(from, to uint64, rate float64)
	// delay delays messages sent from `from` to `to` at the given rate
	// (1.0 delays all messages). The raftNetwork implementation sleeps for a
	// random duration in [0, d).
	// TODO: do we need rate here?
	delay(from, to uint64, d time.Duration, rate float64)
	// disconnect detaches the node with the given id from the network.
	disconnect(id uint64)
	// connect attaches the node with the given id to the network.
	connect(id uint64)
	// heal heals the network. The raftNetwork implementation clears all drop
	// and delay rules; it does not touch the set of disconnected nodes.
	heal()
}
// raftNetwork is an in-memory network that connects nodes through per-node
// message queues and supports per-link drop and delay rules.
type raftNetwork struct {
	// mu is held by the methods of raftNetwork while they read or write the
	// maps below.
	mu sync.Mutex
	// disconnected records, by node id, which nodes are currently detached.
	disconnected map[uint64]bool
	// dropmap holds the drop rate configured for each directed link.
	dropmap map[conn]float64
	// delaymap holds the delay rule configured for each directed link.
	delaymap map[conn]delay
	// recvQueues holds the receive queue of each node, keyed by node id.
	recvQueues map[uint64]chan raftpb.Message
}
// conn identifies a directed link between two nodes. It is comparable and is
// used as the key of the drop and delay rule maps.
type conn struct {
	// from is the id of the sending node.
	from uint64
	// to is the id of the receiving node.
	to uint64
}
// delay is the delay rule of a link.
type delay struct {
	// d bounds the random sleep applied by send: the sleep is drawn from [0, d).
	d time.Duration
	// rate is the probability threshold compared against rand.Float64() to
	// decide whether a given message is delayed.
	rate float64
}
// newRaftNetwork builds a raftNetwork with one receive queue per id in nodes.
// Each queue buffers up to 1024 messages. The returned network has no drop or
// delay rules and no disconnected nodes.
func newRaftNetwork(nodes ...uint64) *raftNetwork {
	queues := make(map[uint64]chan raftpb.Message)
	for _, id := range nodes {
		queues[id] = make(chan raftpb.Message, 1024)
	}

	return &raftNetwork{
		recvQueues:   queues,
		dropmap:      map[conn]float64{},
		delaymap:     map[conn]delay{},
		disconnected: map[uint64]bool{},
	}
}
// nodeNetwork returns the network interface for the node with the given id.
// The returned value forwards every call to rn.
func (rn *raftNetwork) nodeNetwork(id uint64) iface {
	nn := new(nodeNetwork)
	nn.id = id
	nn.raftNetwork = rn
	return nn
}
// send delivers m to the receive queue of node m.To, applying the drop and
// delay rules configured for the link from m.From to m.To.
//
// The message is silently discarded when m.To has no receive queue or is
// disconnected, when the link's drop rule fires, or when the receiver's queue
// is full. When the link's delay rule fires, send blocks the caller for a
// random duration in [0, d) before enqueueing the message.
func (rn *raftNetwork) send(m raftpb.Message) {
	link := conn{m.From, m.To}

	// Snapshot the routing state under the lock and release it before any
	// sleep, so a delayed send does not hold the mutex.
	rn.mu.Lock()
	to := rn.recvQueues[m.To]
	if rn.disconnected[m.To] {
		to = nil
	}
	dropRate := rn.dropmap[link]
	// Named dl rather than delay so the package-level delay type is not
	// shadowed inside this function.
	dl := rn.delaymap[link]
	rn.mu.Unlock()

	if to == nil {
		return
	}
	if dropRate != 0 && rand.Float64() < dropRate {
		return
	}
	// TODO: shall we delay without blocking the send call?
	// rand.Int63n panics when its argument is <= 0, so only a strictly
	// positive duration is eligible; zero or negative means "no delay".
	if dl.d > 0 && rand.Float64() < dl.rate {
		time.Sleep(time.Duration(rand.Int63n(int64(dl.d))))
	}

	select {
	case to <- m:
	default:
		// drop messages when the receiver queue is full.
	}
}
// recvFrom returns the receive queue registered for node id `from`. It
// returns nil when that node is disconnected or has no queue.
func (rn *raftNetwork) recvFrom(from uint64) chan raftpb.Message {
	rn.mu.Lock()
	defer rn.mu.Unlock()

	if rn.disconnected[from] {
		return nil
	}
	return rn.recvQueues[from]
}
// drop sets the drop rate of the link from `from` to `to`, replacing any
// previous rate for that link. send compares the rate against
// rand.Float64(), and a rate of 0 disables dropping.
func (rn *raftNetwork) drop(from, to uint64, rate float64) {
	link := conn{from: from, to: to}

	rn.mu.Lock()
	defer rn.mu.Unlock()
	rn.dropmap[link] = rate
}
// delay sets the delay rule of the link from `from` to `to`, replacing any
// previous rule for that link. d bounds the random sleep applied by send and
// rate is the threshold send compares against rand.Float64().
func (rn *raftNetwork) delay(from, to uint64, d time.Duration, rate float64) {
	link := conn{from: from, to: to}
	rule := delay{d: d, rate: rate}

	rn.mu.Lock()
	defer rn.mu.Unlock()
	rn.delaymap[link] = rule
}
// heal removes every drop and delay rule by replacing both rule maps with
// empty ones. The set of disconnected nodes is left as it is.
func (rn *raftNetwork) heal() {
	rn.mu.Lock()
	defer rn.mu.Unlock()

	rn.dropmap, rn.delaymap = map[conn]float64{}, map[conn]delay{}
}
// disconnect marks the node with the given id as disconnected. While marked,
// send discards messages addressed to it and recvFrom returns nil for it.
func (rn *raftNetwork) disconnect(id uint64) {
	rn.mu.Lock()
	defer rn.mu.Unlock()
	rn.disconnected[id] = true
}
// connect clears the disconnected mark of the node with the given id, so
// send delivers to it and recvFrom returns its queue again.
func (rn *raftNetwork) connect(id uint64) {
	rn.mu.Lock()
	defer rn.mu.Unlock()
	// The entry is set to false rather than deleted; lookups treat both alike.
	rn.disconnected[id] = false
}
// nodeNetwork is the view of a raftNetwork from one node. It is the value
// returned as an iface by raftNetwork.nodeNetwork.
type nodeNetwork struct {
	// id is the node this view belongs to.
	id uint64
	// The embedded raftNetwork is the shared network that calls are
	// forwarded to.
	*raftNetwork
}
// connect attaches this node to the network by forwarding its id to the
// embedded raftNetwork.
func (nt *nodeNetwork) connect() {
	// The call is qualified with raftNetwork so it reaches the embedded
	// network's connect(id) and not this method.
	nt.raftNetwork.connect(nt.id)
}
// disconnect detaches this node from the network by forwarding its id to the
// embedded raftNetwork.
func (nt *nodeNetwork) disconnect() {
	// The call is qualified with raftNetwork so it reaches the embedded
	// network's disconnect(id) and not this method.
	nt.raftNetwork.disconnect(nt.id)
}
// send forwards m unchanged to the embedded raftNetwork. Routing is decided
// by the From and To fields of m, not by nt.id.
func (nt *nodeNetwork) send(m raftpb.Message) {
	nt.raftNetwork.send(m)
}
// recv returns this node's receive queue as reported by recvFrom, which
// returns nil while the node is disconnected.
func (nt *nodeNetwork) recv() chan raftpb.Message {
	return nt.recvFrom(nt.id)
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/who7708/etcd.git
git@gitee.com:who7708/etcd.git
who7708
etcd
etcd
v2.0.6

搜索帮助