diff --git a/.gitignore b/.gitignore index 5ac158d5c777460860dc9fcd810c2b0e5371cb53..97d4a1501564743fabd860ea9788d9f47a862d9d 100644 --- a/.gitignore +++ b/.gitignore @@ -4,4 +4,5 @@ __pycache__/ build .vscode/ /dataset -/logs \ No newline at end of file +/logs +.DS_Store diff --git a/StreamLearn/Algorithm/EnsembleUpdate/__init__.py b/StreamLearn/Algorithm/EnsembleUpdate/__init__.py new file mode 100755 index 0000000000000000000000000000000000000000..3b0f5e2820f5cf17bf8f7cfcb83ba40fad4648a7 --- /dev/null +++ b/StreamLearn/Algorithm/EnsembleUpdate/__init__.py @@ -0,0 +1,7 @@ +from . import environment, learner, utils + +__all__ = [ + "environment", + "learner", + "utils", +] \ No newline at end of file diff --git a/StreamLearn/Algorithm/EnsembleUpdate/environment/__init__.py b/StreamLearn/Algorithm/EnsembleUpdate/environment/__init__.py new file mode 100755 index 0000000000000000000000000000000000000000..54a380b16048d564f22b31bd99b3bf917ed2036b --- /dev/null +++ b/StreamLearn/Algorithm/EnsembleUpdate/environment/__init__.py @@ -0,0 +1,2 @@ +from .domain import Simplex, Ball +from .environment import Environment diff --git a/StreamLearn/Algorithm/EnsembleUpdate/environment/domain.py b/StreamLearn/Algorithm/EnsembleUpdate/environment/domain.py new file mode 100755 index 0000000000000000000000000000000000000000..95fa1cc4db94b80178f75e469e23da23e1fe51a1 --- /dev/null +++ b/StreamLearn/Algorithm/EnsembleUpdate/environment/domain.py @@ -0,0 +1,197 @@ +from abc import ABC, abstractmethod +from copy import deepcopy +from typing import Optional, Union + +import numpy as np + + +class Domain(ABC): + """An abstract class representing the feasible domain. + + Args: + dimension (int): Dimension of the feasible set. + + """ + + def __init__(self, dimension): + self.dimension = dimension + + @abstractmethod + def init_x(self, prior: Optional[Union[str, np.ndarray]], + seed: Optional[int]) -> np.ndarray: + """Initialize a decision in the domain. + """ + pass + + @abstractmethod + def project(self, x: np.ndarray): + """Project the decision :math:`x` back to the feasible set. + """ + pass + + +class Ball(Domain): + """This class defines a Euclid ball as the feasible set. + + Args: + dimension (int): Dimension of the feasible set. + radius (float): Radius of the ball. + center (numpy.ndarray, optional): Coordinates of the center point. + Default to the origin point if not specified. + + Attributes: + R (float): Radius of the minimum outside ball, which is useful for + irregular domains. + r (float): Radius of the maximum inside ball, which is useful for + irregular domains. + """ + + def __init__(self, + dimension: int, + radius: float = 1., + center: np.ndarray = None): + super().__init__(dimension=dimension) + self.radius = radius + self.center = center if center is not None else np.zeros(dimension) + self.R = radius # the radius of the minimum outside ball + self.r = radius # the radius of the maximum inside ball + + def init_x(self, prior: Optional[Union[str, np.ndarray]], + seed: Optional[int]) -> np.ndarray: + """Initialize a decision in the domain. + + Args: + prior (numpy.ndarray, optional): Prior information to initialize + the decision. If a ``numpy.ndarray`` is given, the method will + return ``prior`` as the decision, otherwise return a random vector + in the Ball. + seed (int, optional): Random seed to initial the decision if `prior=None`. + + Returns: + numpy.ndarray: a decision in the ball. 
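+
+        Example:
+            A minimal sketch drawing a random initial point from the unit ball
+            (the seed below is an arbitrary choice)::
+
+                ball = Ball(dimension=3, radius=1.)
+                x0 = ball.init_x(prior=None, seed=0)  # random point with norm <= 1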
+ """ + if prior is not None: + assert len(prior) == self.dimension + return np.array(prior) + else: + np.random.seed(seed) + random_direction = np.random.normal(size=self.dimension) + random_direction /= np.linalg.norm(random_direction) + random_radius = np.random.random() + return self.radius * random_direction * random_radius + + def unit_vec(self, seed: Optional[int] = None) -> np.ndarray: + """Sample a unit vector uniformly at random. + + Args: + seed (int, optional): Random seed to sample the vector. + + Returns: + numpy.ndarray: a decision in the ball. + """ + np.random.seed(seed) + random_direction = np.random.normal(size=self.dimension) + random_direction /= np.linalg.norm(random_direction) + return random_direction + + def project(self, x: np.ndarray) -> np.ndarray: + """Project the decision :math:`x` back to the ball by Euclid distance. + + Args: + x(numpy.ndarray): the vector to be projected. + + Returns: + numpy.ndarray: the projected vector. + """ + distance = np.linalg.norm(x - self.center) + if distance > self.r: + x = self.center + (x - self.center) * self.r / distance + return x + + def __mul__(self, scale: float): + new_ball = deepcopy(self) + new_ball.radius *= scale + new_ball.R *= scale + new_ball.r *= scale + return new_ball + + def __rmul__(self, scale: float): + return self.__mul__(scale) + + +class Simplex(Domain): + """This class defines a simplex as the feasible set. + + Args: + dimension (int): Dimension of the feasible set. + """ + + def __init__(self, dimension: int): + super().__init__(dimension=dimension) + + def init_x(self, + prior: Union[str, np.ndarray] = 'uniform', + seed: Optional[int] = None) -> np.ndarray: + """Initialize a decision x in the domain. + + Args: + prior (numpy.ndarray, 'uniform', 'nonuniform', optional): Prior + information to initialize the decision. If a ``numpy.ndarray`` is + given, the method will return ``prior`` as the decision; if + ``prior='uniform'``, the method will return the uniform vector + :math:`x_i = 1/d, \\forall i \in [d]`; if ``prior='nonuniform'``, + the method will return :math:`x_i = \\frac{d+1}{d} \cdot + \\frac{1}{i(i+1)}, \\forall i \in [d]`, where :math:`d` is the + dimension of the simplex; if ``prior=None``, the method will return + a random vector in the simplex. + seed (int, optional): Random seed to initial the decision if `prior=None`. + + Returns: + numpy.ndarray: a decision in the ball. + """ + if prior is None: + np.random.seed(seed) + x = np.random.rand(self.dimension) + x /= np.linalg.norm(x, ord=1) + elif isinstance(prior, np.ndarray): + x = prior + elif prior == 'uniform': + x = np.ones(self.dimension) / self.dimension + elif prior == 'nonuniform': + x = np.array([(self.dimension + 1) / (self.dimension * i * (i + 1)) + for i in range(1, self.dimension + 1)]) + else: + raise TypeError(f'{prior} is not defined.') + return x + + def project(self, + x: np.ndarray, + dist: str = 'kl_div', + norm: Union[int, str] = 1): + """Project the decision :math:`x` back to the simplex. + + Args: + x (numpy.ndarray): Vector to be projected. + dist (str): Distance metric used to project the decision. Valid + options include ``'kl_div'`` or ``'norm'``. if ``dist=kl_div``, the + return decision will be :math:`x_i = x_i / \sum_j x_j`, otherwise, + the norm distance will be used to project the decision. + norm (int, str, optional): Type of norm which is only used when + ``dist='norm'``. Valid options include any positive integer or + ``'inf'`` (infinity norm). + + Returns: + numpy.ndarray: the projected vector. 
+ """ + if dist == 'kl_div': + return x / np.linalg.norm(x, ord=1) + elif dist == 'norm': + import cvxpy as cp # Only import when it needed since it will raise error when using too many sub-processes in multiprocessing. + y = cp.Variable(self.dimension) + obj = cp.Minimize(cp.sum(cp.atoms.norm(y - x, p=norm))) + constr = [y >= 0, cp.sum(y) == 1] + problem = cp.Problem(obj, constr) + problem.solve() + return np.array(y.value).squeeze() + else: + raise TypeError(f'{dist} is not defined.') diff --git a/StreamLearn/Algorithm/EnsembleUpdate/environment/environment.py b/StreamLearn/Algorithm/EnsembleUpdate/environment/environment.py new file mode 100755 index 0000000000000000000000000000000000000000..1794d24e4f5292ccbbe16c43b7d3d694b4e8c073 --- /dev/null +++ b/StreamLearn/Algorithm/EnsembleUpdate/environment/environment.py @@ -0,0 +1,114 @@ +from typing import Callable, Iterable, Optional +from autograd import grad as grad_solver +import numpy as np + + +class Environment: + """Class for the environment, including loss function, optimism, and so on. + + At each round, the environment can set two loss functions, one is the origin + loss function ``func`` :math:`f_t` and the other is the surrogate loss + function ``surrogate_func`` :math:`f'_t` (if any). The gradient of which function is + given to the learner is determined by ``use_surrogate_grad``. + + Args: + func_sequence (Iterable, optional): Loss function sequence for whole + time horizon. + optimism (numpy.ndarray): Optimism at the beginning of current round. + func (Callable, optional): Origin loss function at current round. + grad (numpy.ndarray, optional): Gradient of all decisions for origin + loss function, only used when the gradient of all decisions are the + same, namely, the liner function. + grad_func (Callable, optional): Gradient function for origin loss + function. It can be given by the environment to accelerate the running + time. If it is not given, the origin gradient function will be computed by + ``autograd``. + surrogate_func (Callable, optional): Surrogate loss function at current round. + surrogate_grad (numpy.ndarray, optional): Gradient of all decisions for surrogate + loss function, only used when the gradient of all decisions are the + same, namely, the liner function. + surrogate_grad_func (Callable, optional): Gradient function for surrogate loss + function. It can be given by the environment to accelerate the running + time. If it is not given, the surrogate gradient function will be computed by + ``autograd``. + use_surrogate_grad (bool): Gradient of which function is returned by the + environment. + full_info (bool): Specify the type of feedback: full-information or + bandit feedback. 
+ + """ + + def __init__(self, + func_sequence: Optional[Iterable] = None, + optimism: Optional[np.ndarray] = None, + func: Optional[Callable[[np.ndarray], float]] = None, + grad: Optional[np.ndarray] = None, + grad_func: Optional[Callable[[np.ndarray], float]] = None, + surrogate_func: Optional[Callable[[np.ndarray], float]] = None, + surrogate_grad: Optional[np.ndarray] = None, + surrogate_grad_func: Optional[Callable[[np.ndarray], float]] = None, + use_surrogate_grad: bool = True, + full_info: bool = True) -> None: + + self.func_sequence = func_sequence + self.optimism = optimism + self.func = func + self.grad = grad + self.grad_func = grad_func + self.surrogate_func = surrogate_func + self.surrogate_grad = surrogate_grad + self.surrogate_grad_func = surrogate_grad_func + self.use_surrogate_grad = use_surrogate_grad + self.full_info = full_info + + def __getitem__(self, t): + self.func = self.func_sequence[t] + self.grad = None + self.grad_func = None + self.surrogate_func = None + self.surrogate_grad = None + self.surrogate_grad_func = None + return self + + def get_loss(self, x: np.ndarray): + """Get the loss value of the decision :math:`x`. + + Args: + x (numpy.ndarray): Decision of the learner. + + Returns: + tuple: tuple contains: + loss (float): Origin loss value.\n + surrogate_loss (float): Surrogate loss value + """ + loss = self.func(x) + surrogate_loss = self.surrogate_func( + x) if self.surrogate_func else None + return loss, surrogate_loss + + def get_grad(self, x: np.ndarray): + """Get the gradient of the decision :math:`x`. + + Args: + x (numpy.ndarray): Decision of the learner. + + Returns: + numpy.ndarray: Gradient of the decision :math:`x`. + """ + if self.use_surrogate_grad: + if self.surrogate_grad is not None: + return self.surrogate_grad + elif self.surrogate_grad_func is not None: + return self.surrogate_grad_func(x) + elif self.surrogate_func is not None: + self.surrogate_grad_func = grad_solver(self.surrogate_func) + return self.surrogate_grad_func(x) + else: + pass + if self.grad is not None: + return self.grad + elif self.grad_func is not None: + return self.grad_func(x) + else: + self.grad_func = grad_solver(self.func) + return self.grad_func(x) diff --git a/StreamLearn/Algorithm/EnsembleUpdate/learner/__init__.py b/StreamLearn/Algorithm/EnsembleUpdate/learner/__init__.py new file mode 100755 index 0000000000000000000000000000000000000000..39b84e8d1c6bff7f9f5a854851edad519b074007 --- /dev/null +++ b/StreamLearn/Algorithm/EnsembleUpdate/learner/__init__.py @@ -0,0 +1,5 @@ +from .base import (OptimisticOGD) +from .meta import (OptimisticHedge, OptimisticLR) +from . import schedule +from . import specification +from . import models diff --git a/StreamLearn/Algorithm/EnsembleUpdate/learner/base.py b/StreamLearn/Algorithm/EnsembleUpdate/learner/base.py new file mode 100755 index 0000000000000000000000000000000000000000..b6b7f6d17281e14e6ce6a0d5e74f4acf4cb5f17c --- /dev/null +++ b/StreamLearn/Algorithm/EnsembleUpdate/learner/base.py @@ -0,0 +1,259 @@ +from abc import ABC, abstractmethod +from typing import Optional, Union + +import numpy as np +from ..environment.domain import Domain +from ..environment.environment import Environment + + +class Base(ABC): + """An abstract class for base algorithms. + + Args: + domain (Domain): Feasible set for the base algorithm. + step_size (float, numpy.ndarray): Step size :math:`\eta` for the + base algorithm. Valid types include ``float`` and ``numpy.ndarray``. 
If + the type of the step size :math:`\eta` is `float`, the algorithm will + use the fixed step size all the time, otherwise, the algorithm will use + the step size :math:`\eta_t` at round $t$. + prior (str, numpy.ndarray, optional): The initial decision of the + algorithm is set as ``domain.init_x(prior, seed)``. + seed (int, optional): The initial decision of the algorithm is set as + ``domain.init_x(prior, seed)``. + """ + + def __init__(self, + domain: Domain, + step_size: Union[float, np.ndarray], + prior: Optional[Union[list, np.ndarray]] = None, + seed: Optional[int] = None): + self.domain = domain + self.step_size = step_size + self.prior = prior + self.seed = seed + self.x = self.domain.init_x(prior, seed) + self.t = 0 + + def opt(self, env: Environment): + """The optimization process of the base algorithm. + + All base algorithms are divided into two parts: + :meth:`~learner.base.Base.opt_by_optimism` at the beginning of + current round and :meth:`~learner.base.Base.opt_by_gradient` at the + end of current round. + + Args: + env (Environment): Environment at the current round. + + Returns: + tuple: tuple contains: + x (numpy.ndarray): Decision at the current round. \n + loss (float): Origin loss at the current round. \n + surrogate_loss (float): the surrogate loss at the current round. + """ + self.opt_by_optimism(env.optimism) + return self.opt_by_gradient(env) + + def get_step_size(self): + """Get the step size at each round. + + Returns: + float: Step size at the current round. + """ + return self.step_size[self.t] if hasattr(self.step_size, + '__len__') else self.step_size + + def opt_by_optimism(self, optimism: Optional[np.ndarray] = None): + """Optimize by the optimism. + + Args: + optimism (numpy.ndarray, optional): External optimism at the beginning of the + current round. + + Returns: + None + """ + pass + + @abstractmethod + def opt_by_gradient(self, env: Environment): + """Optimize by the true gradient. + + All base-algorithms are required to override this method to implement + their own optimization process. + + Args: + env (Environment): Environment at the current round. + + Returns: + tuple: tuple contains: + x (numpy.ndarray): the decision at the current round. \n + loss (float): the origin loss at the current round. \n + surrogate_loss (float): the surrogate loss at the current round. + """ + pass + + def reinit(self): + """Reset the base algorithm to the initial state, which is used for + adaptive algorithms. + """ + self.__init__(self.domain, self.step_size, self.prior, self.seed) + + +class OptimisticBase(Base): + """An abstract class for optimistic-type base algorithms. + + The optimistic-type algorithms are very general and useful, and they can + achieve a tighter regret guarantee with benign “predictable sequences”. + There are mainly two types of optimistic algorithms: Optimistic Online + Mirror Descent (Optimistic OMD) and Optimistic Follow The Regularized Leader + (Optimistic FTRL). The general update rule of ``Optimistic OMD`` is as + follows, + + .. math:: + x_t = {\\arg\min}_{x \in \mathcal{X}} \ \eta_t \langle m_t, x\\rangle + D_R(x, + \hat{x}_t) \n + \hat{x}_{t+1} = {\\arg\min}_{x \in \mathcal{X}} \ \eta_t \langle \\nabla + f_t(x_t), x\\rangle + D_R(x, \hat{x}_t), + + and the general update rule of ``Optimistic FTRL`` is as follows, + + .. 
math:: + x_t = {\\arg\min}_{x \in \mathcal{X}} \ \eta_t \langle \sum_{s=1}^{t-1} + \\nabla f_s(x_s) + m_t, x\\rangle + R(x), + + where :math:`\eta_t` is the step size at round :math:`t`, :math:`m_t` is the + optimism at the beginning of round :math:`t`, serving as a guess of the true + gradient :math:`\\nabla f_t(x_t)` at round :math:`t`, :math:`R` is the + regularizer and :math:`D_R(\cdot, \cdot)` is the Bregman divergence with + respect to the regularizer :math:`R`. + + Args: + domain (Domain): Feasible set for the base algorithm. + step_size (float, numpy.ndarray): Step size :math:`\eta` for the + base algorithm. Valid types include ``float`` and ``numpy.ndarray``. If + the type of the step size :math:`\eta` is `float`, the algorithm will + use the fixed step size all the time, otherwise, the algorithm will use + the step size :math:`\eta_t` at round :math:`t`. + optimism_type (str, optional): Type of optimism used for algorithm. + Valid actions include ``external``, ``last_grad``, ``middle_grad`` and + ``None``. if ``optimism_type='external'``, the algorithm will accept the + external specified :math:`m_t` by the environment at each round; if + ``optimism_type='last_grad'``, the optimism is set as :math:`m_t = + \\nabla f_{t-1}(x_{t-1})`; if ``optimism_type='middle_grad'``, the + optimism is set as :math:`m_t = \\nabla f_{t-1}(\hat{x}_t)`, and if + ``optimism_type=None``, the optimism is set as :math:`m_t = 0`. + prior (str, numpy.ndarray, optional): The initial decision of the + algorithm is set as ``domain.init_x(prior, seed)``. + seed (int, optional): The initial decision of the algorithm is set as + ``domain.init_x(prior, seed)``. + + References: + https://proceedings.mlr.press/v23/chiang12.html + """ + + def __init__(self, + domain: Domain, + step_size: Union[float, list], + optimism_type: str = 'external', + prior: Optional[Union[list, np.ndarray]] = None, + seed: Optional[int] = None): + super().__init__(domain, step_size, prior, seed) + self.optimism_type = optimism_type + self.middle_x = self.x + self.optimism = np.zeros_like(self.x) + + def compute_internal_optimism(self, env: Environment): + """Compute the internal optimism. + + Args: + env (Environment): Environment at the current round. + """ + if self.optimism_type is None or self.optimism_type == 'external': + pass + elif self.optimism_type == 'last_grad': + self.optimism = self.grad + elif self.optimism_type == 'middle_grad': + self.optimism = env.get_grad(self.middle_x) + else: + raise TypeError(f'{self.optimism_type} is not defined') + + def reinit(self): + self.__init__(self.domain, self.step_size, self.optimism_type, + self.prior, self.seed) + + +class OptimisticOGD(OptimisticBase): + """Implementation of Optimistic Online Gradient Descent. + + ``OptimisticOGD`` is an online convex optimization algorithm, which is a + specialization of ``Optimistic OMD`` with the Euclidean distance as the + regularizer. ``OptimisticOGD`` updates the decision :math:`x_{t}` by + + .. math:: + + x_t = \Pi_{\mathcal{X}}[\hat{x}_t - \eta_t m_t] \n + \hat{x}_{t+1} = \Pi_{\mathcal{X}}[\hat{x}_t - \eta_t \\nabla f_t(x_t)], + + where :math:`\eta_t > 0` is the step size at round :math:`t`, :math:`m_t` is + the optimism at the beginning of round :math:`t`, and + :math:`\Pi_{\mathcal{X}}[\cdot]` denotes the projection onto the nearest + point in :math:`\mathcal{X}`. + + Args: + domain (Domain): Feasible set for the base algorithm. + step_size (float, numpy.ndarray): Step size :math:`\eta` for the + base algorithm. 
Valid types include ``float`` and ``numpy.ndarray``. If
+            the type of the step size :math:`\eta` is ``float``, the algorithm will
+            use the fixed step size all the time; otherwise, the algorithm will use
+            the step size :math:`\eta_t` at round :math:`t`.
+        optimism_type (str, optional): Type of optimism used by the algorithm.
+            Valid options include ``external``, ``last_grad``, ``middle_grad`` and
+            ``None``. If ``optimism_type='external'``, the algorithm will accept the
+            externally specified :math:`m_t` from the environment at each round; if
+            ``optimism_type='last_grad'``, the optimism is set as :math:`m_t =
+            \\nabla f_{t-1}(x_{t-1})`; if ``optimism_type='middle_grad'``, the
+            optimism is set as :math:`m_t = \\nabla f_{t-1}(\hat{x}_t)`; and if
+            ``optimism_type=None``, the optimism is set as :math:`m_t = 0`.
+        prior (str, numpy.ndarray, optional): The initial decision of the
+            algorithm is set as ``domain.init_x(prior, seed)``.
+        seed (int, optional): The initial decision of the algorithm is set as
+            ``domain.init_x(prior, seed)``.
+
+    .. note::
+        1. ``OGD`` is a special case of ``OptimisticOGD`` with ``optimism_type=None``.
+        2. ``OEGD`` is a special case of ``OptimisticOGD`` with ``optimism_type='middle_grad'``.
+
+    References:
+        https://proceedings.mlr.press/v30/Rakhlin13.html
+    """
+
+    def __init__(self,
+                 domain: Domain,
+                 step_size: Union[float, list],
+                 optimism_type: str = 'external',
+                 prior: Optional[Union[list, np.ndarray]] = None,
+                 seed: Optional[int] = None):
+        super().__init__(domain, step_size, optimism_type, prior, seed)
+
+    def opt_by_optimism(self, optimism: Optional[np.ndarray] = None):
+        if self.optimism_type is None:
+            self.optimism = np.zeros_like(self.middle_x)
+        elif self.optimism_type == 'external':
+            self.optimism = optimism if optimism is not None else np.zeros_like(
+                self.middle_x)
+        else:
+            pass
+        step_size = self.get_step_size()
+        self.x = self.middle_x - step_size * self.optimism
+        self.x = self.domain.project(self.x)
+
+    def opt_by_gradient(self, env: Environment):
+        loss, surrogate_loss = env.get_loss(self.x)
+        self.grad = env.get_grad(self.x)
+        step_size = self.get_step_size()
+        self.middle_x = self.middle_x - step_size * self.grad
+        self.middle_x = self.domain.project(self.middle_x)
+        self.compute_internal_optimism(env)
+        self.t += 1
+        return self.x, loss, surrogate_loss
diff --git a/StreamLearn/Algorithm/EnsembleUpdate/learner/meta.py b/StreamLearn/Algorithm/EnsembleUpdate/learner/meta.py
new file mode 100755
index 0000000000000000000000000000000000000000..eedc61dc97ecd03473cc6dbdca8f7181d3957928
--- /dev/null
+++ b/StreamLearn/Algorithm/EnsembleUpdate/learner/meta.py
@@ -0,0 +1,383 @@
+from abc import ABC, abstractmethod
+from typing import Optional, Union
+
+import cvxpy as cp
+import numpy as np
+
+
+class OptimisticLR:
+    """A self-confident tuning method for optimistic algorithms.
+
+    The update rule of the optimistic learning rate is
+
+    .. math::
+
+        \\varepsilon_t = \\frac{\\alpha}{\sqrt{\sum_{s=1}^{t-1}\lVert \ell_s - M_s \\rVert_p^q}},
+
+    where :math:`\\alpha` is the scale parameter, :math:`p` is the norm order
+    and :math:`q` is the order.
+
+    Args:
+        scale (float): Scale parameter.
+        norm (int, numpy.inf): Order :math:`p` of the norm.
+        order (int): Order :math:`q` of the norm value.
+        upper_bound (float): Upper bound of the learning rate.
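+
+    Example:
+        A minimal sketch of the self-confident update (the loss and optimism
+        vectors below are hypothetical placeholders)::
+
+            lr = OptimisticLR(scale=1., upper_bound=0.5)
+            lr.update_lr(optimism=np.array([0.2, 0.5]),
+                         loss=np.array([0.3, 0.7]))
+            current_lr = lr.lr  # shrinks as the cumulative variation grows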
+ """ + + def __init__(self, + scale: float = 1., + norm: int = np.inf, + order: int = 2, + upper_bound: float = 1.): + self.scale = scale + self.norm = norm + self.order = order + self.upper_bound = upper_bound + self.cum_var = 1 + self.lr = upper_bound + + def update_lr(self, optimism, loss): + """Update learning rate by ``optimism`` and ``loss`` of the current round.""" + self.cum_var += np.linalg.norm( + loss - optimism, ord=self.norm)**self.order + self.lr = min(self.upper_bound, self.scale * self.cum_var**(-0.5)) + return self.lr + + +class Meta(ABC): + """An abstract class for meta-algorithms. + + Args: + prob (numpy.ndarray): Initial probability over the base-learners. + lr (float, numpy.ndarray, OptimisticLR): Learning rate for + meta-algorithm. + """ + + def __init__(self, prob: np.ndarray, lr: Union[float, np.ndarray, + OptimisticLR]): + self._prob = prob + self._init_prob = self._prob.copy() + self.lr = lr + self._active_state = np.ones(len(prob)) + self._active_index = np.where(self._active_state > 0)[0] + self.t = 0 + + def opt(self, + loss_bases: np.ndarray, + loss_meta: Optional[float] = None, + optimism: Optional[np.ndarray] = None): + """The optimization process of the meta-algorithm. + + All base algorithms are divided into two parts: + :meth:`~learner.meta.Meta.opt_by_optimism` at the beginning of + current round and :meth:`~learner.meta.Meta.opt_by_gradient` at the + end of current round. + + Args: + loss_bases (numpy.ndarray): Losses of all alive base-learners. + loss_meta (float, optional): Loss of the combined decision. + optimism (numpy.ndarray): Optimism at the beginning of the + current round, serving as a guess of ``loss_bases``. + + Returns: + numpy.ndarray: Probability over the alive base-learners. + """ + self.opt_by_optimism(optimism) + return self.opt_by_gradient(loss_bases, loss_meta) + + def opt_by_optimism(self, optimism: Optional[np.ndarray]): + """Optimize by the optimism. + + Args: + optimism (numpy.ndarray, optional): the optimism at the beginning of the + current round. + + Returns: + None + """ + pass + + @abstractmethod + def opt_by_gradient( + self, + loss_bases: np.ndarray, + loss_meta: Optional[float] = None, + ): + """Optimize by the true gradient (loss). + + All base-algorithms are required to override this method to implement + their own optimization process. + + Args: + loss_bases (numpy.ndarray): Losses of all alive base-learners. + loss_meta (float): Loss of the combined decision. + + Returns: + numpy.ndarray: Probability over the alive base-learners. + """ + pass + + def get_lr(self): + """Compute the learning rate for meta-algorithms. + + If the type of the learning rate :math:`\\varepsilon` is ``float``, this + method will return the constant learning rate :math:`\\varepsilon` all the + time; if the type of :math:`\\varepsilon` is ``numpy.ndarray`` with + shape ``[T, ]``, this method will return a scalar + :math:`\\varepsilon[t]` at round :math:`t`; if the type of + :math:`\\varepsilon` is ``numpy.ndarray`` with shape ``[T, N]`` where + :math:`N` is the dimension, this method will return a vector + :math:`\\varepsilon[t]` at time :math:`t`; if the type of + :math:`\\varepsilon` is ``numpy.ndarray`` with shape ``[1, N]``, this + method will return a vector :math:`\\varepsilon` all the time; if the + type of :math:`\\varepsilon` is + :meth:`~learner.meta.Meta.OptimisticLR`, the learning rate + :math:`\\varepsilon_t` is computed by + :meth:`~learner.meta.Meta.OptimisticLR.compute_lr()` at round + :math:`t`. 
+ """ + if isinstance(self.lr, OptimisticLR): + return self.lr.lr + elif isinstance(self.lr, np.ndarray): + if self.lr.ndim == 1: + return self.lr[self.t] + elif self.lr.ndim == 2: + if self.lr.shape[0] == 1: + return self.lr[0, self._active_index] + elif self.lr.shape[1] == 1: + return self.lr[self.t, 0] + else: + assert self.lr.shape[1] == len(self.active_state) + return self.lr[self.t, self._active_index] + else: + return self.lr + + @property + def active_state(self): + """Get the active state of base-learners: + + - 0: sleep + - 1: active at the current round and the previous round. + - 2: active at the current round and sleep at the previous round. + """ + return self._active_state + + @active_state.setter + def active_state(self, active_state): + self._active_state = active_state + self._active_index = np.where(active_state > 0)[0] + + @property + def prob(self): + """Get the current probability over the alive base-learners.""" + return self._prob[self._active_index] + + @prob.setter + def prob(self, prob): + self._prob[self._active_index] = prob + + @property + def init_prob(self): + """Get the initial probability over the current alive base-learners.""" + return self._init_prob[self._active_index] + + +class OptimisticMeta(Meta): + """An abstract class for optimistic-type base algorithms. + + The optimistic-type algorithms are very general and useful, and they can + achieve a tighter regret guarantee with benign “predictable sequences”. + There are mainly two types of optimistic algorithms: Optimistic Online + Mirror Descent (Optimistic OMD) and Optimistic Follow The Regularized Leader + (Optimistic FTRL). The general update rule of ``Optimistic OMD`` is as + follows, + + .. math:: + p_t = {\\arg\min}_{p \in \Delta_N} \ \\varepsilon_t \langle M_t, p\\rangle + D_R(p, + \hat{p}_t) \n + \hat{p}_{t+1} = {\\arg\min}_{x \in \Delta_N} \ \\varepsilon_t \langle + \ell_t, p\\rangle + D_R(p, \hat{p}_t), + + and the general update rule of ``Optimistic FTRL`` is as follows, + + .. math:: + p_t = {\\arg\min}_{x \in \Delta_N} \ \\varepsilon_t \langle \sum_{s=1}^{t-1} + \ell_s + M_t, x\\rangle + R(x), + + where :math:`\\varepsilon_t` is the step size at round :math:`t`, :math:`M_t` is the + optimism at the beginning of round :math:`t`, serving as a guess of the true + gradient :math:`\ell_t` at round :math:`t`, :math:`R` is the + regularizer and :math:`D_R(\cdot, \cdot)` is the Bregman divergence with + respect to the regularizer :math:`R`. + + Args: + prob (numpy.ndarray): The initial probability over the base-learners. + lr (float, numpy.ndarray, OptimisticLR): The learning rate for + meta-algorithm. + optimism_type (str, optional): the type of optimism used for algorithm. + Valid actions include ``external``, ``last_loss`` and + ``None``. if ``optimism_type='external'``, the algorithm will accept the + external optimism :math:`M_t` at each round; if + ``optimism_type='last_loss'``, the optimism is set as :math:`M_t = + \ell_{t-1}`; and if ``optimism_type=None``, the optimism is set as :math:`M_t = 0`. + + References: + https://proceedings.mlr.press/v23/chiang12.html + """ + + def __init__(self, + prob: np.ndarray, + lr: Union[float, np.ndarray, OptimisticLR], + optimism_type: Optional[str] = 'external'): + super().__init__(prob, lr) + self.optimism_type = optimism_type + self._middle_prob = self._prob.copy() + self._optimism = np.zeros_like(self._prob) + + def compute_internal_optimism(self, loss_bases: np.ndarray): + """Compute the internal optimism for the next round. 
+
+        If ``optimism_type='last_loss'``, the internal optimism is set as
+        :math:`M_t = \ell_{t-1}`.
+
+        Args:
+            loss_bases (numpy.ndarray): Losses of base-learners at the current round.
+        """
+        if self.optimism_type is None or self.optimism_type == 'external':
+            pass
+        elif self.optimism_type == 'last_loss':
+            self.optimism = loss_bases
+        else:
+            raise TypeError(f'{self.optimism_type} is not defined')
+
+    @property
+    def optimism(self):
+        """Get the current optimism of the alive base-learners."""
+        return self._optimism[self._active_index]
+
+    @optimism.setter
+    def optimism(self, optimism: Optional[np.ndarray]):
+        self._optimism[self._active_index] = optimism
+
+    @property
+    def middle_prob(self):
+        """Get the current intermediate probability over the alive base-learners."""
+        return self._middle_prob[self._active_index]
+
+    @middle_prob.setter
+    def middle_prob(self, middle_prob):
+        self._middle_prob[self._active_index] = middle_prob
+
+
+class OptimisticHedge(OptimisticMeta):
+    """Implementation of Optimistic Hedge.
+
+    ``Hedge`` is the most popular algorithm for the problem of tracking the
+    best expert. ``Optimistic Hedge`` is an enhanced version of ``Hedge`` which can
+    further incorporate the knowledge of predictable sequences. There are two
+    versions of ``Optimistic Hedge``: the greedy version and the lazy version.
+    The greedy version is a special case of ``Optimistic OMD`` with the negative
+    entropy as the regularizer, and the lazy version is a special case of
+    ``Optimistic FTRL``. The greedy version of ``Optimistic Hedge`` updates the
+    decision :math:`p_t` by
+
+    .. math::
+
+        {p}_{t, i} = \\frac{\hat{p}_{t,i}\exp(-\\varepsilon_t
+        M_{t,i})}{\sum_{j=1}^N \hat{p}_{t,j} \exp(-\\varepsilon_t M_{t,j})},
+        \\forall i \in [N] \n
+        \hat{p}_{t+1, i} = \\frac{\hat{p}_{t,i}\exp(-\\varepsilon_t (\ell_{t,i} +
+        b_{t,i}))}{\sum_{j=1}^N \hat{p}_{t,j} \exp(-\\varepsilon_t (\ell_{t,j}+
+        b_{t,j}))}, \\forall i \in [N],
+
+    and the lazy version of ``Optimistic Hedge`` updates the decision :math:`p_t` by
+
+    .. math::
+
+        p_{t, i} \propto p_{1,i} \exp \Big(-\\varepsilon_t (\sum_{s=1}^{t-1} (\ell_{s,i} +
+        b_{s,i})+ M_{t,i})\Big), \\forall i \in [N],
+
+    where :math:`\\varepsilon_t > 0` is the learning rate at round :math:`t`,
+    :math:`M_t` is the optimism at the beginning of round :math:`t`, and :math:`b_t`
+    is the correction term.
+
+    Args:
+        prob (numpy.ndarray): Initial probability over the base-learners.
+        lr (float, numpy.ndarray, OptimisticLR): The learning rate for the
+            meta-algorithm.
+        optimism_type (str, optional): Type of optimism used by the algorithm.
+            Valid options include ``external``, ``last_loss`` and
+            ``None``. If ``optimism_type='external'``, the algorithm will accept the
+            external optimism :math:`M_t` at each round; if
+            ``optimism_type='last_loss'``, the optimism is set as :math:`M_t =
+            \ell_{t-1}`; and if ``optimism_type=None``, the optimism is set as
+            :math:`M_t = 0`.
+        is_lazy (bool): Type of the update: lazy or greedy. The default is
+            False (greedy).
+        correct (bool): Whether to use the correction term. :math:`b_t` is
+            set as :math:`b_t = \\varepsilon_t (\ell_t-M_t)^2` if
+            ``correct=True`` and :math:`b_t=0` otherwise. The default is False.
+
+    .. note::
+        1. The greedy version of ``Optimistic Hedge`` is a special case of
+        ``Optimistic OMD`` with the negative entropy as the regularizer.
+
+        2. The lazy version of ``Optimistic Hedge`` is a special case of
+        ``Optimistic FTRL`` with the negative entropy as the regularizer.
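+
+    Example:
+        A minimal sketch combining three experts (the loss vector below is a
+        hypothetical placeholder)::
+
+            meta = OptimisticHedge(prob=np.ones(3) / 3, lr=0.1)
+            loss_bases = np.array([0.2, 0.5, 0.9])
+            prob = meta.opt(loss_bases)  # larger losses receive smaller weights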
+ """ + + def __init__(self, + prob: np.ndarray, + lr: Union[float, np.ndarray, OptimisticLR], + optimism_type: Optional[str] = 'external', + is_lazy: bool = False, + correct: bool = False): + super().__init__(prob, lr, optimism_type) + self.is_lazy = is_lazy + if self.is_lazy: + self._cum_loss = np.zeros_like(self._prob) + self.correct = correct + + def opt_by_optimism(self, optimism: Optional[np.ndarray]): + if self.optimism_type == 'external' and optimism is not None: + self.optimism = optimism + else: + pass + lr = self.get_lr() + exp_optimism = np.exp(-lr * self.optimism) + self.prob = self.middle_prob * exp_optimism / np.dot( + self.middle_prob, exp_optimism) + + def opt_by_gradient(self, + loss_bases: np.ndarray, + loss_meta: Optional[float] = None): + lr = self.get_lr() + if self.correct: + correction = lr * (loss_bases - self.optimism)**2 + else: + correction = np.zeros_like(loss_bases) + if self.is_lazy: + self.cum_loss += loss_bases + exp_loss = np.exp(-lr * (self.cum_loss + correction)) + self.middle_prob = self.init_prob * exp_loss / np.dot( + self.init_prob, exp_loss) + else: + exp_loss = np.exp(-lr * (loss_bases + correction)) + self.middle_prob = self.middle_prob * exp_loss / np.dot( + self.middle_prob, exp_loss) + # update learning rate + if isinstance(self.lr, OptimisticLR): + self.lr.update_lr(self.optimism, loss_bases) + # update by internal optimism + self.compute_internal_optimism(loss_bases) + self.t += 1 + return self.prob + + @property + def cum_loss(self): + """Get the cumulative loss of the alive base-learners.""" + return self._cum_loss[self._active_index] + + @cum_loss.setter + def cum_loss(self, cum_loss: np.ndarray): + self._cum_loss[self._active_index] = cum_loss diff --git a/StreamLearn/Algorithm/EnsembleUpdate/learner/models/__init__.py b/StreamLearn/Algorithm/EnsembleUpdate/learner/models/__init__.py new file mode 100755 index 0000000000000000000000000000000000000000..5c610286b57229d34f936e198046e23e88511a5e --- /dev/null +++ b/StreamLearn/Algorithm/EnsembleUpdate/learner/models/__init__.py @@ -0,0 +1,2 @@ +from .model import Model +from .swordpp import SwordPP diff --git a/StreamLearn/Algorithm/EnsembleUpdate/learner/models/model.py b/StreamLearn/Algorithm/EnsembleUpdate/learner/models/model.py new file mode 100755 index 0000000000000000000000000000000000000000..017ea061e4cb0dd52b0a819265e42ae0ba07bf9f --- /dev/null +++ b/StreamLearn/Algorithm/EnsembleUpdate/learner/models/model.py @@ -0,0 +1,123 @@ +from typing import Optional +import numpy as np +from abc import ABC, abstractmethod +from ...environment.environment import Environment +from ..meta import Meta +from ..schedule.schedule import Schedule +from ..specification.optimism_base import OptimismBase +from ..specification.optimism_meta import OptimismMeta +from ..specification.surrogate_base import SurrogateBase +from ..specification.surrogate_meta import SurrogateMeta +#-------------------------------------------------------------------- +import sklearn + +class Model(sklearn.base.BaseEstimator): + """Combines several main components into the final algorithm. + + Args: + schedule (Schedule): Schedule method, refer to + ``schedule.schedule``. + meta (Meta): Meta algorithm, refer to ``meta``. + surrogate_base (SurrogateBase): Surrogate loss class for base-learners, + refer to ``specification.surrogate_base``. + optimism_base (optimismBase): Optimism class for base-learners, + refer to ``specification.optimism_base``. 
+        surrogate_meta (SurrogateMeta): Surrogate loss class for meta-algorithm,
+            refer to ``specification.surrogate_meta``.
+        optimism_meta (OptimismMeta): Optimism class for meta-algorithm,
+            refer to ``specification.optimism_meta``.
+    """

+    def __init__(self,
+                 schedule: Schedule,
+                 meta: Meta,
+                 surrogate_base: SurrogateBase = None,
+                 optimism_base: OptimismBase = None,
+                 surrogate_meta: SurrogateMeta = None,
+                 optimism_meta: OptimismMeta = None) -> None:
+        self.schedule = schedule
+        self.meta = meta
+        self.surrogate_base = surrogate_base
+        self.optimism_base = optimism_base
+        self.surrogate_meta = surrogate_meta
+        self.optimism_meta = optimism_meta
+        self.t = 0
+        self.internal_optimism_base = None
+        self.internal_optimism_meta = None
+
+    def fit(self, X=None, y=None, online_to_batch=False, loss_func=None):
+        if online_to_batch:
+            raise NotImplementedError("online_to_batch functionality has been removed.")
+        else:
+            env = Environment(func=loss_func)
+            self.opt(env)
+        return self
+
+    def predict(self, x=None):
+        variables = vars(self)
+        self.schedule.t = self.t
+        self.meta.active_state = self.schedule.active_state
+        self.x_bases = self.schedule.x_active_bases
+        # Combine the decisions of the alive base-learners by the meta weights.
+        self.x = np.dot(self.meta.prob, self.x_bases)
+        variables['x_active_bases'] = self.x_bases
+        variables['prob'] = self.meta.prob
+        variables['x_combined'] = self.x
+        return self.x
+
+    def opt(self, env: Environment):
+        self.env = env
+        variables = vars(self)
+        if env.full_info:
+            loss, surrogate_loss = env.get_loss(self.x)
+            self.grad = env.get_grad(self.x)
+        else:
+            raise NotImplementedError("Bandit setting and perturbation have been removed.")
+
+        # update bases
+        base_env = Environment(func=env.func, grad_func=env.grad_func)
+        if self.surrogate_base is not None:
+            base_env.surrogate_func, base_env.surrogate_grad = self.surrogate_base.compute_surrogate_base(
+                variables)
+        if self.optimism_base is not None and self.optimism_base.is_external:
+            optimism_base = self.optimism_base.compute_optimism_base(variables)
+        else:
+            optimism_base = self.internal_optimism_base
+        self.schedule.opt_by_optimism(optimism_base)
+        self.loss_bases, self.surrogate_loss_bases = self.schedule.opt_by_gradient(
+            base_env)
+
+        # compute surrogate loss of meta
+        if self.surrogate_meta is not None:
+            self.loss_bases = self.surrogate_meta.compute_surrogate_meta(
+                variables)
+
+        # update meta
+        if self.optimism_meta is not None and self.optimism_meta.is_external:
+            optimism_meta = self.optimism_meta.compute_optimism_meta(variables)
+        else:
+            optimism_meta = self.internal_optimism_meta
+        self.meta.opt_by_optimism(optimism_meta)
+        self.meta.opt_by_gradient(self.loss_bases, loss)
+
+        # compute internal optimism of bases and meta
+        self.compute_internal_optimism(variables)
+
+        self.t += 1
+        return self.x, loss, surrogate_loss
+
+    def compute_internal_optimism(self, variables):
+        """Compute the internal optimism.
+
+        Args:
+            variables (dict): Intermediate variables at the current round.
+ """ + if self.optimism_base is not None and self.optimism_base.is_external is False: + self.internal_optimism_base = self.optimism_base.compute_optimism_base( + variables) + if self.optimism_meta is not None and self.optimism_meta.is_external is False: + self.internal_optimism_meta = self.optimism_meta.compute_optimism_meta( + variables) + + def set_func(self, func): + self._func = func diff --git a/StreamLearn/Algorithm/EnsembleUpdate/learner/models/swordpp.py b/StreamLearn/Algorithm/EnsembleUpdate/learner/models/swordpp.py new file mode 100755 index 0000000000000000000000000000000000000000..52b4be81c3ae61f6d3489a36094cdf44f2f90004 --- /dev/null +++ b/StreamLearn/Algorithm/EnsembleUpdate/learner/models/swordpp.py @@ -0,0 +1,70 @@ +from typing import Optional, Union + +import numpy as np + +from ...environment.domain import Domain +from .model import Model +from ..base import OptimisticOGD +from ..meta import OptimisticHedge, OptimisticLR +from ..schedule.schedule import Schedule +from ..schedule.ssp import DiscreteSSP +from ..specification.optimism_base import LastGradOptimismBase +from ..specification.optimism_meta import \ + InnerSwitchingOptimismMeta +from ..specification.surrogate_base import InnerSurrogateBase +from ..specification.surrogate_meta import \ + InnerSwitchingSurrogateMeta + + +class SwordPP(Model): + """Implementation of Adaptivity and Non-stationarity: Problem-dependent + Dynamic Regret for Online Convex Optimization. + + ``Swordpp`` is an improved version of ``Sword``, who reduces the gradient + query complexity of each round from :math:`\mathcal{O}(\log T)` to :math:`1` + and achieves the best-of-both-worlds dynamic regret bounds by a single + algorithm. + + References: + https://arxiv.org/abs/2112.14368 + """ + + def __init__(self, + domain: Domain, + T: int, + G: float, + L_smooth: float, + min_step_size: Optional[float] = None, + max_step_size: Optional[float] = None, + prior: Optional[Union[list, np.ndarray]] = None, + seed: Optional[int] = None): + D = 2 * domain.R + if min_step_size is None: + min_step_size = (D**2 / (G**2 * T))**0.5 + if max_step_size is None: + max_step_size = 1 / (8 * L_smooth) + ssp = DiscreteSSP( + OptimisticOGD, + min_step_size, + max_step_size, + grid=2, + domain=domain, + prior=prior, + seed=seed) + schedule = Schedule(ssp) + lr = OptimisticLR(upper_bound=1 / (8 * D**2 * L_smooth)) + meta = OptimisticHedge(prob=np.ones(len(ssp)) / len(ssp), lr=lr) + surrogate_base = InnerSurrogateBase() + optimism_base = LastGradOptimismBase() + penalty = 2 * L_smooth + surrogate_meta = InnerSwitchingSurrogateMeta( + penalty=penalty, norm=2, order=2) + optimism_meta = InnerSwitchingOptimismMeta( + penalty=penalty, norm=2, order=2) + super().__init__( + schedule, + meta, + surrogate_base=surrogate_base, + surrogate_meta=surrogate_meta, + optimism_base=optimism_base, + optimism_meta=optimism_meta) diff --git a/StreamLearn/Algorithm/EnsembleUpdate/learner/schedule/__init__.py b/StreamLearn/Algorithm/EnsembleUpdate/learner/schedule/__init__.py new file mode 100755 index 0000000000000000000000000000000000000000..f8862c3a5b815b687daa19843bbde11b0b499f78 --- /dev/null +++ b/StreamLearn/Algorithm/EnsembleUpdate/learner/schedule/__init__.py @@ -0,0 +1,3 @@ +from .schedule import Schedule +from .ssp import SSP, DiscreteSSP +from .cover import Cover, FullCover diff --git a/StreamLearn/Algorithm/EnsembleUpdate/learner/schedule/cover.py b/StreamLearn/Algorithm/EnsembleUpdate/learner/schedule/cover.py new file mode 100755 index 
--- /dev/null
+++ b/StreamLearn/Algorithm/EnsembleUpdate/learner/schedule/cover.py
@@ -0,0 +1,76 @@
+from abc import ABC, abstractmethod
+from typing import Optional
+
+import numpy as np
+
+
+class Cover(ABC):
+    """The abstract class for problem-independent covers.
+
+    Args:
+        active_state (numpy.ndarray): Initial active state.
+        alive_time_threshold (int, optional): Minimal interval length for the cover.
+            All intervals whose length is less than ``alive_time_threshold``
+            will not be activated.
+    """
+
+    def __init__(self,
+                 active_state: np.ndarray,
+                 alive_time_threshold: Optional[int] = None):
+        self._t = 0
+        self._active_state = active_state
+        self._alive_time_threshold = alive_time_threshold
+
+    @property
+    def t(self):
+        """Number of the current round. Setting it computes the active state
+        at the current round.
+        """
+        return self._t
+
+    @t.setter
+    @abstractmethod
+    def t(self, t):
+        pass
+
+    @property
+    def active_state(self):
+        """Get the active state of base-learners:
+
+        - 0: sleep
+        - 1: active at the current round and the previous round.
+        - 2: active at the current round and sleep at the previous round.
+        """
+        if self._alive_time_threshold is None:
+            return self._active_state
+        else:
+            return self.check_threshold()
+
+    def check_threshold(self):
+        """Check the interval lengths. All intervals whose length is less than
+        ``alive_time_threshold`` will not be activated.
+        """
+        threshold_idx = int(np.ceil(np.log2(self._alive_time_threshold)))
+        active_state = self._active_state.copy()
+        active_state[:threshold_idx] = 0
+        if not np.any(active_state[threshold_idx:] > 0):
+            active_state[threshold_idx] = 2 if self.t == 0 else 1
+        return active_state
+
+
+class FullCover(Cover):
+    """The cover that keeps all base-learners alive all the time, which is used
+    for dynamic algorithms.
+
+    Args:
+        N (int): Number of base-learners.
+    """
+
+    def __init__(self, N: int):
+        super().__init__(np.ones(N), None)
+
+    @Cover.t.setter
+    def t(self, t):
+        self._t = t
diff --git a/StreamLearn/Algorithm/EnsembleUpdate/learner/schedule/schedule.py b/StreamLearn/Algorithm/EnsembleUpdate/learner/schedule/schedule.py
new file mode 100755
index 0000000000000000000000000000000000000000..81475107c52f43869453f8bccebd12dadc5157dc
--- /dev/null
+++ b/StreamLearn/Algorithm/EnsembleUpdate/learner/schedule/schedule.py
@@ -0,0 +1,94 @@
+from typing import Optional
+import numpy as np
+from ...environment.environment import Environment
+from .cover import (Cover, FullCover)
+from .ssp import SSP
+
+
+class Schedule:
+    """The class to schedule base-learners with a problem-independent cover.
+
+    Args:
+        ssp (SSP): An SSP instance containing a bunch of initialized
+            base-learners.
+        cover (Cover, optional): A cover instance deciding the
+            active state of base-learners.
+
+    """
+
+    def __init__(self, ssp: SSP, cover: Optional[Cover] = None):
+        self.bases = ssp.bases
+        self.cover = cover if cover is not None else FullCover(len(self.bases))
+        self._t = 0
+        self._optimism = None
+        self.active_state = np.ones(len(self.bases))
+        self.active_index = np.where(self.active_state > 0)[0]
+
+    def opt_by_optimism(self, optimism: np.ndarray):
+        """Optimize by the optimism for all alive base-learners.
+
+        Args:
+            optimism (numpy.ndarray): External optimism for all alive base-learners.
+ """ + for idx in self.active_index: + self.bases[idx].opt_by_optimism(optimism) + + def opt_by_gradient(self, env: Environment): + """Optimize by the gradient for all base-learners. + + Args: + env (Environment): Environment at current round. + + Returns: + tuple: tuple contains: + loss (float): the origin loss of all alive base-learners. \n + surrogate_loss (float): the surrogate loss of all alive base-learners. + """ + loss = np.zeros(len(self.active_index)) + surrogate_loss = np.zeros_like(loss) + for i, idx in enumerate(self.active_index): + _, loss[i], surrogate_loss[i] = self.bases[idx].opt_by_gradient( + env) + return loss, surrogate_loss + + @property + def t(self): + """Set the number of current round, get active state from ``cover`` and + reinitialize the base-learners whose active state is 2. + """ + return self._t + + @t.setter + def t(self, t): + self._t = t + self.cover.t = t + self.active_state = self.cover.active_state + self.active_index = np.where(self.active_state > 0)[0] + self.reinit_bases() + + @property + def x_active_bases(self): + """Get the decisions of all alive base-learners. + + Returns: + numpy.ndarray: Decisions of all alive base learners. + """ + return np.array([self.bases[i].x for i in self.active_index]) + + @property + def optimism(self): + """Get the optimisms of all alive base-learners. + + Returns: + numpy.ndarray: Optimisms of all alive base learners. + """ + self._optimism = np.zeros_like(self.x_active_bases) + for i, idx in enumerate(self.active_index): + self._optimism[i] = self.bases[idx].optimism + return self._optimism + + def reinit_bases(self): + """Reinitialize the base-learners whose active state is 2.""" + reinit_idx = np.where(self.active_state == 2)[0] + for idx in reinit_idx: + self.bases[idx].reinit() diff --git a/StreamLearn/Algorithm/EnsembleUpdate/learner/schedule/ssp.py b/StreamLearn/Algorithm/EnsembleUpdate/learner/schedule/ssp.py new file mode 100755 index 0000000000000000000000000000000000000000..5ef95946e79cbed56b63abeeb108eedb4132b88f --- /dev/null +++ b/StreamLearn/Algorithm/EnsembleUpdate/learner/schedule/ssp.py @@ -0,0 +1,87 @@ +import copy + +import numpy as np +from ..base import Base + + +class SSP: + """The simplest class to initialize base-learners, which accepts the base + instances as the input directly. + + Args: + bases (list): List of base-learners. + """ + def __init__(self, bases: list = None): + self.bases = bases + + def __add__(self, ssp): + new_ssp = copy.deepcopy(self) + new_ssp.bases = self.bases + ssp.bases if self.bases is not None else ssp.bases + return new_ssp + + def __len__(self): + return len(self.bases) + + +class StepSizeFreeSSP(SSP): + """The class to initialize step size free base-learners. + + Args: + base_class (Base): Base class to schedule. + num_bases (int): Number of base-learners. + **kwargs_base (dict): Parameters of base-learners. + """ + + def __init__(self, base_class: Base, num_bases: int, **kwargs_base: dict): + bases = [base_class(**kwargs_base) for _ in range(num_bases)] + super().__init__(bases) + + +class DiscreteSSP(SSP): + """The most commonly used SSP for dynamic algorithms, which construct a step + size pool at first, and then initialize multiple base-learners, each employs + a specific step size. + + Args: + base_class (Base): Base class to initialize. + min_step_size (float): Minimal value of the possible range of the + optimal step size. + max_step_size (float): Maximal value of the possible range of the + optimal step size. 
+ grid (int): Grid size to discretize the possible range of the optimal + step size. + **kwargs (dict): Parameter of the base-learners. + """ + + def __init__(self, + base_class: Base, + min_step_size: float, + max_step_size: float, + grid: int = 2, + **kwargs_base): + self.step_pool = self.discretize(min_step_size, max_step_size, grid) + bases = [ + base_class(step_size=self.step_pool[i], **kwargs_base) + for i in range(len(self.step_pool)) + ] + super().__init__(bases) + + @staticmethod + def discretize(min_step_size: float, + max_step_size: float, + grid: float = 2.): + """Discretize the possible range of the optimal step size exponentially + + Args: + min_step_size (float): Minimal value of the possible range of the + optimal step size. + max_step_size (float): Maximal value of the possible range of the + optimal step size. + grid (int): Grid size to discretize the possible range of the optimal + step size. + """ + step_pool = [min_step_size] + while (min_step_size <= max_step_size): + min_step_size *= grid + step_pool.append(min_step_size) + return np.array(step_pool) diff --git a/StreamLearn/Algorithm/EnsembleUpdate/learner/specification/__init__.py b/StreamLearn/Algorithm/EnsembleUpdate/learner/specification/__init__.py new file mode 100755 index 0000000000000000000000000000000000000000..76231308dfde1e0a4a654370d3e0ddc032c1e8fa --- /dev/null +++ b/StreamLearn/Algorithm/EnsembleUpdate/learner/specification/__init__.py @@ -0,0 +1,5 @@ +from .optimism_base import (LastGradOptimismBase) +from .optimism_meta import ( + InnerSwitchingOptimismMeta) +from .surrogate_base import (InnerSurrogateBase) +from .surrogate_meta import (InnerSwitchingSurrogateMeta) diff --git a/StreamLearn/Algorithm/EnsembleUpdate/learner/specification/optimism_base.py b/StreamLearn/Algorithm/EnsembleUpdate/learner/specification/optimism_base.py new file mode 100755 index 0000000000000000000000000000000000000000..2eb191bd63897c3f29af9f0068317e01fccf3465 --- /dev/null +++ b/StreamLearn/Algorithm/EnsembleUpdate/learner/specification/optimism_base.py @@ -0,0 +1,30 @@ +from abc import ABC, abstractmethod + + +class OptimismBase(ABC): + """The abstract class defines the optimism for base-learners. + + Attributes: + is_external (bool): Indicates the optimism is given by the environment + or computed by the algorithm itself. The default is True. + """ + + def __init__(self, is_external: bool = True): + self.is_external = is_external + + @abstractmethod + def compute_optimism_base(self): + """Compute the optimism for base-learners.""" + raise NotImplementedError() + + +class LastGradOptimismBase(OptimismBase): + """The class will set the optimism :math:`m_{t+1}` of the base-learners as + :math:`m_t = \\nabla f_{t-1}(x_{t-1})`, where :math:`x_{t-1}` is the + submitted decision at round :math:`t-1`.""" + + def __init__(self): + super().__init__(is_external=False) + + def compute_optimism_base(self, variables): + return variables['grad'] diff --git a/StreamLearn/Algorithm/EnsembleUpdate/learner/specification/optimism_meta.py b/StreamLearn/Algorithm/EnsembleUpdate/learner/specification/optimism_meta.py new file mode 100755 index 0000000000000000000000000000000000000000..e5bef21dc834f49e973610f3aa4ba8d1a3e974e7 --- /dev/null +++ b/StreamLearn/Algorithm/EnsembleUpdate/learner/specification/optimism_meta.py @@ -0,0 +1,64 @@ +from abc import ABC, abstractmethod + +import numpy as np + + +class OptimismMeta(ABC): + """The abstract class defines the optimism for meta-algorithm. 
+
+    Attributes:
+        is_external (bool): Indicates whether the optimism of the meta-algorithm
+            is given by the environment or computed by the algorithm itself.
+            The default is True.
+    """
+
+    def __init__(self, is_external: bool = True):
+        self.is_external = is_external
+
+    @abstractmethod
+    def compute_optimism_meta(self):
+        """Compute the optimism for the meta-algorithm."""
+        raise NotImplementedError()
+
+
+class InnerSwitchingOptimismMeta(OptimismMeta):
+    """The class defines the inner function with a switching-cost term to compute
+    the optimism for the meta-algorithm.
+
+    Args:
+        penalty (float): Penalty coefficient of the switching-cost term.
+        norm (int): Order of the norm :math:`p`.
+        order (int): Order of the switching cost :math:`q`.
+    """
+
+    def __init__(self, penalty: float, norm: int = 2, order: int = 2):
+        super().__init__(is_external=True)
+        self.penalty = penalty
+        self.norm = norm
+        self.order = order
+
+    def compute_optimism_meta(self, variables):
+        """Set the optimism of the meta-algorithm as
+
+        .. math::
+
+            M_{t,i} = \langle m_t, x_{t,i} \\rangle + \lambda \lVert
+            x_{t,i} - x_{t-1, i} \\rVert_p^q,
+
+        where :math:`x_{t, i}` is the decision of base-learner :math:`i` at
+        round :math:`t`, :math:`m_t` is the optimism of the base-learners and
+        :math:`\lambda` is the penalty coefficient of the switching-cost term.
+        """
+        new_x_bases = variables['schedule'].x_active_bases
+        x_bases = variables['x_bases'] if 'x_bases' in variables else None
+        optimism = variables['schedule'].optimism
+        optimism_meta = self.inner_switching(new_x_bases, optimism,
+                                             self.penalty, x_bases, self.norm,
+                                             self.order)
+        return optimism_meta
+
+    @staticmethod
+    def inner_switching(x, gradient, penalty, x_last, norm=2, order=2):
+        if x_last is None:
+            x_last = x
+        return (x * gradient).sum(axis=1) + penalty * np.linalg.norm(
+            x - x_last, ord=norm, axis=1)**order
diff --git a/StreamLearn/Algorithm/EnsembleUpdate/learner/specification/surrogate_base.py b/StreamLearn/Algorithm/EnsembleUpdate/learner/specification/surrogate_base.py
new file mode 100755
index 0000000000000000000000000000000000000000..06c7f1159afc5ce32a37ef0ef08d89511865f243
--- /dev/null
+++ b/StreamLearn/Algorithm/EnsembleUpdate/learner/specification/surrogate_base.py
@@ -0,0 +1,49 @@
+from abc import ABC, abstractmethod
+
+import numpy as np
+
+
+class SurrogateBase(ABC):
+    """The abstract class defines the surrogate loss functions and the surrogate
+    gradient (if possible) for base-learners."""
+
+    def __init__(self):
+        pass
+
+    @abstractmethod
+    def compute_surrogate_base(self, variables):
+        """Compute the surrogate loss functions and the surrogate
+        gradient (if possible) for base-learners."""
+        raise NotImplementedError()
+
+
+class InnerSurrogateBase(SurrogateBase):
+    """The class defines the inner surrogate loss function for base-learners."""
+
+    def __init__(self):
+        pass
+
+    def compute_surrogate_base(self, variables):
+        """Compute the surrogate loss functions and the surrogate
+        gradient (if possible) for base-learners.
+
+        Replace the original convex function :math:`f_t(x)` with
+
+        .. math::
+
+            f'_t(x)=\langle \\nabla f_t(x_t), x \\rangle,
+
+        for all base-learners, where :math:`x_t` is the submitted decision at
+        round :math:`t`. Since the gradient of any decision for the inner
+        function is :math:`\\nabla f_t(x_t)`, this method also returns it to
+        reduce the gradient query complexity for base-learners.
+
+        Args:
+            variables (dict): Intermediate variables of the learning process at
+                the current round.
+        Returns:
+            tuple: tuple contains:
+                surrogate_func (Callable): Surrogate function for base-learners.
diff --git a/StreamLearn/Algorithm/EnsembleUpdate/learner/specification/surrogate_base.py b/StreamLearn/Algorithm/EnsembleUpdate/learner/specification/surrogate_base.py
new file mode 100755
index 0000000000000000000000000000000000000000..06c7f1159afc5ce32a37ef0ef08d89511865f243
--- /dev/null
+++ b/StreamLearn/Algorithm/EnsembleUpdate/learner/specification/surrogate_base.py
@@ -0,0 +1,49 @@
+from abc import ABC, abstractmethod
+
+import numpy as np
+
+
+class SurrogateBase(ABC):
+    """The abstract class defines the surrogate loss functions and the
+    surrogate gradient (if available) for base-learners."""
+
+    def __init__(self):
+        pass
+
+    @abstractmethod
+    def compute_surrogate_base(self, variables):
+        """Compute the surrogate loss functions and the surrogate gradient
+        (if available) for base-learners."""
+        raise NotImplementedError()
+
+
+class InnerSurrogateBase(SurrogateBase):
+    """The class defines the inner surrogate loss function for base-learners."""
+
+    def __init__(self):
+        super().__init__()
+
+    def compute_surrogate_base(self, variables):
+        """Compute the surrogate loss function and the surrogate gradient for
+        base-learners.
+
+        Replace the original convex function :math:`f_t(x)` with the linear
+        surrogate
+
+        .. math::
+
+            f'_t(x)=\langle \\nabla f_t(x_t), x \\rangle
+
+        for all base-learners, where :math:`x_t` is the submitted decision at
+        round :math:`t`. Since the gradient of the inner function at any
+        decision equals :math:`\\nabla f_t(x_t)`, this method also returns it
+        to reduce the gradient query complexity of the base-learners.
+
+        Args:
+            variables (dict): Intermediate variables of the learning process
+                at the current round.
+
+        Returns:
+            tuple: tuple contains:
+                surrogate_func (Callable): Surrogate function for base-learners.
+                surrogate_grad (numpy.ndarray): Surrogate gradient for base-learners.
+        """
+        return lambda x: np.dot(x, variables['grad']), variables['grad']
diff --git a/StreamLearn/Algorithm/EnsembleUpdate/learner/specification/surrogate_meta.py b/StreamLearn/Algorithm/EnsembleUpdate/learner/specification/surrogate_meta.py
new file mode 100755
index 0000000000000000000000000000000000000000..2b8bcafe208f0c88e034d931b5a3b7a556b99faa
--- /dev/null
+++ b/StreamLearn/Algorithm/EnsembleUpdate/learner/specification/surrogate_meta.py
@@ -0,0 +1,62 @@
+from abc import ABC, abstractmethod
+from typing import Optional
+
+import numpy as np
+
+
+class SurrogateMeta(ABC):
+    """The abstract class defines the surrogate loss that is passed to the
+    meta-algorithm."""
+
+    def __init__(self):
+        pass
+
+    @abstractmethod
+    def compute_surrogate_meta(self):
+        """Compute the surrogate loss that is passed to the meta-algorithm."""
+        raise NotImplementedError()
+
+
+class InnerSwitchingSurrogateMeta(SurrogateMeta):
+    """The class defines the inner surrogate loss with switching cost for the
+    meta-algorithm.
+
+    Args:
+        penalty (float): Penalty coefficient :math:`\lambda` of the switching
+            cost term.
+        norm (int): Order of norm :math:`p`.
+        order (int): Order of switching cost :math:`q`.
+    """
+
+    def __init__(self, penalty: float, norm: int = 2, order: int = 2):
+        super().__init__()
+        self.penalty = penalty
+        self.norm = norm
+        self.order = order
+        self.x_last = None
+
+    def compute_surrogate_meta(self, variables):
+        """Set the surrogate loss of the meta-algorithm for each base-learner
+        :math:`i` as
+
+        .. math::
+
+            \ell'_{t,i}=\langle \\nabla f_t(x_t), x_{t,i} \\rangle + \lambda
+            \lVert x_{t,i} - x_{t-1,i} \\rVert_p^q,
+
+        where :math:`x_t` is the submitted decision and :math:`x_{t,i}` is the
+        decision of base-learner :math:`i` at round :math:`t`.
+        """
+        loss = self.inner_switching(variables['x_bases'], variables['grad'],
+                                    self.penalty, self.x_last, self.norm,
+                                    self.order)
+        self.x_last = variables['x_bases']
+        return loss
+
+    @staticmethod
+    def inner_switching(x: np.ndarray,
+                        gradient: np.ndarray,
+                        penalty: float,
+                        x_last: Optional[np.ndarray],
+                        norm: int = 2,
+                        order: int = 2):
+        # With no previous decisions, the switching term vanishes.
+        if x_last is None:
+            x_last = x
+        return np.dot(x, gradient) + penalty * np.linalg.norm(
+            x - x_last, ord=norm, axis=1)**order
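+
+# A minimal usage sketch (illustrative only; the 'x_bases' and 'grad' keys
+# follow the `variables` convention used throughout this package):
+#
+#     surrogate = InnerSwitchingSurrogateMeta(penalty=1.)
+#     losses = surrogate.compute_surrogate_meta(
+#         {'x_bases': x_bases, 'grad': grad_t})
+#     # losses[i] linearizes f_t at x_t for base-learner i and adds the cost
+#     # of moving away from its previous decision.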
diff --git a/StreamLearn/Algorithm/EnsembleUpdate/utils/__init__.py b/StreamLearn/Algorithm/EnsembleUpdate/utils/__init__.py
new file mode 100755
index 0000000000000000000000000000000000000000..e49dee2e0dd1b682f3a92660e36178c4b99a6df6
--- /dev/null
+++ b/StreamLearn/Algorithm/EnsembleUpdate/utils/__init__.py
@@ -0,0 +1,2 @@
+from . import data_generator
+from . import plot
diff --git a/StreamLearn/Algorithm/EnsembleUpdate/utils/data_generator.py b/StreamLearn/Algorithm/EnsembleUpdate/utils/data_generator.py
new file mode 100755
index 0000000000000000000000000000000000000000..869ace27bc8912c9cd918b7507c1a8b67106a0e5
--- /dev/null
+++ b/StreamLearn/Algorithm/EnsembleUpdate/utils/data_generator.py
@@ -0,0 +1,73 @@
+import math
+from abc import ABC, abstractmethod
+from typing import Optional
+
+import numpy as np
+
+
+class DataGenerator(ABC):
+    """Synthetic data generator."""
+
+    def __init__(self):
+        pass
+
+    @abstractmethod
+    def generate_data(self):
+        """Generate data by parameters."""
+        raise NotImplementedError()
+
+
+class LinearRegressionGenerator(DataGenerator):
+    """Generate data for linear regression."""
+
+    def __init__(self):
+        super().__init__()
+
+    def generate_data(self,
+                      T: int,
+                      dimension: int,
+                      stage: int = 1,
+                      radius: float = 1.,
+                      Gamma: float = 1.,
+                      mu: float = 0.,
+                      sigma: float = 0.05,
+                      seed: Optional[int] = None):
+        """Generate linear data in an abruptly changing environment.
+
+        The synthetic data are generated as follows: at each round, the feature
+        :math:`\\varphi_t \in \mathbb{R}^d` is randomly generated from a ball
+        with radius :math:`\Gamma`, i.e., :math:`\{\\varphi \in \mathbb{R}^d \mid
+        \lVert \\varphi \\rVert_2 \leq \Gamma\}`; the associated label is set as
+        :math:`y_t = \\varphi_t^\\top x_t^* + \epsilon_t`, where :math:`\epsilon_t` is
+        random Gaussian noise and :math:`x_t^*` is the underlying model,
+        randomly sampled from a ball with radius :math:`R`. To simulate
+        non-stationary environments with abrupt changes, the :math:`T` rounds
+        are divided into :math:`S` stages, within each of which the underlying
+        model is forced to stay fixed.
+
+        Args:
+            T (int): Number of total rounds.
+            dimension (int): Dimension of the feature.
+            stage (int): Number of stages.
+            radius (float): Radius of the ball containing the underlying model.
+            Gamma (float): Radius of the ball containing the features.
+            mu (float): Mean ("center") of the noise distribution.
+            sigma (float): Standard deviation (spread or "width") of the noise
+                distribution. Must be non-negative.
+            seed (int, optional): Random seed to generate data.
+
+        Returns:
+            tuple: the feature matrix of shape ``(T, dimension)`` and the
+                noisy label vector of shape ``(T,)``.
+        """
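+        # Feature sampling (descriptive note): directions drawn from a
+        # Gaussian and normalized are uniform on the sphere; scaling them by
+        # radii distributed as U**(1/d) makes the points uniform in the
+        # Gamma-ball.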
+ """ + np.random.seed(seed) + random_directions = np.random.normal(size=(dimension, T)) + random_directions /= np.linalg.norm(random_directions, axis=0) + random_radii = np.random.random(T)**(1 / dimension) + feature = Gamma * (random_directions * random_radii).T + label = np.zeros(T) + step = math.ceil(T / stage) + for i in range(stage): + random_vec = np.random.normal(size=dimension) + x = random_vec / np.linalg.norm( + random_vec) * radius * np.random.rand() + left, right = i * step, min((i + 1) * step, T) + label[left:right] = np.dot(feature[left:right, :], x) + noise = np.random.normal(mu, sigma, T) + return feature, label + noise diff --git a/StreamLearn/Algorithm/EnsembleUpdate/utils/plot.py b/StreamLearn/Algorithm/EnsembleUpdate/utils/plot.py new file mode 100755 index 0000000000000000000000000000000000000000..8a09d21f3c8115f3321356c4dd4e82e67f57b30e --- /dev/null +++ b/StreamLearn/Algorithm/EnsembleUpdate/utils/plot.py @@ -0,0 +1,58 @@ +from typing import Optional +import numpy as np +import matplotlib +import matplotlib.pyplot as plt + + +def plot(loss: np.ndarray, + labels: list, + cum: bool = True, + title: Optional[str] = None, + file_path: Optional[str] = None, + x_label: Optional[str] = 'Iteration', + y_label: Optional[str] = 'Cumulative Loss'): + """Visualize the results of multiple learners.: + + Args: + loss (numpy.ndarray): Losses of multiple learners. + labels (list): labels of learners. + cum (bool): Show the cumulative loss or instantaneous loss. + title (str, optional): Title of the figure. + file_path (str, optional): File path to save the results. + x_lable (str, optional): Label of :math:`x` axis. + y_lable (str, optional): Label of :math:`y` axis. + """ + plt.figure() + matplotlib.rcParams['font.family'] = "sans-serif" + matplotlib.rcParams['font.sans-serif'] = "Arial" + if loss.ndim == 1: + loss = np.expand_dims(loss, axis = 0) + labels = [labels] + assert loss.ndim == 3 or loss.ndim == 2 + assert loss.shape[0] == len(labels) + if loss.ndim == 3: + xaxis = np.arange(0, loss.shape[2]) + if cum is True: + loss = np.cumsum(loss, axis=2) + loss_mean, loss_std = np.mean(loss, axis=1), np.std(loss, axis=1) + else: + xaxis = np.arange(0, loss.shape[1]) + if cum is True: + loss = np.cumsum(loss, axis=1) + loss_mean, loss_std = loss, np.zeros_like(loss) + + plt.grid(linestyle=':', linewidth=0.5) + plt.title(title) + plt.xlabel(x_label) + plt.ylabel(y_label) + for i in range(len(loss_mean)): + plt.plot(xaxis, loss_mean[i], label=labels[i]) + plt.fill_between( + xaxis, + loss_mean[i] - loss_std[i], + loss_mean[i] + loss_std[i], + alpha=0.15) + plt.legend(loc='upper left') + if file_path is not None: + plt.savefig(file_path) + plt.show() diff --git a/StreamLearn/Algorithm/__init__.py b/StreamLearn/Algorithm/__init__.py index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..6f84cbaf856e33decb1a7a3a6d6c4e38890c4200 100644 --- a/StreamLearn/Algorithm/__init__.py +++ b/StreamLearn/Algorithm/__init__.py @@ -0,0 +1,15 @@ +""" +StreamLearn Algorithm Module + +This module contains various stream learning and continual learning algorithms. +""" + +# Only import EnsembleUpdate for now to avoid dependency issues +try: + from . 
diff --git a/StreamLearn/Algorithm/__init__.py b/StreamLearn/Algorithm/__init__.py
index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..6f84cbaf856e33decb1a7a3a6d6c4e38890c4200 100644
--- a/StreamLearn/Algorithm/__init__.py
+++ b/StreamLearn/Algorithm/__init__.py
@@ -0,0 +1,15 @@
+"""
+StreamLearn Algorithm Module
+
+This module contains various stream learning and continual learning algorithms.
+"""
+
+# Only import EnsembleUpdate for now to avoid dependency issues
+try:
+    from . import EnsembleUpdate
+except ImportError:
+    pass
+
+__all__ = [
+    "EnsembleUpdate"
+]
diff --git a/StreamLearn/__init__.py b/StreamLearn/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391
diff --git a/StreamLearn/legacy/README.md b/StreamLearn/legacy/README.md
index 8afed62d76721ffc1eadeff849248d35f5aa12e7..9df625c3dd0cb2074762d83f7e99257596a6d80e 100644
--- a/StreamLearn/legacy/README.md
+++ b/StreamLearn/legacy/README.md
@@ -302,6 +302,23 @@ print('SketchedBandit CIFAR10 dataset: ', metrics)
 实现了基于采样的、分布式流数据环境下、亚线性通信量的元素估计(NDV)算法。详见[https://gitee.com/yinhanyan/ndv_-estimation_in_distributed_environment](https://gitee.com/yinhanyan/ndv_-estimation_in_distributed_environment)。
 
+### 2.5 Ensemble Combination Operator for Model Updating
+
+This module implements online ensemble-update algorithms for streaming-data scenarios; the relevant code is located at:
+
+- StreamLearn/Algorithm/EnsembleUpdate/base
+- StreamLearn/Algorithm/EnsembleUpdate/meta
+- StreamLearn/Algorithm/EnsembleUpdate/schedule
+- StreamLearn/Algorithm/EnsembleUpdate/learner/swordpp.py
+- StreamLearn/tests/test_EnsembleUpdate.py
+
+The operator combines the base-learner (base), meta-learner (meta), and scheduling (schedule) modules, and dynamically performs ensemble combination and model updating as stream data arrive, without offline retraining, which makes it suitable for online learning and model-adaptation scenarios. The provided module operators can be composed with one another to build ensemble-update algorithms; a fully assembled algorithm is provided in `StreamLearn/Algorithm/EnsembleUpdate/learner/swordpp.py`.
+
+**Usage example** (a reference test is provided in `StreamLearn/tests/test_EnsembleUpdate.py`).
+Taking online regression as an example, the test constructs a quadratic loss with stage-wise abrupt changes as the data-generating environment. As the stream arrives, the algorithm predicts on each input and updates the model parameters with the real-time feedback, thereby adapting to distribution shifts. The whole pipeline requires no offline retraining and keeps tracking the changing distribution, demonstrating the efficiency and robustness of the ensemble-update operator. See `StreamLearn/tests/test_EnsembleUpdate.py` for the complete workflow, including data generation, model initialization, and online interaction.
+
+
 ## 课题三
 
 ### 3.1 流数据分布自适应学习算法
@@ -365,7 +382,7 @@ python StreamLearn/tests/test_ODS.py --data PATH_TO_DATA --checkpoint PATH_TO_CH
 
 地址: 通过百度网盘分享的文件:SAFC_datasets_CIFAR10.zip
 
-链接:https://pan.baidu.com/s/1xtZjSxIIEMnUwoM7VXCzkQ 
+链接:https://pan.baidu.com/s/1xtZjSxIIEMnUwoM7VXCzkQ
 提取码:nudt
 
 说明:
@@ -376,7 +393,7 @@ python StreamLearn/tests/test_ODS.py --data PATH_TO_DATA --checkpoint PATH_TO_CH
 def __init__(self,args_address):
     # 存储地址
     self.save_dir=args_address.save_dir
-    
+
     # 读取地址
     self.PathSet=args_address.PathSet
     self.new_path=args_address.new_path
@@ -397,7 +414,7 @@ def stream_fit(self):
     print('####eval####')
     print("Have read stage1 model!")
    print('####eval####')
-    
+
     # 第二阶段数据读取与训练
     SAFC_Stage2(self.new_path,stage1model,self.save_dir)
     # 第二阶段模型提取
@@ -426,7 +443,7 @@ def SAFC_Stage1(PathSet,save_dir):
     print('####eval####')
     print("开始读取第一阶段数据!")
     print('####eval####')
-    
+
     X_past,Y_past=readbatchtoPython(PathSet)
     X_past,Y_past=datareconsrtuct(X_past,Y_past)
     Y_past=Y_past+1
@@ -465,7 +482,7 @@ def SAFC_Stage1(PathSet,save_dir):
     print('####eval####')
     print("end svm1 training!")
     print('####eval####')
-    
+
     return
 ```
@@ -476,8 +493,8 @@ def SAFC_Stage2(new_path,svm1,save_dir):
     eta = 0.1
     alpha_set = [10 ** -2, 10 ** -1, 10 ** 0]
     alpha = 0.001
-    beta = 0.01 
-    
+    beta = 0.01
+
     # 第二阶段数据读取与模型训练
     print('####eval####')
     print("开始读取第二阶段数据!")
@@ -546,7 +563,7 @@ def SAFC_Stage2(new_path,svm1,save_dir):
     print('####eval####')
     print("end SAFC_ID training!")
     print('####eval####')
-    
+
     return
 
 # 其中,alpha_best1, beta_best1, alpha_best2, beta_best2, eta是超参数
@@ -569,7 +586,7 @@ def SAFC_test(test_path,w_ours1,w_ours2):
     F1_weight_ours1, F1_weight_ours2 = [], []
     F1_macro_ours1, F1_macro_ours2 = [], []
     F1_micro_ours1, F1_micro_ours2 = [], []
-    
+
     # 测试数据读取与评估
     print('####eval####')
     print("开始读取测试数据!")
@@ -654,7 +671,7 @@ def SAFC_test(test_path,w_ours1,w_ours2):
         .format(meanAcc_ours1,meanAcc_ours2,
                 meanAuc_ours1,meanAuc_ours2,meanF1_macro_ours1,meanF1_macro_ours2,meanF1_weight_ours1,meanF1_weight_ours2,meanF1_micro_ours1,meanF1_micro_ours2))
     print('####eval####')
     print('Finished!')
-    
+
     return
 ```
@@ -693,7 +710,7 @@ class BBDM_achieve():
 
     def __init__(self,arg_address):
         self.parser = argparse.ArgumentParser(description='BBDM')
 
-    #数据读取 
+    #数据读取
     def steam_dataread(self):
         if (self.args.shift_type == 3) or (self.args.shift_type == 4):
             self.alpha = np.ones(10) * self.args.shift_para
@@ -730,7 +747,7 @@ class BBDM_achieve():
         self.m_valid = self.raw_data.get_validsize()
         self.m_test = self.raw_data.get_testsize()
 
-    #训练 
+    #训练
     def stream_fit(self):
         self.device = torch.device("cpu")
         self.kwargs = {'num_workers': 1, 'pin_memory': True} if self.use_cuda else {}
@@ -742,9 +759,9 @@ class BBDM_achieve():
         self.test_loader = data.DataLoader(self.test_data,batch_size=self.args.batch_size, shuffle=False, **self.kwargs)
         self.pre_test, self.acc_test, _, _, self.test_out = test(self.args,self.base_model, self.device, self.test_loader)
         self.test_tensor = torch.Tensor(self.test_out)
-        self.test_label = torch.Tensor(self.test_labels) 
+        self.test_label = torch.Tensor(self.test_labels)
         self.classes_test = count_classes(self.test_label, self.num_classes)
-        self.preds_test = torch.softmax(self.test_tensor/self.T + self.b, dim=1) 
+        self.preds_test = torch.softmax(self.test_tensor/self.T + self.b, dim=1)
 ```
@@ -848,7 +865,7 @@ def train_step(
     # 从data_batch中提取数据域
     obs = data_batch[0]
     act = data_batch[1]
-    
+
     # 消耗/使用数据
     ...
 ```
diff --git a/StreamLearn/tests/test_EnsembleUpdate.py b/StreamLearn/tests/test_EnsembleUpdate.py
new file mode 100755
index 0000000000000000000000000000000000000000..590bfe086a1f86195c66c75772fc3f5a516dad38
--- /dev/null
+++ b/StreamLearn/tests/test_EnsembleUpdate.py
@@ -0,0 +1,47 @@
+import sys
+import os
+import numpy as np
+from StreamLearn.Algorithm.EnsembleUpdate.environment.domain import Ball
+from StreamLearn.Algorithm.EnsembleUpdate.learner.models.swordpp import SwordPP
+from StreamLearn.Algorithm.EnsembleUpdate.utils.data_generator import LinearRegressionGenerator
+from StreamLearn.Algorithm.EnsembleUpdate.utils.plot import plot
+
+
+def main():
+    # Add the project root to the Python path.
+    sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../..'))
+
+    # Prepare the data and the model.
+    T, dimension, stage, R, Gamma, scale = 2000, 3, 100, 1, 1, 1 / 2
+    feature, label = LinearRegressionGenerator().generate_data(
+        T, dimension, stage, R, Gamma, seed=0)
+    D, r = 2 * R, R
+    G = scale * D * Gamma ** 2
+    C = scale * 1 / 2 * (D * Gamma) ** 2
+    L_smooth = Gamma ** 2
+
+    domain = Ball(dimension=dimension, radius=R)
+    online_learner = SwordPP(
+        domain=domain,
+        T=T,
+        G=G,
+        L_smooth=L_smooth,
+        seed=0)
+    labels = 'SwordPP'
+
+    # Online interaction between the learner and the environment.
+    loss = np.zeros(T)
+    for t in range(T):
+        # Make a decision.
+        x_t = online_learner.predict()
+        # Observe the loss function and suffer the loss.
+        loss_func = lambda x: scale * 1 / 2 * ((np.dot(x, feature[t]) - label[t]) ** 2)
+        loss[t] = loss_func(x_t)
+        # Update the model.
+        online_learner.fit(loss_func=loss_func, online_to_batch=False)
+
+    print("stream ensemble update test is finished!")
+    # Plot the loss curve; specify file_path to save the figure.
+    plot(loss, labels, file_path=None)
+
+
+if __name__ == "__main__":
+    main()