import itertools
import random
from enum import Enum
from typing import Optional, Union, List
import gym
import numpy as np
import torch
from gym.core import RenderFrame
from wfcommons import MontageRecipe, EpigenomicsRecipe, GenomeRecipe
from methods.dp_method import BottleLayerAwareDeadlinePartition
from methods.methods_proto import WorkflowSequencing, DeadlinePartition, TaskSequencing
from methods.ts_method import CompositeTaskSequencing
from methods.ws_method import ContentionAwareWorkflowSequencing
from resources.system import System
from schedulers.scheduler_proto import Scheduler
from tasks.workflow import Workflow
class CWSAgent(WorkflowSequencing):
    """Wraps ContentionAwareWorkflowSequencing with a randomly sampled (alpha1, alpha2, alpha3) triple."""

    def __init__(self, avail_alphas=None):
        if avail_alphas is None:
            avail_alphas = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8]
        # Keep only triples that sum to 1. Use a tolerance rather than `sum(x) == 1`:
        # exact float equality silently drops valid triples such as (0.1, 0.2, 0.7).
        self.combinations = [[a1, a2, a3] for a1, a2, a3 in itertools.product(avail_alphas, repeat=3)
                             if abs(a1 + a2 + a3 - 1.0) < 1e-9]
        self.chosen = random.choice(self.combinations)
        self.method = ContentionAwareWorkflowSequencing(*self.chosen)
def run(self, workflows, system):
return self.method.run(workflows, system)
class BLDPAgent(DeadlinePartition):
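    """Wraps BottleLayerAwareDeadlinePartition with a randomly sampled beta."""
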
def __init__(self, avail_beta=None):
if avail_beta is None:
avail_beta = [1.5, 2, 4, 8, 16]
self.avail_beta = avail_beta
self.chosen = random.choice(self.avail_beta)
self.method = BottleLayerAwareDeadlinePartition(self.chosen)
def run(self, workflow, system):
return self.method.run(workflow, system)
class TSAgent(TaskSequencing):
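    """Wraps CompositeTaskSequencing with a randomly sampled strategy index."""
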
def __init__(self, avail_TS=None):
if avail_TS is None:
avail_TS = [0, 1, 2]
        self.avail_TS = avail_TS
        self.chosen = random.choice(self.avail_TS)
self.method = CompositeTaskSequencing(self.chosen)
def run(self, workflow, system):
return self.method.run(workflow, system)
class Signal(Enum):
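    """Control signals yielded by the simulation generator in place of tasks."""
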
NEW_WORKFLOW = -1
TASKS_RUN_OUT = 0
WORKFLOWS_RUN_OUT = 1
DONE_SIMULATION = 2
class SchedulerAgent(Scheduler):
    """Composes the three randomly parameterised methods: workflow sequencing (WS),
    deadline partition (DP) and task sequencing (TS)."""

    def __init__(self, avail_alphas=None, avail_beta=None, avail_TS=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.ws_method = CWSAgent(avail_alphas)
        self.dp_method = BLDPAgent(avail_beta)
        self.ts_method = TSAgent(avail_TS)
        # State layout: [alpha1, alpha2, alpha3, beta, <one-hot TS choice over 3 slots>].
        self.state = np.array([
            *self.ws_method.chosen,
            self.dp_method.chosen,
            0, 0, 0
        ])
        self.state[4 + self.ts_method.chosen] = 1

    def get_params_dict(self):
        return {
            'alpha1_3': self.ws_method.chosen,
            'beta': self.dp_method.chosen,
            'TS': self.ts_method.chosen
        }
def run(self, workflows, system):
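        # Order the workflows, partition each workflow's deadline into per-task
        # sub-deadlines, then yield tasks one by one in the chosen sequencing order.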
workflows = self.ws_method.run(workflows, system)
for workflow in workflows:
self.dp_method.run(workflow, system)
tasks = self.ts_method.run(workflow, system)
for task in tasks:
task.workflow = workflow
yield task
def run(system, termination_time=3600 * 24, slot_duration=1200, workflows=None):
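    # Generator protocol: first yields the agent's method-parameter state, then
    # yields ready tasks as workflows arrive, and finally Signal.DONE_SIMULATION.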
# workflows = Workflow.default_workflows()
    if workflows is None:
        workflows = Workflow.make_workflows(
            rhos=(random.choice([0.2, 0.4, 0.6, 0.8, 1]),),
            reset_data=True,
            reset_arrivals=True,
            recipes=(random.choice([MontageRecipe, EpigenomicsRecipe, GenomeRecipe]),),
            build_system=System.default_system)
    agent = SchedulerAgent()
    # First yield: expose the randomly chosen method parameters to the caller.
    yield agent.state
    while system.clock < termination_time:
        # Advance the simulation by one slot (shorter near the termination time).
        duration = min(slot_duration, termination_time - system.clock)
        remaining_seconds = duration
        while remaining_seconds:
            remaining_seconds = system.process(remaining_seconds)
        system.clock += duration
        # Collect every workflow whose submit time falls within the elapsed slot.
        arrived_workflows = []
        while workflows and workflows[0].submit_time <= system.clock:
            arrived_workflows.append(workflows[0])
            workflows = workflows[1:]
        if arrived_workflows:
            system.to_graph()
            system.set_gen_graph(True)
            for task in agent.run(arrived_workflows, system):
                yield task
            system.set_gen_graph(False)
        if not workflows:
            break
# for dc in system.dcs:
# for server in dc.servers:
# for vm in server.vms:
# system.electricity_cost += sum([te.electricity_cost for te in vm.tasks])
yield Signal.DONE_SIMULATION
# yield system.electricity_cost
class DummyFirstTask:
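    """Stands in for a real task before the first action, so reward_fn gets valid fields."""
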
def __init__(self):
self.deadline = 1
self.finish_time = 1
class DummyFirstTaskExecution:
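    """Zero-cost stand-in for the first `last_task_execution` at environment reset."""
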
def __init__(self):
self.task = DummyFirstTask()
self.electricity_cost = 0
class SimpleEnv(gym.Env):
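    """Gym environment that flattens (task, resource) embeddings into one observation
    and interprets an action as a (datacenter, VM) placement choice."""
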
def __init__(self, state_space, action_space, reward_fn, task_embedding_cls, resource_embedding_cls, build_system,
termination_time=3600 * 24, slot_duration=600, seed=9527, workflows=None):
self.observation_space = state_space
self.action_space = action_space
self.reward_fn = reward_fn
self.current_workflow = None
self.current_task = None
self.current_state = None
self.current_reward = 0
self.last_task_execution = DummyFirstTaskExecution()
self.task_embedding = task_embedding_cls.load_best()
self.resource_embedding = resource_embedding_cls.load_best()
self.system = build_system()
self.build_system = build_system
self.termination_time = termination_time
self.slot_duration = slot_duration
self.action_space.seed(seed)
self.observation_space.seed(seed)
self.workflows = workflows
self.done = False
self.sync_run = None
self.cost = None
self.methods_state = None
    def step(self, action: np.ndarray):
        # Interpret the action as (datacenter index, VM index) and place the
        # current task on that VM at its highest voltage/frequency level.
        dc_index, vm_index = int(action[0]), int(action[1])
        vm = self.system.dcs[dc_index].vms[vm_index]
        vf_level = len(vm.speed) - 1
        self.last_task_execution = vm.add_task(self.current_task, vf_level)
        self.step_run()
        return self.current_state, self.current_reward, self.done, False, {}
def reset(self, seed: Optional[int] = None, options: Optional[dict] = None):
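        # Rebuild the system and restart the simulation generator; its first
        # yield is the method-parameter vector appended to every observation.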
self.system = self.build_system()
self.sync_run = run(self.system, self.termination_time, self.slot_duration, workflows=self.workflows)
self.done = False
self.methods_state = next(self.sync_run)
self.step_run()
return self.current_state, {}
def render(self) -> Optional[Union[RenderFrame, List[RenderFrame]]]:
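        # Rendering is intentionally a no-op for this environment.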
pass
    def step_run(self):
        # Advance the simulation until the next schedulable task; a
        # DONE_SIMULATION signal instead ends the episode.
        if (task_or_done := next(self.sync_run)) is not Signal.DONE_SIMULATION:
            self.current_workflow = task_or_done.workflow
            self.current_task = task_or_done
            task_graph = self.current_workflow.to_digraph(system=self.system, focused_task=self.current_task)
            # TODO: concat task, resource...
            resource_graph = self.system.to_graph()
            # Datacenter nodes are named like 'd0'; server nodes contain exactly one '_'.
            dc_indexes = []
            server_indexes = []
            for i, node in enumerate(resource_graph.nodes):
                if node.startswith('d') and len(node) <= 2:
                    dc_indexes.append(i)
                if len(node.split('_')) == 2:
                    server_indexes.append(i)
            rc_nodes_vec = self.resource_embedding.run(resource_graph)
            rc_nodes_vec = rc_nodes_vec[dc_indexes + server_indexes].reshape(-1)
            # Observation = mean task embedding ++ flattened DC/server embeddings
            # ++ normalized electricity prices ++ method-parameter vector.
            self.current_state = torch.cat([
                torch.mean(self.task_embedding.run(task_graph), dim=0),
                rc_nodes_vec,
                torch.tensor(self.system.flatten_normalized_electricity_prices),
                torch.tensor(self.methods_state)
            ], dim=0).cpu().numpy()
            self.current_reward = self.reward_fn(self.last_task_execution)
        else:
            self.done = True
            # self.cost = next(self.sync_run)
            self.sync_run = None
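

# --- Usage sketch ---------------------------------------------------------
# A minimal smoke-test sketch for SimpleEnv. The stub embedding below and the
# space sizes are assumptions for illustration only: the real project supplies
# its own embedding classes (exposing load_best() and run()) and spaces whose
# sizes must match the system produced by build_system.
class _StubEmbedding:
    @classmethod
    def load_best(cls):
        return cls()

    def run(self, graph):
        # One 8-dim zero vector per graph node (a placeholder for real embeddings).
        return torch.zeros(len(graph.nodes), 8)


if __name__ == '__main__':
    env = SimpleEnv(
        state_space=gym.spaces.Box(low=-np.inf, high=np.inf, shape=(64,)),  # illustrative size
        action_space=gym.spaces.MultiDiscrete([2, 8]),  # (dc index, vm index); illustrative sizes
        reward_fn=lambda te: -te.electricity_cost,  # assumed reward: negative electricity cost
        task_embedding_cls=_StubEmbedding,
        resource_embedding_cls=_StubEmbedding,
        build_system=System.default_system,
    )
    obs, _ = env.reset()
    while not env.done:
        obs, reward, done, truncated, info = env.step(env.action_space.sample())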