1 Star 0 Fork 0

陈文甲/fabric

Create your Gitee Account
Explore and code with more than 12 million developers. Free private repositories! :)
Sign up
文件
Clone or Download
comm_impl.go 19.29 KB
Copy Edit Raw Blame History
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package comm
import (
"bytes"
"crypto/tls"
"encoding/hex"
"fmt"
"net"
"reflect"
"sync"
"sync/atomic"
"time"
"github.com/hyperledger/fabric/gossip/api"
"github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/identity"
"github.com/hyperledger/fabric/gossip/util"
proto "github.com/hyperledger/fabric/protos/gossip"
"github.com/op/go-logging"
"github.com/pkg/errors"
"github.com/spf13/viper"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/peer"
)
// Default settings of the gossip communication layer.
const (
	// defDialTimeout is the dial timeout used when the
	// "peer.gossip.dialTimeout" configuration key is not set.
	defDialTimeout = 3 * time.Second
	// defConnTimeout bounds the ping issued after dialing and is the default
	// wait for the remote peer's connection message.
	defConnTimeout = 2 * time.Second
	// defRecvBuffSize and defSendBuffSize are buffer sizes; they are not
	// referenced in this part of the file (presumably used by the connection
	// code elsewhere in the package).
	defRecvBuffSize = 20
	defSendBuffSize = 20
)
// SetDialTimeout stores the given duration under the
// "peer.gossip.dialTimeout" configuration key, which is the key read when a
// comm instance is constructed.
func SetDialTimeout(timeout time.Duration) {
	const dialTimeoutKey = "peer.gossip.dialTimeout"
	viper.Set(dialTimeoutKey, timeout)
}
// SetDialOpts replaces the extra gRPC dial options used for outgoing
// connections. An empty option list is rejected with a warning and the
// current options are kept.
func (c *commImpl) SetDialOpts(opts ...grpc.DialOption) {
	if len(opts) > 0 {
		c.opts = opts
		return
	}
	c.logger.Warning("Given an empty set of grpc.DialOption, aborting")
}
// NewCommInstanceWithServer builds a comm instance. When port is positive it
// also creates its own TLS-enabled gRPC server and listener on that port
// (replacing the supplied secureDialOpts with the ones of the generated
// certificate), registers itself as the gossip service and starts serving in
// a background goroutine tracked by the instance's wait group. With a
// non-positive port no server is created. The returned error is always nil.
func NewCommInstanceWithServer(port int, idMapper identity.Mapper, peerIdentity api.PeerIdentityType,
	secureDialOpts api.PeerSecureDialOpts, dialOpts ...grpc.DialOption) (Comm, error) {
	var (
		listener net.Listener
		server   *grpc.Server
		certHash []byte
	)

	ownsServer := port > 0
	if ownsServer {
		server, listener, secureDialOpts, certHash = createGRPCLayer(port)
	}

	inst := &commImpl{
		pubSub:         util.NewPubSub(),
		selfCertHash:   certHash,
		PKIID:          idMapper.GetPKIidOfCert(peerIdentity),
		idMapper:       idMapper,
		logger:         util.GetLogger(util.LoggingCommModule, fmt.Sprintf("%d", port)),
		peerIdentity:   peerIdentity,
		opts:           dialOpts,
		secureDialOpts: secureDialOpts,
		port:           port,
		lsnr:           listener,
		gSrv:           server,
		msgPublisher:   NewChannelDemultiplexer(),
		lock:           &sync.Mutex{},
		deadEndpoints:  make(chan common.PKIidType, 100),
		stopping:       int32(0),
		exitChan:       make(chan struct{}, 1),
		subscriptions:  make([]chan proto.ReceivedMessage, 0),
		dialTimeout:    util.GetDurationOrDefault("peer.gossip.dialTimeout", defDialTimeout),
	}
	inst.connStore = newConnStore(inst, inst.logger)

	if !ownsServer {
		return inst, nil
	}

	// The serving goroutine is accounted for in stopWG so that Stop can wait
	// for it to finish.
	inst.stopWG.Add(1)
	proto.RegisterGossipServer(server, inst)
	go func() {
		defer inst.stopWG.Done()
		server.Serve(listener)
	}()
	return inst, nil
}
// NewCommInstance builds a comm instance without its own listener and
// registers it as the gossip service on the supplied gRPC server. When a TLS
// certificate is given, the hash of its first chain entry becomes the
// instance's own certificate hash; a certificate with an empty chain makes
// the logger panic.
func NewCommInstance(s *grpc.Server, cert *tls.Certificate, idStore identity.Mapper,
	peerIdentity api.PeerIdentityType, secureDialOpts api.PeerSecureDialOpts,
	dialOpts ...grpc.DialOption) (Comm, error) {
	created, err := NewCommInstanceWithServer(-1, idStore, peerIdentity, secureDialOpts, dialOpts...)
	if err != nil {
		return nil, errors.WithStack(err)
	}
	inst := created.(*commImpl)

	switch {
	case cert == nil:
		// No TLS certificate supplied: selfCertHash stays unset.
	case len(cert.Certificate) == 0:
		inst.logger.Panic("Certificate supplied but certificate chain is empty")
	default:
		inst.selfCertHash = certHashFromRawCert(cert.Certificate[0])
	}

	proto.RegisterGossipServer(s, inst)
	return created, nil
}
// commImpl is the gossip communication component of this package. It dials
// and authenticates remote peers, serves incoming gossip streams and
// dispatches received messages to the channels handed out by Accept.
type commImpl struct {
// pubSub carries acknowledgement notifications: SendWithAck subscribes on it
// and it is passed to interceptAcks for every connection handler.
pubSub *util.PubSub
// selfCertHash is the hash of this peer's TLS certificate; when nil,
// authenticateRemotePeer skips the TLS checks.
selfCertHash []byte
// peerIdentity is the identity sent to remote peers in the connection message.
peerIdentity api.PeerIdentityType
// idMapper signs connection messages, stores remote identities and verifies
// remote signatures.
idMapper identity.Mapper
logger *logging.Logger
// opts are extra dial options appended after the secure options and WithBlock.
opts []grpc.DialOption
// secureDialOpts returns the dial options placed first when dialing a peer.
secureDialOpts func() []grpc.DialOption
connStore *connectionStore
// PKIID is this peer's PKI-ID, computed from peerIdentity by idMapper.
PKIID []byte
// deadEndpoints receives the PKI-IDs passed to disconnect; it is exposed
// through PresumedDead.
deadEndpoints chan common.PKIidType
// msgPublisher hands received messages to the channels created by Accept.
msgPublisher *ChannelDeMultiplexer
// lock guards subscriptions.
lock *sync.Mutex
// lsnr and gSrv are set only when the instance created its own gRPC server.
lsnr net.Listener
gSrv *grpc.Server
// exitChan carries the stop token; each Accept goroutine re-sends it before
// exiting so the remaining goroutines see it too.
exitChan chan struct{}
// stopWG tracks the serving goroutine and the Accept goroutines; Stop waits on it.
stopWG sync.WaitGroup
// subscriptions are the channels returned by Accept; they are closed on Stop.
subscriptions []chan proto.ReceivedMessage
port int
// stopping is 1 once Stop has been called; it is accessed atomically.
stopping int32
// dialTimeout bounds every outgoing dial.
dialTimeout time.Duration
}
// createConnection dials endpoint, pings it, opens a gossip stream and
// authenticates the remote peer over that stream.
//
// expectedPKIID may be nil when the remote PKI-ID is not known yet; when it
// is non-nil and differs from the PKI-ID the remote peer presented, the
// connection is rejected with an "Authentication failure" error.
//
// On success the returned connection owns the gRPC client connection and the
// cancel function of the stream's context. On every failure path the client
// connection is closed and the stream context is cancelled so that nothing
// is leaked.
func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidType) (*connection, error) {
	c.logger.Debug("Entering", endpoint, expectedPKIID)
	defer c.logger.Debug("Exiting")

	if c.isStopping() {
		return nil, errors.New("Stopping")
	}

	var dialOpts []grpc.DialOption
	dialOpts = append(dialOpts, c.secureDialOpts()...)
	dialOpts = append(dialOpts, grpc.WithBlock())
	dialOpts = append(dialOpts, c.opts...)

	// The dial context only governs connection establishment, so releasing
	// it when this function returns does not affect the established connection.
	dialCtx, cancelDial := context.WithTimeout(context.Background(), c.dialTimeout)
	defer cancelDial()
	cc, err := grpc.DialContext(dialCtx, endpoint, dialOpts...)
	if err != nil {
		return nil, errors.WithStack(err)
	}

	cl := proto.NewGossipClient(cc)

	pingCtx, cancelPing := context.WithTimeout(context.Background(), defConnTimeout)
	defer cancelPing()
	if _, err = cl.Ping(pingCtx, &proto.Empty{}); err != nil {
		cc.Close()
		return nil, errors.WithStack(err)
	}

	// The stream context lives as long as the connection; its cancel function
	// is handed to the connection on success and invoked here on failure.
	streamCtx, cancelStream := context.WithCancel(context.Background())
	stream, err := cl.GossipStream(streamCtx)
	if err != nil {
		cc.Close()
		cancelStream()
		return nil, errors.WithStack(err)
	}

	connInfo, err := c.authenticateRemotePeer(stream)
	if err != nil {
		c.logger.Warningf("Authentication failed: %+v", err)
		cc.Close()
		cancelStream()
		return nil, errors.WithStack(err)
	}

	var pkiID common.PKIidType = connInfo.ID
	// expectedPKIID is nil when we don't know the remote PKI-ID in advance.
	if expectedPKIID != nil && !bytes.Equal(pkiID, expectedPKIID) {
		c.logger.Warning("Remote endpoint claims to be a different peer, expected", expectedPKIID, "but got", pkiID)
		cc.Close()
		cancelStream()
		return nil, errors.New("Authentication failure")
	}

	conn := newConnection(cl, cc, stream, nil)
	conn.pkiID = pkiID
	conn.info = connInfo
	conn.logger = c.logger
	conn.cancel = cancelStream

	h := func(m *proto.SignedGossipMessage) {
		c.logger.Debug("Got message:", m)
		c.msgPublisher.DeMultiplex(&ReceivedMessageImpl{
			conn:                conn,
			lock:                conn,
			SignedGossipMessage: m,
			connInfo:            connInfo,
		})
	}
	conn.handler = interceptAcks(h, connInfo.ID, c.pubSub)
	return conn, nil
}
// Send dispatches msg to every given peer without waiting for delivery: each
// peer is served by its own goroutine using a non-blocking send. The call is
// a no-op when the instance is stopping or no peers are given.
func (c *commImpl) Send(msg *proto.SignedGossipMessage, peers ...*RemotePeer) {
	if c.isStopping() || len(peers) == 0 {
		return
	}
	c.logger.Debug("Entering, sending", msg, "to ", len(peers), "peers")
	for _, target := range peers {
		// The arguments are evaluated when the go statement runs, so every
		// goroutine gets its own peer.
		go c.sendToEndpoint(target, msg, nonBlockingSend)
	}
}
// sendToEndpoint obtains a connection to peer from the connection store and
// sends msg over it with the requested blocking behavior. If no connection
// can be obtained, or the send later reports an error, the peer is
// disconnected. Nothing happens when the instance is stopping.
func (c *commImpl) sendToEndpoint(peer *RemotePeer, msg *proto.SignedGossipMessage, shouldBlock blockingBehavior) {
	if c.isStopping() {
		return
	}
	c.logger.Debug("Entering, Sending to", peer.Endpoint, ", msg:", msg)
	defer c.logger.Debug("Exiting")

	conn, connErr := c.connStore.getConnection(peer)
	if connErr != nil {
		c.logger.Warningf("Failed obtaining connection for %v reason: %v", peer, connErr)
		c.disconnect(peer.PKIID)
		return
	}

	onSendFailure := func(sendErr error) {
		c.logger.Warningf("%v isn't responsive: %v", peer, sendErr)
		c.disconnect(peer.PKIID)
	}
	conn.send(msg, onSendFailure, shouldBlock)
}
// isStopping reports whether Stop has been called; the flag is read atomically.
func (c *commImpl) isStopping() bool {
	stopped := atomic.LoadInt32(&c.stopping)
	return stopped == 1
}
// Probe checks whether remotePeer is reachable by dialing its endpoint and
// issuing a ping. It returns nil when the peer answered, the dial or ping
// error otherwise, and a "Stopping" error when the instance is shutting down.
// The connection is always closed before returning.
func (c *commImpl) Probe(remotePeer *RemotePeer) error {
	endpoint := remotePeer.Endpoint
	pkiID := remotePeer.PKIID
	if c.isStopping() {
		return fmt.Errorf("Stopping")
	}
	c.logger.Debug("Entering, endpoint:", endpoint, "PKIID:", pkiID)

	var dialOpts []grpc.DialOption
	dialOpts = append(dialOpts, c.secureDialOpts()...)
	dialOpts = append(dialOpts, grpc.WithBlock())
	dialOpts = append(dialOpts, c.opts...)

	// Keep the cancel function so the timeout's timer is released as soon as
	// we return instead of lingering until it fires.
	dialCtx, cancelDial := context.WithTimeout(context.Background(), c.dialTimeout)
	defer cancelDial()
	cc, err := grpc.DialContext(dialCtx, endpoint, dialOpts...)
	if err != nil {
		c.logger.Debugf("Returning %v", err)
		return err
	}
	defer cc.Close()

	cl := proto.NewGossipClient(cc)
	pingCtx, cancelPing := context.WithTimeout(context.Background(), defConnTimeout)
	defer cancelPing()
	_, err = cl.Ping(pingCtx, &proto.Empty{})
	c.logger.Debugf("Returning %v", err)
	return err
}
// Handshake dials remotePeer, pings it, opens a gossip stream and
// authenticates the peer, returning the identity the peer presented.
//
// When remotePeer.PKIID is non-empty it must equal the PKI-ID presented by
// the peer, otherwise an error is returned. The connection and the stream
// are torn down before returning in every case.
func (c *commImpl) Handshake(remotePeer *RemotePeer) (api.PeerIdentityType, error) {
	var dialOpts []grpc.DialOption
	dialOpts = append(dialOpts, c.secureDialOpts()...)
	dialOpts = append(dialOpts, grpc.WithBlock())
	dialOpts = append(dialOpts, c.opts...)

	// Keep the cancel function so the timeout's timer is released on return.
	dialCtx, cancelDial := context.WithTimeout(context.Background(), c.dialTimeout)
	defer cancelDial()
	cc, err := grpc.DialContext(dialCtx, remotePeer.Endpoint, dialOpts...)
	if err != nil {
		return nil, err
	}
	defer cc.Close()

	cl := proto.NewGossipClient(cc)
	pingCtx, cancelPing := context.WithTimeout(context.Background(), defConnTimeout)
	defer cancelPing()
	if _, err = cl.Ping(pingCtx, &proto.Empty{}); err != nil {
		return nil, err
	}

	// The stream is only needed for the duration of the handshake; cancelling
	// its context on return releases it explicitly.
	streamCtx, cancelStream := context.WithCancel(context.Background())
	defer cancelStream()
	stream, err := cl.GossipStream(streamCtx)
	if err != nil {
		return nil, err
	}

	connInfo, err := c.authenticateRemotePeer(stream)
	if err != nil {
		c.logger.Warningf("Authentication failed: %v", err)
		return nil, err
	}
	if len(remotePeer.PKIID) > 0 && !bytes.Equal(connInfo.ID, remotePeer.PKIID) {
		return nil, fmt.Errorf("PKI-ID of remote peer doesn't match expected PKI-ID")
	}
	return connInfo.Identity, nil
}
// Accept registers acceptor with the message publisher and returns a
// buffered channel on which the matching received messages are delivered.
//
// A background goroutine forwards messages until the stop token shows up on
// exitChan; it puts the token back so that the other Accept goroutines see
// it as well. When the instance is already stopping, the returned channel is
// never fed.
func (c *commImpl) Accept(acceptor common.MessageAcceptor) <-chan proto.ReceivedMessage {
	genericChan := c.msgPublisher.AddChannel(acceptor)
	specificChan := make(chan proto.ReceivedMessage, 10)

	if c.isStopping() {
		c.logger.Warning("Accept() called but comm module is stopping, returning empty channel")
		return specificChan
	}

	c.lock.Lock()
	c.subscriptions = append(c.subscriptions, specificChan)
	c.lock.Unlock()

	// Register with the wait group before the goroutine starts: calling Add
	// from inside the goroutine could race with Stop's Wait.
	c.stopWG.Add(1)
	go func() {
		defer c.logger.Debug("Exiting Accept() loop")
		// Stop closes the subscription channels, so a forward that is in
		// flight at that moment may panic; swallow it and exit the loop.
		defer func() {
			recover()
		}()
		defer c.stopWG.Done()
		for {
			select {
			case msg := <-genericChan:
				specificChan <- msg.(*ReceivedMessageImpl)
			case s := <-c.exitChan:
				c.exitChan <- s
				return
			}
		}
	}()
	return specificChan
}
// PresumedDead returns the receive-only view of the channel to which
// disconnect writes the PKI-IDs of peers it disconnects.
func (c *commImpl) PresumedDead() <-chan common.PKIidType {
return c.deadEndpoints
}
// CloseConn logs the request and asks the connection store to close the
// connection to the given peer.
func (c *commImpl) CloseConn(peer *RemotePeer) {
c.logger.Debug("Closing connection for", peer)
c.connStore.closeConn(peer)
}
// emptySubscriptions closes every channel handed out by Accept while holding
// the subscriptions lock. The slice itself is left untouched.
func (c *commImpl) emptySubscriptions() {
	c.lock.Lock()
	defer c.lock.Unlock()
	for i := range c.subscriptions {
		close(c.subscriptions[i])
	}
}
// Stop shuts the comm instance down: it stops the gRPC server and listener
// (when the instance owns them), shuts down the connection store, signals
// the Accept goroutines to exit, closes the message publisher and the
// subscription channels, and finally waits for the tracked goroutines.
//
// Stop is safe to call more than once and from several goroutines; only the
// first call performs the shutdown.
func (c *commImpl) Stop() {
	// Flip the flag with a single compare-and-swap so that two concurrent
	// callers cannot both pass the check and run the shutdown twice (which
	// would close the subscription channels a second time).
	if !atomic.CompareAndSwapInt32(&c.stopping, 0, 1) {
		return
	}
	c.logger.Info("Stopping")
	defer c.logger.Info("Stopped")
	if c.gSrv != nil {
		c.gSrv.Stop()
	}
	if c.lsnr != nil {
		c.lsnr.Close()
	}
	c.connStore.shutdown()
	c.logger.Debug("Shut down connection store, connection count:", c.connStore.connNum())
	// A single token is enough: every Accept goroutine re-sends it on exit.
	c.exitChan <- struct{}{}
	c.msgPublisher.Close()
	c.logger.Debug("Shut down publisher")
	c.emptySubscriptions()
	c.logger.Debug("Closed subscriptions, waiting for goroutines to stop...")
	c.stopWG.Wait()
}
// GetPKIid returns this instance's own PKI-ID.
func (c *commImpl) GetPKIid() common.PKIidType {
return c.PKIID
}
// extractRemoteAddress returns the string form of the peer address stored in
// the stream's context, or the empty string when the context carries no peer
// information or the peer has no address.
func extractRemoteAddress(stream stream) string {
	p, found := peer.FromContext(stream.Context())
	if !found || p.Addr == nil {
		return ""
	}
	return p.Addr.String()
}
// authenticateRemotePeer performs the connection handshake over stream: it
// sends this peer's signed connection message, reads the remote peer's
// connection message and validates it.
//
// The checks, in order, are:
//   - when TLS is in use locally (selfCertHash is set), the remote side must
//     have presented a TLS certificate;
//   - the reply must be a connection message carrying a PKI-ID;
//   - the identity store must accept the remote identity;
//   - when TLS is in use, the TLS certificate hash claimed in the message
//     must equal the hash of the certificate seen on the transport;
//   - the signature on the connection message must verify against the remote
//     identity.
//
// On success it returns the connection info describing the remote peer.
func (c *commImpl) authenticateRemotePeer(stream stream) (*proto.ConnectionInfo, error) {
	ctx := stream.Context()
	remoteAddress := extractRemoteAddress(stream)
	remoteCertHash := extractCertificateHashFromContext(ctx)
	useTLS := c.selfCertHash != nil

	signer := func(msg []byte) ([]byte, error) {
		return c.idMapper.Sign(msg)
	}

	// TLS enabled but not detected on other side
	if useTLS && len(remoteCertHash) == 0 {
		c.logger.Warningf("%s didn't send TLS certificate", remoteAddress)
		return nil, fmt.Errorf("No TLS certificate")
	}

	cMsg, err := c.createConnectionMsg(c.PKIID, c.selfCertHash, c.peerIdentity, signer)
	if err != nil {
		return nil, err
	}

	c.logger.Debug("Sending", cMsg, "to", remoteAddress)
	// A failed send means the handshake cannot complete; report it right away
	// instead of waiting for the read below to time out.
	if err = stream.Send(cMsg.Envelope); err != nil {
		c.logger.Warningf("Failed sending connection message to %s, reason: %v", remoteAddress, err)
		return nil, err
	}

	m, err := readWithTimeout(stream, util.GetDurationOrDefault("peer.gossip.connTimeout", defConnTimeout), remoteAddress)
	if err != nil {
		c.logger.Warningf("Failed reading messge from %s, reason: %v", remoteAddress, err)
		return nil, err
	}

	receivedMsg := m.GetConn()
	if receivedMsg == nil {
		c.logger.Warning("Expected connection message from", remoteAddress, "but got", receivedMsg)
		return nil, fmt.Errorf("Wrong type")
	}

	if receivedMsg.PkiId == nil {
		// Warningf, not Warning: the message contains a format verb.
		c.logger.Warningf("%s didn't send a pkiID", remoteAddress)
		return nil, fmt.Errorf("No PKI-ID")
	}

	c.logger.Debug("Received", receivedMsg, "from", remoteAddress)
	err = c.idMapper.Put(receivedMsg.PkiId, receivedMsg.Identity)
	if err != nil {
		c.logger.Warningf("Identity store rejected %s : %v", remoteAddress, err)
		return nil, err
	}

	connInfo := &proto.ConnectionInfo{
		ID:       receivedMsg.PkiId,
		Identity: receivedMsg.Identity,
		Endpoint: remoteAddress,
		Auth: &proto.AuthInfo{
			Signature:  m.Signature,
			SignedData: m.Payload,
		},
	}

	// If TLS is enabled, the TLS certificate hash claimed in the connection
	// message must match the certificate the peer actually used.
	if useTLS && !bytes.Equal(remoteCertHash, receivedMsg.TlsCertHash) {
		return nil, errors.Errorf("Expected %v in remote hash of TLS cert, but got %v", remoteCertHash, receivedMsg.TlsCertHash)
	}

	// Final step - verify the signature on the connection message itself
	verifier := func(peerIdentity []byte, signature, message []byte) error {
		pkiID := c.idMapper.GetPKIidOfCert(api.PeerIdentityType(peerIdentity))
		return c.idMapper.Verify(pkiID, signature, message)
	}
	err = m.Verify(receivedMsg.Identity, verifier)
	if err != nil {
		c.logger.Errorf("Failed verifying signature from %s : %v", remoteAddress, err)
		return nil, err
	}

	c.logger.Debug("Authenticated", remoteAddress)
	return connInfo, nil
}
// SendWithAck sends msg to the given peers and waits for acknowledgements
// from at least minAck of them, or until timeout expires for the pending
// ones.
//
// The message's nonce is overwritten with a random value that identifies
// this invocation, and the message is re-enveloped accordingly. It returns
// nil when no peers are given. When the instance is stopping, or the message
// cannot be re-enveloped, every peer is reported as failed with that error.
func (c *commImpl) SendWithAck(msg *proto.SignedGossipMessage, timeout time.Duration, minAck int, peers ...*RemotePeer) AggregatedSendResult {
	if len(peers) == 0 {
		return nil
	}
	var err error

	// Roll a random NONCE to be used as a send ID to differentiate
	// between different invocations
	msg.Nonce = util.RandomUInt64()
	// Replace the envelope in the message to update the NONCE
	msg, err = msg.NoopSign()

	if c.isStopping() || err != nil {
		if err == nil {
			err = errors.New("comm is stopping")
		}
		results := make([]SendResult, 0, len(peers))
		for _, p := range peers {
			results = append(results, SendResult{
				error:      err,
				RemotePeer: *p,
			})
		}
		return results
	}

	c.logger.Debug("Entering, sending", msg, "to ", len(peers), "peers")
	sndFunc := func(peer *RemotePeer, msg *proto.SignedGossipMessage) {
		c.sendToEndpoint(peer, msg, blockingSend)
	}

	// Subscribe to acks before sending, one subscription per peer.
	subscriptions := make(map[string]func() error, len(peers))
	for _, p := range peers {
		topic := topicForAck(msg.Nonce, p.PKIID)
		sub := c.pubSub.Subscribe(topic, timeout)
		subscriptions[string(p.PKIID)] = func() error {
			received, err := sub.Listen()
			if err != nil {
				return err
			}
			// Keep the received value under its own name: reusing one name
			// for the assertion result would make the error below always
			// report *Acknowledgement instead of the actual type.
			ack, isAck := received.(*proto.Acknowledgement)
			if !isAck {
				return fmt.Errorf("Received a message of type %v, expected *proto.Acknowledgement", reflect.TypeOf(received))
			}
			if ack.Error != "" {
				return errors.New(ack.Error)
			}
			return nil
		}
	}

	waitForAck := func(p *RemotePeer) error {
		return subscriptions[string(p.PKIID)]()
	}
	ackOperation := newAckSendOperation(sndFunc, waitForAck)
	return ackOperation.send(msg, minAck, peers...)
}
// GossipStream is the server-side handler of an incoming gossip stream. It
// authenticates the remote peer, registers the stream with the connection
// store and then services the connection until it ends. When the store
// declines the stream the handler returns nil right away. When servicing
// ends, the connection is removed from the store and closed.
func (c *commImpl) GossipStream(stream proto.Gossip_GossipStreamServer) error {
	if c.isStopping() {
		return fmt.Errorf("Shutting down")
	}

	remoteInfo, authErr := c.authenticateRemotePeer(stream)
	if authErr != nil {
		c.logger.Errorf("Authentication failed: %v", authErr)
		return authErr
	}
	c.logger.Debug("Servicing", extractRemoteAddress(stream))

	conn := c.connStore.onConnected(stream, remoteInfo)
	if conn == nil {
		// The connection store denied the stream, which means a connection
		// to that peer already exists; returning ends this stream.
		return nil
	}

	publish := func(m *proto.SignedGossipMessage) {
		received := &ReceivedMessageImpl{
			conn:                conn,
			lock:                conn,
			SignedGossipMessage: m,
			connInfo:            remoteInfo,
		}
		c.msgPublisher.DeMultiplex(received)
	}
	conn.handler = interceptAcks(publish, remoteInfo.ID, c.pubSub)

	defer func() {
		c.logger.Debug("Client", extractRemoteAddress(stream), " disconnected")
		c.connStore.closeByPKIid(remoteInfo.ID)
		conn.close()
	}()

	return conn.serviceConnection()
}
// Ping is the liveness RPC of the gossip service: it ignores its arguments
// and always answers with an empty message and a nil error.
func (c *commImpl) Ping(_ context.Context, _ *proto.Empty) (*proto.Empty, error) {
	pong := &proto.Empty{}
	return pong, nil
}
// disconnect reports pkiID on the dead-endpoints channel and then closes the
// connection to that peer in the connection store. It does nothing when the
// instance is stopping.
func (c *commImpl) disconnect(pkiID common.PKIidType) {
	if !c.isStopping() {
		c.deadEndpoints <- pkiID
		c.connStore.closeByPKIid(pkiID)
	}
}
// readWithTimeout reads a single envelope from stream, converts it into a
// signed gossip message and returns it, giving up after timeout.
//
// stream must be a proto.Gossip_GossipStreamServer or a
// proto.Gossip_GossipStreamClient; any other type makes the reader goroutine
// panic. address is only used in the timeout error message. A failure to
// receive or to convert the envelope is returned as an error.
func readWithTimeout(stream interface{}, timeout time.Duration, address string) (*proto.SignedGossipMessage, error) {
	// Both channels are buffered so the reader goroutine can always deliver
	// its result and exit, even after the timeout has fired.
	incChan := make(chan *proto.SignedGossipMessage, 1)
	errChan := make(chan error, 1)
	go func() {
		var envelope *proto.Envelope
		var err error
		switch s := stream.(type) {
		case proto.Gossip_GossipStreamServer:
			envelope, err = s.Recv()
		case proto.Gossip_GossipStreamClient:
			envelope, err = s.Recv()
		default:
			panic(errors.Errorf("Stream isn't a GossipStreamServer or a GossipStreamClient, but %v. Aborting", reflect.TypeOf(stream)))
		}
		// Report receive failures instead of dropping them, so the caller
		// sees the real error right away rather than a timeout later.
		if err != nil {
			errChan <- err
			return
		}
		msg, err := envelope.ToGossipMessage()
		if err != nil {
			errChan <- err
			return
		}
		incChan <- msg
	}()

	// A one-shot timer that is stopped on return; a ticker would keep firing
	// and never be released.
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case <-timer.C:
		return nil, errors.Errorf("Timed out waiting for connection message from %s", address)
	case m := <-incChan:
		return m, nil
	case err := <-errChan:
		return nil, errors.WithStack(err)
	}
}
// createConnectionMsg builds the connection-establishment message carrying
// the given PKI-ID, TLS certificate hash and identity, and signs it with
// signer. The message is returned together with the signing error (wrapped
// with a stack trace), if any.
func (c *commImpl) createConnectionMsg(pkiID common.PKIidType, certHash []byte, cert api.PeerIdentityType, signer proto.Signer) (*proto.SignedGossipMessage, error) {
	establish := &proto.ConnEstablish{
		TlsCertHash: certHash,
		Identity:    cert,
		PkiId:       pkiID,
	}
	signed := &proto.SignedGossipMessage{
		GossipMessage: &proto.GossipMessage{
			Tag:     proto.GossipMessage_EMPTY,
			Nonce:   0,
			Content: &proto.GossipMessage_Conn{Conn: establish},
		},
	}
	_, signErr := signed.Sign(signer)
	return signed, errors.WithStack(signErr)
}
// stream is the set of operations the authentication code needs from a
// gossip stream: sending and receiving envelopes plus the generic gRPC
// stream methods. authenticateRemotePeer is called with both client-side
// and server-side gossip streams through this interface.
type stream interface {
// Send writes an envelope to the stream.
Send(envelope *proto.Envelope) error
// Recv reads the next envelope from the stream.
Recv() (*proto.Envelope, error)
grpc.Stream
}
// createGRPCLayer generates a TLS certificate, opens a TCP listener on the
// given port (all interfaces) and creates a gRPC server using that
// certificate. It returns the server, the listener, a function yielding the
// client-side dial options built from the same certificate, and the hash of
// the certificate. Certificate verification is skipped at the TLS level on
// both sides, and the server requests a client certificate without
// requiring one. It panics when the listener cannot be opened.
func createGRPCLayer(port int) (*grpc.Server, net.Listener, api.PeerSecureDialOpts, []byte) {
	cert := GenerateCertificatesOrPanic()
	certHash := certHashFromRawCert(cert.Certificate[0])

	serverCreds := credentials.NewTLS(&tls.Config{
		Certificates:       []tls.Certificate{cert},
		ClientAuth:         tls.RequestClientCert,
		InsecureSkipVerify: true,
	})
	serverOpts := []grpc.ServerOption{grpc.Creds(serverCreds)}

	clientCreds := credentials.NewTLS(&tls.Config{
		Certificates:       []tls.Certificate{cert},
		InsecureSkipVerify: true,
	})
	dialOpts := []grpc.DialOption{grpc.WithTransportCredentials(clientCreds)}

	listenAddress := fmt.Sprintf("%s:%d", "", port)
	listener, err := net.Listen("tcp", listenAddress)
	if err != nil {
		panic(err)
	}

	secureDialOpts := func() []grpc.DialOption {
		return dialOpts
	}
	server := grpc.NewServer(serverOpts...)
	return server, listener, secureDialOpts, certHash
}
// topicForAck builds the pub/sub topic under which the acknowledgement for
// the message with the given nonce from the given peer is published: the
// decimal nonce, a space, and the hex-encoded PKI-ID.
func topicForAck(nonce uint64, pkiID common.PKIidType) string {
	encodedID := hex.EncodeToString(pkiID)
	return fmt.Sprintf("%d %s", nonce, encodedID)
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/venjia/fabric.git
git@gitee.com:venjia/fabric.git
venjia
fabric
fabric
v1.1.0-preview

Search

0d507c66 1850385 C8b1a773 1850385