# (web-scrape artifact, commented out: "code fetch complete; the page will refresh automatically")
import random
import numpy as np
import torch
from methods.methods_proto import ResourceAllocation, MDPResourceAllocation
from models.ra_agent import ResourceAllocationAgent
from models.ra_agent_dvfs import ResourceAllocationAgentDvfs
from models.resource_embedding import ResourceEmbedding
from models.resource_embedding_dvfs import ResourceEmbeddingDvfs
from models.task_embedding import TaskEmbedding
from models.task_embedding_dvfs import TaskEmbeddingDvfs
from utils.given_data import AssertConst
def find_min_electricity_price(dcs, time):
    """Return the data center with the lowest electricity price at *time*.

    Ties go to the earliest DC in *dcs*; returns None when *dcs* is empty.
    """
    return min(dcs, key=lambda dc: dc.electricity_price_at(time), default=None)
class RandomPolicy(ResourceAllocation):
    """Baseline allocator: place each task on a uniformly random VM.

    Always schedules at frequency level 0 (no DVFS).
    """

    def run(self, task, workflow, system):
        # One independent uniform draw at each level of the hierarchy.
        target_dc = random.choice(system.dcs)
        target_server = random.choice(target_dc.servers)
        target_vm = random.choice(target_server.vms)
        target_vm.add_task(task, 0)
class RandomPolicyDvfs(ResourceAllocation):
    """Baseline DVFS allocator: random VM and a uniformly random VF level."""

    def run(self, task, workflow, system):
        # Same draw order as the non-DVFS baseline, plus a random VF level.
        target_dc = random.choice(system.dcs)
        target_server = random.choice(target_dc.servers)
        target_vm = random.choice(target_server.vms)
        chosen_level = random.randint(0, system.num_vf_levels - 1)
        target_vm.add_task(task, chosen_level)
class SimplePolicy(ResourceAllocation):
    """Heuristic allocator: pick the cheapest-electricity DC at the task's
    average earliest start time, then the VM there that meets the deadline at
    minimum electricity cost. No DVFS: always schedules at level 0 and uses
    the last entry of vm.speed for timing (presumably the fastest level —
    TODO confirm the ordering of vm.speed)."""
    def run(self, task, workflow, system):
        selected_dc = find_min_electricity_price(system.dcs, task.avg_est)
        best_vm = None
        best_cost = float('inf')
        for server in selected_dc.servers:
            for vm in server.vms:
                # Earliest start: after every predecessor finishes and the VM is free.
                if task.predecessors:
                    avail_time = max([predecessor.finish_time for predecessor in task.predecessors])
                    est = max(avail_time, vm.avail_time)
                else:
                    est = vm.avail_time
                eft = est + task.workload / vm.speed[-1]
                # Delay start of execution by the slowest predecessor data transfer.
                max_transtime = 0
                for pred in task.predecessors:
                    if pred.vm.server == vm.server:
                        continue  # co-located on the same server: no network transfer
                    # NOTE(review): B_in is used for the cross-DC case and B_out for
                    # the intra-DC case; the names suggest this condition may be
                    # inverted — confirm against the System bandwidth model.
                    if (transtime := task.get_data_volume_transferred_from(pred) / (
                            system.B_in.current_bw if pred.vm.dc != vm.dc else system.B_out.current_bw)) > max_transtime:
                        max_transtime = transtime
                eft += max_transtime
                if eft <= task.deadline:
                    # Electricity cost for the busy interval, priced at the start time.
                    cost = (eft - est) * selected_dc.electricity_price_at(est)
                    if cost < best_cost:
                        best_vm = vm
                        best_cost = cost
        if best_vm is None:
            # No VM can meet the deadline: fall back to the VM with the best
            # speed-to-power ratio at level 0.
            best_vm = max(selected_dc.vms, key=lambda vm: vm.speed[0] / vm.power[0])
        best_vm.add_task(task, 0)
        return True
