# rabbit **Repository Path**: lenssh/rabbit ## Basic Information - **Project Name**: rabbit - **Description**: 轻量级分布式文档管理(包括图片)系统,支持全文索引,模式匹配查找,图片相似性搜索,以及各种文件的在线存档与读取。系统搭建简单,易于扩展,便于历史归档。支持并行计算。 - **Primary Language**: Rust - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 2 - **Forks**: 0 - **Created**: 2023-02-09 - **Last Updated**: 2023-05-12 ## Categories & Tags **Categories**: Uncategorized **Tags**: GFS, search ## README 分布式企业文档在线存储管理系统 # 基本功能 - 文件存储(写入)/下载(download) - 图片在线展示(img src),文本文件内容的在线预览(pre) - 在线文件的不可传递性(连接过期自动失效,防止盗链) - 文件名检索能力,文本文件(或可提取文本内容,如docx,pptx)的全文检索能力。 - 方便运维(部署简单,易于扩容,具备适当容灾能力) ## more (mybe) - 内容相似性查找 - 图片相似性查找 - 并行计算的文件处理 # rabbit的基本概念定义与说明 - dynasty 中文:朝代!每个dynasty都是一个完整独立的gfs系统。不同dynasty之间没有通信关联,相互独立存在,以dynasty_id区分。通过不同dynasty,可以实现历史数据的归档,新gfs系统的平滑上线扩容。比如一个dynasty数据达到容量80%,可以建立新的dynasty, 并通过cur_dynasty指定写入新的dynasty, 旧dynasty将用于只读。dynasty只存在client端,服务端没有dynasty的概念。 - node 一个gfs中的最小独立服务系统,一个node的root目录下最多可以有256个ihash目录,也可以一个node只服务一个ihash目录节点,分配策略放在client,通过路有表实现。理论上可以最大256个indoe组成一个gfs dynasty,从而实现列式存储,达到扩容和压力分散的目的。 - ihash 文件名进行md5后sum()%256的结果,也是inode中root目录下一级子目录的名称(转换为16进制), stat中,以及对外传递,都是采用i32,但真实的目录,采用format!("{:02x}",ihash)变成2位的16进制码作为目录名。 - idx ihash目录下,增加一级idx目录节点。相同的文件名,可以通过idx隔离。idx是stat记录中的主键,自助递增。slave将根据idx进行数据同步。所以slave的stat表不同于master,主键不是自增的,而是外部根据master写入的。 - master/slave 一个node,可以只有一个master,也可以master+slave(多个)结构,master负责写入数据,并MQ通知slave进行拉取同步;slave通过订阅MQ得到数据同步事件,同时提供定时检测(query from master by stat),自主更新的能力。通过增加slave,可以实现负责分担和容灾备份的作用。一个slave可以随时切换为master。 - router,决定文件分布的路由表,应用端通过gfsc,依赖于此router.json,完成gfs拓扑结构的定义。 具体参加下面的router.json,可以通过配置,决策node包含的ihash情况,256个哈希值,可以全部放在一个Node,也可以分别部署在256个Node中。 - operator,算子, - 算子部署,operator deployment - map/reduce - gfs支持的2类文件写入:存档类和record类 * 存档类文件。此类文件以独立文件作为操作粒度,gfs严格进行name hash确定ihash, 并递增分配idx, 所以可以多次写入而不会覆盖。 * 存档类有gfs stat记录。 * record类文本行记录。此类文件每次写入的是文本行(记录)粒度,可以多次写入一个指定的文件,可以覆盖,也可以append。 * record类文件不遵从hash原则,可以指定ihash、idx、name;也不自动Node内同步,如果需要同步,需要显式调用/node/sync with FileParam * record类文件不能覆盖存档类,但存档类可以覆盖record类。 * record类没有gfs stat记录,也没有其他系统记录。 # 应用系统/客户端的文件信息存储 - 任何一个文件的定位,完整信息必须包括: - 1. dynasty id,由数字0开始,+1递增。dynasty id将决定此dynasty的writer和reader。也就是具体的服务器机器定位。 - 2. ihash,实际是 hex_decode(md5(filename))%256, 决定了那个inode以及那个目录下。 - 3. idx,inode写入时分配的写入id, 决定了ihash目录下的子目录。 - 4. filename,具体的文件名。 dynasty_id, idx, filename 是必须的,ihash可以借助gfsc基于filename重新计算生成,借助gfsc,可以得到如此的访问连接: ```text https://ip:port/reader?ihash=1&idx=2&name=filename.abc https://ip:port/download?ihash=1&idx=2&name=filename.abc ``` # gfsc router.json ```json { "cur_dynasty":1, "dynasties":[ { "dynasty" : 0, "nodes": [ {"max":255,"min":0, "writer":"http://localhost:9996","readers":["http://localhost:9996","http://localhost:9997"]} ] }, { "dynasty" : 1, "nodes": [ {"max":127,"min":0, "writer":"http://localhost:7000","readers":["http://localhost:7000","http://localhost:7001"]}, {"max":255,"min":128, "writer":"http://localhost:7002","readers":["http://localhost:7002","http://localhost:7003"]} ] } ] } ``` # 系统采用master-slave结构进行容灾备份方案 - 系统采用sqlite作为所有文件的index,也将是master-slave同步的依据。 - 系统采用2种同步体系;MQ通知(暂用redis),实现准实时同步;slave定时主动从master拉取信息,进行同步。 redis消息进行同步的策略,可以达到更实时同步的效果,但依赖外部redis系统;timer同步模式不依赖任何外部系统,但有5秒延迟。 ```text "writer":"http://localhost:7000", # readers可以有多个 "readers":["http://localhost:7000","http://localhost:7001"] master1: /home/lengss/rabbit/rabbit -t debug -p 7000 -d /home/lengss/rabbit/root0 slave1-1: /home/lengss/rabbit/rabbit -t debug -p 7001 -d /home/lengss/rabbit/root01 -m http://localhost:7000 master2: /home/lengss/rabbit/rabbit -t debug -p 7002 -d /home/lengss/rabbit/root1 slave2-1: /home/lengss/rabbit/rabbit -t debug -p 7003 -d /home/lengss/rabbit/root11 -m http://localhost:7002 ``` ## sqlite for stat ```sql CREATE TABLE stat( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, fhash CHAR(2) NOT NULL, status TINYINT UNSIGNED NOT NULL DEFAULT 0, ctime DATETIME ); CREATE INDEX idx_fhash ON stat(fhash); ``` # 系统的扩展(扩容) - 系统采用dynasty的概念,一个dynasty最多可以容纳256组hash的Node(Master-slave),dynasty采用数字0..x进行定位。 - 通常,采用数字递增来分配新的dynasty,用于解决旧dynasty的容量问题(压力问题通过分裂Node和增加slave解决)。 - 按此理论,容量可以无限扩容。但也带来问题:不同的库,内容不再去重,而是数据完全独立; # 关于受控分享 - 系统没有复杂的权限控制(过于复杂啰嗦,放权给使用系统实现吧),但不等于可以随便访问或使用资源。 - 系统采用token机制进行有效控制,token具有一定的时效控制。gfsc自动提供相应的token生成和校验。 - 系统提供文本内容的在线预览展示,二进制下载,以及图片的在线显示等基础功能。 # web端应用 ## upload, save GfsResponse + dynasty ```rust pub async fn demo_form_upload( State(shared): State>, mut multipart: Multipart) -> AResult { let mut rets: Vec = Vec::new(); while let Some(field) = multipart.next_field().await.unwrap() { let file_name = if let Some(file_name) = field.file_name() { file_name.to_owned() } else { continue; }; if let Ok(ret) = gfsc_stream_to_gfs(&shared.router, &file_name, field).await { rets.push(ret); } } save_gfs_response_to_db(&shared.db, dynasty, rets).await?; ... ... ``` ## download ```rust let dlink = router.get_download_url(dynasty, idx, ihash, fname.as_str())?; html.push_str(format!("download it ", dlink).as_str()); ``` ## text file's preview router.get_reader_url(dynasty, id, ihash, filename)?; ```rust let ext: Vec<&str> = file.name.split(".").collect(); if ext.len() > 0 { let texts = vec!["txt", "docx", "pptx", "xlsx", "py", "rs", "go"]; let ext = ext[ext.len() - 1]; if texts.contains(&ext) { let preview_url = router.get_reader_url(router.cur_dynasty, file.id, ihash, file.name.as_str())?; builder.push_str(r#"preview "#); builder.push_str(file.name.as_str()); builder.push_str(r#"
"#); } } ``` ## image online router.get_image_url(dynasty, id, ihash, filename)?; ```rust let ext: Vec<&str> = file.name.split(".").collect(); if ext.len() > 0 { let images = vec!["png", "jpg", "jpeg"]; let ext = ext[ext.len() - 1]; if images.contains(&ext) { let image_url = router.get_image_url(router.cur_dynasty, file.id, ihash, file.name.as_str())?; builder.push_str(r#"image online view "#); builder.push_str(file.name.as_str()); builder.push_str(r#"
"#); } } ``` ## search/grep/whereis ```rust gfsc_grep_file(&router, dynasty, pattern).await?; gfsc_whereis(&router, dynasty, filename ).await; gfsc_search(&router, dynasty, &searchParams).await; ``` ## dir/get_files ```rust gfsc_dir(&router, dynasty, ihash, idx, limit).await?; gfsc_get_files(&router, dynasty, 0, 10).await?; ``` ## read text let text = gfsc_read_text(&router, dynasty, idx, ihash, &save_name)?; # code examples ## examples as: - [dir.rs](https://gitee.com/lenssh/rabbit/blob/master/gfsc/examples/dir.rs) - [download.rs](https://gitee.com/lenssh/rabbit/blob/master/gfsc/examples/download.rs) - [grep.rs](https://gitee.com/lenssh/rabbit/blob/master/gfsc/examples/grep.rs) - [readline.rs](https://gitee.com/lenssh/rabbit/blob/master/gfsc/examples/readline.rs) - [router.rs](https://gitee.com/lenssh/rabbit/blob/master/gfsc/examples/router.rs) - [search.rs](https://gitee.com/lenssh/rabbit/blob/master/gfsc/examples/search.rs) - [sendfile.rs](https://gitee.com/lenssh/rabbit/blob/master/gfsc/examples/sendfile.rs) - [whereis.rs](https://gitee.com/lenssh/rabbit/blob/master/gfsc/examples/whereis.rs) - [parallel.rs](https://gitee.com/lenssh/rabbit/blob/master/gfsc/examples/parallel.rs) ## App as: - [demo.rs](https://gitee.com/lenssh/rabbit/blob/master/gfsc/src/bin/demo.rs) - [hare.rs](https://gitee.com/lenssh/rabbit/blob/master/gfsc/src/bin/hare.rs) # parallel computer 最初设计的算子是动态库,动态库通过deploy在gfs上发布到每个节点。然后通过libloading,动态载入算子(lib*.so),执行计算任务。 执行测试发现经常core,开发效率也不高,于是就把算子改称了python脚本。 ## 应用案例 ```rust let router = GfsRouter::new(config); // 读取部署算子的信息文件,获取算子位置和writer let src = String::from_utf8(std::fs::read(fname)?)?; let dp: Vec = serde_json::from_str(src.as_str())?; println!("{:#?}", dp); // 构造符合计算的算子部署参数结构: HashMap < (min,max),idx > let op_name =dp[0].name.clone(); let ihash = string_hash256(&op_name); let mut w_idx = 0; let mut ids:HashMap<(i32,i32),i32> = HashMap::new(); for i in 0..dp.len(){ println!("node-{:#?}, operator is: ({}, {}, {}), range [{},{}]", i, &op_name, ihash, dp[i].idx, dp[i].min, dp[i].max ); ids.insert((dp[i].min, dp[i].max), dp[i].idx ); if dp[i].min == 0 { w_idx = dp[i].idx } } // 构造传递给算子的参数结构 let mut args:HashMap = HashMap::new(); args.insert("arg1".to_string(), arg1.to_string()); args.insert("arg2".to_string(), arg2.to_string()); args.insert(PARALLEL_RECORD_KEY.to_string(), dp[0].writer.clone()); args.insert(PARALLEL_RECORD_IDX.to_string(), w_idx.to_string()); // 重置算子的计算状态 gfsc_record_clear(&router, ihash, 7, &op_name).await?; // 启动新的运算, 具体执行情况可以访问提供给算子的writer服务的/parallel/status/operator_name了解 gfsc_map_task(&router, &op_name, ids, &mut args).await ``` ```python ``` ## 算子example ```python #!/usr/bin/env python3 # -*- coding: utf-8 -*- import urllib.parse import re import sys import requests import json BASE_DIR = os.path.dirname(os.path.abspath(__file__)) def main( argv ): if len(argv) < 3: print("xxx obj.log args") return fname = argv[1] print("obj file = %s"%(fname)) arg = argv[2] src = urllib.parse.unquote_plus(arg) obj = json.loads(src) print(src) print(obj) if __name__ == "__main__": main(sys.argv) ``` # WebAssemble in gfs for servlet