6 Star 46 Fork 28

Hyperledger/fabric

Create your Gitee Account
Explore and code with more than 12 million developers. Free private repositories!
Sign up
Clone or Download
conn.go 10.71 KB
Copy Edit Raw Blame History
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package comm
import (
"context"
"fmt"
"sync"
"sync/atomic"
"github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/metrics"
"github.com/hyperledger/fabric/gossip/util"
proto "github.com/hyperledger/fabric/protos/gossip"
"github.com/pkg/errors"
"google.golang.org/grpc"
)
// handler is a callback that consumes a single signed gossip message.
// A connection invokes its handler for each message it reads from the stream.
type handler func(msg *proto.SignedGossipMessage)
// blockingBehavior selects what send() does when the outgoing buffer of a
// connection is full: wait for room, or drop the message.
type blockingBehavior bool

const (
	// blockingSend makes send() wait until the message fits in the buffer.
	blockingSend blockingBehavior = true
	// nonBlockingSend makes send() drop the message when the buffer is full.
	nonBlockingSend blockingBehavior = false
)
// connFactory abstracts the creation of outgoing connections, so that the
// connection store does not depend on how a connection is established.
type connFactory interface {
	// createConnection creates a connection for the given endpoint and PKI-ID.
	// It returns an error when the connection could not be created.
	createConnection(endpoint string, pkiID common.PKIidType) (*connection, error)
}
// connectionStore keeps track of the connections to remote peers, indexed by
// the peers' PKI-IDs, and creates new outgoing connections on demand.
type connectionStore struct {
	config      ConnConfig  // configuration passed to connections registered by the store
	logger      util.Logger // destination of the store's log output
	isClosing   bool        // true once shutdown() has been called
	connFactory connFactory // creates connections to remote peers

	sync.RWMutex // synchronizes access to the shared fields of the store

	// pki2Conn maps the PKI-ID of a remote peer to the connection with it.
	pki2Conn map[string]*connection
	// destinationLocks maps a PKI-ID to a mutex that prevents concurrent
	// connection establishment to the same remote endpoint.
	destinationLocks map[string]*sync.Mutex
}
// newConnStore returns an empty connectionStore that creates outgoing
// connections with connFactory, logs to logger, and passes config to the
// connections it registers.
func newConnStore(connFactory connFactory, logger util.Logger, config ConnConfig) *connectionStore {
	store := &connectionStore{
		config:      config,
		logger:      logger,
		connFactory: connFactory,
	}
	// isClosing keeps its zero value (false): a fresh store is not shutting down.
	store.pki2Conn = make(map[string]*connection)
	store.destinationLocks = make(map[string]*sync.Mutex)
	return store
}
// getConnection returns the connection to the given remote peer, creating one
// through the connection factory when the store does not hold one yet.
//
// Calls that target the same PKI-ID are serialized by a per-destination mutex
// while the connection is being created. If a connection for that PKI-ID
// appears in the store during the creation (for example because the remote
// peer connected to us), the stored connection is returned and the freshly
// created one is closed.
//
// An error is returned when the store is shutting down, or when creating the
// connection failed and no connection to the peer exists.
func (cs *connectionStore) getConnection(peer *RemotePeer) (*connection, error) {
	cs.RLock()
	isClosing := cs.isClosing
	cs.RUnlock()

	if isClosing {
		return nil, fmt.Errorf("Shutting down")
	}

	pkiID := peer.PKIID
	endpoint := peer.Endpoint

	// Fetch (or lazily create) the mutex that guards connection
	// establishment to this destination.
	cs.Lock()
	destinationLock, hasConnected := cs.destinationLocks[string(pkiID)]
	if !hasConnected {
		destinationLock = &sync.Mutex{}
		cs.destinationLocks[string(pkiID)] = destinationLock
	}
	cs.Unlock()

	destinationLock.Lock()

	cs.RLock()
	conn, exists := cs.pki2Conn[string(pkiID)]
	cs.RUnlock()
	if exists {
		destinationLock.Unlock()
		return conn, nil
	}

	createdConnection, err := cs.connFactory.createConnection(endpoint, pkiID)
	if err == nil {
		cs.logger.Debugf("Created new connection to %s, %s", endpoint, pkiID)
	}

	destinationLock.Unlock()

	cs.Lock()
	defer cs.Unlock()

	// The shutdown check is made under the same write lock that guards the
	// registration below, so a concurrent shutdown() cannot slip in between
	// the check and the registration and miss the new connection.
	if cs.isClosing {
		// Do not leak the connection we have just established.
		if createdConnection != nil {
			createdConnection.close()
		}
		return nil, fmt.Errorf("ConnStore is closing")
	}

	delete(cs.destinationLocks, string(pkiID))

	// check again, maybe someone connected to us during the connection creation?
	conn, exists = cs.pki2Conn[string(pkiID)]
	if exists {
		if createdConnection != nil {
			createdConnection.close()
		}
		return conn, nil
	}

	// no one connected to us AND we failed connecting!
	if err != nil {
		return nil, err
	}

	// At this point we know the latest PKI-ID of the remote peer.
	// Perhaps we are trying to connect to a peer that changed its PKI-ID.
	// Ensure that we are not already connected to that PKI-ID,
	// and if so - close the old connection in order to keep the latest one.
	// We keep the latest one because the remote peer is going to close
	// our old connection anyway once it sensed this connection handshake.
	if conn, exists := cs.pki2Conn[string(createdConnection.pkiID)]; exists {
		conn.close()
	}

	// at this point in the code, we created a connection to a remote peer
	conn = createdConnection
	cs.pki2Conn[string(createdConnection.pkiID)] = conn

	go conn.serviceConnection()

	return conn, nil
}
// connNum reports how many connections the store currently holds.
func (cs *connectionStore) connNum() int {
	cs.RLock()
	count := len(cs.pki2Conn)
	cs.RUnlock()
	return count
}
// closeConn closes the connection to the given peer and removes it from the
// store. It does nothing when the store holds no connection for the peer's
// PKI-ID.
func (cs *connectionStore) closeConn(peer *RemotePeer) {
	cs.Lock()
	defer cs.Unlock()

	existing, found := cs.pki2Conn[string(peer.PKIID)]
	if !found {
		return
	}
	existing.close()
	delete(cs.pki2Conn, string(existing.pkiID))
}
// shutdown marks the store as closing, then closes and removes every
// connection that was stored at that moment. It returns once all of those
// connections have been closed.
func (cs *connectionStore) shutdown() {
	// Snapshot the connections under the lock; the closing itself happens
	// after the lock is released, because closeByPKIid acquires it again.
	cs.Lock()
	cs.isClosing = true
	var toClose []*connection
	for _, c := range cs.pki2Conn {
		toClose = append(toClose, c)
	}
	cs.Unlock()

	var wg sync.WaitGroup
	wg.Add(len(toClose))
	for _, c := range toClose {
		go func(c *connection) {
			defer wg.Done()
			cs.closeByPKIid(c.pkiID)
		}(c)
	}
	wg.Wait()
}
// onConnected registers a connection for a server-side stream described by
// connInfo. A connection already stored under the same ID is closed first,
// and the newly registered connection takes its place in the store.
func (cs *connectionStore) onConnected(serverStream proto.Gossip_GossipStreamServer,
	connInfo *proto.ConnectionInfo, metrics *metrics.CommMetrics) *connection {
	cs.Lock()
	defer cs.Unlock()

	previous, found := cs.pki2Conn[string(connInfo.ID)]
	if found {
		previous.close()
	}
	return cs.registerConn(connInfo, serverStream, metrics)
}
// registerConn builds a connection around the given server-side stream,
// stores it under connInfo.ID and returns it.
//
// The method does not lock the store itself; onConnected calls it while
// holding the store's write lock.
func (cs *connectionStore) registerConn(connInfo *proto.ConnectionInfo,
	serverStream proto.Gossip_GossipStreamServer, metrics *metrics.CommMetrics) *connection {
	registered := newConnection(nil, nil, nil, serverStream, metrics, cs.config)
	registered.logger = cs.logger
	registered.info = connInfo
	registered.pkiID = connInfo.ID

	cs.pki2Conn[string(connInfo.ID)] = registered
	return registered
}
// closeByPKIid closes the connection stored under the given PKI-ID and
// removes it from the store. It does nothing when no such connection exists.
func (cs *connectionStore) closeByPKIid(pkiID common.PKIidType) {
	cs.Lock()
	defer cs.Unlock()

	key := string(pkiID)
	existing, found := cs.pki2Conn[key]
	if !found {
		return
	}
	existing.close()
	delete(cs.pki2Conn, key)
}
// newConnection assembles a connection from the given gRPC client stub,
// gRPC client connection, client-side stream and server-side stream; any of
// them may be nil. The outgoing buffer is sized by config.SendBuffSize and
// the receive buffer size is taken from config.RecvBuffSize.
func newConnection(cl proto.GossipClient, c *grpc.ClientConn, cs proto.Gossip_GossipStreamClient,
	ss proto.Gossip_GossipStreamServer, metrics *metrics.CommMetrics, config ConnConfig) *connection {
	// stopFlag is left at its zero value: the connection starts out alive.
	return &connection{
		recvBuffSize: config.RecvBuffSize,
		outBuff:      make(chan *msgSending, config.SendBuffSize),
		stopChan:     make(chan struct{}),
		metrics:      metrics,
		cl:           cl,
		conn:         c,
		clientStream: cs,
		serverStream: ss,
	}
}
// ConnConfig is the configuration required to initialize a new conn.
type ConnConfig struct {
	// RecvBuffSize is the capacity of the channel that buffers received
	// messages until the connection's handler consumes them.
	RecvBuffSize int
	// SendBuffSize is the capacity of the connection's outgoing message buffer.
	SendBuffSize int
}
// connection represents a gossip stream with a single remote peer. It wraps
// either a client-side or a server-side stream and carries the buffers and
// stop signalling used by the goroutines that service the stream.
type connection struct {
	recvBuffSize int                             // capacity of the channel that buffers received messages
	metrics      *metrics.CommMetrics            // counters for sent, received and dropped messages
	cancel       context.CancelFunc              // invoked by close() when it is not nil
	info         *proto.ConnectionInfo           // information about the remote peer
	outBuff      chan *msgSending                // messages waiting to be written to the stream
	logger       util.Logger                     // destination of the connection's log output
	pkiID        common.PKIidType                // pkiID of the remote endpoint
	handler      handler                         // function to invoke upon a message reception
	conn         *grpc.ClientConn                // gRPC connection to remote endpoint
	cl           proto.GossipClient              // gRPC stub of remote endpoint
	clientStream proto.Gossip_GossipStreamClient // client-side stream to remote endpoint
	serverStream proto.Gossip_GossipStreamServer // server-side stream to remote endpoint
	stopFlag     int32                           // 1 once the connection started stopping, 0 before; accessed atomically
	stopChan     chan struct{}                   // closed by close() to signal the servicing goroutines to stop
	sync.RWMutex                                 // synchronizes access to shared variables
}
// close stops the connection. Only the first call has an effect: it raises
// the stop flag, closes stopChan, discards the messages still waiting in the
// outgoing buffer, and then releases the client stream, the gRPC connection
// and the cancel function, whichever of them are set.
func (conn *connection) close() {
	// Either the flag is already raised, or another goroutine wins the race
	// to raise it; in both cases that caller performs the teardown.
	if conn.toDie() || !atomic.CompareAndSwapInt32(&conn.stopFlag, 0, 1) {
		return
	}

	close(conn.stopChan)
	conn.drainOutputBuffer()

	conn.Lock()
	defer conn.Unlock()

	if stream := conn.clientStream; stream != nil {
		stream.CloseSend()
	}
	if grpcConn := conn.conn; grpcConn != nil {
		grpcConn.Close()
	}
	if cancel := conn.cancel; cancel != nil {
		cancel()
	}
}
// toDie reports whether the stop flag of the connection has been raised,
// i.e. whether close() has started.
func (conn *connection) toDie() bool {
	flag := atomic.LoadInt32(&conn.stopFlag)
	return flag == 1
}
// send queues msg for delivery to the remote peer.
//
// onErr is stored with the message and is invoked by the writer goroutine if
// writing the message to the stream fails. shouldBlock selects what happens
// when the outgoing buffer is full: with blockingSend the call waits for room,
// with nonBlockingSend the message is dropped and the overflow is counted.
//
// Nothing is queued when the connection is already closing. A blocking send
// that is waiting for room is abandoned as soon as the connection is closed;
// otherwise it would wait forever, because nothing consumes the buffer after
// the writer goroutine has stopped.
func (conn *connection) send(msg *proto.SignedGossipMessage, onErr func(error), shouldBlock blockingBehavior) {
	if conn.toDie() {
		conn.logger.Debugf("Aborting send() to %s because connection is closing", conn.info.Endpoint)
		return
	}

	m := &msgSending{
		envelope: msg.Envelope,
		onErr:    onErr,
	}

	select {
	case conn.outBuff <- m:
		// room in channel, successfully sent message, nothing to do
		return
	default:
		// did not send, the buffer is full
	}

	if !shouldBlock {
		conn.metrics.BufferOverflow.Add(1)
		conn.logger.Debugf("Buffer to %s overflowed, dropping message %s", conn.info.Endpoint, msg)
		return
	}

	select {
	case conn.outBuff <- m:
		// try again, and wait to send
	case <-conn.stopChan:
		// stop waiting: the connection was closed while we were blocked
		conn.logger.Debugf("Aborting send() to %s because connection is closing", conn.info.Endpoint)
	}
}
// serviceConnection starts the reader and writer goroutines of the
// connection and then dispatches every received message to the connection's
// handler. It returns nil when the connection is stopped, or the error
// reported by the reader goroutine.
func (conn *connection) serviceConnection() error {
	errChan := make(chan error, 1)
	msgChan := make(chan *proto.SignedGossipMessage, conn.recvBuffSize)

	// stream.Recv() is called asynchronously in readFromStream(). This method
	// waits for either that call to end or a signal to close the connection;
	// the latter exits the method and makes the Recv() call in
	// readFromStream() fail.
	go conn.readFromStream(errChan, msgChan)
	go conn.writeToStream()

	for {
		if conn.toDie() {
			return nil
		}
		select {
		case <-conn.stopChan:
			return nil
		case readErr := <-errChan:
			return readErr
		case received := <-msgChan:
			conn.handler(received)
		}
	}
}
// writeToStream takes messages off the outgoing buffer and writes them to
// the stream until the connection is stopped, the stream turns out to be
// nil, or a write fails. A failed write is reported by invoking the
// message's onErr callback in a separate goroutine.
func (conn *connection) writeToStream() {
	for {
		if conn.toDie() {
			return
		}

		s := conn.getStream()
		if s == nil {
			conn.logger.Error(conn.pkiID, "Stream is nil, aborting!")
			return
		}

		select {
		case <-conn.stopChan:
			conn.logger.Debug("Closing writing to stream")
			return
		case pending := <-conn.outBuff:
			if sendErr := s.Send(pending.envelope); sendErr != nil {
				go pending.onErr(sendErr)
				return
			}
			conn.metrics.SentMessages.Add(1)
		}
	}
}
// drainOutputBuffer discards messages from the outgoing buffer until a
// receive would block, i.e. until the buffer is observed empty.
// Other goroutines may be reading from the buffer at the same time.
func (conn *connection) drainOutputBuffer() {
	for {
		select {
		case <-conn.outBuff:
			// dropped one message, look for the next one
			continue
		default:
		}
		return
	}
}
// readFromStream receives envelopes from the stream, converts them to gossip
// messages and passes them on through msgChan. It stops when the connection
// is stopped. It also stops, after sending the reason on errChan, when the
// stream is nil, when receiving fails, or when an envelope cannot be
// converted to a gossip message.
func (conn *connection) readFromStream(errChan chan error, msgChan chan *proto.SignedGossipMessage) {
	for {
		if conn.toDie() {
			return
		}

		s := conn.getStream()
		if s == nil {
			conn.logger.Error(conn.pkiID, "Stream is nil, aborting!")
			errChan <- fmt.Errorf("Stream is nil")
			return
		}

		envelope, recvErr := s.Recv()
		// A receive that ends while the connection is closing is not
		// reported as an error.
		if conn.toDie() {
			conn.logger.Debug(conn.pkiID, "canceling read because closing")
			return
		}
		if recvErr != nil {
			errChan <- recvErr
			conn.logger.Debugf("Got error, aborting: %v", recvErr)
			return
		}
		conn.metrics.ReceivedMessages.Add(1)

		received, convErr := envelope.ToGossipMessage()
		if convErr != nil {
			errChan <- convErr
			conn.logger.Warningf("Got error, aborting: %v", convErr)
			return
		}

		select {
		case <-conn.stopChan:
			conn.logger.Debug("Closing reading from stream")
			return
		case msgChan <- received:
		}
	}
}
// getStream returns the stream of the connection: the client-side stream if
// it is set, otherwise the server-side stream, and nil when neither is set.
// An error is logged when both streams are set; the client-side stream is
// still returned in that case.
func (conn *connection) getStream() stream {
	conn.Lock()
	defer conn.Unlock()

	hasClient := conn.clientStream != nil
	hasServer := conn.serverStream != nil

	if hasClient && hasServer {
		e := errors.New("Both client and server stream are not nil, something went wrong")
		conn.logger.Errorf("%+v", e)
	}

	switch {
	case hasClient:
		return conn.clientStream
	case hasServer:
		return conn.serverStream
	default:
		return nil
	}
}
// msgSending is an entry of a connection's outgoing buffer: the envelope to
// write to the stream, and the callback to invoke if writing it fails.
type msgSending struct {
	envelope *proto.Envelope // payload handed to the stream's Send
	onErr    func(error)     // called with the error returned by a failed Send
}
//go:generate mockery -dir . -name MockStream -case underscore -output mocks/

// MockStream has the method set of the client side of the gossip stream.
// The go:generate directive above asks mockery to generate a mock of it
// into the mocks/ directory.
type MockStream interface {
	proto.Gossip_GossipStreamClient
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/hyperledger/fabric.git
git@gitee.com:hyperledger/fabric.git
hyperledger
fabric
fabric
v1.4.9

Search