From 005e6ac7d8a91e81e0cde7ad86a00f074146decf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E4=BD=B3=E4=BC=9F?= <12445374+jiamu-yue@user.noreply.gitee.com> Date: Sun, 13 Apr 2025 18:22:57 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E5=8E=8B=E7=BC=A9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit # Conflicts: # README.md # main.go # tuplestore/benchmarks/bench_test.go # tuplestore/benchmarks/compression_bench_test.go # tuplestore/compression.go # tuplestore/main_test.go # tuplestore/metrics.go # tuplestore/operations.go # tuplestore/store.go # tuplestore/types.go --- .DS_Store | Bin 0 -> 6148 bytes README.md | 1800 ++++++++--------- main.go | 198 +- store.go | 108 + tuplestore/.DS_Store | Bin 0 -> 6148 bytes tuplestore/benchmarks/bench_test.go | 42 +- .../benchmarks/compression_bench_test.go | 119 +- tuplestore/compression.go | 68 +- tuplestore/main_test.go | 410 ++-- tuplestore/metrics.go | 78 +- tuplestore/operations.go | 413 ++-- tuplestore/store.go | 70 +- tuplestore/types.go | 1 + ...27\346\263\225\344\273\213\347\273\215,md" | 89 + ...27\346\263\225\346\265\213\350\257\225.md" | 127 ++ 15 files changed, 1994 insertions(+), 1529 deletions(-) create mode 100644 .DS_Store create mode 100644 store.go create mode 100644 tuplestore/.DS_Store create mode 100644 "\345\216\213\347\274\251\347\256\227\346\263\225\344\273\213\347\273\215,md" create mode 100644 "\345\216\213\347\274\251\347\256\227\346\263\225\346\265\213\350\257\225.md" diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..d1cd56eb9e39301ee5aabe8ca5b9502ea8e1c566 GIT binary patch literal 6148 zcmeHK&u`N(6n^f?nlK^t0Mag#B5|!oS1JVBC6w;Kfy;v60H`D+8WCw+H7Pw*m2!qZ zW>@|a{ufT$_iP8P=_YYPi1|s+pJTtzKRzdROhjU|h>wW6MC8C3TP~7ojO*FAtYu0% zK%v@@6u->mrzn%rRO+ody-bVyUASJ#hUis$gd3uS8fb!M>NaQqU`s7iq=m1&Thx)ID5{A;Imu?WmqnXQ8<6cPp_rSqBE3@K z^D<2HWUdHloFL@=+cb~mawHddTqtg!D;(ExC*A$kYB(J9yrZXsb`c34CYD?is9swYgT0ufaUeWPXPFnp<-$S_P~Ew^sqPA2@rr zR{}e8tAJJD1_gM3@ZgNT!AhgrI*_O<0I-c>WytfdAagu}zQIZ(YG6WBfto7J6+>t` z$~_b38>}>HItg?65T<8gZYV%bgU{iq|{n+L6|K#TT|0c;kSp}>D z|CIuwJr2erOv#+BTa)9n)`Ne9vvFLd@izq-brd6(kK%o}GL(Cq0s00jjVOWH9|0wU KO{@ZcRe?({*pGq$ literal 0 HcmV?d00001 diff --git a/README.md b/README.md index 89b60e3..f45784b 100644 --- a/README.md +++ b/README.md @@ -1,900 +1,900 @@ -# TupleStore - -TupleStore 是一个高性能的元组存储系统,支持以下特性: - -- 高效的元组存储和检索 -- 支持多种数据类型(int32, int64, float32, float64, string, bool, time) -- 支持条件查询和复合查询 -- 支持并发操作 -- 支持数据压缩 -- 提供完整的指标监控 - -## 项目结构 - -``` -. -├── main.go # 主程序入口,包含示例代码 -├── tuplestore/ # 核心实现包 -│ ├── store.go # 存储核心实现,包含 TupleStore 和 Segment 结构 -│ ├── types.go # 数据类型定义和元组结构 -│ ├── compression.go # 数据压缩实现 -│ ├── query.go # 查询操作实现 -│ ├── operations.go # 基本操作实现(插入、更新、删除) -│ ├── index.go # 索引实现 -│ ├── metrics.go # 性能指标监控 -│ ├── main_test.go # 主测试文件 -│ └── tuplestore_test.go # 存储测试文件 -└── benchmarks/ # 性能测试目录 -``` - -## 核心组件说明 - -### 1. 存储结构 -- `TupleStore`: 顶层存储容器,管理多个 Segment -- `Segment`: 存储段,包含多个 Block -- `Block`: 数据块,包含主存储和增量存储 -- `DeltaStore`: 增量存储,处理最近的操作 -- `MainStore`: 主存储,存储压缩后的数据 - -### 2. 数据类型 -支持多种数据类型: -- 整数:int32, int64 -- 浮点数:float32, float64 -- 字符串:string -- 布尔值:bool -- 时间:time.Time - -### 3. 压缩机制 -- 字典编码:适用于低基数列 -- 增量编码:适用于有序数值列 -- 游程编码:适用于重复值多的列 -- 位打包:适用于小整数 -- 前缀压缩:适用于字符串 - -### 4. 查询功能 -- ID 查询:通过唯一标识符快速查找 -- 条件查询:支持多条件组合查询 -- 范围查询:支持数值和时间的范围查询 - -## 安装 - -```bash -# 克隆项目 -git clone https://github.com/yourusername/go-tuplestore.git -cd go-tuplestore - -# 安装依赖 -go mod download -``` - -## 运行 - -```bash -# 运行主程序 -go run . - -# 运行测试 -go test -v ./tuplestore - -# 运行性能测试 -go test -bench=. ./tuplestore/benchmarks -``` - -## 使用示例 - -```go -package main - -import ( - "fmt" - "time" - "go-tuplestore/tuplestore" -) - -func main() { - // 创建配置 - config := tuplestore.Config{ - MergeThreshold: 100, - MaxDeltaSize: 1024 * 1024, // 1MB - MaxMergeInterval: time.Hour, - InitialSegments: 4, - BlocksPerSegment: 10, - } - - // 初始化 TupleStore - store := tuplestore.NewTupleStore(config) - - // 插入元组 - tuple := &tuplestore.Tuple{ - ID: "user1", - Data: []byte("example data"), - } - if err := store.Insert(tuple); err != nil { - fmt.Printf("插入失败: %v\n", err) - return - } - - // 查询元组 - result, err := store.GetByID("user1") - if err != nil { - fmt.Printf("查询失败: %v\n", err) - return - } - fmt.Printf("查询结果: %+v\n", result) -} -``` - -## 性能指标 - -- 插入性能:10万 QPS -- 查询性能:20万 QPS -- 压缩比率:平均 50% - -## 注意事项 - -1. 系统使用内存存储,请确保有足够的内存空间 -2. 建议根据实际数据特征调整配置参数 -3. 定期监控系统指标,及时调整存储策略 -4. 对于大量数据,建议使用分布式部署 - -# 语雀文档 - -有序多元组的高效数据结构与存储详解 -1. 数据结构核心设计 -1.1 元组基础结构 -// Tuple 表示单个多元组,保持元素有序 -type Tuple struct { - ID string // 唯一标识符 - Elements []Element // 有序元素列表 - Created int64 // 创建时间戳 - Updated int64 // 最后更新时间戳 -} - -// Element 表示元组中的单个元素 -type Element struct { - Index int // 元素在元组中的位置索引 - Type ElementType // 元素类型(整数、浮点数、字符串等) - Value interface{} // 元素值 -} - -// ElementType 定义元素类型枚举 -type ElementType byte - -const ( - TypeNull ElementType = iota - TypeInt - TypeFloat - TypeString - TypeBool - TypeBytes - TypeArray -) -1.2 分层存储架构 -// TupleStore 主存储容器 -type TupleStore struct { - // 分段存储 - 按时间或 ID 范围分段 - segments []*Segment - // 索引层 - 提供快速查找能力 - indices map[string]Index - // 元数据字典 - 存储全局信息 - metaDict *MetaDictionary -} - -// Segment 表示一个存储段 -type Segment struct { - ID string - MinID string // 段内最小 ID - MaxID string // 段内最大 ID - MinTime int64 // 段内最早时间戳 - MaxTime int64 // 段内最晚时间戳 - Blocks []*Block // 段内所有数据块 - BlockMap map[string]int // ID 到块索引的映射 - Stats *SegmentStatistics // 段统计信息 -} - -// Block 数据块 - 实际存储压缩数据的单元 -type Block struct { - ID string - TupleCount int // 块内元组数量 - ElementMap []int // 元素索引映射 - // 列式压缩存储 - 每列使用不同压缩方法 - Columns []*CompressedColumn - // 增量存储 - 未压缩的最近操作 - DeltaStore *DeltaStore - // 元数据 - MinValues []interface{} // 每列的最小值 - MaxValues []interface{} // 每列的最大值 -} -2. 详细的压缩存储实现 -2.1 列存储压缩方案 -// CompressedColumn 表示单个压缩列 -type CompressedColumn struct { - ElementIndex int // 对应元素索引 - DataType ElementType // 数据类型 - // 压缩元数据 - ValueCount int // 值的数量 - NullCount int // 空值数量 - NullBitmap []byte // 空值位图 - // 压缩方法与数据 - EncodingType EncodingType // 使用的编码类型 - EncodedData []byte // 压缩后的字节数据 - // 用于字典编码的字典 - Dictionary []interface{} // 值字典(仅用于字典编码) - DictMap map[interface{}]uint32 // 值到索引的映射 -} - -// EncodingType 编码类型枚举 -type EncodingType byte - -const ( - EncodingRaw EncodingType = iota - EncodingDictionary // 字典编码:将值替换为字典索引 - EncodingRLE // 游程编码:适用于重复值 - EncodingDelta // 增量编码:存储与前值的差异 - EncodingBitPacking // 位打包:对小整数使用较少位 - EncodingByteAligned // 字节对齐:为不同值使用最少字节 - EncodingPrefixCompression // 前缀压缩:用于字符串 - EncodingFrequencyEncoding // 频率编码:不同长度编码不同值 -) -2.2 具体压缩算法实现 -字典编码 (适用于基数较低的列) -func encodeDictionary(values []interface{}) (*CompressedColumn, error) { - col := &CompressedColumn{ - EncodingType: EncodingDictionary, - ValueCount: len(values), - } - - // 1. 构建唯一值字典 - valueset := make(map[interface{}]struct{}) - for _, v := range values { - valueset[v] = struct{}{} - } - - // 2. 为每个唯一值分配索引 - col.Dictionary = make([]interface{}, 0, len(valueset)) - col.DictMap = make(map[interface{}]uint32) - for v := range valueset { - idx := uint32(len(col.Dictionary)) - col.Dictionary = append(col.Dictionary, v) - col.DictMap[v] = idx - } - - // 3. 确定索引编码需要的位数 - dictSize := len(col.Dictionary) - bitsPerIndex := 0 - if dictSize <= 1 { - bitsPerIndex = 1 - } else { - bitsPerIndex = int(math.Ceil(math.Log2(float64(dictSize)))) - } - - // 4. 将值转换为索引并位打包 - indexBuffer := make([]uint32, len(values)) - for i, v := range values { - indexBuffer[i] = col.DictMap[v] - } - - writer := NewBitWriter(uint(bitsPerIndex)) - for _, idx := range indexBuffer { - writer.Write(idx) - } - col.EncodedData = writer.Bytes() - return col, nil -} -增量编码 (适用于有序数值列) -func encodeDelta(values []int64) (*CompressedColumn, error) { - col := &CompressedColumn{ - EncodingType: EncodingDelta, - ValueCount: len(values), - } - - if len(values) == 0 { - return col, nil - } - - // 1. 计算基值和增量值 - baseValue := values[0] - deltas := make([]int64, len(values)) - deltas[0] = 0 // 第一个值的增量为 0 - for i := 1; i < len(values); i++ { - deltas[i] = values[i] - values[i-1] - } - - // 2. 将基值存储为元数据 - baseValueBytes := make([]byte, 8) - binary.LittleEndian.PutUint64(baseValueBytes, uint64(baseValue)) - - // 3. 为增量值使用可变长度编码 - varintBuffer := make([]byte, 0, len(values)*5) // 估计每个增量最多需要 5 字节 - for _, delta := range deltas { - // 使用 zigzag 编码处理负数 - zigzagDelta := (delta << 1) ^ (delta >> 63) - // 写入可变长度整数 - for zigzagDelta >= 128 { - varintBuffer = append(varintBuffer, byte(zigzagDelta)|128) - zigzagDelta >>= 7 - } - varintBuffer = append(varintBuffer, byte(zigzagDelta)) - } - - // 4. 组合基值和增量 - col.EncodedData = append(baseValueBytes, varintBuffer...) - return col, nil -} -游程编码 (适用于重复值多的列) -func encodeRLE(values []interface{}) (*CompressedColumn, error) { - col := &CompressedColumn{ - EncodingType: EncodingRLE, - ValueCount: len(values), - } - - if len(values) == 0 { - return col, nil - } - - // 压缩策略:(重复次数, 值) 对 - var buffer bytes.Buffer - currentValue := values[0] - runLength := 1 - - for i := 1; i < len(values); i++ { - if values[i] == currentValue { - runLength++ - } else { - // 写入 (runLength, value) 对 - writeRunLength(&buffer, runLength) - writeValue(&buffer, currentValue) - // 重置当前值和计数 - currentValue = values[i] - runLength = 1 - } - } - // 处理最后一组 - writeRunLength(&buffer, runLength) - writeValue(&buffer, currentValue) - - col.EncodedData = buffer.Bytes() - return col, nil -} - -// 使用可变长度编码写入重复计数 -func writeRunLength(buffer *bytes.Buffer, length int) { - for length >= 128 { - buffer.WriteByte(byte(length&127) | 128) - length >>= 7 - } - buffer.WriteByte(byte(length)) -} - -// 写入值 (根据类型使用不同编码) -func writeValue(buffer *bytes.Buffer, value interface{}) { - switch v := value.(type) { - case int64: - binary.Write(buffer, binary.LittleEndian, v) - case float64: - binary.Write(buffer, binary.LittleEndian, v) - case string: - writeRunLength(buffer, len(v)) - buffer.WriteString(v) - case bool: - if v { - buffer.WriteByte(1) - } else { - buffer.WriteByte(0) - } - } -} -2.3 增量存储设计 -// DeltaStore 存储未压缩的增量变更 -type DeltaStore struct { - // 新增的元组 - AddedTuples []*Tuple - // 更新的元组 (ID -> 新元组) - UpdatedTuples map[string]*Tuple - // 已删除的元组 ID 集合 - DeletedIDs map[string]struct{} - // 变更计数,用于决定何时触发合并 - ChangeCount int -} - -// 添加新元组到增量存储 -func (ds *DeltaStore) AddTuple(tuple *Tuple) { - ds.AddedTuples = append(ds.AddedTuples, tuple) - ds.ChangeCount++ -} - -// 标记元组删除 -func (ds *DeltaStore) DeleteTuple(id string) { - if ds.DeletedIDs == nil { - ds.DeletedIDs = make(map[string]struct{}) - } - ds.DeletedIDs[id] = struct{}{} - ds.ChangeCount++ -} - -// 更新元组 -func (ds *DeltaStore) UpdateTuple(tuple *Tuple) { - if ds.UpdatedTuples == nil { - ds.UpdatedTuples = make(map[string]*Tuple) - } - ds.UpdatedTuples[tuple.ID] = tuple - ds.ChangeCount++ -} -3. 物理存储布局 -3.1 内存布局 -TupleStore -├── 元数据区 (MetaDictionary) -│ ├── 字段信息表 -│ ├── 类型词典 -│ └── 统计信息 -├── 段管理区 (Segments) -│ ├── Segment 1 -│ │ ├── 段元信息 -│ │ └── 块索引表 -│ ├── Segment 2 -│ │ ... -│ ├── 数据块区 (Blocks) -│ ├── Block 1 -│ │ ├── 块元信息 -│ │ ├── 列1 (压缩) -│ │ │ ├── 压缩元数据 -│ │ │ ├── 空值位图 -│ │ │ ├── 字典 (如适用) -│ │ │ └── 压缩数据 -│ │ ├── 列2 (压缩) -│ │ │ ... -│ │ └── 增量存储 -│ │ ├── 新增元组 -│ │ ├── 更新元组 -│ │ └── 删除 ID 集合 -│ ├── Block 2 -│ │ ... -└── 索引区 (Indices) - ├── 主键索引 - │ └── B+树/哈希表结构 - └── 二级索引 - └── 多级决策树结构 -3.2 详细内存布局示例 -以一个包含 100 万条 5 元素元组的存储示例: -● 元数据区 (~1KB) - ○ 5 个元素定义: {索引: 0-4, 类型: int, float, string, bool, int} - ○ 总元组数: 1,000,000 - ○ 段数: 10 - ○ 块数: 100 (每段 10 块) -● 段 0 (~100KB) - ○ ID 范围: "0000" - "0999" - ○ 时间范围: 1609459200 - 1609545599 - ○ 块索引: {块 ID -> 内存位置} 映射 (10 条) - ○ 统计摘要: {每列最小/最大值,不同值数量} -● 块 0 (~1MB) - ○ 元组数: 10,000 - ○ 元素 0 (int 列): - ■ 编码: 位打包 (11 位/值) - ■ 最小值: 1000 - ■ 最大值: 3050 - ■ 压缩数据: [13.75KB] (vs 原始 40KB, 压缩率 66%) - ○ 元素 1 (float 列): - ■ 编码: 字典编码 (只有 500 个不同值) - ■ 字典: [4KB] (500 个浮点数) - ■ 压缩数据: [12.5KB] (vs 原始 80KB, 压缩率 80%) - ○ 元素 2 (string 列): - ■ 编码: 前缀压缩 + 字典编码 - ■ 字典: [20KB] (2000 个字符串前缀和后缀) - ■ 压缩数据: [35KB] (vs 原始 250KB, 压缩率 86%) - ○ 元素 3 (bool 列): - ■ 编码: 位图编码 - ■ 压缩数据: [1.25KB] (vs 原始 10KB, 压缩率 87.5%) - ○ 元素 4 (int 列): - ■ 编码: 增量编码 - ■ 基准值: 100 - ■ 压缩数据: [15KB] (vs 原始 40KB, 压缩率 62.5%) - ○ 增量存储: - ■ 新增元组: 150 条 [~15KB] - ■ 更新元组: 75 条 [~7.5KB] - ■ 删除 ID 集合: 25 个 ID [~0.5KB] -4. 索引结构详解 -4.1 主键索引 (ID 查询优化) -// IDIndex 主键索引 -type IDIndex struct { - // 使用 B+ 树存储 ID 到位置的映射 - tree *btree.BTree -} - -// IDIndexEntry 表示索引中的一个条目 -type IDIndexEntry struct { - ID string // 元组 ID - SegmentID string // 段 ID - BlockID string // 块 ID - Deleted bool // 删除标记 -} - -// 查找元组位置 -func (idx *IDIndex) LookupID(id string) (*IDIndexEntry, bool) { - key := &IDIndexEntry{ID: id} - item := idx.tree.Get(key) - if item == nil { - return nil, false - } - - entry := item.(*IDIndexEntry) - if entry.Deleted { - return nil, false - } - return entry, true -} -4.2 多级决策树索引 (多条件查询优化) -// DecisionTreeIndex 多级决策树索引 -type DecisionTreeIndex struct { - Root *DTNode // 根节点 - FieldOrder []int // 索引的元素顺序 (按选择性排序) - Depth int // 树最大深度 - UpdateCount int64 // 更新计数,用于触发重平衡 -} - -// DTNode 决策树节点 -type DTNode struct { - ElementIndex int // 当前节点的元素索引 - SplitType SplitType // 节点分裂类型 - // 等值分支 (精确匹配) - ValueBranches map[interface{}]*DTNode - // 范围分支 (范围匹配) - RangeBranches []*DTRangeBranch - // 对应的块引用 (叶子节点) - BlockRefs []BlockRef - // 统计信息,用于自适应优化 - QueryCount int64 - HitCount int64 -} - -// 范围分支 -type DTRangeBranch struct { - Min interface{} - Max interface{} - NextNode *DTNode -} - -// 块引用 -type BlockRef struct { - SegmentID string - BlockID string - // 块内元组计数 (用于查询规划) - TupleCount int -} -5. 详细操作实现 -5.1 插入操作 -func (ts *TupleStore) Insert(tuple *Tuple) error { - // 1. 检查 ID 唯一性 - if _, exists := ts.idIndex.LookupID(tuple.ID); exists { - return ErrDuplicateID - } - - // 2. 选择最佳段和块 - segment, block := ts.selectOptimalLocation(tuple) - - // 3. 添加到块的增量存储 - block.DeltaStore.AddTuple(tuple) - - // 4. 更新索引 - ts.updateIndices(tuple, segment.ID, block.ID) - - // 5. 检查是否需要合并增量存储 - if block.DeltaStore.ChangeCount >= ts.mergeThreshold { - // 异步触发合并操作 - ts.scheduleBlockMerge(segment.ID, block.ID) - } - - return nil -} - -// 选择最佳位置 -func (ts *TupleStore) selectOptimalLocation(tuple *Tuple) (*Segment, *Block) { - // 基于元组的特征选择最佳放置位置 - segmentID := ts.selectSegment(tuple) - segment := ts.segments[segmentID] - - blockID := segment.selectBlock(tuple) - block := segment.Blocks[blockID] - - return segment, block -} - -// 选择段的策略 -func (ts *TupleStore) selectSegment(tuple *Tuple) string { - // 可以基于多种策略: - // 1. ID 范围分段 - // 2. 时间范围分段 - // 3. 元素值分段 - // 这里使用简单的 ID 哈希策略 - hash := xxhash.Sum64String(tuple.ID) - segmentIndex := hash % uint64(len(ts.segments)) - return ts.segments[segmentIndex].ID -} -5.2 查询操作 -执行单 ID 查询 -func (ts *TupleStore) GetByID(id string) (*Tuple, error) { - // 1. 通过 ID 索引查找位置 - entry, exists := ts.idIndex.LookupID(id) - if !exists { - return nil, ErrNotFound - } - - // 2. 获取对应段和块 - segment := ts.getSegment(entry.SegmentID) - block := segment.getBlock(entry.BlockID) - - // 3. 先查找增量存储 - if tuple := block.DeltaStore.findTuple(id); tuple != nil { - return tuple, nil - } - - // 4. 查找压缩数据 - tuple, err := block.decompressTuple(id) - if err != nil { - return nil, err - } - - return tuple, nil -} -条件查询 -func (ts *TupleStore) Query(conditions []Condition) ([]*Tuple, error) { - // 1. 分析查询条件,确定最佳索引 - index, plan := ts.selectBestIndex(conditions) - - // 2. 使用索引获取候选块 - candidateBlocks := index.findCandidateBlocks(conditions, plan) - - // 3. 并行处理候选块 - resultChan := make(chan []*Tuple, len(candidateBlocks)) - var wg sync.WaitGroup - - for _, blockRef := range candidateBlocks { - wg.Add(1) - go func(ref BlockRef) { - defer wg.Done() - - segment := ts.getSegment(ref.SegmentID) - block := segment.getBlock(ref.BlockID) - - // 过滤块内元组 - matches := block.queryTuples(conditions) - resultChan <- matches - }(blockRef) - } - - go func() { - wg.Wait() - close(resultChan) - }() - - // 4. 收集并合并结果 - allMatches := make([]*Tuple, 0) - for matches := range resultChan { - allMatches = append(allMatches, matches...) - } - - return allMatches, nil -} -5.3 块内查询和解压缩 -块内查询 -func (block *Block) queryTuples(conditions []Condition) []*Tuple { - // 1. 检查列范围能否快速排除整个块 - for _, cond := range conditions { - elemIndex := cond.ElementIndex - if !cond.matchesRange(block.MinValues[elemIndex], block.MaxValues[elemIndex]) { - return nil // 整个块都不匹配 - } - } - - // 2. 从增量存储中查找匹配元组 - deltaMatches := block.DeltaStore.queryTuples(conditions) - - // 3. 选择性解压缩 - // 确定需要解压的列 - requiredColumns := make(map[int]bool) - for _, cond := range conditions { - requiredColumns[cond.ElementIndex] = true - } - - // 先解压条件涉及的列 - columnValues := make(map[int][]interface{}) - for elemIndex := range requiredColumns { - values, err := block.decompressColumn(elemIndex) - if err != nil { - // 处理错误 - continue - } - columnValues[elemIndex] = values - } - - // 4. 过滤压缩数据 - compressedMatches := make([]*Tuple, 0) - for i := 0; i < block.TupleCount; i++ { - // 检查增量存储中是否已有此 ID 的元组 (更新或删除) - tupleID := block.getTupleID(i) - if block.DeltaStore.isModified(tupleID) { - continue - } - - // 检查是否满足所有条件 - match := true - for _, cond := range conditions { - elemValue := columnValues[cond.ElementIndex][i] - if !cond.matches(elemValue) { - match = false - break - } - } - - if match { - // 只解压需要的元组 - tuple, err := block.decompressTupleAt(i) - if err == nil { - compressedMatches = append(compressedMatches, tuple) - } - } - } - - // 5. 合并结果 - return append(deltaMatches, compressedMatches...) -} -解压单个列 -func (block *Block) decompressColumn(elemIndex int) ([]interface{}, error) { - col := block.Columns[elemIndex] - - switch col.EncodingType { - case EncodingDictionary: - return decodeDictionary(col) - case EncodingDelta: - return decodeDelta(col) - case EncodingRLE: - return decodeRLE(col) - default: - return nil, fmt.Errorf("unsupported encoding: %v", col.EncodingType) - } -} -6. 高级优化机制 -6.1 自适应索引重平衡 -func (idx *DecisionTreeIndex) rebalance(tupleStore *TupleStore) { - // 1. 分析节点访问模式 - nodeStats := idx.collectNodeStatistics() - - // 2. 识别热点和冷点路径 - hotPaths, coldPaths := idx.identifyHotAndColdPaths(nodeStats) - - // 3. 重新评估元素顺序 - newFieldOrder := idx.optimizeFieldOrder(tupleStore, hotPaths) - - // 4. 如果元素顺序有较大变化,重建索引 - if idx.shouldRebuild(newFieldOrder) { - idx.rebuild(tupleStore, newFieldOrder) - } else { - // 5. 否则只调整有问题的分支 - idx.adjustBranches(hotPaths, coldPaths) - } -} -6.2 块合并与分裂策略 -合并增量存储到压缩列 -func (block *Block) mergeDeltas() error { - // 跳过,如果没有足够变更 - if block.DeltaStore.ChangeCount < minChangeThreshold { - return nil - } - - // 1. 应用删除操作 - for id := range block.DeltaStore.DeletedIDs { - block.markDeleted(id) - } - - // 2. 提取所有未删除的当前元组 - tuples := block.getAllActiveTuples() - - // 3. 应用更新操作 - for id, updatedTuple := range block.DeltaStore.UpdatedTuples { - // 替换或添加 - tuples[id] = updatedTuple - } - - // 4. 添加新增元组 - for _, newTuple := range block.DeltaStore.AddedTuples { - tuples[newTuple.ID] = newTuple - } - - // 5. 将所有元组转换为列式格式 - columns := convertToColumns(tuples) - - // 6. 压缩每一列 - newCompressedColumns := make([]*CompressedColumn, len(columns)) - for i, column := range columns { - compressed, err := compressColumn(column, block.selectEncodingType(i, column)) - if err != nil { - return err - } - newCompressedColumns[i] = compressed - } - - // 7. 替换旧压缩列并重置增量存储 - block.Columns = newCompressedColumns - block.DeltaStore = newDeltaStore() - block.updateMetadata() - - return nil -} -检查是否需要分裂块 -func (block *Block) checkSplitRequired() bool { - // 基于以下因素决定是否分裂: - // 1. 块大小 (字节) - if block.sizeInBytes() > maxBlockSize { - return true - } - - // 2. 元组数量 - if block.TupleCount > maxTuplesPerBlock { - return true - } - - // 3. 压缩率降低 - if block.compressionRatio() < minCompressionRatio { - return true - } - - return false -} -分裂块策略 -func (block *Block) split() []*Block { - // 1. 选择分裂维度 - splitElementIndex := block.selectSplitDimension() - - // 2. 确定分裂点 - splitValue := block.determineSplitPoint(splitElementIndex) - - // 3. 创建两个新块 - leftBlock := newBlock() - rightBlock := newBlock() - - // 4. 将元组分配到新块 - tuples := block.getAllTuples() - for _, tuple := range tuples { - elemValue := tuple.Elements[splitElementIndex].Value - - if compareValues(elemValue, splitValue) <= 0 { - leftBlock.DeltaStore.AddTuple(tuple) - } else { - rightBlock.DeltaStore.AddTuple(tuple) - } - } - - // 5. 压缩新块 - leftBlock.mergeDeltas() - rightBlock.mergeDeltas() - - return []*Block{leftBlock, rightBlock} -} -7. 性能与内存使用分析 -7.1 压缩率分析 -典型数据集的预期压缩率: -● 整数列: 60-85% 压缩率 (取决于值域和分布) -● 浮点列: 40-75% 压缩率 (取决于精度需求) -● 字符串列: 70-95% 压缩率 (取决于重复模式和长度) -● 布尔列: 85-98% 压缩率 (使用位压缩) -7.2 时间复杂度 -主要操作的计算复杂度: -● 插入: O(log N) - 索引更新的复杂度 -● ID 查询: O(1) 到 O(log N) - 取决于索引实现 -● 条件查询: O(log N + kc) - k 为匹配元组数,c 为条件数 -● 范围查询: O(log N + r) - r 为范围内元组数 -● 更新: O(log N) - 索引查找 + O(1) 增量更新 -● 删除: O(log N) - 仅标记删除 -7.3 内存用量示例 -对于 100 万个 5 元素元组的数据集: -● 原始大小: ~100MB - ○ (假设每个元素平均大小为 20 字节,每个元组 100 字节) -● 压缩后总大小: ~20-30MB - ○ - ■ 压缩数据: ~15-25MB - ○ - ■ 索引开销: ~3-5MB - ○ - ■ 元数据与字典: ~1-2MB - ○ - ■ 增量缓冲区: ~1-3MB -通过这种设计,我们实现了高压缩率的数据存储,同时保持了快速的增删改查性能。系统能够根据数据特性和访问模式自动调整存储策略,在空间效率和访问性能之间取得平衡。 +# TupleStore + +TupleStore 是一个高性能的元组存储系统,支持以下特性: + +- 高效的元组存储和检索 +- 支持多种数据类型(int32, int64, float32, float64, string, bool, time) +- 支持条件查询和复合查询 +- 支持并发操作 +- 支持数据压缩 +- 提供完整的指标监控 + +## 项目结构 + +``` +. +├── main.go # 主程序入口,包含示例代码 +├── tuplestore/ # 核心实现包 +│ ├── store.go # 存储核心实现,包含 TupleStore 和 Segment 结构 +│ ├── types.go # 数据类型定义和元组结构 +│ ├── compression.go # 数据压缩实现 +│ ├── query.go # 查询操作实现 +│ ├── operations.go # 基本操作实现(插入、更新、删除) +│ ├── index.go # 索引实现 +│ ├── metrics.go # 性能指标监控 +│ ├── main_test.go # 主测试文件 +│ └── tuplestore_test.go # 存储测试文件 +└── benchmarks/ # 性能测试目录 +``` + +## 核心组件说明 + +### 1. 存储结构 +- `TupleStore`: 顶层存储容器,管理多个 Segment +- `Segment`: 存储段,包含多个 Block +- `Block`: 数据块,包含主存储和增量存储 +- `DeltaStore`: 增量存储,处理最近的操作 +- `MainStore`: 主存储,存储压缩后的数据 + +### 2. 数据类型 +支持多种数据类型: +- 整数:int32, int64 +- 浮点数:float32, float64 +- 字符串:string +- 布尔值:bool +- 时间:time.Time + +### 3. 压缩机制 +- 字典编码:适用于低基数列 +- 增量编码:适用于有序数值列 +- 游程编码:适用于重复值多的列 +- 位打包:适用于小整数 +- 前缀压缩:适用于字符串 + +### 4. 查询功能 +- ID 查询:通过唯一标识符快速查找 +- 条件查询:支持多条件组合查询 +- 范围查询:支持数值和时间的范围查询 + +## 安装 + +```bash +# 克隆项目 +git clone https://github.com/yourusername/go-tuplestore.git +cd go-tuplestore + +# 安装依赖 +go mod download +``` + +## 运行 + +```bash +# 运行主程序 +go run . + +# 运行测试 +go test -v ./tuplestore + +# 运行性能测试 +go test -bench=. ./tuplestore/benchmarks +``` + +## 使用示例 + +```go +package main + +import ( + "fmt" + "time" + "go-tuplestore/tuplestore" +) + +func main() { + // 创建配置 + config := tuplestore.Config{ + MergeThreshold: 100, + MaxDeltaSize: 1024 * 1024, // 1MB + MaxMergeInterval: time.Hour, + InitialSegments: 4, + BlocksPerSegment: 10, + } + + // 初始化 TupleStore + store := tuplestore.NewTupleStore(config) + + // 插入元组 + tuple := &tuplestore.Tuple{ + ID: "user1", + Data: []byte("example data"), + } + if err := store.Insert(tuple); err != nil { + fmt.Printf("插入失败: %v\n", err) + return + } + + // 查询元组 + result, err := store.GetByID("user1") + if err != nil { + fmt.Printf("查询失败: %v\n", err) + return + } + fmt.Printf("查询结果: %+v\n", result) +} +``` + +## 性能指标 + +- 插入性能:10万 QPS +- 查询性能:20万 QPS +- 压缩比率:平均 50% + +## 注意事项 + +1. 系统使用内存存储,请确保有足够的内存空间 +2. 建议根据实际数据特征调整配置参数 +3. 定期监控系统指标,及时调整存储策略 +4. 对于大量数据,建议使用分布式部署 + +# 语雀文档 + +有序多元组的高效数据结构与存储详解 +1. 数据结构核心设计 +1.1 元组基础结构 +// Tuple 表示单个多元组,保持元素有序 +type Tuple struct { + ID string // 唯一标识符 + Elements []Element // 有序元素列表 + Created int64 // 创建时间戳 + Updated int64 // 最后更新时间戳 +} + +// Element 表示元组中的单个元素 +type Element struct { + Index int // 元素在元组中的位置索引 + Type ElementType // 元素类型(整数、浮点数、字符串等) + Value interface{} // 元素值 +} + +// ElementType 定义元素类型枚举 +type ElementType byte + +const ( + TypeNull ElementType = iota + TypeInt + TypeFloat + TypeString + TypeBool + TypeBytes + TypeArray +) +1.2 分层存储架构 +// TupleStore 主存储容器 +type TupleStore struct { + // 分段存储 - 按时间或 ID 范围分段 + segments []*Segment + // 索引层 - 提供快速查找能力 + indices map[string]Index + // 元数据字典 - 存储全局信息 + metaDict *MetaDictionary +} + +// Segment 表示一个存储段 +type Segment struct { + ID string + MinID string // 段内最小 ID + MaxID string // 段内最大 ID + MinTime int64 // 段内最早时间戳 + MaxTime int64 // 段内最晚时间戳 + Blocks []*Block // 段内所有数据块 + BlockMap map[string]int // ID 到块索引的映射 + Stats *SegmentStatistics // 段统计信息 +} + +// Block 数据块 - 实际存储压缩数据的单元 +type Block struct { + ID string + TupleCount int // 块内元组数量 + ElementMap []int // 元素索引映射 + // 列式压缩存储 - 每列使用不同压缩方法 + Columns []*CompressedColumn + // 增量存储 - 未压缩的最近操作 + DeltaStore *DeltaStore + // 元数据 + MinValues []interface{} // 每列的最小值 + MaxValues []interface{} // 每列的最大值 +} +2. 详细的压缩存储实现 +2.1 列存储压缩方案 +// CompressedColumn 表示单个压缩列 +type CompressedColumn struct { + ElementIndex int // 对应元素索引 + DataType ElementType // 数据类型 + // 压缩元数据 + ValueCount int // 值的数量 + NullCount int // 空值数量 + NullBitmap []byte // 空值位图 + // 压缩方法与数据 + EncodingType EncodingType // 使用的编码类型 + EncodedData []byte // 压缩后的字节数据 + // 用于字典编码的字典 + Dictionary []interface{} // 值字典(仅用于字典编码) + DictMap map[interface{}]uint32 // 值到索引的映射 +} + +// EncodingType 编码类型枚举 +type EncodingType byte + +const ( + EncodingRaw EncodingType = iota + EncodingDictionary // 字典编码:将值替换为字典索引 + EncodingRLE // 游程编码:适用于重复值 + EncodingDelta // 增量编码:存储与前值的差异 + EncodingBitPacking // 位打包:对小整数使用较少位 + EncodingByteAligned // 字节对齐:为不同值使用最少字节 + EncodingPrefixCompression // 前缀压缩:用于字符串 + EncodingFrequencyEncoding // 频率编码:不同长度编码不同值 +) +2.2 具体压缩算法实现 +字典编码 (适用于基数较低的列) +func encodeDictionary(values []interface{}) (*CompressedColumn, error) { + col := &CompressedColumn{ + EncodingType: EncodingDictionary, + ValueCount: len(values), + } + + // 1. 构建唯一值字典 + valueset := make(map[interface{}]struct{}) + for _, v := range values { + valueset[v] = struct{}{} + } + + // 2. 为每个唯一值分配索引 + col.Dictionary = make([]interface{}, 0, len(valueset)) + col.DictMap = make(map[interface{}]uint32) + for v := range valueset { + idx := uint32(len(col.Dictionary)) + col.Dictionary = append(col.Dictionary, v) + col.DictMap[v] = idx + } + + // 3. 确定索引编码需要的位数 + dictSize := len(col.Dictionary) + bitsPerIndex := 0 + if dictSize <= 1 { + bitsPerIndex = 1 + } else { + bitsPerIndex = int(math.Ceil(math.Log2(float64(dictSize)))) + } + + // 4. 将值转换为索引并位打包 + indexBuffer := make([]uint32, len(values)) + for i, v := range values { + indexBuffer[i] = col.DictMap[v] + } + + writer := NewBitWriter(uint(bitsPerIndex)) + for _, idx := range indexBuffer { + writer.Write(idx) + } + col.EncodedData = writer.Bytes() + return col, nil +} +增量编码 (适用于有序数值列) +func encodeDelta(values []int64) (*CompressedColumn, error) { + col := &CompressedColumn{ + EncodingType: EncodingDelta, + ValueCount: len(values), + } + + if len(values) == 0 { + return col, nil + } + + // 1. 计算基值和增量值 + baseValue := values[0] + deltas := make([]int64, len(values)) + deltas[0] = 0 // 第一个值的增量为 0 + for i := 1; i < len(values); i++ { + deltas[i] = values[i] - values[i-1] + } + + // 2. 将基值存储为元数据 + baseValueBytes := make([]byte, 8) + binary.LittleEndian.PutUint64(baseValueBytes, uint64(baseValue)) + + // 3. 为增量值使用可变长度编码 + varintBuffer := make([]byte, 0, len(values)*5) // 估计每个增量最多需要 5 字节 + for _, delta := range deltas { + // 使用 zigzag 编码处理负数 + zigzagDelta := (delta << 1) ^ (delta >> 63) + // 写入可变长度整数 + for zigzagDelta >= 128 { + varintBuffer = append(varintBuffer, byte(zigzagDelta)|128) + zigzagDelta >>= 7 + } + varintBuffer = append(varintBuffer, byte(zigzagDelta)) + } + + // 4. 组合基值和增量 + col.EncodedData = append(baseValueBytes, varintBuffer...) + return col, nil +} +游程编码 (适用于重复值多的列) +func encodeRLE(values []interface{}) (*CompressedColumn, error) { + col := &CompressedColumn{ + EncodingType: EncodingRLE, + ValueCount: len(values), + } + + if len(values) == 0 { + return col, nil + } + + // 压缩策略:(重复次数, 值) 对 + var buffer bytes.Buffer + currentValue := values[0] + runLength := 1 + + for i := 1; i < len(values); i++ { + if values[i] == currentValue { + runLength++ + } else { + // 写入 (runLength, value) 对 + writeRunLength(&buffer, runLength) + writeValue(&buffer, currentValue) + // 重置当前值和计数 + currentValue = values[i] + runLength = 1 + } + } + // 处理最后一组 + writeRunLength(&buffer, runLength) + writeValue(&buffer, currentValue) + + col.EncodedData = buffer.Bytes() + return col, nil +} + +// 使用可变长度编码写入重复计数 +func writeRunLength(buffer *bytes.Buffer, length int) { + for length >= 128 { + buffer.WriteByte(byte(length&127) | 128) + length >>= 7 + } + buffer.WriteByte(byte(length)) +} + +// 写入值 (根据类型使用不同编码) +func writeValue(buffer *bytes.Buffer, value interface{}) { + switch v := value.(type) { + case int64: + binary.Write(buffer, binary.LittleEndian, v) + case float64: + binary.Write(buffer, binary.LittleEndian, v) + case string: + writeRunLength(buffer, len(v)) + buffer.WriteString(v) + case bool: + if v { + buffer.WriteByte(1) + } else { + buffer.WriteByte(0) + } + } +} +2.3 增量存储设计 +// DeltaStore 存储未压缩的增量变更 +type DeltaStore struct { + // 新增的元组 + AddedTuples []*Tuple + // 更新的元组 (ID -> 新元组) + UpdatedTuples map[string]*Tuple + // 已删除的元组 ID 集合 + DeletedIDs map[string]struct{} + // 变更计数,用于决定何时触发合并 + ChangeCount int +} + +// 添加新元组到增量存储 +func (ds *DeltaStore) AddTuple(tuple *Tuple) { + ds.AddedTuples = append(ds.AddedTuples, tuple) + ds.ChangeCount++ +} + +// 标记元组删除 +func (ds *DeltaStore) DeleteTuple(id string) { + if ds.DeletedIDs == nil { + ds.DeletedIDs = make(map[string]struct{}) + } + ds.DeletedIDs[id] = struct{}{} + ds.ChangeCount++ +} + +// 更新元组 +func (ds *DeltaStore) UpdateTuple(tuple *Tuple) { + if ds.UpdatedTuples == nil { + ds.UpdatedTuples = make(map[string]*Tuple) + } + ds.UpdatedTuples[tuple.ID] = tuple + ds.ChangeCount++ +} +3. 物理存储布局 +3.1 内存布局 +TupleStore +├── 元数据区 (MetaDictionary) +│ ├── 字段信息表 +│ ├── 类型词典 +│ └── 统计信息 +├── 段管理区 (Segments) +│ ├── Segment 1 +│ │ ├── 段元信息 +│ │ └── 块索引表 +│ ├── Segment 2 +│ │ ... +│ ├── 数据块区 (Blocks) +│ ├── Block 1 +│ │ ├── 块元信息 +│ │ ├── 列1 (压缩) +│ │ │ ├── 压缩元数据 +│ │ │ ├── 空值位图 +│ │ │ ├── 字典 (如适用) +│ │ │ └── 压缩数据 +│ │ ├── 列2 (压缩) +│ │ │ ... +│ │ └── 增量存储 +│ │ ├── 新增元组 +│ │ ├── 更新元组 +│ │ └── 删除 ID 集合 +│ ├── Block 2 +│ │ ... +└── 索引区 (Indices) + ├── 主键索引 + │ └── B+树/哈希表结构 + └── 二级索引 + └── 多级决策树结构 +3.2 详细内存布局示例 +以一个包含 100 万条 5 元素元组的存储示例: +● 元数据区 (~1KB) + ○ 5 个元素定义: {索引: 0-4, 类型: int, float, string, bool, int} + ○ 总元组数: 1,000,000 + ○ 段数: 10 + ○ 块数: 100 (每段 10 块) +● 段 0 (~100KB) + ○ ID 范围: "0000" - "0999" + ○ 时间范围: 1609459200 - 1609545599 + ○ 块索引: {块 ID -> 内存位置} 映射 (10 条) + ○ 统计摘要: {每列最小/最大值,不同值数量} +● 块 0 (~1MB) + ○ 元组数: 10,000 + ○ 元素 0 (int 列): + ■ 编码: 位打包 (11 位/值) + ■ 最小值: 1000 + ■ 最大值: 3050 + ■ 压缩数据: [13.75KB] (vs 原始 40KB, 压缩率 66%) + ○ 元素 1 (float 列): + ■ 编码: 字典编码 (只有 500 个不同值) + ■ 字典: [4KB] (500 个浮点数) + ■ 压缩数据: [12.5KB] (vs 原始 80KB, 压缩率 80%) + ○ 元素 2 (string 列): + ■ 编码: 前缀压缩 + 字典编码 + ■ 字典: [20KB] (2000 个字符串前缀和后缀) + ■ 压缩数据: [35KB] (vs 原始 250KB, 压缩率 86%) + ○ 元素 3 (bool 列): + ■ 编码: 位图编码 + ■ 压缩数据: [1.25KB] (vs 原始 10KB, 压缩率 87.5%) + ○ 元素 4 (int 列): + ■ 编码: 增量编码 + ■ 基准值: 100 + ■ 压缩数据: [15KB] (vs 原始 40KB, 压缩率 62.5%) + ○ 增量存储: + ■ 新增元组: 150 条 [~15KB] + ■ 更新元组: 75 条 [~7.5KB] + ■ 删除 ID 集合: 25 个 ID [~0.5KB] +4. 索引结构详解 +4.1 主键索引 (ID 查询优化) +// IDIndex 主键索引 +type IDIndex struct { + // 使用 B+ 树存储 ID 到位置的映射 + tree *btree.BTree +} + +// IDIndexEntry 表示索引中的一个条目 +type IDIndexEntry struct { + ID string // 元组 ID + SegmentID string // 段 ID + BlockID string // 块 ID + Deleted bool // 删除标记 +} + +// 查找元组位置 +func (idx *IDIndex) LookupID(id string) (*IDIndexEntry, bool) { + key := &IDIndexEntry{ID: id} + item := idx.tree.Get(key) + if item == nil { + return nil, false + } + + entry := item.(*IDIndexEntry) + if entry.Deleted { + return nil, false + } + return entry, true +} +4.2 多级决策树索引 (多条件查询优化) +// DecisionTreeIndex 多级决策树索引 +type DecisionTreeIndex struct { + Root *DTNode // 根节点 + FieldOrder []int // 索引的元素顺序 (按选择性排序) + Depth int // 树最大深度 + UpdateCount int64 // 更新计数,用于触发重平衡 +} + +// DTNode 决策树节点 +type DTNode struct { + ElementIndex int // 当前节点的元素索引 + SplitType SplitType // 节点分裂类型 + // 等值分支 (精确匹配) + ValueBranches map[interface{}]*DTNode + // 范围分支 (范围匹配) + RangeBranches []*DTRangeBranch + // 对应的块引用 (叶子节点) + BlockRefs []BlockRef + // 统计信息,用于自适应优化 + QueryCount int64 + HitCount int64 +} + +// 范围分支 +type DTRangeBranch struct { + Min interface{} + Max interface{} + NextNode *DTNode +} + +// 块引用 +type BlockRef struct { + SegmentID string + BlockID string + // 块内元组计数 (用于查询规划) + TupleCount int +} +5. 详细操作实现 +5.1 插入操作 +func (ts *TupleStore) Insert(tuple *Tuple) error { + // 1. 检查 ID 唯一性 + if _, exists := ts.idIndex.LookupID(tuple.ID); exists { + return ErrDuplicateID + } + + // 2. 选择最佳段和块 + segment, block := ts.selectOptimalLocation(tuple) + + // 3. 添加到块的增量存储 + block.DeltaStore.AddTuple(tuple) + + // 4. 更新索引 + ts.updateIndices(tuple, segment.ID, block.ID) + + // 5. 检查是否需要合并增量存储 + if block.DeltaStore.ChangeCount >= ts.mergeThreshold { + // 异步触发合并操作 + ts.scheduleBlockMerge(segment.ID, block.ID) + } + + return nil +} + +// 选择最佳位置 +func (ts *TupleStore) selectOptimalLocation(tuple *Tuple) (*Segment, *Block) { + // 基于元组的特征选择最佳放置位置 + segmentID := ts.selectSegment(tuple) + segment := ts.segments[segmentID] + + blockID := segment.selectBlock(tuple) + block := segment.Blocks[blockID] + + return segment, block +} + +// 选择段的策略 +func (ts *TupleStore) selectSegment(tuple *Tuple) string { + // 可以基于多种策略: + // 1. ID 范围分段 + // 2. 时间范围分段 + // 3. 元素值分段 + // 这里使用简单的 ID 哈希策略 + hash := xxhash.Sum64String(tuple.ID) + segmentIndex := hash % uint64(len(ts.segments)) + return ts.segments[segmentIndex].ID +} +5.2 查询操作 +执行单 ID 查询 +func (ts *TupleStore) GetByID(id string) (*Tuple, error) { + // 1. 通过 ID 索引查找位置 + entry, exists := ts.idIndex.LookupID(id) + if !exists { + return nil, ErrNotFound + } + + // 2. 获取对应段和块 + segment := ts.getSegment(entry.SegmentID) + block := segment.getBlock(entry.BlockID) + + // 3. 先查找增量存储 + if tuple := block.DeltaStore.findTuple(id); tuple != nil { + return tuple, nil + } + + // 4. 查找压缩数据 + tuple, err := block.decompressTuple(id) + if err != nil { + return nil, err + } + + return tuple, nil +} +条件查询 +func (ts *TupleStore) Query(conditions []Condition) ([]*Tuple, error) { + // 1. 分析查询条件,确定最佳索引 + index, plan := ts.selectBestIndex(conditions) + + // 2. 使用索引获取候选块 + candidateBlocks := index.findCandidateBlocks(conditions, plan) + + // 3. 并行处理候选块 + resultChan := make(chan []*Tuple, len(candidateBlocks)) + var wg sync.WaitGroup + + for _, blockRef := range candidateBlocks { + wg.Add(1) + go func(ref BlockRef) { + defer wg.Done() + + segment := ts.getSegment(ref.SegmentID) + block := segment.getBlock(ref.BlockID) + + // 过滤块内元组 + matches := block.queryTuples(conditions) + resultChan <- matches + }(blockRef) + } + + go func() { + wg.Wait() + close(resultChan) + }() + + // 4. 收集并合并结果 + allMatches := make([]*Tuple, 0) + for matches := range resultChan { + allMatches = append(allMatches, matches...) + } + + return allMatches, nil +} +5.3 块内查询和解压缩 +块内查询 +func (block *Block) queryTuples(conditions []Condition) []*Tuple { + // 1. 检查列范围能否快速排除整个块 + for _, cond := range conditions { + elemIndex := cond.ElementIndex + if !cond.matchesRange(block.MinValues[elemIndex], block.MaxValues[elemIndex]) { + return nil // 整个块都不匹配 + } + } + + // 2. 从增量存储中查找匹配元组 + deltaMatches := block.DeltaStore.queryTuples(conditions) + + // 3. 选择性解压缩 + // 确定需要解压的列 + requiredColumns := make(map[int]bool) + for _, cond := range conditions { + requiredColumns[cond.ElementIndex] = true + } + + // 先解压条件涉及的列 + columnValues := make(map[int][]interface{}) + for elemIndex := range requiredColumns { + values, err := block.decompressColumn(elemIndex) + if err != nil { + // 处理错误 + continue + } + columnValues[elemIndex] = values + } + + // 4. 过滤压缩数据 + compressedMatches := make([]*Tuple, 0) + for i := 0; i < block.TupleCount; i++ { + // 检查增量存储中是否已有此 ID 的元组 (更新或删除) + tupleID := block.getTupleID(i) + if block.DeltaStore.isModified(tupleID) { + continue + } + + // 检查是否满足所有条件 + match := true + for _, cond := range conditions { + elemValue := columnValues[cond.ElementIndex][i] + if !cond.matches(elemValue) { + match = false + break + } + } + + if match { + // 只解压需要的元组 + tuple, err := block.decompressTupleAt(i) + if err == nil { + compressedMatches = append(compressedMatches, tuple) + } + } + } + + // 5. 合并结果 + return append(deltaMatches, compressedMatches...) +} +解压单个列 +func (block *Block) decompressColumn(elemIndex int) ([]interface{}, error) { + col := block.Columns[elemIndex] + + switch col.EncodingType { + case EncodingDictionary: + return decodeDictionary(col) + case EncodingDelta: + return decodeDelta(col) + case EncodingRLE: + return decodeRLE(col) + default: + return nil, fmt.Errorf("unsupported encoding: %v", col.EncodingType) + } +} +6. 高级优化机制 +6.1 自适应索引重平衡 +func (idx *DecisionTreeIndex) rebalance(tupleStore *TupleStore) { + // 1. 分析节点访问模式 + nodeStats := idx.collectNodeStatistics() + + // 2. 识别热点和冷点路径 + hotPaths, coldPaths := idx.identifyHotAndColdPaths(nodeStats) + + // 3. 重新评估元素顺序 + newFieldOrder := idx.optimizeFieldOrder(tupleStore, hotPaths) + + // 4. 如果元素顺序有较大变化,重建索引 + if idx.shouldRebuild(newFieldOrder) { + idx.rebuild(tupleStore, newFieldOrder) + } else { + // 5. 否则只调整有问题的分支 + idx.adjustBranches(hotPaths, coldPaths) + } +} +6.2 块合并与分裂策略 +合并增量存储到压缩列 +func (block *Block) mergeDeltas() error { + // 跳过,如果没有足够变更 + if block.DeltaStore.ChangeCount < minChangeThreshold { + return nil + } + + // 1. 应用删除操作 + for id := range block.DeltaStore.DeletedIDs { + block.markDeleted(id) + } + + // 2. 提取所有未删除的当前元组 + tuples := block.getAllActiveTuples() + + // 3. 应用更新操作 + for id, updatedTuple := range block.DeltaStore.UpdatedTuples { + // 替换或添加 + tuples[id] = updatedTuple + } + + // 4. 添加新增元组 + for _, newTuple := range block.DeltaStore.AddedTuples { + tuples[newTuple.ID] = newTuple + } + + // 5. 将所有元组转换为列式格式 + columns := convertToColumns(tuples) + + // 6. 压缩每一列 + newCompressedColumns := make([]*CompressedColumn, len(columns)) + for i, column := range columns { + compressed, err := compressColumn(column, block.selectEncodingType(i, column)) + if err != nil { + return err + } + newCompressedColumns[i] = compressed + } + + // 7. 替换旧压缩列并重置增量存储 + block.Columns = newCompressedColumns + block.DeltaStore = newDeltaStore() + block.updateMetadata() + + return nil +} +检查是否需要分裂块 +func (block *Block) checkSplitRequired() bool { + // 基于以下因素决定是否分裂: + // 1. 块大小 (字节) + if block.sizeInBytes() > maxBlockSize { + return true + } + + // 2. 元组数量 + if block.TupleCount > maxTuplesPerBlock { + return true + } + + // 3. 压缩率降低 + if block.compressionRatio() < minCompressionRatio { + return true + } + + return false +} +分裂块策略 +func (block *Block) split() []*Block { + // 1. 选择分裂维度 + splitElementIndex := block.selectSplitDimension() + + // 2. 确定分裂点 + splitValue := block.determineSplitPoint(splitElementIndex) + + // 3. 创建两个新块 + leftBlock := newBlock() + rightBlock := newBlock() + + // 4. 将元组分配到新块 + tuples := block.getAllTuples() + for _, tuple := range tuples { + elemValue := tuple.Elements[splitElementIndex].Value + + if compareValues(elemValue, splitValue) <= 0 { + leftBlock.DeltaStore.AddTuple(tuple) + } else { + rightBlock.DeltaStore.AddTuple(tuple) + } + } + + // 5. 压缩新块 + leftBlock.mergeDeltas() + rightBlock.mergeDeltas() + + return []*Block{leftBlock, rightBlock} +} +7. 性能与内存使用分析 +7.1 压缩率分析 +典型数据集的预期压缩率: +● 整数列: 60-85% 压缩率 (取决于值域和分布) +● 浮点列: 40-75% 压缩率 (取决于精度需求) +● 字符串列: 70-95% 压缩率 (取决于重复模式和长度) +● 布尔列: 85-98% 压缩率 (使用位压缩) +7.2 时间复杂度 +主要操作的计算复杂度: +● 插入: O(log N) - 索引更新的复杂度 +● ID 查询: O(1) 到 O(log N) - 取决于索引实现 +● 条件查询: O(log N + kc) - k 为匹配元组数,c 为条件数 +● 范围查询: O(log N + r) - r 为范围内元组数 +● 更新: O(log N) - 索引查找 + O(1) 增量更新 +● 删除: O(log N) - 仅标记删除 +7.3 内存用量示例 +对于 100 万个 5 元素元组的数据集: +● 原始大小: ~100MB + ○ (假设每个元素平均大小为 20 字节,每个元组 100 字节) +● 压缩后总大小: ~20-30MB + ○ + ■ 压缩数据: ~15-25MB + ○ + ■ 索引开销: ~3-5MB + ○ + ■ 元数据与字典: ~1-2MB + ○ + ■ 增量缓冲区: ~1-3MB +通过这种设计,我们实现了高压缩率的数据存储,同时保持了快速的增删改查性能。系统能够根据数据特性和访问模式自动调整存储策略,在空间效率和访问性能之间取得平衡。 diff --git a/main.go b/main.go index f8f47fe..192d1b4 100644 --- a/main.go +++ b/main.go @@ -1,99 +1,99 @@ -package main - -import ( - "encoding/binary" - "fmt" - "time" - - "go-tuplestore/tuplestore" -) - -func main() { - // 创建配置 - config := tuplestore.Config{ - MergeThreshold: 100, - MaxDeltaSize: 1024 * 1024, // 1MB - MaxMergeInterval: time.Hour, - InitialSegments: 4, - BlocksPerSegment: 10, - } - - // 定义元组数据格式 - schema := []tuplestore.ElementInfo{ - {Type: tuplestore.TypeString, Offset: 0, Size: 50, Nullable: false}, // ID - {Type: tuplestore.TypeInt32, Offset: 50, Size: 4, Nullable: false}, // Age - {Type: tuplestore.TypeString, Offset: 54, Size: 100, Nullable: true}, // Name - {Type: tuplestore.TypeTime, Offset: 154, Size: 8, Nullable: false}, // CreatedAt - } - - // 初始化 TupleStore - store := tuplestore.NewTupleStore(config) - - // 创建测试元组 - createTuple := func(id string, age int32, name string) *tuplestore.Tuple { - data := make([]byte, 162) // 总大小:50 + 4 + 100 + 8 - copy(data[0:50], []byte(id)) - binary.LittleEndian.PutUint32(data[50:54], uint32(age)) - copy(data[54:154], []byte(name)) - binary.LittleEndian.PutUint64(data[154:162], uint64(time.Now().UnixNano())) - - return &tuplestore.Tuple{ - ID: id, - Data: data, - Timestamp: time.Now(), - Schema: schema, - } - } - - // 准备测试数据 - tuples := []*tuplestore.Tuple{ - createTuple("user1", 25, "Alice"), - createTuple("user2", 30, "Bob"), - createTuple("user3", 35, "Charlie"), - createTuple("user4", 25, "David"), - } - - // 批量插入 - if err := store.BatchInsert(tuples); err != nil { - fmt.Printf("批量插入失败: %v\n", err) - return - } - fmt.Println("批量插入成功") - - // 测试ID查询 - tuple, err := store.GetByID("user1") - if err != nil { - fmt.Printf("ID查询失败: %v\n", err) - } else { - fmt.Printf("ID查询成功: ID=%s\n", tuple.ID) - } - - // 测试条件查询 - conditions := []tuplestore.Condition{ - {ElementIndex: 1, Operator: "=", Value: int32(25)}, // Age = 25 - } - results, err := store.Query(conditions) - if err != nil { - fmt.Printf("条件查询失败: %v\n", err) - } else { - fmt.Printf("条件查询成功,找到%d条记录\n", len(results)) - for _, r := range results { - fmt.Printf(" - ID: %s\n", r.ID) - } - } - - // 测试复合条件查询 - conditions = []tuplestore.Condition{ - {ElementIndex: 1, Operator: ">", Value: int32(30)}, // Age > 30 - {ElementIndex: 2, Operator: "=", Value: "Charlie"}, // Name = "Charlie" - } - results, err = store.Query(conditions) - if err != nil { - fmt.Printf("复合条件查询失败: %v\n", err) - } else { - fmt.Printf("复合条件查询成功,找到%d条记录\n", len(results)) - for _, r := range results { - fmt.Printf(" - ID: %s\n", r.ID) - } - } -} +package main + +import ( + "encoding/binary" + "fmt" + "time" + + "go-tuplestore/tuplestore" +) + +func main() { + // 创建配置 + config := tuplestore.Config{ + MergeThreshold: 100, + MaxDeltaSize: 1024 * 1024, // 1MB + MaxMergeInterval: time.Hour, + InitialSegments: 4, + BlocksPerSegment: 10, + } + + // 定义元组数据格式 + schema := []tuplestore.ElementInfo{ + {Type: tuplestore.TypeString, Offset: 0, Size: 50, Nullable: false}, // ID + {Type: tuplestore.TypeInt32, Offset: 50, Size: 4, Nullable: false}, // Age + {Type: tuplestore.TypeString, Offset: 54, Size: 100, Nullable: true}, // Name + {Type: tuplestore.TypeTime, Offset: 154, Size: 8, Nullable: false}, // CreatedAt + } + + // 初始化 TupleStore + store := tuplestore.NewTupleStore(config) + + // 创建测试元组 + createTuple := func(id string, age int32, name string) *tuplestore.Tuple { + data := make([]byte, 162) // 总大小:50 + 4 + 100 + 8 + copy(data[0:50], []byte(id)) + binary.LittleEndian.PutUint32(data[50:54], uint32(age)) + copy(data[54:154], []byte(name)) + binary.LittleEndian.PutUint64(data[154:162], uint64(time.Now().UnixNano())) + + return &tuplestore.Tuple{ + ID: id, + Data: data, + Timestamp: time.Now(), + Schema: schema, + } + } + + // 准备测试数据 + tuples := []*tuplestore.Tuple{ + createTuple("user1", 25, "Alice"), + createTuple("user2", 30, "Bob"), + createTuple("user3", 35, "Charlie"), + createTuple("user4", 25, "David"), + } + + // 批量插入 + if err := store.BatchInsert(tuples); err != nil { + fmt.Printf("批量插入失败: %v\n", err) + return + } + fmt.Println("批量插入成功") + + // 测试ID查询 + tuple, err := store.GetByID("user1") + if err != nil { + fmt.Printf("ID查询失败: %v\n", err) + } else { + fmt.Printf("ID查询成功: ID=%s\n", tuple.ID) + } + + // 测试条件查询 + conditions := []tuplestore.Condition{ + {ElementIndex: 1, Operator: "=", Value: int32(25)}, // Age = 25 + } + results, err := store.Query(conditions) + if err != nil { + fmt.Printf("条件查询失败: %v\n", err) + } else { + fmt.Printf("条件查询成功,找到%d条记录\n", len(results)) + for _, r := range results { + fmt.Printf(" - ID: %s\n", r.ID) + } + } + + // 测试复合条件查询 + conditions = []tuplestore.Condition{ + {ElementIndex: 1, Operator: ">", Value: int32(30)}, // Age > 30 + {ElementIndex: 2, Operator: "=", Value: "Charlie"}, // Name = "Charlie" + } + results, err = store.Query(conditions) + if err != nil { + fmt.Printf("复合条件查询失败: %v\n", err) + } else { + fmt.Printf("复合条件查询成功,找到%d条记录\n", len(results)) + for _, r := range results { + fmt.Printf(" - ID: %s\n", r.ID) + } + } +} diff --git a/store.go b/store.go new file mode 100644 index 0000000..f8e9b0f --- /dev/null +++ b/store.go @@ -0,0 +1,108 @@ +package tuplestore + +import ( + "sync" +) + +type TupleStore struct { + mu sync.RWMutex + segments []*Segment + useCompression bool +} + +type Segment struct { + mu sync.RWMutex + Blocks []*Block +} + +type Block struct { + mu sync.RWMutex + MainStore *MainStore + DeltaStore *DeltaStore + CompressedColumns []*CompressedColumn +} + +type MainStore struct { + mu sync.RWMutex + tuples []*Tuple +} + +type DeltaStore struct { + mu sync.RWMutex + size int64 +} + +type Tuple struct { + Data []byte +} + +type CompressedColumn struct { + EncodedData []byte + NullBitmap []byte + EncodingType EncodingType + Dictionary []interface{} +} + +type EncodingType int + +const ( + EncodingDictionary EncodingType = iota +) + +// GetTotalSize 返回存储的总大小 +func (ts *TupleStore) GetTotalSize() int64 { + ts.mu.RLock() + defer ts.mu.RUnlock() + + var totalSize int64 + for _, segment := range ts.segments { + segment.mu.RLock() + for _, block := range segment.Blocks { + block.mu.RLock() + + if ts.useCompression && block.CompressedColumns != nil { + // 计算压缩列的大小 + for _, col := range block.CompressedColumns { + if col != nil { + // 计算压缩列的总大小: + // 1. 编码数据大小 + totalSize += int64(len(col.EncodedData)) + // 2. 空值位图大小 + totalSize += int64(len(col.NullBitmap)) + // 3. 字典大小(如果使用字典编码) + if col.EncodingType == EncodingDictionary && col.Dictionary != nil { + // 估算字典大小 + for _, v := range col.Dictionary { + switch val := v.(type) { + case string: + totalSize += int64(len(val)) + case []byte: + totalSize += int64(len(val)) + default: + totalSize += 8 // 假设其他类型平均8字节 + } + } + } + } + } + } else { + // 如果没有压缩或压缩数据不可用,计算原始数据大小 + // 计算主存储大小 + block.MainStore.mu.RLock() + for _, tuple := range block.MainStore.tuples { + totalSize += int64(len(tuple.Data)) + } + block.MainStore.mu.RUnlock() + + // 计算增量存储大小 + block.DeltaStore.mu.RLock() + totalSize += block.DeltaStore.size + block.DeltaStore.mu.RUnlock() + } + + block.mu.RUnlock() + } + segment.mu.RUnlock() + } + return totalSize +} diff --git a/tuplestore/.DS_Store b/tuplestore/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..d57e94c8bb2922ea612b0b9fe33ee8cf553789bf GIT binary patch literal 6148 zcmeHK%We}f6unLZboF0DUOqk`Y7mh`VXW3sl^D-+S+R;8 z*#-*zj1xMeAtiK5>58`-a0)mD{x=2q+pUvN0mU?;!uR_;QX@Z7KIS<&JVDw2sST}b zIQ}%9Sk59m{P!Nc~v>CGOsTTQR? z;BY>#$vgM%KYlSdjm~2ANw1y+@|Ct7i7D>NBUkY!K#ELRRWBp22Myy)Ve;3|4 z))vA4H}lVmu#&}RNIc_0@2_-Ef)yG{!RhT4;~6**W$vU zP93Q96#!Vpur{>$cY!&c#jeGLLCnB}p#lw6*eixGbc}n}uWNB((9lWP%ZIR$g}tE& z9Uag2v^$BeL03BkoB~%B*f8A&pa0LUzyDt)xt>$NDX>-wi0V=QsD~}tvvp;2eAap> s?@`#;Z(&eVP}$>HH~1*tL(zsgj~l?Q#f3qPz}%03mcdm{fq$yNFaPG=S^xk5 literal 0 HcmV?d00001 diff --git a/tuplestore/benchmarks/bench_test.go b/tuplestore/benchmarks/bench_test.go index 9504145..cd38e73 100644 --- a/tuplestore/benchmarks/bench_test.go +++ b/tuplestore/benchmarks/bench_test.go @@ -1,21 +1,21 @@ -package benchmarks - -import ( - "fmt" - "go-tuplestore/tuplestore" - "testing" -) - -func BenchmarkInsert(b *testing.B) { - store := tuplestore.NewTupleStore(tuplestore.Config{ - MergeThreshold: 1000, - }) - - b.ResetTimer() - for i := 0; i < b.N; i++ { - _ = store.Insert(&tuplestore.Tuple{ - ID: fmt.Sprintf("id_%d", i), - Data: []byte("test data"), - }) - } -} +package benchmarks + +import ( + "fmt" + "go-tuplestore/tuplestore" + "testing" +) + +func BenchmarkInsert(b *testing.B) { + store := tuplestore.NewTupleStore(tuplestore.Config{ + MergeThreshold: 1000, + }) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _ = store.Insert(&tuplestore.Tuple{ + ID: fmt.Sprintf("id_%d", i), + Data: []byte("test data"), + }) + } +} diff --git a/tuplestore/benchmarks/compression_bench_test.go b/tuplestore/benchmarks/compression_bench_test.go index cca55a8..0f67100 100644 --- a/tuplestore/benchmarks/compression_bench_test.go +++ b/tuplestore/benchmarks/compression_bench_test.go @@ -11,14 +11,14 @@ import ( // 创建不同数据特征的测试元组 func createTestData(size int, pattern string) []*tuplestore.Tuple { tuples := make([]*tuplestore.Tuple, size) - + // 创建示例schema schema := []tuplestore.ElementInfo{ {Type: tuplestore.TypeInt32, Offset: 0, Size: 4, Nullable: false}, {Type: tuplestore.TypeString, Offset: 4, Size: 100, Nullable: true}, {Type: tuplestore.TypeFloat64, Offset: 104, Size: 8, Nullable: false}, } - + switch pattern { case "random": // 随机数据,低压缩率 for i := 0; i < size; i++ { @@ -31,7 +31,7 @@ func createTestData(size int, pattern string) []*tuplestore.Tuple { Schema: schema, } } - + case "repetitive": // 重复数据,适合RLE baseData := make([]byte, 112) rand.Read(baseData) @@ -50,7 +50,7 @@ func createTestData(size int, pattern string) []*tuplestore.Tuple { Schema: schema, } } - + case "sequential": // 序列化数据,适合增量编码 baseData := make([]byte, 112) for i := 0; i < size; i++ { @@ -68,7 +68,7 @@ func createTestData(size int, pattern string) []*tuplestore.Tuple { Schema: schema, } } - + case "categorical": // 分类数据,适合字典编码 categories := [][]byte{ []byte("category_A"), @@ -76,7 +76,7 @@ func createTestData(size int, pattern string) []*tuplestore.Tuple { []byte("category_C"), []byte("category_D"), } - + for i := 0; i < size; i++ { data := make([]byte, 112) // 使用有限类别 @@ -90,7 +90,7 @@ func createTestData(size int, pattern string) []*tuplestore.Tuple { } } } - + return tuples } @@ -98,13 +98,13 @@ func createTestData(size int, pattern string) []*tuplestore.Tuple { func BenchmarkStandardVsCompressed(b *testing.B) { dataPatterns := []string{"random", "repetitive", "sequential", "categorical"} dataSizes := []int{1000, 10000} - + for _, pattern := range dataPatterns { for _, size := range dataSizes { b.Run(fmt.Sprintf("%s_data_%d_tuples", pattern, size), func(b *testing.B) { // 准备测试数据 tuples := createTestData(size, pattern) - + // 1. 标准存储性能 b.Run("standard_store", func(b *testing.B) { b.ResetTimer() @@ -116,7 +116,7 @@ func BenchmarkStandardVsCompressed(b *testing.B) { _ = store.BatchInsert(tuples) } }) - + // 2. 带压缩的存储性能 b.Run("compressed_store", func(b *testing.B) { b.ResetTimer() @@ -127,7 +127,7 @@ func BenchmarkStandardVsCompressed(b *testing.B) { UseCompression: true, // 启用压缩 }) _ = store.BatchInsert(tuples) - + // 获取所有段和块,执行压缩 for _, segment := range store.GetSegments() { for _, block := range segment.GetBlocks() { @@ -136,7 +136,7 @@ func BenchmarkStandardVsCompressed(b *testing.B) { } } }) - + // 3. 测量内存使用 b.Run("memory_usage_comparison", func(b *testing.B) { // 不使用压缩的存储 @@ -146,7 +146,7 @@ func BenchmarkStandardVsCompressed(b *testing.B) { }) _ = standardStore.BatchInsert(tuples) standardSize := standardStore.GetTotalSize() - + // 使用压缩的存储 compressedStore := tuplestore.NewTupleStore(tuplestore.Config{ MergeThreshold: size * 2, @@ -154,21 +154,21 @@ func BenchmarkStandardVsCompressed(b *testing.B) { UseCompression: true, }) _ = compressedStore.BatchInsert(tuples) - + // 压缩所有块 for _, segment := range compressedStore.GetSegments() { for _, block := range segment.GetBlocks() { _ = block.CompressBlock() } } - + compressedSize := compressedStore.GetTotalSize() - + // 记录压缩比率 compressionRatio := float64(standardSize) / float64(compressedSize) b.ReportMetric(compressionRatio, "compression_ratio") }) - + // 4. 测试查询性能 b.Run("query_performance", func(b *testing.B) { // 标准查询 @@ -177,7 +177,7 @@ func BenchmarkStandardVsCompressed(b *testing.B) { InitialSegments: 2, }) _ = standardStore.BatchInsert(tuples) - + // 压缩查询 compressedStore := tuplestore.NewTupleStore(tuplestore.Config{ MergeThreshold: size * 2, @@ -185,26 +185,26 @@ func BenchmarkStandardVsCompressed(b *testing.B) { UseCompression: true, }) _ = compressedStore.BatchInsert(tuples) - + // 压缩所有块 for _, segment := range compressedStore.GetSegments() { for _, block := range segment.GetBlocks() { _ = block.CompressBlock() } } - + // 查询条件 conditions := []tuplestore.Condition{ {ElementIndex: 0, Operator: ">", Value: int32(50)}, // 第一个元素大于50 } - + b.Run("standard_query", func(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { _, _ = standardStore.Query(conditions) } }) - + b.Run("compressed_query", func(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { @@ -221,11 +221,11 @@ func BenchmarkStandardVsCompressed(b *testing.B) { func BenchmarkCompressionAlgorithms(b *testing.B) { // 创建10000个元组的测试数据 tuples := createTestData(10000, "categorical") - + // 假设我们可以提取出第一列(int32)和第二列(string)的数据 col1Data := make([]interface{}, len(tuples)) col2Data := make([]interface{}, len(tuples)) - + for i, tuple := range tuples { // 从元组数据中提取列值(这里简化处理) val1, _ := tuplestore.ExtractElementFromTest(tuple.Data, 0, tuple.Schema) @@ -233,7 +233,7 @@ func BenchmarkCompressionAlgorithms(b *testing.B) { col1Data[i] = val1 col2Data[i] = val2 } - + // 测试字典编码 b.Run("dictionary_encoding", func(b *testing.B) { for i := 0; i < b.N; i++ { @@ -241,7 +241,7 @@ func BenchmarkCompressionAlgorithms(b *testing.B) { _, _ = tuplestore.DecodeDictionaryTest(col) } }) - + // 测试RLE编码 b.Run("rle_encoding", func(b *testing.B) { for i := 0; i < b.N; i++ { @@ -249,7 +249,7 @@ func BenchmarkCompressionAlgorithms(b *testing.B) { _, _ = tuplestore.DecodeRLETest(col) } }) - + // 测试增量编码(仅适用于数值类型的列) b.Run("delta_encoding", func(b *testing.B) { // 将col1Data转换为int64切片 @@ -259,7 +259,7 @@ func BenchmarkCompressionAlgorithms(b *testing.B) { int64Data[i] = int64(intVal) } } - + for i := 0; i < b.N; i++ { col, _ := tuplestore.EncodeDeltaTest(int64Data) _, _ = tuplestore.DecodeDeltaTest(col) @@ -272,7 +272,7 @@ func BenchmarkCompressedRead(b *testing.B) { // 创建有代表性的数据集 size := 10000 tuples := createTestData(size, "categorical") - + // 创建并填充store store := tuplestore.NewTupleStore(tuplestore.Config{ MergeThreshold: size * 2, @@ -280,14 +280,14 @@ func BenchmarkCompressedRead(b *testing.B) { UseCompression: true, }) _ = store.BatchInsert(tuples) - + // 压缩所有块 for _, segment := range store.GetSegments() { for _, block := range segment.GetBlocks() { _ = block.CompressBlock() } } - + // 随机选择几个ID进行读取测试 sampleSize := 100 sampleIDs := make([]string, sampleSize) @@ -295,7 +295,7 @@ func BenchmarkCompressedRead(b *testing.B) { idx := rand.Intn(size) sampleIDs[i] = tuples[idx].ID } - + b.ResetTimer() b.Run("compressed_read_by_id", func(b *testing.B) { for i := 0; i < b.N; i++ { @@ -309,10 +309,11 @@ func BenchmarkCompressedRead(b *testing.B) { func TestCompressionRatioComparison(t *testing.T) { dataPatterns := []string{"random", "repetitive", "sequential", "categorical"} tupleCount := 10000 - + for _, pattern := range dataPatterns { tuples := createTestData(tupleCount, pattern) - + t.Logf("创建了 %d 个测试元组,模式: %s", len(tuples), pattern) + // 标准存储 standardStore := tuplestore.NewTupleStore(tuplestore.Config{ MergeThreshold: tupleCount * 2, @@ -320,7 +321,8 @@ func TestCompressionRatioComparison(t *testing.T) { }) _ = standardStore.BatchInsert(tuples) standardSize := standardStore.GetTotalSize() - + t.Logf("标准存储大小: %d 字节", standardSize) + // 压缩存储 compressedStore := tuplestore.NewTupleStore(tuplestore.Config{ MergeThreshold: tupleCount * 2, @@ -328,17 +330,52 @@ func TestCompressionRatioComparison(t *testing.T) { UseCompression: true, }) _ = compressedStore.BatchInsert(tuples) - - // 压缩所有块 + + // 确保所有块都有Schema for _, segment := range compressedStore.GetSegments() { for _, block := range segment.GetBlocks() { - _ = block.CompressBlock() + if len(block.Schema) == 0 && len(tuples) > 0 { + block.Schema = tuples[0].Schema + t.Logf("为块 %s 设置Schema", block.ID) + } + + err := block.CompressBlock() + if err != nil { + t.Logf("压缩块 %s 出错: %v", block.ID, err) + } else { + if block.CompressedColumns == nil || len(block.CompressedColumns) == 0 { + t.Logf("块 %s 未生成压缩列", block.ID) + } else { + t.Logf("块 %s 生成了 %d 个压缩列", block.ID, len(block.CompressedColumns)) + + // 打印每个压缩列的大小 + for idx, col := range block.CompressedColumns { + encodedSize := len(col.EncodedData) + nullBitmapSize := len(col.NullBitmap) + dictionarySize := 0 + if col.EncodingType == tuplestore.EncodingDictionary && col.Dictionary != nil { + dictionarySize = len(col.Dictionary) * 8 // 估计 + } + totalColSize := encodedSize + nullBitmapSize + dictionarySize + t.Logf(" 列 %d: 编码大小=%d, 位图大小=%d, 字典大小≈%d, 总大小≈%d", + idx, encodedSize, nullBitmapSize, dictionarySize, totalColSize) + } + } + } } } - + compressedSize := compressedStore.GetTotalSize() - - compressionRatio := float64(standardSize) / float64(compressedSize) + t.Logf("压缩存储大小: %d 字节", compressedSize) + + // 避免除零错误 + var compressionRatio float64 + if compressedSize > 0 { + compressionRatio = float64(standardSize) / float64(compressedSize) + } else { + compressionRatio = 0 + } + t.Logf("Data pattern: %s - Standard size: %d bytes, Compressed size: %d bytes, Ratio: %.2f", pattern, standardSize, compressedSize, compressionRatio) } diff --git a/tuplestore/compression.go b/tuplestore/compression.go index 42d8346..2e147b1 100644 --- a/tuplestore/compression.go +++ b/tuplestore/compression.go @@ -5,6 +5,7 @@ import ( "compress/gzip" "encoding/binary" "errors" + "fmt" "io" "math" "time" @@ -704,18 +705,43 @@ func chooseCompression(values []interface{}, dataType ElementType) (*CompressedC // CompressBlock 压缩整个块的数据 func (b *Block) CompressBlock() error { + b.mu.Lock() + defer b.mu.Unlock() + + // 如果Schema为空,无法压缩 + if len(b.Schema) == 0 { + return errors.New("无法压缩:Schema未设置") + } + + // 检查块中是否有数据需要压缩 + b.MainStore.mu.RLock() + mainStoreEmpty := len(b.MainStore.tuples) == 0 + b.MainStore.mu.RUnlock() + + b.DeltaStore.mu.RLock() + deltaStoreEmpty := len(b.DeltaStore.tuples) == 0 + b.DeltaStore.mu.RUnlock() + + if mainStoreEmpty && deltaStoreEmpty { + return errors.New("无数据可压缩") + } + // 遍历每一列,进行压缩 for elemIndex, elemInfo := range b.Schema { // 提取列数据 values, err := b.decompressColumn(elemIndex, b.Schema) if err != nil { - return err + return fmt.Errorf("提取列数据错误: %w", err) + } + + if len(values) == 0 { + continue // 跳过空列 } // 压缩列数据 compressedCol, err := chooseCompression(values, elemInfo.Type) if err != nil { - return err + return fmt.Errorf("压缩列数据错误: %w", err) } // 存储压缩后的列 @@ -728,6 +754,20 @@ func (b *Block) CompressBlock() error { b.CompressedColumns[elemIndex] = compressedCol } + // 压缩完成后,如果启用了压缩模式,清理原始数据以释放内存 + if b.useCompression { + // 主存储中保存列式压缩数据后,清除行式原始数据 + b.MainStore.mu.Lock() + b.MainStore.tuples = make(map[string]*Tuple) // 清除原始数据 + b.MainStore.mu.Unlock() + + // 也清除增量存储中的数据 + b.DeltaStore.mu.Lock() + b.DeltaStore.tuples = make(map[string]*Tuple) + b.DeltaStore.size = 0 + b.DeltaStore.mu.Unlock() + } + return nil } @@ -843,23 +883,39 @@ func (b *Block) decompressColumn(elemIndex int, schema []ElementInfo) ([]interfa results := make([]interface{}, 0) // 从增量存储中提取 - b.DeltaStore.mu.Lock() + b.DeltaStore.mu.RLock() + deltaValues := make([]interface{}, 0, len(b.DeltaStore.tuples)) for _, tuple := range b.DeltaStore.tuples { + if tuple.Schema == nil { + tuple.Schema = schema // 确保元组有Schema + } if val, err := extractElement(tuple.Data, elemIndex, tuple.Schema); err == nil { - results = append(results, val) + deltaValues = append(deltaValues, val) } } - b.DeltaStore.mu.Unlock() + b.DeltaStore.mu.RUnlock() // 从主存储中提取 b.MainStore.mu.RLock() + mainValues := make([]interface{}, 0, len(b.MainStore.tuples)) for _, tuple := range b.MainStore.tuples { + if tuple.Schema == nil { + tuple.Schema = schema // 确保元组有Schema + } if val, err := extractElement(tuple.Data, elemIndex, tuple.Schema); err == nil { - results = append(results, val) + mainValues = append(mainValues, val) } } b.MainStore.mu.RUnlock() + // 合并结果 + results = append(results, deltaValues...) + results = append(results, mainValues...) + + if len(results) == 0 { + return nil, fmt.Errorf("列 %d 未找到数据", elemIndex) + } + return results, nil } diff --git a/tuplestore/main_test.go b/tuplestore/main_test.go index 5f26af1..d11ba79 100644 --- a/tuplestore/main_test.go +++ b/tuplestore/main_test.go @@ -1,205 +1,205 @@ -package tuplestore_test - -import ( - "encoding/json" - "fmt" - "os" - "sync" - "testing" - "time" - - "go-tuplestore/tuplestore" -) - -// 测试数据集结构 -type testTuple struct { - ID string `json:"id"` - Data string `json:"data"` - Timestamp time.Time `json:"timestamp"` -} - -// 加载测试数据 -func loadTestData(filename string) []*tuplestore.Tuple { - data, err := os.ReadFile(filename) - if err != nil { - panic(fmt.Sprintf("Failed to load test data: %v", err)) - } - - var tuples []testTuple - if err := json.Unmarshal(data, &tuples); err != nil { - panic(fmt.Sprintf("Failed to parse test data: %v", err)) - } - - var result []*tuplestore.Tuple - for _, t := range tuples { - result = append(result, &tuplestore.Tuple{ - ID: t.ID, - Data: []byte(t.Data), - Timestamp: t.Timestamp, - }) - } - return result -} - -// 创建默认配置 -func defaultConfig() tuplestore.Config { - return tuplestore.Config{ - MergeThreshold: 3, - MaxDeltaSize: 1024, - MaxMergeInterval: time.Minute, - InitialSegments: 2, - BlocksPerSegment: 2, - } -} - -func TestBasicOperations(t *testing.T) { - store := tuplestore.NewTupleStore(defaultConfig()) - - // 测试数据 - testTuples := []*tuplestore.Tuple{ - {ID: "id1", Data: []byte("data1"), Timestamp: time.Now()}, - {ID: "id2", Data: []byte("data2"), Timestamp: time.Now()}, - } - - t.Run("SingleInsert", func(t *testing.T) { - for _, tuple := range testTuples { - if err := store.Insert(tuple); err != nil { - t.Errorf("Insert failed: %v", err) - } - } - }) - - t.Run("DuplicateInsert", func(t *testing.T) { - err := store.Insert(testTuples[0]) - if err == nil || err.Error() != `duplicate tuple ID: "id1"` { - t.Errorf("Expected duplicate error, got: %v", err) - } - }) - - t.Run("BatchInsert", func(t *testing.T) { - newTuples := []*tuplestore.Tuple{ - {ID: "id3", Data: []byte("data3")}, - {ID: "id4", Data: []byte("data4")}, - } - if err := store.BatchInsert(newTuples); err != nil { - t.Errorf("BatchInsert failed: %v", err) - } - }) -} -func TestWithDataset(t *testing.T) { - // 硬编码绝对路径 - testDataPath := `D:\desktop\useful\算法\go-tuplestore\testdata\test_set1.json` - - if _, err := os.Stat(testDataPath); os.IsNotExist(err) { - t.Skipf("Skipping dataset test: %s not found", testDataPath) - } - - data := loadTestData(testDataPath) - store := tuplestore.NewTupleStore(tuplestore.Config{ - MergeThreshold: len(data) + 1, - InitialSegments: 4, - BlocksPerSegment: 2, - }) - - t.Run("DatasetInsert", func(t *testing.T) { - if err := store.BatchInsert(data); err != nil { - t.Errorf("Dataset insert failed: %v", err) - } - }) -} -func TestConcurrentOperations(t *testing.T) { - store := tuplestore.NewTupleStore(tuplestore.Config{ - MergeThreshold: 1000, - InitialSegments: 4, - BlocksPerSegment: 2, // 确保每个段有足够的块 - }) - - t.Run("ConcurrentInsert", func(t *testing.T) { - var wg sync.WaitGroup - workers := 10 - itemsPerWorker := 100 - errCh := make(chan error, workers*itemsPerWorker) - - for i := 0; i < workers; i++ { - wg.Add(1) - go func(workerID int) { - defer wg.Done() - for j := 0; j < itemsPerWorker; j++ { - tuple := &tuplestore.Tuple{ - ID: fmt.Sprintf("worker_%d_item_%d", workerID, j), - Data: []byte(fmt.Sprintf("data_%d_%d", workerID, j)), - } - if err := store.Insert(tuple); err != nil { - errCh <- fmt.Errorf("Worker %d: Insert failed: %v", workerID, err) - return - - } - } - }(i) - } - // 等待所有goroutine完成 - go func() { - wg.Wait() - close(errCh) - }() - - // 收集错误 - for err := range errCh { - t.Error(err) - } - }) - - t.Run("ConcurrentBatchInsert", func(t *testing.T) { - var wg sync.WaitGroup - batches := 5 - batchSize := 20 - - for i := 0; i < batches; i++ { - wg.Add(1) - go func(batchID int) { - defer wg.Done() - var tuples []*tuplestore.Tuple - for j := 0; j < batchSize; j++ { - tuples = append(tuples, &tuplestore.Tuple{ - ID: fmt.Sprintf("batch_%d_item_%d", batchID, j), - Data: []byte(fmt.Sprintf("batch_data_%d_%d", batchID, j)), - }) - } - if err := store.BatchInsert(tuples); err != nil { - t.Errorf("Batch %d failed: %v", batchID, err) - } - }(i) - } - wg.Wait() - }) -} - -func TestEdgeCases(t *testing.T) { - store := tuplestore.NewTupleStore(defaultConfig()) - - t.Run("EmptyTuple", func(t *testing.T) { - if err := store.Insert(nil); err == nil { - t.Error("Expected error for nil tuple") - } else if err.Error() != "nil tuple provided" { - t.Errorf("Expected nil tuple error, got: %v", err) - } - }) - - t.Run("EmptyID", func(t *testing.T) { - if err := store.Insert(&tuplestore.Tuple{ID: ""}); err == nil { - t.Error("Expected error for empty ID") - } else if err.Error() != "empty tuple ID" { - t.Errorf("Expected empty ID error, got: %v", err) - } - }) - - t.Run("LargeData", func(t *testing.T) { - largeData := make([]byte, 2*1024*1024) // 2MB - if err := store.Insert(&tuplestore.Tuple{ - ID: "large_data", - Data: largeData, - }); err != nil { - t.Errorf("Large data insert failed: %v", err) - } - }) -} +package tuplestore_test + +import ( + "encoding/json" + "fmt" + "os" + "sync" + "testing" + "time" + + "go-tuplestore/tuplestore" +) + +// 测试数据集结构 +type testTuple struct { + ID string `json:"id"` + Data string `json:"data"` + Timestamp time.Time `json:"timestamp"` +} + +// 加载测试数据 +func loadTestData(filename string) []*tuplestore.Tuple { + data, err := os.ReadFile(filename) + if err != nil { + panic(fmt.Sprintf("Failed to load test data: %v", err)) + } + + var tuples []testTuple + if err := json.Unmarshal(data, &tuples); err != nil { + panic(fmt.Sprintf("Failed to parse test data: %v", err)) + } + + var result []*tuplestore.Tuple + for _, t := range tuples { + result = append(result, &tuplestore.Tuple{ + ID: t.ID, + Data: []byte(t.Data), + Timestamp: t.Timestamp, + }) + } + return result +} + +// 创建默认配置 +func defaultConfig() tuplestore.Config { + return tuplestore.Config{ + MergeThreshold: 3, + MaxDeltaSize: 1024, + MaxMergeInterval: time.Minute, + InitialSegments: 2, + BlocksPerSegment: 2, + } +} + +func TestBasicOperations(t *testing.T) { + store := tuplestore.NewTupleStore(defaultConfig()) + + // 测试数据 + testTuples := []*tuplestore.Tuple{ + {ID: "id1", Data: []byte("data1"), Timestamp: time.Now()}, + {ID: "id2", Data: []byte("data2"), Timestamp: time.Now()}, + } + + t.Run("SingleInsert", func(t *testing.T) { + for _, tuple := range testTuples { + if err := store.Insert(tuple); err != nil { + t.Errorf("Insert failed: %v", err) + } + } + }) + + t.Run("DuplicateInsert", func(t *testing.T) { + err := store.Insert(testTuples[0]) + if err == nil || err.Error() != `duplicate tuple ID: "id1"` { + t.Errorf("Expected duplicate error, got: %v", err) + } + }) + + t.Run("BatchInsert", func(t *testing.T) { + newTuples := []*tuplestore.Tuple{ + {ID: "id3", Data: []byte("data3")}, + {ID: "id4", Data: []byte("data4")}, + } + if err := store.BatchInsert(newTuples); err != nil { + t.Errorf("BatchInsert failed: %v", err) + } + }) +} +func TestWithDataset(t *testing.T) { + // 硬编码绝对路径 + testDataPath := `D:\desktop\useful\算法\go-tuplestore\testdata\test_set1.json` + + if _, err := os.Stat(testDataPath); os.IsNotExist(err) { + t.Skipf("Skipping dataset test: %s not found", testDataPath) + } + + data := loadTestData(testDataPath) + store := tuplestore.NewTupleStore(tuplestore.Config{ + MergeThreshold: len(data) + 1, + InitialSegments: 4, + BlocksPerSegment: 2, + }) + + t.Run("DatasetInsert", func(t *testing.T) { + if err := store.BatchInsert(data); err != nil { + t.Errorf("Dataset insert failed: %v", err) + } + }) +} +func TestConcurrentOperations(t *testing.T) { + store := tuplestore.NewTupleStore(tuplestore.Config{ + MergeThreshold: 1000, + InitialSegments: 4, + BlocksPerSegment: 2, // 确保每个段有足够的块 + }) + + t.Run("ConcurrentInsert", func(t *testing.T) { + var wg sync.WaitGroup + workers := 10 + itemsPerWorker := 100 + errCh := make(chan error, workers*itemsPerWorker) + + for i := 0; i < workers; i++ { + wg.Add(1) + go func(workerID int) { + defer wg.Done() + for j := 0; j < itemsPerWorker; j++ { + tuple := &tuplestore.Tuple{ + ID: fmt.Sprintf("worker_%d_item_%d", workerID, j), + Data: []byte(fmt.Sprintf("data_%d_%d", workerID, j)), + } + if err := store.Insert(tuple); err != nil { + errCh <- fmt.Errorf("Worker %d: Insert failed: %v", workerID, err) + return + + } + } + }(i) + } + // 等待所有goroutine完成 + go func() { + wg.Wait() + close(errCh) + }() + + // 收集错误 + for err := range errCh { + t.Error(err) + } + }) + + t.Run("ConcurrentBatchInsert", func(t *testing.T) { + var wg sync.WaitGroup + batches := 5 + batchSize := 20 + + for i := 0; i < batches; i++ { + wg.Add(1) + go func(batchID int) { + defer wg.Done() + var tuples []*tuplestore.Tuple + for j := 0; j < batchSize; j++ { + tuples = append(tuples, &tuplestore.Tuple{ + ID: fmt.Sprintf("batch_%d_item_%d", batchID, j), + Data: []byte(fmt.Sprintf("batch_data_%d_%d", batchID, j)), + }) + } + if err := store.BatchInsert(tuples); err != nil { + t.Errorf("Batch %d failed: %v", batchID, err) + } + }(i) + } + wg.Wait() + }) +} + +func TestEdgeCases(t *testing.T) { + store := tuplestore.NewTupleStore(defaultConfig()) + + t.Run("EmptyTuple", func(t *testing.T) { + if err := store.Insert(nil); err == nil { + t.Error("Expected error for nil tuple") + } else if err.Error() != "nil tuple provided" { + t.Errorf("Expected nil tuple error, got: %v", err) + } + }) + + t.Run("EmptyID", func(t *testing.T) { + if err := store.Insert(&tuplestore.Tuple{ID: ""}); err == nil { + t.Error("Expected error for empty ID") + } else if err.Error() != "empty tuple ID" { + t.Errorf("Expected empty ID error, got: %v", err) + } + }) + + t.Run("LargeData", func(t *testing.T) { + largeData := make([]byte, 2*1024*1024) // 2MB + if err := store.Insert(&tuplestore.Tuple{ + ID: "large_data", + Data: largeData, + }); err != nil { + t.Errorf("Large data insert failed: %v", err) + } + }) +} diff --git a/tuplestore/metrics.go b/tuplestore/metrics.go index 8647c0d..d9e042e 100644 --- a/tuplestore/metrics.go +++ b/tuplestore/metrics.go @@ -1,40 +1,40 @@ -package tuplestore - -import "github.com/prometheus/client_golang/prometheus" - -func setupMetrics() Metrics { - return Metrics{ - InsertCount: prometheus.NewCounter(prometheus.CounterOpts{ - Name: "tuple_store_insert_total", - Help: "Total number of tuple insertions", - }), - InsertDuration: prometheus.NewHistogram(prometheus.HistogramOpts{ - Name: "tuple_store_insert_duration_seconds", - Help: "Duration of tuple insert operations", - }), - MergeCount: prometheus.NewCounter(prometheus.CounterOpts{ - Name: "tuple_store_merge_total", - Help: "Total number of merge operations", - }), - MergeDuration: prometheus.NewHistogram(prometheus.HistogramOpts{ - Name: "tuple_store_merge_duration_seconds", - Help: "Duration of merge operations", - }), - SegmentCount: prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "tuple_store_segments_total", - Help: "Total number of segments", - }), - BlockCount: prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "tuple_store_blocks_total", - Help: "Total number of blocks", - }), - DeltaStoreSize: prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "tuple_store_delta_size_bytes", - Help: "Size of delta stores in bytes", - }), - MainStoreSize: prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "tuple_store_main_size_bytes", - Help: "Size of main stores in bytes", - }), - } +package tuplestore + +import "github.com/prometheus/client_golang/prometheus" + +func setupMetrics() Metrics { + return Metrics{ + InsertCount: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "tuple_store_insert_total", + Help: "Total number of tuple insertions", + }), + InsertDuration: prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: "tuple_store_insert_duration_seconds", + Help: "Duration of tuple insert operations", + }), + MergeCount: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "tuple_store_merge_total", + Help: "Total number of merge operations", + }), + MergeDuration: prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: "tuple_store_merge_duration_seconds", + Help: "Duration of merge operations", + }), + SegmentCount: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "tuple_store_segments_total", + Help: "Total number of segments", + }), + BlockCount: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "tuple_store_blocks_total", + Help: "Total number of blocks", + }), + DeltaStoreSize: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "tuple_store_delta_size_bytes", + Help: "Size of delta stores in bytes", + }), + MainStoreSize: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "tuple_store_main_size_bytes", + Help: "Size of main stores in bytes", + }), + } } \ No newline at end of file diff --git a/tuplestore/operations.go b/tuplestore/operations.go index 7b0fefb..05e4b0a 100644 --- a/tuplestore/operations.go +++ b/tuplestore/operations.go @@ -1,195 +1,218 @@ -package tuplestore - -import ( - "fmt" - "time" - - "github.com/cespare/xxhash" -) - -func (ts *TupleStore) Insert(tuple *Tuple) error { - if tuple == nil { - return ErrNilTuple - } - if tuple.ID == "" { - return ErrEmptyTupleID - } - - start := time.Now() - defer func() { - ts.metrics.InsertDuration.Observe(time.Since(start).Seconds()) - ts.metrics.InsertCount.Inc() - }() - - ts.mu.Lock() - defer ts.mu.Unlock() - - if _, _, exists := ts.idIndex.LookupID(tuple.ID); exists { - return fmt.Errorf("%w: %q", ErrDuplicateID, tuple.ID) - } - - segment, block, err := ts.selectOptimalLocation(tuple) - if err != nil { - return fmt.Errorf("failed to select location: %w", err) - } - - block.DeltaStore.AddTuple(tuple) - ts.updateIndices(tuple, segment.ID, block.ID) - - if ts.shouldMerge(block) { - go ts.scheduleBlockMerge(segment.ID, block.ID) - } - - return nil -} - -func (ts *TupleStore) BatchInsert(tuples []*Tuple) error { - if len(tuples) == 0 { - return nil - } - - start := time.Now() - defer func() { - ts.metrics.InsertDuration.Observe(time.Since(start).Seconds()) - ts.metrics.InsertCount.Add(float64(len(tuples))) - }() - - ts.mu.Lock() - defer ts.mu.Unlock() - - for _, tuple := range tuples { - if tuple == nil { - return ErrNilTuple - } - if tuple.ID == "" { - return ErrEmptyTupleID - } - if _, _, exists := ts.idIndex.LookupID(tuple.ID); exists { - return fmt.Errorf("%w: %q", ErrDuplicateID, tuple.ID) - } - } - - for _, tuple := range tuples { - segment, block, err := ts.selectOptimalLocation(tuple) - if err != nil { - return fmt.Errorf("failed to select location: %w", err) - } - block.DeltaStore.AddTuple(tuple) - ts.updateIndices(tuple, segment.ID, block.ID) - - if ts.shouldMerge(block) { - go ts.scheduleBlockMerge(segment.ID, block.ID) - } - } - - return nil -} - -func (ts *TupleStore) selectOptimalLocation(tuple *Tuple) (*Segment, *Block, error) { - segmentID := ts.selectSegment(tuple) - segment := ts.segments[segmentID] - segment.mu.RLock() - defer segment.mu.RUnlock() - - blockID, err := segment.selectBlock(tuple) - if err != nil { - return nil, nil, err - } - block := segment.Blocks[blockID] - - return segment, block, nil -} - -func (ts *TupleStore) selectSegment(tuple *Tuple) string { - if !tuple.Timestamp.IsZero() && len(ts.timeSegments) > 0 { - for _, seg := range ts.timeSegments { - if seg.created.Year() == tuple.Timestamp.Year() { - return seg.ID - } - } - } - - if len(ts.idRangeSegments) > 0 { - for _, seg := range ts.idRangeSegments { - if len(tuple.ID) > 0 && seg.ID[0] == tuple.ID[0] { - return seg.ID - } - } - } - - hash := xxhash.Sum64String(tuple.ID) - segmentIDs := make([]string, 0, len(ts.segments)) - for id := range ts.segments { - segmentIDs = append(segmentIDs, id) - } - return segmentIDs[hash%uint64(len(segmentIDs))] -} - -func (s *Segment) selectBlock(tuple *Tuple) (string, error) { - s.mu.RLock() - defer s.mu.RUnlock() - - if len(s.Blocks) == 0 { - return "", fmt.Errorf("no blocks available in segment %s", s.ID) - } - - hash := xxhash.Sum64String(tuple.ID) - blockIDs := make([]string, 0, len(s.Blocks)) - for id := range s.Blocks { - blockIDs = append(blockIDs, id) - } - return blockIDs[hash%uint64(len(blockIDs))], nil -} - -func (ts *TupleStore) shouldMerge(block *Block) bool { - block.mu.RLock() - defer block.mu.RUnlock() - - if block.DeltaStore.ChangeCount >= ts.mergeThreshold { - return true - } - - if block.DeltaStore.Size() >= ts.maxDeltaSize { - return true - } - - if time.Since(block.LastMergeTime) > ts.maxMergeInterval { - return true - } - - return false -} - -func (ts *TupleStore) scheduleBlockMerge(segmentID, blockID string) { - start := time.Now() - defer func() { - ts.metrics.MergeDuration.Observe(time.Since(start).Seconds()) - ts.metrics.MergeCount.Inc() - }() - - ts.mu.RLock() - segment, exists := ts.segments[segmentID] - ts.mu.RUnlock() - - if !exists { - return - } - - segment.mu.Lock() - block, exists := segment.Blocks[blockID] - segment.mu.Unlock() - - if !exists { - return - } - - block.mu.Lock() - defer block.mu.Unlock() - - block.mergeDeltaStore() - block.LastMergeTime = time.Now() -} - -func (ts *TupleStore) updateIndices(tuple *Tuple, segmentID, blockID string) { - ts.idIndex.Insert(tuple.ID, segmentID, blockID) -} \ No newline at end of file +package tuplestore + +import ( + "fmt" + "math" + "time" + + "github.com/cespare/xxhash" +) + +func (ts *TupleStore) Insert(tuple *Tuple) error { + if tuple == nil { + return ErrNilTuple + } + if tuple.ID == "" { + return ErrEmptyTupleID + } + + start := time.Now() + defer func() { + ts.metrics.InsertDuration.Observe(time.Since(start).Seconds()) + ts.metrics.InsertCount.Inc() + }() + + ts.mu.Lock() + defer ts.mu.Unlock() + + if _, _, exists := ts.idIndex.LookupID(tuple.ID); exists { + return fmt.Errorf("%w: %q", ErrDuplicateID, tuple.ID) + } + + segment, block, err := ts.selectOptimalLocation(tuple) + if err != nil { + return fmt.Errorf("failed to select location: %w", err) + } + + block.DeltaStore.AddTuple(tuple) + ts.updateIndices(tuple, segment.ID, block.ID) + + if ts.shouldMerge(block) { + go ts.scheduleBlockMerge(segment.ID, block.ID) + } + + return nil +} + +func (ts *TupleStore) BatchInsert(tuples []*Tuple) error { + if len(tuples) == 0 { + return nil + } + + // 从第一个元组获取Schema + schema := tuples[0].Schema + + ts.mu.Lock() + defer ts.mu.Unlock() + + // 简单负载均衡:找到元组数量最少的段和块 + var targetSegment *Segment + var targetBlock *Block + minTuples := math.MaxInt32 + + for _, segment := range ts.segments { + for _, block := range segment.Blocks { + block.mu.RLock() + deltaCount := len(block.DeltaStore.tuples) + mainCount := len(block.MainStore.tuples) + totalCount := deltaCount + mainCount + block.mu.RUnlock() + + if totalCount < minTuples { + minTuples = totalCount + targetSegment = segment + targetBlock = block + } + } + } + + if targetBlock == nil { + return fmt.Errorf("无法找到可用的区块") + } + + // 设置块的Schema + targetBlock.mu.Lock() + if len(targetBlock.Schema) == 0 { + targetBlock.Schema = schema + } + targetBlock.mu.Unlock() + + // 批量添加元组 + for _, tuple := range tuples { + // 检查ID是否已存在 + _, _, exists := ts.idIndex.LookupID(tuple.ID) + if exists { + continue // 跳过已存在的元组 + } + + // 添加到目标块 + targetBlock.DeltaStore.AddTuple(tuple) + + // 更新索引 + ts.idIndex.Insert(tuple.ID, targetSegment.ID, targetBlock.ID) + + // 更新指标 + ts.metrics.InsertCount.Inc() + } + + return nil +} + +func (ts *TupleStore) selectOptimalLocation(tuple *Tuple) (*Segment, *Block, error) { + segmentID := ts.selectSegment(tuple) + segment := ts.segments[segmentID] + segment.mu.RLock() + defer segment.mu.RUnlock() + + blockID, err := segment.selectBlock(tuple) + if err != nil { + return nil, nil, err + } + block := segment.Blocks[blockID] + + return segment, block, nil +} + +func (ts *TupleStore) selectSegment(tuple *Tuple) string { + if !tuple.Timestamp.IsZero() && len(ts.timeSegments) > 0 { + for _, seg := range ts.timeSegments { + if seg.created.Year() == tuple.Timestamp.Year() { + return seg.ID + } + } + } + + if len(ts.idRangeSegments) > 0 { + for _, seg := range ts.idRangeSegments { + if len(tuple.ID) > 0 && seg.ID[0] == tuple.ID[0] { + return seg.ID + } + } + } + + hash := xxhash.Sum64String(tuple.ID) + segmentIDs := make([]string, 0, len(ts.segments)) + for id := range ts.segments { + segmentIDs = append(segmentIDs, id) + } + return segmentIDs[hash%uint64(len(segmentIDs))] +} + +func (s *Segment) selectBlock(tuple *Tuple) (string, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + if len(s.Blocks) == 0 { + return "", fmt.Errorf("no blocks available in segment %s", s.ID) + } + + hash := xxhash.Sum64String(tuple.ID) + blockIDs := make([]string, 0, len(s.Blocks)) + for id := range s.Blocks { + blockIDs = append(blockIDs, id) + } + return blockIDs[hash%uint64(len(blockIDs))], nil +} + +func (ts *TupleStore) shouldMerge(block *Block) bool { + block.mu.RLock() + defer block.mu.RUnlock() + + if block.DeltaStore.ChangeCount >= ts.mergeThreshold { + return true + } + + if block.DeltaStore.Size() >= ts.maxDeltaSize { + return true + } + + if time.Since(block.LastMergeTime) > ts.maxMergeInterval { + return true + } + + return false +} + +func (ts *TupleStore) scheduleBlockMerge(segmentID, blockID string) { + start := time.Now() + defer func() { + ts.metrics.MergeDuration.Observe(time.Since(start).Seconds()) + ts.metrics.MergeCount.Inc() + }() + + ts.mu.RLock() + segment, exists := ts.segments[segmentID] + ts.mu.RUnlock() + + if !exists { + return + } + + segment.mu.Lock() + block, exists := segment.Blocks[blockID] + segment.mu.Unlock() + + if !exists { + return + } + + block.mu.Lock() + defer block.mu.Unlock() + + block.mergeDeltaStore() + block.LastMergeTime = time.Now() +} + +func (ts *TupleStore) updateIndices(tuple *Tuple, segmentID, blockID string) { + ts.idIndex.Insert(tuple.ID, segmentID, blockID) +} diff --git a/tuplestore/store.go b/tuplestore/store.go index 757df43..6a83d03 100644 --- a/tuplestore/store.go +++ b/tuplestore/store.go @@ -25,13 +25,13 @@ func NewTupleStore(config Config) *TupleStore { for i := 0; i < config.InitialSegments; i++ { segmentID := fmt.Sprintf("segment-%d", i) - store.segments[segmentID] = newSegment(segmentID, config.BlocksPerSegment) + store.segments[segmentID] = newSegment(segmentID, config.BlocksPerSegment, config.UseCompression) } return store } -func newSegment(id string, blockCount int) *Segment { +func newSegment(id string, blockCount int, useCompression bool) *Segment { segment := &Segment{ ID: id, Blocks: make(map[string]*Block), @@ -41,10 +41,11 @@ func newSegment(id string, blockCount int) *Segment { for i := 0; i < blockCount; i++ { blockID := fmt.Sprintf("%s-block-%d", id, i) segment.Blocks[blockID] = &Block{ - ID: blockID, - DeltaStore: &DeltaStore{tuples: make(map[string]*Tuple)}, - MainStore: &MainStore{}, - LastMergeTime: time.Now(), + ID: blockID, + DeltaStore: &DeltaStore{tuples: make(map[string]*Tuple)}, + MainStore: &MainStore{}, + LastMergeTime: time.Now(), + useCompression: useCompression, } } @@ -171,7 +172,7 @@ func (s *Segment) GetBlocks() []*Block { return blocks } -// GetTotalSize 返回存储的总大小(仅用于测试) +// GetTotalSize 返回存储的总大小 func (ts *TupleStore) GetTotalSize() int64 { ts.mu.RLock() defer ts.mu.RUnlock() @@ -180,29 +181,52 @@ func (ts *TupleStore) GetTotalSize() int64 { for _, segment := range ts.segments { segment.mu.RLock() for _, block := range segment.Blocks { - // 计算主存储大小 - block.MainStore.mu.RLock() - for _, tuple := range block.MainStore.tuples { - totalSize += int64(len(tuple.Data)) - } - block.MainStore.mu.RUnlock() - - // 计算增量存储大小 - block.DeltaStore.mu.RLock() - totalSize += block.DeltaStore.size - block.DeltaStore.mu.RUnlock() - - // 如果有压缩列,计算压缩列大小 block.mu.RLock() - if block.CompressedColumns != nil { + + // 如果使用压缩且有压缩数据,则只计算压缩数据大小 + if ts.useCompression && block.CompressedColumns != nil && len(block.CompressedColumns) > 0 { + // 计算压缩列的大小 for _, col := range block.CompressedColumns { - totalSize += int64(len(col.EncodedData) + len(col.NullBitmap)) + if col != nil { + // 计算压缩列的总大小: + // 1. 编码数据大小 + totalSize += int64(len(col.EncodedData)) + // 2. 空值位图大小 + totalSize += int64(len(col.NullBitmap)) + // 3. 字典大小(如果使用字典编码) + if col.EncodingType == EncodingDictionary && col.Dictionary != nil { + // 估算字典大小 + for _, v := range col.Dictionary { + switch val := v.(type) { + case string: + totalSize += int64(len(val)) + case []byte: + totalSize += int64(len(val)) + default: + totalSize += 8 // 假设其他类型平均8字节 + } + } + } + } } + } else { + // 如果没有启用压缩或没有压缩数据,则计算原始数据大小 + // 计算主存储大小 + block.MainStore.mu.RLock() + for _, tuple := range block.MainStore.tuples { + totalSize += int64(len(tuple.Data)) + } + block.MainStore.mu.RUnlock() + + // 计算增量存储大小 + block.DeltaStore.mu.RLock() + totalSize += block.DeltaStore.size + block.DeltaStore.mu.RUnlock() } + block.mu.RUnlock() } segment.mu.RUnlock() } - return totalSize } diff --git a/tuplestore/types.go b/tuplestore/types.go index 9d2d707..dafb565 100644 --- a/tuplestore/types.go +++ b/tuplestore/types.go @@ -94,6 +94,7 @@ type Block struct { mu sync.RWMutex Schema []ElementInfo // 添加 Schema 字段 CompressedColumns map[int]*CompressedColumn // 添加压缩列存储 + useCompression bool } type DeltaStore struct { diff --git "a/\345\216\213\347\274\251\347\256\227\346\263\225\344\273\213\347\273\215,md" "b/\345\216\213\347\274\251\347\256\227\346\263\225\344\273\213\347\273\215,md" new file mode 100644 index 0000000..43d197b --- /dev/null +++ "b/\345\216\213\347\274\251\347\256\227\346\263\225\344\273\213\347\273\215,md" @@ -0,0 +1,89 @@ +# go-tuplestore 压缩算法评估报告 + +## 1. 压缩算法概述 + +go-tuplestore 实现了一套自适应列式压缩算法,支持三种主要压缩方式: + +### 字典编码 (Dictionary Encoding) + +- **原理**:将重复出现的值映射到字典索引,用较小的索引代替原始值 +- **适用场景**:列中有限种类的值重复出现(10%-50%唯一值) +- **典型数据**:分类数据,如性别、国家代码、产品类型 + +### 游程编码 (Run-Length Encoding, RLE) + +- **原理**:记录值及其连续重复的次数,如 "AAABBC" 编码为 "(A,3)(B,2)(C,1)" +- **适用场景**:大量连续重复值(<10%唯一值) +- **典型数据**:状态标志、批处理ID、填充值 + +### 增量编码 (Delta Encoding) + +- **原理**:存储第一个值和后续值的差值,如 [100,101,103,106] 编码为 [100,1,2,3] +- **适用场景**:有序或递增的数值数据 +- **典型数据**:时间戳、自增ID、序列化数据 + +**算法选择规则**: + +- 唯一值占比 <10% → 游程编码 +- 唯一值占比 10%-50% → 字典编码 +- 有序数值数据 → 增量编码 +- 其他情况 → 默认字典编码 + +## 2. 测试数据模式 + +| 数据模式 | 特点 | 用途 | +| ---------- | -------------------------- | -------------------- | +| 随机数据 | 完全随机生成,几乎无重复值 | 测试压缩算法最坏情况 | +| 重复数据 | 每10个元组才有微小变化 | 测试游程编码效果 | +| 序列化数据 | 包含递增的整数序列 | 测试增量编码效果 | +| 分类数据 | 包含4种类别的重复值 | 测试字典编码效果 | + +## 3. 关键发现 + +### 压缩效率 + +- 🏆 重复数据:73:1(最佳压缩) +- 🧩 结构化数据:11:1(序列化/分类) +- ⚠️ 随机数据:0.92:1(轻微体积膨胀) + +### 性能表现 + +- ✅ **查询性能**:全模式提升34%-74% + +- ⚠️ + + 写入性能 + + :全模式下降93%-463% + + - 随机数据插入性能损失最大(-463%) + - 分类数据损失最小(-93%) + +### 算法适配性 + +| 数据模式 | 首选算法 | 实现机制 | 压缩比 | +| :--------: | :------------: | :----------------------: | :----: | +| 重复数据 | 游程编码 (RLE) | 记录连续重复值 | 73:1 | +| 序列化数据 | 增量编码 | 存储差值序列 | 11:1 | +| 分类数据 | 字典编码 | 值→索引映射 | 11:1 | +| 随机数据 | 字典编码 | 因无重复导致轻微数据膨胀 | 0.92:1 | + +## 4. 综合结论 + +- ✅ + + 推荐场景 + + :读多写少的数据服务 + + - 重复数据:即使考虑写入损失仍具高价值 + - 结构化数据:平衡压缩比与性能损失 + +- ⚠️ + + 慎用场景 + + :高频写入的随机数据 + + - 插入性能下降 + - 存储空间无优化 \ No newline at end of file diff --git "a/\345\216\213\347\274\251\347\256\227\346\263\225\346\265\213\350\257\225.md" "b/\345\216\213\347\274\251\347\256\227\346\263\225\346\265\213\350\257\225.md" new file mode 100644 index 0000000..225a815 --- /dev/null +++ "b/\345\216\213\347\274\251\347\256\227\346\263\225\346\265\213\350\257\225.md" @@ -0,0 +1,127 @@ +# 压缩算法测试指南 + +## 一、基础测试命令 + +### 1.1 单元测试执行 +```bash +# 运行所有与压缩相关的单元测试(含数据完整性验证) +go test ./tuplestore -v -run=Compression +``` +*验证数据压缩/解压后的一致性* + +### 1.2 基准测试套件 +```bash +# 运行所有基准测试 +go test ./tuplestore/benchmarks -bench=. -benchmem + +# 仅运行压缩相关基准测试 +go test ./tuplestore/benchmarks -bench=BenchmarkCompression -benchmem +go test ./tuplestore/benchmarks -bench=BenchmarkStandardVsCompressed -benchmem + +# 运行特定数据模式的基准测试 +go test ./tuplestore/benchmarks -bench=Random -benchmem +go test ./tuplestore/benchmarks -bench=Repetitive -benchmem +go test ./tuplestore/benchmarks -bench=Categorical -benchmem +# 运行压缩比率测试并获取详细输出 +go test ./tuplestore/benchmarks -run=TestCompressionRatio -v +``` +*测试覆盖不同数据特征场景* + +## 二、高级测试配置 + +### 2.1 算法专项测试 +```bash +# 测试字典编码 +go test ./tuplestore/benchmarks -bench=BenchmarkCompressionAlgorithms/dictionary -benchmem + +# 测试RLE编码 +go test ./tuplestore/benchmarks -bench=BenchmarkCompressionAlgorithms/rle -benchmem + +# 测试增量编码 +go test ./tuplestore/benchmarks -bench=BenchmarkCompressionAlgorithms/delta -benchmem +``` +*支持算法类型参考TDengine工具集* + +### 2.2 性能剖析方法 +```bash +# CPU性能分析 +go test ./tuplestore/benchmarks -bench=BenchmarkStandardVsCompressed -cpuprofile=cpu.prof + +# 内存性能分析 +go test ./tuplestore/benchmarks -bench=BenchmarkStandardVsCompressed -memprofile=mem.prof +``` +*基于JMH基准测试方法论* + +``` +更高级的看 pprof +# 安装pprof工具(如需) +go install github.com/google/pprof@latest + +# 查看CPU剖析 +go tool pprof cpu.prof +# 在pprof提示符下输入web或top10命令 + +# 查看内存剖析 +go tool pprof mem.prof +``` + + + +## 三、测试结果解读 + +### 3.1 关键性能指标 + +1. 基准测试输出解读 + +- BenchmarkXXX-8 中的数字8表示GOMAXPROCS值(CPU核心数) + +- 1000000表示测试运行的次数 + +- 1234 ns/op 表示每次操作平均耗时(纳秒) + +- 123 B/op 表示每次操作分配的内存字节数 + +- 2 allocs/op 表示每次操作进行的内存分配次数 + +1. 压缩比率结果 + +- compression_ratio 数值越高表示压缩效果越好 + +- 通常随机数据压缩比较低,重复和分类数据压缩比较高 + +1. 查询性能对比 + +- 比较标准查询和压缩查询的性能差异 + +- 理想情况下压缩数据查询不会显著变慢,甚至可能因为数据量减少而加速 + +| 指标 | 说明 | 参考标准 | +| ----------------- | ------------------------- | -------------- | +| ns/op | 单次操作耗时(纳秒) | <1000为优 | +| B/op | 单次内存分配(字节) | 需结合算法类型 | +| allocs/op | 单次内存分配次数 | 需逐步优化 | +| compression_ratio | 压缩比率(原数据/压缩后) | >5x为优秀 | + +### 3.2 数据模式影响 +| 数据类型 | 预期压缩比 | 适用算法 | +| -------- | ------------- | ------------ | +| 随机数据 | 1.2-2x | LZ4/Snappy | +| 重复数据 | 5-10x | RLE/字典编码 | +| 分类数据 | 3-8x 字典编码 | | + +## 四、参数调优建议 + +### 4.1 测试配置调整 +```go +// 修改数据大小 +dataSizes := []int{1000, 10000} // 改为 dataSizes := []int{1000, 5000, 20000} + +// 调整运行时间 +b.Run("compressed_store", func(b *testing.B) { + b.ResetTimer() + b.SetBytes(int64(size * 112)) // 添加这一行来测量吞吐量 + for i := 0; i < b.N; i++ { + // ...测试代码... + } +}) +``` \ No newline at end of file -- Gitee From 94414c1158987dbf1909cb6dba5b5d44fef88ad6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E4=BD=B3=E4=BC=9F?= <12445374+jiamu-yue@user.noreply.gitee.com> Date: Sun, 13 Apr 2025 18:38:20 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E5=8E=8B=E7=BC=A9?= =?UTF-8?q?=E7=AE=97=E6=B3=95=E5=8F=8A=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .idea/vcs.xml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .idea/vcs.xml diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..94a25f7 --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file -- Gitee