63 Star 183 Fork 3

Gitee 极速下载/hyperledger-fabric

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
此仓库是为了提升国内下载速度的镜像仓库,每日同步一次。 原始仓库: https://github.com/hyperledger/fabric
克隆/下载
discovery_impl.go 27.38 KB
一键复制 编辑 原始数据 按行查看 历史
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package discovery
import (
"bytes"
"fmt"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/gossip/msgstore"
"github.com/hyperledger/fabric/gossip/util"
proto "github.com/hyperledger/fabric/protos/gossip"
"github.com/op/go-logging"
"github.com/spf13/viper"
)
const (
	// defaultHelloInterval is the fallback period between alive messages,
	// used when "peer.gossip.aliveTimeInterval" is not configured.
	defaultHelloInterval = 5 * time.Second

	// msgExpirationFactor is multiplied by the alive expiration timeout to
	// obtain the time-to-live of entries in the alive message store.
	msgExpirationFactor = 20
)

var (
	// aliveExpirationCheckInterval, when non-zero, overrides the period of
	// the expiration check; otherwise the period is derived from the alive
	// expiration timeout.
	aliveExpirationCheckInterval time.Duration

	// maxConnectionAttempts bounds the number of connection retries made by
	// Connect and connect2BootstrapPeers.
	maxConnectionAttempts = 120
)
// SetAliveTimeInterval stores the period between alive messages in the
// viper configuration under "peer.gossip.aliveTimeInterval".
func SetAliveTimeInterval(interval time.Duration) {
	const key = "peer.gossip.aliveTimeInterval"
	viper.Set(key, interval)
}
// SetAliveExpirationTimeout stores the alive expiration timeout in the viper
// configuration and sets the expiration check interval to a tenth of it.
func SetAliveExpirationTimeout(timeout time.Duration) {
	const key = "peer.gossip.aliveExpirationTimeout"
	viper.Set(key, timeout)
	aliveExpirationCheckInterval = timeout / 10
}
// SetAliveExpirationCheckInterval overrides how often the discovery service
// checks for expired members. A zero interval restores the derived default
// (see getAliveExpirationCheckInterval).
func SetAliveExpirationCheckInterval(interval time.Duration) {
	aliveExpirationCheckInterval = interval
}
// SetReconnectInterval stores the delay between reconnection attempts in the
// viper configuration under "peer.gossip.reconnectInterval".
func SetReconnectInterval(interval time.Duration) {
	const key = "peer.gossip.reconnectInterval"
	viper.Set(key, interval)
}
// timestamp pairs the logical time a peer advertised in its alive message
// with the local time at which that message was recorded.
type timestamp struct {
	// incTime is the peer's incarnation number converted to a time.Time.
	incTime time.Time
	// seqNum is the sequence number advertised within that incarnation.
	seqNum uint64
	// lastSeen is the local wall-clock time of the last update.
	lastSeen time.Time
}

