代码拉取完成,页面将自动刷新
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package comm
import (
"github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/protoext"
"github.com/hyperledger/fabric/gossip/util"
)
type sendFunc func(peer *RemotePeer, msg *protoext.SignedGossipMessage)
type waitFunc func(*RemotePeer) error
type ackSendOperation struct {
snd sendFunc
waitForAck waitFunc
}
func newAckSendOperation(snd sendFunc, waitForAck waitFunc) *ackSendOperation {
return &ackSendOperation{
snd: snd,
waitForAck: waitForAck,
}
}
func (aso *ackSendOperation) send(msg *protoext.SignedGossipMessage, minAckNum int, peers ...*RemotePeer) []SendResult {
successAcks := 0
results := []SendResult{}
acks := make(chan SendResult, len(peers))
// Send to all peers the message
for _, p := range peers {
go func(p *RemotePeer) {
// Send the message to 'p'
aso.snd(p, msg)
// Wait for an ack from 'p', or get an error if timed out
err := aso.waitForAck(p)
acks <- SendResult{
RemotePeer: *p,
error: err,
}
}(p)
}
for {
ack := <-acks
results = append(results, SendResult{
error: ack.error,
RemotePeer: ack.RemotePeer,
})
if ack.error == nil {
successAcks++
}
if successAcks == minAckNum || len(results) == len(peers) {
break
}
}
return results
}
func interceptAcks(nextHandler handler, remotePeerID common.PKIidType, pubSub *util.PubSub) func(*protoext.SignedGossipMessage) {
return func(m *protoext.SignedGossipMessage) {
if protoext.IsAck(m.GossipMessage) {
topic := topicForAck(m.Nonce, remotePeerID)
pubSub.Publish(topic, m.GetAck())
return
}
nextHandler(m)
}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。