代码拉取完成,页面将自动刷新
package shs
import (
"bytes"
"encoding/binary"
"encoding/gob"
"fmt"
"hash/crc32"
"io"
"os"
"path/filepath"
"sort"
"sync"
"github.com/valyala/bytebufferpool"
"github.com/rosedblabs/wal"
)
// On-disk page layout sizes, in bytes.
const (
// PageSize is the size of one data page; page N lives at file offset N*PageSize.
PageSize = 131072
// PageHeaderSize covers the two 8-byte header fields: page ID and user-records size.
PageHeaderSize = 16
// FileTrailerSize is the space reserved at the end of each page for the checksum trailer.
FileTrailerSize = 8
)
// WalRecordType identifies the kind of index operation stored in a WAL record.
type WalRecordType uint8
const (
// WalOpInsert marks a record that maps a key to a page ID in the index.
WalOpInsert WalRecordType = iota
// WalOpDelete marks a record that removes a key from the index.
WalOpDelete
)
// WalRecord is the gob-encoded entry appended to the WAL for every index change.
type WalRecord struct {
// Type selects the operation applied when the record is replayed.
Type WalRecordType
// Key is the key the operation applies to.
Key []byte
// PageID is the page associated with the key; replay uses it for inserts only.
PageID uint64
}
// Record is a single key/value pair stored inside a data page.
type Record struct {
Key []byte
Value []byte
}
// DataPage is the in-memory form of one on-disk data page.
type DataPage struct {
PageID uint64 // page number; part of the page header
UserRecordsSize uint64 // total encoded size of the user records; part of the page header
UserRecords []Record // user records held by the page
FileTrailer [8]byte // trailer bytes; the first four hold the big-endian CRC-32 of the page
}
// StorageEngine stores key/value records in fixed-size pages of a single data
// file and keeps a key -> page ID index that is rebuilt from the WAL on startup.
type StorageEngine struct {
mu sync.RWMutex // read/write lock used for concurrency control
wal *wal.WAL // log of index operations, replayed by loadFromDisk
indexes Index // index: key -> page ID
file *os.File // data file holding the pages
}
// Options configures NewStorageEngine.
type Options struct {
// DirPath is the directory that holds the WAL and the "data.db" page file.
DirPath string
// DataIndex is the index implementation to use; NewDefaultIndex() is used when it is nil.
DataIndex Index
}
// Sentinel errors returned by the storage engine.
var (
// ErrKeyNotFound is returned when the key is not present in the page it is looked up in.
ErrKeyNotFound = fmt.Errorf("key not found")
// ErrKeyEmpty is returned when an operation is called with a zero-length key.
ErrKeyEmpty = fmt.Errorf("key cannot be empty")
// ErrRecordTooLarge is returned when the key (plus value, where one is given) exceeds PageSize-40 bytes.
ErrRecordTooLarge = fmt.Errorf("key or value is too large, the size cannot exceed %d", PageSize-40)
)
// NewStorageEngine opens (creating it if necessary) the storage engine rooted
// at opt.DirPath and rebuilds the in-memory index by replaying the WAL.
//
// The WAL is opened in opt.DirPath and the pages live in "data.db" inside the
// same directory. When opt.DataIndex is nil, NewDefaultIndex() is used.
//
// If any step fails, everything opened so far is closed again before the
// error is returned, so a failed call does not leak file handles.
func NewStorageEngine(opt Options) (*StorageEngine, error) {
	dir := opt.DirPath

	// The WAL records every index operation so the index can be rebuilt
	// after a restart.
	walOptions := wal.DefaultOptions
	walOptions.DirPath = dir
	w, err := wal.Open(walOptions)
	if err != nil {
		return nil, err
	}

	filePath := filepath.Join(dir, "data.db")
	file, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE, 0o666)
	if err != nil {
		// Best effort: the open failure is the error worth reporting.
		_ = w.Close()
		return nil, err
	}

	if opt.DataIndex == nil {
		opt.DataIndex = NewDefaultIndex()
	}

	engine := &StorageEngine{
		indexes: opt.DataIndex,
		wal:     w,
		file:    file,
	}

	// Rebuild the index from the WAL.
	if err := engine.loadFromDisk(); err != nil {
		// Best effort: the replay failure is the error worth reporting.
		_ = file.Close()
		_ = w.Close()
		return nil, err
	}
	return engine, nil
}
// Close releases the data file and the WAL held by the engine.
//
// Both resources are always closed, even if the first close fails. The data
// file's error takes precedence; otherwise the WAL's error (if any) is
// returned.
func (se *StorageEngine) Close() error {
	se.mu.Lock()
	defer se.mu.Unlock()

	fileErr := se.file.Close()

	var walErr error
	if se.wal != nil {
		walErr = se.wal.Close()
	}

	if fileErr != nil {
		return fileErr
	}
	return walErr
}
// loadFromDisk replays every WAL record, in order, to rebuild the index.
//
// Insert records set key -> page ID; delete records remove the key. Records
// with any other type are skipped. The write lock is held for the whole
// replay because the index is being mutated.
func (se *StorageEngine) loadFromDisk() error {
	se.mu.Lock()
	defer se.mu.Unlock()

	reader := se.wal.NewReader()
	for {
		b, _, err := reader.Next()
		if err != nil {
			if err == io.EOF {
				// End of the log: the index is fully rebuilt.
				break
			}
			return err
		}

		rec := WalRecord{}
		if err := gob.NewDecoder(bytes.NewReader(b)).Decode(&rec); err != nil {
			return fmt.Errorf("failed to decode wal record: %w", err)
		}

		switch rec.Type {
		case WalOpInsert:
			if err := se.indexes.Set(rec.Key, rec.PageID); err != nil {
				return err
			}
		case WalOpDelete:
			if err := se.indexes.Delete(rec.Key); err != nil {
				return err
			}
		}
	}
	return nil
}
// readPageFromDisk reads the page stored at byte offset `offset` of file and
// decodes it into a DataPage.
//
// The page checksum is verified before any length field is trusted, and every
// length read from the page is bounds-checked, so a damaged page yields an
// error rather than an out-of-range panic.
//
// Returns the decoded page, or an error when the read fails, the CRC does not
// match, or the record area is malformed.
func (se *StorageEngine) readPageFromDisk(file *os.File, offset uint64) (DataPage, error) {
	buf := bytebufferpool.Get()
	defer bytebufferpool.Put(buf)

	// Reuse the pooled capacity when it is large enough. The buffer is zeroed
	// so that a short read cannot expose bytes left over from an earlier page.
	if uint64(cap(buf.B)) >= PageSize {
		buf.B = buf.B[:PageSize]
		for i := range buf.B {
			buf.B[i] = 0
		}
	} else {
		buf.B = make([]byte, PageSize)
	}

	// Read one full page.
	err := loadFile(file, int64(offset), func(data []byte) error {
		copy(buf.B, data)
		return nil
	})
	if err != nil {
		return DataPage{}, fmt.Errorf("failed to read page: %w", err)
	}

	page := DataPage{}

	// Verify the CRC first: nothing else in the page is trustworthy until it matches.
	copy(page.FileTrailer[:], buf.B[PageSize-FileTrailerSize:])
	storedCRC := binary.BigEndian.Uint32(page.FileTrailer[:])
	calculatedCRC := crc32.ChecksumIEEE(buf.B[:PageSize-FileTrailerSize])
	if storedCRC != calculatedCRC {
		return DataPage{}, fmt.Errorf("CRC check failed: stored %d, calculated %d", storedCRC, calculatedCRC)
	}

	// Parse the page header.
	page.PageID = binary.BigEndian.Uint64(buf.B[0:8])
	page.UserRecordsSize = binary.BigEndian.Uint64(buf.B[8:16])
	if page.UserRecordsSize > PageSize-PageHeaderSize-FileTrailerSize {
		return DataPage{}, fmt.Errorf("corrupt page: user records size %d exceeds page capacity", page.UserRecordsSize)
	}

	// Parse the user records: each is keySize(8) | valueSize(8) | key | value.
	userRecordsData := buf.B[PageHeaderSize : PageHeaderSize+page.UserRecordsSize]
	pos := uint64(0)
	for pos < page.UserRecordsSize {
		remaining := page.UserRecordsSize - pos
		if remaining < 16 {
			return DataPage{}, fmt.Errorf("corrupt page: truncated record header at offset %d", pos)
		}
		keySize := binary.BigEndian.Uint64(userRecordsData[pos : pos+8])
		valueSize := binary.BigEndian.Uint64(userRecordsData[pos+8 : pos+16])
		pos += 16
		remaining -= 16

		// Written this way so that keySize+valueSize cannot overflow.
		if keySize > remaining || valueSize > remaining-keySize {
			return DataPage{}, fmt.Errorf("corrupt page: record at offset %d overruns the record area", pos-16)
		}

		key := make([]byte, keySize)
		copy(key, userRecordsData[pos:pos+keySize])
		pos += keySize

		value := make([]byte, valueSize)
		copy(value, userRecordsData[pos:pos+valueSize])
		pos += valueSize

		page.UserRecords = append(page.UserRecords, Record{Key: key, Value: value})
	}

	return page, nil
}
// savePage serializes page and writes it at the file offset of pageID, then
// syncs the data file.
//
// Layout (PageSize bytes in total):
//
//	page ID (8) | user-records size (8) | records | zero padding | trailer (8)
//
// Each record is keySize(8) | valueSize(8) | key | value, all integers
// big-endian. The trailer holds the CRC-32 (IEEE) of everything before it in
// its first four bytes; the remaining four bytes are zero so that the page is
// exactly PageSize bytes long.
//
// page.UserRecords is sorted by key in place before writing. An error is
// returned when the records do not fit in a single page.
func (se *StorageEngine) savePage(pageID uint64, page DataPage) error {
	const capacity = PageSize - PageHeaderSize - FileTrailerSize

	offset := int64(pageID * PageSize)

	// Records are stored sorted so readers can binary-search them.
	sort.Sort(SortUserRecordByKey(page.UserRecords))

	userRecordsSize := uint64(0)
	for _, record := range page.UserRecords {
		userRecordsSize += 16 + uint64(len(record.Key)) + uint64(len(record.Value))
	}
	// Without this check the padding length below would underflow.
	if userRecordsSize > capacity {
		return fmt.Errorf("page %d overflow: records need %d bytes, capacity is %d", pageID, userRecordsSize, capacity)
	}

	buf := bytebufferpool.Get()
	defer bytebufferpool.Put(buf)
	buf.Reset()

	var scratch [8]byte
	putUint64 := func(v uint64) {
		binary.BigEndian.PutUint64(scratch[:], v)
		buf.B = append(buf.B, scratch[:]...)
	}

	// Page header.
	putUint64(page.PageID)
	putUint64(userRecordsSize)

	// User records.
	for _, record := range page.UserRecords {
		putUint64(uint64(len(record.Key)))
		putUint64(uint64(len(record.Value)))
		buf.B = append(buf.B, record.Key...)
		buf.B = append(buf.B, record.Value...)
	}

	// Free space.
	buf.B = append(buf.B, make([]byte, capacity-userRecordsSize)...)

	// Trailer: CRC over everything written so far, padded to FileTrailerSize.
	var trailer [FileTrailerSize]byte
	binary.BigEndian.PutUint32(trailer[:4], crc32.ChecksumIEEE(buf.B))
	buf.B = append(buf.B, trailer[:]...)

	if err := writeFile(se.file, offset, buf.Bytes()); err != nil {
		return err
	}
	return se.file.Sync()
}
// findOrAllocatePage returns the ID of a page that can hold record.
//
// Every page referenced by the index is inspected at most once; the first one
// with enough free space for the encoded record (key + value + 16 bytes of
// length prefixes) is returned. If none qualifies, the ID of the first page
// past the current end of the data file is returned. Nothing is written to
// disk here.
func (se *StorageEngine) findOrAllocatePage(record Record) (uint64, error) {
	recordSize := uint64(len(record.Key) + len(record.Value) + 16) // 16 bytes for key and value sizes

	// Many keys share a page; remember visited pages so each is read only once.
	visited := make(map[uint64]struct{})
	for _, pageID := range se.indexes.Iterator() {
		if _, seen := visited[pageID]; seen {
			continue
		}
		visited[pageID] = struct{}{}

		page, exists := se.getPage(pageID)
		if !exists {
			continue
		}
		// Compare via the used size so a bogus UserRecordsSize cannot underflow.
		used := PageHeaderSize + page.UserRecordsSize + FileTrailerSize
		if used <= PageSize && PageSize-used >= recordSize {
			return pageID, nil
		}
	}

	// No existing page has room: allocate a new one at the end of the file.
	fileInfo, err := se.file.Stat()
	if err != nil {
		return 0, err
	}
	// Round up so a partially written last page is never handed out as a new page.
	newPageID := (uint64(fileInfo.Size()) + PageSize - 1) / PageSize
	return newPageID, nil
}
// getPage loads the page with the given ID from the data file.
// It returns (nil, false) whenever the page cannot be read or decoded.
func (se *StorageEngine) getPage(pageID uint64) (*DataPage, bool) {
	loaded, readErr := se.readPageFromDisk(se.file, pageID*PageSize)
	if readErr == nil {
		return &loaded, true
	}
	return nil, false
}
// walOp gob-encodes a WalRecord describing opType on key/pageID and appends
// it to the WAL. Any encoding or write error is returned unchanged.
func (se *StorageEngine) walOp(key []byte, pageID uint64, opType WalRecordType) error {
	var encoded bytes.Buffer
	entry := WalRecord{Type: opType, Key: key, PageID: pageID}
	if encErr := gob.NewEncoder(&encoded).Encode(entry); encErr != nil {
		return encErr
	}
	if _, writeErr := se.wal.Write(encoded.Bytes()); writeErr != nil {
		return writeErr
	}
	return nil
}
// Insert stores a new key/value pair and persists it to disk.
//
// It returns ErrKeyEmpty for an empty key, ErrRecordTooLarge when key+value
// exceed PageSize-40 bytes, and a "key already exists" error when the key is
// already stored, whether on the page chosen for the insert or on the page
// the index currently points at.
//
// Order of effects: the WAL record is appended first, then the index is
// updated, then the page is written.
func (se *StorageEngine) Insert(key, value []byte) error {
	if len(key) == 0 {
		return ErrKeyEmpty
	}
	if uint64(len(key))+uint64(len(value)) > PageSize-40 {
		return ErrRecordTooLarge
	}

	se.mu.Lock()
	defer se.mu.Unlock()

	// The target page picked below is not necessarily the page that already
	// holds this key, so consult the index as well. The hit is confirmed
	// against the page itself rather than trusting the index result alone.
	if existingID, err := se.indexes.Get(key); err == nil {
		if existing, ok := se.getPage(existingID); ok &&
			binarySearchUserRecordsByKey(existing.UserRecords, key) != -1 {
			return fmt.Errorf("key already exists")
		}
	}

	pageID, err := se.findOrAllocatePage(Record{key, value})
	if err != nil {
		return err
	}

	page, exists := se.getPage(pageID)
	if !exists {
		// Unreadable or not yet written: start from an empty page.
		page = &DataPage{PageID: pageID}
	}

	if binarySearchUserRecordsByKey(page.UserRecords, key) != -1 {
		return fmt.Errorf("key already exists")
	}

	if err := se.walOp(key, pageID, WalOpInsert); err != nil {
		return fmt.Errorf("failed to write wal: %w", err)
	}

	page.UserRecords = append(page.UserRecords, Record{Key: key, Value: value})

	if err := se.indexes.Set(key, pageID); err != nil {
		return err
	}

	if err := se.savePage(pageID, *page); err != nil {
		return fmt.Errorf("failed to save to disk: %w", err)
	}
	return nil
}
// Modify replaces the value stored under key.
//
// The record is rewritten in place when the updated page still fits in
// PageSize-24 bytes of record data. Otherwise the lock is released and the
// key is moved by calling Delete followed by Insert.
//
// It returns ErrKeyEmpty, ErrRecordTooLarge, the index lookup error,
// ErrKeyNotFound when the key is not in its page, or a wrapped save error.
func (se *StorageEngine) Modify(key, value []byte) error {
	switch {
	case len(key) == 0:
		return ErrKeyEmpty
	case uint64(len(key))+uint64(len(value)) > PageSize-40:
		return ErrRecordTooLarge
	}

	// The locked section reports whether the record has to be relocated.
	mustRelocate, err := func() (bool, error) {
		se.mu.Lock()
		defer se.mu.Unlock()

		targetID, lookupErr := se.indexes.Get(key)
		if lookupErr != nil {
			return false, lookupErr
		}

		target, ok := se.getPage(targetID)
		if !ok {
			target = &DataPage{PageID: targetID}
		}

		pos := binarySearchUserRecordsByKey(target.UserRecords, key)
		if pos == -1 {
			return false, ErrKeyNotFound
		}
		target.UserRecords[pos].Value = value

		var total uint64
		for i := range target.UserRecords {
			total += 16 + uint64(len(target.UserRecords[i].Key)) + uint64(len(target.UserRecords[i].Value))
		}
		if total > PageSize-24 {
			return true, nil
		}

		if saveErr := se.savePage(targetID, *target); saveErr != nil {
			return false, fmt.Errorf("failed to save to disk: %w", saveErr)
		}
		return false, nil
	}()
	if err != nil {
		return err
	}
	if !mustRelocate {
		return nil
	}

	// The page would overflow: remove the record and insert it again.
	if delErr := se.Delete(key); delErr != nil {
		return delErr
	}
	return se.Insert(key, value)
}
// Get returns the value stored under key.
//
// It returns ErrKeyEmpty for an empty key, ErrRecordTooLarge when the key is
// longer than PageSize-40 bytes, the index lookup error when the key is not
// indexed, a "page not found" error when the indexed page cannot be read, and
// ErrKeyNotFound when the page does not contain the key.
func (se *StorageEngine) Get(key []byte) ([]byte, error) {
	if len(key) == 0 {
		return nil, ErrKeyEmpty
	}
	if uint64(len(key)) > PageSize-40 {
		return nil, ErrRecordTooLarge
	}

	se.mu.RLock()
	defer se.mu.RUnlock()

	pageID, err := se.indexes.Get(key)
	if err != nil {
		return nil, err
	}

	page, exists := se.getPage(pageID)
	if !exists {
		// page is nil here, so report the ID taken from the index.
		return nil, fmt.Errorf("page not found, pageID: %d", pageID)
	}

	idxByKey := binarySearchUserRecordsByKey(page.UserRecords, key)
	if idxByKey == -1 {
		return nil, ErrKeyNotFound
	}
	return page.UserRecords[idxByKey].Value, nil
}
// Delete removes key and its value, then persists the updated page.
//
// It returns ErrKeyEmpty for an empty key, ErrRecordTooLarge when the key is
// longer than PageSize-40 bytes, the index lookup error when the key is not
// indexed, a "page not found" error when the indexed page cannot be read, and
// ErrKeyNotFound when the page does not contain the key.
//
// The WAL record is appended only once the key is known to be present, so a
// failed delete never leaves a deletion in the log.
func (se *StorageEngine) Delete(key []byte) error {
	if len(key) == 0 {
		return ErrKeyEmpty
	}
	if uint64(len(key)) > PageSize-40 {
		return ErrRecordTooLarge
	}

	se.mu.Lock()
	defer se.mu.Unlock()

	pageID, err := se.indexes.Get(key)
	if err != nil {
		return err
	}

	page, exists := se.getPage(pageID)
	if !exists {
		return fmt.Errorf("page not found")
	}

	idxByKey := binarySearchUserRecordsByKey(page.UserRecords, key)
	if idxByKey == -1 {
		return ErrKeyNotFound
	}

	if err := se.walOp(key, page.PageID, WalOpDelete); err != nil {
		return fmt.Errorf("failed to write wal: %w", err)
	}

	// Build the page contents without the deleted record.
	remaining := make([]Record, 0, len(page.UserRecords)-1)
	remaining = append(remaining, page.UserRecords[:idxByKey]...)
	remaining = append(remaining, page.UserRecords[idxByKey+1:]...)
	page.UserRecords = remaining

	if err := se.indexes.Delete(key); err != nil {
		return err
	}

	if err := se.savePage(pageID, *page); err != nil {
		return fmt.Errorf("failed to save to disk: %w", err)
	}
	return nil
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。