// String formats the timestamp as "<incarnation in Unix nanoseconds>, <sequence number>".
func (ts *timestamp) String() string {
	incarnation := ts.incTime.UnixNano()
	return fmt.Sprintf("%v, %v", incarnation, ts.seqNum)
}
// gossipDiscoveryImpl holds the state of the gossip-based discovery service:
// this peer's own identity and logical clock, the membership view split into
// alive and dead peers, and the services used to talk to other peers.
type gossipDiscoveryImpl struct {
// incTime is this peer's incarnation number; it is set once at construction
// from the current Unix time in nanoseconds and sent as PeerTime.IncNum.
incTime uint64
// seqNum is incremented (under lock) for every alive message created and
// sent as PeerTime.SeqNum.
seqNum uint64
// self describes this peer; Metadata and Endpoint may be updated at runtime.
self NetworkMember
// deadLastTS maps string(PKI-ID) to the last timestamp of members currently
// considered dead.
deadLastTS map[string]*timestamp
// aliveLastTS maps string(PKI-ID) to the last timestamp of members currently
// considered alive.
aliveLastTS map[string]*timestamp
// id2Member maps string(PKI-ID) to every known member, alive or dead.
id2Member map[string]*NetworkMember
// aliveMembership stores the latest signed alive message of each alive member.
aliveMembership *util.MembershipStore
// deadMembership stores the latest signed alive message of each dead member.
deadMembership *util.MembershipStore
// msgStore tracks alive messages; when one expires, the corresponding member
// is purged from all of the maps and stores above.
msgStore *aliveMsgStore
// NOTE(review): bootstrapPeers is not assigned anywhere in the code visible
// here; the constructor passes the list directly to connect2BootstrapPeers.
bootstrapPeers []string
// comm sends, receives and gossips messages and reports presumed-dead peers.
comm CommService
// crypt signs outgoing alive messages and validates incoming ones.
crypt CryptoService
// lock guards self, seqNum and the membership maps and stores.
lock *sync.RWMutex
// toDieChan carries the stop signal between the message-handling goroutines.
toDieChan chan struct{}
// toDieFlag is set atomically to 1 by Stop and polled through toDie.
toDieFlag int32
logger *logging.Logger
// disclosurePolicy decides, per remote peer, which membership messages may be
// disclosed and which fields must be omitted from them.
disclosurePolicy DisclosurePolicy
}
// NewDiscoveryService creates a discovery service for the peer described by
// self, using comm for messaging, crypt for signing/validation and disPol to
// decide what is disclosed to remote peers. It starts the background
// goroutines of the service and begins connecting to bootstrapPeers.
func NewDiscoveryService(bootstrapPeers []string, self NetworkMember, comm CommService, crypt CryptoService, disPol DisclosurePolicy) Discovery {
	d := &gossipDiscoveryImpl{
		// Identity and logical clock; seqNum and toDieFlag start at zero.
		self:    self,
		incTime: uint64(time.Now().UnixNano()),

		// Membership view.
		deadLastTS:      make(map[string]*timestamp),
		aliveLastTS:     make(map[string]*timestamp),
		id2Member:       make(map[string]*NetworkMember),
		aliveMembership: util.NewMembershipStore(),
		deadMembership:  util.NewMembershipStore(),

		// Collaborators.
		comm:             comm,
		crypt:            crypt,
		disclosurePolicy: disPol,
		logger:           util.GetLogger(util.LoggingDiscoveryModule, self.InternalEndpoint),

		// Synchronization and shutdown.
		lock:      &sync.RWMutex{},
		toDieChan: make(chan struct{}, 1),
	}
	d.msgStore = newAliveMsgStore(d)

	background := []func(){
		d.periodicalSendAlive,
		d.periodicalCheckAlive,
		d.handleMessages,
		d.periodicalReconnectToDead,
		d.handlePresumedDeadPeers,
	}
	for _, task := range background {
		go task()
	}
	go d.connect2BootstrapPeers(bootstrapPeers)

	d.logger.Info("Started", self, "incTime is", d.incTime)
	return d
}
// Lookup returns the network member with the given PKI-ID, or nil if no such
// member is known.
//
// The returned value is a copy: the stored NetworkMember is updated in place
// (under the write lock) whenever a newer alive message arrives, so handing
// out the stored pointer would let callers read it without synchronization.
func (d *gossipDiscoveryImpl) Lookup(PKIID common.PKIidType) *NetworkMember {
	d.lock.RLock()
	defer d.lock.RUnlock()

	stored := d.id2Member[string(PKIID)]
	if stored == nil {
		return nil
	}
	memberCopy := *stored
	return &memberCopy
}
// Connect asynchronously tries to reach member and, once a ping succeeds,
// sends it a membership request. sendInternalEndpoint is consulted at that
// moment to decide whether the request carries this peer's internal endpoint.
// At most maxConnectionAttempts pings are made, one reconnect interval apart,
// and the attempts stop as soon as the service is shutting down.
func (d *gossipDiscoveryImpl) Connect(member NetworkMember, sendInternalEndpoint func() bool) {
	d.logger.Debug("Entering", member)
	defer d.logger.Debug("Exiting")

	go func() {
		for attempt := 0; attempt < maxConnectionAttempts; attempt++ {
			if d.toDie() {
				return
			}
			target := &NetworkMember{
				InternalEndpoint: member.InternalEndpoint,
				Endpoint:         member.Endpoint,
			}
			if d.comm.Ping(target) {
				request := d.createMembershipRequest(sendInternalEndpoint()).NoopSign()
				d.comm.SendToPeer(target, request)
				return
			}
			// The ping may have failed simply because we are shutting down.
			if d.toDie() {
				return
			}
			d.logger.Warning("Could not connect to", member)
			time.Sleep(getReconnectInterval())
		}
	}()
}
// connect2BootstrapPeers repeatedly pings the given bootstrap endpoints and
// sends a membership request to each one that responds, until some peer is
// known to be alive, the service is stopping, or maxConnectionAttempts rounds
// have elapsed. Endpoints that point at this peer's own port on localhost are
// ignored. It panics (via the logger) when this peer's internal endpoint is
// empty or is not of the form "host:port" with a numeric port.
func (d *gossipDiscoveryImpl) connect2BootstrapPeers(endpoints []string) {
	selfEndpoint := d.self.InternalEndpoint
	if selfEndpoint == "" {
		d.logger.Panic("Internal endpoint is empty:", selfEndpoint)
	}

	hostAndPort := strings.Split(selfEndpoint, ":")
	if len(hostAndPort) != 2 {
		d.logger.Panicf("Self endpoint %s isn't formatted as 'host:port'", selfEndpoint)
	}

	myPort, err := strconv.ParseInt(hostAndPort[1], 10, 64)
	if err != nil {
		d.logger.Panicf("Self endpoint %s has not valid port'", selfEndpoint)
	}

	d.logger.Info("Entering:", endpoints)
	defer d.logger.Info("Exiting")

	endpoints = filterOutLocalhost(endpoints, int(myPort))
	if len(endpoints) == 0 {
		return
	}

	for attempt := 0; attempt < maxConnectionAttempts && !d.somePeerIsKnown() && !d.toDie(); attempt++ {
		var pending sync.WaitGroup
		request := d.createMembershipRequest(true).NoopSign()

		// contact pings one bootstrap endpoint and, if it answers, sends the request.
		contact := func(target string) {
			defer pending.Done()
			peer := &NetworkMember{
				Endpoint:         target,
				InternalEndpoint: target,
			}
			if d.comm.Ping(peer) {
				d.comm.SendToPeer(peer, request)
			}
		}

		pending.Add(len(endpoints))
		for _, endpoint := range endpoints {
			go contact(endpoint)
		}
		pending.Wait()
		time.Sleep(getReconnectInterval())
	}
}
// somePeerIsKnown reports whether at least one peer is currently recorded as alive.
func (d *gossipDiscoveryImpl) somePeerIsKnown() bool {
	d.lock.RLock()
	known := len(d.aliveLastTS) != 0
	d.lock.RUnlock()
	return known
}
// InitiateSync sends a membership request to up to peerNum randomly chosen
// alive members. It does nothing once the service is stopping.
func (d *gossipDiscoveryImpl) InitiateSync(peerNum int) {
	if d.toDie() {
		return
	}

	var targets []*NetworkMember
	request := d.createMembershipRequest(true).NoopSign()

	// Pick the targets under the read lock; send only after releasing it.
	d.lock.RLock()
	aliveCount := d.aliveMembership.Size()
	sampleSize := peerNum
	if aliveCount < sampleSize {
		sampleSize = aliveCount
	}
	candidates := d.aliveMembership.ToSlice()
	for _, idx := range util.GetRandomIndices(sampleSize, aliveCount-1) {
		candidate := candidates[idx]
		membership := candidate.GetAliveMsg().Membership

		internalEndpoint := ""
		if secret := candidate.Envelope.SecretEnvelope; secret != nil {
			internalEndpoint = secret.InternalEndpoint()
		}

		targets = append(targets, &NetworkMember{
			Endpoint:         membership.Endpoint,
			Metadata:         membership.Metadata,
			PKIid:            membership.PkiId,
			InternalEndpoint: internalEndpoint,
		})
	}
	d.lock.RUnlock()

	for _, target := range targets {
		d.comm.SendToPeer(target, request)
	}
}
// handlePresumedDeadPeers expires members that the communication layer
// presumes dead, provided they are currently recorded as alive. It runs until
// the service stops; the stop signal is put back on toDieChan so that the
// other goroutine waiting on the channel also receives it.
func (d *gossipDiscoveryImpl) handlePresumedDeadPeers() {
	defer d.logger.Debug("Stopped")

	for {
		if d.toDie() {
			return
		}
		select {
		case stop := <-d.toDieChan:
			d.toDieChan <- stop
			return
		case suspect := <-d.comm.PresumedDead():
			if !d.isAlive(suspect) {
				continue
			}
			d.expireDeadMembers([]common.PKIidType{suspect})
		}
	}
}
// isAlive reports whether the member with the given PKI-ID is currently
// recorded as alive.
func (d *gossipDiscoveryImpl) isAlive(pkiID common.PKIidType) bool {
	d.lock.RLock()
	_, found := d.aliveLastTS[string(pkiID)]
	d.lock.RUnlock()
	return found
}
// handleMessages dispatches every message accepted from the communication
// layer to handleMsgFromComm until the service stops. The stop signal is put
// back on toDieChan so that the other goroutine waiting on it is notified too.
func (d *gossipDiscoveryImpl) handleMessages() {
	defer d.logger.Debug("Stopped")

	incoming := d.comm.Accept()
	for {
		if d.toDie() {
			return
		}
		select {
		case msg := <-incoming:
			d.handleMsgFromComm(msg)
		case stop := <-d.toDieChan:
			d.toDieChan <- stop
			return
		}
	}
}
// handleMsgFromComm processes one message received from a remote peer.
//
//   - Membership request: the sender's self information is treated as an
//     alive message, and a membership response is sent back asynchronously.
//   - Alive message: added to the message store; if it is new, the membership
//     view is updated and the message is gossiped onwards.
//   - Membership response: every alive entry is processed as an alive
//     message, and every authentic, still-valid dead entry about an unknown
//     member is recorded as a dead member.
//
// All embedded messages come from the network and are therefore checked to be
// alive messages before being handed to the message store, which panics on
// any other message type.
func (d *gossipDiscoveryImpl) handleMsgFromComm(m *proto.SignedGossipMessage) {
	if m == nil {
		return
	}
	if m.GetAliveMsg() == nil && m.GetMemRes() == nil && m.GetMemReq() == nil {
		d.logger.Warning("Got message with wrong type (expected Alive or MembershipResponse or MembershipRequest message):", m.GossipMessage)
		return
	}

	d.logger.Debug("Got message:", m)
	defer d.logger.Debug("Exiting")

	if memReq := m.GetMemReq(); memReq != nil {
		selfInfoGossipMsg, err := memReq.SelfInformation.ToGossipMessage()
		if err != nil {
			d.logger.Warning("Failed deserializing GossipMessage from envelope:", err)
			return
		}

		// Reject anything but an alive message before it reaches CheckValid
		// (which panics otherwise) or is dereferenced below.
		if !selfInfoGossipMsg.IsAliveMsg() {
			d.logger.Warning("Expected alive message in membership request, got", selfInfoGossipMsg, "instead")
			return
		}

		if d.msgStore.CheckValid(selfInfoGossipMsg) {
			d.handleAliveMessage(selfInfoGossipMsg)
		}

		var internalEndpoint string
		if m.Envelope.SecretEnvelope != nil {
			internalEndpoint = m.Envelope.SecretEnvelope.InternalEndpoint()
		}

		// Sending a membership response to a peer may block this routine
		// in case the sending is deliberately slow (i.e. an attack).
		// Keep this asynchronous until the comm layer has a timeout detector.
		go d.sendMemResponse(selfInfoGossipMsg.GetAliveMsg().Membership, internalEndpoint)
		return
	}

	if m.IsAliveMsg() {
		if !d.msgStore.Add(m) {
			return
		}
		d.handleAliveMessage(m)
		d.comm.Gossip(m)
		return
	}

	if memResp := m.GetMemRes(); memResp != nil {
		for _, env := range memResp.Alive {
			am, err := env.ToGossipMessage()
			if err != nil {
				d.logger.Warning("Membership response contains an invalid message from an online peer:", err)
				return
			}
			if !am.IsAliveMsg() {
				d.logger.Warning("Expected alive message, got", am, "instead")
				return
			}

			if d.msgStore.CheckValid(am) {
				d.handleAliveMessage(am)
			}
		}

		for _, env := range memResp.Dead {
			dm, err := env.ToGossipMessage()
			if err != nil {
				d.logger.Warning("Membership response contains an invalid message from an offline peer", err)
				return
			}
			// Same guard as for the alive entries: the code below dereferences
			// the alive message and CheckValid panics on other message types.
			if !dm.IsAliveMsg() {
				d.logger.Warning("Expected alive message, got", dm, "instead")
				return
			}

			if !d.crypt.ValidateAliveMsg(dm) {
				d.logger.Warningf("Alive message isn't authentic, someone spoofed %s's identity", dm.GetAliveMsg().Membership)
				continue
			}

			if !d.msgStore.CheckValid(dm) {
				// A newer alive message exists for this peer; skip only this
				// entry so the remaining dead entries are still processed.
				continue
			}

			newDeadMembers := []*proto.SignedGossipMessage{}
			d.lock.RLock()
			if _, known := d.id2Member[string(dm.GetAliveMsg().Membership.PkiId)]; !known {
				newDeadMembers = append(newDeadMembers, dm)
			}
			d.lock.RUnlock()
			d.learnNewMembers([]*proto.SignedGossipMessage{}, newDeadMembers)
		}
	}
}
// sendMemResponse replies to a membership request from targetMember with this
// peer's membership view. If the disclosure policy yields no response for that
// peer, the connection to it is closed instead.
func (d *gossipDiscoveryImpl) sendMemResponse(targetMember *proto.Member, internalEndpoint string) {
	d.logger.Debug("Entering", targetMember)

	requester := &NetworkMember{
		Endpoint:         targetMember.Endpoint,
		Metadata:         targetMember.Metadata,
		PKIid:            targetMember.PkiId,
		InternalEndpoint: internalEndpoint,
	}

	response := d.createMembershipResponse(requester)
	if response == nil {
		const errMsg = `Got a membership request from a peer that shouldn't have sent one: %v, closing connection to the peer as a result.`
		d.logger.Warningf(errMsg, targetMember)
		d.comm.CloseConn(requester)
		return
	}

	defer d.logger.Debug("Exiting, replying with", response)

	reply := &proto.GossipMessage{
		Tag:     proto.GossipMessage_EMPTY,
		Nonce:   uint64(0),
		Content: &proto.GossipMessage_MemRes{MemRes: response},
	}
	d.comm.SendToPeer(requester, reply.NoopSign())
}
// createMembershipResponse builds the membership response for targetMember:
// the alive and dead members the disclosure policy allows that peer to see
// (with concealed fields omitted), plus a fresh alive message of this peer.
// It returns nil when the policy does not allow disclosing even this peer's
// own alive message to the target.
func (d *gossipDiscoveryImpl) createMembershipResponse(targetMember *NetworkMember) *proto.MembershipResponse {
	shouldBeDisclosed, omitConcealedFields := d.disclosurePolicy(targetMember)

	// Created before taking the read lock: createAliveMessage takes the write lock.
	selfAlive := d.createAliveMessage(true)
	if !shouldBeDisclosed(selfAlive) {
		return nil
	}

	d.lock.RLock()
	defer d.lock.RUnlock()

	dead := []*proto.Envelope{}
	for _, msg := range d.deadMembership.ToSlice() {
		if shouldBeDisclosed(msg) {
			dead = append(dead, omitConcealedFields(msg))
		}
	}

	var alive []*proto.Envelope
	for _, msg := range d.aliveMembership.ToSlice() {
		if shouldBeDisclosed(msg) {
			alive = append(alive, omitConcealedFields(msg))
		}
	}
	alive = append(alive, omitConcealedFields(selfAlive))

	return &proto.MembershipResponse{
		Alive: alive,
		Dead:  dead,
	}
}
// handleAliveMessage updates the membership view according to an alive
// message m:
//   - messages that fail validation are dropped;
//   - messages carrying this peer's own PKI-ID are only checked for an
//     endpoint mismatch (which indicates a misconfiguration) and then ignored;
//   - an unknown member is learned as a new alive member;
//   - a known dead member is resurrected if the message is newer than its
//     last recorded timestamp;
//   - a known alive member is refreshed if the message is newer than its last
//     recorded timestamp; older messages are ignored.
//
// The read lock is taken and released around each lookup rather than held
// throughout, because the functions that apply the changes take the write
// lock themselves.
func (d *gossipDiscoveryImpl) handleAliveMessage(m *proto.SignedGossipMessage) {
d.logger.Debug("Entering", m)
defer d.logger.Debug("Exiting")
if !d.crypt.ValidateAliveMsg(m) {
d.logger.Warningf("Alive message isn't authentic, someone must be spoofing %s's identity", m.GetAliveMsg())
return
}
pkiID := m.GetAliveMsg().Membership.PkiId
// A message about ourselves never changes the view; it is only inspected to
// detect another peer using the same PKI-ID with different endpoints.
if equalPKIid(pkiID, d.self.PKIid) {
d.logger.Debug("Got alive message about ourselves,", m)
diffExternalEndpoint := d.self.Endpoint != m.GetAliveMsg().Membership.Endpoint
var diffInternalEndpoint bool
secretEnvelope := m.GetSecretEnvelope()
// The internal endpoint is compared only when the message carries one.
if secretEnvelope != nil && secretEnvelope.InternalEndpoint() != "" {
diffInternalEndpoint = secretEnvelope.InternalEndpoint() != d.self.InternalEndpoint
}
if diffInternalEndpoint || diffExternalEndpoint {
d.logger.Error("Bad configuration detected: Received AliveMessage from a peer with the same PKI-ID as myself:", m.GossipMessage)
}
return
}
ts := m.GetAliveMsg().Timestamp
d.lock.RLock()
_, known := d.id2Member[string(pkiID)]
d.lock.RUnlock()
// First time we hear about this member: record it as alive.
if !known {
d.learnNewMembers([]*proto.SignedGossipMessage{m}, []*proto.SignedGossipMessage{})
return
}
d.lock.RLock()
_, isAlive := d.aliveLastTS[string(pkiID)]
lastDeadTS, isDead := d.deadLastTS[string(pkiID)]
d.lock.RUnlock()
// A known member is expected to be in exactly one of the two maps; anything
// else is treated as a broken invariant and panics via the logger.
if !isAlive && !isDead {
d.logger.Panicf("Member %s is known but not found neither in alive nor in dead lastTS maps, isAlive=%v, isDead=%v", m.GetAliveMsg().Membership.Endpoint, isAlive, isDead)
return
}
if isAlive && isDead {
d.logger.Panicf("Member %s is both alive and dead at the same time", m.GetAliveMsg().Membership)
return
}
if isDead {
if before(lastDeadTS, ts) {
// resurrect peer
d.resurrectMember(m, *ts)
} else if !same(lastDeadTS, ts) {
// The message is strictly older than what we recorded; only log it.
d.logger.Debug(m.GetAliveMsg().Membership, "lastDeadTS:", lastDeadTS, "but got ts:", ts)
}
return
}
// The alive map is looked up again because the lock was released above and
// the entry may have changed in the meantime.
d.lock.RLock()
lastAliveTS, isAlive := d.aliveLastTS[string(pkiID)]
d.lock.RUnlock()
if isAlive {
if before(lastAliveTS, ts) {
d.learnExistingMembers([]*proto.SignedGossipMessage{m})
} else if !same(lastAliveTS, ts) {
d.logger.Debug(m.GetAliveMsg().Membership, "lastAliveTS:", lastAliveTS, "but got ts:", ts)
}
}
// else, ignore the message because it is too old
}
// resurrectMember moves the member described by the alive message am from the
// dead set back to the alive set, recording t as its latest timestamp. The
// member's previously known internal endpoint is kept unless the message
// carries a secret envelope, whose internal endpoint then takes precedence.
func (d *gossipDiscoveryImpl) resurrectMember(am *proto.SignedGossipMessage, t proto.PeerTime) {
	d.logger.Info("Entering, AliveMessage:", am, "t:", t)
	defer d.logger.Info("Exiting")

	d.lock.Lock()
	defer d.lock.Unlock()

	membership := am.GetAliveMsg().Membership
	key := string(membership.PkiId)

	d.aliveLastTS[key] = &timestamp{
		incTime:  tsToTime(t.IncNum),
		seqNum:   t.SeqNum,
		lastSeen: time.Now(),
	}

	internalEndpoint := ""
	if previous := d.id2Member[key]; previous != nil {
		internalEndpoint = previous.InternalEndpoint
	}
	if secret := am.Envelope.SecretEnvelope; secret != nil {
		internalEndpoint = secret.InternalEndpoint()
	}

	d.id2Member[key] = &NetworkMember{
		Endpoint:         membership.Endpoint,
		Metadata:         membership.Metadata,
		PKIid:            membership.PkiId,
		InternalEndpoint: internalEndpoint,
	}

	// Drop every trace of the member from the dead side, then store the
	// message on the alive side.
	delete(d.deadLastTS, key)
	id := common.PKIidType(membership.PkiId)
	d.deadMembership.Remove(id)
	d.aliveMembership.Put(id, &proto.SignedGossipMessage{GossipMessage: am.GossipMessage, Envelope: am.Envelope})
}
// periodicalReconnectToDead pings, once per reconnect interval, every member
// currently recorded as dead and sends a membership request to those that
// respond. All pings of a round run concurrently and are awaited before the
// next sleep. It returns once the service is stopping.
func (d *gossipDiscoveryImpl) periodicalReconnectToDead() {
	defer d.logger.Debug("Stopped")

	// probe checks a single dead member and signals done when finished.
	probe := func(member NetworkMember, done *sync.WaitGroup) {
		defer done.Done()
		if !d.comm.Ping(&member) {
			d.logger.Debug(member, "is still dead")
			return
		}
		d.logger.Debug(member, "is responding, sending membership request")
		d.sendMembershipRequest(&member, true)
	}

	for !d.toDie() {
		var pending sync.WaitGroup
		for _, deadMember := range d.copyLastSeen(d.deadLastTS) {
			pending.Add(1)
			go probe(deadMember, &pending)
		}
		pending.Wait()

		d.logger.Debug("Sleeping", getReconnectInterval())
		time.Sleep(getReconnectInterval())
	}
}
// sendMembershipRequest sends a freshly created membership request to member,
// optionally including this peer's internal endpoint.
func (d *gossipDiscoveryImpl) sendMembershipRequest(member *NetworkMember, includeInternalEndpoint bool) {
	request := d.createMembershipRequest(includeInternalEndpoint)
	d.comm.SendToPeer(member, request)
}
// createMembershipRequest builds a membership request whose self information
// is a fresh alive message of this peer; includeInternalEndpoint controls
// whether that alive message keeps its secret envelope.
func (d *gossipDiscoveryImpl) createMembershipRequest(includeInternalEndpoint bool) *proto.SignedGossipMessage {
	selfInfo := d.createAliveMessage(includeInternalEndpoint).Envelope

	request := &proto.MembershipRequest{
		SelfInformation: selfInfo,
		// TODO: sending the known peers is not secure because the remote peer
		// might not be supposed to know about them. This is disabled until a
		// secure mechanism is implemented; see FAB-2570 for tracking.
		Known: [][]byte{},
	}

	msg := &proto.GossipMessage{
		Tag:     proto.GossipMessage_EMPTY,
		Nonce:   uint64(0),
		Content: &proto.GossipMessage_MemReq{MemReq: request},
	}
	return msg.NoopSign()
}
// copyLastSeen returns copies of the known members whose PKI-IDs are keys of
// lastSeenMap. The map is iterated under the read lock, so callers may pass
// one of the discovery service's own maps.
func (d *gossipDiscoveryImpl) copyLastSeen(lastSeenMap map[string]*timestamp) []NetworkMember {
	d.lock.RLock()
	defer d.lock.RUnlock()

	members := make([]NetworkMember, 0, len(lastSeenMap))
	for key := range lastSeenMap {
		member := d.id2Member[key]
		members = append(members, *member)
	}
	return members
}
// periodicalCheckAlive wakes up every expiration-check interval and expires
// the members that have not been heard from within the expiration timeout.
// It returns once the service is stopping.
func (d *gossipDiscoveryImpl) periodicalCheckAlive() {
	defer d.logger.Debug("Stopped")

	for {
		if d.toDie() {
			return
		}
		time.Sleep(getAliveExpirationCheckInterval())

		expired := d.getDeadMembers()
		if len(expired) == 0 {
			continue
		}
		d.logger.Debugf("Got %v dead members: %v", len(expired), expired)
		d.expireDeadMembers(expired)
	}
}
// expireDeadMembers moves every member in dead that is currently recorded as
// alive to the dead set (both its timestamp and its stored alive message) and
// then, outside the lock, closes the connection to each of them.
func (d *gossipDiscoveryImpl) expireDeadMembers(dead []common.PKIidType) {
	d.logger.Warning("Entering", dead)
	defer d.logger.Warning("Exiting")

	var expired []*NetworkMember

	d.lock.Lock()
	for _, pkiID := range dead {
		key := string(pkiID)
		lastTS, alive := d.aliveLastTS[key]
		if !alive {
			continue
		}
		expired = append(expired, d.id2Member[key])

		// Move the timestamp from the alive map to the dead map.
		d.deadLastTS[key] = lastTS
		delete(d.aliveLastTS, key)

		// Move the stored alive message as well, if there is one.
		msg := d.aliveMembership.MsgByID(pkiID)
		if msg == nil {
			continue
		}
		d.deadMembership.Put(pkiID, msg)
		d.aliveMembership.Remove(pkiID)
	}
	d.lock.Unlock()

	for _, member := range expired {
		d.logger.Warning("Closing connection to", member)
		d.comm.CloseConn(member)
	}
}
// getDeadMembers returns the PKI-IDs of the alive members that were last seen
// longer ago than the alive expiration timeout.
func (d *gossipDiscoveryImpl) getDeadMembers() []common.PKIidType {
	d.lock.RLock()
	defer d.lock.RUnlock()

	expired := []common.PKIidType{}
	for key, ts := range d.aliveLastTS {
		silence := time.Since(ts.lastSeen)
		if silence <= getAliveExpirationTimeout() {
			continue
		}
		d.logger.Warning("Haven't heard from", key, "for", silence)
		expired = append(expired, common.PKIidType(key))
	}
	return expired
}
// periodicalSendAlive gossips a fresh alive message of this peer once per
// alive time interval until the service is stopping.
func (d *gossipDiscoveryImpl) periodicalSendAlive() {
	defer d.logger.Debug("Stopped")

	for {
		if d.toDie() {
			return
		}
		d.logger.Debug("Sleeping", getAliveTimeInterval())
		time.Sleep(getAliveTimeInterval())

		alive := d.createAliveMessage(true)
		d.comm.Gossip(alive)
	}
}
// createAliveMessage builds and signs an alive message describing this peer.
// Every call increments the sequence number. When includeInternalEndpoint is
// false, the secret envelope is stripped from the signed result.
func (d *gossipDiscoveryImpl) createAliveMessage(includeInternalEndpoint bool) *proto.SignedGossipMessage {
	// Bump the sequence number and snapshot the mutable self state in one
	// critical section, so the message is internally consistent.
	d.lock.Lock()
	d.seqNum++
	membership := &proto.Member{
		Endpoint: d.self.Endpoint,
		Metadata: d.self.Metadata,
		PkiId:    d.self.PKIid,
	}
	peerTime := &proto.PeerTime{
		IncNum: d.incTime,
		SeqNum: d.seqNum,
	}
	internalEndpoint := d.self.InternalEndpoint
	d.lock.Unlock()

	unsigned := &proto.GossipMessage{
		Tag: proto.GossipMessage_EMPTY,
		Content: &proto.GossipMessage_AliveMsg{
			AliveMsg: &proto.AliveMessage{
				Membership: membership,
				Timestamp:  peerTime,
			},
		},
	}

	signed := &proto.SignedGossipMessage{
		GossipMessage: unsigned,
		Envelope:      d.crypt.SignMessage(unsigned, internalEndpoint),
	}
	if !includeInternalEndpoint {
		signed.Envelope.SecretEnvelope = nil
	}
	return signed
}
// learnExistingMembers refreshes the data of already known members from the
// alive messages in aliveArr: endpoint, metadata and internal endpoint are
// updated, and for members currently recorded as alive the timestamp and the
// stored alive message are replaced as well.
//
// Processing stops at the first entry that is not an alive message. Members
// that are no longer known, or that are recorded as dead, are skipped.
func (d *gossipDiscoveryImpl) learnExistingMembers(aliveArr []*proto.SignedGossipMessage) {
	d.logger.Debugf("Entering: learnedMembers={%v}", aliveArr)
	defer d.logger.Debug("Exiting")

	d.lock.Lock()
	defer d.lock.Unlock()

	for _, m := range aliveArr {
		// Validate before touching any field of the message.
		if m == nil || m.GetAliveMsg() == nil {
			d.logger.Warning("Expected alive message, got instead:", m)
			return
		}
		am := m.GetAliveMsg()
		d.logger.Debug("updating", am)

		key := string(am.Membership.PkiId)
		member := d.id2Member[key]
		if member == nil {
			// The member can be purged by the alive message store's
			// expiration callback between the caller's lookup and the
			// acquisition of the lock above.
			d.logger.Warning(am.Membership, "is not a known member, skipping")
			continue
		}

		// Keep the previously known internal endpoint unless the message
		// carries a secret envelope with a new one.
		internalEndpoint := member.InternalEndpoint
		if m.Envelope.SecretEnvelope != nil {
			internalEndpoint = m.Envelope.SecretEnvelope.InternalEndpoint()
		}

		// update member's data
		member.Endpoint = am.Membership.Endpoint
		member.Metadata = am.Membership.Metadata
		member.InternalEndpoint = internalEndpoint

		if _, isKnownAsDead := d.deadLastTS[key]; isKnownAsDead {
			d.logger.Warning(am.Membership, "has already expired")
			continue
		}

		alive, isKnownAsAlive := d.aliveLastTS[key]
		if !isKnownAsAlive {
			d.logger.Warning(am.Membership, "has already expired")
			continue
		}

		d.logger.Debug("Updating aliveness data:", am)
		// update existing aliveness data
		alive.incTime = tsToTime(am.Timestamp.IncNum)
		alive.lastSeen = time.Now()
		alive.seqNum = am.Timestamp.SeqNum

		if stored := d.aliveMembership.MsgByID(am.Membership.PkiId); stored == nil {
			// Nothing stored yet: build the entry from the incoming message
			// (there is no stored message to take the envelope from).
			d.logger.Debug("Adding", m, "to aliveMembership")
			msg := &proto.SignedGossipMessage{GossipMessage: m.GossipMessage, Envelope: m.Envelope}
			d.aliveMembership.Put(am.Membership.PkiId, msg)
		} else {
			d.logger.Debug("Replacing", stored, "in aliveMembership")
			stored.GossipMessage = m.GossipMessage
			stored.Envelope = m.Envelope
		}
	}
}
// learnNewMembers records the members described by aliveMembers as alive and
// those described by deadMembers as dead, skipping messages about this peer
// itself, and then (re)creates the id2Member entry for every message in both
// slices. An internal endpoint already known for a member takes precedence
// over the one carried by the message.
func (d *gossipDiscoveryImpl) learnNewMembers(aliveMembers []*proto.SignedGossipMessage, deadMembers []*proto.SignedGossipMessage) {
	d.logger.Debugf("Entering: learnedMembers={%v}, deadMembers={%v}", aliveMembers, deadMembers)
	defer d.logger.Debugf("Exiting")

	d.lock.Lock()
	defer d.lock.Unlock()

	for _, msg := range aliveMembers {
		alive := msg.GetAliveMsg()
		id := alive.Membership.PkiId
		if equalPKIid(id, d.self.PKIid) {
			continue
		}
		d.aliveLastTS[string(id)] = &timestamp{
			incTime:  tsToTime(alive.Timestamp.IncNum),
			lastSeen: time.Now(),
			seqNum:   alive.Timestamp.SeqNum,
		}
		d.aliveMembership.Put(id, &proto.SignedGossipMessage{GossipMessage: msg.GossipMessage, Envelope: msg.Envelope})
		d.logger.Debugf("Learned about a new alive member: %v", msg)
	}

	for _, msg := range deadMembers {
		dead := msg.GetAliveMsg()
		id := dead.Membership.PkiId
		if equalPKIid(id, d.self.PKIid) {
			continue
		}
		d.deadLastTS[string(id)] = &timestamp{
			incTime:  tsToTime(dead.Timestamp.IncNum),
			lastSeen: time.Now(),
			seqNum:   dead.Timestamp.SeqNum,
		}
		d.deadMembership.Put(id, &proto.SignedGossipMessage{GossipMessage: msg.GossipMessage, Envelope: msg.Envelope})
		d.logger.Debugf("Learned about a new dead member: %v", msg)
	}

	// update the member in any case
	for _, group := range [][]*proto.SignedGossipMessage{aliveMembers, deadMembers} {
		for _, msg := range group {
			alive := msg.GetAliveMsg()
			if alive == nil {
				d.logger.Warning("Expected alive message, got instead:", msg)
				return
			}

			key := string(alive.Membership.PkiId)

			internalEndpoint := ""
			if secret := msg.Envelope.SecretEnvelope; secret != nil {
				internalEndpoint = secret.InternalEndpoint()
			}
			if previous := d.id2Member[key]; previous != nil {
				internalEndpoint = previous.InternalEndpoint
			}

			d.id2Member[key] = &NetworkMember{
				Endpoint:         alive.Membership.Endpoint,
				Metadata:         alive.Membership.Metadata,
				PKIid:            alive.Membership.PkiId,
				InternalEndpoint: internalEndpoint,
			}
		}
	}
}
// GetMembership returns a snapshot of the members currently considered alive.
// It returns an empty slice once the service is stopping.
func (d *gossipDiscoveryImpl) GetMembership() []NetworkMember {
	if d.toDie() {
		return []NetworkMember{}
	}

	d.lock.RLock()
	defer d.lock.RUnlock()

	aliveMsgs := d.aliveMembership.ToSlice()
	members := make([]NetworkMember, 0, len(aliveMsgs))
	for _, msg := range aliveMsgs {
		membership := msg.GetAliveMsg().Membership
		// The internal endpoint is not part of the alive message's membership
		// data; it is taken from the known-member record.
		known := d.id2Member[string(membership.PkiId)]
		members = append(members, NetworkMember{
			PKIid:            membership.PkiId,
			Endpoint:         membership.Endpoint,
			Metadata:         membership.Metadata,
			InternalEndpoint: known.InternalEndpoint,
		})
	}
	return members
}
// tsToTime interprets ts as a count of nanoseconds since the Unix epoch and
// returns the corresponding time.
func tsToTime(ts uint64) time.Time {
	nanos := int64(ts)
	return time.Unix(0, nanos)
}
// UpdateMetadata replaces the metadata this peer advertises in the alive
// messages it creates from now on.
func (d *gossipDiscoveryImpl) UpdateMetadata(md []byte) {
	d.lock.Lock()
	d.self.Metadata = md
	d.lock.Unlock()
}
// UpdateEndpoint replaces the endpoint this peer advertises in the alive
// messages it creates from now on.
func (d *gossipDiscoveryImpl) UpdateEndpoint(endpoint string) {
	d.lock.Lock()
	d.self.Endpoint = endpoint
	d.lock.Unlock()
}
// Self returns a copy of this peer's own membership information.
//
// The read lock is held while copying because UpdateMetadata and
// UpdateEndpoint modify d.self under the write lock; reading the fields
// without it would race with those updates.
func (d *gossipDiscoveryImpl) Self() NetworkMember {
	d.lock.RLock()
	defer d.lock.RUnlock()

	return NetworkMember{
		Endpoint:         d.self.Endpoint,
		Metadata:         d.self.Metadata,
		PKIid:            d.self.PKIid,
		InternalEndpoint: d.self.InternalEndpoint,
	}
}
// toDie reports whether Stop has been called.
func (d *gossipDiscoveryImpl) toDie() bool {
	return atomic.LoadInt32(&d.toDieFlag) == 1
}
// Stop shuts the discovery service down: it raises the stop flag polled by
// the background goroutines, stops the alive message store and places a stop
// signal on toDieChan for the goroutines blocked in a select.
func (d *gossipDiscoveryImpl) Stop() {
	d.logger.Info("Stopping")
	defer d.logger.Info("Stopped")

	atomic.StoreInt32(&d.toDieFlag, 1)
	d.msgStore.Stop()
	d.toDieChan <- struct{}{}
}
// equalPKIid reports whether the two PKI-IDs consist of the same bytes.
func equalPKIid(a, b common.PKIidType) bool {
	return bytes.Equal([]byte(a), []byte(b))
}
// same reports whether the recorded timestamp a and the peer time b denote
// the same incarnation and sequence number.
func same(a *timestamp, b *proto.PeerTime) bool {
	if uint64(a.incTime.UnixNano()) != b.IncNum {
		return false
	}
	return a.seqNum == b.SeqNum
}
// before reports whether the recorded timestamp a is strictly older than the
// peer time b: either a belongs to an earlier incarnation, or to the same
// incarnation with a smaller sequence number.
func before(a *timestamp, b *proto.PeerTime) bool {
	incarnation := uint64(a.incTime.UnixNano())
	if incarnation != b.IncNum {
		return incarnation < b.IncNum
	}
	return a.seqNum < b.SeqNum
}
// getAliveTimeInterval returns the configured period between alive messages,
// falling back to defaultHelloInterval.
func getAliveTimeInterval() time.Duration {
	const key = "peer.gossip.aliveTimeInterval"
	return util.GetDurationOrDefault(key, defaultHelloInterval)
}
func getAliveExpirationTimeout() time.Duration {
return util.GetDurationOrDefault("peer.gossip.aliveExpirationTimeout", 5*getAliveTimeInterval())
}
// getAliveExpirationCheckInterval returns how often expired members are
// looked for: the explicitly set interval if there is one, otherwise a tenth
// of the alive expiration timeout.
func getAliveExpirationCheckInterval() time.Duration {
	if aliveExpirationCheckInterval == 0 {
		return getAliveExpirationTimeout() / 10
	}
	return aliveExpirationCheckInterval
}
func getReconnectInterval() time.Duration {
return util.GetDurationOrDefault("peer.gossip.reconnectInterval", getAliveExpirationTimeout())
}
// filterOutLocalhost returns the endpoints that do not refer to the given
// port on the local host, i.e. everything except "127.0.0.1:<port>" and
// "localhost:<port>". The relative order of the remaining endpoints is kept.
func filterOutLocalhost(endpoints []string, port int) []string {
	loopbackIP := fmt.Sprintf("127.0.0.1:%d", port)
	loopbackName := fmt.Sprintf("localhost:%d", port)

	var remote []string
	for _, candidate := range endpoints {
		switch candidate {
		case loopbackIP, loopbackName:
			// Refers to ourselves; drop it.
		default:
			remote = append(remote, candidate)
		}
	}
	return remote
}
// aliveMsgStore wraps a msgstore.MessageStore; its Add and CheckValid methods
// panic when given anything other than an alive message.
type aliveMsgStore struct {
msgstore.MessageStore
}
// newAliveMsgStore creates the expirable store of alive messages for d.
// Entries live for msgExpirationFactor times the alive expiration timeout.
// When an alive message expires, the member it describes is removed from all
// of d's membership maps and stores; the store is given d's lock so that the
// removal runs with the lock held.
func newAliveMsgStore(d *gossipDiscoveryImpl) *aliveMsgStore {
	// purgeMember forgets the member described by an expired alive message.
	purgeMember := func(m interface{}) {
		msg := m.(*proto.SignedGossipMessage)
		if !msg.IsAliveMsg() {
			return
		}
		id := msg.GetAliveMsg().Membership.PkiId
		key := string(id)
		d.aliveMembership.Remove(id)
		d.deadMembership.Remove(id)
		delete(d.id2Member, key)
		delete(d.deadLastTS, key)
		delete(d.aliveLastTS, key)
	}

	// No action is taken when a message is invalidated by a newer one.
	noop := func(m interface{}) {}

	acquire := func() { d.lock.Lock() }
	release := func() { d.lock.Unlock() }

	comparator := proto.NewGossipMessageComparator(0)
	ttl := getAliveExpirationTimeout() * msgExpirationFactor

	store := msgstore.NewMessageStoreExpirable(comparator, noop, ttl, acquire, release, purgeMember)
	return &aliveMsgStore{MessageStore: store}
}
// Add adds an alive message to the underlying store and reports whether it
// was added. It panics if msg is not a signed alive message.
func (s *aliveMsgStore) Add(msg interface{}) bool {
	signed := msg.(*proto.SignedGossipMessage)
	if signed.IsAliveMsg() {
		return s.MessageStore.Add(msg)
	}
	panic(fmt.Sprint("Msg ", msg, " is not AliveMsg"))
}
// CheckValid asks the underlying store whether the alive message msg is still
// valid. It panics if msg is not a signed alive message.
func (s *aliveMsgStore) CheckValid(msg interface{}) bool {
	signed := msg.(*proto.SignedGossipMessage)
	if signed.IsAliveMsg() {
		return s.MessageStore.CheckValid(msg)
	}
	panic(fmt.Sprint("Msg ", msg, " is not AliveMsg"))
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/mirrors/hyperledger-fabric.git
git@gitee.com:mirrors/hyperledger-fabric.git
mirrors
hyperledger-fabric
hyperledger-fabric
v1.0.0-alpha2

搜索帮助