From 5e52bc88496e3d69a0168c8e36ce0b206782578d Mon Sep 17 00:00:00 2001 From: Hanyan Yin Date: Tue, 11 Jun 2024 01:04:24 -0700 Subject: [PATCH] =?UTF-8?q?=E8=AF=BE=E9=A2=98=E4=BA=8C=EF=BC=9A=E6=BB=91?= =?UTF-8?q?=E5=8A=A8=E7=AA=97=E5=8F=A3=E4=B8=8A=E7=9A=84=E6=9C=80=E4=BC=98?= =?UTF-8?q?=E7=9F=A9=E9=98=B5=E7=95=A5=E5=9B=BE=E7=AE=97=E6=B3=95DSFD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 37 +- StreamLearn/Algorithm/SlidingWindowFD/DSFD.py | 144 ++++++ .../Algorithm/SlidingWindowFD/__init__.py | 0 .../SlidingWindowFD/fastfdwithdump.py | 183 ++++++++ .../SlidingWindowFD/frequent_directions.py | 265 +++++++++++ .../sliding_windows_frequent_directions.py | 410 ++++++++++++++++++ StreamLearn/Dataset/FDDataset.py | 34 ++ StreamLearn/tests/test_SWFD.py | 46 ++ 8 files changed, 1116 insertions(+), 3 deletions(-) create mode 100644 StreamLearn/Algorithm/SlidingWindowFD/DSFD.py create mode 100644 StreamLearn/Algorithm/SlidingWindowFD/__init__.py create mode 100644 StreamLearn/Algorithm/SlidingWindowFD/fastfdwithdump.py create mode 100644 StreamLearn/Algorithm/SlidingWindowFD/frequent_directions.py create mode 100644 StreamLearn/Algorithm/SlidingWindowFD/sliding_windows_frequent_directions.py create mode 100644 StreamLearn/Dataset/FDDataset.py create mode 100644 StreamLearn/tests/test_SWFD.py diff --git a/README.md b/README.md index d7421aa..b28b6f4 100644 --- a/README.md +++ b/README.md @@ -62,7 +62,7 @@ class StreamDataset(Dataset): 针对多源异质流数据分布场景,建立吞吐自适流数据算法,实现对分布变化的鲁棒性。具体来说,针对多数据分布吞吐量相同和不同的两个场景,分别提出GDRO算法和加权GDRO算法,建立理论最优的样本复杂度,同时实验验证了两种方法的有效性。 相关算法工具开源至本项目仓库中,包含`多分布数据集构造`,`GDRO算法和加权GDRO算法实现`,`性能测试`三部分,相关代码参见目录: -- StreamLearn/Datast/AdultDataset.py +- StreamLearn/Dataset/AdultDataset.py - StreamLearn/Algorithm/GDRO/GDRO.py - StreamLearn/Algorithm/GDRO/WGDRO.py - StreamLearn/tests/test_GDRO.py @@ -106,7 +106,7 @@ for i in range(adult_dataset.get_m()): ## 流数据增量学习算法 该算法主要包含`增量学习数据集构造`,`MEMO算法实现`,`性能测试`三部分,相关代码参见目录: -- StreamLearn/Datast/CILDataset.py +- StreamLearn/Dataset/CILDataset.py - StreamLearn/Algorithm/ClassIncrementalLearning/MEMO.py - StreamLearn/tests/test_CIL.py @@ -130,7 +130,7 @@ print(model.evaluate(y, dataset._test_targets)) ## 流数据分布自适应学习算法 该算法主要包含`分布偏移数据集构造`,`ODS算法实现`,`性能测试`三部分,相关代码参见目录: -- StreamLearn/Datast/TTADataset.py +- StreamLearn/Dataset/TTADataset.py - StreamLearn/Algorithm/TTA/ODS.py - StreamLearn/tests/test_ODS.py @@ -169,4 +169,35 @@ pred = estimator.predict(X).detach().cpu() ```bash cd stream-learn python StreamLearn/tests/test_ODS.py --data PATH_TO_DATA --checkpoint PATH_TO_CHECKPOINT +``` + +## 滑动窗口上的最优矩阵略图算子 + +该算子主要包含`数据集`,`DSFD算子实现`,`性能测试`三部分,相关代码参见目录: +- StreamLearn/Dataset/FDDataset.py +- StreamLearn/Algorithm/DSFD.py +- StreamLearn/tests/test_SWFD.py + +数据集下载地址:[百度网盘](https://pan.baidu.com/s/1TMSQ5Plm1E1b_jqCXkjW1A?pwd=esja),提取码: esja。 + +算子应用实例见 StreamLearn/tests/test_SWFD.py 文件,使用方法为: +```bash +PYTHONPATH=. 
python StreamLearn/tests/test_SWFD.py +``` + +首先需要初始化长度为$N$的滑动窗口下的矩阵略图对象(向量维度为$d$,向量范数上界为$R$,空间开销为$O(ld/\beta)$,误差为$\beta/l$): +```python +swfd = SeqDSFD(N, R, d, l, beta=1.0) +``` + +对于向量流中每次到来的向量`data`,更新矩阵略图 + +```python +swfd.fit(data) +``` + +查询当前滑动窗口组成的矩阵在某单位向量上的投影的模长,其中`direction`为该单位向量 + +```python +predict = swfd.predict(direction) ``` \ No newline at end of file diff --git a/StreamLearn/Algorithm/SlidingWindowFD/DSFD.py b/StreamLearn/Algorithm/SlidingWindowFD/DSFD.py new file mode 100644 index 0000000..e1577be --- /dev/null +++ b/StreamLearn/Algorithm/SlidingWindowFD/DSFD.py @@ -0,0 +1,144 @@ +from StreamLearn.Algorithm.SlidingWindowFD.frequent_directions import ( + FrequentDirectionsWithDump, +) +from StreamLearn.Algorithm.SlidingWindowFD.sliding_windows_frequent_directions import ( + FastSlidingWindowFD, + SlidingWindowFD, +) +import numpy as np +from scipy import linalg +from typing import Dict +from StreamLearn.Base.SemiEstimator import StreamEstimator + + +class SeqDSFD(StreamEstimator): + def __init__( + self, + N: int, + R: float, + d: int, + sketch_dim: int, + beta: float = 1.0, + ty=SlidingWindowFD, + faster=FrequentDirectionsWithDump, + **kwargs, + ): + """Sliding Window on Frequent Directions + + Args: + N (int): Sliding window size. + R (float): Upper bound of square of 2-norm of row vectors. + d (int): Vector dimension. + sketch_dim (int): Sketch dimension. + beta (float): Additional coefficient of error, default as 1.0. + """ + self.N = np.uint64(N) + self.d = d + self.R = R + self.logR = int(np.floor(np.log2(R))) + 1 + self.beta = beta + self.sketch_dim = sketch_dim + self.ty = ty + self.faster = faster + + self.levels: Dict[int, ty] = {} + + if "upper_F_norm" in kwargs: + self.logR = int(np.ceil(np.log2(kwargs["upper_F_norm"] / self.N))) + base = self.N / self.sketch_dim + for j in range(self.logR): + self.levels[j] = ty( + self.N, + self.d, + self.sketch_dim, + error_threshold=base * (2**j), + faster=faster, + ) + + else: + for j in range(self.logR): + self.levels[j] = ty( + self.N, + self.d, + self.sketch_dim, + error_threshold=(2**j) * self.N / self.sketch_dim, + faster=faster, + ) + + self.time = np.uint64(0) + + def fit(self, stream_dataset): + """Handle the input vector + + Args: + X (npt.NDArray): Arriving vector at the time. 
(row vector: (1, n)-shape) + """ + if stream_dataset.ndim == 1: + stream_dataset = stream_dataset[np.newaxis, :] + + X = stream_dataset + self.time += np.uint64(1) + + for j in range(self.logR): + sq = self.levels[j].fd + q = sq.queue + C = sq.C + while len(q) != 0: + head_snapshot = q[0] + if ( + len(q) > (2 + 8 / self.beta) * self.sketch_dim + or head_snapshot.t + self.N <= self.time + ): + q.popleft() + else: + break + + sq = self.levels[j].fd_aux + q = sq.queue + C = sq.C + while len(q) != 0: + head_snapshot = q[0] + if ( + len(q) > (2 + 8 / self.beta) * self.sketch_dim + or head_snapshot.t + self.N <= self.time + ): + q.popleft() + else: + break + + error = C.get_error() + if X @ X.T >= error: + self.levels[j].append(X) + else: + self.levels[j].fit(X) + + def get(self): + j = 0 + rj = 0 + while j < self.logR: + sq = self.levels[j].fd + q = sq.queue + if len(q) != 0: + head_snapshot = q[0] + if self.time - head_snapshot.s >= min(self.N - 1, self.time - 1): + rj = j + break + else: + break + j += 1 + + j = rj + + return self.levels[j].get() + + def predict(self, X): + sk, _, _, _ = self.get() + if len(sk) == 0: + return 0 + y = sk @ X + y_norm = np.inner(y, y) + return y_norm + + def evaluate(self, y_pred, y_true): + ret = abs(y_pred - y_true) + return ret diff --git a/StreamLearn/Algorithm/SlidingWindowFD/__init__.py b/StreamLearn/Algorithm/SlidingWindowFD/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/StreamLearn/Algorithm/SlidingWindowFD/fastfdwithdump.py b/StreamLearn/Algorithm/SlidingWindowFD/fastfdwithdump.py new file mode 100644 index 0000000..2a74ca0 --- /dev/null +++ b/StreamLearn/Algorithm/SlidingWindowFD/fastfdwithdump.py @@ -0,0 +1,183 @@ +from line_profiler import profile +import numpy as np +import numpy.typing as npt +from scipy import linalg +from fbpca import pca, eigenn + +SVD_COUNT = 0 +VALID_SVD_COUNT = 0 + + +class FastFrequentDirectionsWithDump: + def __init__( + self, d: int, sketch_dim: int, error: float, approx: bool = False, **kwargs + ): + self.d = d + self.sketch_dim = sketch_dim + + self.sketch = np.zeros((self.sketch_dim, self.d), dtype=float) + self.sigma_squared = np.zeros(self.sketch_dim, dtype=float) + + self.max_energy: float = 0.0 + self.error: float = error + self.cov = np.zeros((self.sketch_dim, self.sketch_dim), dtype=float) + self.approx = approx + self.last_svd_size = self.sketch_dim + + self.svd_count = 0 + + @profile + def __flush(self): + global SVD_COUNT + global VALID_SVD_COUNT + sketch = self.sketch[:-1, :] + batch = self.sketch[-1:, :] + upper_right = sketch @ batch.T + lower_left = upper_right.T + lower_right = batch @ batch.T + self.cov = np.block([[self.cov, upper_right], [lower_left, lower_right]]) + # gap = np.max(np.abs(self.sketch@self.sketch.T-self.cov)) + # ret = [] + + # while self.max_energy >= self.error: + # U, s, _ = linalg.svd(self.cov, overwrite_a=False, lapack_driver="gesdd") + # U, s, _ = pca(self.cov, k=1) + # self.max_energy = float(sigma_squared[0]) + + # s, U = eigenn(self.cov, k=1) + if self.max_energy >= self.error: + # U, sigma_squared, _ = pca(self.cov, k=1) + # self.svd_count += 1 + sigma_squared, U = eigenn(self.cov, k=1, n_iter=1) + if sigma_squared[0] < self.error: + self.max_energy = sigma_squared[0] + return None + + SVD_COUNT += 1 + U, sigma_squared, _ = linalg.svd( + self.cov, full_matrices=False, lapack_driver="gesvd" + ) + + self.max_energy = float(sigma_squared[0] - sigma_squared[self.sketch_dim]) + + if self.max_energy >= self.error: + VALID_SVD_COUNT += 1 + + # U = U[:, 0:1] + # 
sigma_squared = s**2 + i = 0 + while i < len(sigma_squared): + self.max_energy = float( + sigma_squared[i] - sigma_squared[self.sketch_dim] + ) + if self.max_energy < self.error: + break + i += 1 + # while sigma_squared[0] >= self.error: + if i == 0: + return None + + sigma_vt = U[:, :i].T @ self.sketch + # ret.append(sigma_vt) + vt = sigma_vt / np.sqrt(sigma_squared[:i])[:, None] + sigma_vt = np.multiply( + vt, + np.sqrt(sigma_squared[:i] - sigma_squared[self.sketch_dim]).reshape( + -1, 1 + ), + ) + # vt = sigma_vt / linalg.norm(sigma_vt) + # if np.abs(linalg.norm(vt) - 1) >0.1: + # raise Exception('error') + Dv = self.sketch @ vt.T + self.sketch = self.sketch - Dv @ vt + self.cov = self.cov - Dv @ Dv.T + # self.max_energy = float(sigma_squared[1]) + # ret = np.vstack([ret, sigma_vt]) + + # sigma_squared, U = eigenn(self.cov, k=1, n_iter=1) + # # U, sigma_squared, _ = pca(self.cov, k=1) + # # U, sigma_squared, _ = linalg.svd(self.cov, overwrite_a=False, lapack_driver="gesvd") + # U = U[:, :1] + # self.max_energy = float(sigma_squared[0]) + # if self.max_energy > self.error: + # sigma_vt = U[:, :1].T @ self.sketch + # # ret.append(sigma_vt) + # vt = sigma_vt / np.sqrt(sigma_squared[:1])[:, None] + # sigma_vt = np.multiply(vt, np.sqrt( + # sigma_squared[:1]).reshape(-1, 1)) + # # vt = sigma_vt / linalg.norm(sigma_vt) + # # if np.abs(linalg.norm(vt) - 1) >0.1: + # # raise Exception('error') + # Dv = self.sketch @ vt.T + # self.sketch = self.sketch - Dv @ vt + # self.cov = self.cov - Dv @ Dv.T + # return sigma_vt + # else: + # return None + # U, sigma_squared, _ = pca(self.cov, k=1) + # U, sigma_squared, _ = linalg.svd(self.cov, overwrite_a=False, lapack_driver="gesdd") + # U = U[:, 0:1] + # self.max_energy = float(sigma_squared[0]) + # return None + + # if len(ret) == 0: + # return None + # else: + # return np.vstack(ret) + return None + + def get_error(self) -> float: + return self.error + + @profile + def fit(self, X): + """ + Fits the FD transform to dataset X + """ + self.max_energy += float(X @ X.T) + self.sketch = np.vstack([self.sketch, X]) + + if len(self.sketch) >= 2 * self.sketch_dim: + _, s, Vt = linalg.svd( + self.sketch, full_matrices=False, lapack_driver="gesvd" + ) + self.sigma_squared = s**2 + if len(self.sigma_squared) > self.sketch_dim: + self.sigma_squared = ( + self.sigma_squared[: self.sketch_dim] + - self.sigma_squared[self.sketch_dim] + ) + Vt = Vt[: self.sketch_dim] + self.sketch = Vt * np.sqrt(self.sigma_squared).reshape(-1, 1) + # self.cov = self.sketch @ self.sketch.T + ret = self.dump() + else: + ret = self.__flush() + + return ret + + @profile + def dump(self) -> npt.NDArray: + idx = np.where(self.sigma_squared >= self.error) + if len(idx[0]) == 0: + ret = None + else: + ret = self.sketch[idx] + self.sketch[idx, :] = 0.0 + self.sigma_squared[idx] = 0.0 + # np.roll(self.sketch, -1) + # np.roll(self.sigma_squared, -1) + # self.sketch[:, :] = 0 + # self.sigma_squared[:] = 0 + # self.Vt[:, :] = 0 + + self.max_energy = float(max(self.sigma_squared)) + self.cov = self.sketch @ self.sketch.T + return ret + + def get(self): + return self.sketch, None, None, None + + def get_sketch(self): + return self.sketch diff --git a/StreamLearn/Algorithm/SlidingWindowFD/frequent_directions.py b/StreamLearn/Algorithm/SlidingWindowFD/frequent_directions.py new file mode 100644 index 0000000..100fbc4 --- /dev/null +++ b/StreamLearn/Algorithm/SlidingWindowFD/frequent_directions.py @@ -0,0 +1,265 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor 
license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from line_profiler import profile +import numpy as np +import numpy.typing as npt +from scipy import linalg + +SVD_COUNT_OURS = 0 +FLUSH_HIT = 0 +FLUSH_ENTER = 0 + + +class FrequentDirections: + def __init__(self, d, sketch_dim=8): + """ + Class wrapper for all FD-type methods + + __rotate_and_reduce__ is not defined for the standard FrequentDirections but is for the + subsequent subclasses which inherit from FrequentDirections. + """ + self.d = d + self.delta = 0.0 # For RFD + + if sketch_dim is not None: + self.sketch_dim = sketch_dim + self.sketch = np.zeros((self.sketch_dim, self.d), dtype=float) + self.Vt = np.zeros((self.sketch_dim, self.d), dtype=float) + self.sigma_squared = np.zeros(self.sketch_dim, dtype=float) + + self.svd_count = 0 + + @profile + def fit(self, X, batch_size=1): + """ + Fits the FD transform to dataset X + """ + global SVD_COUNT_OURS + n = X.shape[0] + for i in range(0, n, batch_size): + aux = np.zeros((self.sketch_dim + batch_size, self.d)) + batch = X[i : i + batch_size, :] + # aux = np.concatenate((self.sketch, batch), axis=0) + aux[0 : self.sketch_dim, :] = self.sketch + aux[self.sketch_dim : self.sketch_dim + batch.shape[0], :] = batch + # ! WARNING - SCIPY SEEMS MORE ROBUST THAN NUMPY SO COMMENTING THIS WHICH IS FASTER OVERALL + # try: + # _, s, self.Vt = np.linalg.svd(aux, full_matrices=False) + # except np.linalg.LinAlgError: + # _, s, self.Vt = linalg.svd(aux, full_matrices=False, lapack_driver='gesvd') + _, s, self.Vt = linalg.svd(aux, full_matrices=False, lapack_driver="gesvd") + + # self.svd_count += 1 + SVD_COUNT_OURS += 1 + + self.sigma_squared = s**2 + self.__rotate_and_reduce__() + self.sketch = self.Vt * np.sqrt(self.sigma_squared).reshape(-1, 1) + + def get(self): + return self.sketch, self.sigma_squared, self.Vt, self.delta + + def get_sketch(self): + return self.sketch + + +class FastFrequentDirections(FrequentDirections): + """ + Implements the fast version of FD by doubling space + """ + + def __rotate_and_reduce__(self): + self.sigma_squared = ( + self.sigma_squared[: self.sketch_dim] - self.sigma_squared[self.sketch_dim] + ) + self.Vt = self.Vt[: self.sketch_dim] + + +class RobustFrequentDirections(FrequentDirections): + """ + Implements the RFD version of FD by maintaining counter self.delta. 
+ Still operates in the `fast` regimen by doubling space, as in + FastFrequentDirections + """ + + def __rotate_and_reduce__(self): + if len(self.sigma_squared) > self.sketch_dim: + self.delta += self.sigma_squared[self.sketch_dim] / 2.0 + self.sigma_squared = ( + self.sigma_squared[: self.sketch_dim] + - self.sigma_squared[self.sketch_dim] + ) + self.Vt = self.Vt[: self.sketch_dim] + + +class FrequentDirectionsWithDump(RobustFrequentDirections): + def __init__(self, d: int, sketch_dim: int, error: float): + super().__init__(d, min(sketch_dim, d)) + self.max_energy: float = 0.0 + self.buffer = None + self.error: float = error + + self.flush_hit = 0 + self.flush_enter = 0 + + @profile + def __flush(self): + # self.flush_enter += 1 + global FLUSH_ENTER + FLUSH_ENTER += 1 + if self.buffer is not None: + super().fit( + self.buffer, batch_size=min(self.buffer.shape[0], self.sketch_dim) + ) + self.max_energy = self.sigma_squared[0] + self.buffer = None + + def get_error(self) -> float: + return self.error + + @profile + def fit(self, X, batch_size=1): + global FLUSH_HIT + self.max_energy += X @ X.T + if self.buffer is None: + self.buffer = X + else: + self.buffer = np.concatenate([self.buffer, X]) + + if self.buffer is not None and len(self.buffer) >= self.sketch_dim: + # self.flush_hit += 1 + FLUSH_HIT += 1 + self.__flush() + elif self.max_energy >= self.error: + FLUSH_HIT += 1 + # self.flush_hit += 1 + self.__flush() + + @profile + def dump(self) -> npt.NDArray: + if self.sigma_squared[0] >= self.error: + v = np.sqrt(self.sigma_squared[0]) * self.Vt[0:1] + self.sketch[0, :] = 0 + self.sigma_squared[0] = 0 + self.Vt[0, :] = 0 + np.roll(self.sketch, -1) + np.roll(self.sigma_squared, -1) + np.roll(self.Vt, -1) + self.max_energy = self.sigma_squared[0] + # self.sketch[:, :] = 0 + # self.sigma_squared[:] = 0 + # self.Vt[:, :] = 0 + return v + else: + return None + + def get(self): + self.__flush() + return super().get() + + def get_sketch(self): + self.__flush() + return self.sketch + + def flush(self): + self.__flush() + + # def __rotate_and_reduce__(self): + # # self.delta += self.sigma_squared[self.sketch_dim] / 2. 
+ # self.sigma_squared = self.sigma_squared[:self.sketch_dim] + # # self.sigma_squared[self.sketch_dim] + # self.Vt = self.Vt[:self.sketch_dim] + + +class FasterFrequentDirectionsWithDump(RobustFrequentDirections): + def __init__(self, d: int, sketch_dim: int, error: float): + super().__init__(d, min(sketch_dim, d)) + self.buffer = None + self.error: float = error + + self.flush_hit = 0 + self.flush_enter = 0 + + @profile + def __flush(self): + # self.flush_enter += 1 + global FLUSH_ENTER + FLUSH_ENTER += 1 + if self.buffer is not None: + super().fit( + self.buffer, batch_size=min(self.buffer.shape[0], self.sketch_dim) + ) + self.buffer = None + + def get_error(self) -> float: + return self.error + + @profile + def fit(self, X, batch_size=1): + global FLUSH_HIT + if self.buffer is None: + self.buffer = X + else: + self.buffer = np.concatenate([self.buffer, X]) + + if self.buffer is not None and len(self.buffer) >= self.sketch_dim: + # self.flush_hit += 1 + FLUSH_HIT += 1 + self.__flush() + # elif self.max_energy >= self.error: + # FLUSH_HIT += 1 + # # self.flush_hit += 1 + # self.__flush() + + @profile + def dump(self) -> npt.NDArray: + i = 0 + while i < len(self.sigma_squared) and self.sigma_squared[i] >= self.error: + i += 1 + + if i != 0: + # v = np.sqrt(self.sigma_squared[0]) * self.Vt[0:1] + v = self.sketch[:i, :] + self.sketch[:i, :] = 0 + self.sigma_squared[:i] = 0 + self.Vt[:i, :] = 0 + np.roll(self.sketch, -i) + np.roll(self.sigma_squared, -i) + np.roll(self.Vt, -i) + self.max_energy = self.sigma_squared[0] + # self.sketch[:, :] = 0 + # self.sigma_squared[:] = 0 + # self.Vt[:, :] = 0 + return v + else: + return None + + def get(self): + self.__flush() + return super().get() + + def get_sketch(self): + self.__flush() + return self.sketch + + def flush(self): + self.__flush() + + # def __rotate_and_reduce__(self): + # # self.delta += self.sigma_squared[self.sketch_dim] / 2. + # self.sigma_squared = self.sigma_squared[:self.sketch_dim] + # # self.sigma_squared[self.sketch_dim] + # self.Vt = self.Vt[:self.sketch_dim] diff --git a/StreamLearn/Algorithm/SlidingWindowFD/sliding_windows_frequent_directions.py b/StreamLearn/Algorithm/SlidingWindowFD/sliding_windows_frequent_directions.py new file mode 100644 index 0000000..35e3f1e --- /dev/null +++ b/StreamLearn/Algorithm/SlidingWindowFD/sliding_windows_frequent_directions.py @@ -0,0 +1,410 @@ +from StreamLearn.Algorithm.SlidingWindowFD.frequent_directions import ( + FrequentDirectionsWithDump, + FasterFrequentDirectionsWithDump, +) +from StreamLearn.Algorithm.SlidingWindowFD.fastfdwithdump import ( + FastFrequentDirectionsWithDump, +) +import numpy as np +import numpy.typing as npt +from dataclasses import dataclass, field +from scipy.io import savemat +from scipy import linalg +from collections import deque + + +@dataclass +class FDSnapshot: + v: npt.NDArray + s: np.uint64 + t: np.uint64 + + +@dataclass +class DumpFDwithSnapshotQueue: + C: ( + FrequentDirectionsWithDump + | FastFrequentDirectionsWithDump + | FasterFrequentDirectionsWithDump + ) + queue: deque[FDSnapshot] = field(default_factory=deque) + last_dump_time: np.uint64 = np.uint64(1) + size: float = 0.0 + + +class SlidingWindowFD: + def __init__( + self, + N: int, + d: int, + sketch_dim: int, + C: int = 1, + faster=FrequentDirectionsWithDump, + **kwargs, + ): + """Sliding Window on Frequent Directions + + Args: + N (int): Sliding window size. + d (int): Vector dimension. + sketch_dim (int): Sketch dimension. + error_threshold (float, optional): Dump threshold, default as `1.0`. 
+ """ + if "error_threshold" in kwargs: + self.error = kwargs["error_threshold"] + else: + self.error = N * 1.0 / sketch_dim + + self.faster = faster + + self.fd = DumpFDwithSnapshotQueue(C=self.faster(d, sketch_dim * C, self.error)) + self.fd_aux = DumpFDwithSnapshotQueue( + C=self.faster(d, sketch_dim * C, self.error) + ) + self.period = 0 + + self.N = np.uint64(N) + self.d = d + self.C = C + self.sketch_dim = sketch_dim + self.time = np.uint64(0) + + def append(self, X: npt.NDArray, t=None): + if t != None: + self.time = np.uint64(t) + else: + self.time += np.uint64(1) + + # if self.fd.size > 2 * self.error * self.sketch_dim: + # if self.time % self.N == 1: + while self.period < self.time // self.N: + self.fd = self.fd_aux + self.fd_aux = DumpFDwithSnapshotQueue( + C=self.faster(self.d, self.sketch_dim * self.C, self.error), + last_dump_time=self.time, + ) + self.period += 1 + + s = self.fd.last_dump_time + self.fd.last_dump_time = self.time + 1 + self.fd.queue.append(FDSnapshot(v=X, s=s, t=self.time)) + + s = self.fd_aux.last_dump_time + self.fd_aux.last_dump_time = self.time + 1 + self.fd_aux.queue.append(FDSnapshot(v=X, s=s, t=self.time)) + + def fit(self, X: npt.NDArray, t=None): + """Handle the input vector + + Args: + X (npt.NDArray): Arriving vector at the time. (row vector: (1, n)-shape) + """ + + if t != None: + self.time = np.uint64(t) + else: + self.time += np.uint64(1) + # self.fd.size += X@X.T + # self.fd_aux.size += X@X.T + + # if self.fd.size > 2 * self.error * self.sketch_dim: + # if self.time % self.N == 1: + while self.period < self.time // self.N: + self.fd = self.fd_aux + self.fd_aux = DumpFDwithSnapshotQueue( + C=self.faster(self.d, self.sketch_dim * self.C, self.error), + last_dump_time=self.time, + ) + self.period += 1 + + while len(self.fd.queue) != 0: + head_snapshot = self.fd.queue[0] + if head_snapshot.t + self.N <= self.time: + self.fd.queue.popleft() + else: + break + + while len(self.fd_aux.queue) != 0: + head_snapshot = self.fd_aux.queue[0] + if head_snapshot.t + self.N <= self.time: + self.fd_aux.queue.popleft() + else: + break + + # with energy optimization + self.fd.C.fit(X) + + dumped = self.fd.C.dump() + if dumped is not None: + s = self.fd.last_dump_time + self.fd.last_dump_time = self.time + 1 + self.fd.queue.append(FDSnapshot(v=dumped, s=s, t=self.time)) + + self.fd_aux.C.fit(X) + + dumped = self.fd_aux.C.dump() + if dumped is not None: + s = self.fd_aux.last_dump_time + self.fd_aux.last_dump_time = self.time + 1 + self.fd_aux.queue.append(FDSnapshot(v=dumped, s=s, t=self.time)) + + # without energy + # sketch, sigma_squared, Vt, delta = self.fd.get() + + # if sigma_squared[0] >= (self.N * 1.0 * self.R)/(self.C * self.sketch_dim): + # v = np.sqrt(sigma_squared[0]) * Vt[0:1] + # t = self.time + # self.snapshots.append(FDSnapshot(v, t)) + # sketch[0, :] = 0 + # sigma_squared[0] = 0 + # Vt[0, :] = 0 + # np.roll(sketch, -1) + # np.roll(sigma_squared, -1) + # np.roll(Vt, -1) + + def get(self): + q = self.fd.queue + C = self.fd.C.get_sketch() + if len(q) != 0: + ret = np.vstack([C, *(s.v for s in q)]) + + _, s, Vt = linalg.svd(ret, full_matrices=False, lapack_driver="gesvd") + s = s**2 + if s.shape[0] > self.sketch_dim: + s = s[: self.sketch_dim] - s[self.sketch_dim] + Vt = Vt[: self.sketch_dim] + # else: + # s = np.pad(s, (0, self.sketch_dim-s.shape[0]), 'constant') + # Vt = np.pad( + # Vt, ((0, self.sketch_dim-Vt.shape[0]), (0, 0)), 'constant') + sketch = Vt * np.sqrt(s).reshape(-1, 1) + + return sketch, s, Vt, 0.0 + else: + return self.fd.C.get() + + 
# @profile + def get_sketch(self): + q = self.fd.queue + ret = self.fd.C.sketch + if len(q) != 0: + ret = np.vstack([ret, *(s.v for s in q)]) + + if self.fd.C.buffer is not None: + ret = np.vstack([ret, self.fd.C.buffer]) + + if len(ret) > self.sketch_dim: + _, s, Vt = linalg.svd(ret, full_matrices=False, lapack_driver="gesvd") + s = s**2 + s = s[: self.sketch_dim] - s[self.sketch_dim] + Vt = Vt[: self.sketch_dim] + sketch = Vt * np.sqrt(s).reshape(-1, 1) + + return sketch + else: + return ret + + def get_size(self): + return len(self.fd.queue) + len(self.fd_aux.queue) + 2 * self.sketch_dim + + +class FastSlidingWindowFD: + def __init__(self, N: int, d: int, sketch_dim: int, C: int = 1, **kwargs): + """Sliding Window on Frequent Directions + + Args: + N (int): Sliding window size. + d (int): Vector dimension. + sketch_dim (int): Sketch dimension. + error_threshold (float, optional): Dump threshold, default as `1.0`. + """ + if "error_threshold" in kwargs: + self.error = kwargs["error_threshold"] + else: + self.error = N * 1.0 / sketch_dim + + self.fd = DumpFDwithSnapshotQueue( + C=FastFrequentDirectionsWithDump(d, sketch_dim * C, self.error) + ) + self.fd_aux = DumpFDwithSnapshotQueue( + C=FastFrequentDirectionsWithDump(d, sketch_dim * C, self.error) + ) + self.period = 0 + + self.N = np.uint64(N) + self.d = d + self.C = C + self.sketch_dim = sketch_dim + self.time = np.uint64(0) + + def append(self, X: npt.NDArray, t=None): + if t != None: + self.time = np.uint64(t) + else: + self.time += np.uint64(1) + + # self.fd.size += X@X.T + # self.fd_aux.size += X@X.T + + # if self.fd.size > 2 * self.error * self.sketch_dim: + while self.period < self.time // self.N: + self.fd = self.fd_aux + self.fd_aux = DumpFDwithSnapshotQueue( + C=FastFrequentDirectionsWithDump( + self.d, self.sketch_dim * self.C, self.error + ), + last_dump_time=self.time, + ) + self.period += 1 + + s = self.fd.last_dump_time + self.fd.last_dump_time = self.time + 1 + self.fd.queue.append(FDSnapshot(v=X, s=s, t=self.time)) + + s = self.fd_aux.last_dump_time + self.fd_aux.last_dump_time = self.time + 1 + self.fd_aux.queue.append(FDSnapshot(v=X, s=s, t=self.time)) + + def fit(self, X: npt.NDArray, t=None): + """Handle the input vector + + Args: + X (npt.NDArray): Arriving vector at the time. 
(row vector: (1, n)-shape) + """ + + if t != None: + self.time = np.uint64(t) + else: + self.time += np.uint64(1) + + # self.fd.size += X@X.T + # self.fd_aux.size += X@X.T + + # if self.fd.size > 2 * self.error * self.sketch_dim: + while self.period < self.time // self.N: + self.fd = self.fd_aux + self.fd_aux = DumpFDwithSnapshotQueue( + C=FastFrequentDirectionsWithDump( + self.d, self.sketch_dim * self.C, self.error + ), + last_dump_time=self.time, + ) + self.period += 1 + + while len(self.fd.queue) != 0: + head_snapshot = self.fd.queue[0] + if head_snapshot.t + self.N <= self.time: + self.fd.queue.popleft() + else: + break + + while len(self.fd_aux.queue) != 0: + head_snapshot = self.fd_aux.queue[0] + if head_snapshot.t + self.N <= self.time: + self.fd_aux.queue.popleft() + else: + break + + # with energy optimization + dumped = self.fd.C.fit(X) + + if dumped is not None: + s = self.fd.last_dump_time + self.fd.last_dump_time = self.time + 1 + self.fd.queue.append(FDSnapshot(v=dumped, s=s, t=self.time)) + + dumped = self.fd_aux.C.fit(X) + + if dumped is not None: + s = self.fd_aux.last_dump_time + self.fd_aux.last_dump_time = self.time + 1 + self.fd_aux.queue.append(FDSnapshot(v=dumped, s=s, t=self.time)) + + def get(self): + q = self.fd.queue + C = self.fd.C.get_sketch() + if len(q) != 0: + ret = np.vstack([C, *(s.v for s in q)]) + + _, s, Vt = linalg.svd(ret, full_matrices=False, lapack_driver="gesvd") + s = s**2 + s = s[: self.sketch_dim] - s[self.sketch_dim] + Vt = Vt[: self.sketch_dim] + sketch = Vt * np.sqrt(s).reshape(-1, 1) + + return sketch, s, Vt, 0.0 + else: + return self.fd.C.get() + + # @profile + def get_sketch(self): + q = self.fd.queue + ret = self.fd.C.sketch + if len(q) != 0: + ret = np.vstack([ret, *(s.v for s in q)]) + + if self.fd.C.buffer is not None: + ret = np.vstack([ret, self.fd.C.buffer]) + + if len(ret) > self.sketch_dim: + _, s, Vt = linalg.svd(ret, full_matrices=False, lapack_driver="gesvd") + s = s**2 + s = s[: self.sketch_dim] - s[self.sketch_dim] + Vt = Vt[: self.sketch_dim] + sketch = Vt * np.sqrt(s).reshape(-1, 1) + + return sketch + else: + return ret + + def get_size(self): + return len(self.fd.queue) + len(self.fd_aux.queue) + 2 * self.sketch_dim + + +def run(l, N, d, epochs): + A = np.empty((epochs, d)) + sw_fd = SlidingWindowFD(N, d, l) + B = np.empty((epochs, l, d)) + deltas = np.empty((epochs)) + count = 0 + for t in tqdm(range(epochs)): + random_vector = np.random.randn(1, d) + a = random_vector / np.linalg.norm(random_vector, ord=2) + # a = np.sqrt(R/(l-3)) * np.eye(1, d, k=(t//N) % d) + # if (t // 128) % 2 == 0: + # a /= 100 + # count += 1 + # a = random_vector * np.sqrt(R)/d + # while linalg.norm(a, ord=2) ** 2 > R: + # random_vector = np.random.randn(1, d) + # a = random_vector * np.sqrt(R)/d + A[t : t + 1, :] = a + sw_fd.fit(a) + B_t, _, _, delta = sw_fd.get() + B[t] = B_t + deltas[t] = delta + + mdic = { + "l": l, + "N": N, + "d": d, + "R": 1.0, + "epochs": epochs, + "A": A, + "B": B, + "deltas": deltas, + } + mat_path = f"results/result_l={l},N={N},d={d},epochs={epochs}.mat" + savemat(mat_path, mdic) + print(mat_path) + print(count) + + +if __name__ == "__main__": + l = 16 + N = 16**3 + d = 32 + epochs = N * 16 + for i in range(1): + run(l, N, d, epochs) + l, N, d, epochs = l * 2, N * 2, d * 2, epochs * 2 diff --git a/StreamLearn/Dataset/FDDataset.py b/StreamLearn/Dataset/FDDataset.py new file mode 100644 index 0000000..3ca0063 --- /dev/null +++ b/StreamLearn/Dataset/FDDataset.py @@ -0,0 +1,34 @@ +from StreamLearn.Dataset.StreamDataset 
import StreamDataset
+import pandas as pd
+import numpy as np
+
+
+class FDDataset(StreamDataset):
+    def __init__(self, args, name="pamap"):
+        self.name = name
+        self.args = args
+
+        match name:
+            case "pamap":
+                df = pd.read_csv(
+                    "StreamLearn/Dataset/subject103.dat", delim_whitespace=True
+                )
+                A = df.values.astype(np.float64)
+                A = A[:, 2:]
+                A[np.isnan(A)] = 1
+                self.m, self.d = A.shape
+                Rs = np.linalg.norm(A, axis=1) ** 2
+                r = np.min(Rs)
+                A = A / np.sqrt(r)
+
+                Rs = np.linalg.norm(A, axis=1) ** 2
+                self.r = np.min(Rs)
+                self.R = np.max(Rs)
+
+                self.A = A
+
+    def __getitem__(self, index):
+        return self.A[index]
+
+    def __len__(self):
+        return len(self.A)
diff --git a/StreamLearn/tests/test_SWFD.py b/StreamLearn/tests/test_SWFD.py
new file mode 100644
index 0000000..901753a
--- /dev/null
+++ b/StreamLearn/tests/test_SWFD.py
@@ -0,0 +1,46 @@
+import numpy as np
+import argparse
+import random
+from StreamLearn.Algorithm.SlidingWindowFD.DSFD import SeqDSFD
+from StreamLearn.Dataset.FDDataset import FDDataset
+from scipy import linalg
+
+
+def set_seed(seed):
+    random.seed(seed)
+    np.random.seed(seed)
+
+
+def main():
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        "--dataset", type=str, default="imbalance", help="[balance, imbalance]"
+    )
+    parser.add_argument("--N", type=int, default=10000, help="Sliding window size")
+    parser.add_argument("--R", type=float, default=1403)
+    parser.add_argument("--l", type=int, default=50)
+    args = parser.parse_args()
+    set_seed(2023)
+
+    dataset = FDDataset(args)
+    d = dataset.d
+
+    N = args.N
+    R = args.R
+    l = args.l
+    A = np.zeros((0, d))
+    swfd = SeqDSFD(N, R, d, l, beta=1.0)
+    for i in range(len(dataset)):
+        data = dataset[i]
+        swfd.fit(data)
+        A = np.vstack([A, data])[-N:]
+        direction = np.random.normal(size=d)
+        direction = direction / linalg.norm(direction)
+        predict = swfd.predict(direction)
+        ground_truth = linalg.norm(A @ direction, 2) ** 2
+        loss = swfd.evaluate(predict, ground_truth)
+        print("DSFD", i, loss)
+
+
+if __name__ == "__main__":
+    main()
-- 
Gitee
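
The usage pattern exercised by `StreamLearn/tests/test_SWFD.py` can also be tried on synthetic data, without downloading the PAMAP file. The following is a minimal sketch assuming only the `SeqDSFD` interface introduced in this patch (`fit` on each arriving row vector, `predict` returning the squared norm of the window matrix's projection on a unit direction); the window size, dimension, sketch size, and iteration count below are illustrative values chosen for this example, not recommendations.

```python
# Minimal synthetic-data sketch of the DSFD usage pattern shown in
# StreamLearn/tests/test_SWFD.py; all sizes here are illustrative only.
import numpy as np
from scipy import linalg

from StreamLearn.Algorithm.SlidingWindowFD.DSFD import SeqDSFD

N, d, l, R = 1000, 32, 16, 1.0  # window size, dimension, sketch size, row-norm bound
rng = np.random.default_rng(2023)

swfd = SeqDSFD(N, R, d, l, beta=1.0)
window = np.zeros((0, d))  # brute-force copy of the current window, for checking

for t in range(5 * N):
    row = rng.standard_normal(d)
    row /= linalg.norm(row)  # keep squared row norms within the bound R = 1
    swfd.fit(row)  # update the sliding-window sketch with the arriving vector
    window = np.vstack([window, row])[-N:]  # exact last-N rows, for comparison

    if t % N == N - 1:
        direction = rng.standard_normal(d)
        direction /= linalg.norm(direction)
        approx = swfd.predict(direction)  # squared projection norm from the sketch
        exact = linalg.norm(window @ direction, 2) ** 2  # exact value on the window
        print(t, swfd.evaluate(approx, exact))  # absolute error of the estimate
```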