1 Star 0 Fork 0

13683679291/fabric

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
block_stream.go 7.15 KB
一键复制 编辑 原始数据 按行查看 历史
gzuhlwang 提交于 2020-03-17 05:47 . fix typo in comment (#860)
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package fsblkstorage
import (
"bufio"
"fmt"
"io"
"os"
"github.com/golang/protobuf/proto"
"github.com/pkg/errors"
)
// ErrUnexpectedEndOfBlockfile indicates that a block file ends in the middle
// of a block. This mainly happens when a crash occurs while a block is being
// appended, leaving partial block contents at the end of the file.
var ErrUnexpectedEndOfBlockfile = errors.New("unexpected end of blockfile")
// blockfileStream reads blocks sequentially from a single block file.
// It starts at a given offset and can traverse to the end of the file.
//
// Fields are initialized positionally by newBlockfileStream, so their order
// must not change.
type blockfileStream struct {
	// fileNum is the number of the block file being read; it is reported in
	// log messages and in the placement info of every returned block.
	fileNum int
	// file is the underlying read-only handle; close() releases it.
	file *os.File
	// reader is a buffered reader layered on top of file; all block reads go
	// through it.
	reader *bufio.Reader
	// currentOffset is the file offset of the next unread block. It starts at
	// the requested start offset and advances past each block that is returned.
	currentOffset int64
}
// blockStream reads blocks sequentially from multiple block files.
// It starts at a given offset in a given file and continues with the next
// file segment until the end of the last segment (`endFileNum`).
//
// Fields are initialized positionally by newBlockStream, so their order must
// not change.
type blockStream struct {
	// rootDir is the directory passed to newBlockfileStream when a file is opened.
	rootDir string
	// currentFileNum is the number of the file currently being read.
	currentFileNum int
	// endFileNum is the last file number to read. A negative value removes the
	// upper bound: the stream keeps moving to the next file number whenever the
	// current file is exhausted.
	endFileNum int
	// currentFileStream is the stream over the file numbered currentFileNum.
	currentFileStream *blockfileStream
}
// blockPlacementInfo captures where a block is located within a block file.
type blockPlacementInfo struct {
	// fileNum is the number of the block file that contains the block.
	fileNum int
	// blockStartOffset is the file offset at which the block's varint length
	// prefix begins.
	blockStartOffset int64
	// blockBytesOffset is the file offset at which the block's bytes begin,
	// i.e. blockStartOffset plus the size of the length prefix.
	blockBytesOffset int64
}
///////////////////////////////////
// blockfileStream functions
////////////////////////////////////
// newBlockfileStream opens the block file numbered fileNum under rootDir in
// read-only mode and positions it at startOffset.
//
// It returns an error if the file cannot be opened or if the seek fails; in
// the latter case the file handle is closed before returning so that it does
// not leak. It panics if the seek reports success but ends up at a position
// other than startOffset.
func newBlockfileStream(rootDir string, fileNum int, startOffset int64) (*blockfileStream, error) {
	filePath := deriveBlockfilePath(rootDir, fileNum)
	logger.Debugf("newBlockfileStream(): filePath=[%s], startOffset=[%d]", filePath, startOffset)

	file, err := os.OpenFile(filePath, os.O_RDONLY, 0600)
	if err != nil {
		return nil, errors.Wrapf(err, "error opening block file %s", filePath)
	}

	newPosition, err := file.Seek(startOffset, io.SeekStart)
	if err != nil {
		// The caller never receives the handle on this path, so release it
		// here. The close error is deliberately dropped: the seek failure is
		// the error worth reporting.
		_ = file.Close()
		return nil, errors.Wrapf(err, "error seeking block file [%s] to startOffset [%d]", filePath, startOffset)
	}
	if newPosition != startOffset {
		panic(fmt.Sprintf("Could not seek block file [%s] to startOffset [%d]. New position = [%d]",
			filePath, startOffset, newPosition))
	}

	return &blockfileStream{
		fileNum:       fileNum,
		file:          file,
		reader:        bufio.NewReader(file),
		currentOffset: startOffset,
	}, nil
}
// nextBlockBytes returns the bytes of the next block in the file, dropping
// the placement information that nextBlockBytesAndPlacementInfo also yields.
func (s *blockfileStream) nextBlockBytes() ([]byte, error) {
	data, _, err := s.nextBlockBytesAndPlacementInfo()
	return data, err
}
// nextBlockBytesAndPlacementInfo reads the next length-prefixed block from the
// file and reports where it was found.
//
// Each block is stored as a varint length followed by that many bytes. The
// method returns (nil, nil, nil) once currentOffset has reached the end of the
// file. It returns ErrUnexpectedEndOfBlockfile when the tail of the file holds
// only part of a block (an incomplete length prefix or fewer bytes than the
// prefix announces), which can happen if a crash interrupted an append. It
// panics if at least 8 bytes remain but no varint can be decoded from them.
func (s *blockfileStream) nextBlockBytesAndPlacementInfo() ([]byte, *blockPlacementInfo, error) {
	stat, err := s.file.Stat()
	if err != nil {
		return nil, nil, errors.Wrapf(err, "error getting block file stat")
	}
	fileSize := stat.Size()
	if fileSize == s.currentOffset {
		logger.Debugf("Finished reading file number [%d]", s.fileNum)
		return nil, nil, nil
	}
	remainingBytes := fileSize - s.currentOffset

	// Peek at most 8 bytes for the length prefix; the assumption is that a
	// block size always fits in an 8-byte varint. When fewer than 8 bytes are
	// left, peek only what is there and remember that the tail is short.
	peekBytes := 8
	tailIsShort := remainingBytes < int64(peekBytes)
	if tailIsShort {
		peekBytes = int(remainingBytes)
	}
	logger.Debugf("Remaining bytes=[%d], Going to peek [%d] bytes", remainingBytes, peekBytes)
	lenBytes, err := s.reader.Peek(peekBytes)
	if err != nil {
		return nil, nil, errors.Wrapf(err, "error peeking [%d] bytes from block file", peekBytes)
	}

	length, n := proto.DecodeVarint(lenBytes)
	if n == 0 {
		// Nothing was consumed, so the peeked bytes do not form a complete
		// varint. With a short tail that means a partially written length
		// prefix; otherwise the data cannot be interpreted at all.
		if tailIsShort {
			return nil, nil, ErrUnexpectedEndOfBlockfile
		}
		panic(errors.Errorf("Error in decoding varint bytes [%#v]", lenBytes))
	}

	bytesExpected := int64(n) + int64(length)
	if bytesExpected > remainingBytes {
		logger.Debugf("At least [%d] bytes expected. Remaining bytes = [%d]. Returning with error [%s]",
			bytesExpected, remainingBytes, ErrUnexpectedEndOfBlockfile)
		return nil, nil, ErrUnexpectedEndOfBlockfile
	}

	// Step over the length prefix, then read exactly the announced number of bytes.
	if _, err = s.reader.Discard(n); err != nil {
		return nil, nil, errors.Wrapf(err, "error discarding [%d] bytes", n)
	}
	blockBytes := make([]byte, length)
	if _, err = io.ReadFull(s.reader, blockBytes); err != nil {
		logger.Errorf("Error reading [%d] bytes from file number [%d], error: %s", length, s.fileNum, err)
		return nil, nil, errors.Wrapf(err, "error reading [%d] bytes from file number [%d]", length, s.fileNum)
	}

	placement := &blockPlacementInfo{
		fileNum:          s.fileNum,
		blockStartOffset: s.currentOffset,
		blockBytesOffset: s.currentOffset + int64(n),
	}
	// Advance past the prefix and the block body.
	s.currentOffset += bytesExpected
	logger.Debugf("Returning blockbytes - length=[%d], placementInfo={%s}", len(blockBytes), placement)
	return blockBytes, placement, nil
}
// close closes the underlying file and returns the resulting error, if any,
// annotated with a stack trace.
func (s *blockfileStream) close() error {
	closeErr := s.file.Close()
	return errors.WithStack(closeErr)
}
///////////////////////////////////
// blockStream functions
////////////////////////////////////
// newBlockStream creates a stream that starts reading at startOffset in the
// file numbered startFileNum and may continue through endFileNum. Any error
// from opening the first file is returned unchanged.
func newBlockStream(rootDir string, startFileNum int, startOffset int64, endFileNum int) (*blockStream, error) {
	first, err := newBlockfileStream(rootDir, startFileNum, startOffset)
	if err != nil {
		return nil, err
	}
	stream := &blockStream{
		rootDir:           rootDir,
		currentFileNum:    startFileNum,
		endFileNum:        endFileNum,
		currentFileStream: first,
	}
	return stream, nil
}
// moveToNextBlockfileStream switches the stream to the beginning of the next
// block file (currentFileNum+1).
//
// The next file is opened before the current one is closed, and the stream's
// fields are updated only after both steps succeed. If either step fails the
// error is returned and the stream is left exactly as it was, so
// currentFileStream is never set to nil and a subsequent close() or read does
// not dereference a nil pointer.
func (s *blockStream) moveToNextBlockfileStream() error {
	nextFileNum := s.currentFileNum + 1
	next, err := newBlockfileStream(s.rootDir, nextFileNum, 0)
	if err != nil {
		return err
	}
	if err = s.currentFileStream.close(); err != nil {
		// Best-effort release of the file that was just opened; the failure to
		// close the current file is the error worth reporting.
		_ = next.close()
		return err
	}
	s.currentFileNum = nextFileNum
	s.currentFileStream = next
	return nil
}
// nextBlockBytes returns the bytes of the next block in the stream, dropping
// the placement information that nextBlockBytesAndPlacementInfo also yields.
func (s *blockStream) nextBlockBytes() ([]byte, error) {
	data, _, err := s.nextBlockBytesAndPlacementInfo()
	return data, err
}
// nextBlockBytesAndPlacementInfo returns the next block in the stream together
// with its placement information.
//
// When the current file yields no block and more files may follow (the current
// file number is below endFileNum, or endFileNum is negative), the stream
// moves to the next file and tries again. It returns (nil, nil, nil) when the
// last file is exhausted, and returns any read or file-switch error as is.
func (s *blockStream) nextBlockBytesAndPlacementInfo() ([]byte, *blockPlacementInfo, error) {
	for {
		data, placement, err := s.currentFileStream.nextBlockBytesAndPlacementInfo()
		if err != nil {
			logger.Errorf("Error reading next block bytes from file number [%d]: %s", s.currentFileNum, err)
			return nil, nil, err
		}
		logger.Debugf("blockbytes [%d] read from file [%d]", len(data), s.currentFileNum)

		moreFiles := s.endFileNum < 0 || s.currentFileNum < s.endFileNum
		if data != nil || !moreFiles {
			return data, placement, nil
		}

		logger.Debugf("current file [%d] exhausted. Moving to next file", s.currentFileNum)
		if err = s.moveToNextBlockfileStream(); err != nil {
			return nil, nil, err
		}
	}
}
// close closes the file stream that is currently being read and returns the
// error from that close, if any.
func (s *blockStream) close() error {
	current := s.currentFileStream
	return current.close()
}
// String renders the placement info as
// "fileNum=[N], startOffset=[N], bytesOffset=[N]" for use in log messages.
func (i *blockPlacementInfo) String() string {
	const layout = "fileNum=[%d], startOffset=[%d], bytesOffset=[%d]"
	return fmt.Sprintf(layout, i.fileNum, i.blockStartOffset, i.blockBytesOffset)
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/mmcro/fabric.git
git@gitee.com:mmcro/fabric.git
mmcro
fabric
fabric
v2.1.1

搜索帮助