1 Star 0 Fork 0

13683679291/fabric

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
blockfile_mgr.go 24.54 KB
一键复制 编辑 原始数据 按行查看 历史
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package fsblkstorage
import (
"fmt"
"math"
"sync"
"sync/atomic"
"github.com/davecgh/go-spew/spew"
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/ledger/blkstorage"
"github.com/hyperledger/fabric/common/ledger/util"
"github.com/hyperledger/fabric/common/ledger/util/leveldbhelper"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/peer"
putil "github.com/hyperledger/fabric/protos/utils"
)
var logger = flogging.MustGetLogger("fsblkstorage")
const (
blockfilePrefix = "blockfile_"
)
var (
blkMgrInfoKey = []byte("blkMgrInfo")
)
type blockfileMgr struct {
rootDir string
conf *Conf
db *leveldbhelper.DBHandle
index index
cpInfo *checkpointInfo
cpInfoCond *sync.Cond
currentFileWriter *blockfileWriter
bcInfo atomic.Value
}
/*
Creates a new manager that will manage the files used for block persistence.
This manager manages the file system FS including
-- the directory where the files are stored
-- the individual files where the blocks are stored
-- the checkpoint which tracks the latest file being persisted to
-- the index which tracks what block and transaction is in what file
When a new blockfile manager is started (i.e. only on start-up), it checks
if this start-up is the first time the system is coming up or is this a restart
of the system.
The blockfile manager stores blocks of data into a file system. That file
storage is done by creating sequentially numbered files of a configured size
i.e blockfile_000000, blockfile_000001, etc..
Each transcation in a block is stored with information about the number of
bytes in that transaction
Adding txLoc [fileSuffixNum=0, offset=3, bytesLength=104] for tx [1:0] to index
Adding txLoc [fileSuffixNum=0, offset=107, bytesLength=104] for tx [1:1] to index
Each block is stored with the total encoded length of that block as well as the
tx location offsets.
Remember that these steps are only done once at start-up of the system.
At start up a new manager:
*) Checks if the directory for storing files exists, if not creates the dir
*) Checks if the key value database exists, if not creates one
(will create a db dir)
*) Determines the checkpoint information (cpinfo) used for storage
-- Loads from db if exist, if not instantiate a new cpinfo
-- If cpinfo was loaded from db, compares to FS
-- If cpinfo and file system are not in sync, syncs cpInfo from FS
*) Starts a new file writer
-- truncates file per cpinfo to remove any excess past last block
*) Determines the index information used to find tx and blocks in
the file blkstorage
-- Instantiates a new blockIdxInfo
-- Loads the index from the db if exists
-- syncIndex comparing the last block indexed to what is in the FS
-- If index and file system are not in sync, syncs index from the FS
*) Updates blockchain info used by the APIs
*/
func newBlockfileMgr(id string, conf *Conf, indexConfig *blkstorage.IndexConfig, indexStore *leveldbhelper.DBHandle) *blockfileMgr {
logger.Debugf("newBlockfileMgr() initializing file-based block storage for ledger: %s ", id)
//Determine the root directory for the blockfile storage, if it does not exist create it
rootDir := conf.getLedgerBlockDir(id)
_, err := util.CreateDirIfMissing(rootDir)
if err != nil {
panic(fmt.Sprintf("Error: %s", err))
}
// Instantiate the manager, i.e. blockFileMgr structure
mgr := &blockfileMgr{rootDir: rootDir, conf: conf, db: indexStore}
// cp = checkpointInfo, retrieve from the database the file suffix or number of where blocks were stored.
// It also retrieves the current size of that file and the last block number that was written to that file.
// At init checkpointInfo:latestFileChunkSuffixNum=[0], latestFileChunksize=[0], lastBlockNumber=[0]
cpInfo, err := mgr.loadCurrentInfo()
if err != nil {
panic(fmt.Sprintf("Could not get block file info for current block file from db: %s", err))
}
if cpInfo == nil {
logger.Info(`No info about blocks file found in the db.
This could happen if this is the first time the ledger is constructed or the index is dropped.
Scanning blocks dir for the latest info`)
if cpInfo, err = constructCheckpointInfoFromBlockFiles(rootDir); err != nil {
panic(fmt.Sprintf("Could not build checkpoint info from block files: %s", err))
}
logger.Infof("Info constructed by scanning the blocks dir = %s", spew.Sdump(cpInfo))
} else {
logger.Info(`Synching the info about block files`)
syncCPInfoFromFS(rootDir, cpInfo)
}
err = mgr.saveCurrentInfo(cpInfo, true)
if err != nil {
panic(fmt.Sprintf("Could not save next block file info to db: %s", err))
}
//Open a writer to the file identified by the number and truncate it to only contain the latest block
// that was completely saved (file system, index, cpinfo, etc)
currentFileWriter, err := newBlockfileWriter(deriveBlockfilePath(rootDir, cpInfo.latestFileChunkSuffixNum))
if err != nil {
panic(fmt.Sprintf("Could not open writer to current file: %s", err))
}
//Truncate the file to remove excess past last block
err = currentFileWriter.truncateFile(cpInfo.latestFileChunksize)
if err != nil {
panic(fmt.Sprintf("Could not truncate current file to known size in db: %s", err))
}
// Create a new KeyValue store database handler for the blocks index in the keyvalue database
mgr.index = newBlockIndex(indexConfig, indexStore)
// Update the manager with the checkpoint info and the file writer
mgr.cpInfo = cpInfo
mgr.currentFileWriter = currentFileWriter
// Create a checkpoint condition (event) variable, for the goroutine waiting for
// or announcing the occurrence of an event.
mgr.cpInfoCond = sync.NewCond(&sync.Mutex{})
// Verify that the index stored in db is accurate with what is actually stored in block file system
// If not the same, sync the index and the file system
mgr.syncIndex()
// init BlockchainInfo for external API's
bcInfo := &common.BlockchainInfo{
Height: 0,
CurrentBlockHash: nil,
PreviousBlockHash: nil}
//If start up is a restart of an existing storage, update BlockchainInfo for external API's
if !cpInfo.isChainEmpty {
lastBlockHeader, err := mgr.retrieveBlockHeaderByNumber(cpInfo.lastBlockNumber)
if err != nil {
panic(fmt.Sprintf("Could not retrieve header of the last block form file: %s", err))
}
lastBlockHash := lastBlockHeader.Hash()
previousBlockHash := lastBlockHeader.PreviousHash
bcInfo = &common.BlockchainInfo{
Height: cpInfo.lastBlockNumber + 1,
CurrentBlockHash: lastBlockHash,
PreviousBlockHash: previousBlockHash}
}
mgr.bcInfo.Store(bcInfo)
//return the new manager (blockfileMgr)
return mgr
}
//cp = checkpointInfo, from the database gets the file suffix and the size of
// the file of where the last block was written. Also retrieves contains the
// last block number that was written. At init
//checkpointInfo:latestFileChunkSuffixNum=[0], latestFileChunksize=[0], lastBlockNumber=[0]
func syncCPInfoFromFS(rootDir string, cpInfo *checkpointInfo) {
logger.Debugf("Starting checkpoint=%s", cpInfo)
//Checks if the file suffix of where the last block was written exists
filePath := deriveBlockfilePath(rootDir, cpInfo.latestFileChunkSuffixNum)
exists, size, err := util.FileExists(filePath)
if err != nil {
panic(fmt.Sprintf("Error in checking whether file [%s] exists: %s", filePath, err))
}
logger.Debugf("status of file [%s]: exists=[%t], size=[%d]", filePath, exists, size)
//Test is !exists because when file number is first used the file does not exist yet
//checks that the file exists and that the size of the file is what is stored in cpinfo
//status of file [/tmp/tests/ledger/blkstorage/fsblkstorage/blocks/blockfile_000000]: exists=[false], size=[0]
if !exists || int(size) == cpInfo.latestFileChunksize {
// check point info is in sync with the file on disk
return
}
//Scan the file system to verify that the checkpoint info stored in db is correct
_, endOffsetLastBlock, numBlocks, err := scanForLastCompleteBlock(
rootDir, cpInfo.latestFileChunkSuffixNum, int64(cpInfo.latestFileChunksize))
if err != nil {
panic(fmt.Sprintf("Could not open current file for detecting last block in the file: %s", err))
}
cpInfo.latestFileChunksize = int(endOffsetLastBlock)
if numBlocks == 0 {
return
}
//Updates the checkpoint info for the actual last block number stored and it's end location
if cpInfo.isChainEmpty {
cpInfo.lastBlockNumber = uint64(numBlocks - 1)
} else {
cpInfo.lastBlockNumber += uint64(numBlocks)
}
cpInfo.isChainEmpty = false
logger.Debugf("Checkpoint after updates by scanning the last file segment:%s", cpInfo)
}
func deriveBlockfilePath(rootDir string, suffixNum int) string {
return rootDir + "/" + blockfilePrefix + fmt.Sprintf("%06d", suffixNum)
}
func (mgr *blockfileMgr) close() {
mgr.currentFileWriter.close()
}
func (mgr *blockfileMgr) moveToNextFile() {
cpInfo := &checkpointInfo{
latestFileChunkSuffixNum: mgr.cpInfo.latestFileChunkSuffixNum + 1,
latestFileChunksize: 0,
lastBlockNumber: mgr.cpInfo.lastBlockNumber}
nextFileWriter, err := newBlockfileWriter(
deriveBlockfilePath(mgr.rootDir, cpInfo.latestFileChunkSuffixNum))
if err != nil {
panic(fmt.Sprintf("Could not open writer to next file: %s", err))
}
mgr.currentFileWriter.close()
err = mgr.saveCurrentInfo(cpInfo, true)
if err != nil {
panic(fmt.Sprintf("Could not save next block file info to db: %s", err))
}
mgr.currentFileWriter = nextFileWriter
mgr.updateCheckpoint(cpInfo)
}
func (mgr *blockfileMgr) addBlock(block *common.Block) error {
if block.Header.Number != mgr.getBlockchainInfo().Height {
return fmt.Errorf("Block number should have been %d but was %d", mgr.getBlockchainInfo().Height, block.Header.Number)
}
blockBytes, info, err := serializeBlock(block)
if err != nil {
return fmt.Errorf("Error while serializing block: %s", err)
}
blockHash := block.Header.Hash()
//Get the location / offset where each transaction starts in the block and where the block ends
txOffsets := info.txOffsets
currentOffset := mgr.cpInfo.latestFileChunksize
if err != nil {
return fmt.Errorf("Error while serializing block: %s", err)
}
blockBytesLen := len(blockBytes)
blockBytesEncodedLen := proto.EncodeVarint(uint64(blockBytesLen))
totalBytesToAppend := blockBytesLen + len(blockBytesEncodedLen)
//Determine if we need to start a new file since the size of this block
//exceeds the amount of space left in the current file
if currentOffset+totalBytesToAppend > mgr.conf.maxBlockfileSize {
mgr.moveToNextFile()
currentOffset = 0
}
//append blockBytesEncodedLen to the file
err = mgr.currentFileWriter.append(blockBytesEncodedLen, false)
if err == nil {
//append the actual block bytes to the file
err = mgr.currentFileWriter.append(blockBytes, true)
}
if err != nil {
truncateErr := mgr.currentFileWriter.truncateFile(mgr.cpInfo.latestFileChunksize)
if truncateErr != nil {
panic(fmt.Sprintf("Could not truncate current file to known size after an error during block append: %s", err))
}
return fmt.Errorf("Error while appending block to file: %s", err)
}
//Update the checkpoint info with the results of adding the new block
currentCPInfo := mgr.cpInfo
newCPInfo := &checkpointInfo{
latestFileChunkSuffixNum: currentCPInfo.latestFileChunkSuffixNum,
latestFileChunksize: currentCPInfo.latestFileChunksize + totalBytesToAppend,
isChainEmpty: false,
lastBlockNumber: block.Header.Number}
//save the checkpoint information in the database
if err = mgr.saveCurrentInfo(newCPInfo, false); err != nil {
truncateErr := mgr.currentFileWriter.truncateFile(currentCPInfo.latestFileChunksize)
if truncateErr != nil {
panic(fmt.Sprintf("Error in truncating current file to known size after an error in saving checkpoint info: %s", err))
}
return fmt.Errorf("Error while saving current file info to db: %s", err)
}
//Index block file location pointer updated with file suffex and offset for the new block
blockFLP := &fileLocPointer{fileSuffixNum: newCPInfo.latestFileChunkSuffixNum}
blockFLP.offset = currentOffset
// shift the txoffset because we prepend length of bytes before block bytes
for _, txOffset := range txOffsets {
txOffset.loc.offset += len(blockBytesEncodedLen)
}
//save the index in the database
mgr.index.indexBlock(&blockIdxInfo{
blockNum: block.Header.Number, blockHash: blockHash,
flp: blockFLP, txOffsets: txOffsets, metadata: block.Metadata})
//update the checkpoint info (for storage) and the blockchain info (for APIs) in the manager
mgr.updateCheckpoint(newCPInfo)
mgr.updateBlockchainInfo(blockHash, block)
return nil
}
func (mgr *blockfileMgr) syncIndex() error {
var lastBlockIndexed uint64
var indexEmpty bool
var err error
//from the database, get the last block that was indexed
if lastBlockIndexed, err = mgr.index.getLastBlockIndexed(); err != nil {
if err != errIndexEmpty {
return err
}
indexEmpty = true
}
//initialize index to file number:zero, offset:zero and blockNum:0
startFileNum := 0
startOffset := 0
skipFirstBlock := false
//get the last file that blocks were added to using the checkpoint info
endFileNum := mgr.cpInfo.latestFileChunkSuffixNum
startingBlockNum := uint64(0)
//if the index stored in the db has value, update the index information with those values
if !indexEmpty {
if lastBlockIndexed == mgr.cpInfo.lastBlockNumber {
logger.Infof("Both the block files and indices are in sync.")
return nil
}
logger.Infof("Last block indexed [%d], Last block present in block files=[%d]", lastBlockIndexed, mgr.cpInfo.lastBlockNumber)
var flp *fileLocPointer
if flp, err = mgr.index.getBlockLocByBlockNum(lastBlockIndexed); err != nil {
return err
}
startFileNum = flp.fileSuffixNum
startOffset = flp.locPointer.offset
skipFirstBlock = true
startingBlockNum = lastBlockIndexed + 1
} else {
logger.Infof("No block indexed, Last block present in block files=[%d]", mgr.cpInfo.lastBlockNumber)
}
logger.Infof("Start building index from block [%d]", startingBlockNum)
//open a blockstream to the file location that was stored in the index
var stream *blockStream
if stream, err = newBlockStream(mgr.rootDir, startFileNum, int64(startOffset), endFileNum); err != nil {
return err
}
var blockBytes []byte
var blockPlacementInfo *blockPlacementInfo
if skipFirstBlock {
if blockBytes, _, err = stream.nextBlockBytesAndPlacementInfo(); err != nil {
return err
}
if blockBytes == nil {
return fmt.Errorf("block bytes for block num = [%d] should not be nil here. The indexes for the block are already present",
lastBlockIndexed)
}
}
//Should be at the last block already, but go ahead and loop looking for next blockBytes.
//If there is another block, add it to the index.
//This will ensure block indexes are correct, for example if peer had crashed before indexes got updated.
blockIdxInfo := &blockIdxInfo{}
for {
if blockBytes, blockPlacementInfo, err = stream.nextBlockBytesAndPlacementInfo(); err != nil {
return err
}
if blockBytes == nil {
break
}
info, err := extractSerializedBlockInfo(blockBytes)
if err != nil {
return err
}
//The blockStartOffset will get applied to the txOffsets prior to indexing within indexBlock(),
//therefore just shift by the difference between blockBytesOffset and blockStartOffset
numBytesToShift := int(blockPlacementInfo.blockBytesOffset - blockPlacementInfo.blockStartOffset)
for _, offset := range info.txOffsets {
offset.loc.offset += numBytesToShift
}
//Update the blockIndexInfo with what was actually stored in file system
blockIdxInfo.blockHash = info.blockHeader.Hash()
blockIdxInfo.blockNum = info.blockHeader.Number
blockIdxInfo.flp = &fileLocPointer{fileSuffixNum: blockPlacementInfo.fileNum,
locPointer: locPointer{offset: int(blockPlacementInfo.blockStartOffset)}}
blockIdxInfo.txOffsets = info.txOffsets
blockIdxInfo.metadata = info.metadata
logger.Debugf("syncIndex() indexing block [%d]", blockIdxInfo.blockNum)
if err = mgr.index.indexBlock(blockIdxInfo); err != nil {
return err
}
if blockIdxInfo.blockNum%10000 == 0 {
logger.Infof("Indexed block number [%d]", blockIdxInfo.blockNum)
}
}
logger.Infof("Finished building index. Last block indexed [%d]", blockIdxInfo.blockNum)
return nil
}
func (mgr *blockfileMgr) getBlockchainInfo() *common.BlockchainInfo {
return mgr.bcInfo.Load().(*common.BlockchainInfo)
}
func (mgr *blockfileMgr) updateCheckpoint(cpInfo *checkpointInfo) {
mgr.cpInfoCond.L.Lock()
defer mgr.cpInfoCond.L.Unlock()
mgr.cpInfo = cpInfo
logger.Debugf("Broadcasting about update checkpointInfo: %s", cpInfo)
mgr.cpInfoCond.Broadcast()
}
func (mgr *blockfileMgr) updateBlockchainInfo(latestBlockHash []byte, latestBlock *common.Block) {
currentBCInfo := mgr.getBlockchainInfo()
newBCInfo := &common.BlockchainInfo{
Height: currentBCInfo.Height + 1,
CurrentBlockHash: latestBlockHash,
PreviousBlockHash: latestBlock.Header.PreviousHash}
mgr.bcInfo.Store(newBCInfo)
}
func (mgr *blockfileMgr) retrieveBlockByHash(blockHash []byte) (*common.Block, error) {
logger.Debugf("retrieveBlockByHash() - blockHash = [%#v]", blockHash)
loc, err := mgr.index.getBlockLocByHash(blockHash)
if err != nil {
return nil, err
}
return mgr.fetchBlock(loc)
}
func (mgr *blockfileMgr) retrieveBlockByNumber(blockNum uint64) (*common.Block, error) {
logger.Debugf("retrieveBlockByNumber() - blockNum = [%d]", blockNum)
// interpret math.MaxUint64 as a request for last block
if blockNum == math.MaxUint64 {
blockNum = mgr.getBlockchainInfo().Height - 1
}
loc, err := mgr.index.getBlockLocByBlockNum(blockNum)
if err != nil {
return nil, err
}
return mgr.fetchBlock(loc)
}
func (mgr *blockfileMgr) retrieveBlockByTxID(txID string) (*common.Block, error) {
logger.Debugf("retrieveBlockByTxID() - txID = [%s]", txID)
loc, err := mgr.index.getBlockLocByTxID(txID)
if err != nil {
return nil, err
}
return mgr.fetchBlock(loc)
}
func (mgr *blockfileMgr) retrieveTxValidationCodeByTxID(txID string) (peer.TxValidationCode, error) {
logger.Debugf("retrieveTxValidationCodeByTxID() - txID = [%s]", txID)
return mgr.index.getTxValidationCodeByTxID(txID)
}
func (mgr *blockfileMgr) retrieveBlockHeaderByNumber(blockNum uint64) (*common.BlockHeader, error) {
logger.Debugf("retrieveBlockHeaderByNumber() - blockNum = [%d]", blockNum)
loc, err := mgr.index.getBlockLocByBlockNum(blockNum)
if err != nil {
return nil, err
}
blockBytes, err := mgr.fetchBlockBytes(loc)
if err != nil {
return nil, err
}
info, err := extractSerializedBlockInfo(blockBytes)
if err != nil {
return nil, err
}
return info.blockHeader, nil
}
func (mgr *blockfileMgr) retrieveBlocks(startNum uint64) (*blocksItr, error) {
return newBlockItr(mgr, startNum), nil
}
func (mgr *blockfileMgr) retrieveTransactionByID(txID string) (*common.Envelope, error) {
logger.Debugf("retrieveTransactionByID() - txId = [%s]", txID)
loc, err := mgr.index.getTxLoc(txID)
if err != nil {
return nil, err
}
return mgr.fetchTransactionEnvelope(loc)
}
func (mgr *blockfileMgr) retrieveTransactionByBlockNumTranNum(blockNum uint64, tranNum uint64) (*common.Envelope, error) {
logger.Debugf("retrieveTransactionByBlockNumTranNum() - blockNum = [%d], tranNum = [%d]", blockNum, tranNum)
loc, err := mgr.index.getTXLocByBlockNumTranNum(blockNum, tranNum)
if err != nil {
return nil, err
}
return mgr.fetchTransactionEnvelope(loc)
}
func (mgr *blockfileMgr) fetchBlock(lp *fileLocPointer) (*common.Block, error) {
blockBytes, err := mgr.fetchBlockBytes(lp)
if err != nil {
return nil, err
}
block, err := deserializeBlock(blockBytes)
if err != nil {
return nil, err
}
return block, nil
}
func (mgr *blockfileMgr) fetchTransactionEnvelope(lp *fileLocPointer) (*common.Envelope, error) {
logger.Debugf("Entering fetchTransactionEnvelope() %v\n", lp)
var err error
var txEnvelopeBytes []byte
if txEnvelopeBytes, err = mgr.fetchRawBytes(lp); err != nil {
return nil, err
}
_, n := proto.DecodeVarint(txEnvelopeBytes)
return putil.GetEnvelopeFromBlock(txEnvelopeBytes[n:])
}
func (mgr *blockfileMgr) fetchBlockBytes(lp *fileLocPointer) ([]byte, error) {
stream, err := newBlockfileStream(mgr.rootDir, lp.fileSuffixNum, int64(lp.offset))
if err != nil {
return nil, err
}
defer stream.close()
b, err := stream.nextBlockBytes()
if err != nil {
return nil, err
}
return b, nil
}
func (mgr *blockfileMgr) fetchRawBytes(lp *fileLocPointer) ([]byte, error) {
filePath := deriveBlockfilePath(mgr.rootDir, lp.fileSuffixNum)
reader, err := newBlockfileReader(filePath)
if err != nil {
return nil, err
}
defer reader.close()
b, err := reader.read(lp.offset, lp.bytesLength)
if err != nil {
return nil, err
}
return b, nil
}
//Get the current checkpoint information that is stored in the database
func (mgr *blockfileMgr) loadCurrentInfo() (*checkpointInfo, error) {
var b []byte
var err error
if b, err = mgr.db.Get(blkMgrInfoKey); b == nil || err != nil {
return nil, err
}
i := &checkpointInfo{}
if err = i.unmarshal(b); err != nil {
return nil, err
}
logger.Debugf("loaded checkpointInfo:%s", i)
return i, nil
}
func (mgr *blockfileMgr) saveCurrentInfo(i *checkpointInfo, sync bool) error {
b, err := i.marshal()
if err != nil {
return err
}
if err = mgr.db.Put(blkMgrInfoKey, b, sync); err != nil {
return err
}
return nil
}
// scanForLastCompleteBlock scan a given block file and detects the last offset in the file
// after which there may lie a block partially written (towards the end of the file in a crash scenario).
func scanForLastCompleteBlock(rootDir string, fileNum int, startingOffset int64) ([]byte, int64, int, error) {
//scan the passed file number suffix starting from the passed offset to find the last completed block
numBlocks := 0
var lastBlockBytes []byte
blockStream, errOpen := newBlockfileStream(rootDir, fileNum, startingOffset)
if errOpen != nil {
return nil, 0, 0, errOpen
}
defer blockStream.close()
var errRead error
var blockBytes []byte
for {
blockBytes, errRead = blockStream.nextBlockBytes()
if blockBytes == nil || errRead != nil {
break
}
lastBlockBytes = blockBytes
numBlocks++
}
if errRead == ErrUnexpectedEndOfBlockfile {
logger.Debugf(`Error:%s
The error may happen if a crash has happened during block appending.
Resetting error to nil and returning current offset as a last complete block's end offset`, errRead)
errRead = nil
}
logger.Debugf("scanForLastCompleteBlock(): last complete block ends at offset=[%d]", blockStream.currentOffset)
return lastBlockBytes, blockStream.currentOffset, numBlocks, errRead
}
// checkpointInfo
type checkpointInfo struct {
latestFileChunkSuffixNum int
latestFileChunksize int
isChainEmpty bool
lastBlockNumber uint64
}
func (i *checkpointInfo) marshal() ([]byte, error) {
buffer := proto.NewBuffer([]byte{})
var err error
if err = buffer.EncodeVarint(uint64(i.latestFileChunkSuffixNum)); err != nil {
return nil, err
}
if err = buffer.EncodeVarint(uint64(i.latestFileChunksize)); err != nil {
return nil, err
}
if err = buffer.EncodeVarint(i.lastBlockNumber); err != nil {
return nil, err
}
var chainEmptyMarker uint64
if i.isChainEmpty {
chainEmptyMarker = 1
}
if err = buffer.EncodeVarint(chainEmptyMarker); err != nil {
return nil, err
}
return buffer.Bytes(), nil
}
func (i *checkpointInfo) unmarshal(b []byte) error {
buffer := proto.NewBuffer(b)
var val uint64
var chainEmptyMarker uint64
var err error
if val, err = buffer.DecodeVarint(); err != nil {
return err
}
i.latestFileChunkSuffixNum = int(val)
if val, err = buffer.DecodeVarint(); err != nil {
return err
}
i.latestFileChunksize = int(val)
if val, err = buffer.DecodeVarint(); err != nil {
return err
}
i.lastBlockNumber = val
if chainEmptyMarker, err = buffer.DecodeVarint(); err != nil {
return err
}
i.isChainEmpty = chainEmptyMarker == 1
return nil
}
func (i *checkpointInfo) String() string {
return fmt.Sprintf("latestFileChunkSuffixNum=[%d], latestFileChunksize=[%d], isChainEmpty=[%t], lastBlockNumber=[%d]",
i.latestFileChunkSuffixNum, i.latestFileChunksize, i.isChainEmpty, i.lastBlockNumber)
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/mmcro/fabric.git
git@gitee.com:mmcro/fabric.git
mmcro
fabric
fabric
v1.0.4

搜索帮助