# streamLearn **Repository Path**: LinJianghao/stream-learn ## Basic Information - **Project Name**: streamLearn - **Description**: No description available - **Primary Language**: Unknown - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 30 - **Created**: 2024-06-12 - **Last Updated**: 2024-06-12 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # StreamLearn 流数据项目代码框架 ## StreamEstimator Stream learn算法主要需要实现以下接口,具体实现参考StreamLearn/Base/DeepModelMixin.py 注意以下几点 1. 每个算法单独为一个文件夹或者python文件,放在Algorithm文件夹下面(实现StreamEstimator)。 2. 每个数据集单独作为一个文件夹或者python文件,放在Dataset文件夹(实现StreamDataset)。 3. 每个算法需要提供一个测试入口,作为算法调用的样例。放在tests文件夹。 4. 提供对应的代码说明,可以参考下面的“流数据分布鲁棒学习算法” 5. 数据集中的数据如果不太大(1MB以下)可以将数据放在Dataset/data文件夹下,新建一个项目对应的子目录。如果数据集较大,应当在国内的数据平台(例如百度网盘)上传并在README中提供对应的链接和使用方法。 ```python class StreamEstimator(ABC, BaseEstimator): @abstractmethod def fit(self, stream_dataset): """ Train a stream model. :param stream_dataset: Instances of a stream dataset. """ raise NotImplementedError("The fit() method of StreamEstimator must be implemented.") @abstractmethod def predict(self, X): """ Predict y for input X. :param X: input. """ raise NotImplementedError("The predict() method of StreamEstimator must be implemented.") @abstractmethod def evaluate(self, y_pred, y_true): """ Evaluate stream algorithm on a stream dataset. :param y_pred: predict y. :param y_true: ground-truth y. """ raise NotImplementedError("The evaluate() method of StreamEstimator must be implemented.") ``` ## StreamDataset ```python class StreamDataset(Dataset): pass ``` --- ## 课题一:流数据分布鲁棒学习算法 针对多源异质流数据分布场景,建立吞吐自适流数据算法,实现对分布变化的鲁棒性。具体来说,针对多数据分布吞吐量相同和不同的两个场景,分别提出GDRO算法和加权GDRO算法,建立理论最优的样本复杂度,同时实验验证了两种方法的有效性。 相关算法工具开源至本项目仓库中,包含`多分布数据集构造`,`GDRO算法和加权GDRO算法实现`,`性能测试`三部分,相关代码参见目录: - StreamLearn/Dataset/AdultDataset.py - StreamLearn/Algorithm/GDRO/GDRO.py - StreamLearn/Algorithm/GDRO/WGDRO.py - StreamLearn/tests/test_GDRO.py 首先,按照以下方式构造流式数据集adult_dataset,其中参数 args.dataset_mode='balance' 表明流数据吞吐量相同场景;args.dataset_mode='imbalance' 表明吞吐量不同场景。 ```python adult_dataset = AdultDataset(args) ``` AdultData源数据链接([Link][https://archive.ics.uci.edu/dataset/2/adult]),预处理后数据([Link][https://pan.baidu.com/s/1sQu_o9qiaXUTvHFEeswZtA],提取码:jhxx) 其次,分别调用GDRO算法和加权GDRO算法,在adult_dataset上进行训练 ```python # GDRO算法训练 GDRO1 = GDRO(args) GDRO1.fit(adult_dataset) ``` ```python # 加权GDRO算法训练 WGDRO1 = WGDRO(args) WGDRO1.fit(adult_dataset) ``` 最后,分别对两个模型进行性能测试 ```python # GDRO算法测试 for i in range(adult_dataset.get_m()): data = adult_dataset.get_testdata(i) predict = GDRO1.predict(data[:, :-1]) loss = GDRO1.evaluate(predict, data[:, -1]).mean() print('GDRO', i, loss) ``` ```python # 加权GDRO算法测试 for i in range(adult_dataset.get_m()): data = adult_dataset.get_testdata(i) predict = WGDRO1.predict(data[:, :-1]) loss = WGDRO1.evaluate(predict, data[:, -1]).mean() print('WGDRO', i, loss) ``` ## 课题二 ### 2.1 高吞吐率图流的动态图神经网络 该算法主要包含`数据集`,`DSFD算子实现`,`性能测试`三部分,相关代码参见目录: - StreamLearn/Dataset/DTDGsDataset.py - StreamLearn/Algorithm/DGNN.py - StreamLearn/tests/test_DecoupledDGNN.py 算子应用实例见 StreamLearn/tests/test_DecoupledDGNN.py 文件,使用方法为: ```bash python StreamLearn/tests/test_DecoupledDGNN.py --data NAME_OF_DATA --checkpoint PATH_TO_CHECKPOINT ``` 首先,获取算法所需数据集:Dataset文件夹中包含了数据下载和预处理的代码,用户可以根据需要修改其中的数据存储路径。 ```python from StreamLearn.Dataset.DTDGsDataset import DTDGsDataset dataset = DTDGsDataset(args.data) ``` 其次,按照以下方式获取动态图节点的时序表示: ```python from StreamLearn.Algorithm.DecoupledDGNN.graph_embs import GraphEmbs gen_embs = GraphEmbs(dataset.path, args.data, args.rmax, args.alpha) gen_embs.load_and_process_data ``` 最后,调用DGNN算法进行训练并测试: ```python from StreamLearn.Algorithm.DecoupledDGNN.DGNN import DGNN execute = DGNN(args) execute.train() ``` ### 2.2 滑动窗口上的最优矩阵略图算子 该算子主要包含`数据集`,`DSFD算子实现`,`性能测试`三部分,相关代码参见目录: - StreamLearn/Dataset/FDDataset.py - StreamLearn/Algorithm/DSFD.py - StreamLearn/tests/test_SWFD.py 数据集下载地址:[百度网盘](https://pan.baidu.com/s/1TMSQ5Plm1E1b_jqCXkjW1A?pwd=esja),提取码: esja。 算子应用实例见 StreamLearn/tests/test_SWFD.py 文件,使用方法为: ```bash PYTHONPATH=. python StreamLearn/tests/test_SWFD.py ``` 首先需要初始化长度为$N$的滑动窗口下的矩阵略图对象(向量维度为$d$,向量范数上界为$R$,空间开销为$O(ld/\beta)$,误差为$\beta/l$): ```python swfd = SeqDSFD(N, R, d, l, beta=1.0) ``` 对于向量流中每次到来的向量`data`,更新矩阵略图 ```python swfd.fit(data) ``` 查询当前滑动窗口组成的矩阵在某单位向量上的投影的模长,其中`direction`为该单位向量 ```python predict = swfd.predict(direction) ``` ### 2.3 基于采样的分布式环境下的元素估计算法 实现了基于采样的、分布式流数据环境下、亚线性通信量的元素估计(NDV)算法。详见[https://gitee.com/yinhanyan/ndv_-estimation_in_distributed_environment](https://gitee.com/yinhanyan/ndv_-estimation_in_distributed_environment)。 ## 课题三 ### 3.1 流数据分布自适应学习算法 该算法主要包含`分布偏移数据集构造`,`ODS算法实现`,`性能测试`三部分,相关代码参见目录: - StreamLearn/Dataset/TTADataset.py - StreamLearn/Algorithm/TTA/ODS.py - StreamLearn/tests/test_ODS.py 首先,测试代码的配置文件为:StreamLearn/Config/ODS.py,用户可修改测试文件进行不同复合数据分布变化的测试。 数据集下载地址:[CIFAR10-C](https://zenodo.org/records/2535967),训练参数下载地址:[百度网盘](https://pan.baidu.com/s/1mxADnKpv73X-Tu1uR8fkXg),提取码: qaj2。 其次,按照以下方式构造流式包含复合数据分布变化(包含协变量分布和标记分布偏移)的 CIFAR10 数据集。 ```python import StreamLearn.Dataset.TTADataset as datasets dataset = datasets.CIFAR10CB( root=args.stream.dataset_dir, batch_size=args.stream.batch_size, severities=args.stream.severities, corruptions=args.stream.corruptions, bind_class=args.stream.bind_class, bind_ratio=args.stream.bind_ratio, seed=args.seed, ) ``` 其中,`root`为数据根目录、`batch_size`控制数据流批大小、`severities`与`corruptions`控制协变量分布偏移的程度与类型、`bind_class`与`bind_ratio`控制标记分布变化的类别与比例、`seed`为数据集生成随机种子。 继而,调用 ODS 算法,复用已训练完毕的模型在复合分布偏移的数据流中进行自适应学习。 ```python args.method.model = net estimator = ODS.ODS(args.method) ``` 其中,`net`保存了深度学习模型,`args.method`中保存了 ODS 算法所需的超参数。 最后,将数据流中的样本输入算法中进行预测。 ```python # ODS算法测试 pred = estimator.predict(X).detach().cpu() ``` 测试具体代码详见 StreamLearn/tests/test_ODS.py 文件,使用方法为: ```bash cd stream-learn python StreamLearn/tests/test_ODS.py --data PATH_TO_DATA --checkpoint PATH_TO_CHECKPOINT ``` ### 3.2 SAFC:同时增强特征和类的增量学习 该算法主要包括增量数据集构造,SAFC两种变体SAFC_D和SAFC_ID算法实现,性能测试三部分 首先,获取算法所需数据集: Dataset文件中有本算法需要用到的4个.pkl文件,由于文件较大,需要首先从百度网盘上下载Dataset.zip文件; 解压缩后放在与SAFC_DandSAFC_IDmain_finetune.py主代码文件的同级目录下; 链接:https://pan.baidu.com/s/147pIg7T84_2Nfc9bexGoWQ 提取码:SAFC 其次,调用SAFC算法进行训练: SAFC训练基于第一阶段复用的SVM模型,因此先训练SVM,并保存训练好的SVM模型: ```python print('####eval####') print("begin svm1 training!") print('####eval####') svm1 = SVC(probability=True,kernel="linear") svm1.fit(data_s1.tolist(), label_s1_vec.tolist()) dump(svm1, save_dir+'/svm1.model') #复用SVM,训练SAFC_D,并保存训练后的模型: print('####eval####') print("begin SAFC_D training!") print('####eval####') w_ours1 = SAFC_D(np.mat(svm1._get_coef()), np.transpose(data_s2), label_s2, alpha_best1, beta_best1, eta) dump(w_ours1, save_dir+'/SAFC_D.model') #复用SVM,训练SAFC_ID,并保存训练后的模型: print('####eval####') print("begin SAFC_ID training!") print('####eval####') w_ours2 = SAFC_ID(np.mat(svm1._get_coef()),np.transpose(data_s2),label_s2,alpha_best2,beta_best2,eta) dump(w_ours2, save_dir+'/SAFC_ID.model') #其中,alpha_best1, beta_best1, alpha_best2, beta_best2, eta是超参数 ``` 最后,分别对两个模型进行性能测试: ```python # SAFC_D: print('####eval####') print("begin SAFC_D testing!") print('####eval####') pred1, acc_ours1, auc_ours1, f1wei_ours1, f1macro_ours1, f1micro_ours1 = Predict(w_ours1,np.transpose(test_data),test_label) print('####eval####') print("begin SAFC_D evaluation!") print('####eval####') Acc_ours1.append(acc_ours1) AUC_ours1.append(auc_ours1) F1_weight_ours1.append(f1wei_ours1) F1_macro_ours1.append(f1macro_ours1) F1_micro_ours1.append(f1micro_ours1) # SAFC_ID: print('####eval####') print("begin SAFC_ID testing!") print('####eval####') pred21,acc_ours2,auc_ours2,f1wei_ours2,f1macro_ours2,f1micro_ours2 = Predict(w_ours2,np.transpose(test_data),test_label) print('####eval####') print("begin SAFC_ID evaluation!") print('####eval####') Acc_ours2.append(acc_ours2) AUC_ours2.append(auc_ours2) F1_weight_ours2.append(f1wei_ours2) F1_macro_ours2.append(f1macro_ours2) F1_micro_ours2.append(f1micro_ours2) ``` 在test文件中,示例代码SAFC_DandSAFC_IDmain_finetune.py描述了新增相较于第一阶段inputaddfeatureratio新类特征的场景: inputaddfeatureratio可以是10%,20%,..."ALL", "ALL"表示新增新类的全部特征 运行时,只需在终端输入口令即可: SAFC_DandSAFC_IDmain(inputaddfeatureratio) ModelsIN用于存放特征增加场景下获得的分类器 ## 课题四 ### 4.1 分布式交互学习流数据存储管理系统 GEAR GEAR是一个以GPU为中心、针对现代高性能服务器GPU硬件特性和高性能网络链接设计优化的分布式流数据存储管理系统,以支持基于大规模流数据的交互式学习。 该系统主要包含 `GEAR系统实现`和`最简样例运行代码`两部分 ,相关代码参见目录 * StreamLearn/Algorithm/GEAR * StreamLearn/tests/GEAR **样例运行** 首先,参考`StreamLearn/Algorithm/GEAR/README.md`文档中搭建基本的PyTorch-GPU运行环境,然后运行在目录下运行`pip install`命令以编译安装GEAR系统到Python环境: ```shell cd StreamLearn/Algorithm/GEAR/ pip install -r requirements. pip install . ``` 其次,参考运行`StreamLearn/tests/GEAR/offline/single-node/create.py`以下载并转换`hopper`数据集,该最小数据集会被存放于`/tmp/gear/checkpoints/example_shared_dataset.pt`的默认路径下(可通过`--data_path`参数指定存放路径)。 ```shell cd StreamLearn/tests/GEAR/offline/single-node/ python create.py --data_path /tmp/gear/checkpoints/example_shared_dataset.pt ``` 之后,运行`StreamLearn/tests/GEAR/offline/single-node/run.sh`脚本以快速运行样例程序。 **和现有分布式训练工作流集成** 如果希望在DeepSpeed分布式训练工作流集成调用GEAR,请参考`StreamLearn/tests/GEAR/offline/single-node/main.py`中的`setup`方法进行环境初始化和数据层加载: ```python def setup(): ... # 初始化DeepSpeed分布式环境 deepspeed.init_distributed(dist_init_required=True) # GEAR数据层运行时加载 gear.init() # GEAR数据层定义 # 如此处GEAR从离线数据集中记载 offline_loader_params = { "data_path": args.data_path, "mpu": None, "batch_size": 32, "sampling_method": "Uniform", "patterns": [ { "name": "observations", "pad": "tail", # Literal["head", "tail"] "offset": -1000, "length": 1000, # fetch entire sequence for a trajectory no longer than 1000 steps }, { "name": "actions", "pad": "tail", # Literal["head", "tail"] "offset": -1000, "length": 1000, }, ], } mpu = ModelParallelismUnit(device=torch.device("cuda")) mpu.build_ds_mpu() offline_loader_params["mpu"] = mpu offline_loader_params["attach"] = int(os.environ["LOCAL_RANK"]) != 0 loader = gear.loader.OfflineLoader.create(**offline_loader_params) ``` 其后在模型训练/推理过程中,GEAR数据层的交互方式参考`StreamLearn/tests/GEAR/offline/single-node/models/mlp/funcs.py`文件中`train_step`方法: ```python def train_step( loader, model, optimizer, step_id: int, tensorboard_writer: Union[SummaryWriter, None], ) ... # GEAR对外暴露使用逻辑和普通pytorch dataloader高度相似 timesteps, data_batch = next(loader) # 从data_batch中提取数据域 obs = data_batch[0] act = data_batch[1] # 消耗/使用数据 ... ``` ## 课题五:流数据增量学习算法 该算法主要包含`增量学习数据集构造`,`MEMO算法实现`,`性能测试`三部分,相关代码参见目录: - StreamLearn/Dataset/CILDataset.py - StreamLearn/Algorithm/ClassIncrementalLearning/MEMO.py - StreamLearn/tests/test_CIL.py 首先,按照以下方式构造流式数据集CIFAR100。 ```python dataset = CIFAR100_CIL(root='/home/anony/DATASETS', download=True) ``` 其次,调用MEMO算法,在流数据上进行训练 ```python # MEMO算法训练 model = get_CIL_method(parser.parse_args().alg) model.fit(dataset) ``` 最后,分别对两个模型进行性能测试 ```python # MEMO算法测试 y = model.predict(X=dataset._test_data) print(model.evaluate(y, dataset._test_targets)) ```