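"""DVFS-aware workflow scheduling agents and a gym environment wrapper.

Combines frequency/contention-aware workflow sequencing, deadline
partitioning, and task sequencing into a scheduler, and exposes the
slot-based simulation loop as a gym.Env for RL training.
"""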
import itertools
import random
from enum import Enum
from typing import Optional, Union, List
import gym
import numpy as np
import torch
from gym.core import RenderFrame
from wfcommons import MontageRecipe, EpigenomicsRecipe, GenomeRecipe
from methods.dp_method import FrequencyAwareDeadlinePartition
from methods.methods_proto import WorkflowSequencing, DeadlinePartition, TaskSequencing
from methods.ts_method import FrequencyAwareTaskSequencing
from methods.ws_method import FrequencyAndContentionAwareWorkflowSequencing
from resources.system import System
from schedulers.scheduler_proto import Scheduler
from tasks.workflow import Workflow
class FCWSAgent(WorkflowSequencing):
    """Workflow-sequencing agent with randomly chosen alpha weights."""

    def __init__(self, avail_alphas=None):
        if avail_alphas is None:
            avail_alphas = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8]
        # Keep only weight triples that sum to 1. Use a tolerance rather than
        # `== 1`: floating-point rounding makes e.g. 0.1 + 0.2 + 0.7 != 1.0,
        # which would silently drop valid combinations.
        self.combinations = [[a1, a2, a3] for a1, a2, a3 in
                             filter(lambda x: abs(sum(x) - 1.0) < 1e-9,
                                    itertools.product(*[avail_alphas] * 3))]
        self.chosen = random.choice(self.combinations)
        self.method = FrequencyAndContentionAwareWorkflowSequencing(*self.chosen)
def run(self, workflows, system):
return self.method.run(workflows, system)
class FDPAgent(DeadlinePartition):
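    """Deadline-partition agent with a randomly chosen strategy index."""
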
def __init__(self, avail_DP=None):
if avail_DP is None:
avail_DP = [0, 1, 2]
self.avail_DP = avail_DP
self.chosen = random.choice(self.avail_DP)
self.method = FrequencyAwareDeadlinePartition(self.chosen)
def run(self, workflow, system):
return self.method.run(workflow, system)
class FTSAgent(TaskSequencing):
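    """Task-sequencing agent with a randomly chosen strategy index."""
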
def __init__(self, avail_TS=None):
if avail_TS is None:
avail_TS = [0, 1, 2]
self.avail_TS = avail_TS
self.chosen = random.choice(self.avail_TS)
self.method = FrequencyAwareTaskSequencing(self.chosen)
def run(self, workflow, system):
return self.method.run(workflow, system)
class Signal(Enum):
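    """Control signals used to coordinate the simulation driver and its consumer."""
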
NEW_WORKFLOW = -1
TASKS_RUN_OUT = 0
WORKFLOWS_RUN_OUT = 1
DONE_SIMULATION = 2
class SchedulerAgentDvfs(Scheduler):
    """Scheduler that chains the WS, DP, and TS agents and exposes their
    randomly chosen hyper-parameters as a flat state vector."""

    def __init__(self, avail_alphas=None, avail_DP=None, avail_TS=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.ws_method = FCWSAgent(avail_alphas)
        self.dp_method = FDPAgent(avail_DP)
        self.ts_method = FTSAgent(avail_TS)
        # State layout: the three alpha weights, followed by a one-hot
        # encoding of the chosen DP strategy and a one-hot encoding of the
        # chosen TS strategy.
        self.state = np.array([
            *self.ws_method.chosen,  # alpha1..alpha3
            0, 0, 0,                 # DP one-hot (set below)
            0, 0, 0,                 # TS one-hot (set below)
        ])
        self.state[3 + self.dp_method.chosen] = 1
        self.state[6 + self.ts_method.chosen] = 1

    def get_params_dict(self):
        return {
            'alpha1_3': self.ws_method.chosen,
            'DP': self.dp_method.chosen,
            'TS': self.ts_method.chosen,
        }
def run(self, workflows, system):
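        # Order the workflows, partition each workflow's deadline, then yield
        # its tasks in the order chosen by the task-sequencing method.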
workflows = self.ws_method.run(workflows, system)
for workflow in workflows:
self.dp_method.run(workflow, system)
for task in self.ts_method.run(workflow, system):
task.workflow = workflow
yield task
def run(system, termination_time=3600 * 24, slot_duration=1200, workflows=None):
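    """Drive the simulation in fixed-length time slots.

    Yields the agent's method-selection state first, then each task as it is
    ready to be placed, and finally Signal.DONE_SIMULATION.
    """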
if workflows is None:
workflows = Workflow.make_workflows(rhos=(random.choice([0.2, 0.4, 0.6, 0.8, 1]),), reset_data=True,
reset_arrivals=True,
recipes=(random.choice([MontageRecipe, EpigenomicsRecipe, GenomeRecipe]),),
build_system=System.default_dvfs_system)
agent = SchedulerAgentDvfs()
yield agent.state
while system.clock < termination_time:
duration = min(slot_duration, (termination_time - system.clock))
remaining_seconds = duration
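            # system.process consumes part of the slot and returns the seconds
            # still left, so loop until the slot is fully simulated.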
while remaining_seconds:
remaining_seconds = system.process(remaining_seconds)
system.clock += duration
arrived_workflows = []
while workflows and workflows[0].submit_time <= system.clock:
arrived_workflows.append(workflows[0])
workflows = workflows[1:]
if arrived_workflows:
for task in agent.run(arrived_workflows, system):
yield task
if not workflows:
break
yield Signal.DONE_SIMULATION
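# Placeholder task/execution pair evaluated by reward_fn on the very first
# env step, before any real task execution exists.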
class DummyFirstTask:
def __init__(self):
self.deadline = 1
self.finish_time = 1
class DummyFirstTaskExecution:
def __init__(self):
self.task = DummyFirstTask()
self.electricity_cost = 0
class SimpleEnvDvfs(gym.Env):
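    """gym.Env wrapper around the slot-based DVFS scheduling simulation.

    Observations concatenate the focused task-graph embedding, datacenter and
    server resource embeddings, normalized electricity prices, and the
    scheduler's method-selection state; an action selects a datacenter, a VM,
    and a voltage/frequency level for the current task.
    """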
def __init__(self, state_space, action_space, reward_fn, task_embedding_cls, resource_embedding_cls, build_system,
termination_time=3600 * 24, slot_duration=600, seed=9527, workflows=None):
self.observation_space = state_space
self.action_space = action_space
self.reward_fn = reward_fn
self.current_workflow = None
self.current_task = None
self.current_state = None
self.current_reward = 0
self.last_task_execution = DummyFirstTaskExecution()
self.task_embedding = task_embedding_cls.load_best()
self.resource_embedding = resource_embedding_cls.load_best()
self.system = build_system()
self.build_system = build_system
self.termination_time = termination_time
self.slot_duration = slot_duration
self.action_space.seed(seed)
self.observation_space.seed(seed)
self.workflows = workflows
self.done = False
self.sync_run = None
self.cost = None
self.methods_state = None
    def step(self, action: np.ndarray):
        # Action = (datacenter index, VM index, voltage/frequency level).
        dc_index, vm_index, vf_level = int(action[0]), int(action[1]), int(action[2])
        vm = self.system.dcs[dc_index].vms[vm_index]
        self.last_task_execution = vm.add_task(self.current_task, vf_level)
        self.step_run()
        # gym API: (observation, reward, terminated, truncated, info).
        return self.current_state, self.current_reward, self.done, False, {}
    def reset(self, seed: Optional[int] = None, options: Optional[dict] = None):
        super().reset(seed=seed)  # seed self.np_random per the gym API
        self.system = self.build_system()
        self.sync_run = run(self.system, self.termination_time, self.slot_duration, workflows=self.workflows)
        self.done = False
        # The driver yields the scheduler's method-selection state first.
        self.methods_state = next(self.sync_run)
        self.step_run()
        return self.current_state, {}
def render(self) -> Optional[Union[RenderFrame, List[RenderFrame]]]:
pass
    def step_run(self):
        # Advance the driver to the next task; Signal.DONE_SIMULATION marks
        # the end of the episode.
        if (task_or_done := next(self.sync_run)) is not Signal.DONE_SIMULATION:
            self.current_workflow = task_or_done.workflow
            self.current_task = task_or_done
            task_graph = self.current_workflow.to_digraph(system=self.system, focused_task=self.current_task)
            resource_graph = self.system.to_graph()
            # Collect datacenter nodes (short names such as 'd0') and server
            # nodes (names containing exactly one underscore).
            dc_indexes = []
            server_indexes = []
            for i, node in enumerate(resource_graph.nodes):
                if node.startswith('d') and len(node) <= 2:
                    dc_indexes.append(i)
                if len(node.split('_')) == 2:
                    server_indexes.append(i)
            rc_nodes_vec = self.resource_embedding.run(resource_graph)
            rc_nodes_vec = rc_nodes_vec[dc_indexes + server_indexes].reshape(-1)
            # Observation: mean task-node embedding, flattened resource
            # embeddings, normalized electricity prices, and the scheduler's
            # method-selection state.
            self.current_state = torch.cat([
                torch.mean(self.task_embedding.run(task_graph), dim=0),
                rc_nodes_vec,
                torch.tensor(self.system.flatten_normalized_electricity_prices),
                torch.tensor(self.methods_state)
            ], dim=0).cpu().numpy()
            self.current_reward = self.reward_fn(self.last_task_execution)
else:
self.done = True
self.sync_run = None
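# Usage sketch (illustrative only): drive the environment with random
# actions. `TaskGNN`/`ResourceGNN` are hypothetical placeholders for the
# project's embedding classes, and the space shapes below are assumptions;
# substitute the real models and dimensions.
#
#     env = SimpleEnvDvfs(
#         state_space=gym.spaces.Box(low=-np.inf, high=np.inf, shape=(64,)),
#         action_space=gym.spaces.MultiDiscrete([2, 8, 3]),  # (dc, vm, vf)
#         reward_fn=lambda execution: -execution.electricity_cost,
#         task_embedding_cls=TaskGNN,
#         resource_embedding_cls=ResourceGNN,
#         build_system=System.default_dvfs_system,
#     )
#     obs, _ = env.reset(seed=9527)
#     done = False
#     while not done:
#         obs, reward, done, truncated, info = env.step(env.action_space.sample())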