1 Star 0 Fork 0

peter / fabric

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
common.go 23.84 KB
一键复制 编辑 原始数据 按行查看 历史
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package chaincode
import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io/ioutil"
"math"
"strings"
"sync"
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/common/cauthdsl"
"github.com/hyperledger/fabric/common/util"
"github.com/hyperledger/fabric/core/chaincode/shim"
"github.com/hyperledger/fabric/core/container"
"github.com/hyperledger/fabric/internal/peer/common"
"github.com/hyperledger/fabric/internal/pkg/identity"
pcommon "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"
pb "github.com/hyperledger/fabric/protos/peer"
"github.com/hyperledger/fabric/protoutil"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
// checkSpec to see if chaincode resides within current package capture for language.
func checkSpec(spec *pb.ChaincodeSpec) error {
// Don't allow nil value
if spec == nil {
return errors.New("expected chaincode specification, nil received")
}
if spec.ChaincodeId == nil {
return errors.New("expected chaincode ID, nil received")
}
return platformRegistry.ValidateSpec(spec.Type.String(), spec.ChaincodeId.Path)
}
// getChaincodeDeploymentSpec get chaincode deployment spec given the chaincode spec
func getChaincodeDeploymentSpec(spec *pb.ChaincodeSpec, crtPkg bool) (*pb.ChaincodeDeploymentSpec, error) {
var codePackageBytes []byte
if crtPkg {
var err error
if err = checkSpec(spec); err != nil {
return nil, err
}
codePackageBytes, err = container.GetChaincodePackageBytes(platformRegistry, spec)
if err != nil {
err = errors.WithMessage(err, "error getting chaincode package bytes")
return nil, err
}
}
chaincodeDeploymentSpec := &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec, CodePackage: codePackageBytes}
return chaincodeDeploymentSpec, nil
}
// getChaincodeSpec get chaincode spec from the cli cmd pramameters
func getChaincodeSpec(cmd *cobra.Command) (*pb.ChaincodeSpec, error) {
spec := &pb.ChaincodeSpec{}
if err := checkChaincodeCmdParams(cmd); err != nil {
// unset usage silence because it's a command line usage error
cmd.SilenceUsage = false
return spec, err
}
// Build the spec
input := chaincodeInput{}
if err := json.Unmarshal([]byte(chaincodeCtorJSON), &input); err != nil {
return spec, errors.Wrap(err, "chaincode argument error")
}
input.IsInit = isInit
chaincodeLang = strings.ToUpper(chaincodeLang)
spec = &pb.ChaincodeSpec{
Type: pb.ChaincodeSpec_Type(pb.ChaincodeSpec_Type_value[chaincodeLang]),
ChaincodeId: &pb.ChaincodeID{Path: chaincodePath, Name: chaincodeName, Version: chaincodeVersion},
Input: &input.ChaincodeInput,
}
return spec, nil
}
// chaincodeInput is wrapper around the proto defined ChaincodeInput message that
// is decorated with a custom JSON unmarshaller.
type chaincodeInput struct {
pb.ChaincodeInput
}
// UnmarshalJSON converts the string-based REST/JSON input to
// the []byte-based current ChaincodeInput structure.
func (c *chaincodeInput) UnmarshalJSON(b []byte) error {
sa := struct {
Function string
Args []string
}{}
err := json.Unmarshal(b, &sa)
if err != nil {
return err
}
allArgs := sa.Args
if sa.Function != "" {
allArgs = append([]string{sa.Function}, sa.Args...)
}
c.Args = util.ToChaincodeArgs(allArgs...)
return nil
}
func chaincodeInvokeOrQuery(cmd *cobra.Command, invoke bool, cf *ChaincodeCmdFactory) (err error) {
spec, err := getChaincodeSpec(cmd)
if err != nil {
return err
}
// call with empty txid to ensure production code generates a txid.
// otherwise, tests can explicitly set their own txid
txID := ""
proposalResp, err := ChaincodeInvokeOrQuery(
spec,
channelID,
txID,
invoke,
cf.Signer,
cf.Certificate,
cf.EndorserClients,
cf.DeliverClients,
cf.BroadcastClient,
)
if err != nil {
return errors.Errorf("%s - proposal response: %v", err, proposalResp)
}
if invoke {
logger.Debugf("ESCC invoke result: %v", proposalResp)
pRespPayload, err := protoutil.GetProposalResponsePayload(proposalResp.Payload)
if err != nil {
return errors.WithMessage(err, "error while unmarshaling proposal response payload")
}
ca, err := protoutil.GetChaincodeAction(pRespPayload.Extension)
if err != nil {
return errors.WithMessage(err, "error while unmarshaling chaincode action")
}
if proposalResp.Endorsement == nil {
return errors.Errorf("endorsement failure during invoke. response: %v", proposalResp.Response)
}
logger.Infof("Chaincode invoke successful. result: %v", ca.Response)
} else {
if proposalResp == nil {
return errors.New("error during query: received nil proposal response")
}
if proposalResp.Endorsement == nil {
return errors.Errorf("endorsement failure during query. response: %v", proposalResp.Response)
}
if chaincodeQueryRaw && chaincodeQueryHex {
return fmt.Errorf("options --raw (-r) and --hex (-x) are not compatible")
}
if chaincodeQueryRaw {
fmt.Println(proposalResp.Response.Payload)
return nil
}
if chaincodeQueryHex {
fmt.Printf("%x\n", proposalResp.Response.Payload)
return nil
}
fmt.Println(string(proposalResp.Response.Payload))
}
return nil
}
type collectionConfigJson struct {
Name string `json:"name"`
Policy string `json:"policy"`
RequiredCount int32 `json:"requiredPeerCount"`
MaxPeerCount int32 `json:"maxPeerCount"`
BlockToLive uint64 `json:"blockToLive"`
MemberOnlyRead bool `json:"memberOnlyRead"`
MemberOnlyWrite bool `json:"memberOnlyWrite"`
}
// GetCollectionConfigFromFile retrieves the collection configuration
// from the supplied file; the supplied file must contain a
// json-formatted array of collectionConfigJson elements
func GetCollectionConfigFromFile(ccFile string) (*pcommon.CollectionConfigPackage, []byte, error) {
fileBytes, err := ioutil.ReadFile(ccFile)
if err != nil {
return nil, nil, errors.Wrapf(err, "could not read file '%s'", ccFile)
}
return getCollectionConfigFromBytes(fileBytes)
}
// getCollectionConfig retrieves the collection configuration
// from the supplied byte array; the byte array must contain a
// json-formatted array of collectionConfigJson elements
func getCollectionConfigFromBytes(cconfBytes []byte) (*pcommon.CollectionConfigPackage, []byte, error) {
cconf := &[]collectionConfigJson{}
err := json.Unmarshal(cconfBytes, cconf)
if err != nil {
return nil, nil, errors.Wrap(err, "could not parse the collection configuration")
}
ccarray := make([]*pcommon.CollectionConfig, 0, len(*cconf))
for _, cconfitem := range *cconf {
p, err := cauthdsl.FromString(cconfitem.Policy)
if err != nil {
return nil, nil, errors.WithMessagef(err, "invalid policy %s", cconfitem.Policy)
}
cpc := &pcommon.CollectionPolicyConfig{
Payload: &pcommon.CollectionPolicyConfig_SignaturePolicy{
SignaturePolicy: p,
},
}
cc := &pcommon.CollectionConfig{
Payload: &pcommon.CollectionConfig_StaticCollectionConfig{
StaticCollectionConfig: &pcommon.StaticCollectionConfig{
Name: cconfitem.Name,
MemberOrgsPolicy: cpc,
RequiredPeerCount: cconfitem.RequiredCount,
MaximumPeerCount: cconfitem.MaxPeerCount,
BlockToLive: cconfitem.BlockToLive,
MemberOnlyRead: cconfitem.MemberOnlyRead,
MemberOnlyWrite: cconfitem.MemberOnlyWrite,
},
},
}
ccarray = append(ccarray, cc)
}
ccp := &pcommon.CollectionConfigPackage{Config: ccarray}
ccpBytes, err := proto.Marshal(ccp)
return ccp, ccpBytes, err
}
func checkChaincodeCmdParams(cmd *cobra.Command) error {
// we need chaincode name for everything, including deploy
if chaincodeName == common.UndefinedParamValue {
return errors.Errorf("must supply value for %s name parameter", chainFuncName)
}
if cmd.Name() == instantiateCmdName || cmd.Name() == installCmdName ||
cmd.Name() == upgradeCmdName || cmd.Name() == packageCmdName {
if chaincodeVersion == common.UndefinedParamValue {
return errors.Errorf("chaincode version is not provided for %s", cmd.Name())
}
if escc != common.UndefinedParamValue {
logger.Infof("Using escc %s", escc)
} else {
logger.Info("Using default escc")
escc = "escc"
}
if vscc != common.UndefinedParamValue {
logger.Infof("Using vscc %s", vscc)
} else {
logger.Info("Using default vscc")
vscc = "vscc"
}
if policy != common.UndefinedParamValue {
p, err := cauthdsl.FromString(policy)
if err != nil {
return errors.Errorf("invalid policy %s", policy)
}
policyMarshalled = protoutil.MarshalOrPanic(p)
}
if collectionsConfigFile != common.UndefinedParamValue {
var err error
_, collectionConfigBytes, err = GetCollectionConfigFromFile(collectionsConfigFile)
if err != nil {
return errors.WithMessagef(err, "invalid collection configuration in file %s", collectionsConfigFile)
}
}
}
// Check that non-empty chaincode parameters contain only Args as a key.
// Type checking is done later when the JSON is actually unmarshaled
// into a pb.ChaincodeInput. To better understand what's going
// on here with JSON parsing see http://blog.golang.org/json-and-go -
// Generic JSON with interface{}
if chaincodeCtorJSON != "{}" {
var f interface{}
err := json.Unmarshal([]byte(chaincodeCtorJSON), &f)
if err != nil {
return errors.Wrap(err, "chaincode argument error")
}
m := f.(map[string]interface{})
sm := make(map[string]interface{})
for k := range m {
sm[strings.ToLower(k)] = m[k]
}
_, argsPresent := sm["args"]
_, funcPresent := sm["function"]
if !argsPresent || (len(m) == 2 && !funcPresent) || len(m) > 2 {
return errors.New("non-empty JSON chaincode parameters must contain the following keys: 'Args' or 'Function' and 'Args'")
}
} else {
if cmd == nil || (cmd != chaincodeInstallCmd && cmd != chaincodePackageCmd) {
return errors.New("empty JSON chaincode parameters must contain the following keys: 'Args' or 'Function' and 'Args'")
}
}
return nil
}
func validatePeerConnectionParameters(cmdName string) error {
if connectionProfile != common.UndefinedParamValue {
networkConfig, err := common.GetConfig(connectionProfile)
if err != nil {
return err
}
if len(networkConfig.Channels[channelID].Peers) != 0 {
peerAddresses = []string{}
tlsRootCertFiles = []string{}
for peer, peerChannelConfig := range networkConfig.Channels[channelID].Peers {
if peerChannelConfig.EndorsingPeer {
peerConfig, ok := networkConfig.Peers[peer]
if !ok {
return errors.Errorf("peer '%s' is defined in the channel config but doesn't have associated peer config", peer)
}
peerAddresses = append(peerAddresses, peerConfig.URL)
tlsRootCertFiles = append(tlsRootCertFiles, peerConfig.TLSCACerts.Path)
}
}
}
}
// currently only support multiple peer addresses for invoke
multiplePeersAllowed := map[string]bool{
"invoke": true,
}
_, ok := multiplePeersAllowed[cmdName]
if !ok && len(peerAddresses) > 1 {
return errors.Errorf("'%s' command can only be executed against one peer. received %d", cmdName, len(peerAddresses))
}
if len(tlsRootCertFiles) > len(peerAddresses) {
logger.Warningf("received more TLS root cert files (%d) than peer addresses (%d)", len(tlsRootCertFiles), len(peerAddresses))
}
if viper.GetBool("peer.tls.enabled") {
if len(tlsRootCertFiles) != len(peerAddresses) {
return errors.Errorf("number of peer addresses (%d) does not match the number of TLS root cert files (%d)", len(peerAddresses), len(tlsRootCertFiles))
}
} else {
tlsRootCertFiles = nil
}
return nil
}
// ChaincodeCmdFactory holds the clients used by ChaincodeCmd
type ChaincodeCmdFactory struct {
EndorserClients []pb.EndorserClient
DeliverClients []pb.DeliverClient
Certificate tls.Certificate
Signer identity.SignerSerializer
BroadcastClient common.BroadcastClient
}
// InitCmdFactory init the ChaincodeCmdFactory with default clients
func InitCmdFactory(cmdName string, isEndorserRequired, isOrdererRequired bool) (*ChaincodeCmdFactory, error) {
var err error
var endorserClients []pb.EndorserClient
var deliverClients []pb.DeliverClient
if isEndorserRequired {
if err = validatePeerConnectionParameters(cmdName); err != nil {
return nil, errors.WithMessage(err, "error validating peer connection parameters")
}
for i, address := range peerAddresses {
var tlsRootCertFile string
if tlsRootCertFiles != nil {
tlsRootCertFile = tlsRootCertFiles[i]
}
endorserClient, err := common.GetEndorserClientFnc(address, tlsRootCertFile)
if err != nil {
return nil, errors.WithMessagef(err, "error getting endorser client for %s", cmdName)
}
endorserClients = append(endorserClients, endorserClient)
deliverClient, err := common.GetPeerDeliverClientFnc(address, tlsRootCertFile)
if err != nil {
return nil, errors.WithMessagef(err, "error getting deliver client for %s", cmdName)
}
deliverClients = append(deliverClients, deliverClient)
}
if len(endorserClients) == 0 {
return nil, errors.New("no endorser clients retrieved - this might indicate a bug")
}
}
certificate, err := common.GetCertificateFnc()
if err != nil {
return nil, errors.WithMessage(err, "error getting client cerificate")
}
signer, err := common.GetDefaultSignerFnc()
if err != nil {
return nil, errors.WithMessage(err, "error getting default signer")
}
var broadcastClient common.BroadcastClient
if isOrdererRequired {
if len(common.OrderingEndpoint) == 0 {
if len(endorserClients) == 0 {
return nil, errors.New("orderer is required, but no ordering endpoint or endorser client supplied")
}
endorserClient := endorserClients[0]
orderingEndpoints, err := common.GetOrdererEndpointOfChainFnc(channelID, signer, endorserClient)
if err != nil {
return nil, errors.WithMessagef(err, "error getting channel (%s) orderer endpoint", channelID)
}
if len(orderingEndpoints) == 0 {
return nil, errors.Errorf("no orderer endpoints retrieved for channel %s", channelID)
}
logger.Infof("Retrieved channel (%s) orderer endpoint: %s", channelID, orderingEndpoints[0])
// override viper env
viper.Set("orderer.address", orderingEndpoints[0])
}
broadcastClient, err = common.GetBroadcastClientFnc()
if err != nil {
return nil, errors.WithMessage(err, "error getting broadcast client")
}
}
return &ChaincodeCmdFactory{
EndorserClients: endorserClients,
DeliverClients: deliverClients,
Signer: signer,
BroadcastClient: broadcastClient,
Certificate: certificate,
}, nil
}
// ChaincodeInvokeOrQuery invokes or queries the chaincode. If successful, the
// INVOKE form prints the ProposalResponse to STDOUT, and the QUERY form prints
// the query result on STDOUT. A command-line flag (-r, --raw) determines
// whether the query result is output as raw bytes, or as a printable string.
// The printable form is optionally (-x, --hex) a hexadecimal representation
// of the query response. If the query response is NIL, nothing is output.
//
// NOTE - Query will likely go away as all interactions with the endorser are
// Proposal and ProposalResponses
func ChaincodeInvokeOrQuery(
spec *pb.ChaincodeSpec,
cID string,
txID string,
invoke bool,
signer identity.SignerSerializer,
certificate tls.Certificate,
endorserClients []pb.EndorserClient,
deliverClients []pb.DeliverClient,
bc common.BroadcastClient,
) (*pb.ProposalResponse, error) {
// Build the ChaincodeInvocationSpec message
invocation := &pb.ChaincodeInvocationSpec{ChaincodeSpec: spec}
creator, err := signer.Serialize()
if err != nil {
return nil, errors.WithMessage(err, "error serializing identity")
}
funcName := "invoke"
if !invoke {
funcName = "query"
}
// extract the transient field if it exists
var tMap map[string][]byte
if transient != "" {
if err := json.Unmarshal([]byte(transient), &tMap); err != nil {
return nil, errors.Wrap(err, "error parsing transient string")
}
}
prop, txid, err := protoutil.CreateChaincodeProposalWithTxIDAndTransient(pcommon.HeaderType_ENDORSER_TRANSACTION, cID, invocation, creator, txID, tMap)
if err != nil {
return nil, errors.WithMessagef(err, "error creating proposal for %s", funcName)
}
signedProp, err := protoutil.GetSignedProposal(prop, signer)
if err != nil {
return nil, errors.WithMessagef(err, "error creating signed proposal for %s", funcName)
}
var responses []*pb.ProposalResponse
for _, endorser := range endorserClients {
proposalResp, err := endorser.ProcessProposal(context.Background(), signedProp)
if err != nil {
return nil, errors.WithMessagef(err, "error endorsing %s", funcName)
}
responses = append(responses, proposalResp)
}
if len(responses) == 0 {
// this should only happen if some new code has introduced a bug
return nil, errors.New("no proposal responses received - this might indicate a bug")
}
// all responses will be checked when the signed transaction is created.
// for now, just set this so we check the first response's status
proposalResp := responses[0]
if invoke {
if proposalResp != nil {
if proposalResp.Response.Status >= shim.ERRORTHRESHOLD {
return proposalResp, nil
}
// assemble a signed transaction (it's an Envelope message)
env, err := protoutil.CreateSignedTx(prop, signer, responses...)
if err != nil {
return proposalResp, errors.WithMessage(err, "could not assemble transaction")
}
var dg *DeliverGroup
var ctx context.Context
if waitForEvent {
var cancelFunc context.CancelFunc
ctx, cancelFunc = context.WithTimeout(context.Background(), waitForEventTimeout)
defer cancelFunc()
dg = NewDeliverGroup(
deliverClients,
peerAddresses,
signer,
certificate,
channelID,
txid,
)
// connect to deliver service on all peers
err := dg.Connect(ctx)
if err != nil {
return nil, err
}
}
// send the envelope for ordering
if err = bc.Send(env); err != nil {
return proposalResp, errors.WithMessagef(err, "error sending transaction for %s", funcName)
}
if dg != nil && ctx != nil {
// wait for event that contains the txid from all peers
err = dg.Wait(ctx)
if err != nil {
return nil, err
}
}
}
}
return proposalResp, nil
}
// DeliverGroup holds all of the information needed to connect
// to a set of peers to wait for the interested txid to be
// committed to the ledgers of all peers. This functionality
// is currently implemented via the peer's DeliverFiltered service.
// An error from any of the peers/deliver clients will result in
// the invoke command returning an error. Only the first error that
// occurs will be set
type DeliverGroup struct {
Clients []*DeliverClient
Certificate tls.Certificate
ChannelID string
TxID string
Signer identity.SignerSerializer
mutex sync.Mutex
Error error
wg sync.WaitGroup
}
// DeliverClient holds the client/connection related to a specific
// peer. The address is included for logging purposes
type DeliverClient struct {
Client pb.DeliverClient
Connection pb.Deliver_DeliverClient
Address string
}
func NewDeliverGroup(
deliverClients []pb.DeliverClient,
peerAddresses []string,
signer identity.SignerSerializer,
certificate tls.Certificate,
channelID string,
txid string,
) *DeliverGroup {
clients := make([]*DeliverClient, len(deliverClients))
for i, client := range deliverClients {
dc := &DeliverClient{
Client: client,
Address: peerAddresses[i],
}
clients[i] = dc
}
dg := &DeliverGroup{
Clients: clients,
Certificate: certificate,
ChannelID: channelID,
TxID: txid,
Signer: signer,
}
return dg
}
// Connect waits for all deliver clients in the group to connect to
// the peer's deliver service, receive an error, or for the context
// to timeout. An error will be returned whenever even a single
// deliver client fails to connect to its peer
func (dg *DeliverGroup) Connect(ctx context.Context) error {
dg.wg.Add(len(dg.Clients))
for _, client := range dg.Clients {
go dg.ClientConnect(ctx, client)
}
readyCh := make(chan struct{})
go dg.WaitForWG(readyCh)
select {
case <-readyCh:
if dg.Error != nil {
err := errors.WithMessage(dg.Error, "failed to connect to deliver on all peers")
return err
}
case <-ctx.Done():
err := errors.New("timed out waiting for connection to deliver on all peers")
return err
}
return nil
}
// ClientConnect sends a deliver seek info envelope using the
// provided deliver client, setting the deliverGroup's Error
// field upon any error
func (dg *DeliverGroup) ClientConnect(ctx context.Context, dc *DeliverClient) {
defer dg.wg.Done()
df, err := dc.Client.DeliverFiltered(ctx)
if err != nil {
err = errors.WithMessagef(err, "error connecting to deliver filtered at %s", dc.Address)
dg.setError(err)
return
}
defer df.CloseSend()
dc.Connection = df
envelope := createDeliverEnvelope(dg.ChannelID, dg.Certificate, dg.Signer)
err = df.Send(envelope)
if err != nil {
err = errors.WithMessagef(err, "error sending deliver seek info envelope to %s", dc.Address)
dg.setError(err)
return
}
}
// Wait waits for all deliver client connections in the group to
// either receive a block with the txid, an error, or for the
// context to timeout
func (dg *DeliverGroup) Wait(ctx context.Context) error {
if len(dg.Clients) == 0 {
return nil
}
dg.wg.Add(len(dg.Clients))
for _, client := range dg.Clients {
go dg.ClientWait(client)
}
readyCh := make(chan struct{})
go dg.WaitForWG(readyCh)
select {
case <-readyCh:
if dg.Error != nil {
return dg.Error
}
case <-ctx.Done():
err := errors.New("timed out waiting for txid on all peers")
return err
}
return nil
}
// ClientWait waits for the specified deliver client to receive
// a block event with the requested txid
func (dg *DeliverGroup) ClientWait(dc *DeliverClient) {
defer dg.wg.Done()
for {
resp, err := dc.Connection.Recv()
if err != nil {
err = errors.WithMessagef(err, "error receiving from deliver filtered at %s", dc.Address)
dg.setError(err)
return
}
switch r := resp.Type.(type) {
case *pb.DeliverResponse_FilteredBlock:
filteredTransactions := r.FilteredBlock.FilteredTransactions
for _, tx := range filteredTransactions {
if tx.Txid == dg.TxID {
logger.Infof("txid [%s] committed with status (%s) at %s", dg.TxID, tx.TxValidationCode, dc.Address)
if tx.TxValidationCode != pb.TxValidationCode_VALID {
err = errors.Errorf("transaction invalidated with status (%s)", tx.TxValidationCode)
dg.setError(err)
}
return
}
}
case *pb.DeliverResponse_Status:
err = errors.Errorf("deliver completed with status (%s) before txid received", r.Status)
dg.setError(err)
return
default:
err = errors.Errorf("received unexpected response type (%T) from %s", r, dc.Address)
dg.setError(err)
return
}
}
}
// WaitForWG waits for the deliverGroup's wait group and closes
// the channel when ready
func (dg *DeliverGroup) WaitForWG(readyCh chan struct{}) {
dg.wg.Wait()
close(readyCh)
}
// setError serializes an error for the deliverGroup
func (dg *DeliverGroup) setError(err error) {
dg.mutex.Lock()
dg.Error = err
dg.mutex.Unlock()
}
func createDeliverEnvelope(
channelID string,
certificate tls.Certificate,
signer identity.SignerSerializer,
) *pcommon.Envelope {
var tlsCertHash []byte
// check for client certificate and create hash if present
if len(certificate.Certificate) > 0 {
tlsCertHash = util.ComputeSHA256(certificate.Certificate[0])
}
start := &ab.SeekPosition{
Type: &ab.SeekPosition_Newest{
Newest: &ab.SeekNewest{},
},
}
stop := &ab.SeekPosition{
Type: &ab.SeekPosition_Specified{
Specified: &ab.SeekSpecified{
Number: math.MaxUint64,
},
},
}
seekInfo := &ab.SeekInfo{
Start: start,
Stop: stop,
Behavior: ab.SeekInfo_BLOCK_UNTIL_READY,
}
env, err := protoutil.CreateSignedEnvelopeWithTLSBinding(
pcommon.HeaderType_DELIVER_SEEK_INFO,
channelID,
signer,
seekInfo,
int32(0),
uint64(0),
tlsCertHash,
)
if err != nil {
logger.Errorf("Error signing envelope: %s", err)
return nil
}
return env
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/peter_code_git/fabric.git
git@gitee.com:peter_code_git/fabric.git
peter_code_git
fabric
fabric
v2.0.0-alpha

搜索帮助

344bd9b3 5694891 D2dac590 5694891