class SimplePolicyDvfs(ResourceAllocation):
    """DVFS-aware variant of SimplePolicy: additionally searches the VM's
    voltage/frequency levels and weights electricity cost by the level's
    power draw."""
    def run(self, task, workflow, system):
        selected_dc = find_min_electricity_price(system.dcs, task.avg_est)
        best_vm = None
        best_cost = float('inf')
        best_vf_level = 0
        for server in selected_dc.servers:
            for vm in server.vms:
                # Earliest start: after every predecessor finishes and the VM is free.
                if task.predecessors:
                    avail_time = max([predecessor.finish_time for predecessor in task.predecessors])
                    est = max(avail_time, vm.avail_time)
                else:
                    est = vm.avail_time
                # Scan every VF level (paired speed/power entries) of this VM.
                for vf_level, speed, power in zip(range(len(vm.speed)), vm.speed, vm.power):
                    eft = est + task.workload / speed
                    max_transtime = 0
                    for pred in task.predecessors:
                        if pred.vm.server == vm.server:
                            continue  # co-located: no network transfer
                        # NOTE(review): B_in for cross-DC vs B_out for intra-DC — the
                        # names hint this condition may be inverted; confirm.
                        if (transtime := task.get_data_volume_transferred_from(pred) / (
                                system.B_in.current_bw if pred.vm.dc != vm.dc else system.B_out.current_bw)) > max_transtime:
                            max_transtime = transtime
                    eft += max_transtime
                    if eft <= task.deadline:
                        # Energy-weighted electricity cost for this level.
                        cost = (eft - est) * selected_dc.electricity_price_at(est) * power
                        if cost < best_cost:
                            best_vm = vm
                            best_cost = cost
                            best_vf_level = vf_level
                    else:
                        # Stops scanning this VM once a level misses the deadline —
                        # assumes later levels are slower (TODO confirm vm.speed ordering).
                        break
        if best_vm is None:
            # Deadline unreachable everywhere: most speed-per-watt VM at level 0.
            best_vm = max(selected_dc.vms, key=lambda vm: vm.speed[0] / vm.power[0])
        best_vm.add_task(task, best_vf_level)
        return True
def reward_fn(task_execution):
    """Reward for one task execution: negative electricity cost, scaled down
    by the fraction of deadline overrun (0 reward once overrun reaches 100%).

    Returns 0 when *task_execution* is None (no execution to score).
    """
    if task_execution is None:
        return 0
    finished_task = task_execution.task
    overrun_fraction = (finished_task.finish_time - finished_task.deadline) / finished_task.deadline
    discount = 1 - max(0, overrun_fraction)
    return -task_execution.electricity_cost * discount
class RLAgentWithFallbackPolicy(MDPResourceAllocation):
    """RL-based allocator that defers to a heuristic fallback policy whenever
    the agent's action probability falls below a confidence threshold.

    The MDP state is: mean-pooled task-graph embedding ++ flattened DC/server
    resource embeddings (cached until reset_sys_graph) ++ normalized
    electricity prices ++ caller-supplied method state.
    """

    def __init__(self, Conft=AssertConst.CONFIDENCE_THRESHOLD, agent=None, fallback_policy=None, given_device='cuda'):
        """
        Args:
            Conft: confidence threshold; actions with probability below this
                are discarded in favor of the fallback policy.
            agent: pre-built agent, or None to load the best saved checkpoint.
            fallback_policy: policy used on low confidence (default: SimplePolicy).
            given_device: torch device string for the agent and embeddings.
        """
        if agent is None:
            self.agent = ResourceAllocationAgent.load_best()
        else:
            self.agent = agent
        # Fix: move the agent to the target device in BOTH branches. Previously
        # a caller-supplied agent was left on its original device, unlike the
        # DVFS variant of this class, which always calls .to(given_device).
        self.agent.to(given_device)
        self.given_device = given_device
        if fallback_policy is None:
            self.fallback_policy = SimplePolicy()
        else:
            self.fallback_policy = fallback_policy
        self.reward_fn = reward_fn
        self.task_embedding = TaskEmbedding.load_best(device=given_device)
        self.resource_embedding = ResourceEmbedding.load_best(device=given_device)
        self.Conft = Conft
        # Cached flattened resource-node embedding; None means "recompute".
        self.rc_nodes_vec = None

    def reset_sys_graph(self):
        """Invalidate the cached resource embedding (call when the system graph changes)."""
        self.rc_nodes_vec = None

    def step(self, task, workflow, system, methods_state):
        """Allocate *task*; return 1 if the agent acted, 0 if the fallback did."""
        task_graph = workflow.to_digraph(system=system, focused_task=task)
        task_repr = self.task_embedding.run(task_graph)
        task_repr = torch.mean(task_repr, dim=0)  # mean-pool over graph nodes
        if self.rc_nodes_vec is None:
            resource_graph = system.to_graph()
            dc_indexes = []
            server_indexes = []
            for i, node in enumerate(resource_graph.nodes):
                # DC nodes look like 'd0' (single-digit only given len <= 2);
                # server nodes contain exactly one '_' — presumably 'dX_sY',
                # TODO confirm the node naming scheme.
                if node.startswith('d') and len(node) <= 2:
                    dc_indexes.append(i)
                if len(node.split('_')) == 2:
                    server_indexes.append(i)
            rc_nodes_vec = self.resource_embedding.run(resource_graph)
            # Keep only DC and server embeddings, flattened into one vector.
            rc_nodes_vec = rc_nodes_vec[dc_indexes + server_indexes].reshape(-1)
            self.rc_nodes_vec = rc_nodes_vec
        else:
            rc_nodes_vec = self.rc_nodes_vec
        state = np.concatenate([
            task_repr, rc_nodes_vec,
            system.flatten_normalized_electricity_prices,
            methods_state
        ])
        action, prob = self.agent.run(state, given_device=self.given_device)
        if prob >= self.Conft:
            # Confident enough: decode (dc, vm) and schedule at base level 0.
            dc_index, vm_index = action
            vm = system.dcs[dc_index].vms[vm_index]
            vm.add_task(task, 0)
            return 1
        else:
            self.fallback_policy.run(task, workflow, system)
            return 0
