1 Star 0 Fork 0

妥協/fabric

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
blockfile_mgr.go 24.60 KB
一键复制 编辑 原始数据 按行查看 历史
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package fsblkstorage
import (
"bytes"
"fmt"
"math"
"sync"
"sync/atomic"
"github.com/davecgh/go-spew/spew"
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/ledger/blkstorage"
"github.com/hyperledger/fabric/common/ledger/util"
"github.com/hyperledger/fabric/common/ledger/util/leveldbhelper"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/peer"
putil "github.com/hyperledger/fabric/protos/utils"
"github.com/pkg/errors"
)
// logger is the package-level logger, obtained from flogging under the name "fsblkstorage".
var logger = flogging.MustGetLogger("fsblkstorage")
const (
// blockfilePrefix is the fixed leading part of every block file name; the
// zero-padded file number is appended to it when a block file path is derived.
blockfilePrefix = "blockfile_"
)
var (
// blkMgrInfoKey is the db key under which the serialized checkpointInfo is
// saved and from which it is loaded.
blkMgrInfoKey = []byte("blkMgrInfo")
)
// blockfileMgr holds the state needed to append blocks to sequentially numbered
// block files and to read blocks and transactions back from them.
type blockfileMgr struct {
// rootDir is the directory that contains the block files of this ledger.
rootDir string
// conf is the storage configuration; addBlock reads conf.maxBlockfileSize from it.
conf *Conf
// db is the handle through which the checkpoint info is loaded and saved.
db *leveldbhelper.DBHandle
// index resolves block numbers, block hashes and transaction ids to file locations.
index index
// cpInfo is the current checkpoint; updateCheckpoint replaces it while holding cpInfoCond.L.
cpInfo *checkpointInfo
// cpInfoCond is broadcast every time cpInfo is replaced.
cpInfoCond *sync.Cond
// currentFileWriter appends to the block file that new blocks are written to.
currentFileWriter *blockfileWriter
// bcInfo stores a *common.BlockchainInfo; it is read through getBlockchainInfo.
bcInfo atomic.Value
}
/*
Creates a new manager that will manage the files used for block persistence.
This manager manages the file system FS including
-- the directory where the files are stored
-- the individual files where the blocks are stored
-- the checkpoint which tracks the latest file being persisted to
-- the index which tracks what block and transaction is in what file
When a new blockfile manager is started (i.e. only on start-up), it checks
whether this start-up is the first time the system is coming up or a restart
of the system.
The blockfile manager stores blocks of data into a file system. That file
storage is done by creating sequentially numbered files of a configured size
i.e. blockfile_000000, blockfile_000001, etc.
Each transaction in a block is stored with information about the number of
bytes in that transaction
Adding txLoc [fileSuffixNum=0, offset=3, bytesLength=104] for tx [1:0] to index
Adding txLoc [fileSuffixNum=0, offset=107, bytesLength=104] for tx [1:1] to index
Each block is stored with the total encoded length of that block as well as the
tx location offsets.
Remember that these steps are only done once at start-up of the system.
At start up a new manager:
*) Checks if the directory for storing files exists, if not creates the dir
*) Checks if the key value database exists, if not creates one
(will create a db dir)
*) Determines the checkpoint information (cpinfo) used for storage
-- Loads from db if exist, if not instantiate a new cpinfo
-- If cpinfo was loaded from db, compares to FS
-- If cpinfo and file system are not in sync, syncs cpInfo from FS
*) Starts a new file writer
-- truncates file per cpinfo to remove any excess past last block
*) Determines the index information used to find tx and blocks in
the file blkstorage
-- Instantiates a new blockIdxInfo
-- Loads the index from the db if exists
-- syncIndex comparing the last block indexed to what is in the FS
-- If index and file system are not in sync, syncs index from the FS
*) Updates blockchain info used by the APIs
*/
// newBlockfileMgr creates the block file manager for the ledger identified by
// id and brings the persisted state (checkpoint info, current block file and
// index) into agreement before returning.
//
// This is a start-up routine: every failure, including a failure to sync the
// index with the block files, aborts with a panic instead of handing back a
// manager whose state is inconsistent.
func newBlockfileMgr(id string, conf *Conf, indexConfig *blkstorage.IndexConfig, indexStore *leveldbhelper.DBHandle) *blockfileMgr {
	logger.Debugf("newBlockfileMgr() initializing file-based block storage for ledger: %s ", id)
	// Determine the root directory for the blockfile storage, if it does not exist create it
	rootDir := conf.getLedgerBlockDir(id)
	_, err := util.CreateDirIfMissing(rootDir)
	if err != nil {
		panic(fmt.Sprintf("Error creating block storage root dir [%s]: %s", rootDir, err))
	}
	// Instantiate the manager, i.e. blockFileMgr structure
	mgr := &blockfileMgr{rootDir: rootDir, conf: conf, db: indexStore}

	// cp = checkpointInfo, retrieve from the database the file suffix or number of where blocks were stored.
	// It also retrieves the current size of that file and the last block number that was written to that file.
	// At init checkpointInfo:latestFileChunkSuffixNum=[0], latestFileChunksize=[0], lastBlockNumber=[0]
	cpInfo, err := mgr.loadCurrentInfo()
	if err != nil {
		panic(fmt.Sprintf("Could not get block file info for current block file from db: %s", err))
	}
	if cpInfo == nil {
		// Nothing stored in the db: rebuild the checkpoint by scanning the block files.
		logger.Info(`Getting block information from block storage`)
		if cpInfo, err = constructCheckpointInfoFromBlockFiles(rootDir); err != nil {
			panic(fmt.Sprintf("Could not build checkpoint info from block files: %s", err))
		}
		logger.Debugf("Info constructed by scanning the blocks dir = %s", spew.Sdump(cpInfo))
	} else {
		logger.Debug(`Synching block information from block storage (if needed)`)
		syncCPInfoFromFS(rootDir, cpInfo)
	}
	err = mgr.saveCurrentInfo(cpInfo, true)
	if err != nil {
		panic(fmt.Sprintf("Could not save next block file info to db: %s", err))
	}

	// Open a writer to the file identified by the number and truncate it to only contain the latest block
	// that was completely saved (file system, index, cpinfo, etc)
	currentFileWriter, err := newBlockfileWriter(deriveBlockfilePath(rootDir, cpInfo.latestFileChunkSuffixNum))
	if err != nil {
		panic(fmt.Sprintf("Could not open writer to current file: %s", err))
	}
	// Truncate the file to remove excess past last block
	err = currentFileWriter.truncateFile(cpInfo.latestFileChunksize)
	if err != nil {
		panic(fmt.Sprintf("Could not truncate current file to known size in db: %s", err))
	}

	// Create a new KeyValue store database handler for the blocks index in the keyvalue database
	if mgr.index, err = newBlockIndex(indexConfig, indexStore); err != nil {
		panic(fmt.Sprintf("error in block index: %s", err))
	}

	// Update the manager with the checkpoint info and the file writer
	mgr.cpInfo = cpInfo
	mgr.currentFileWriter = currentFileWriter
	// Create a checkpoint condition (event) variable, for the goroutine waiting for
	// or announcing the occurrence of an event.
	mgr.cpInfoCond = sync.NewCond(&sync.Mutex{})

	// init BlockchainInfo for external API's
	bcInfo := &common.BlockchainInfo{
		Height:            0,
		CurrentBlockHash:  nil,
		PreviousBlockHash: nil}

	if !cpInfo.isChainEmpty {
		// If start up is a restart of an existing storage, sync the index from block storage and update BlockchainInfo for external API's.
		// A failed sync must not be ignored: the header lookup below depends on the index being complete.
		if err := mgr.syncIndex(); err != nil {
			panic(fmt.Sprintf("Could not sync block index with block files: %s", err))
		}
		lastBlockHeader, err := mgr.retrieveBlockHeaderByNumber(cpInfo.lastBlockNumber)
		if err != nil {
			panic(fmt.Sprintf("Could not retrieve header of the last block form file: %s", err))
		}
		lastBlockHash := lastBlockHeader.Hash()
		previousBlockHash := lastBlockHeader.PreviousHash
		bcInfo = &common.BlockchainInfo{
			Height:            cpInfo.lastBlockNumber + 1,
			CurrentBlockHash:  lastBlockHash,
			PreviousBlockHash: previousBlockHash}
	}
	mgr.bcInfo.Store(bcInfo)
	return mgr
}
// syncCPInfoFromFS reconciles cpInfo (as loaded from the database) with the
// block file it points at, updating cpInfo in place.
//
// If the file does not exist, or its size equals cpInfo.latestFileChunksize,
// cpInfo is left untouched. Otherwise the file is scanned from the recorded
// size onwards; the recorded size is moved to the end of the last complete
// block found and lastBlockNumber is advanced by the number of blocks found.
// Any error while checking or scanning the file results in a panic.
func syncCPInfoFromFS(rootDir string, cpInfo *checkpointInfo) {
	logger.Debugf("Starting checkpoint=%s", cpInfo)

	latestFile := deriveBlockfilePath(rootDir, cpInfo.latestFileChunkSuffixNum)
	found, fileSize, err := util.FileExists(latestFile)
	if err != nil {
		panic(fmt.Sprintf("Error in checking whether file [%s] exists: %s", latestFile, err))
	}
	logger.Debugf("status of file [%s]: exists=[%t], size=[%d]", latestFile, found, fileSize)

	// A file number that has only just been allocated has no file on disk yet,
	// so a missing file is treated as "in sync".
	if !found {
		return
	}
	// Size on disk matches the checkpoint: nothing was written past it.
	if int(fileSize) == cpInfo.latestFileChunksize {
		return
	}

	// The file is longer (or shorter) than recorded: look for complete blocks
	// beyond the recorded end offset.
	_, lastBlockEnd, blocksFound, err := scanForLastCompleteBlock(
		rootDir, cpInfo.latestFileChunkSuffixNum, int64(cpInfo.latestFileChunksize))
	if err != nil {
		panic(fmt.Sprintf("Could not open current file for detecting last block in the file: %s", err))
	}
	cpInfo.latestFileChunksize = int(lastBlockEnd)
	if blocksFound == 0 {
		return
	}

	// Record the real last block number now that extra blocks were discovered.
	switch {
	case cpInfo.isChainEmpty:
		// Block numbering starts at zero, so N blocks end at N-1.
		cpInfo.lastBlockNumber = uint64(blocksFound - 1)
	default:
		cpInfo.lastBlockNumber += uint64(blocksFound)
	}
	cpInfo.isChainEmpty = false
	logger.Debugf("Checkpoint after updates by scanning the last file segment:%s", cpInfo)
}
// deriveBlockfilePath returns the path of the block file with the given number
// inside rootDir: rootDir, a slash, blockfilePrefix and the number zero-padded
// to six digits.
func deriveBlockfilePath(rootDir string, suffixNum int) string {
	return fmt.Sprintf("%s/%s%06d", rootDir, blockfilePrefix, suffixNum)
}
// close closes the writer of the block file currently being appended to.
func (mgr *blockfileMgr) close() {
	writer := mgr.currentFileWriter
	writer.close()
}
// moveToNextFile switches the manager to the next block file.
//
// It opens a writer on the file with the next number, closes the current
// writer, persists a checkpoint that points at the new (empty) file while
// keeping the last block number, and finally publishes that checkpoint.
// Failure to open the new file or to persist the checkpoint panics.
func (mgr *blockfileMgr) moveToNextFile() {
	nextSuffix := mgr.cpInfo.latestFileChunkSuffixNum + 1
	nextCPInfo := &checkpointInfo{
		latestFileChunkSuffixNum: nextSuffix,
		latestFileChunksize:      0,
		lastBlockNumber:          mgr.cpInfo.lastBlockNumber,
	}

	// The new writer is opened first so the old one is only closed once a
	// replacement is available.
	nextPath := deriveBlockfilePath(mgr.rootDir, nextSuffix)
	nextFileWriter, err := newBlockfileWriter(nextPath)
	if err != nil {
		panic(fmt.Sprintf("Could not open writer to next file: %s", err))
	}
	mgr.currentFileWriter.close()

	if err = mgr.saveCurrentInfo(nextCPInfo, true); err != nil {
		panic(fmt.Sprintf("Could not save next block file info to db: %s", err))
	}
	mgr.currentFileWriter = nextFileWriter
	mgr.updateCheckpoint(nextCPInfo)
}
// addBlock appends block to the current block file and records it in the
// checkpoint info and in the index.
//
// The block is rejected with an error when its number is not the current chain
// height or when its PreviousHash does not match the hash of the latest block.
// If appending to the file or saving the checkpoint fails, the file is
// truncated back to the last known good size and the error is returned; if
// that truncation fails as well the function panics, reporting both the
// truncate error and the error that triggered it.
func (mgr *blockfileMgr) addBlock(block *common.Block) error {
	bcInfo := mgr.getBlockchainInfo()
	if block.Header.Number != bcInfo.Height {
		// Report the same snapshot that was compared against.
		return errors.Errorf(
			"block number should have been %d but was %d",
			bcInfo.Height, block.Header.Number,
		)
	}

	// Add the previous hash check - Though, not essential but may not be a bad idea to
	// verify the field `block.Header.PreviousHash` present in the block.
	// This check is a simple bytes comparison and hence does not cause any observable performance penalty
	// and may help in detecting a rare scenario if there is any bug in the ordering service.
	if !bytes.Equal(block.Header.PreviousHash, bcInfo.CurrentBlockHash) {
		return errors.Errorf(
			"unexpected Previous block hash. Expected PreviousHash = [%x], PreviousHash referred in the latest block= [%x]",
			bcInfo.CurrentBlockHash, block.Header.PreviousHash,
		)
	}
	blockBytes, info, err := serializeBlock(block)
	if err != nil {
		return errors.WithMessage(err, "error serializing block")
	}
	blockHash := block.Header.Hash()
	// Get the location / offset where each transaction starts in the block and where the block ends
	txOffsets := info.txOffsets
	currentOffset := mgr.cpInfo.latestFileChunksize

	// Each block is stored as a varint-encoded length followed by the block bytes.
	blockBytesLen := len(blockBytes)
	blockBytesEncodedLen := proto.EncodeVarint(uint64(blockBytesLen))
	totalBytesToAppend := blockBytesLen + len(blockBytesEncodedLen)

	// Determine if we need to start a new file since the size of this block
	// exceeds the amount of space left in the current file
	if currentOffset+totalBytesToAppend > mgr.conf.maxBlockfileSize {
		mgr.moveToNextFile()
		currentOffset = 0
	}
	// append blockBytesEncodedLen to the file
	err = mgr.currentFileWriter.append(blockBytesEncodedLen, false)
	if err == nil {
		// append the actual block bytes to the file
		err = mgr.currentFileWriter.append(blockBytes, true)
	}
	if err != nil {
		truncateErr := mgr.currentFileWriter.truncateFile(mgr.cpInfo.latestFileChunksize)
		if truncateErr != nil {
			panic(fmt.Sprintf("Could not truncate current file to known size after an error during block append: %s (append error: %s)", truncateErr, err))
		}
		return errors.WithMessage(err, "error appending block to file")
	}

	// Update the checkpoint info with the results of adding the new block
	currentCPInfo := mgr.cpInfo
	newCPInfo := &checkpointInfo{
		latestFileChunkSuffixNum: currentCPInfo.latestFileChunkSuffixNum,
		latestFileChunksize:      currentCPInfo.latestFileChunksize + totalBytesToAppend,
		isChainEmpty:             false,
		lastBlockNumber:          block.Header.Number}
	// save the checkpoint information in the database
	if err = mgr.saveCurrentInfo(newCPInfo, false); err != nil {
		truncateErr := mgr.currentFileWriter.truncateFile(currentCPInfo.latestFileChunksize)
		if truncateErr != nil {
			panic(fmt.Sprintf("Error in truncating current file to known size after an error in saving checkpoint info: %s (save error: %s)", truncateErr, err))
		}
		return errors.WithMessage(err, "error saving current file info to db")
	}

	// Index block file location pointer updated with file suffix and offset for the new block
	blockFLP := &fileLocPointer{fileSuffixNum: newCPInfo.latestFileChunkSuffixNum}
	blockFLP.offset = currentOffset
	// shift the txoffset because we prepend length of bytes before block bytes
	for _, txOffset := range txOffsets {
		txOffset.loc.offset += len(blockBytesEncodedLen)
	}
	// save the index in the database
	if err = mgr.index.indexBlock(&blockIdxInfo{
		blockNum: block.Header.Number, blockHash: blockHash,
		flp: blockFLP, txOffsets: txOffsets, metadata: block.Metadata}); err != nil {
		return err
	}

	// update the checkpoint info (for storage) and the blockchain info (for APIs) in the manager
	mgr.updateCheckpoint(newCPInfo)
	mgr.updateBlockchainInfo(blockHash, block)
	return nil
}
// syncIndex brings the block index up to date with the block files.
//
// It asks the index for the last block it contains. If that equals the last
// block number in the checkpoint nothing is done. Otherwise the block files are
// streamed, starting at the location of the last indexed block (or at file 0,
// offset 0 when the index is empty), and every block found after it is indexed.
// The first error encountered is returned.
func (mgr *blockfileMgr) syncIndex() error {
	var lastBlockIndexed uint64
	var indexEmpty bool
	var err error
	// from the database, get the last block that was indexed
	if lastBlockIndexed, err = mgr.index.getLastBlockIndexed(); err != nil {
		if err != errIndexEmpty {
			return err
		}
		indexEmpty = true
	}

	// initialize index to file number:zero, offset:zero and blockNum:0
	startFileNum := 0
	startOffset := 0
	skipFirstBlock := false
	// get the last file that blocks were added to using the checkpoint info
	endFileNum := mgr.cpInfo.latestFileChunkSuffixNum
	startingBlockNum := uint64(0)

	// if the index stored in the db has value, update the index information with those values
	if !indexEmpty {
		if lastBlockIndexed == mgr.cpInfo.lastBlockNumber {
			logger.Debug("Both the block files and indices are in sync.")
			return nil
		}
		logger.Debugf("Last block indexed [%d], Last block present in block files [%d]", lastBlockIndexed, mgr.cpInfo.lastBlockNumber)
		var flp *fileLocPointer
		if flp, err = mgr.index.getBlockLocByBlockNum(lastBlockIndexed); err != nil {
			return err
		}
		startFileNum = flp.fileSuffixNum
		startOffset = flp.locPointer.offset
		// The stream starts at the last indexed block, which must not be indexed again.
		skipFirstBlock = true
		startingBlockNum = lastBlockIndexed + 1
	} else {
		logger.Debugf("No block indexed, Last block present in block files=[%d]", mgr.cpInfo.lastBlockNumber)
	}

	logger.Infof("Start building index from block [%d] to last block [%d]", startingBlockNum, mgr.cpInfo.lastBlockNumber)

	// open a blockstream to the file location that was stored in the index
	var stream *blockStream
	if stream, err = newBlockStream(mgr.rootDir, startFileNum, int64(startOffset), endFileNum); err != nil {
		return err
	}
	// Release the underlying file handle on every return path.
	// NOTE(review): relies on blockStream.close(), which is defined outside this file — confirm.
	defer stream.close()

	var blockBytes []byte
	var placement *blockPlacementInfo

	if skipFirstBlock {
		if blockBytes, _, err = stream.nextBlockBytesAndPlacementInfo(); err != nil {
			return err
		}
		if blockBytes == nil {
			return errors.Errorf("block bytes for block num = [%d] should not be nil here. The indexes for the block are already present",
				lastBlockIndexed)
		}
	}

	// Should be at the last block already, but go ahead and loop looking for next blockBytes.
	// If there is another block, add it to the index.
	// This will ensure block indexes are correct, for example if peer had crashed before indexes got updated.
	idxInfo := &blockIdxInfo{}
	for {
		if blockBytes, placement, err = stream.nextBlockBytesAndPlacementInfo(); err != nil {
			return err
		}
		if blockBytes == nil {
			break
		}
		info, infoErr := extractSerializedBlockInfo(blockBytes)
		if infoErr != nil {
			return infoErr
		}

		// The blockStartOffset will get applied to the txOffsets prior to indexing within indexBlock(),
		// therefore just shift by the difference between blockBytesOffset and blockStartOffset
		numBytesToShift := int(placement.blockBytesOffset - placement.blockStartOffset)
		for _, offset := range info.txOffsets {
			offset.loc.offset += numBytesToShift
		}

		// Update the blockIndexInfo with what was actually stored in file system
		idxInfo.blockHash = info.blockHeader.Hash()
		idxInfo.blockNum = info.blockHeader.Number
		idxInfo.flp = &fileLocPointer{fileSuffixNum: placement.fileNum,
			locPointer: locPointer{offset: int(placement.blockStartOffset)}}
		idxInfo.txOffsets = info.txOffsets
		idxInfo.metadata = info.metadata

		logger.Debugf("syncIndex() indexing block [%d]", idxInfo.blockNum)
		if err = mgr.index.indexBlock(idxInfo); err != nil {
			return err
		}
		// Progress marker for long rebuilds.
		if idxInfo.blockNum%10000 == 0 {
			logger.Infof("Indexed block number [%d]", idxInfo.blockNum)
		}
	}
	logger.Infof("Finished building index. Last block indexed [%d]", idxInfo.blockNum)
	return nil
}
// getBlockchainInfo returns the blockchain info currently stored in mgr.bcInfo.
// The stored value is asserted to be a *common.BlockchainInfo.
func (mgr *blockfileMgr) getBlockchainInfo() *common.BlockchainInfo {
	stored := mgr.bcInfo.Load()
	return stored.(*common.BlockchainInfo)
}
// updateCheckpoint replaces the manager's checkpoint with cpInfo while holding
// the condition variable's lock, then broadcasts on the condition variable.
func (mgr *blockfileMgr) updateCheckpoint(cpInfo *checkpointInfo) {
	cond := mgr.cpInfoCond
	cond.L.Lock()
	defer cond.L.Unlock()

	mgr.cpInfo = cpInfo
	logger.Debugf("Broadcasting about update checkpointInfo: %s", cpInfo)
	cond.Broadcast()
}
// updateBlockchainInfo stores a new blockchain info whose height is one more
// than the current height, whose current hash is latestBlockHash and whose
// previous hash is taken from latestBlock's header.
func (mgr *blockfileMgr) updateBlockchainInfo(latestBlockHash []byte, latestBlock *common.Block) {
	nextHeight := mgr.getBlockchainInfo().Height + 1
	mgr.bcInfo.Store(&common.BlockchainInfo{
		Height:            nextHeight,
		CurrentBlockHash:  latestBlockHash,
		PreviousBlockHash: latestBlock.Header.PreviousHash,
	})
}
// retrieveBlockByHash looks up the file location of the block with the given
// hash in the index and fetches the block from there. An index lookup error is
// returned as-is.
func (mgr *blockfileMgr) retrieveBlockByHash(blockHash []byte) (*common.Block, error) {
	logger.Debugf("retrieveBlockByHash() - blockHash = [%#v]", blockHash)
	blockLoc, err := mgr.index.getBlockLocByHash(blockHash)
	if err == nil {
		return mgr.fetchBlock(blockLoc)
	}
	return nil, err
}
// retrieveBlockByNumber fetches the block with the given number. The value
// math.MaxUint64 is interpreted as a request for the last block, i.e. the
// block numbered current height minus one.
func (mgr *blockfileMgr) retrieveBlockByNumber(blockNum uint64) (*common.Block, error) {
	logger.Debugf("retrieveBlockByNumber() - blockNum = [%d]", blockNum)

	target := blockNum
	if target == math.MaxUint64 {
		target = mgr.getBlockchainInfo().Height - 1
	}

	blockLoc, err := mgr.index.getBlockLocByBlockNum(target)
	if err == nil {
		return mgr.fetchBlock(blockLoc)
	}
	return nil, err
}
// retrieveBlockByTxID fetches the block that the index associates with the
// given transaction id. An index lookup error is returned as-is.
func (mgr *blockfileMgr) retrieveBlockByTxID(txID string) (*common.Block, error) {
	logger.Debugf("retrieveBlockByTxID() - txID = [%s]", txID)
	blockLoc, err := mgr.index.getBlockLocByTxID(txID)
	if err == nil {
		return mgr.fetchBlock(blockLoc)
	}
	return nil, err
}
// retrieveTxValidationCodeByTxID returns whatever validation code and error the
// index reports for the given transaction id.
func (mgr *blockfileMgr) retrieveTxValidationCodeByTxID(txID string) (peer.TxValidationCode, error) {
	logger.Debugf("retrieveTxValidationCodeByTxID() - txID = [%s]", txID)
	code, err := mgr.index.getTxValidationCodeByTxID(txID)
	return code, err
}
// retrieveBlockHeaderByNumber returns the header of the block with the given
// number. It locates the block through the index, reads its serialized bytes
// and extracts the header from them; the first error encountered is returned.
func (mgr *blockfileMgr) retrieveBlockHeaderByNumber(blockNum uint64) (*common.BlockHeader, error) {
	logger.Debugf("retrieveBlockHeaderByNumber() - blockNum = [%d]", blockNum)

	blockLoc, err := mgr.index.getBlockLocByBlockNum(blockNum)
	if err != nil {
		return nil, err
	}

	serialized, err := mgr.fetchBlockBytes(blockLoc)
	if err != nil {
		return nil, err
	}

	blockInfo, err := extractSerializedBlockInfo(serialized)
	if err != nil {
		return nil, err
	}
	return blockInfo.blockHeader, nil
}
// retrieveBlocks returns a block iterator created for this manager with
// startNum as its starting block number. The returned error is always nil.
func (mgr *blockfileMgr) retrieveBlocks(startNum uint64) (*blocksItr, error) {
	itr := newBlockItr(mgr, startNum)
	return itr, nil
}
// retrieveTransactionByID returns the transaction envelope stored at the
// location the index reports for txID. An index lookup error is returned as-is.
func (mgr *blockfileMgr) retrieveTransactionByID(txID string) (*common.Envelope, error) {
	logger.Debugf("retrieveTransactionByID() - txId = [%s]", txID)
	txLoc, err := mgr.index.getTxLoc(txID)
	if err == nil {
		return mgr.fetchTransactionEnvelope(txLoc)
	}
	return nil, err
}
// retrieveTransactionByBlockNumTranNum returns the transaction envelope stored
// at the location the index reports for the given block number and transaction
// number. An index lookup error is returned as-is.
func (mgr *blockfileMgr) retrieveTransactionByBlockNumTranNum(blockNum uint64, tranNum uint64) (*common.Envelope, error) {
	logger.Debugf("retrieveTransactionByBlockNumTranNum() - blockNum = [%d], tranNum = [%d]", blockNum, tranNum)
	txLoc, err := mgr.index.getTXLocByBlockNumTranNum(blockNum, tranNum)
	if err == nil {
		return mgr.fetchTransactionEnvelope(txLoc)
	}
	return nil, err
}
// fetchBlock reads the serialized block found at lp and deserializes it.
// On any error the returned block is nil.
func (mgr *blockfileMgr) fetchBlock(lp *fileLocPointer) (*common.Block, error) {
	serialized, err := mgr.fetchBlockBytes(lp)
	if err != nil {
		return nil, err
	}

	decoded, decodeErr := deserializeBlock(serialized)
	if decodeErr != nil {
		return nil, decodeErr
	}
	return decoded, nil
}
// fetchTransactionEnvelope reads the raw bytes found at lp, skips the leading
// varint and builds the envelope from the remaining bytes.
func (mgr *blockfileMgr) fetchTransactionEnvelope(lp *fileLocPointer) (*common.Envelope, error) {
	logger.Debugf("Entering fetchTransactionEnvelope() %v\n", lp)

	rawBytes, err := mgr.fetchRawBytes(lp)
	if err != nil {
		return nil, err
	}

	// Only the number of bytes consumed by the varint is needed here, not its value.
	_, prefixLen := proto.DecodeVarint(rawBytes)
	return putil.GetEnvelopeFromBlock(rawBytes[prefixLen:])
}
// fetchBlockBytes opens a stream on the block file named by lp at lp's offset
// and returns the bytes of the next block in that stream. The stream is closed
// before returning.
func (mgr *blockfileMgr) fetchBlockBytes(lp *fileLocPointer) ([]byte, error) {
	fileStream, err := newBlockfileStream(mgr.rootDir, lp.fileSuffixNum, int64(lp.offset))
	if err != nil {
		return nil, err
	}
	defer fileStream.close()

	blockBytes, readErr := fileStream.nextBlockBytes()
	if readErr != nil {
		return nil, readErr
	}
	return blockBytes, nil
}
// fetchRawBytes reads lp.bytesLength bytes at lp.offset from the block file
// named by lp. The reader is closed before returning.
func (mgr *blockfileMgr) fetchRawBytes(lp *fileLocPointer) ([]byte, error) {
	fileReader, err := newBlockfileReader(deriveBlockfilePath(mgr.rootDir, lp.fileSuffixNum))
	if err != nil {
		return nil, err
	}
	defer fileReader.close()

	raw, readErr := fileReader.read(lp.offset, lp.bytesLength)
	if readErr != nil {
		return nil, readErr
	}
	return raw, nil
}
// loadCurrentInfo reads the checkpoint info stored in the database under
// blkMgrInfoKey. It returns (nil, nil) when nothing is stored, and a nil
// checkpoint together with the error when the read or the decoding fails.
func (mgr *blockfileMgr) loadCurrentInfo() (*checkpointInfo, error) {
	raw, err := mgr.db.Get(blkMgrInfoKey)
	if err != nil || raw == nil {
		return nil, err
	}

	loaded := &checkpointInfo{}
	if err = loaded.unmarshal(raw); err != nil {
		return nil, err
	}
	logger.Debugf("loaded checkpointInfo:%s", loaded)
	return loaded, nil
}
// saveCurrentInfo serializes i and writes it to the database under
// blkMgrInfoKey. The sync flag is passed through to the database Put call.
func (mgr *blockfileMgr) saveCurrentInfo(i *checkpointInfo, sync bool) error {
	encoded, err := i.marshal()
	if err != nil {
		return err
	}
	return mgr.db.Put(blkMgrInfoKey, encoded, sync)
}
// scanForLastCompleteBlock reads block after block from the given block file,
// starting at startingOffset, until no further complete block can be read.
//
// It returns the bytes of the last complete block read (nil if there was none),
// the stream offset at which reading stopped, the number of complete blocks
// read, and an error. ErrUnexpectedEndOfBlockfile is not reported as an error:
// it is what a partially written block at the end of the file (for example
// after a crash during an append) looks like.
func scanForLastCompleteBlock(rootDir string, fileNum int, startingOffset int64) ([]byte, int64, int, error) {
	fileStream, errOpen := newBlockfileStream(rootDir, fileNum, startingOffset)
	if errOpen != nil {
		return nil, 0, 0, errOpen
	}
	defer fileStream.close()

	var (
		lastComplete  []byte
		completeCount int
		errRead       error
	)
	for {
		var current []byte
		current, errRead = fileStream.nextBlockBytes()
		if errRead != nil || current == nil {
			break
		}
		lastComplete = current
		completeCount++
	}

	if errRead == ErrUnexpectedEndOfBlockfile {
		logger.Debugf(`Error:%s
The error may happen if a crash has happened during block appending.
Resetting error to nil and returning current offset as a last complete block's end offset`, errRead)
		errRead = nil
	}
	logger.Debugf("scanForLastCompleteBlock(): last complete block ends at offset=[%d]", fileStream.currentOffset)
	return lastComplete, fileStream.currentOffset, completeCount, errRead
}
// checkpointInfo records where block persistence currently stands: which block
// file is being written, how many bytes of it are known to be good, and the
// number of the last block written.
type checkpointInfo struct {
// latestFileChunkSuffixNum is the number of the block file currently written to.
latestFileChunkSuffixNum int
// latestFileChunksize is the size of that file up to the end of the last block recorded.
latestFileChunksize int
// isChainEmpty is true while no block has been recorded; addBlock sets it to false.
isChainEmpty bool
// lastBlockNumber is the number of the last block recorded.
lastBlockNumber uint64
}
// marshal encodes the checkpoint as four consecutive varints, in this order:
// file suffix number, file size, last block number, and an "empty chain"
// marker (1 when isChainEmpty is true, 0 otherwise).
func (i *checkpointInfo) marshal() ([]byte, error) {
	var chainEmptyMarker uint64
	if i.isChainEmpty {
		chainEmptyMarker = 1
	}

	// The order of this list is the wire format; unmarshal reads it back in the same order.
	fields := []uint64{
		uint64(i.latestFileChunkSuffixNum),
		uint64(i.latestFileChunksize),
		i.lastBlockNumber,
		chainEmptyMarker,
	}

	buffer := proto.NewBuffer([]byte{})
	for _, field := range fields {
		if err := buffer.EncodeVarint(field); err != nil {
			return nil, err
		}
	}
	return buffer.Bytes(), nil
}
// unmarshal decodes four consecutive varints from b into the checkpoint, in
// this order: file suffix number, file size, last block number, and the "empty
// chain" marker (isChainEmpty becomes true only when the marker equals 1).
// Decoding stops at the first error; fields decoded before it stay assigned.
func (i *checkpointInfo) unmarshal(b []byte) error {
	buffer := proto.NewBuffer(b)

	// One setter per encoded field, in wire order.
	setters := []func(uint64){
		func(v uint64) { i.latestFileChunkSuffixNum = int(v) },
		func(v uint64) { i.latestFileChunksize = int(v) },
		func(v uint64) { i.lastBlockNumber = v },
		func(v uint64) { i.isChainEmpty = v == 1 },
	}
	for _, set := range setters {
		val, err := buffer.DecodeVarint()
		if err != nil {
			return err
		}
		set(val)
	}
	return nil
}
// String renders all four checkpoint fields as "name=[value]" pairs separated
// by commas, for use in log messages.
func (i *checkpointInfo) String() string {
	const layout = "latestFileChunkSuffixNum=[%d], latestFileChunksize=[%d], isChainEmpty=[%t], lastBlockNumber=[%d]"
	return fmt.Sprintf(
		layout,
		i.latestFileChunkSuffixNum,
		i.latestFileChunksize,
		i.isChainEmpty,
		i.lastBlockNumber,
	)
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/liurenhao/fabric.git
git@gitee.com:liurenhao/fabric.git
liurenhao
fabric
fabric
v1.4.0-rc1

搜索帮助

0d507c66 1850385 C8b1a773 1850385