1 Star 0 Fork 0

妥協 / fabric

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
block_stream.go 7.22 KB
一键复制 编辑 原始数据 按行查看 历史
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package fsblkstorage
import (
"bufio"
"errors"
"fmt"
"io"
"os"
"github.com/golang/protobuf/proto"
)
// ErrUnexpectedEndOfBlockfile is the error used to indicate an unexpected end of a file segment.
// This can happen mainly if a crash occurs while appending a block, so that only partial
// block contents get written towards the end of the file.
var ErrUnexpectedEndOfBlockfile = errors.New("unexpected end of blockfile")
// blockfileStream reads blocks sequentially from a single file.
// It starts from the given offset and can traverse till the end of the file.
// NOTE: the field order matters; the constructor in this file builds the
// struct with a positional composite literal.
type blockfileStream struct {
// fileNum is the number of the block file being read; it is used to derive
// the file path and is reported in placement info and log messages.
fileNum int
// file is the underlying block file, opened read-only.
file *os.File
// reader is a buffered reader wrapping file; all block reads go through it.
reader *bufio.Reader
// currentOffset is the file offset of the next unread block. It begins at the
// start offset given to the constructor and advances by the size of the
// length prefix plus the block bytes after each successful read.
currentOffset int64
}
// blockStream reads blocks sequentially from multiple files.
// It starts from a given file offset and continues with the next
// file segment until the end of the last segment (`endFileNum`).
// NOTE: the field order matters; the constructor in this file builds the
// struct with a positional composite literal.
type blockStream struct {
// rootDir is the directory passed on to newBlockfileStream to locate block files.
rootDir string
// currentFileNum is the number of the file currently being read; it is
// incremented each time the stream moves on to the next file.
currentFileNum int
// endFileNum is the number of the last file to read. A negative value makes
// the stream keep advancing to the next file with no upper bound.
endFileNum int
// currentFileStream is the single-file stream over file currentFileNum.
currentFileStream *blockfileStream
}
// blockPlacementInfo captures the information related
// to block's placement in the file.
type blockPlacementInfo struct {
// fileNum is the number of the block file that contains the block.
fileNum int
// blockStartOffset is the file offset at which the block's varint length
// prefix begins.
blockStartOffset int64
// blockBytesOffset is the file offset at which the block's bytes begin,
// i.e. blockStartOffset plus the size of the length prefix.
blockBytesOffset int64
}
///////////////////////////////////
// blockfileStream functions
////////////////////////////////////
// newBlockfileStream opens, read-only, the block file numbered fileNum under
// rootDir and positions it at startOffset.
//
// It returns an error if the file cannot be opened or the seek fails, and
// panics if the seek succeeds but lands at a position other than startOffset.
// On every failure path the opened file is closed so no descriptor is leaked.
// On success the caller owns the returned stream and must call close on it.
func newBlockfileStream(rootDir string, fileNum int, startOffset int64) (*blockfileStream, error) {
	filePath := deriveBlockfilePath(rootDir, fileNum)
	logger.Debugf("newBlockfileStream(): filePath=[%s], startOffset=[%d]", filePath, startOffset)

	file, err := os.OpenFile(filePath, os.O_RDONLY, 0600)
	if err != nil {
		return nil, err
	}

	newPosition, err := file.Seek(startOffset, io.SeekStart)
	if err != nil {
		// The seek error is the one worth reporting; a failure while closing
		// a read-only handle adds nothing actionable.
		_ = file.Close()
		return nil, err
	}
	if newPosition != startOffset {
		// Release the handle before panicking in case the panic is recovered upstream.
		_ = file.Close()
		panic(fmt.Sprintf("Could not seek file [%s] to given startOffset [%d]. New position = [%d]",
			filePath, startOffset, newPosition))
	}

	return &blockfileStream{
		fileNum:       fileNum,
		file:          file,
		reader:        bufio.NewReader(file),
		currentOffset: startOffset,
	}, nil
}
// nextBlockBytes returns the bytes of the next block in the file, dropping the
// placement information. See nextBlockBytesAndPlacementInfo for the meaning of
// a nil result and of the returned error.
func (s *blockfileStream) nextBlockBytes() ([]byte, error) {
	data, _, err := s.nextBlockBytesAndPlacementInfo()
	return data, err
}
// nextBlockBytesAndPlacementInfo reads the next length-prefixed block from the
// file and reports where it was found.
//
// Each block is stored as a varint length followed by that many bytes. The
// method returns (nil, nil, nil) when the current offset is exactly at the end
// of the file. It returns ErrUnexpectedEndOfBlockfile when the tail of the file
// holds only part of a length prefix or fewer bytes than the prefix announces,
// which is possible if a crash took place while a block was being appended.
// It panics if a full 8-byte peek cannot be decoded as a varint.
func (s *blockfileStream) nextBlockBytesAndPlacementInfo() ([]byte, *blockPlacementInfo, error) {
	stat, err := s.file.Stat()
	if err != nil {
		return nil, nil, err
	}
	fileSize := stat.Size()
	if s.currentOffset == fileSize {
		logger.Debugf("Finished reading file number [%d]", s.fileNum)
		return nil, nil, nil
	}
	remainingBytes := fileSize - s.currentOffset

	// Peek 8 bytes, or fewer when the file has fewer than 8 left. The
	// assumption is that a block size always fits in an 8-byte varint.
	peekLen := 8
	fullPrefixAvailable := true
	if remainingBytes < int64(peekLen) {
		peekLen = int(remainingBytes)
		fullPrefixAvailable = false
	}
	logger.Debugf("Remaining bytes=[%d], Going to peek [%d] bytes", remainingBytes, peekLen)

	prefix, err := s.reader.Peek(peekLen)
	if err != nil {
		return nil, nil, err
	}

	blockLen, prefixLen := proto.DecodeVarint(prefix)
	if prefixLen == 0 {
		// Nothing was decoded. With a full 8-byte window this is unexpected;
		// with a short window the length prefix itself was only partly written.
		if fullPrefixAvailable {
			panic(fmt.Errorf("Error in decoding varint bytes [%#v]", prefix))
		}
		return nil, nil, ErrUnexpectedEndOfBlockfile
	}

	bytesExpected := int64(prefixLen) + int64(blockLen)
	if bytesExpected > remainingBytes {
		logger.Debugf("At least [%d] bytes expected. Remaining bytes = [%d]. Returning with error [%s]",
			bytesExpected, remainingBytes, ErrUnexpectedEndOfBlockfile)
		return nil, nil, ErrUnexpectedEndOfBlockfile
	}

	// Step over the length prefix, then read exactly blockLen bytes.
	if _, err = s.reader.Discard(prefixLen); err != nil {
		return nil, nil, err
	}
	payload := make([]byte, blockLen)
	if _, err = io.ReadFull(s.reader, payload); err != nil {
		logger.Debugf("Error while trying to read [%d] bytes from fileNum [%d]: %s", blockLen, s.fileNum, err)
		return nil, nil, err
	}

	blockStart := s.currentOffset
	placement := &blockPlacementInfo{
		fileNum:          s.fileNum,
		blockStartOffset: blockStart,
		blockBytesOffset: blockStart + int64(prefixLen),
	}
	s.currentOffset = blockStart + bytesExpected
	logger.Debugf("Returning blockbytes - length=[%d], placementInfo={%s}", len(payload), placement)
	return payload, placement, nil
}
// close closes the underlying block file and returns any error reported by
// the file's Close method.
func (s *blockfileStream) close() error {
	err := s.file.Close()
	return err
}
///////////////////////////////////
// blockStream functions
////////////////////////////////////
// newBlockStream creates a stream over the block files in rootDir. Reading
// begins at startOffset within file startFileNum; endFileNum is stored as the
// last file the stream will advance to. An error is returned if the first
// file stream cannot be created.
func newBlockStream(rootDir string, startFileNum int, startOffset int64, endFileNum int) (*blockStream, error) {
	first, err := newBlockfileStream(rootDir, startFileNum, startOffset)
	if err != nil {
		return nil, err
	}
	stream := &blockStream{
		rootDir:           rootDir,
		currentFileNum:    startFileNum,
		endFileNum:        endFileNum,
		currentFileStream: first,
	}
	return stream, nil
}
// moveToNextBlockfileStream switches the stream to the beginning (offset 0) of
// the file that follows the current one.
//
// The next file is opened before the current one is closed, and the stream's
// state (currentFileNum, currentFileStream) is updated only after both steps
// succeed. On any error the stream is left exactly as it was, so
// currentFileStream is never nil and a later call to close cannot hit a nil
// dereference.
func (s *blockStream) moveToNextBlockfileStream() error {
	nextFileNum := s.currentFileNum + 1
	next, err := newBlockfileStream(s.rootDir, nextFileNum, 0)
	if err != nil {
		return err
	}
	if err = s.currentFileStream.close(); err != nil {
		// Do not leak the freshly opened file; the close error on the
		// current stream is the one reported to the caller.
		_ = next.close()
		return err
	}
	s.currentFileNum = nextFileNum
	s.currentFileStream = next
	return nil
}
// nextBlockBytes returns the bytes of the next block in the stream, dropping
// the placement information that nextBlockBytesAndPlacementInfo also returns.
func (s *blockStream) nextBlockBytes() ([]byte, error) {
	data, _, err := s.nextBlockBytesAndPlacementInfo()
	return data, err
}
// nextBlockBytesAndPlacementInfo returns the next block and its placement,
// moving on to the following file whenever the current file yields no block
// and more files remain (currentFileNum is below endFileNum, or endFileNum is
// negative). A nil block with a nil error is returned once the last file is
// exhausted. Errors from the current file stream or from switching files are
// returned as-is.
func (s *blockStream) nextBlockBytesAndPlacementInfo() ([]byte, *blockPlacementInfo, error) {
	for {
		data, placement, err := s.currentFileStream.nextBlockBytesAndPlacementInfo()
		if err != nil {
			logger.Debugf("current file [%d] length of blockbytes [%d]. Err:%s", s.currentFileNum, len(data), err)
			return nil, nil, err
		}
		logger.Debugf("blockbytes [%d] read from file [%d]", len(data), s.currentFileNum)

		moreFiles := s.endFileNum < 0 || s.currentFileNum < s.endFileNum
		if data != nil || !moreFiles {
			return data, placement, nil
		}

		logger.Debugf("current file [%d] exhausted. Moving to next file", s.currentFileNum)
		if err = s.moveToNextBlockfileStream(); err != nil {
			return nil, nil, err
		}
	}
}
// close closes the file stream that is currently being read and returns any
// error from doing so.
func (s *blockStream) close() error {
	current := s.currentFileStream
	return current.close()
}
// String renders the placement info as
// "fileNum=[..], startOffset=[..], bytesOffset=[..]" for use in log messages.
func (i *blockPlacementInfo) String() string {
	const layout = "fileNum=[%d], startOffset=[%d], bytesOffset=[%d]"
	return fmt.Sprintf(layout, i.fileNum, i.blockStartOffset, i.blockBytesOffset)
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/liurenhao/fabric.git
git@gitee.com:liurenhao/fabric.git
liurenhao
fabric
fabric
v1.1.0-rc1

搜索帮助

344bd9b3 5694891 D2dac590 5694891