class RLAgentWithFallbackPolicyDvfs(MDPResourceAllocation):
    """DVFS-aware RL allocator with heuristic fallback: the agent's action
    additionally selects a voltage/frequency level for the chosen VM."""
    def __init__(self, Conft=AssertConst.CONFIDENCE_THRESHOLD, agent=None, fallback_policy=None, given_device='cuda'):
        # Conft: confidence threshold below which the fallback policy is used.
        # agent: pre-built agent, or None to load the best saved checkpoint.
        if agent is None:
            self.agent = ResourceAllocationAgentDvfs.load_best()
        else:
            self.agent = agent
        self.agent.to(given_device)
        if fallback_policy is None:
            self.fallback_policy = SimplePolicyDvfs()
        else:
            self.fallback_policy = fallback_policy
        self.given_device = given_device
        self.reward_fn = reward_fn
        # NOTE(review): loaded without device=given_device, unlike the non-DVFS
        # class — confirm whether these load_best variants accept/need a device.
        self.task_embedding = TaskEmbeddingDvfs.load_best()
        self.resource_embedding = ResourceEmbeddingDvfs.load_best()
        self.Conft = Conft
        # Cached flattened resource-node embedding; None means "recompute".
        self.rc_nodes_vec = None
    def reset_sys_graph(self):
        # Invalidate the cached resource embedding when the system graph changes.
        self.rc_nodes_vec = None
    def step(self, task, workflow, system, methods_state):
        # Allocate *task*; returns 1 if the agent acted, 0 if the fallback did.
        task_graph = workflow.to_digraph(system=system, focused_task=task)
        task_repr = self.task_embedding.run(task_graph)
        task_repr = torch.mean(task_repr, dim=0)  # mean-pool over graph nodes
        # resource_repr = torch.mean(resource_repr, dim=0)
        if self.rc_nodes_vec is None:
            resource_graph = system.to_graph()
            dc_indexes = []
            server_indexes = []
            for i, node in enumerate(resource_graph.nodes):
                # DC nodes look like 'd0'; server nodes contain exactly one '_'
                # (presumably 'dX_sY' — TODO confirm the node naming scheme).
                if node.startswith('d') and len(node) <= 2:
                    dc_indexes.append(i)
                if len(node.split('_')) == 2:
                    server_indexes.append(i)
            rc_nodes_vec = self.resource_embedding.run(resource_graph)
            # Keep only DC and server embeddings, flattened into one vector.
            rc_nodes_vec = rc_nodes_vec[dc_indexes + server_indexes].reshape(-1)
            self.rc_nodes_vec = rc_nodes_vec
        else:
            rc_nodes_vec = self.rc_nodes_vec
        state = np.concatenate([
            task_repr, rc_nodes_vec,
            system.flatten_normalized_electricity_prices,
            methods_state
        ])
        action, prob = self.agent.run(state, given_device=self.given_device)
        if prob >= self.Conft:
            # Confident: decode (dc, vm, vf_level) and schedule at that level.
            dc_index, vm_index, vf_level = action
            vm = system.dcs[dc_index].vms[vm_index]
            vm.add_task(task, vf_level)
            return 1
        else:
            self.fallback_policy.run(task, workflow, system)
            return 0
# (web-scrape artifact, commented out: hosting-site moderation notice — not part of this module)