代码拉取完成,页面将自动刷新
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package comm
import (
"fmt"
"sync"
"sync/atomic"
"github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/util"
proto "github.com/hyperledger/fabric/protos/gossip"
"github.com/op/go-logging"
"github.com/pkg/errors"
"golang.org/x/net/context"
"google.golang.org/grpc"
)
type handler func(message *proto.SignedGossipMessage)
type blockingBehavior bool
const (
blockingSend = blockingBehavior(true)
nonBlockingSend = blockingBehavior(false)
)
type connFactory interface {
createConnection(endpoint string, pkiID common.PKIidType) (*connection, error)
}
type connectionStore struct {
logger *logging.Logger // logger
isClosing bool // whether this connection store is shutting down
connFactory connFactory // creates a connection to remote peer
sync.RWMutex // synchronize access to shared variables
pki2Conn map[string]*connection // mapping between pkiID to connections
destinationLocks map[string]*sync.Mutex //mapping between pkiIDs and locks,
// used to prevent concurrent connection establishment to the same remote endpoint
}
func newConnStore(connFactory connFactory, logger *logging.Logger) *connectionStore {
return &connectionStore{
connFactory: connFactory,
isClosing: false,
pki2Conn: make(map[string]*connection),
destinationLocks: make(map[string]*sync.Mutex),
logger: logger,
}
}
func (cs *connectionStore) getConnection(peer *RemotePeer) (*connection, error) {
cs.RLock()
isClosing := cs.isClosing
cs.RUnlock()
if isClosing {
return nil, fmt.Errorf("Shutting down")
}
pkiID := peer.PKIID
endpoint := peer.Endpoint
cs.Lock()
destinationLock, hasConnected := cs.destinationLocks[string(pkiID)]
if !hasConnected {
destinationLock = &sync.Mutex{}
cs.destinationLocks[string(pkiID)] = destinationLock
}
cs.Unlock()
destinationLock.Lock()
cs.RLock()
conn, exists := cs.pki2Conn[string(pkiID)]
if exists {
cs.RUnlock()
destinationLock.Unlock()
return conn, nil
}
cs.RUnlock()
createdConnection, err := cs.connFactory.createConnection(endpoint, pkiID)
destinationLock.Unlock()
cs.RLock()
isClosing = cs.isClosing
cs.RUnlock()
if isClosing {
return nil, fmt.Errorf("ConnStore is closing")
}
cs.Lock()
delete(cs.destinationLocks, string(pkiID))
defer cs.Unlock()
// check again, maybe someone connected to us during the connection creation?
conn, exists = cs.pki2Conn[string(pkiID)]
if exists {
if createdConnection != nil {
createdConnection.close()
}
return conn, nil
}
// no one connected to us AND we failed connecting!
if err != nil {
return nil, err
}
// at this point in the code, we created a connection to a remote peer
conn = createdConnection
cs.pki2Conn[string(createdConnection.pkiID)] = conn
go conn.serviceConnection()
return conn, nil
}
func (cs *connectionStore) connNum() int {
cs.RLock()
defer cs.RUnlock()
return len(cs.pki2Conn)
}
func (cs *connectionStore) closeConn(peer *RemotePeer) {
cs.Lock()
defer cs.Unlock()
if conn, exists := cs.pki2Conn[string(peer.PKIID)]; exists {
conn.close()
delete(cs.pki2Conn, string(conn.pkiID))
}
}
func (cs *connectionStore) shutdown() {
cs.Lock()
cs.isClosing = true
pkiIds2conn := cs.pki2Conn
var connections2Close []*connection
for _, conn := range pkiIds2conn {
connections2Close = append(connections2Close, conn)
}
cs.Unlock()
wg := sync.WaitGroup{}
for _, conn := range connections2Close {
wg.Add(1)
go func(conn *connection) {
cs.closeByPKIid(conn.pkiID)
wg.Done()
}(conn)
}
wg.Wait()
}
func (cs *connectionStore) onConnected(serverStream proto.Gossip_GossipStreamServer, connInfo *proto.ConnectionInfo) *connection {
cs.Lock()
defer cs.Unlock()
if c, exists := cs.pki2Conn[string(connInfo.Identity)]; exists {
c.close()
}
return cs.registerConn(connInfo, serverStream)
}
func (cs *connectionStore) registerConn(connInfo *proto.ConnectionInfo, serverStream proto.Gossip_GossipStreamServer) *connection {
conn := newConnection(nil, nil, nil, serverStream)
conn.pkiID = connInfo.ID
conn.info = connInfo
conn.logger = cs.logger
cs.pki2Conn[string(connInfo.ID)] = conn
return conn
}
func (cs *connectionStore) closeByPKIid(pkiID common.PKIidType) {
cs.Lock()
defer cs.Unlock()
if conn, exists := cs.pki2Conn[string(pkiID)]; exists {
conn.close()
delete(cs.pki2Conn, string(pkiID))
}
}
func newConnection(cl proto.GossipClient, c *grpc.ClientConn, cs proto.Gossip_GossipStreamClient, ss proto.Gossip_GossipStreamServer) *connection {
connection := &connection{
outBuff: make(chan *msgSending, util.GetIntOrDefault("peer.gossip.sendBuffSize", defSendBuffSize)),
cl: cl,
conn: c,
clientStream: cs,
serverStream: ss,
stopFlag: int32(0),
stopChan: make(chan struct{}, 1),
}
return connection
}
type connection struct {
cancel context.CancelFunc
info *proto.ConnectionInfo
outBuff chan *msgSending
logger *logging.Logger // logger
pkiID common.PKIidType // pkiID of the remote endpoint
handler handler // function to invoke upon a message reception
conn *grpc.ClientConn // gRPC connection to remote endpoint
cl proto.GossipClient // gRPC stub of remote endpoint
clientStream proto.Gossip_GossipStreamClient // client-side stream to remote endpoint
serverStream proto.Gossip_GossipStreamServer // server-side stream to remote endpoint
stopFlag int32 // indicates whether this connection is in process of stopping
stopChan chan struct{} // a method to stop the server-side gRPC call from a different go-routine
sync.RWMutex // synchronizes access to shared variables
}
func (conn *connection) close() {
if conn.toDie() {
return
}
amIFirst := atomic.CompareAndSwapInt32(&conn.stopFlag, int32(0), int32(1))
if !amIFirst {
return
}
conn.stopChan <- struct{}{}
conn.drainOutputBuffer()
conn.Lock()
defer conn.Unlock()
if conn.clientStream != nil {
conn.clientStream.CloseSend()
}
if conn.conn != nil {
conn.conn.Close()
}
if conn.cancel != nil {
conn.cancel()
}
}
func (conn *connection) toDie() bool {
return atomic.LoadInt32(&(conn.stopFlag)) == int32(1)
}
func (conn *connection) send(msg *proto.SignedGossipMessage, onErr func(error), shouldBlock blockingBehavior) {
if conn.toDie() {
conn.logger.Debug("Aborting send() to ", conn.info.Endpoint, "because connection is closing")
return
}
conn.Lock()
defer conn.Unlock()
m := &msgSending{
envelope: msg.Envelope,
onErr: onErr,
}
if len(conn.outBuff) == util.GetIntOrDefault("peer.gossip.sendBuffSize", defSendBuffSize) {
if conn.logger.IsEnabledFor(logging.DEBUG) {
conn.logger.Debug("Buffer to", conn.info.Endpoint, "overflowed, dropping message", msg.String())
}
if !shouldBlock {
return
}
}
conn.outBuff <- m
}
func (conn *connection) serviceConnection() error {
errChan := make(chan error, 1)
msgChan := make(chan *proto.SignedGossipMessage, util.GetIntOrDefault("peer.gossip.recvBuffSize", defRecvBuffSize))
defer close(msgChan)
// Call stream.Recv() asynchronously in readFromStream(),
// and wait for either the Recv() call to end,
// or a signal to close the connection, which exits
// the method and makes the Recv() call to fail in the
// readFromStream() method
go conn.readFromStream(errChan, msgChan)
go conn.writeToStream()
for !conn.toDie() {
select {
case stop := <-conn.stopChan:
conn.logger.Debug("Closing reading from stream")
conn.stopChan <- stop
return nil
case err := <-errChan:
return err
case msg := <-msgChan:
conn.handler(msg)
}
}
return nil
}
func (conn *connection) writeToStream() {
for !conn.toDie() {
stream := conn.getStream()
if stream == nil {
conn.logger.Error(conn.pkiID, "Stream is nil, aborting!")
return
}
select {
case m := <-conn.outBuff:
err := stream.Send(m.envelope)
if err != nil {
go m.onErr(err)
return
}
case stop := <-conn.stopChan:
conn.logger.Debug("Closing writing to stream")
conn.stopChan <- stop
return
}
}
}
func (conn *connection) drainOutputBuffer() {
// Drain the output buffer
for len(conn.outBuff) > 0 {
<-conn.outBuff
}
}
func (conn *connection) readFromStream(errChan chan error, msgChan chan *proto.SignedGossipMessage) {
defer func() {
recover()
}() // msgChan might be closed
for !conn.toDie() {
stream := conn.getStream()
if stream == nil {
conn.logger.Error(conn.pkiID, "Stream is nil, aborting!")
errChan <- fmt.Errorf("Stream is nil")
return
}
envelope, err := stream.Recv()
if conn.toDie() {
conn.logger.Debug(conn.pkiID, "canceling read because closing")
return
}
if err != nil {
errChan <- err
conn.logger.Debugf("%v Got error, aborting: %v", err)
return
}
msg, err := envelope.ToGossipMessage()
if err != nil {
errChan <- err
conn.logger.Warning("%v Got error, aborting: %v", err)
}
msgChan <- msg
}
}
func (conn *connection) getStream() stream {
conn.Lock()
defer conn.Unlock()
if conn.clientStream != nil && conn.serverStream != nil {
e := errors.New("Both client and server stream are not nil, something went wrong")
conn.logger.Errorf("%+v", e)
}
if conn.clientStream != nil {
return conn.clientStream
}
if conn.serverStream != nil {
return conn.serverStream
}
return nil
}
type msgSending struct {
envelope *proto.Envelope
onErr func(error)
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。