1 Star 0 Fork 0

陈文甲/fabric

Create your Gitee Account
Explore and code with more than 12 million developers,Free private repositories !:)
Sign up
Clone or Download
comm_impl.go 18.93 KB
Copy Edit Raw Blame History
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package comm
import (
"bytes"
"context"
"crypto/tls"
"encoding/hex"
"fmt"
"reflect"
"sync"
"sync/atomic"
"time"
"github.com/hyperledger/fabric/gossip/api"
"github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/identity"
"github.com/hyperledger/fabric/gossip/metrics"
"github.com/hyperledger/fabric/gossip/util"
proto "github.com/hyperledger/fabric/protos/gossip"
"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/peer"
)
const (
handshakeTimeout = time.Second * 10
DefDialTimeout = time.Second * 3
DefConnTimeout = time.Second * 2
DefRecvBuffSize = 20
DefSendBuffSize = 20
)
// SecurityAdvisor defines an external auxiliary object
// that provides security and identity related capabilities
type SecurityAdvisor interface {
// OrgByPeerIdentity returns the organization identity of the given PeerIdentityType
OrgByPeerIdentity(api.PeerIdentityType) api.OrgIdentityType
}
func (c *commImpl) SetDialOpts(opts ...grpc.DialOption) {
if len(opts) == 0 {
c.logger.Warning("Given an empty set of grpc.DialOption, aborting")
return
}
c.opts = opts
}
// NewCommInstance creates a new comm instance that binds itself to the given gRPC server
func NewCommInstance(s *grpc.Server, certs *common.TLSCertificates, idStore identity.Mapper,
peerIdentity api.PeerIdentityType, secureDialOpts api.PeerSecureDialOpts, sa api.SecurityAdvisor,
commMetrics *metrics.CommMetrics, config CommConfig, dialOpts ...grpc.DialOption) (Comm, error) {
commInst := &commImpl{
sa: sa,
pubSub: util.NewPubSub(),
PKIID: idStore.GetPKIidOfCert(peerIdentity),
idMapper: idStore,
logger: util.GetLogger(util.CommLogger, ""),
peerIdentity: peerIdentity,
opts: dialOpts,
secureDialOpts: secureDialOpts,
msgPublisher: NewChannelDemultiplexer(),
lock: &sync.Mutex{},
deadEndpoints: make(chan common.PKIidType, 100),
identityChanges: make(chan common.PKIidType, 1),
stopping: int32(0),
exitChan: make(chan struct{}),
subscriptions: make([]chan proto.ReceivedMessage, 0),
tlsCerts: certs,
metrics: commMetrics,
dialTimeout: config.DialTimeout,
connTimeout: config.ConnTimeout,
recvBuffSize: config.RecvBuffSize,
sendBuffSize: config.SendBuffSize,
}
connConfig := ConnConfig{
RecvBuffSize: config.RecvBuffSize,
SendBuffSize: config.SendBuffSize,
}
commInst.connStore = newConnStore(commInst, commInst.logger, connConfig)
proto.RegisterGossipServer(s, commInst)
return commInst, nil
}
// CommConfig is the configuration required to initialize a new comm
type CommConfig struct {
DialTimeout time.Duration // Dial timeout
ConnTimeout time.Duration // Connection timeout
RecvBuffSize int // Buffer size of received messages
SendBuffSize int // Buffer size of sending messages
}
type commImpl struct {
sa api.SecurityAdvisor
tlsCerts *common.TLSCertificates
pubSub *util.PubSub
peerIdentity api.PeerIdentityType
idMapper identity.Mapper
logger util.Logger
opts []grpc.DialOption
secureDialOpts func() []grpc.DialOption
connStore *connectionStore
PKIID []byte
deadEndpoints chan common.PKIidType
identityChanges chan common.PKIidType
msgPublisher *ChannelDeMultiplexer
lock *sync.Mutex
exitChan chan struct{}
stopWG sync.WaitGroup
subscriptions []chan proto.ReceivedMessage
stopping int32
metrics *metrics.CommMetrics
dialTimeout time.Duration
connTimeout time.Duration
recvBuffSize int
sendBuffSize int
}
func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidType) (*connection, error) {
var err error
var cc *grpc.ClientConn
var stream proto.Gossip_GossipStreamClient
var pkiID common.PKIidType
var connInfo *proto.ConnectionInfo
var dialOpts []grpc.DialOption
c.logger.Debug("Entering", endpoint, expectedPKIID)
defer c.logger.Debug("Exiting")
if c.isStopping() {
return nil, errors.New("Stopping")
}
dialOpts = append(dialOpts, c.secureDialOpts()...)
dialOpts = append(dialOpts, grpc.WithBlock())
dialOpts = append(dialOpts, c.opts...)
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, c.dialTimeout)
defer cancel()
cc, err = grpc.DialContext(ctx, endpoint, dialOpts...)
if err != nil {
return nil, errors.WithStack(err)
}
cl := proto.NewGossipClient(cc)
ctx, cancel = context.WithTimeout(context.Background(), DefConnTimeout)
defer cancel()
if _, err = cl.Ping(ctx, &proto.Empty{}); err != nil {
cc.Close()
return nil, errors.WithStack(err)
}
ctx, cancel = context.WithCancel(context.Background())
if stream, err = cl.GossipStream(ctx); err == nil {
connInfo, err = c.authenticateRemotePeer(stream, true)
if err == nil {
pkiID = connInfo.ID
// PKIID is nil when we don't know the remote PKI id's
if expectedPKIID != nil && !bytes.Equal(pkiID, expectedPKIID) {
actualOrg := c.sa.OrgByPeerIdentity(connInfo.Identity)
// If the identity isn't present, it's nil - therefore OrgByPeerIdentity would
// return nil too and thus would be different than the actual organization
identity, _ := c.idMapper.Get(expectedPKIID)
oldOrg := c.sa.OrgByPeerIdentity(identity)
if !bytes.Equal(actualOrg, oldOrg) {
c.logger.Warning("Remote endpoint claims to be a different peer, expected", expectedPKIID, "but got", pkiID)
cc.Close()
cancel()
return nil, errors.New("authentication failure")
} else {
c.logger.Infof("Peer %s changed its PKI-ID from %s to %s", endpoint, expectedPKIID, pkiID)
c.identityChanges <- expectedPKIID
}
}
connConfig := ConnConfig{
RecvBuffSize: c.recvBuffSize,
SendBuffSize: c.sendBuffSize,
}
conn := newConnection(cl, cc, stream, nil, c.metrics, connConfig)
conn.pkiID = pkiID
conn.info = connInfo
conn.logger = c.logger
conn.cancel = cancel
h := func(m *proto.SignedGossipMessage) {
c.logger.Debug("Got message:", m)
c.msgPublisher.DeMultiplex(&ReceivedMessageImpl{
conn: conn,
lock: conn,
SignedGossipMessage: m,
connInfo: connInfo,
})
}
conn.handler = interceptAcks(h, connInfo.ID, c.pubSub)
return conn, nil
}
c.logger.Warningf("Authentication failed: %+v", err)
}
cc.Close()
cancel()
return nil, errors.WithStack(err)
}
func (c *commImpl) Send(msg *proto.SignedGossipMessage, peers ...*RemotePeer) {
if c.isStopping() || len(peers) == 0 {
return
}
c.logger.Debug("Entering, sending", msg, "to ", len(peers), "peers")
for _, peer := range peers {
go func(peer *RemotePeer, msg *proto.SignedGossipMessage) {
c.sendToEndpoint(peer, msg, nonBlockingSend)
}(peer, msg)
}
}
func (c *commImpl) sendToEndpoint(peer *RemotePeer, msg *proto.SignedGossipMessage, shouldBlock blockingBehavior) {
if c.isStopping() {
return
}
c.logger.Debug("Entering, Sending to", peer.Endpoint, ", msg:", msg)
defer c.logger.Debug("Exiting")
var err error
conn, err := c.connStore.getConnection(peer)
if err == nil {
disConnectOnErr := func(err error) {
c.logger.Warningf("%v isn't responsive: %v", peer, err)
c.disconnect(peer.PKIID)
conn.close()
}
conn.send(msg, disConnectOnErr, shouldBlock)
return
}
c.logger.Warningf("Failed obtaining connection for %v reason: %v", peer, err)
c.disconnect(peer.PKIID)
}
func (c *commImpl) isStopping() bool {
return atomic.LoadInt32(&c.stopping) == int32(1)
}
func (c *commImpl) Probe(remotePeer *RemotePeer) error {
var dialOpts []grpc.DialOption
endpoint := remotePeer.Endpoint
pkiID := remotePeer.PKIID
if c.isStopping() {
return fmt.Errorf("Stopping")
}
c.logger.Debug("Entering, endpoint:", endpoint, "PKIID:", pkiID)
dialOpts = append(dialOpts, c.secureDialOpts()...)
dialOpts = append(dialOpts, grpc.WithBlock())
dialOpts = append(dialOpts, c.opts...)
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, c.dialTimeout)
defer cancel()
cc, err := grpc.DialContext(ctx, remotePeer.Endpoint, dialOpts...)
if err != nil {
c.logger.Debugf("Returning %v", err)
return err
}
defer cc.Close()
cl := proto.NewGossipClient(cc)
ctx, cancel = context.WithTimeout(context.Background(), DefConnTimeout)
defer cancel()
_, err = cl.Ping(ctx, &proto.Empty{})
c.logger.Debugf("Returning %v", err)
return err
}
func (c *commImpl) Handshake(remotePeer *RemotePeer) (api.PeerIdentityType, error) {
var dialOpts []grpc.DialOption
dialOpts = append(dialOpts, c.secureDialOpts()...)
dialOpts = append(dialOpts, grpc.WithBlock())
dialOpts = append(dialOpts, c.opts...)
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, c.dialTimeout)
defer cancel()
cc, err := grpc.DialContext(ctx, remotePeer.Endpoint, dialOpts...)
if err != nil {
return nil, err
}
defer cc.Close()
cl := proto.NewGossipClient(cc)
ctx, cancel = context.WithTimeout(context.Background(), DefConnTimeout)
defer cancel()
if _, err = cl.Ping(ctx, &proto.Empty{}); err != nil {
return nil, err
}
ctx, cancel = context.WithTimeout(context.Background(), handshakeTimeout)
defer cancel()
stream, err := cl.GossipStream(ctx)
if err != nil {
return nil, err
}
connInfo, err := c.authenticateRemotePeer(stream, true)
if err != nil {
c.logger.Warningf("Authentication failed: %v", err)
return nil, err
}
if len(remotePeer.PKIID) > 0 && !bytes.Equal(connInfo.ID, remotePeer.PKIID) {
return nil, fmt.Errorf("PKI-ID of remote peer doesn't match expected PKI-ID")
}
return connInfo.Identity, nil
}
func (c *commImpl) Accept(acceptor common.MessageAcceptor) <-chan proto.ReceivedMessage {
genericChan := c.msgPublisher.AddChannel(acceptor)
specificChan := make(chan proto.ReceivedMessage, 10)
if c.isStopping() {
c.logger.Warning("Accept() called but comm module is stopping, returning empty channel")
return specificChan
}
c.lock.Lock()
c.subscriptions = append(c.subscriptions, specificChan)
c.lock.Unlock()
c.stopWG.Add(1)
go func() {
defer c.logger.Debug("Exiting Accept() loop")
defer c.stopWG.Done()
for {
select {
case msg := <-genericChan:
if msg == nil {
return
}
select {
case specificChan <- msg.(*ReceivedMessageImpl):
case <-c.exitChan:
return
}
case <-c.exitChan:
return
}
}
}()
return specificChan
}
func (c *commImpl) PresumedDead() <-chan common.PKIidType {
return c.deadEndpoints
}
func (c *commImpl) IdentitySwitch() <-chan common.PKIidType {
return c.identityChanges
}
func (c *commImpl) CloseConn(peer *RemotePeer) {
c.logger.Debug("Closing connection for", peer)
c.connStore.closeConn(peer)
}
func (c *commImpl) closeSubscriptions() {
c.lock.Lock()
defer c.lock.Unlock()
for _, ch := range c.subscriptions {
close(ch)
}
}
func (c *commImpl) Stop() {
if !atomic.CompareAndSwapInt32(&c.stopping, 0, int32(1)) {
return
}
c.logger.Info("Stopping")
defer c.logger.Info("Stopped")
c.connStore.shutdown()
c.logger.Debug("Shut down connection store, connection count:", c.connStore.connNum())
c.msgPublisher.Close()
close(c.exitChan)
c.stopWG.Wait()
c.closeSubscriptions()
}
func (c *commImpl) GetPKIid() common.PKIidType {
return c.PKIID
}
func extractRemoteAddress(stream stream) string {
var remoteAddress string
p, ok := peer.FromContext(stream.Context())
if ok {
if address := p.Addr; address != nil {
remoteAddress = address.String()
}
}
return remoteAddress
}
func (c *commImpl) authenticateRemotePeer(stream stream, initiator bool) (*proto.ConnectionInfo, error) {
ctx := stream.Context()
remoteAddress := extractRemoteAddress(stream)
remoteCertHash := extractCertificateHashFromContext(ctx)
var err error
var cMsg *proto.SignedGossipMessage
useTLS := c.tlsCerts != nil
var selfCertHash []byte
if useTLS {
certReference := c.tlsCerts.TLSServerCert
if initiator {
certReference = c.tlsCerts.TLSClientCert
}
selfCertHash = certHashFromRawCert(certReference.Load().(*tls.Certificate).Certificate[0])
}
signer := func(msg []byte) ([]byte, error) {
return c.idMapper.Sign(msg)
}
// TLS enabled but not detected on other side
if useTLS && len(remoteCertHash) == 0 {
c.logger.Warningf("%s didn't send TLS certificate", remoteAddress)
return nil, fmt.Errorf("No TLS certificate")
}
cMsg, err = c.createConnectionMsg(c.PKIID, selfCertHash, c.peerIdentity, signer)
if err != nil {
return nil, err
}
c.logger.Debug("Sending", cMsg, "to", remoteAddress)
stream.Send(cMsg.Envelope)
m, err := readWithTimeout(stream, c.connTimeout, remoteAddress)
if err != nil {
c.logger.Warningf("Failed reading messge from %s, reason: %v", remoteAddress, err)
return nil, err
}
receivedMsg := m.GetConn()
if receivedMsg == nil {
c.logger.Warning("Expected connection message from", remoteAddress, "but got", receivedMsg)
return nil, fmt.Errorf("Wrong type")
}
if receivedMsg.PkiId == nil {
c.logger.Warningf("%s didn't send a pkiID", remoteAddress)
return nil, fmt.Errorf("No PKI-ID")
}
c.logger.Debug("Received", receivedMsg, "from", remoteAddress)
err = c.idMapper.Put(receivedMsg.PkiId, receivedMsg.Identity)
if err != nil {
c.logger.Warningf("Identity store rejected %s : %v", remoteAddress, err)
return nil, err
}
connInfo := &proto.ConnectionInfo{
ID: receivedMsg.PkiId,
Identity: receivedMsg.Identity,
Endpoint: remoteAddress,
Auth: &proto.AuthInfo{
Signature: m.Signature,
SignedData: m.Payload,
},
}
// if TLS is enabled and detected, verify remote peer
if useTLS {
// If the remote peer sent its TLS certificate, make sure it actually matches the TLS cert
// that the peer used.
if !bytes.Equal(remoteCertHash, receivedMsg.TlsCertHash) {
return nil, errors.Errorf("Expected %v in remote hash of TLS cert, but got %v", remoteCertHash, receivedMsg.TlsCertHash)
}
}
// Final step - verify the signature on the connection message itself
verifier := func(peerIdentity []byte, signature, message []byte) error {
pkiID := c.idMapper.GetPKIidOfCert(api.PeerIdentityType(peerIdentity))
return c.idMapper.Verify(pkiID, signature, message)
}
err = m.Verify(receivedMsg.Identity, verifier)
if err != nil {
c.logger.Errorf("Failed verifying signature from %s : %v", remoteAddress, err)
return nil, err
}
c.logger.Debug("Authenticated", remoteAddress)
return connInfo, nil
}
// SendWithAck sends a message to remote peers, waiting for acknowledgement from minAck of them, or until a certain timeout expires
func (c *commImpl) SendWithAck(msg *proto.SignedGossipMessage, timeout time.Duration, minAck int, peers ...*RemotePeer) AggregatedSendResult {
if len(peers) == 0 {
return nil
}
var err error
// Roll a random NONCE to be used as a send ID to differentiate
// between different invocations
msg.Nonce = util.RandomUInt64()
// Replace the envelope in the message to update the NONCE
msg, err = msg.NoopSign()
if c.isStopping() || err != nil {
if err == nil {
err = errors.New("comm is stopping")
}
results := []SendResult{}
for _, p := range peers {
results = append(results, SendResult{
error: err,
RemotePeer: *p,
})
}
return results
}
c.logger.Debug("Entering, sending", msg, "to ", len(peers), "peers")
sndFunc := func(peer *RemotePeer, msg *proto.SignedGossipMessage) {
c.sendToEndpoint(peer, msg, blockingSend)
}
// Subscribe to acks
subscriptions := make(map[string]func() error)
for _, p := range peers {
topic := topicForAck(msg.Nonce, p.PKIID)
sub := c.pubSub.Subscribe(topic, timeout)
subscriptions[string(p.PKIID)] = func() error {
msg, err := sub.Listen()
if err != nil {
return err
}
if msg, isAck := msg.(*proto.Acknowledgement); !isAck {
return fmt.Errorf("Received a message of type %s, expected *proto.Acknowledgement", reflect.TypeOf(msg))
} else {
if msg.Error != "" {
return errors.New(msg.Error)
}
}
return nil
}
}
waitForAck := func(p *RemotePeer) error {
return subscriptions[string(p.PKIID)]()
}
ackOperation := newAckSendOperation(sndFunc, waitForAck)
return ackOperation.send(msg, minAck, peers...)
}
func (c *commImpl) GossipStream(stream proto.Gossip_GossipStreamServer) error {
if c.isStopping() {
return fmt.Errorf("Shutting down")
}
connInfo, err := c.authenticateRemotePeer(stream, false)
if err != nil {
c.logger.Errorf("Authentication failed: %v", err)
return err
}
c.logger.Debug("Servicing", extractRemoteAddress(stream))
conn := c.connStore.onConnected(stream, connInfo, c.metrics)
h := func(m *proto.SignedGossipMessage) {
c.msgPublisher.DeMultiplex(&ReceivedMessageImpl{
conn: conn,
lock: conn,
SignedGossipMessage: m,
connInfo: connInfo,
})
}
conn.handler = interceptAcks(h, connInfo.ID, c.pubSub)
defer func() {
c.logger.Debug("Client", extractRemoteAddress(stream), " disconnected")
c.connStore.closeByPKIid(connInfo.ID)
conn.close()
}()
return conn.serviceConnection()
}
func (c *commImpl) Ping(context.Context, *proto.Empty) (*proto.Empty, error) {
return &proto.Empty{}, nil
}
func (c *commImpl) disconnect(pkiID common.PKIidType) {
if c.isStopping() {
return
}
c.deadEndpoints <- pkiID
c.connStore.closeByPKIid(pkiID)
}
func readWithTimeout(stream interface{}, timeout time.Duration, address string) (*proto.SignedGossipMessage, error) {
incChan := make(chan *proto.SignedGossipMessage, 1)
errChan := make(chan error, 1)
go func() {
if srvStr, isServerStr := stream.(proto.Gossip_GossipStreamServer); isServerStr {
if m, err := srvStr.Recv(); err == nil {
msg, err := m.ToGossipMessage()
if err != nil {
errChan <- err
return
}
incChan <- msg
}
} else if clStr, isClientStr := stream.(proto.Gossip_GossipStreamClient); isClientStr {
if m, err := clStr.Recv(); err == nil {
msg, err := m.ToGossipMessage()
if err != nil {
errChan <- err
return
}
incChan <- msg
}
} else {
panic(errors.Errorf("Stream isn't a GossipStreamServer or a GossipStreamClient, but %v. Aborting", reflect.TypeOf(stream)))
}
}()
select {
case <-time.After(timeout):
return nil, errors.Errorf("timed out waiting for connection message from %s", address)
case m := <-incChan:
return m, nil
case err := <-errChan:
return nil, errors.WithStack(err)
}
}
func (c *commImpl) createConnectionMsg(pkiID common.PKIidType, certHash []byte, cert api.PeerIdentityType, signer proto.Signer) (*proto.SignedGossipMessage, error) {
m := &proto.GossipMessage{
Tag: proto.GossipMessage_EMPTY,
Nonce: 0,
Content: &proto.GossipMessage_Conn{
Conn: &proto.ConnEstablish{
TlsCertHash: certHash,
Identity: cert,
PkiId: pkiID,
},
},
}
sMsg := &proto.SignedGossipMessage{
GossipMessage: m,
}
_, err := sMsg.Sign(signer)
return sMsg, errors.WithStack(err)
}
type stream interface {
Send(envelope *proto.Envelope) error
Recv() (*proto.Envelope, error)
grpc.Stream
}
func topicForAck(nonce uint64, pkiID common.PKIidType) string {
return fmt.Sprintf("%d %s", nonce, hex.EncodeToString(pkiID))
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/venjia/fabric.git
git@gitee.com:venjia/fabric.git
venjia
fabric
fabric
v1.4.4

Search