1 Star 0 Fork 0

peter/fabric

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
ack.go 1.65 KB
一键复制 编辑 原始数据 按行查看 历史
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package comm
import (
"github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/util"
proto "github.com/hyperledger/fabric/protos/gossip"
)
type sendFunc func(peer *RemotePeer, msg *proto.SignedGossipMessage)
type waitFunc func(*RemotePeer) error
type ackSendOperation struct {
snd sendFunc
waitForAck waitFunc
}
func newAckSendOperation(snd sendFunc, waitForAck waitFunc) *ackSendOperation {
return &ackSendOperation{
snd: snd,
waitForAck: waitForAck,
}
}
func (aso *ackSendOperation) send(msg *proto.SignedGossipMessage, minAckNum int, peers ...*RemotePeer) []SendResult {
successAcks := 0
results := []SendResult{}
acks := make(chan SendResult, len(peers))
// Send to all peers the message
for _, p := range peers {
go func(p *RemotePeer) {
// Send the message to 'p'
aso.snd(p, msg)
// Wait for an ack from 'p', or get an error if timed out
err := aso.waitForAck(p)
acks <- SendResult{
RemotePeer: *p,
error: err,
}
}(p)
}
for {
ack := <-acks
results = append(results, SendResult{
error: ack.error,
RemotePeer: ack.RemotePeer,
})
if ack.error == nil {
successAcks++
}
if successAcks == minAckNum || len(results) == len(peers) {
break
}
}
return results
}
func interceptAcks(nextHandler handler, remotePeerID common.PKIidType, pubSub *util.PubSub) func(*proto.SignedGossipMessage) {
return func(m *proto.SignedGossipMessage) {
if m.IsAck() {
topic := topicForAck(m.Nonce, remotePeerID)
pubSub.Publish(topic, m.GetAck())
return
}
nextHandler(m)
}
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/peter_code_git/fabric.git
git@gitee.com:peter_code_git/fabric.git
peter_code_git
fabric
fabric
v1.4.5

搜索帮助