Ai
0 Star 1 Fork 1

public-artifacts/ecmws-experiments

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
simple_env.py 8.36 KB
一键复制 编辑 原始数据 按行查看 历史
victor puscarschi 提交于 2025-02-10 17:39 +08:00 . init
import itertools
import math
import random
from enum import Enum
from typing import Optional, Union, List

import gym
import numpy as np
import torch
from gym.core import RenderFrame
from wfcommons import MontageRecipe, EpigenomicsRecipe, GenomeRecipe

from methods.dp_method import BottleLayerAwareDeadlinePartition
from methods.methods_proto import WorkflowSequencing, DeadlinePartition, TaskSequencing
from methods.ts_method import MixedRandomTaskSequencing, CompositeTaskSequencing
from methods.ws_method import ContentionAwareWorkflowSequencing
from resources.system import System
from schedulers.scheduler_proto import Scheduler
from tasks.workflow import Workflow
def _valid_alpha_combinations(avail_alphas):
    """Return all ordered (a1, a2, a3) triples drawn from *avail_alphas* whose
    sum is 1, using a float tolerance.

    BUG FIX: the original test ``sum(x) == 1`` used exact float equality and
    silently dropped valid triples such as (0.1, 0.2, 0.7), whose float sum
    is 1.0000000000000002.
    """
    return [list(combo)
            for combo in itertools.product(avail_alphas, repeat=3)
            if math.isclose(sum(combo), 1.0, rel_tol=0.0, abs_tol=1e-9)]


class CWSAgent(WorkflowSequencing):
    """Workflow-sequencing agent: randomly picks one (alpha1, alpha2, alpha3)
    weighting that sums to 1 and delegates to ContentionAwareWorkflowSequencing."""

    def __init__(self, avail_alphas=None):
        if avail_alphas is None:
            avail_alphas = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8]
        # Every ordered triple of candidate weights whose sum is (close to) 1.
        self.combinations = _valid_alpha_combinations(avail_alphas)
        self.chosen = random.choice(self.combinations)
        self.method = ContentionAwareWorkflowSequencing(*self.chosen)

    def run(self, workflows, system):
        return self.method.run(workflows, system)
class BLDPAgent(DeadlinePartition):
    """Deadline-partition agent backed by a randomly chosen beta parameter
    for BottleLayerAwareDeadlinePartition."""

    def __init__(self, avail_beta=None):
        # Fall back to the default beta candidates when none are supplied.
        candidates = [1.5, 2, 4, 8, 16] if avail_beta is None else avail_beta
        self.avail_beta = candidates
        self.chosen = random.choice(self.avail_beta)
        self.method = BottleLayerAwareDeadlinePartition(self.chosen)

    def run(self, workflow, system):
        # Delegate entirely to the wrapped partition method.
        return self.method.run(workflow, system)
class TSAgent(TaskSequencing):
    """Task-sequencing agent: randomly selects one composite strategy index
    and delegates to CompositeTaskSequencing."""

    def __init__(self, avail_TS=None):
        # Default to the three composite strategy indices when none given.
        options = [0, 1, 2] if avail_TS is None else avail_TS
        self.avail_TS = options
        self.chosen = random.choice(options)
        self.method = CompositeTaskSequencing(self.chosen)

    def run(self, workflow, system):
        return self.method.run(workflow, system)
class Signal(Enum):
    """Control-flow markers yielded by the simulation driver ``run``.

    Only DONE_SIMULATION is emitted in this file (final yield of ``run``);
    the other members are presumably used by callers elsewhere — verify
    against the rest of the project.
    """
    NEW_WORKFLOW = -1       # a new workflow arrived (not emitted here — confirm upstream)
    TASKS_RUN_OUT = 0       # current slot has no more tasks (not emitted here — confirm)
    WORKFLOWS_RUN_OUT = 1   # all workflows consumed (not emitted here — confirm)
    DONE_SIMULATION = 2     # simulation finished; terminal yield of run()
