1 Star 0 Fork 0

peter/fabric

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
handler.go 57.45 KB
一键复制 编辑 原始数据 按行查看 历史
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package chaincode
import (
"bytes"
"fmt"
"io"
"sync"
"time"
"github.com/golang/protobuf/proto"
commonledger "github.com/hyperledger/fabric/common/ledger"
"github.com/hyperledger/fabric/common/util"
"github.com/hyperledger/fabric/core/common/ccprovider"
"github.com/hyperledger/fabric/core/common/sysccprovider"
ccintf "github.com/hyperledger/fabric/core/container/ccintf"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/ledgerconfig"
"github.com/hyperledger/fabric/core/peer"
pb "github.com/hyperledger/fabric/protos/peer"
"github.com/looplab/fsm"
logging "github.com/op/go-logging"
"golang.org/x/net/context"
)
// Names of the states of the peer-side handler FSM that is built in
// newChaincodeSupportHandler.
const (
createdstate = "created" // initial state of the FSM
establishedstate = "established" // entered from "created" on REGISTER
readystate = "ready" // entered from "established" on READY; every other listed event loops on it
endstate = "end" // only an enter_ callback is registered for it in this file; no listed event targets it
)
// chaincodeLogger is the go-logging logger for the "chaincode" module; it is
// the logger used by the code in this file.
var chaincodeLogger = logging.MustGetLogger("chaincode")
// MessageHandler is the interface for handling chaincode messages. It is
// common to the peer's chaincode support and the chaincode side.
type MessageHandler interface {
HandleMessage(msg *pb.ChaincodeMessage) error
SendMessage(msg *pb.ChaincodeMessage) error
}
// transactionContext is the per-transaction record the handler keeps in
// Handler.txCtxs; it is created by createTxContext.
type transactionContext struct {
chainID string
signedProp *pb.SignedProposal
proposal *pb.Proposal
// responseNotifier receives the message handed to Handler.notify for this
// transaction; createTxContext creates it with a buffer of one
responseNotifier chan *pb.ChaincodeMessage
// tracks open iterators used for range queries, keyed by the iterator ID
// generated when the query is started
queryIteratorMap map[string]commonledger.ResultsIterator
// both are obtained from the context passed to createTxContext
txsimulator ledger.TxSimulator
historyQueryExecutor ledger.HistoryQueryExecutor
}
// nextStateInfo is the item queued on Handler.nextState: a message that
// processStream feeds to HandleMessage and then optionally sends to the
// chaincode.
type nextStateInfo struct {
msg *pb.ChaincodeMessage
// when true, processStream sends msg to the chaincode after handling it
sendToCC bool
//the only time we need to send synchronously is
//when launching the chaincode to take it to ready
//state (processStream panics if a sync send is requested
//for anything other than a READY message)
sendSync bool
}
// ccParts holds the components of a registered chaincode name, which has
// the form
// <name>:<version>/<suffix>
// (see chaincodeIDParts for the parsing rules).
type ccParts struct {
name string //the main name of the chaincode
version string //the version param if any (used for upgrade)
suffix string //for now just the chain name
}
// Handler is responsible for managing the peer's side of a chaincode stream.
type Handler struct {
// embedded lock; methods in this file take it around accesses to txCtxs,
// txidMap and the per-transaction query iterator maps
sync.RWMutex
//peer to shim grpc serializer. Used only in serialSend
serialLock sync.Mutex
ChatStream ccintf.ChaincodeStream
FSM *fsm.FSM
ChaincodeID *pb.ChaincodeID
// component parts of the registered chaincode name, set by decomposeRegisteredName
ccCompParts *ccParts
chaincodeSupport *ChaincodeSupport
registered bool
// may be nil (notifyDuringStartup then has nothing to notify)
readyNotify chan bool
// Map of txid to its transaction context. Each tx will be
// added prior to execute and removed when done executing
txCtxs map[string]*transactionContext
// txids that currently have a state request in flight (see createTXIDEntry)
txidMap map[string]bool
// used to do Send after making sure the state transition is complete
nextState chan *nextStateInfo
}
// shorttxid abbreviates a transaction ID for log output: IDs of 8 bytes or
// more are cut down to their first 8 bytes, shorter IDs are returned as is.
func shorttxid(txid string) string {
	const shortLen = 8
	if len(txid) >= shortLen {
		return txid[:shortLen]
	}
	return txid
}
// decomposeRegisteredName splits the canonical chaincode name in cid into
// its component parts and stores them on the handler.
//
// It is called when the chaincode registers (see beforeRegisterEvent). The
// parts are needed for the "one-instance-per-chain" model when starting up
// the chaincode for each chain. It still works for the
// "one-instance-for-all-chains" model, where version and suffix are simply
// absent (note that LCCC reserves "/:[]${}" as special characters, mainly
// for such namespace uses).
func (handler *Handler) decomposeRegisteredName(cid *pb.ChaincodeID) {
	parts := chaincodeIDParts(cid.Name)
	handler.ccCompParts = parts
}
// chaincodeIDParts parses a chaincode name of the form
// <name>:<version>/<suffix> into its parts.
//
// The suffix is everything after the first '/', the version is everything
// between the first ':' (searched only in the text before that '/') and the
// '/', and the name is whatever precedes them. Missing or empty components
// are returned as empty strings.
func chaincodeIDParts(ccName string) *ccParts {
	// cutFirst splits raw at the first occurrence of sep. When sep is absent
	// the whole input is the head and the tail is empty.
	cutFirst := func(raw []byte, sep byte) (head []byte, tail string) {
		pos := bytes.IndexByte(raw, sep)
		if pos < 0 {
			return raw, ""
		}
		return raw[:pos], string(raw[pos+1:])
	}

	// the suffix (ie, chain name) is peeled off first, then the version
	rest, suffix := cutFirst([]byte(ccName), '/')
	rest, version := cutFirst(rest, ':')

	// what remains is the chaincode name
	return &ccParts{name: string(rest), version: version, suffix: suffix}
}
// getCCRootName returns the main chaincode name (without version or suffix)
// from the handler's decomposed registered name.
func (handler *Handler) getCCRootName() string {
	parts := handler.ccCompParts
	return parts.name
}
// serialSend sends msg on the chat stream while holding serialLock, so that
// sends are serialized (keeps gRPC happy). A send failure is wrapped with the
// short txid and message type, logged, and returned; nil is returned on
// success.
func (handler *Handler) serialSend(msg *pb.ChaincodeMessage) error {
	handler.serialLock.Lock()
	defer handler.serialLock.Unlock()

	sendErr := handler.ChatStream.Send(msg)
	if sendErr == nil {
		return nil
	}

	wrapped := fmt.Errorf("[%s]Error sending %s: %s", shorttxid(msg.Txid), msg.Type.String(), sendErr)
	chaincodeLogger.Errorf("%s", wrapped)
	return wrapped
}
// serialSendAsync serves the same purpose as serialSend (serializing msgs so
// gRPC will be happy) but performs the send in a new goroutine, so the
// send-remoterecv-localrecv loop can be non-blocking.
//
// The outcome of the send (nil on success) is delivered on errc. A nil errc
// means the caller is not interested and nothing is reported.
func (handler *Handler) serialSendAsync(msg *pb.ChaincodeMessage, errc chan error) {
	go func() {
		result := handler.serialSend(msg)
		if errc == nil {
			return
		}
		errc <- result
	}()
}
// createTxContext registers a new transaction context for txid and returns it.
//
// It fails when the handler has no context map yet or when a context is
// already registered under txid. The new context gets a response notifier
// with a buffer of one, an empty query iterator map, and the tx simulator and
// history query executor taken from ctxt.
func (handler *Handler) createTxContext(ctxt context.Context, chainID string, txid string, signedProp *pb.SignedProposal, prop *pb.Proposal) (*transactionContext, error) {
	if handler.txCtxs == nil {
		return nil, fmt.Errorf("cannot create notifier for txid:%s", txid)
	}

	handler.Lock()
	defer handler.Unlock()

	if existing := handler.txCtxs[txid]; existing != nil {
		return nil, fmt.Errorf("txid:%s exists", txid)
	}

	created := &transactionContext{
		chainID:          chainID,
		signedProp:       signedProp,
		proposal:         prop,
		responseNotifier: make(chan *pb.ChaincodeMessage, 1),
		queryIteratorMap: make(map[string]commonledger.ResultsIterator),
	}
	handler.txCtxs[txid] = created

	created.txsimulator = getTxSimulator(ctxt)
	created.historyQueryExecutor = getHistoryQueryExecutor(ctxt)
	return created, nil
}
// getTxContext looks up the transaction context registered for txid while
// holding the handler lock. It returns nil when there is none.
func (handler *Handler) getTxContext(txid string) *transactionContext {
	handler.Lock()
	defer handler.Unlock()

	found := handler.txCtxs[txid]
	return found
}
// deleteTxContext removes the transaction context registered for txid, if
// any, while holding the handler lock. It does nothing when the handler has
// no context map.
func (handler *Handler) deleteTxContext(txid string) {
	handler.Lock()
	defer handler.Unlock()

	if handler.txCtxs == nil {
		return
	}
	delete(handler.txCtxs, txid)
}
// putQueryIterator records queryIterator in txContext's iterator map under
// the key txid, while holding the handler lock. Despite the parameter name,
// the caller in this file passes a generated iterator ID as the key.
func (handler *Handler) putQueryIterator(txContext *transactionContext, txid string,
	queryIterator commonledger.ResultsIterator) {
	handler.Lock()
	defer handler.Unlock()

	iterators := txContext.queryIteratorMap
	iterators[txid] = queryIterator
}
// getQueryIterator returns the iterator stored in txContext's iterator map
// under the key txid (callers in this file pass the iterator ID), or nil when
// no such iterator is registered.
//
// A nil txContext also yields nil instead of a nil-pointer panic:
// handleQueryStateNext passes the result of getTxContext without checking
// it, and that result is nil when the transaction context no longer exists.
// Returning nil lets that caller answer with its "query iterator not found"
// error.
func (handler *Handler) getQueryIterator(txContext *transactionContext, txid string) commonledger.ResultsIterator {
	if txContext == nil {
		return nil
	}
	handler.Lock()
	defer handler.Unlock()
	return txContext.queryIteratorMap[txid]
}
// deleteQueryIterator drops the entry stored under the key txid from
// txContext's iterator map, while holding the handler lock. It does not close
// the iterator; callers in this file close it themselves.
func (handler *Handler) deleteQueryIterator(txContext *transactionContext, txid string) {
	handler.Lock()
	defer handler.Unlock()

	iterators := txContext.queryIteratorMap
	delete(iterators, txid)
}
// checkACL is meant to check whether the transactor is allowed to call the
// given chaincode on this channel. It is currently a stub: no check is
// performed and nil is always returned.
func (handler *Handler) checkACL(signedProp *pb.SignedProposal, proposal *pb.Proposal, calledCC *ccParts) *pb.ChaincodeMessage {
// TODO: Decide what to pass in to verify that this transactor can access this
// channel (chID) and chaincode (ccID). Very likely we need the signedProposal
// which contains the sig and creator cert
// If error, return ChaincodeMessage with type ChaincodeMessage_ERROR
return nil
}
// canCallChaincode reports whether the transaction txid may call another
// chaincode. It returns nil when the call is allowed, or an ERROR message to
// send back when no transaction context exists for txid. isQuery is
// currently not consulted.
//
// THIS CAN BE REMOVED ONCE WE FULLY SUPPORT (Invoke) CONFIDENTIALITY WITH
// CC-CALLING-CC. Only invocations are allowed.
func (handler *Handler) canCallChaincode(txid string, isQuery bool) *pb.ChaincodeMessage {
	if handler.getTxContext(txid) != nil {
		//not CONFIDENTIAL transaction, OK to call CC
		return nil
	}

	reason := fmt.Sprintf("[%s]Error no context while checking for confidentiality. Sending %s", shorttxid(txid), pb.ChaincodeMessage_ERROR)
	return &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: []byte(reason), Txid: txid}
}
// deregister removes this handler from its ChaincodeSupport if it has been
// registered. It always returns nil.
func (handler *Handler) deregister() error {
	if !handler.registered {
		return nil
	}
	handler.chaincodeSupport.deregisterHandler(handler)
	return nil
}
// triggerNextState queues msg on the handler's nextState channel for
// processStream. When send is true, processStream also sends the message to
// the chaincode afterwards, asynchronously.
func (handler *Handler) triggerNextState(msg *pb.ChaincodeMessage, send bool) {
	// sendSync false: any send to the chaincode is done asynchronously
	info := &nextStateInfo{msg: msg, sendToCC: send, sendSync: false}
	handler.nextState <- info
}
// triggerNextStateSync queues msg on the handler's nextState channel and asks
// processStream to send it to the chaincode synchronously once it has been
// handled.
func (handler *Handler) triggerNextStateSync(msg *pb.ChaincodeMessage) {
	// sendSync true: the message is sent to the chaincode synchronously
	info := &nextStateInfo{msg: msg, sendToCC: true, sendSync: true}
	handler.nextState <- info
}
// waitForKeepaliveTimer returns a channel that fires once after the
// configured keepalive interval. When keepalive is not enabled (interval not
// greater than zero) it returns a fresh channel nobody writes to.
func (handler *Handler) waitForKeepaliveTimer() <-chan time.Time {
	interval := handler.chaincodeSupport.keepalive
	if interval <= 0 {
		//no one will signal this channel, listener blocks forever
		return make(chan time.Time, 1)
	}
	return time.After(interval)
}
// processStream is the handler's main loop. It multiplexes four sources:
//
//   - results of asynchronous sends (errc): a failed send ends the stream;
//   - messages received from the chaincode over the chat stream;
//   - state messages queued on handler.nextState;
//   - the keepalive timer, which triggers a KEEPALIVE send.
//
// Received and queued messages are passed to HandleMessage; queued messages
// flagged sendToCC are then sent to the chaincode (synchronously only for
// READY). The loop ends, returning the error, on the first receive error
// (including io.EOF), nil message, handling error or send error. The handler
// is deregistered when the function returns.
//
// Each Recv result travels over the channel as one value carrying both the
// message and its error. Sharing a single err variable between the Recv
// goroutine and this loop would be a data race: the loop could overwrite a
// stream error before looking at it. The channel has a buffer of one so the
// single outstanding Recv goroutine can always deliver its result and exit,
// even after processStream has returned through another select arm.
func (handler *Handler) processStream() error {
	defer handler.deregister()

	// recvResult pairs a received message with the error of the same Recv call.
	type recvResult struct {
		msg *pb.ChaincodeMessage
		err error
	}
	msgAvail := make(chan recvResult, 1)

	//recv is used to spin Recv routine after previous received msg
	//has been processed; at most one Recv goroutine is outstanding
	recv := true

	//catch send errors and bail now that sends aren't synchronous
	errc := make(chan error, 1)
	for {
		var (
			in     *pb.ChaincodeMessage
			nsInfo *nextStateInfo
			err    error
		)

		if recv {
			recv = false
			go func() {
				msg, recvErr := handler.ChatStream.Recv()
				msgAvail <- recvResult{msg: msg, err: recvErr}
			}()
		}

		select {
		case sendErr := <-errc:
			if sendErr != nil {
				return sendErr
			}
			//send was successful, just continue
			continue
		case rcvd := <-msgAvail:
			in, err = rcvd.msg, rcvd.err
			if err == io.EOF {
				chaincodeLogger.Debugf("Received EOF, ending chaincode support stream, %s", err)
				return err
			} else if err != nil {
				chaincodeLogger.Errorf("Error handling chaincode support stream: %s", err)
				return err
			} else if in == nil {
				err = fmt.Errorf("Received nil message, ending chaincode support stream")
				chaincodeLogger.Debug("Received nil message, ending chaincode support stream")
				return err
			}
			chaincodeLogger.Debugf("[%s]Received message %s from shim", shorttxid(in.Txid), in.Type.String())
			if in.Type.String() == pb.ChaincodeMessage_ERROR.String() {
				chaincodeLogger.Errorf("Got error: %s", string(in.Payload))
			}

			// we can spin off another Recv again
			recv = true

			if in.Type == pb.ChaincodeMessage_KEEPALIVE {
				chaincodeLogger.Debug("Received KEEPALIVE Response")
				// Received a keep alive message, we don't do anything with it for now
				// and it does not touch the state machine
				continue
			}
		case nsInfo = <-handler.nextState:
			in = nsInfo.msg
			if in == nil {
				err = fmt.Errorf("Next state nil message, ending chaincode support stream")
				chaincodeLogger.Debug("Next state nil message, ending chaincode support stream")
				return err
			}
			chaincodeLogger.Debugf("[%s]Move state message %s", shorttxid(in.Txid), in.Type.String())
		case <-handler.waitForKeepaliveTimer():
			if handler.chaincodeSupport.keepalive <= 0 {
				chaincodeLogger.Errorf("Invalid select: keepalive not on (keepalive=%d)", handler.chaincodeSupport.keepalive)
				continue
			}

			//if no error message from serialSend, KEEPALIVE happy, and don't care about error
			//(maybe it'll work later)
			handler.serialSendAsync(&pb.ChaincodeMessage{Type: pb.ChaincodeMessage_KEEPALIVE}, nil)
			continue
		}

		err = handler.HandleMessage(in)
		if err != nil {
			chaincodeLogger.Errorf("[%s]Error handling message, ending stream: %s", shorttxid(in.Txid), err)
			return fmt.Errorf("Error handling message, ending stream: %s", err)
		}

		if nsInfo != nil && nsInfo.sendToCC {
			chaincodeLogger.Debugf("[%s]sending state message %s", shorttxid(in.Txid), in.Type.String())
			//ready messages are sent sync
			if nsInfo.sendSync {
				if in.Type.String() != pb.ChaincodeMessage_READY.String() {
					panic(fmt.Sprintf("[%s]Sync send can only be for READY state %s\n", shorttxid(in.Txid), in.Type.String()))
				}
				if err = handler.serialSend(in); err != nil {
					return fmt.Errorf("[%s]Error sending ready message, ending stream: %s", shorttxid(in.Txid), err)
				}
			} else {
				//if error bail in select
				handler.serialSendAsync(in, errc)
			}
		}
	}
}
// HandleChaincodeStream is the entry point for serving one chaincode stream:
// it logs the deadline of ctxt, creates a handler for the stream and runs the
// handler's processing loop, returning the error that ended it.
func HandleChaincodeStream(chaincodeSupport *ChaincodeSupport, ctxt context.Context, stream ccintf.ChaincodeStream) error {
	deadline, ok := ctxt.Deadline()
	chaincodeLogger.Debugf("Current context deadline = %s, ok = %v", deadline, ok)

	return newChaincodeSupportHandler(chaincodeSupport, stream).processStream()
}
// newChaincodeSupportHandler builds the peer-side Handler for one chaincode
// stream and wires up its state machine.
//
// The FSM starts in the created state, moves to established on REGISTER and
// to ready on READY. Every other event it knows is accepted only in the
// ready state and leaves it in the ready state. The callbacks dispatch to the
// handler's before*/after*/enter* methods, passing the FSM's current state.
func newChaincodeSupportHandler(chaincodeSupport *ChaincodeSupport, peerChatStream ccintf.ChaincodeStream) *Handler {
	v := &Handler{
		ChatStream:       peerChatStream,
		chaincodeSupport: chaincodeSupport,
		//we want this to block (unbuffered)
		nextState: make(chan *nextStateInfo),
	}

	//Send REGISTERED, then, if deploy { trigger INIT(via INIT) } else { trigger READY(via COMPLETED) }
	events := fsm.Events{
		{Name: pb.ChaincodeMessage_REGISTER.String(), Src: []string{createdstate}, Dst: establishedstate},
		{Name: pb.ChaincodeMessage_READY.String(), Src: []string{establishedstate}, Dst: readystate},
	}
	// all remaining events are self-transitions on the ready state
	for _, msgType := range []pb.ChaincodeMessage_Type{
		pb.ChaincodeMessage_PUT_STATE,
		pb.ChaincodeMessage_DEL_STATE,
		pb.ChaincodeMessage_INVOKE_CHAINCODE,
		pb.ChaincodeMessage_COMPLETED,
		pb.ChaincodeMessage_GET_STATE,
		pb.ChaincodeMessage_GET_STATE_BY_RANGE,
		pb.ChaincodeMessage_GET_QUERY_RESULT,
		pb.ChaincodeMessage_GET_HISTORY_FOR_KEY,
		pb.ChaincodeMessage_QUERY_STATE_NEXT,
		pb.ChaincodeMessage_QUERY_STATE_CLOSE,
		pb.ChaincodeMessage_ERROR,
		pb.ChaincodeMessage_RESPONSE,
		pb.ChaincodeMessage_INIT,
		pb.ChaincodeMessage_TRANSACTION,
	} {
		events = append(events, fsm.EventDesc{Name: msgType.String(), Src: []string{readystate}, Dst: readystate})
	}

	// small helpers to spell the callback keys and read the current state
	before := func(t pb.ChaincodeMessage_Type) string { return "before_" + t.String() }
	after := func(t pb.ChaincodeMessage_Type) string { return "after_" + t.String() }
	enter := func(state string) string { return "enter_" + state }
	current := func() string { return v.FSM.Current() }

	callbacks := fsm.Callbacks{
		before(pb.ChaincodeMessage_REGISTER):           func(e *fsm.Event) { v.beforeRegisterEvent(e, current()) },
		before(pb.ChaincodeMessage_COMPLETED):          func(e *fsm.Event) { v.beforeCompletedEvent(e, current()) },
		after(pb.ChaincodeMessage_GET_STATE):           func(e *fsm.Event) { v.afterGetState(e, current()) },
		after(pb.ChaincodeMessage_GET_STATE_BY_RANGE):  func(e *fsm.Event) { v.afterGetStateByRange(e, current()) },
		after(pb.ChaincodeMessage_GET_QUERY_RESULT):    func(e *fsm.Event) { v.afterGetQueryResult(e, current()) },
		after(pb.ChaincodeMessage_GET_HISTORY_FOR_KEY): func(e *fsm.Event) { v.afterGetHistoryForKey(e, current()) },
		after(pb.ChaincodeMessage_QUERY_STATE_NEXT):    func(e *fsm.Event) { v.afterQueryStateNext(e, current()) },
		after(pb.ChaincodeMessage_QUERY_STATE_CLOSE):   func(e *fsm.Event) { v.afterQueryStateClose(e, current()) },
		after(pb.ChaincodeMessage_PUT_STATE):           func(e *fsm.Event) { v.enterBusyState(e, current()) },
		after(pb.ChaincodeMessage_DEL_STATE):           func(e *fsm.Event) { v.enterBusyState(e, current()) },
		after(pb.ChaincodeMessage_INVOKE_CHAINCODE):    func(e *fsm.Event) { v.enterBusyState(e, current()) },
		enter(establishedstate):                        func(e *fsm.Event) { v.enterEstablishedState(e, current()) },
		enter(readystate):                              func(e *fsm.Event) { v.enterReadyState(e, current()) },
		enter(endstate):                                func(e *fsm.Event) { v.enterEndState(e, current()) },
	}

	v.FSM = fsm.NewFSM(createdstate, events, callbacks)
	return v
}
// createTXIDEntry marks txid as having a request in flight. It returns true
// when the mark was newly set, and false when the handler has no txid map or
// txid is already marked.
func (handler *Handler) createTXIDEntry(txid string) bool {
	if handler.txidMap == nil {
		return false
	}

	handler.Lock()
	defer handler.Unlock()

	if alreadyPending := handler.txidMap[txid]; alreadyPending {
		return false
	}
	handler.txidMap[txid] = true
	return true
}
// deleteTXIDEntry clears the in-flight mark for txid while holding the
// handler lock. If the handler has no txid map at all, it logs a warning
// instead.
func (handler *Handler) deleteTXIDEntry(txid string) {
	handler.Lock()
	defer handler.Unlock()

	if handler.txidMap == nil {
		chaincodeLogger.Warningf("TXID %s not found!", txid)
		return
	}
	delete(handler.txidMap, txid)
}
// notifyDuringStartup delivers val on the handler's readyNotify channel.
// When the channel is nil there is nobody to notify and the call only logs.
func (handler *Handler) notifyDuringStartup(val bool) {
	//if USER_RUNS_CC readyNotify will be nil
	if handler.readyNotify == nil {
		chaincodeLogger.Debug("nothing to notify (dev mode ?)")
		return
	}

	chaincodeLogger.Debug("Notifying during startup")
	handler.readyNotify <- val
}
// beforeRegisterEvent is the FSM callback run when a chaincode tries to
// register.
//
// It decodes the ChaincodeID from the message payload, registers the handler
// with the chaincode support, records the component parts of the chaincode
// name and answers with REGISTERED. Any failure cancels the event; failures
// from registration onwards additionally signal false via
// notifyDuringStartup.
func (handler *Handler) beforeRegisterEvent(e *fsm.Event, state string) {
	chaincodeLogger.Debugf("Received %s in state %s", e.Event, state)

	msg, isCCMsg := e.Args[0].(*pb.ChaincodeMessage)
	if !isCCMsg {
		e.Cancel(fmt.Errorf("Received unexpected message type"))
		return
	}

	var err error

	registeredID := &pb.ChaincodeID{}
	if err = proto.Unmarshal(msg.Payload, registeredID); err != nil {
		e.Cancel(fmt.Errorf("Error in received %s, could NOT unmarshal registration info: %s", pb.ChaincodeMessage_REGISTER, err))
		return
	}

	// Now register with the chaincodeSupport
	handler.ChaincodeID = registeredID
	if err = handler.chaincodeSupport.registerHandler(handler); err != nil {
		e.Cancel(err)
		handler.notifyDuringStartup(false)
		return
	}

	//get the component parts so we can use the root chaincode
	//name in keys
	handler.decomposeRegisteredName(handler.ChaincodeID)

	chaincodeLogger.Debugf("Got %s for chaincodeID = %s, sending back %s", e.Event, registeredID, pb.ChaincodeMessage_REGISTERED)
	sendErr := handler.serialSend(&pb.ChaincodeMessage{Type: pb.ChaincodeMessage_REGISTERED})
	if sendErr == nil {
		return
	}
	e.Cancel(fmt.Errorf("Error sending %s: %s", pb.ChaincodeMessage_REGISTERED, sendErr))
	handler.notifyDuringStartup(false)
}
// notify hands msg to whoever waits on the response notifier of the
// transaction msg.Txid and then closes every query iterator still tracked
// for that transaction. If no context exists for the txid it only logs.
// The handler lock is held for the whole call.
func (handler *Handler) notify(msg *pb.ChaincodeMessage) {
	handler.Lock()
	defer handler.Unlock()

	tctx := handler.txCtxs[msg.Txid]
	if tctx == nil {
		chaincodeLogger.Debugf("notifier Txid:%s does not exist", msg.Txid)
		return
	}

	chaincodeLogger.Debugf("notifying Txid:%s", msg.Txid)
	tctx.responseNotifier <- msg

	// clean up queryIteratorMap
	for _, openIter := range tctx.queryIteratorMap {
		openIter.Close()
	}
}
// beforeCompletedEvent is the FSM callback run when the chaincode reports
// that it has completed execution of init or invoke. It only validates the
// event argument and logs; the event is cancelled if the argument is not a
// chaincode message.
func (handler *Handler) beforeCompletedEvent(e *fsm.Event, state string) {
	msg, isCCMsg := e.Args[0].(*pb.ChaincodeMessage)
	if !isCCMsg {
		e.Cancel(fmt.Errorf("Received unexpected message type"))
		return
	}

	// Notify on channel once into READY state
	chaincodeLogger.Debugf("[%s]beforeCompleted - not in ready state will notify when in readystate", shorttxid(msg.Txid))
}
// afterGetState is the FSM callback for a GET_STATE request from the
// chaincode. It hands the message to handleGetState, or cancels the event if
// the argument is not a chaincode message.
func (handler *Handler) afterGetState(e *fsm.Event, state string) {
	request, isCCMsg := e.Args[0].(*pb.ChaincodeMessage)
	if !isCCMsg {
		e.Cancel(fmt.Errorf("Received unexpected message type"))
		return
	}

	chaincodeLogger.Debugf("[%s]Received %s, invoking get state from ledger", shorttxid(request.Txid), pb.ChaincodeMessage_GET_STATE)

	// Query ledger for state
	handler.handleGetState(request)
}
// isValidTxSim checks whether txid has a transaction context with a tx
// simulator.
//
// On success it returns the context and a nil message. Otherwise it returns
// a nil context together with an ERROR message for the chaincode whose
// payload is fmtStr formatted with args; the same text is logged.
func (handler *Handler) isValidTxSim(txid string, fmtStr string, args ...interface{}) (*transactionContext, *pb.ChaincodeMessage) {
	txContext := handler.getTxContext(txid)
	if txContext == nil || txContext.txsimulator == nil {
		// Send error msg back to chaincode. No ledger context
		errStr := fmt.Sprintf(fmtStr, args...)
		// errStr is already formatted and may contain '%' (it embeds the
		// txid), so it must not be used as a format string itself
		chaincodeLogger.Errorf("%s", errStr)
		return nil, &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: []byte(errStr), Txid: txid}
	}
	return txContext, nil
}
// handleGetState serves a GET_STATE request: it reads the requested key
// through the transaction's simulator and replies to the chaincode with a
// RESPONSE (empty payload when the key has no state) or an ERROR.
//
// The work runs in its own goroutine so that the FSM transition that
// triggered it is complete (afterGetState has returned) before anything
// else happens. Only one state request per txid is processed at a time; a
// second concurrent one is dropped.
func (handler *Handler) handleGetState(msg *pb.ChaincodeMessage) {
	go func() {
		// Check if this is the unique state request from this chaincode txid
		if !handler.createTXIDEntry(msg.Txid) {
			// Drop this request
			chaincodeLogger.Error("Another state request pending for this Txid. Cannot process.")
			return
		}

		// reply is what the deferred function sends back; it starts out as
		// the error message (if any) produced by the context check
		txContext, reply := handler.isValidTxSim(msg.Txid,
			"[%s]No ledger context for GetState. Sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_ERROR)

		defer func() {
			handler.deleteTXIDEntry(msg.Txid)
			if chaincodeLogger.IsEnabledFor(logging.DEBUG) {
				chaincodeLogger.Debugf("[%s]handleGetState serial send %s",
					shorttxid(reply.Txid), reply.Type)
			}
			handler.serialSendAsync(reply, nil)
		}()

		if txContext == nil {
			return
		}

		key := string(msg.Payload)
		ccName := handler.getCCRootName()
		if chaincodeLogger.IsEnabledFor(logging.DEBUG) {
			chaincodeLogger.Debugf("[%s] getting state for chaincode %s, key %s, channel %s",
				shorttxid(msg.Txid), ccName, key, txContext.chainID)
		}

		var (
			value  []byte
			getErr error
		)
		value, getErr = txContext.txsimulator.GetState(ccName, key)

		switch {
		case getErr != nil:
			// Send error msg back to chaincode. GetState will not trigger event
			errPayload := []byte(getErr.Error())
			chaincodeLogger.Errorf("[%s]Failed to get chaincode state(%s). Sending %s",
				shorttxid(msg.Txid), getErr, pb.ChaincodeMessage_ERROR)
			reply = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: errPayload, Txid: msg.Txid}
		case value == nil:
			//The state object being requested does not exist
			chaincodeLogger.Debugf("[%s]No state associated with key: %s. Sending %s with an empty payload",
				shorttxid(msg.Txid), key, pb.ChaincodeMessage_RESPONSE)
			reply = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_RESPONSE, Payload: value, Txid: msg.Txid}
		default:
			// Send response msg back to chaincode. GetState will not trigger event
			if chaincodeLogger.IsEnabledFor(logging.DEBUG) {
				chaincodeLogger.Debugf("[%s]Got state. Sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_RESPONSE)
			}
			reply = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_RESPONSE, Payload: value, Txid: msg.Txid}
		}
	}()
}
// afterGetStateByRange is the FSM callback for a GET_STATE_BY_RANGE request
// from the chaincode. It hands the message to handleGetStateByRange, or
// cancels the event if the argument is not a chaincode message.
func (handler *Handler) afterGetStateByRange(e *fsm.Event, state string) {
	msg, ok := e.Args[0].(*pb.ChaincodeMessage)
	if !ok {
		e.Cancel(fmt.Errorf("Received unexpected message type"))
		return
	}
	chaincodeLogger.Debugf("Received %s, invoking get state from ledger", pb.ChaincodeMessage_GET_STATE_BY_RANGE)

	// Query ledger for state
	handler.handleGetStateByRange(msg)
	// name the request actually handled here (the trace used to say GET_STATE)
	chaincodeLogger.Debug("Exiting GET_STATE_BY_RANGE")
}
// handleGetStateByRange serves a GET_STATE_BY_RANGE request: it opens a
// range scan iterator over [StartKey, EndKey) arguments taken from the
// request, collects up to the configured query limit of results and replies
// to the chaincode with a RESPONSE carrying them, or with an ERROR.
//
// The work runs in its own goroutine so that the FSM transition that
// triggered it is complete (afterGetStateByRange has returned) before
// anything else happens. Only one state request per txid is processed at a
// time; a second concurrent one is dropped.
//
// Every exit path after the uniqueness check sets serialSendMsg, because the
// deferred function dereferences it unconditionally. In particular a failure
// of the iterator's Next must produce an ERROR reply (and release the
// iterator) rather than return with serialSendMsg still nil.
func (handler *Handler) handleGetStateByRange(msg *pb.ChaincodeMessage) {
	go func() {
		// Check if this is the unique state request from this chaincode txid
		uniqueReq := handler.createTXIDEntry(msg.Txid)
		if !uniqueReq {
			// Drop this request
			chaincodeLogger.Error("Another state request pending for this Txid. Cannot process.")
			return
		}

		var serialSendMsg *pb.ChaincodeMessage

		defer func() {
			handler.deleteTXIDEntry(msg.Txid)
			chaincodeLogger.Debugf("[%s]handleGetStateByRange serial send %s", shorttxid(serialSendMsg.Txid), serialSendMsg.Type)
			handler.serialSendAsync(serialSendMsg, nil)
		}()

		getStateByRange := &pb.GetStateByRange{}
		unmarshalErr := proto.Unmarshal(msg.Payload, getStateByRange)
		if unmarshalErr != nil {
			payload := []byte(unmarshalErr.Error())
			chaincodeLogger.Errorf("Failed to unmarshall range query request. Sending %s", pb.ChaincodeMessage_ERROR)
			serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
			return
		}

		iterID := util.GenerateUUID()

		var txContext *transactionContext

		txContext, serialSendMsg = handler.isValidTxSim(msg.Txid, "[%s]No ledger context for GetStateByRange. Sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_ERROR)
		if txContext == nil {
			return
		}
		chaincodeID := handler.getCCRootName()

		rangeIter, err := txContext.txsimulator.GetStateRangeScanIterator(chaincodeID, getStateByRange.StartKey, getStateByRange.EndKey)
		if err != nil {
			// Send error msg back to chaincode. GetState will not trigger event
			payload := []byte(err.Error())
			chaincodeLogger.Errorf("Failed to get ledger scan iterator. Sending %s", pb.ChaincodeMessage_ERROR)
			serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
			return
		}

		handler.putQueryIterator(txContext, iterID, rangeIter)

		var keysAndValues []*pb.QueryStateKeyValue
		var queryLimit = ledgerconfig.GetQueryLimit()
		var qresult commonledger.QueryResult
		for i := 0; i < queryLimit; i++ {
			qresult, err = rangeIter.Next()
			if err != nil {
				// release the iterator and tell the chaincode about the
				// failure; serialSendMsg must not stay nil here
				rangeIter.Close()
				handler.deleteQueryIterator(txContext, iterID)

				payload := []byte(err.Error())
				chaincodeLogger.Errorf("Failed to get query result from iterator. Sending %s", pb.ChaincodeMessage_ERROR)
				serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
				return
			}
			if qresult == nil {
				// iterator exhausted
				break
			}
			kv := qresult.(*ledger.KV)
			keyAndValue := pb.QueryStateKeyValue{Key: kv.Key, Value: kv.Value}
			keysAndValues = append(keysAndValues, &keyAndValue)
		}

		// a non-nil last result means the loop stopped at the query limit,
		// not at the end of the iterator
		if qresult != nil {
			rangeIter.Close()
			handler.deleteQueryIterator(txContext, iterID)

			//TODO log the warning that the queryLimit was exceeded. this will need to be revisited
			//following changes to the future paging design.
			chaincodeLogger.Warningf("Query limit of %v was exceeded. Not all values meeting the criteria were returned.", queryLimit)
		}

		//TODO - HasMore is set to false until the requery issue for the peer is resolved
		//FAB-2462 - Re-introduce paging for range queries and rich queries
		//payload := &pb.QueryStateResponse{KeysAndValues: keysAndValues, HasMore: qresult != nil, Id: iterID}
		payload := &pb.QueryStateResponse{KeysAndValues: keysAndValues, HasMore: false, Id: iterID}
		payloadBytes, err := proto.Marshal(payload)
		if err != nil {
			rangeIter.Close()
			handler.deleteQueryIterator(txContext, iterID)

			// Send error msg back to chaincode. GetState will not trigger event
			payload := []byte(err.Error())
			chaincodeLogger.Errorf("Failed marshall resopnse. Sending %s", pb.ChaincodeMessage_ERROR)
			serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
			return
		}

		chaincodeLogger.Debugf("Got keys and values. Sending %s", pb.ChaincodeMessage_RESPONSE)
		serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_RESPONSE, Payload: payloadBytes, Txid: msg.Txid}
	}()
}
// afterQueryStateNext is the FSM callback for a QUERY_STATE_NEXT request
// from the chaincode. It hands the message to handleQueryStateNext, or
// cancels the event if the argument is not a chaincode message.
func (handler *Handler) afterQueryStateNext(e *fsm.Event, state string) {
	msg, ok := e.Args[0].(*pb.ChaincodeMessage)
	if !ok {
		e.Cancel(fmt.Errorf("Received unexpected message type"))
		return
	}
	// log the message type actually being handled (the trace used to name
	// GET_STATE_BY_RANGE)
	chaincodeLogger.Debugf("Received %s, invoking get state from ledger", pb.ChaincodeMessage_QUERY_STATE_NEXT)

	// Query ledger for state
	handler.handleQueryStateNext(msg)
	chaincodeLogger.Debug("Exiting QUERY_STATE_NEXT")
}
// handleQueryStateNext serves a QUERY_STATE_NEXT request: it looks up the query
// iterator named in the request, reads up to ledgerconfig.GetQueryLimit()
// results from it and sends them back to the chaincode as a QueryStateResponse.
//
// Every exit path after the TXID entry has been created must leave
// serialSendMsg non-nil, because the deferred function dereferences it before
// sending. Failures are reported to the chaincode as ChaincodeMessage_ERROR.
func (handler *Handler) handleQueryStateNext(msg *pb.ChaincodeMessage) {
	// The defer followed by triggering a go routine dance is needed to ensure that the previous state transition
	// is completed before the next one is triggered. The previous state transition is deemed complete only when
	// the afterQueryStateNext function is exited. Interesting bug fix!!
	go func() {
		// Check if this is the unique state request from this chaincode txid
		uniqueReq := handler.createTXIDEntry(msg.Txid)
		if !uniqueReq {
			// Drop this request
			chaincodeLogger.Debug("Another state request pending for this Txid. Cannot process.")
			return
		}

		var serialSendMsg *pb.ChaincodeMessage

		defer func() {
			handler.deleteTXIDEntry(msg.Txid)
			chaincodeLogger.Debugf("[%s]handleQueryStateNext serial send %s", shorttxid(serialSendMsg.Txid), serialSendMsg.Type)
			handler.serialSendAsync(serialSendMsg, nil)
		}()

		queryStateNext := &pb.QueryStateNext{}
		unmarshalErr := proto.Unmarshal(msg.Payload, queryStateNext)
		if unmarshalErr != nil {
			payload := []byte(unmarshalErr.Error())
			chaincodeLogger.Errorf("Failed to unmarshall state next query request. Sending %s", pb.ChaincodeMessage_ERROR)
			serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
			return
		}

		txContext := handler.getTxContext(msg.Txid)
		queryIter := handler.getQueryIterator(txContext, queryStateNext.Id)
		if queryIter == nil {
			payload := []byte("query iterator not found")
			chaincodeLogger.Errorf("query iterator not found. Sending %s", pb.ChaincodeMessage_ERROR)
			serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
			return
		}

		var keysAndValues []*pb.QueryStateKeyValue
		queryLimit := ledgerconfig.GetQueryLimit()
		var qresult commonledger.QueryResult
		var err error
		for i := 0; i < queryLimit; i++ {
			qresult, err = queryIter.Next()
			if err != nil {
				// Release the iterator and tell the chaincode about the failure;
				// returning without a message would make the deferred send panic.
				queryIter.Close()
				handler.deleteQueryIterator(txContext, queryStateNext.Id)
				payload := []byte(err.Error())
				chaincodeLogger.Errorf("Failed to get query result from iterator. Sending %s", pb.ChaincodeMessage_ERROR)
				serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
				return
			}
			// A nil result marks the end of the iterator.
			if qresult == nil {
				break
			}
			kv := qresult.(*ledger.KV)
			keyAndValue := pb.QueryStateKeyValue{Key: kv.Key, Value: kv.Value}
			keysAndValues = append(keysAndValues, &keyAndValue)
		}

		// A non-nil last result means the loop stopped on the limit, not on end of data.
		if qresult != nil {
			queryIter.Close()
			handler.deleteQueryIterator(txContext, queryStateNext.Id)

			//TODO log the warning that the queryLimit was exceeded. this will need to be revisited
			//following changes to the future paging design.
			chaincodeLogger.Warningf("Query limit of %v was exceeded. Not all values meeting the criteria were returned.", queryLimit)
		}

		//TODO - HasMore is set to false until the requery issue for the peer is resolved
		//FAB-2462 - Re-introduce paging for range queries and rich queries
		//payload := &pb.QueryStateResponse{KeysAndValues: keysAndValues, HasMore: qresult != nil, Id: queryStateNext.Id}
		payload := &pb.QueryStateResponse{KeysAndValues: keysAndValues, HasMore: false, Id: queryStateNext.Id}
		payloadBytes, err := proto.Marshal(payload)
		if err != nil {
			queryIter.Close()
			handler.deleteQueryIterator(txContext, queryStateNext.Id)

			// Send error msg back to chaincode. GetState will not trigger event
			payload := []byte(err.Error())
			chaincodeLogger.Errorf("Failed marshall resopnse. Sending %s", pb.ChaincodeMessage_ERROR)
			serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
			return
		}
		chaincodeLogger.Debugf("Got keys and values. Sending %s", pb.ChaincodeMessage_RESPONSE)
		serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_RESPONSE, Payload: payloadBytes, Txid: msg.Txid}
	}()
}
// afterQueryStateClose handles a QUERY_STATE_CLOSE request from the chaincode.
// The first event argument must be the *pb.ChaincodeMessage that carried the
// request; if it is anything else the event is cancelled. Otherwise the message
// is passed to handleQueryStateClose, which does its work on a separate goroutine.
func (handler *Handler) afterQueryStateClose(e *fsm.Event, state string) {
	msg, ok := e.Args[0].(*pb.ChaincodeMessage)
	if !ok {
		e.Cancel(fmt.Errorf("Received unexpected message type"))
		return
	}
	// Report the message type and action this callback actually performs.
	chaincodeLogger.Debugf("Received %s, closing state iterator", pb.ChaincodeMessage_QUERY_STATE_CLOSE)

	// Close the iterator named in the request
	handler.handleQueryStateClose(msg)
	chaincodeLogger.Debug("Exiting QUERY_STATE_CLOSE")
}
// handleQueryStateClose serves a QUERY_STATE_CLOSE request: if an iterator is
// registered under the requested id it is closed and removed, and an empty
// QueryStateResponse carrying that id is sent back to the chaincode. A request
// for an unknown id still gets a normal response.
//
// Every exit path after the TXID entry has been created leaves serialSendMsg
// non-nil, because the deferred function dereferences it before sending.
func (handler *Handler) handleQueryStateClose(msg *pb.ChaincodeMessage) {
	// The defer followed by triggering a go routine dance is needed to ensure that the previous state transition
	// is completed before the next one is triggered. The previous state transition is deemed complete only when
	// the afterQueryStateClose function is exited. Interesting bug fix!!
	go func() {
		// Check if this is the unique state request from this chaincode txid
		uniqueReq := handler.createTXIDEntry(msg.Txid)
		if !uniqueReq {
			// Drop this request
			chaincodeLogger.Error("Another state request pending for this Txid. Cannot process.")
			return
		}

		var serialSendMsg *pb.ChaincodeMessage

		defer func() {
			handler.deleteTXIDEntry(msg.Txid)
			chaincodeLogger.Debugf("[%s]handleQueryStateClose serial send %s", shorttxid(serialSendMsg.Txid), serialSendMsg.Type)
			handler.serialSendAsync(serialSendMsg, nil)
		}()

		queryStateClose := &pb.QueryStateClose{}
		unmarshalErr := proto.Unmarshal(msg.Payload, queryStateClose)
		if unmarshalErr != nil {
			payload := []byte(unmarshalErr.Error())
			chaincodeLogger.Errorf("Failed to unmarshall state query close request. Sending %s", pb.ChaincodeMessage_ERROR)
			serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
			return
		}

		txContext := handler.getTxContext(msg.Txid)
		iter := handler.getQueryIterator(txContext, queryStateClose.Id)
		if iter != nil {
			iter.Close()
			handler.deleteQueryIterator(txContext, queryStateClose.Id)
		}

		payload := &pb.QueryStateResponse{HasMore: false, Id: queryStateClose.Id}
		payloadBytes, err := proto.Marshal(payload)
		if err != nil {
			// Send error msg back to chaincode. GetState will not trigger event
			payload := []byte(err.Error())
			chaincodeLogger.Errorf("Failed marshall resopnse. Sending %s", pb.ChaincodeMessage_ERROR)
			serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
			return
		}

		chaincodeLogger.Debugf("Closed. Sending %s", pb.ChaincodeMessage_RESPONSE)
		serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_RESPONSE, Payload: payloadBytes, Txid: msg.Txid}
	}()
}
// afterGetQueryResult reacts to a GET_QUERY_RESULT request sent by the
// chaincode. When the first event argument is a *pb.ChaincodeMessage it is
// forwarded to handleGetQueryResult; any other argument type cancels the event.
func (handler *Handler) afterGetQueryResult(e *fsm.Event, state string) {
	if ccMsg, isCCMsg := e.Args[0].(*pb.ChaincodeMessage); isCCMsg {
		chaincodeLogger.Debugf("Received %s, invoking get state from ledger", pb.ChaincodeMessage_GET_QUERY_RESULT)

		// Run the query against the ledger.
		handler.handleGetQueryResult(ccMsg)
		chaincodeLogger.Debug("Exiting GET_QUERY_RESULT")
		return
	}
	e.Cancel(fmt.Errorf("Received unexpected message type"))
}
// handleGetQueryResult serves a GET_QUERY_RESULT request: it executes the query
// from the request through the transaction simulator, registers the resulting
// iterator under a fresh UUID, reads up to ledgerconfig.GetQueryLimit() records
// from it and sends them back to the chaincode as a QueryStateResponse.
//
// Every exit path after the TXID entry has been created must leave
// serialSendMsg non-nil, because the deferred function dereferences it before
// sending. Failures are reported to the chaincode as ChaincodeMessage_ERROR.
func (handler *Handler) handleGetQueryResult(msg *pb.ChaincodeMessage) {
	// The defer followed by triggering a go routine dance is needed to ensure that the previous state transition
	// is completed before the next one is triggered. The previous state transition is deemed complete only when
	// the afterGetQueryResult function is exited. Interesting bug fix!!
	go func() {
		// Check if this is the unique state request from this chaincode txid
		uniqueReq := handler.createTXIDEntry(msg.Txid)
		if !uniqueReq {
			// Drop this request
			chaincodeLogger.Error("Another state request pending for this Txid. Cannot process.")
			return
		}

		var serialSendMsg *pb.ChaincodeMessage

		defer func() {
			handler.deleteTXIDEntry(msg.Txid)
			chaincodeLogger.Debugf("[%s]handleGetQueryResult serial send %s", shorttxid(serialSendMsg.Txid), serialSendMsg.Type)
			handler.serialSendAsync(serialSendMsg, nil)
		}()

		getQueryResult := &pb.GetQueryResult{}
		unmarshalErr := proto.Unmarshal(msg.Payload, getQueryResult)
		if unmarshalErr != nil {
			payload := []byte(unmarshalErr.Error())
			chaincodeLogger.Errorf("Failed to unmarshall query request. Sending %s", pb.ChaincodeMessage_ERROR)
			serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
			return
		}

		iterID := util.GenerateUUID()

		// isValidTxSim is expected to supply the error reply when it returns a nil context.
		var txContext *transactionContext
		txContext, serialSendMsg = handler.isValidTxSim(msg.Txid, "[%s]No ledger context for GetQueryResult. Sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_ERROR)
		if txContext == nil {
			return
		}

		chaincodeID := handler.getCCRootName()

		executeIter, err := txContext.txsimulator.ExecuteQuery(chaincodeID, getQueryResult.Query)
		if err != nil {
			// Send error msg back to chaincode. GetState will not trigger event
			payload := []byte(err.Error())
			chaincodeLogger.Errorf("Failed to get ledger query iterator. Sending %s", pb.ChaincodeMessage_ERROR)
			serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
			return
		}

		handler.putQueryIterator(txContext, iterID, executeIter)

		var keysAndValues []*pb.QueryStateKeyValue
		queryLimit := ledgerconfig.GetQueryLimit()
		var qresult commonledger.QueryResult
		for i := 0; i < queryLimit; i++ {
			qresult, err = executeIter.Next()
			if err != nil {
				// Release the iterator and tell the chaincode about the failure;
				// returning without a message would make the deferred send panic.
				executeIter.Close()
				handler.deleteQueryIterator(txContext, iterID)
				payload := []byte(err.Error())
				chaincodeLogger.Errorf("Failed to get query result from iterator. Sending %s", pb.ChaincodeMessage_ERROR)
				serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
				return
			}
			// A nil result marks the end of the iterator.
			if qresult == nil {
				break
			}
			queryRecord := qresult.(*ledger.QueryRecord)
			keyAndValue := pb.QueryStateKeyValue{Key: queryRecord.Key, Value: queryRecord.Record}
			keysAndValues = append(keysAndValues, &keyAndValue)
		}

		// A non-nil last result means the loop stopped on the limit, not on end of data.
		if qresult != nil {
			executeIter.Close()
			handler.deleteQueryIterator(txContext, iterID)

			//TODO log the warning that the queryLimit was exceeded. this will need to be revisited
			//following changes to the future paging design.
			chaincodeLogger.Warningf("Query limit of %v was exceeded. Not all values meeting the criteria were returned.", queryLimit)
		}

		var payloadBytes []byte

		//TODO - HasMore is set to false until the requery issue for the peer is resolved
		//FAB-2462 - Re-introduce paging for range queries and rich queries
		//payload := &pb.QueryStateResponse{KeysAndValues: keysAndValues, HasMore: qresult != nil, Id: iterID}
		payload := &pb.QueryStateResponse{KeysAndValues: keysAndValues, HasMore: false, Id: iterID}
		payloadBytes, err = proto.Marshal(payload)
		if err != nil {
			executeIter.Close()
			handler.deleteQueryIterator(txContext, iterID)

			// Send error msg back to chaincode. GetState will not trigger event
			payload := []byte(err.Error())
			chaincodeLogger.Errorf("Failed marshall resopnse. Sending %s", pb.ChaincodeMessage_ERROR)
			serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
			return
		}

		chaincodeLogger.Debugf("Got keys and values. Sending %s", pb.ChaincodeMessage_RESPONSE)
		serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_RESPONSE, Payload: payloadBytes, Txid: msg.Txid}
	}()
}
// afterGetHistoryForKey reacts to a GET_HISTORY_FOR_KEY request sent by the
// chaincode. When the first event argument is a *pb.ChaincodeMessage it is
// forwarded to handleGetHistoryForKey; any other argument type cancels the event.
func (handler *Handler) afterGetHistoryForKey(e *fsm.Event, state string) {
	if ccMsg, isCCMsg := e.Args[0].(*pb.ChaincodeMessage); isCCMsg {
		chaincodeLogger.Debugf("Received %s, invoking get state from ledger", pb.ChaincodeMessage_GET_HISTORY_FOR_KEY)

		// Run the lookup against the ledger history db.
		handler.handleGetHistoryForKey(ccMsg)
		chaincodeLogger.Debug("Exiting GET_HISTORY_FOR_KEY")
		return
	}
	e.Cancel(fmt.Errorf("Received unexpected message type"))
}
// handleGetHistoryForKey serves a GET_HISTORY_FOR_KEY request: it asks the
// history query executor for the modifications of the requested key, registers
// the resulting iterator under a fresh UUID, reads up to
// ledgerconfig.GetQueryLimit() entries from it and sends them back to the
// chaincode as a QueryStateResponse. Each entry is encoded with the TxID in the
// Key field and the written value in the Value field.
//
// Every exit path after the TXID entry has been created must leave
// serialSendMsg non-nil, because the deferred function dereferences it before
// sending. Failures are reported to the chaincode as ChaincodeMessage_ERROR.
func (handler *Handler) handleGetHistoryForKey(msg *pb.ChaincodeMessage) {
	// The defer followed by triggering a go routine dance is needed to ensure that the previous state transition
	// is completed before the next one is triggered. The previous state transition is deemed complete only when
	// the afterGetHistoryForKey function is exited. Interesting bug fix!!
	go func() {
		// Check if this is the unique state request from this chaincode txid
		uniqueReq := handler.createTXIDEntry(msg.Txid)
		if !uniqueReq {
			// Drop this request
			chaincodeLogger.Error("Another state request pending for this Txid. Cannot process.")
			return
		}

		var serialSendMsg *pb.ChaincodeMessage

		defer func() {
			handler.deleteTXIDEntry(msg.Txid)
			chaincodeLogger.Debugf("[%s]handleGetHistoryForKey serial send %s", shorttxid(serialSendMsg.Txid), serialSendMsg.Type)
			handler.serialSendAsync(serialSendMsg, nil)
		}()

		getHistoryForKey := &pb.GetHistoryForKey{}
		unmarshalErr := proto.Unmarshal(msg.Payload, getHistoryForKey)
		if unmarshalErr != nil {
			payload := []byte(unmarshalErr.Error())
			chaincodeLogger.Errorf("Failed to unmarshall query request. Sending %s", pb.ChaincodeMessage_ERROR)
			serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
			return
		}

		iterID := util.GenerateUUID()

		// isValidTxSim is expected to supply the error reply when it returns a nil context.
		var txContext *transactionContext
		txContext, serialSendMsg = handler.isValidTxSim(msg.Txid, "[%s]No ledger context for GetHistoryForKey. Sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_ERROR)
		if txContext == nil {
			return
		}

		chaincodeID := handler.getCCRootName()

		historyIter, err := txContext.historyQueryExecutor.GetHistoryForKey(chaincodeID, getHistoryForKey.Key)
		if err != nil {
			// Send error msg back to chaincode. GetState will not trigger event
			payload := []byte(err.Error())
			chaincodeLogger.Errorf("Failed to get ledger history iterator. Sending %s", pb.ChaincodeMessage_ERROR)
			serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
			return
		}

		handler.putQueryIterator(txContext, iterID, historyIter)

		// TODO QueryStateKeyValue can be re-used for now since history records have a string (TxID)
		// and value (value). But we'll need to use another structure if we add other fields like timestamp.
		var keysAndValues []*pb.QueryStateKeyValue
		queryLimit := ledgerconfig.GetQueryLimit()
		var qresult commonledger.QueryResult
		for i := 0; i < queryLimit; i++ {
			qresult, err = historyIter.Next()
			if err != nil {
				// Release the iterator and tell the chaincode about the failure;
				// returning without a message would make the deferred send panic.
				historyIter.Close()
				handler.deleteQueryIterator(txContext, iterID)
				payload := []byte(err.Error())
				chaincodeLogger.Errorf("Failed to get query result from iterator. Sending %s", pb.ChaincodeMessage_ERROR)
				serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
				return
			}
			// A nil result marks the end of the iterator.
			if qresult == nil {
				break
			}
			queryRecord := qresult.(*ledger.KeyModification)
			keyAndValue := pb.QueryStateKeyValue{Key: queryRecord.TxID, Value: queryRecord.Value}
			keysAndValues = append(keysAndValues, &keyAndValue)
		}

		// A non-nil last result means the loop stopped on the limit, not on end of data.
		if qresult != nil {
			historyIter.Close()
			handler.deleteQueryIterator(txContext, iterID)

			//TODO log the warning that the queryLimit was exceeded. this will need to be revisited
			//following changes to the future paging design.
			chaincodeLogger.Warningf("Query limit of %v was exceeded. Not all values meeting the criteria were returned.", queryLimit)
		}

		var payloadBytes []byte

		//TODO - HasMore is set to false until the requery issue for the peer is resolved
		//FAB-2462 - Re-introduce paging for range queries and rich queries
		//payload := &pb.QueryStateResponse{KeysAndValues: keysAndValues, HasMore: qresult != nil, Id: iterID}
		payload := &pb.QueryStateResponse{KeysAndValues: keysAndValues, HasMore: false, Id: iterID}
		payloadBytes, err = proto.Marshal(payload)
		if err != nil {
			historyIter.Close()
			handler.deleteQueryIterator(txContext, iterID)

			// Send error msg back to chaincode. GetState will not trigger event
			payload := []byte(err.Error())
			chaincodeLogger.Errorf("Failed marshall resopnse. Sending %s", pb.ChaincodeMessage_ERROR)
			serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
			return
		}

		chaincodeLogger.Debugf("Got keys and values. Sending %s", pb.ChaincodeMessage_RESPONSE)
		serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_RESPONSE, Payload: payloadBytes, Txid: msg.Txid}
	}()
}
// afterPutState reacts to a PUT_STATE request from the chaincode. It only
// validates the event argument and logs; the write itself is performed in
// enterBusyState.
func (handler *Handler) afterPutState(e *fsm.Event, state string) {
	if _, isCCMsg := e.Args[0].(*pb.ChaincodeMessage); !isCCMsg {
		e.Cancel(fmt.Errorf("Received unexpected message type"))
		return
	}
	chaincodeLogger.Debugf("Received %s in state %s, invoking put state to ledger", pb.ChaincodeMessage_PUT_STATE, state)
}
// afterDelState reacts to a DEL_STATE request from the chaincode. It only
// validates the event argument and logs; the delete itself is performed in
// enterBusyState.
func (handler *Handler) afterDelState(e *fsm.Event, state string) {
	if _, isCCMsg := e.Args[0].(*pb.ChaincodeMessage); !isCCMsg {
		e.Cancel(fmt.Errorf("Received unexpected message type"))
		return
	}
	chaincodeLogger.Debugf("Received %s, invoking delete state from ledger", pb.ChaincodeMessage_DEL_STATE)
}
// afterInvokeChaincode reacts to an INVOKE_CHAINCODE request from the
// chaincode. It only validates the event argument and logs; the invocation of
// the other chaincode is performed in enterBusyState.
func (handler *Handler) afterInvokeChaincode(e *fsm.Event, state string) {
	if _, isCCMsg := e.Args[0].(*pb.ChaincodeMessage); !isCCMsg {
		e.Cancel(fmt.Errorf("Received unexpected message type"))
		return
	}
	chaincodeLogger.Debugf("Received %s in state %s, invoking another chaincode", pb.ChaincodeMessage_INVOKE_CHAINCODE, state)
}
// enterBusyState services the three requests that are processed on entry to the
// busy state: PUT_STATE, DEL_STATE and INVOKE_CHAINCODE. The work runs on its
// own goroutine. On completion the deferred function removes the TXID entry and
// triggers the next state with either a RESPONSE message (whose payload is the
// marshalled response of the called chaincode for INVOKE_CHAINCODE, nil
// otherwise) or an ERROR message whose payload is the error text.
//
// Every exit path after the deferred function has been registered leaves
// triggerNextStateMsg set, because the deferred function passes it on.
func (handler *Handler) enterBusyState(e *fsm.Event, state string) {
	go func() {
		msg, ok := e.Args[0].(*pb.ChaincodeMessage)
		if !ok {
			// Without a message there is no Txid to reply on; stop here instead of
			// dereferencing a nil message below.
			chaincodeLogger.Errorf("Received unexpected message type in state %s", state)
			return
		}
		if chaincodeLogger.IsEnabledFor(logging.DEBUG) {
			chaincodeLogger.Debugf("[%s]state is %s", shorttxid(msg.Txid), state)
		}
		// Check if this is the unique request from this chaincode txid
		uniqueReq := handler.createTXIDEntry(msg.Txid)
		if !uniqueReq {
			// Drop this request
			chaincodeLogger.Debug("Another request pending for this Txid. Cannot process.")
			return
		}

		// isValidTxSim is expected to supply the error reply when it returns a nil context.
		var triggerNextStateMsg *pb.ChaincodeMessage
		var txContext *transactionContext
		txContext, triggerNextStateMsg = handler.isValidTxSim(msg.Txid, "[%s]No ledger context for %s. Sending %s",
			shorttxid(msg.Txid), msg.Type.String(), pb.ChaincodeMessage_ERROR)

		defer func() {
			handler.deleteTXIDEntry(msg.Txid)
			if chaincodeLogger.IsEnabledFor(logging.DEBUG) {
				chaincodeLogger.Debugf("[%s]enterBusyState trigger event %s",
					shorttxid(triggerNextStateMsg.Txid), triggerNextStateMsg.Type)
			}
			handler.triggerNextState(triggerNextStateMsg, true)
		}()

		if txContext == nil {
			return
		}

		chaincodeID := handler.getCCRootName()
		var err error
		var res []byte

		if msg.Type.String() == pb.ChaincodeMessage_PUT_STATE.String() {
			putStateInfo := &pb.PutStateInfo{}
			unmarshalErr := proto.Unmarshal(msg.Payload, putStateInfo)
			if unmarshalErr != nil {
				payload := []byte(unmarshalErr.Error())
				chaincodeLogger.Debugf("[%s]Unable to decipher payload. Sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_ERROR)
				triggerNextStateMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
				return
			}

			err = txContext.txsimulator.SetState(chaincodeID, putStateInfo.Key, putStateInfo.Value)
		} else if msg.Type.String() == pb.ChaincodeMessage_DEL_STATE.String() {
			// Invoke ledger to delete state
			key := string(msg.Payload)
			err = txContext.txsimulator.DeleteState(chaincodeID, key)
		} else if msg.Type.String() == pb.ChaincodeMessage_INVOKE_CHAINCODE.String() {
			if chaincodeLogger.IsEnabledFor(logging.DEBUG) {
				chaincodeLogger.Debugf("[%s] C-call-C", shorttxid(msg.Txid))
			}
			chaincodeSpec := &pb.ChaincodeSpec{}
			unmarshalErr := proto.Unmarshal(msg.Payload, chaincodeSpec)
			if unmarshalErr != nil {
				payload := []byte(unmarshalErr.Error())
				chaincodeLogger.Debugf("[%s]Unable to decipher payload. Sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_ERROR)
				triggerNextStateMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
				return
			}

			// The spec comes from the calling chaincode; a payload without a
			// chaincode id would otherwise cause a nil dereference below.
			if chaincodeSpec.ChaincodeId == nil {
				payload := []byte("chaincode id missing from invoke chaincode request")
				chaincodeLogger.Debugf("[%s]Missing chaincode id in payload. Sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_ERROR)
				triggerNextStateMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
				return
			}

			// Get the chaincodeID to invoke. The chaincodeID to be called may
			// contain composite info like "chaincode-name:version/channel-name"
			// We are not using version now but default to the latest
			calledCcParts := chaincodeIDParts(chaincodeSpec.ChaincodeId.Name)
			chaincodeSpec.ChaincodeId.Name = calledCcParts.name
			if calledCcParts.suffix == "" {
				// use caller's channel as the called chaincode is in the same channel
				calledCcParts.suffix = txContext.chainID
			}
			if chaincodeLogger.IsEnabledFor(logging.DEBUG) {
				chaincodeLogger.Debugf("[%s] C-call-C %s on channel %s",
					shorttxid(msg.Txid), calledCcParts.name, calledCcParts.suffix)
			}

			// A non-nil result from checkACL is sent as the reply and ends processing.
			triggerNextStateMsg = handler.checkACL(txContext.signedProp, txContext.proposal, calledCcParts)
			if triggerNextStateMsg != nil {
				return
			}

			// Set up a new context for the called chaincode if on a different channel
			// We grab the called channel's ledger simulator to hold the new state
			ctxt := context.Background()
			txsim := txContext.txsimulator
			historyQueryExecutor := txContext.historyQueryExecutor
			if calledCcParts.suffix != txContext.chainID {
				lgr := peer.GetLedger(calledCcParts.suffix)
				if lgr == nil {
					payload := "Failed to find ledger for called channel " + calledCcParts.suffix
					triggerNextStateMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR,
						Payload: []byte(payload), Txid: msg.Txid}
					return
				}
				txsim2, err2 := lgr.NewTxSimulator()
				if err2 != nil {
					triggerNextStateMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR,
						Payload: []byte(err2.Error()), Txid: msg.Txid}
					return
				}
				// Runs when this goroutine's function returns, i.e. after the call below completes.
				defer txsim2.Done()
				txsim = txsim2
			}
			ctxt = context.WithValue(ctxt, TXSimulatorKey, txsim)
			ctxt = context.WithValue(ctxt, HistoryQueryExecutorKey, historyQueryExecutor)

			if chaincodeLogger.IsEnabledFor(logging.DEBUG) {
				chaincodeLogger.Debugf("[%s] calling lccc to get chaincode data for %s on channel %s",
					shorttxid(msg.Txid), calledCcParts.name, calledCcParts.suffix)
			}

			//Call LCCC to get the called chaincode artifacts

			//is the chaincode a system chaincode ?
			isscc := sysccprovider.GetSystemChaincodeProvider().IsSysCC(calledCcParts.name)

			var cd *ccprovider.ChaincodeData
			if !isscc {
				//if its a user chaincode, get the details from LCCC
				//Call LCCC to get the called chaincode artifacts
				cd, err = GetChaincodeDataFromLCCC(ctxt, msg.Txid, txContext.signedProp, txContext.proposal, calledCcParts.suffix, calledCcParts.name)
				if err != nil {
					payload := []byte(err.Error())
					chaincodeLogger.Debugf("[%s]Failed to get chaincoed data (%s) for invoked chaincode. Sending %s",
						shorttxid(msg.Txid), err, pb.ChaincodeMessage_ERROR)
					triggerNextStateMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
					return
				}
			} else {
				//this is a system cc, just call it directly
				cd = &ccprovider.ChaincodeData{Name: calledCcParts.name, Version: util.GetSysCCVersion()}
			}

			cccid := ccprovider.NewCCContext(calledCcParts.suffix, calledCcParts.name, cd.Version, msg.Txid, false, txContext.signedProp, txContext.proposal)

			// Launch the new chaincode if not already running
			if chaincodeLogger.IsEnabledFor(logging.DEBUG) {
				chaincodeLogger.Debugf("[%s] launching chaincode %s on channel %s",
					shorttxid(msg.Txid), calledCcParts.name, calledCcParts.suffix)
			}
			cciSpec := &pb.ChaincodeInvocationSpec{ChaincodeSpec: chaincodeSpec}
			_, chaincodeInput, launchErr := handler.chaincodeSupport.Launch(ctxt, cccid, cciSpec)
			if launchErr != nil {
				payload := []byte(launchErr.Error())
				chaincodeLogger.Debugf("[%s]Failed to launch invoked chaincode. Sending %s",
					shorttxid(msg.Txid), pb.ChaincodeMessage_ERROR)
				triggerNextStateMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
				return
			}

			// TODO: Need to handle timeout correctly
			timeout := time.Duration(30000) * time.Millisecond

			ccMsg, ccMsgErr := createCCMessage(pb.ChaincodeMessage_TRANSACTION, msg.Txid, chaincodeInput)
			if ccMsgErr != nil {
				// Do not execute with a message that could not be built.
				payload := []byte(ccMsgErr.Error())
				chaincodeLogger.Debugf("[%s]Failed to create message for invoked chaincode. Sending %s",
					shorttxid(msg.Txid), pb.ChaincodeMessage_ERROR)
				triggerNextStateMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
				return
			}

			// Execute the chaincode... this CANNOT be an init at least for now
			response, execErr := handler.chaincodeSupport.Execute(ctxt, cccid, ccMsg, timeout)

			//payload is marshalled and send to the calling chaincode's shim which unmarshals and
			//sends it to chaincode
			res = nil
			if execErr != nil {
				err = execErr
			} else {
				res, err = proto.Marshal(response)
			}
		}

		if err != nil {
			// Send error msg back to chaincode and trigger event
			payload := []byte(err.Error())
			chaincodeLogger.Errorf("[%s]Failed to handle %s. Sending %s", shorttxid(msg.Txid), msg.Type.String(), pb.ChaincodeMessage_ERROR)
			triggerNextStateMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
			return
		}

		// Send response msg back to chaincode.
		if chaincodeLogger.IsEnabledFor(logging.DEBUG) {
			chaincodeLogger.Debugf("[%s]Completed %s. Sending %s", shorttxid(msg.Txid), msg.Type.String(), pb.ChaincodeMessage_RESPONSE)
		}
		triggerNextStateMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_RESPONSE, Payload: res, Txid: msg.Txid}
	}()
}
// enterEstablishedState takes the FSM event and state name but uses neither;
// it only calls notifyDuringStartup(true).
func (handler *Handler) enterEstablishedState(e *fsm.Event, state string) {
	handler.notifyDuringStartup(true)
}
// enterReadyState passes the chaincode message carried by the event to
// handler.notify. If the first event argument is not a *pb.ChaincodeMessage
// the event is cancelled instead.
func (handler *Handler) enterReadyState(e *fsm.Event, state string) {
	ccMsg, isCCMsg := e.Args[0].(*pb.ChaincodeMessage)
	if isCCMsg {
		chaincodeLogger.Debugf("[%s]Entered state %s", shorttxid(ccMsg.Txid), state)
		// Now notify
		handler.notify(ccMsg)
		return
	}
	e.Cancel(fmt.Errorf("Received unexpected message type"))
}
// enterEndState passes the chaincode message carried by the event to
// handler.notify and then cancels the event with "Entered end state". If the
// first event argument is not a *pb.ChaincodeMessage the event is cancelled
// without notifying. handler.deregister runs on return in both cases.
func (handler *Handler) enterEndState(e *fsm.Event, state string) {
	defer handler.deregister()

	ccMsg, isCCMsg := e.Args[0].(*pb.ChaincodeMessage)
	if isCCMsg {
		chaincodeLogger.Debugf("[%s]Entered state %s", shorttxid(ccMsg.Txid), state)
		// Now notify
		handler.notify(ccMsg)
		e.Cancel(fmt.Errorf("Entered end state"))
		return
	}
	e.Cancel(fmt.Errorf("Received unexpected message type"))
}
// setChaincodeProposal attaches prop to msg.Proposal. A nil prop leaves msg
// untouched and returns nil. A non-nil prop with a nil signedProp is an error
// and also leaves msg untouched.
func (handler *Handler) setChaincodeProposal(signedProp *pb.SignedProposal, prop *pb.Proposal, msg *pb.ChaincodeMessage) error {
	chaincodeLogger.Debug("Setting chaincode proposal context...")
	if prop == nil {
		return nil
	}

	chaincodeLogger.Debug("Proposal different from nil. Creating chaincode proposal context...")

	// A proposal must always come with its signed counterpart.
	if signedProp == nil {
		return fmt.Errorf("Failed getting proposal context. Signed proposal is nil.")
	}

	msg.Proposal = prop
	return nil
}
// ready creates a transaction context for txid, builds a READY message
// (carrying the proposal when one is supplied) and triggers the next state
// synchronously with it. It returns the transaction context's response
// notifier channel, or the error from context creation or proposal setup.
func (handler *Handler) ready(ctxt context.Context, chainID string, txid string, signedProp *pb.SignedProposal, prop *pb.Proposal) (chan *pb.ChaincodeMessage, error) {
	txctx, funcErr := handler.createTxContext(ctxt, chainID, txid, signedProp, prop)
	if funcErr != nil {
		return nil, funcErr
	}

	chaincodeLogger.Debug("sending READY")
	readyMsg := &pb.ChaincodeMessage{Txid: txid, Type: pb.ChaincodeMessage_READY}

	// If security is disabled the context elements will just be nil.
	err := handler.setChaincodeProposal(signedProp, prop, readyMsg)
	if err != nil {
		return nil, err
	}

	// The ready message is sent during launch and has to go out before any
	// init/invoke can sneak in, hence the synchronous trigger.
	handler.triggerNextStateSync(readyMsg)

	return txctx.responseNotifier, nil
}
// HandleMessage implementation of MessageHandler interface. Peer's handling of Chaincode messages.
//
// COMPLETED and ERROR messages received while the FSM is in the "ready" state
// are passed straight to notify. Any other message is fed to the FSM as an
// event named after its type; an error is returned when the FSM cannot accept
// that event in its current state or when filterError keeps the event error.
func (handler *Handler) HandleMessage(msg *pb.ChaincodeMessage) error {
	chaincodeLogger.Debugf("[%s]Fabric side Handling ChaincodeMessage of type: %s in state %s", shorttxid(msg.Txid), msg.Type, handler.FSM.Current())

	isFinal := msg.Type == pb.ChaincodeMessage_COMPLETED || msg.Type == pb.ChaincodeMessage_ERROR
	if isFinal && handler.FSM.Current() == "ready" {
		chaincodeLogger.Debugf("[%s]HandleMessage- COMPLETED. Notify", msg.Txid)
		handler.notify(msg)
		return nil
	}

	eventName := msg.Type.String()
	if handler.FSM.Cannot(eventName) {
		// Other errors
		return fmt.Errorf("[%s]Chaincode handler validator FSM cannot handle message (%s) with payload size (%d) while in state: %s", msg.Txid, msg.Type.String(), len(msg.Payload), handler.FSM.Current())
	}

	if keptErr := filterError(handler.FSM.Event(eventName, msg)); keptErr != nil {
		chaincodeLogger.Errorf("[%s]Failed to trigger FSM event %s: %s", msg.Txid, msg.Type.String(), keptErr)
		return keptErr
	}
	return nil
}
// filterError decides which errors from an FSM event are reported to the caller.
// A *fsm.NoTransitionError or *fsm.CanceledError is returned only when it wraps
// a non-nil Err; when its Err is nil it is logged at debug level and dropped.
// A nil input and every other error type yield nil.
func filterError(errFromFSMEvent error) error {
	switch fsmErr := errFromFSMEvent.(type) {
	case *fsm.NoTransitionError:
		if fsmErr.Err != nil {
			// Carries an underlying error: propagate it.
			return errFromFSMEvent
		}
		chaincodeLogger.Debugf("Ignoring NoTransitionError: %s", fsmErr)
	case *fsm.CanceledError:
		if fsmErr.Err != nil {
			// Carries an underlying error: propagate it.
			return fsmErr
		}
		chaincodeLogger.Debugf("Ignoring CanceledError: %s", fsmErr)
	}
	return nil
}
// sendExecuteMessage creates a transaction context for msg.Txid, attaches the
// proposal to msg when one is supplied and triggers the next state with msg.
// It returns the transaction context's response notifier channel, or the error
// from context creation or proposal setup.
func (handler *Handler) sendExecuteMessage(ctxt context.Context, chainID string, msg *pb.ChaincodeMessage, signedProp *pb.SignedProposal, prop *pb.Proposal) (chan *pb.ChaincodeMessage, error) {
	txctx, err := handler.createTxContext(ctxt, chainID, msg.Txid, signedProp, prop)
	if err != nil {
		return nil, err
	}

	if chaincodeLogger.IsEnabledFor(logging.DEBUG) {
		chaincodeLogger.Debugf("[%s]Inside sendExecuteMessage. Message %s", shorttxid(msg.Txid), msg.Type.String())
	}

	// If security is disabled the context elements will just be nil.
	err = handler.setChaincodeProposal(signedProp, prop, msg)
	if err != nil {
		return nil, err
	}

	chaincodeLogger.Debugf("[%s]sendExecuteMsg trigger event %s", shorttxid(msg.Txid), msg.Type)
	handler.triggerNextState(msg, true)

	return txctx.responseNotifier, nil
}
// isRunning reads the current FSM state and reports true for every state:
// createdstate and establishedstate are listed explicitly, and all remaining
// states take the default branch with the same result.
func (handler *Handler) isRunning() bool {
	switch handler.FSM.Current() {
	case createdstate, establishedstate:
		return true
	default:
		return true
	}
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/peter_code_git/fabric.git
git@gitee.com:peter_code_git/fabric.git
peter_code_git
fabric
fabric
v1.0.0-alpha

搜索帮助