From d37847c382a1be6d813934f71202a4ea67964b38 Mon Sep 17 00:00:00 2001 From: WSRer <1749094641@qq.com> Date: Tue, 15 Oct 2024 11:22:03 +0800 Subject: [PATCH 1/7] add wal --- go.mod | 4 +++ go.sum | 12 ++++++++ main.go | 91 ++++++++++++++++++++++++++++++++------------------------- 3 files changed, 68 insertions(+), 39 deletions(-) create mode 100644 go.sum diff --git a/go.mod b/go.mod index 01426de..916c96b 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,7 @@ module simple-hash-kv-storage go 1.23.0 + +require github.com/rosedblabs/wal v1.3.8 + +require github.com/valyala/bytebufferpool v1.0.0 // indirect diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..5271581 --- /dev/null +++ b/go.sum @@ -0,0 +1,12 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rosedblabs/wal v1.3.8 h1:tErpD9JT/ICiyV3mv5l7qUH6lybn5XF1TbI0e8kvH8M= +github.com/rosedblabs/wal v1.3.8/go.mod h1:DFvhrmTTeiXvn2btXXT2MW9Nvu99PU0g/pKGgh0+T+o= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= +github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/main.go b/main.go index a9cb5ad..7280df0 100644 --- a/main.go +++ b/main.go @@ -4,53 +4,57 @@ import ( "bytes" "encoding/gob" "fmt" + "github.com/rosedblabs/wal" "io" "os" "sync" ) +const ( + PageSize = 4096 +) + // Record 表示一个记录 type Record struct { - Key string - Value string + Key []byte + Value []byte } // DataPage 表示一个数据页 type DataPage struct { - PageID int // 页号 - FileHeader [38]byte // 文件头部 - PageHeader [56]byte // 页面头部 - Infimum Record // 页面中的最小记录 - Supremum Record // 页面中的最大记录 - UserRecords []Record // 用户记录 - FreeSpace []byte // 空闲空间 - PageDirectory []int // 页目录 - FileTrailer [8]byte // 文件尾部 + PageID uint64 // 页号 + PageHeader [56]byte // 页面头部 + UserRecords []Record // 用户记录 + FileTrailer [8]byte // 文件尾部 } // Index 表示索引结构 type Index struct { - Key string - PageID int + Key []byte + PageID uint64 } // StorageEngine 是一个简单的存储引擎 type StorageEngine struct { - sync.RWMutex // 读写锁用于并发控制 - pages map[int]bool // 索引:页号 -> 是否存在 + sync.RWMutex // 读写锁用于并发控制 + wal *wal.WAL indexes map[string]Index // 索引:键 -> 索引 filePath string // 数据文件路径 } // NewStorageEngine 创建一个新的存储引擎实例,并从磁盘加载数据 func NewStorageEngine(filePath string) (*StorageEngine, error) { + w, err := wal.Open(wal.Options{DirPath: "data"}) + if err != nil { + return nil, err + } engine := &StorageEngine{ - pages: make(map[int]bool), indexes: make(map[string]Index), + wal: w, filePath: filePath, } - err := engine.loadFromDisk() + err = engine.loadFromDisk() if err != nil { return nil, err } @@ -66,29 +70,28 @@ func (se *StorageEngine) loadFromDisk() error { } defer file.Close() - pageSize := 4096 fileInfo, err := file.Stat() if err != nil { return fmt.Errorf("failed to get file info: %w", err) } fileSize := fileInfo.Size() - for offset := int64(0); offset < fileSize; offset += int64(pageSize) { - page, err := se.readPageFromDisk(file, offset) + for offset := int64(0); offset < fileSize; offset += int64(PageSize) { + // TODO recover indexes from disk + _, err := se.readPageFromDisk(file, uint64(offset)) if err != nil { return fmt.Errorf("failed to read page from disk: %w", err) } - se.pages[page.PageID] = true } return nil } // readPageFromDisk 直接从磁盘读取指定位置的数据页 -func (se *StorageEngine) readPageFromDisk(file *os.File, offset int64) (DataPage, error) { +func (se *StorageEngine) readPageFromDisk(file *os.File, offset uint64) (DataPage, error) { var page DataPage pageSize := 4096 buffer := make([]byte, pageSize) - n, err := file.ReadAt(buffer, offset) + n, err := file.ReadAt(buffer, int64(offset)) if err != nil && err != io.EOF { return page, err } @@ -106,7 +109,7 @@ func (se *StorageEngine) readPageFromDisk(file *os.File, offset int64) (DataPage } // savePage 将单个数据页保存到磁盘 -func (se *StorageEngine) savePage(pageID int, page DataPage) error { +func (se *StorageEngine) savePage(pageID uint64, page DataPage) error { file, err := os.OpenFile(se.filePath, os.O_RDWR|os.O_CREATE, 0666) if err != nil { return fmt.Errorf("failed to open file: %w", err) @@ -136,7 +139,7 @@ func (se *StorageEngine) Insert(key, value string) error { se.Lock() defer se.Unlock() - pageID, err := se.findOrAllocatePage(key) + pageID, err := se.findOrAllocatePage(Record{[]byte(key), []byte(value)}) if err != nil { return err } @@ -146,8 +149,8 @@ func (se *StorageEngine) Insert(key, value string) error { page = &DataPage{PageID: pageID} } - page.UserRecords = append(page.UserRecords, Record{Key: key, Value: value}) - se.indexes[key] = Index{Key: key, PageID: pageID} + page.UserRecords = append(page.UserRecords, Record{Key: []byte(key), Value: []byte(value)}) + se.indexes[key] = Index{Key: []byte(key), PageID: pageID} err = se.savePage(pageID, *page) if err != nil { @@ -158,24 +161,24 @@ func (se *StorageEngine) Insert(key, value string) error { } // Get 获取一个键对应的值 -func (se *StorageEngine) Get(key string) (string, bool) { +func (se *StorageEngine) Get(key string) ([]byte, bool) { index, ok := se.indexes[key] if !ok { - return "", false + return nil, false } page, exists := se.getPage(index.PageID) if !exists { - return "", false + return nil, false } for _, record := range page.UserRecords { - if record.Key == key { + if string(record.Key) == key { return record.Value, true } } - return "", false + return nil, false } // Delete 删除一个键值对,并保存到磁盘 @@ -214,29 +217,39 @@ func (se *StorageEngine) Delete(key string) error { } if len(page.UserRecords) == 0 { - delete(se.pages, index.PageID) } return nil } // findOrAllocatePage 查找或分配一个数据页 -func (se *StorageEngine) findOrAllocatePage(key string) (int, error) { - for pageID, _ := range se.pages { +func (se *StorageEngine) findOrAllocatePage(record Record) (uint64, error) { + pageSize := 4096 + recordSize := len(record.Key) + len(record.Value) // 简化计算,假设每个记录占用固定字节数 + + for _, index := range se.indexes { + pageID := index.PageID page, exists := se.getPage(pageID) - if exists && len(page.UserRecords) < 10 { // 假设每页最多存储10个记录 + if !exists { + continue + } + + // 计算当前页面的已用空间 + usedSpace := 38 + 56 + 8 + len(page.FreeSpace) + len(page.PageDirectory)*4 + len(page.UserRecords)*(recordSize+8) + + // 如果当前页面还有足够的空间,则返回该页面的 ID + if usedSpace+recordSize <= pageSize { return pageID, nil } } // 如果所有页都满了,分配一个新的页 - newPageID := len(se.pages) - se.pages[newPageID] = true + newPageID := len(se.indexes) return newPageID, nil } // getPage 从磁盘读取数据页 -func (se *StorageEngine) getPage(pageID int) (*DataPage, bool) { +func (se *StorageEngine) getPage(pageID uint64) (*DataPage, bool) { page, err := se.readPageFromDisk(se.openFile(), int64(pageID)*4096) if err != nil { return nil, false -- Gitee From d2bb087147d852b639ab12198c9fc6e1db792476 Mon Sep 17 00:00:00 2001 From: WSRer <1749094641@qq.com> Date: Tue, 15 Oct 2024 12:08:29 +0800 Subject: [PATCH 2/7] update --- main.go | 77 ++++++++++++++++++--------------------------------------- 1 file changed, 24 insertions(+), 53 deletions(-) diff --git a/main.go b/main.go index 7280df0..c578eed 100644 --- a/main.go +++ b/main.go @@ -39,7 +39,7 @@ type StorageEngine struct { sync.RWMutex // 读写锁用于并发控制 wal *wal.WAL indexes map[string]Index // 索引:键 -> 索引 - filePath string // 数据文件路径 + file *os.File } // NewStorageEngine 创建一个新的存储引擎实例,并从磁盘加载数据 @@ -48,10 +48,15 @@ func NewStorageEngine(filePath string) (*StorageEngine, error) { if err != nil { return nil, err } + + file, err := os.OpenFile(filePath, os.O_RDONLY|os.O_CREATE, 0666) + if err != nil { + return nil, err + } engine := &StorageEngine{ - indexes: make(map[string]Index), - wal: w, - filePath: filePath, + indexes: make(map[string]Index), + wal: w, + file: file, } err = engine.loadFromDisk() @@ -62,15 +67,15 @@ func NewStorageEngine(filePath string) (*StorageEngine, error) { return engine, nil } +func (se *StorageEngine) Close() error { + se.Lock() + defer se.Unlock() + return se.file.Close() +} + // loadFromDisk 从磁盘加载数据 func (se *StorageEngine) loadFromDisk() error { - file, err := os.OpenFile(se.filePath, os.O_RDONLY|os.O_CREATE, 0666) - if err != nil { - return fmt.Errorf("failed to open file: %w", err) - } - defer file.Close() - - fileInfo, err := file.Stat() + fileInfo, err := se.file.Stat() if err != nil { return fmt.Errorf("failed to get file info: %w", err) } @@ -78,7 +83,7 @@ func (se *StorageEngine) loadFromDisk() error { for offset := int64(0); offset < fileSize; offset += int64(PageSize) { // TODO recover indexes from disk - _, err := se.readPageFromDisk(file, uint64(offset)) + _, err := se.readPageFromDisk(se.file, uint64(offset)) if err != nil { return fmt.Errorf("failed to read page from disk: %w", err) } @@ -110,23 +115,17 @@ func (se *StorageEngine) readPageFromDisk(file *os.File, offset uint64) (DataPag // savePage 将单个数据页保存到磁盘 func (se *StorageEngine) savePage(pageID uint64, page DataPage) error { - file, err := os.OpenFile(se.filePath, os.O_RDWR|os.O_CREATE, 0666) - if err != nil { - return fmt.Errorf("failed to open file: %w", err) - } - defer file.Close() - pageSize := 4096 offset := int64(pageID) * int64(pageSize) buffer := bytes.Buffer{} enc := gob.NewEncoder(&buffer) - err = enc.Encode(page) + err := enc.Encode(page) if err != nil { return fmt.Errorf("failed to encode page: %w", err) } - _, err = file.WriteAt(buffer.Bytes(), offset) + _, err = se.file.WriteAt(buffer.Bytes(), offset) if err != nil { return fmt.Errorf("failed to write page at offset %d: %w", offset, err) } @@ -199,7 +198,7 @@ func (se *StorageEngine) Delete(key string) error { newRecords := make([]Record, 0) found := false for _, record := range page.UserRecords { - if record.Key == key { + if string(record.Key) == key { found = true continue } @@ -224,48 +223,19 @@ func (se *StorageEngine) Delete(key string) error { // findOrAllocatePage 查找或分配一个数据页 func (se *StorageEngine) findOrAllocatePage(record Record) (uint64, error) { - pageSize := 4096 - recordSize := len(record.Key) + len(record.Value) // 简化计算,假设每个记录占用固定字节数 - - for _, index := range se.indexes { - pageID := index.PageID - page, exists := se.getPage(pageID) - if !exists { - continue - } - - // 计算当前页面的已用空间 - usedSpace := 38 + 56 + 8 + len(page.FreeSpace) + len(page.PageDirectory)*4 + len(page.UserRecords)*(recordSize+8) - - // 如果当前页面还有足够的空间,则返回该页面的 ID - if usedSpace+recordSize <= pageSize { - return pageID, nil - } - } - - // 如果所有页都满了,分配一个新的页 - newPageID := len(se.indexes) - return newPageID, nil + // TODO + return 0, nil } // getPage 从磁盘读取数据页 func (se *StorageEngine) getPage(pageID uint64) (*DataPage, bool) { - page, err := se.readPageFromDisk(se.openFile(), int64(pageID)*4096) + page, err := se.readPageFromDisk(se.file, uint64(pageID)*4096) if err != nil { return nil, false } return &page, true } -// openFile 打开文件 -func (se *StorageEngine) openFile() *os.File { - file, err := os.OpenFile(se.filePath, os.O_RDONLY, 0666) - if err != nil { - panic(fmt.Sprintf("Failed to open file: %v", err)) - } - return file -} - func main() { filePath := "data/data.db" @@ -274,6 +244,7 @@ func main() { fmt.Println("Error initializing storage engine:", err) return } + defer engine.Close() err = engine.Insert("name", "张三") if err != nil { -- Gitee From f8e8387f4c68d6a488854754c615c088252a16d8 Mon Sep 17 00:00:00 2001 From: WSRer <1749094641@qq.com> Date: Tue, 15 Oct 2024 14:18:46 +0800 Subject: [PATCH 3/7] update --- README.md | 6 ++ main.go | 178 +++++++++++++++++++++++++++++++++++++++--------------- 2 files changed, 134 insertions(+), 50 deletions(-) create mode 100644 README.md diff --git a/README.md b/README.md new file mode 100644 index 0000000..222edd8 --- /dev/null +++ b/README.md @@ -0,0 +1,6 @@ +https://github.com/zcbenz/BPlusTree +https://github.com/wspeirs/btree +https://blog.csdn.net/weixin_46117680/article/details/140841684 +https://www.cnblogs.com/wuzhenzhao/p/10341114.html +https://blog.csdn.net/weixin_37870870/article/details/124285834 +https://blog.csdn.net/qq_45473439/article/details/123445631 \ No newline at end of file diff --git a/main.go b/main.go index c578eed..23ba48d 100644 --- a/main.go +++ b/main.go @@ -2,16 +2,19 @@ package main import ( "bytes" - "encoding/gob" + "encoding/binary" "fmt" "github.com/rosedblabs/wal" + "hash/crc32" "io" "os" "sync" ) const ( - PageSize = 4096 + PageSize = 4096 + PageHeaderSize = 24 + FileTrailerSize = 8 ) // Record 表示一个记录 @@ -22,28 +25,24 @@ type Record struct { // DataPage 表示一个数据页 type DataPage struct { - PageID uint64 // 页号 - PageHeader [56]byte // 页面头部 - UserRecords []Record // 用户记录 - FileTrailer [8]byte // 文件尾部 + PageID uint64 // 页号,页号是PageHeader的一部分 + UserRecordsSize uint64 // 用户记录总大小,是PageHeader的一部分 + PageDirectorySize uint64 // 页目录大小 + PageDirectory []byte // 页目录, 记录各个记录的偏移量 + UserRecords []Record // 用户记录 + FileTrailer [8]byte // crc } -// Index 表示索引结构 -type Index struct { - Key []byte - PageID uint64 -} - -// StorageEngine 是一个简单的存储引擎 type StorageEngine struct { sync.RWMutex // 读写锁用于并发控制 wal *wal.WAL - indexes map[string]Index // 索引:键 -> 索引 + indexes map[string]uint64 // 索引:键 -> 页号 file *os.File } // NewStorageEngine 创建一个新的存储引擎实例,并从磁盘加载数据 func NewStorageEngine(filePath string) (*StorageEngine, error) { + // wal用于记录对indexes的操作,重启后可以从wal恢复indexes w, err := wal.Open(wal.Options{DirPath: "data"}) if err != nil { return nil, err @@ -54,11 +53,12 @@ func NewStorageEngine(filePath string) (*StorageEngine, error) { return nil, err } engine := &StorageEngine{ - indexes: make(map[string]Index), + indexes: make(map[string]uint64), wal: w, file: file, } + // 从wal恢复indexes err = engine.loadFromDisk() if err != nil { return nil, err @@ -73,19 +73,22 @@ func (se *StorageEngine) Close() error { return se.file.Close() } -// loadFromDisk 从磁盘加载数据 +// loadFromDisk 从磁盘加载数据恢复indexes func (se *StorageEngine) loadFromDisk() error { fileInfo, err := se.file.Stat() if err != nil { - return fmt.Errorf("failed to get file info: %w", err) + return err } - fileSize := fileInfo.Size() - for offset := int64(0); offset < fileSize; offset += int64(PageSize) { - // TODO recover indexes from disk - _, err := se.readPageFromDisk(se.file, uint64(offset)) + pageCount := fileInfo.Size() / PageSize + for i := uint64(0); i < uint64(pageCount); i++ { + page, err := se.readPageFromDisk(se.file, i*PageSize) if err != nil { - return fmt.Errorf("failed to read page from disk: %w", err) + return err + } + + for _, record := range page.UserRecords { + se.indexes[string(record.Key)] = page.PageID } } return nil @@ -93,21 +96,58 @@ func (se *StorageEngine) loadFromDisk() error { // readPageFromDisk 直接从磁盘读取指定位置的数据页 func (se *StorageEngine) readPageFromDisk(file *os.File, offset uint64) (DataPage, error) { - var page DataPage - pageSize := 4096 - buffer := make([]byte, pageSize) - n, err := file.ReadAt(buffer, int64(offset)) - if err != nil && err != io.EOF { - return page, err + _, err := file.Seek(int64(offset), io.SeekStart) + if err != nil { + return DataPage{}, err + } + + page := DataPage{} + + // 读取PageHeader + header := make([]byte, PageHeaderSize) + _, err = file.Read(header) + if err != nil { + return DataPage{}, err } - if n < pageSize { - return page, fmt.Errorf("read less than expected size at offset %d", offset) + + page.PageID = binary.BigEndian.Uint64(header[:8]) + page.UserRecordsSize = binary.BigEndian.Uint64(header[8:16]) + page.PageDirectorySize = binary.BigEndian.Uint64(header[16:24]) + + // 读取PageDirectory + page.PageDirectory = make([]byte, page.PageDirectorySize) + _, err = file.Read(page.PageDirectory) + if err != nil { + return DataPage{}, err + } + + // 读取UserRecords + userRecordsData := make([]byte, page.UserRecordsSize) + _, err = file.Read(userRecordsData) + if err != nil { + return DataPage{}, err + } + + // 解析UserRecords + offset = 0 + for offset < page.UserRecordsSize { + keySize := binary.BigEndian.Uint64(userRecordsData[offset : offset+8]) + offset += 8 + valueSize := binary.BigEndian.Uint64(userRecordsData[offset : offset+8]) + offset += 8 + key := userRecordsData[offset : offset+keySize] + offset += keySize + value := userRecordsData[offset : offset+valueSize] + offset += valueSize + page.UserRecords = append(page.UserRecords, Record{Key: key, Value: value}) } - dec := gob.NewDecoder(bytes.NewReader(buffer)) - err = dec.Decode(&page) + // 跳过Free Space + + // 读取File Trailer + _, err = file.Read(page.FileTrailer[:]) if err != nil { - return page, fmt.Errorf("failed to decode page: %w", err) + return DataPage{}, err } return page, nil @@ -115,22 +155,40 @@ func (se *StorageEngine) readPageFromDisk(file *os.File, offset uint64) (DataPag // savePage 将单个数据页保存到磁盘 func (se *StorageEngine) savePage(pageID uint64, page DataPage) error { - pageSize := 4096 - offset := int64(pageID) * int64(pageSize) - - buffer := bytes.Buffer{} - enc := gob.NewEncoder(&buffer) - err := enc.Encode(page) + offset := int64(pageID * PageSize) + _, err := se.file.Seek(offset, io.SeekStart) if err != nil { - return fmt.Errorf("failed to encode page: %w", err) + return err } - _, err = se.file.WriteAt(buffer.Bytes(), offset) - if err != nil { - return fmt.Errorf("failed to write page at offset %d: %w", offset, err) + buf := new(bytes.Buffer) + + // 写入PageHeader + binary.Write(buf, binary.BigEndian, page.PageID) + binary.Write(buf, binary.BigEndian, page.UserRecordsSize) + binary.Write(buf, binary.BigEndian, page.PageDirectorySize) + + // 写入PageDirectory + buf.Write(page.PageDirectory) + + // 写入UserRecords + for _, record := range page.UserRecords { + binary.Write(buf, binary.BigEndian, uint64(len(record.Key))) + binary.Write(buf, binary.BigEndian, uint64(len(record.Value))) + buf.Write(record.Key) + buf.Write(record.Value) } - return nil + // 填充Free Space + freeSpaceSize := PageSize - buf.Len() - FileTrailerSize + buf.Write(make([]byte, freeSpaceSize)) + + // 计算并写入CRC + crc := crc32.ChecksumIEEE(buf.Bytes()) + binary.Write(buf, binary.BigEndian, crc) + + _, err = se.file.Write(buf.Bytes()) + return err } // Insert 插入一个键值对,并保存到磁盘 @@ -149,7 +207,7 @@ func (se *StorageEngine) Insert(key, value string) error { } page.UserRecords = append(page.UserRecords, Record{Key: []byte(key), Value: []byte(value)}) - se.indexes[key] = Index{Key: []byte(key), PageID: pageID} + se.indexes[key] = pageID err = se.savePage(pageID, *page) if err != nil { @@ -166,7 +224,7 @@ func (se *StorageEngine) Get(key string) ([]byte, bool) { return nil, false } - page, exists := se.getPage(index.PageID) + page, exists := se.getPage(index) if !exists { return nil, false } @@ -190,7 +248,7 @@ func (se *StorageEngine) Delete(key string) error { return fmt.Errorf("key not found") } - page, exists := se.getPage(index.PageID) + page, exists := se.getPage(index) if !exists { return fmt.Errorf("page not found") } @@ -209,7 +267,7 @@ func (se *StorageEngine) Delete(key string) error { page.UserRecords = newRecords delete(se.indexes, key) - err := se.savePage(index.PageID, *page) + err := se.savePage(index, *page) if err != nil { return fmt.Errorf("failed to save to disk: %w", err) } @@ -223,8 +281,28 @@ func (se *StorageEngine) Delete(key string) error { // findOrAllocatePage 查找或分配一个数据页 func (se *StorageEngine) findOrAllocatePage(record Record) (uint64, error) { - // TODO - return 0, nil + recordSize := uint64(len(record.Key) + len(record.Value) + 16) // 16 bytes for key and value sizes + + // 查找可用的页 + for _, pageID := range se.indexes { + page, exists := se.getPage(pageID) + if !exists { + continue + } + + freeSpace := PageSize - PageHeaderSize - page.UserRecordsSize - page.PageDirectorySize - FileTrailerSize + if freeSpace >= recordSize { + return pageID, nil + } + } + + // 如果没有找到可用的页,分配新页 + fileInfo, err := se.file.Stat() + if err != nil { + return 0, err + } + newPageID := uint64(fileInfo.Size() / PageSize) + return newPageID, nil } // getPage 从磁盘读取数据页 -- Gitee From 02b54dc3e81b1f03ff610a87dbed5d780ac366ea Mon Sep 17 00:00:00 2001 From: WSRer <1749094641@qq.com> Date: Tue, 15 Oct 2024 14:32:07 +0800 Subject: [PATCH 4/7] fix: init wal --- main.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/main.go b/main.go index 23ba48d..1723e48 100644 --- a/main.go +++ b/main.go @@ -8,6 +8,7 @@ import ( "hash/crc32" "io" "os" + "path/filepath" "sync" ) @@ -41,13 +42,16 @@ type StorageEngine struct { } // NewStorageEngine 创建一个新的存储引擎实例,并从磁盘加载数据 -func NewStorageEngine(filePath string) (*StorageEngine, error) { +func NewStorageEngine(dir string) (*StorageEngine, error) { // wal用于记录对indexes的操作,重启后可以从wal恢复indexes - w, err := wal.Open(wal.Options{DirPath: "data"}) + walOptions := wal.DefaultOptions + walOptions.DirPath = dir + w, err := wal.Open(walOptions) if err != nil { return nil, err } + filePath := filepath.Join(dir, "data.db") file, err := os.OpenFile(filePath, os.O_RDONLY|os.O_CREATE, 0666) if err != nil { return nil, err @@ -315,7 +319,7 @@ func (se *StorageEngine) getPage(pageID uint64) (*DataPage, bool) { } func main() { - filePath := "data/data.db" + filePath := "data" engine, err := NewStorageEngine(filePath) if err != nil { -- Gitee From 1747aca43215255dd6fb92ec5057a50cdfea3348 Mon Sep 17 00:00:00 2001 From: WSRer <1749094641@qq.com> Date: Tue, 15 Oct 2024 15:06:53 +0800 Subject: [PATCH 5/7] update --- main.go | 72 ++++++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 61 insertions(+), 11 deletions(-) diff --git a/main.go b/main.go index 1723e48..a927dff 100644 --- a/main.go +++ b/main.go @@ -3,6 +3,7 @@ package main import ( "bytes" "encoding/binary" + "encoding/gob" "fmt" "github.com/rosedblabs/wal" "hash/crc32" @@ -18,6 +19,19 @@ const ( FileTrailerSize = 8 ) +type WalRecordType uint8 + +const ( + WalOpInsert WalRecordType = iota + WalOpDelete +) + +type WalRecord struct { + Type WalRecordType + Key []byte + PageID uint64 +} + // Record 表示一个记录 type Record struct { Key []byte @@ -77,22 +91,29 @@ func (se *StorageEngine) Close() error { return se.file.Close() } -// loadFromDisk 从磁盘加载数据恢复indexes +// loadFromDisk 从磁盘加载wal恢复indexes func (se *StorageEngine) loadFromDisk() error { - fileInfo, err := se.file.Stat() - if err != nil { - return err - } - - pageCount := fileInfo.Size() / PageSize - for i := uint64(0); i < uint64(pageCount); i++ { - page, err := se.readPageFromDisk(se.file, i*PageSize) + reader := se.wal.NewReader() + for { + b, _, err := reader.Next() if err != nil { + if err == io.EOF { + break + } return err } + rec := WalRecord{} + decoder := gob.NewDecoder(bytes.NewReader(b)) + err = decoder.Decode(&rec) + if err != nil { + return fmt.Errorf("failed to decode wal record: %w", err) + } - for _, record := range page.UserRecords { - se.indexes[string(record.Key)] = page.PageID + switch rec.Type { + case WalOpInsert: + se.indexes[string(rec.Key)] = rec.PageID + case WalOpDelete: + delete(se.indexes, string(rec.Key)) } } return nil @@ -195,6 +216,26 @@ func (se *StorageEngine) savePage(pageID uint64, page DataPage) error { return err } +func (se *StorageEngine) walOp(key []byte, pageID uint64, opType WalRecordType) error { + rec := WalRecord{ + Type: opType, + Key: key, + PageID: pageID, + } + buf := bytes.Buffer{} + encoder := gob.NewEncoder(&buf) + err := encoder.Encode(rec) + if err != nil { + return err + } + _, err = se.wal.Write(buf.Bytes()) + if err != nil { + return err + } + + return nil +} + // Insert 插入一个键值对,并保存到磁盘 func (se *StorageEngine) Insert(key, value string) error { se.Lock() @@ -210,6 +251,10 @@ func (se *StorageEngine) Insert(key, value string) error { page = &DataPage{PageID: pageID} } + err = se.walOp([]byte(key), pageID, WalOpInsert) + if err != nil { + return fmt.Errorf("failed to write wal: %w", err) + } page.UserRecords = append(page.UserRecords, Record{Key: []byte(key), Value: []byte(value)}) se.indexes[key] = pageID @@ -257,6 +302,11 @@ func (se *StorageEngine) Delete(key string) error { return fmt.Errorf("page not found") } + err := se.walOp([]byte(key), page.PageID, WalOpDelete) + if err != nil { + return fmt.Errorf("failed to write wal: %w", err) + } + newRecords := make([]Record, 0) found := false for _, record := range page.UserRecords { -- Gitee From afdd3e2a64871699608e0aa21a926f8acac9708b Mon Sep 17 00:00:00 2001 From: WSRer <1749094641@qq.com> Date: Tue, 15 Oct 2024 15:30:43 +0800 Subject: [PATCH 6/7] update --- main.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/main.go b/main.go index a927dff..302cf87 100644 --- a/main.go +++ b/main.go @@ -165,9 +165,14 @@ func (se *StorageEngine) readPageFromDisk(file *os.File, offset uint64) (DataPag value := userRecordsData[offset : offset+valueSize] offset += valueSize page.UserRecords = append(page.UserRecords, Record{Key: key, Value: value}) + fmt.Printf("%s %s", string(key), string(value)) } // 跳过Free Space + freeSpaceSize := PageSize - FileTrailerSize - PageHeaderSize - page.PageDirectorySize - page.UserRecordsSize + if _, err := io.CopyN(io.Discard, file, int64(freeSpaceSize)); err != nil { + return DataPage{}, err + } // 读取File Trailer _, err = file.Read(page.FileTrailer[:]) -- Gitee From 21f08353d3afecd64dbecb55b08606a4e76605c1 Mon Sep 17 00:00:00 2001 From: WSRer <1749094641@qq.com> Date: Tue, 15 Oct 2024 16:39:25 +0800 Subject: [PATCH 7/7] fix: save data --- main.go | 270 +++++++++++++++++++++++++++++++------------------------- 1 file changed, 152 insertions(+), 118 deletions(-) diff --git a/main.go b/main.go index 302cf87..4793549 100644 --- a/main.go +++ b/main.go @@ -5,17 +5,18 @@ import ( "encoding/binary" "encoding/gob" "fmt" - "github.com/rosedblabs/wal" "hash/crc32" "io" "os" "path/filepath" "sync" + + "github.com/rosedblabs/wal" ) const ( PageSize = 4096 - PageHeaderSize = 24 + PageHeaderSize = 16 FileTrailerSize = 8 ) @@ -40,12 +41,10 @@ type Record struct { // DataPage 表示一个数据页 type DataPage struct { - PageID uint64 // 页号,页号是PageHeader的一部分 - UserRecordsSize uint64 // 用户记录总大小,是PageHeader的一部分 - PageDirectorySize uint64 // 页目录大小 - PageDirectory []byte // 页目录, 记录各个记录的偏移量 - UserRecords []Record // 用户记录 - FileTrailer [8]byte // crc + PageID uint64 // 页号,页号是PageHeader的一部分 + UserRecordsSize uint64 // 用户记录总大小,是PageHeader的一部分 + UserRecords []Record // 用户记录 + FileTrailer [8]byte // crc } type StorageEngine struct { @@ -66,7 +65,7 @@ func NewStorageEngine(dir string) (*StorageEngine, error) { } filePath := filepath.Join(dir, "data.db") - file, err := os.OpenFile(filePath, os.O_RDONLY|os.O_CREATE, 0666) + file, err := os.OpenFile(filePath, os.O_RDONLY|os.O_CREATE, 0o666) if err != nil { return nil, err } @@ -121,40 +120,26 @@ func (se *StorageEngine) loadFromDisk() error { // readPageFromDisk 直接从磁盘读取指定位置的数据页 func (se *StorageEngine) readPageFromDisk(file *os.File, offset uint64) (DataPage, error) { + // 一次性读取整个页面 _, err := file.Seek(int64(offset), io.SeekStart) if err != nil { return DataPage{}, err } - page := DataPage{} - - // 读取PageHeader - header := make([]byte, PageHeaderSize) - _, err = file.Read(header) - if err != nil { - return DataPage{}, err - } - - page.PageID = binary.BigEndian.Uint64(header[:8]) - page.UserRecordsSize = binary.BigEndian.Uint64(header[8:16]) - page.PageDirectorySize = binary.BigEndian.Uint64(header[16:24]) - - // 读取PageDirectory - page.PageDirectory = make([]byte, page.PageDirectorySize) - _, err = file.Read(page.PageDirectory) + buffer := make([]byte, PageSize) + _, err = file.Read(buffer) if err != nil { - return DataPage{}, err + return DataPage{}, fmt.Errorf("failed to read page: %w", err) } + page := DataPage{} - // 读取UserRecords - userRecordsData := make([]byte, page.UserRecordsSize) - _, err = file.Read(userRecordsData) - if err != nil { - return DataPage{}, err - } + // 解析PageHeader + page.PageID = binary.BigEndian.Uint64(buffer[0:8]) + page.UserRecordsSize = binary.BigEndian.Uint64(buffer[8:16]) // 解析UserRecords - offset = 0 + userRecordsData := buffer[PageHeaderSize : PageHeaderSize+page.UserRecordsSize] + offset = uint64(0) for offset < page.UserRecordsSize { keySize := binary.BigEndian.Uint64(userRecordsData[offset : offset+8]) offset += 8 @@ -165,19 +150,16 @@ func (se *StorageEngine) readPageFromDisk(file *os.File, offset uint64) (DataPag value := userRecordsData[offset : offset+valueSize] offset += valueSize page.UserRecords = append(page.UserRecords, Record{Key: key, Value: value}) - fmt.Printf("%s %s", string(key), string(value)) } - // 跳过Free Space - freeSpaceSize := PageSize - FileTrailerSize - PageHeaderSize - page.PageDirectorySize - page.UserRecordsSize - if _, err := io.CopyN(io.Discard, file, int64(freeSpaceSize)); err != nil { - return DataPage{}, err - } + // 读取File Trailer (CRC) + copy(page.FileTrailer[:], buffer[PageSize-FileTrailerSize:]) - // 读取File Trailer - _, err = file.Read(page.FileTrailer[:]) - if err != nil { - return DataPage{}, err + // 验证CRC + storedCRC := binary.BigEndian.Uint32(page.FileTrailer[:]) + calculatedCRC := crc32.ChecksumIEEE(buffer[:PageSize-FileTrailerSize]) + if storedCRC != calculatedCRC { + return DataPage{}, fmt.Errorf("CRC check failed: stored %d, calculated %d", storedCRC, calculatedCRC) } return page, nil @@ -191,23 +173,21 @@ func (se *StorageEngine) savePage(pageID uint64, page DataPage) error { return err } - buf := new(bytes.Buffer) + userRecordsBuf := new(bytes.Buffer) + for _, record := range page.UserRecords { + binary.Write(userRecordsBuf, binary.BigEndian, uint64(len(record.Key))) + binary.Write(userRecordsBuf, binary.BigEndian, uint64(len(record.Value))) + userRecordsBuf.Write(record.Key) + userRecordsBuf.Write(record.Value) + } + page.UserRecordsSize = uint64(userRecordsBuf.Len()) + buf := new(bytes.Buffer) // 写入PageHeader binary.Write(buf, binary.BigEndian, page.PageID) binary.Write(buf, binary.BigEndian, page.UserRecordsSize) - binary.Write(buf, binary.BigEndian, page.PageDirectorySize) - - // 写入PageDirectory - buf.Write(page.PageDirectory) - // 写入UserRecords - for _, record := range page.UserRecords { - binary.Write(buf, binary.BigEndian, uint64(len(record.Key))) - binary.Write(buf, binary.BigEndian, uint64(len(record.Value))) - buf.Write(record.Key) - buf.Write(record.Value) - } + buf.Write(userRecordsBuf.Bytes()) // 填充Free Space freeSpaceSize := PageSize - buf.Len() - FileTrailerSize @@ -217,8 +197,47 @@ func (se *StorageEngine) savePage(pageID uint64, page DataPage) error { crc := crc32.ChecksumIEEE(buf.Bytes()) binary.Write(buf, binary.BigEndian, crc) + // fmt.Printf("save pageid: %d,\n buf:\n %v", pageID, buf.Bytes()) _, err = se.file.Write(buf.Bytes()) - return err + if err != nil { + return err + } + return se.file.Sync() +} + +// findOrAllocatePage 查找或分配一个数据页 +func (se *StorageEngine) findOrAllocatePage(record Record) (uint64, error) { + recordSize := uint64(len(record.Key) + len(record.Value) + 16) // 16 bytes for key and value sizes + + // 查找可用的页 + for _, pageID := range se.indexes { + page, exists := se.getPage(pageID) + if !exists { + continue + } + + freeSpace := PageSize - PageHeaderSize - page.UserRecordsSize - FileTrailerSize + if freeSpace >= recordSize { + return pageID, nil + } + } + + // 如果没有找到可用的页,分配新页 + fileInfo, err := se.file.Stat() + if err != nil { + return 0, err + } + newPageID := uint64(fileInfo.Size() / PageSize) + return newPageID, nil +} + +// getPage 从磁盘读取数据页 +func (se *StorageEngine) getPage(pageID uint64) (*DataPage, bool) { + page, err := se.readPageFromDisk(se.file, uint64(pageID)*4096) + if err != nil { + return nil, false + } + return &page, true } func (se *StorageEngine) walOp(key []byte, pageID uint64, opType WalRecordType) error { @@ -242,11 +261,11 @@ func (se *StorageEngine) walOp(key []byte, pageID uint64, opType WalRecordType) } // Insert 插入一个键值对,并保存到磁盘 -func (se *StorageEngine) Insert(key, value string) error { +func (se *StorageEngine) Insert(key, value []byte) error { se.Lock() defer se.Unlock() - pageID, err := se.findOrAllocatePage(Record{[]byte(key), []byte(value)}) + pageID, err := se.findOrAllocatePage(Record{key, value}) if err != nil { return err } @@ -256,12 +275,50 @@ func (se *StorageEngine) Insert(key, value string) error { page = &DataPage{PageID: pageID} } - err = se.walOp([]byte(key), pageID, WalOpInsert) + for _, record := range page.UserRecords { + if string(record.Key) == string(key) { + return fmt.Errorf("key already exists") + } + } + + err = se.walOp(key, pageID, WalOpInsert) if err != nil { return fmt.Errorf("failed to write wal: %w", err) } - page.UserRecords = append(page.UserRecords, Record{Key: []byte(key), Value: []byte(value)}) - se.indexes[key] = pageID + page.UserRecords = append(page.UserRecords, Record{Key: key, Value: value}) + + se.indexes[string(key)] = pageID + + err = se.savePage(pageID, *page) + if err != nil { + return fmt.Errorf("failed to save to disk: %w", err) + } + + return nil +} + +// Modify 修改一个键值对 +func (se *StorageEngine) Modify(key, value []byte) error { + se.Lock() + defer se.Unlock() + + pageID, err := se.findOrAllocatePage(Record{key, value}) + if err != nil { + return err + } + + page, exists := se.getPage(pageID) + if !exists { + page = &DataPage{PageID: pageID} + } + + for idx, record := range page.UserRecords { + if string(record.Key) == string(key) { + page.UserRecords[idx].Value = value + } + } + + se.indexes[string(key)] = pageID err = se.savePage(pageID, *page) if err != nil { @@ -272,32 +329,31 @@ func (se *StorageEngine) Insert(key, value string) error { } // Get 获取一个键对应的值 -func (se *StorageEngine) Get(key string) ([]byte, bool) { - index, ok := se.indexes[key] +func (se *StorageEngine) Get(key []byte) ([]byte, error) { + index, ok := se.indexes[string(key)] if !ok { - return nil, false + return nil, fmt.Errorf("key not found") } - page, exists := se.getPage(index) if !exists { - return nil, false + return nil, fmt.Errorf("page not found, pageID: %d", page.PageID) } for _, record := range page.UserRecords { - if string(record.Key) == key { - return record.Value, true + if string(record.Key) == string(key) { + return record.Value, nil } } - return nil, false + return nil, fmt.Errorf("record not found, key: %s", key) } // Delete 删除一个键值对,并保存到磁盘 -func (se *StorageEngine) Delete(key string) error { +func (se *StorageEngine) Delete(key []byte) error { se.Lock() defer se.Unlock() - index, ok := se.indexes[key] + index, ok := se.indexes[string(key)] if !ok { return fmt.Errorf("key not found") } @@ -315,7 +371,7 @@ func (se *StorageEngine) Delete(key string) error { newRecords := make([]Record, 0) found := false for _, record := range page.UserRecords { - if string(record.Key) == key { + if string(record.Key) == string(key) { found = true continue } @@ -324,7 +380,7 @@ func (se *StorageEngine) Delete(key string) error { if found { page.UserRecords = newRecords - delete(se.indexes, key) + delete(se.indexes, string(key)) err := se.savePage(index, *page) if err != nil { @@ -332,47 +388,9 @@ func (se *StorageEngine) Delete(key string) error { } } - if len(page.UserRecords) == 0 { - } - return nil } -// findOrAllocatePage 查找或分配一个数据页 -func (se *StorageEngine) findOrAllocatePage(record Record) (uint64, error) { - recordSize := uint64(len(record.Key) + len(record.Value) + 16) // 16 bytes for key and value sizes - - // 查找可用的页 - for _, pageID := range se.indexes { - page, exists := se.getPage(pageID) - if !exists { - continue - } - - freeSpace := PageSize - PageHeaderSize - page.UserRecordsSize - page.PageDirectorySize - FileTrailerSize - if freeSpace >= recordSize { - return pageID, nil - } - } - - // 如果没有找到可用的页,分配新页 - fileInfo, err := se.file.Stat() - if err != nil { - return 0, err - } - newPageID := uint64(fileInfo.Size() / PageSize) - return newPageID, nil -} - -// getPage 从磁盘读取数据页 -func (se *StorageEngine) getPage(pageID uint64) (*DataPage, bool) { - page, err := se.readPageFromDisk(se.file, uint64(pageID)*4096) - if err != nil { - return nil, false - } - return &page, true -} - func main() { filePath := "data" @@ -383,31 +401,47 @@ func main() { } defer engine.Close() - err = engine.Insert("name", "张三") + err = engine.Insert([]byte("name"), []byte("张三")) if err != nil { fmt.Println("Error inserting data:", err) return } - err = engine.Insert("age", "25") + err = engine.Insert([]byte("age"), []byte("25")) if err != nil { fmt.Println("Error inserting data:", err) return } - if val, ok := engine.Get("name"); ok { - fmt.Println("Name:", val) + if val, err := engine.Get([]byte("name")); err == nil { + fmt.Println("Name:", string(val)) + } else { + fmt.Println(err.Error()) + } + if val, err := engine.Get([]byte("age")); err == nil { + fmt.Println("Age:", string(val)) + } else { + fmt.Println(err.Error()) + } + err = engine.Modify([]byte("age"), []byte("35")) + if err != nil { + fmt.Println("Error Modify data:", err) + return + } + if val, err := engine.Get([]byte("age")); err == nil { + fmt.Println("Age:", string(val)) } else { - fmt.Println("Name not found") + fmt.Println(err.Error()) } - err = engine.Delete("age") + err = engine.Delete([]byte("age")) if err != nil { fmt.Println("Error deleting data:", err) return } - if _, ok := engine.Get("age"); !ok { + if _, err := engine.Get([]byte("age")); err != nil { + fmt.Println("del age", err) fmt.Println("Age deleted successfully") } } -- Gitee