class SchedulerAgent(Scheduler):
    """Composite scheduler combining three randomly parameterized methods:
    workflow sequencing (CWS), deadline partition (BLDP) and task
    sequencing (TS)."""

    def __init__(self, avail_alphas=None, avail_beta=None, avail_TS=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.ws_method = CWSAgent(avail_alphas)
        self.dp_method = BLDPAgent(avail_beta)
        self.ts_method = TSAgent(avail_TS)
        # State layout: [alpha1, alpha2, alpha3, beta, one-hot TS choice (3)].
        ts_one_hot = [0, 0, 0]
        ts_one_hot[self.ts_method.chosen] = 1
        self.state = np.array([
            *self.ws_method.chosen,
            self.dp_method.chosen,
            *ts_one_hot,
        ])

    def get_params_dict(self):
        """Return the randomly drawn parameters of the three sub-methods."""
        return {
            'alpha1_3': self.ws_method.chosen,
            'beta': self.dp_method.chosen,
            'TS': self.ts_method.chosen
        }

    def run(self, workflows, system):
        """Yield schedulable tasks: order workflows, partition each deadline,
        then emit that workflow's tasks in sequencing order (tagged with
        their owning workflow)."""
        ordered_workflows = self.ws_method.run(workflows, system)
        for wf in ordered_workflows:
            self.dp_method.run(wf, system)
            for task in self.ts_method.run(wf, system):
                task.workflow = wf
                yield task
def run(system, termination_time=3600 * 24, slot_duration=1200, workflows=None):
    """Drive the scheduling simulation as a generator.

    Yields, in order: the agent's parameter state vector (first yield), then
    each task to be placed as workflows arrive, and finally
    Signal.DONE_SIMULATION when the clock runs out or all workflows are done.

    :param system: simulated datacenter system (provides clock/process/graph APIs)
    :param termination_time: simulated seconds before the run stops
    :param slot_duration: length of one scheduling time slot in seconds
    :param workflows: pre-built workflow list; randomly generated when None
    """
    # workflows = Workflow.default_workflows()
    if workflows is None:
        # Random scenario: one density (rho) and one recipe per run.
        workflows = Workflow.make_workflows(rhos=(random.choice([0.2, 0.4, 0.6, 0.8, 1]),), reset_data=True,
                                            reset_arrivals=True,
                                            recipes=(random.choice([MontageRecipe, EpigenomicsRecipe, GenomeRecipe]),),
                                            build_system=System.default_system)
    agent = SchedulerAgent()
    # First yield: the agent's method-parameter vector (consumed by SimpleEnv.reset).
    yield agent.state
    while system.clock < termination_time:
        duration = min(slot_duration, (termination_time - system.clock))
        remaining_seconds = duration
        # Let the system process the slot; process() presumably returns the
        # unconsumed seconds until the slot is exhausted — confirm in System.
        while remaining_seconds:
            remaining_seconds = system.process(remaining_seconds)
        system.clock += duration
        # Drain all workflows whose submit time has passed (list is assumed
        # sorted by submit_time — verify in Workflow.make_workflows).
        arrived_workflows = []
        while workflows and workflows[0].submit_time <= system.clock:
            arrived_workflows.append(workflows[0])
            workflows = workflows[1:]
        if arrived_workflows:
            system.to_graph()
            # Enable graph generation only while scheduling this batch.
            system.set_gen_graph(True)
            for task in agent.run(arrived_workflows, system):
                yield task
            system.set_gen_graph(False)
        if not workflows:
            # No workflows left to arrive: end the simulation early.
            break
    # for dc in system.dcs:
    #     for server in dc.servers:
    #         for vm in server.vms:
    #             system.electricity_cost += sum([te.electricity_cost for te in vm.tasks])
    yield Signal.DONE_SIMULATION
    # yield system.electricity_cost
class DummyFirstTask:
    """Placeholder task carried by DummyFirstTaskExecution so the very first
    reward computation has a well-formed input."""

    def __init__(self):
        # Nominal non-zero values; never taken from a real workflow.
        self.deadline = self.finish_time = 1
class DummyFirstTaskExecution:
    """Sentinel "previous task execution" used before any real task has run,
    so reward_fn(last_task_execution) is callable on the first step."""

    def __init__(self):
        self.task, self.electricity_cost = DummyFirstTask(), 0
class SimpleEnv(gym.Env):
    """Gym environment wrapping the workflow-scheduling simulation driver.

    Each step places the current task on the (datacenter, VM) selected by the
    action. The observation concatenates: mean task-graph embedding, selected
    resource-graph node embeddings, normalized electricity prices, and the
    scheduler's method-parameter vector.
    """

    def __init__(self, state_space, action_space, reward_fn, task_embedding_cls, resource_embedding_cls, build_system,
                 termination_time=3600 * 24, slot_duration=600, seed=9527, workflows=None):
        self.observation_space = state_space
        self.action_space = action_space
        self.reward_fn = reward_fn
        self.current_workflow = None
        self.current_task = None
        self.current_state = None
        self.current_reward = 0
        # Sentinel so the first reward computation has a valid "previous execution".
        self.last_task_execution = DummyFirstTaskExecution()
        self.task_embedding = task_embedding_cls.load_best()
        self.resource_embedding = resource_embedding_cls.load_best()
        self.system = build_system()
        self.build_system = build_system  # kept to rebuild a fresh system on reset()
        self.termination_time = termination_time
        self.slot_duration = slot_duration
        self.action_space.seed(seed)
        self.observation_space.seed(seed)
        self.workflows = workflows
        self.done = False
        self.sync_run = None       # simulation generator; created in reset()
        self.cost = None
        self.methods_state = None  # scheduler parameter vector (first yield of run())

    def step(self, action: np.ndarray):
        """Place the current task on the VM selected by *action* = (dc_index, vm_index).

        Returns the gym 5-tuple (observation, reward, terminated, truncated, info).
        """
        dc_index, vm_index = int(action[0]), int(action[1])
        vm = self.system.dcs[dc_index].vms[vm_index]
        # Last index of vm.speed — presumably the highest voltage/frequency
        # level; confirm against the VM model.
        vf_level = len(vm.speed) - 1
        self.last_task_execution = vm.add_task(self.current_task, vf_level)
        self.step_run()
        return self.current_state, self.current_reward, self.done, False, {}

    def reset(self, seed: Optional[int] = None, options: Optional[dict] = None):
        """Rebuild the system, restart the simulation generator and return
        the first observation and an empty info dict."""
        # NOTE(review): *seed*/*options* are accepted for gym API compatibility
        # but ignored (super().reset(seed=seed) is never called) — confirm intended.
        self.system = self.build_system()
        self.sync_run = run(self.system, self.termination_time, self.slot_duration, workflows=self.workflows)
        self.done = False
        # The driver's first yield is the scheduler's parameter state vector.
        self.methods_state = next(self.sync_run)
        self.step_run()
        return self.current_state, {}

    def render(self) -> Optional[Union[RenderFrame, List[RenderFrame]]]:
        # Rendering is intentionally a no-op for this environment.
        pass

    def step_run(self):
        """Advance the simulation generator to the next schedulable task.

        Sets current_workflow/current_task/current_state/current_reward, or
        marks the episode done when the driver yields Signal.DONE_SIMULATION.
        """
        if (task_or_done := next(self.sync_run)) is not Signal.DONE_SIMULATION:
            self.current_workflow = task_or_done.workflow
            self.current_task = task_or_done
            task_graph = self.current_workflow.to_digraph(system=self.system, focused_task=self.current_task)
            # TODO: concat task, resource...
            resource_graph = self.system.to_graph()
            # Pick out datacenter nodes (short names starting with 'd') and
            # server nodes (names of the form '<x>_<y>') by node-name pattern.
            dc_indexes = []
            server_indexes = []
            for i, node in enumerate(resource_graph.nodes):
                if node.startswith('d') and len(node) <= 2:
                    dc_indexes.append(i)
                if len(node.split('_')) == 2:
                    server_indexes.append(i)
            rc_nodes_vec = self.resource_embedding.run(resource_graph)
            rc_nodes_vec = rc_nodes_vec[dc_indexes + server_indexes].reshape(-1)
            # BUG FIX: the state was previously computed twice — the first
            # result (with .cpu().numpy()) was immediately overwritten by this
            # one, wasting a full task_embedding forward pass. Only the
            # effective (second) computation is kept.
            self.current_state = torch.cat([
                torch.mean(self.task_embedding.run(task_graph), dim=0),
                rc_nodes_vec,
                torch.tensor(self.system.flatten_normalized_electricity_prices),
                torch.tensor(self.methods_state)
            ], dim=0)
            self.current_reward = self.reward_fn(self.last_task_execution)
        else:
            self.done = True
            # self.cost = next(self.sync_run)
            self.sync_run = None
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/public-artifacts/ecmws-experiments.git
git@gitee.com:public-artifacts/ecmws-experiments.git
public-artifacts
ecmws-experiments
ecmws-experiments
main

搜索帮助