代码拉取完成,页面将自动刷新
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package fsblkstorage
import (
"bufio"
"errors"
"fmt"
"io"
"os"
"github.com/golang/protobuf/proto"
)
// ErrUnexpectedEndOfBlockfile is the error used to indicate an unexpected end of a file segment.
// This can happen mainly if a crash occurs while appending a block and partial block contents
// get written towards the end of the file.
var ErrUnexpectedEndOfBlockfile = errors.New("unexpected end of blockfile")
// blockfileStream reads blocks sequentially from a single file.
// It starts from the given offset and can traverse till the end of the file.
type blockfileStream struct {
// fileNum is the number of the block file this stream reads from.
fileNum int
// file is the underlying block file; newBlockfileStream opens it read-only.
file *os.File
// reader is the buffered reader layered over file through which all block data is read.
reader *bufio.Reader
// currentOffset is the file offset of the next block to be read. It starts at the
// requested start offset and is advanced past every block that is returned.
currentOffset int64
}
// blockStream reads blocks sequentially from multiple files.
// It starts from a given file offset and continues with the next
// file segment until the end of the last segment (`endFileNum`).
type blockStream struct {
// rootDir is the directory handed to newBlockfileStream when a block file is opened.
rootDir string
// currentFileNum is the number of the block file currently being read.
currentFileNum int
// endFileNum is the number of the last block file to read. A negative value
// means the stream keeps moving to the next file whenever the current one is exhausted.
endFileNum int
// currentFileStream is the stream over the block file currently being read.
currentFileStream *blockfileStream
}
// blockPlacementInfo captures the information related
// to block's placement in the file.
type blockPlacementInfo struct {
// fileNum is the number of the block file that holds the block.
fileNum int
// blockStartOffset is the file offset at which the block's varint length prefix begins.
blockStartOffset int64
// blockBytesOffset is the file offset of the first block byte, i.e. the
// position immediately after the length prefix.
blockBytesOffset int64
}
///////////////////////////////////
// blockfileStream functions
////////////////////////////////////
// newBlockfileStream opens the block file with number fileNum under rootDir
// in read-only mode and positions it at startOffset.
//
// It returns an error if the file cannot be opened or if the seek fails. It
// panics if the seek reports success but lands on a position other than
// startOffset. On both of those post-open failure paths the file handle is
// closed first so that it is not leaked.
func newBlockfileStream(rootDir string, fileNum int, startOffset int64) (*blockfileStream, error) {
	filePath := deriveBlockfilePath(rootDir, fileNum)
	logger.Debugf("newBlockfileStream(): filePath=[%s], startOffset=[%d]", filePath, startOffset)
	file, err := os.OpenFile(filePath, os.O_RDONLY, 0600)
	if err != nil {
		return nil, err
	}
	newPosition, err := file.Seek(startOffset, io.SeekStart)
	if err != nil {
		// Best-effort close: the seek error is the one worth reporting to the caller.
		_ = file.Close()
		return nil, err
	}
	if newPosition != startOffset {
		// Best-effort close so that a recovering caller does not leak the handle.
		_ = file.Close()
		panic(fmt.Sprintf("Could not seek file [%s] to given startOffset [%d]. New position = [%d]",
			filePath, startOffset, newPosition))
	}
	return &blockfileStream{
		fileNum:       fileNum,
		file:          file,
		reader:        bufio.NewReader(file),
		currentOffset: startOffset,
	}, nil
}
// nextBlockBytes returns the bytes of the next block in this file. It is a thin
// wrapper over nextBlockBytesAndPlacementInfo that drops the placement information.
func (s *blockfileStream) nextBlockBytes() ([]byte, error) {
	data, _, err := s.nextBlockBytesAndPlacementInfo()
	return data, err
}
// nextBlockBytesAndPlacementInfo reads the next block from the file. Each block is
// stored as a varint length prefix followed by that many block bytes.
//
// It returns the block bytes together with the file number, the offset at which
// the length prefix begins and the offset at which the block bytes begin.
// When the current offset equals the file size all three results are nil.
// `ErrUnexpectedEndOfBlockfile` is returned when the tail of the file holds only
// part of a block (an incomplete length prefix, or fewer bytes than the prefix
// announces), which is possible if a crash had taken place while a block was
// being appended. An undecodable length prefix that is not at the tail panics.
func (s *blockfileStream) nextBlockBytesAndPlacementInfo() ([]byte, *blockPlacementInfo, error) {
	var info os.FileInfo
	var err error
	if info, err = s.file.Stat(); err != nil {
		return nil, nil, err
	}
	fileSize := info.Size()
	if s.currentOffset == fileSize {
		logger.Debugf("Finished reading file number [%d]", s.fileNum)
		return nil, nil, nil
	}
	remainingBytes := fileSize - s.currentOffset

	// Peek at most 8 bytes of the length prefix, or everything that is left when
	// the file has fewer than 8 bytes remaining. The assumption is that a block
	// size is small enough to be represented in an 8 byte varint.
	const maxLenPrefixBytes = 8
	peekBytes := maxLenPrefixBytes
	atTail := remainingBytes < maxLenPrefixBytes
	if atTail {
		peekBytes = int(remainingBytes)
	}
	logger.Debugf("Remaining bytes=[%d], Going to peek [%d] bytes", remainingBytes, peekBytes)
	lenBytes, err := s.reader.Peek(peekBytes)
	if err != nil {
		return nil, nil, err
	}

	length, n := proto.DecodeVarint(lenBytes)
	if n == 0 {
		// Nothing was consumed, so the peeked bytes do not form a complete varint.
		// At the tail of the file that means a partially written length prefix.
		if atTail {
			return nil, nil, ErrUnexpectedEndOfBlockfile
		}
		panic(fmt.Errorf("Error in decoding varint bytes [%#v]", lenBytes))
	}

	bytesExpected := int64(n) + int64(length)
	if bytesExpected > remainingBytes {
		logger.Debugf("At least [%d] bytes expected. Remaining bytes = [%d]. Returning with error [%s]",
			bytesExpected, remainingBytes, ErrUnexpectedEndOfBlockfile)
		return nil, nil, ErrUnexpectedEndOfBlockfile
	}

	// Step over the length prefix; so far it has only been peeked at.
	if _, err = s.reader.Discard(n); err != nil {
		return nil, nil, err
	}
	blockBytes := make([]byte, length)
	if _, err = io.ReadFull(s.reader, blockBytes); err != nil {
		logger.Debugf("Error while trying to read [%d] bytes from fileNum [%d]: %s", length, s.fileNum, err)
		return nil, nil, err
	}

	placement := &blockPlacementInfo{
		fileNum:          s.fileNum,
		blockStartOffset: s.currentOffset,
		blockBytesOffset: s.currentOffset + int64(n),
	}
	s.currentOffset += bytesExpected
	logger.Debugf("Returning blockbytes - length=[%d], placementInfo={%s}", len(blockBytes), placement)
	return blockBytes, placement, nil
}
// close closes the underlying block file and reports the error, if any, from doing so.
func (s *blockfileStream) close() error {
	err := s.file.Close()
	return err
}
///////////////////////////////////
// blockStream functions
////////////////////////////////////
// newBlockStream creates a stream that begins reading at startOffset within the
// block file startFileNum under rootDir and records endFileNum as the last file
// to read. The error from opening the first block file, if any, is returned as is.
func newBlockStream(rootDir string, startFileNum int, startOffset int64, endFileNum int) (*blockStream, error) {
	firstFile, err := newBlockfileStream(rootDir, startFileNum, startOffset)
	if err != nil {
		return nil, err
	}
	stream := &blockStream{
		rootDir:           rootDir,
		currentFileNum:    startFileNum,
		endFileNum:        endFileNum,
		currentFileStream: firstFile,
	}
	return stream, nil
}
// moveToNextBlockfileStream closes the current file stream and opens the next
// block file (currentFileNum+1) from offset 0.
//
// The stream's state is committed only after the next file has been opened
// successfully. When closing or opening fails the error is returned,
// currentFileNum is left unchanged and currentFileStream keeps referring to the
// previous stream instead of being overwritten with nil, so a later call on the
// blockStream reports an error rather than dereferencing a nil pointer. Note
// that if the open fails, the previous stream has already been closed.
func (s *blockStream) moveToNextBlockfileStream() error {
	if err := s.currentFileStream.close(); err != nil {
		return err
	}
	nextFileNum := s.currentFileNum + 1
	nextFileStream, err := newBlockfileStream(s.rootDir, nextFileNum, 0)
	if err != nil {
		return err
	}
	s.currentFileNum = nextFileNum
	s.currentFileStream = nextFileStream
	return nil
}
// nextBlockBytes returns the bytes of the next block in the stream. It is a thin
// wrapper over nextBlockBytesAndPlacementInfo that drops the placement information.
func (s *blockStream) nextBlockBytes() ([]byte, error) {
	data, _, err := s.nextBlockBytesAndPlacementInfo()
	return data, err
}
// nextBlockBytesAndPlacementInfo returns the next block in the stream together
// with its placement information. Whenever the current file yields no block and
// further files remain (currentFileNum is below endFileNum, or endFileNum is
// negative) it moves on to the next file and tries again. Once the last file is
// exhausted all three results are nil. Errors from reading a block or from
// switching files are returned with nil block bytes and placement info.
func (s *blockStream) nextBlockBytesAndPlacementInfo() ([]byte, *blockPlacementInfo, error) {
	for {
		data, placement, err := s.currentFileStream.nextBlockBytesAndPlacementInfo()
		if err != nil {
			logger.Debugf("current file [%d] length of blockbytes [%d]. Err:%s", s.currentFileNum, len(data), err)
			return nil, nil, err
		}
		logger.Debugf("blockbytes [%d] read from file [%d]", len(data), s.currentFileNum)

		moreFiles := s.endFileNum < 0 || s.currentFileNum < s.endFileNum
		if data != nil || !moreFiles {
			return data, placement, nil
		}

		logger.Debugf("current file [%d] exhausted. Moving to next file", s.currentFileNum)
		if err = s.moveToNextBlockfileStream(); err != nil {
			return nil, nil, err
		}
	}
}
// close closes the file stream that is currently open and returns the error,
// if any, from doing so. When there is no current file stream there is nothing
// left to close and nil is returned instead of dereferencing a nil pointer.
func (s *blockStream) close() error {
	if s.currentFileStream == nil {
		return nil
	}
	return s.currentFileStream.close()
}
// String renders the placement info on a single line: the file number, the
// block start offset and the block bytes offset, each in square brackets.
func (i *blockPlacementInfo) String() string {
	const layout = "fileNum=[%d], startOffset=[%d], bytesOffset=[%d]"
	return fmt.Sprintf(layout, i.fileNum, i.blockStartOffset, i.blockBytesOffset)
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。