0 Star 1 Fork 1

public-artifacts/ecmws-experiments

加入 Gitee
与超过 1400万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
ra_method.py 9.28 KB
一键复制 编辑 原始数据 按行查看 历史
victor puscarschi 提交于 2025-02-10 17:39 +08:00 . init
import random
import numpy as np
import torch
from methods.methods_proto import ResourceAllocation, MDPResourceAllocation
from models.ra_agent import ResourceAllocationAgent
from models.ra_agent_dvfs import ResourceAllocationAgentDvfs
from models.resource_embedding import ResourceEmbedding
from models.resource_embedding_dvfs import ResourceEmbeddingDvfs
from models.task_embedding import TaskEmbedding
from models.task_embedding_dvfs import TaskEmbeddingDvfs
from utils.given_data import AssertConst
def find_min_electricity_price(dcs, time):
    """Return the data center in *dcs* with the cheapest electricity price at *time*.

    Returns None when *dcs* is empty. On ties the earliest candidate wins,
    matching a strict ``<`` comparison scan.
    """
    return min(dcs, key=lambda dc: dc.electricity_price_at(time), default=None)
class RandomPolicy(ResourceAllocation):
    """Allocate each task to a uniformly random VM, chosen hierarchically
    (DC -> server -> VM), always at VF level 0."""

    def run(self, task, workflow, system):
        # One draw per level keeps the hierarchical distribution
        # (not uniform over all VMs system-wide).
        chosen_dc = random.choice(system.dcs)
        chosen_server = random.choice(chosen_dc.servers)
        chosen_vm = random.choice(chosen_server.vms)
        chosen_vm.add_task(task, 0)
class RandomPolicyDvfs(ResourceAllocation):
    """Random hierarchical VM placement plus a uniformly random DVFS level."""

    def run(self, task, workflow, system):
        # Innermost choice runs first, so the RNG call order is DC, server, VM.
        target_vm = random.choice(random.choice(random.choice(system.dcs).servers).vms)
        vf_level = random.randint(0, system.num_vf_levels - 1)
        target_vm.add_task(task, vf_level)
class SimplePolicy(ResourceAllocation):
    """Heuristic allocation: pick the DC with the cheapest electricity at the
    task's average earliest-start time, then the VM that meets the deadline at
    the lowest electricity cost.

    Falls back to the DC's most energy-efficient VM (speed[0] / power[0]) when
    no VM can meet the deadline. Always schedules at VF level 0.
    """

    def run(self, task, workflow, system):
        selected_dc = find_min_electricity_price(system.dcs, task.avg_est)
        # Earliest time all predecessor outputs exist. This is loop-invariant,
        # so compute it once instead of once per VM (was recomputed inside the
        # inner loop).
        ready_time = max((pred.finish_time for pred in task.predecessors), default=None)
        best_vm = None
        best_cost = float('inf')
        for server in selected_dc.servers:
            for vm in server.vms:
                est = vm.avail_time if ready_time is None else max(ready_time, vm.avail_time)
                # speed[-1] is used as the execution speed here, but the
                # fallback below ranks VMs by speed[0] -- NOTE(review): confirm
                # which index is the "full speed" level; they look inconsistent.
                eft = est + task.workload / vm.speed[-1]
                max_transtime = 0
                for pred in task.predecessors:
                    if pred.vm.server == vm.server:
                        continue  # co-located on the same server: no transfer
                    # NOTE(review): B_in is used for inter-DC and B_out for
                    # intra-DC transfers -- verify this is not inverted.
                    bw = system.B_in.current_bw if pred.vm.dc != vm.dc else system.B_out.current_bw
                    transtime = task.get_data_volume_transferred_from(pred) / bw
                    if transtime > max_transtime:
                        max_transtime = transtime
                eft += max_transtime
                if eft <= task.deadline:
                    # Cost of occupying the VM for the task's span at the
                    # electricity price in effect at its start time.
                    cost = (eft - est) * selected_dc.electricity_price_at(est)
                    if cost < best_cost:
                        best_vm = vm
                        best_cost = cost
        if best_vm is None:
            # No deadline-feasible VM: take the most energy-efficient one.
            best_vm = max(selected_dc.vms, key=lambda v: v.speed[0] / v.power[0])
        best_vm.add_task(task, 0)
        return True
class SimplePolicyDvfs(ResourceAllocation):
    """DVFS-aware variant of SimplePolicy: additionally searches the VM's
    voltage/frequency levels and weights cost by the level's power draw.

    Falls back to the cheapest DC's most energy-efficient VM at VF level 0
    when no (VM, level) pair meets the deadline.
    """

    def run(self, task, workflow, system):
        selected_dc = find_min_electricity_price(system.dcs, task.avg_est)
        # Earliest time all predecessor outputs exist. Loop-invariant, so
        # compute it once instead of once per VM (was recomputed per VM).
        ready_time = max((pred.finish_time for pred in task.predecessors), default=None)
        best_vm = None
        best_cost = float('inf')
        best_vf_level = 0
        for server in selected_dc.servers:
            for vm in server.vms:
                est = vm.avail_time if ready_time is None else max(ready_time, vm.avail_time)
                for vf_level, (speed, power) in enumerate(zip(vm.speed, vm.power)):
                    eft = est + task.workload / speed
                    max_transtime = 0
                    for pred in task.predecessors:
                        if pred.vm.server == vm.server:
                            continue  # co-located: no network transfer
                        # NOTE(review): B_in for inter-DC, B_out for intra-DC
                        # transfers -- verify this is not inverted.
                        bw = system.B_in.current_bw if pred.vm.dc != vm.dc else system.B_out.current_bw
                        transtime = task.get_data_volume_transferred_from(pred) / bw
                        if transtime > max_transtime:
                            max_transtime = transtime
                    eft += max_transtime
                    if eft <= task.deadline:
                        # Occupancy time * price * power draw at this level.
                        cost = (eft - est) * selected_dc.electricity_price_at(est) * power
                        if cost < best_cost:
                            best_vm = vm
                            best_cost = cost
                            best_vf_level = vf_level
                    else:
                        # NOTE(review): breaking here assumes later VF levels
                        # are no faster (so they would also miss the deadline)
                        # -- confirm vm.speed ordering.
                        break
        if best_vm is None:
            # No feasible pair found; best_vf_level is still 0 in this branch.
            best_vm = max(selected_dc.vms, key=lambda v: v.speed[0] / v.power[0])
        best_vm.add_task(task, best_vf_level)
        return True
def reward_fn(task_execution):
    """Reward for an RL allocation step: the negated electricity cost, scaled
    by how far the task overran its deadline.

    Returns 0 when no execution record is available.
    """
    if task_execution is None:
        return 0
    finished = task_execution.task
    lateness_ratio = max(0, (finished.finish_time - finished.deadline) / finished.deadline)
    # NOTE(review): for lateness_ratio > 1 the factor turns negative, making
    # the reward positive for very late tasks -- confirm this is intended.
    return -task_execution.electricity_cost * (1 - lateness_ratio)
class RLAgentWithFallbackPolicy(MDPResourceAllocation):
    """RL-based allocator that defers to a heuristic fallback policy whenever
    the agent's action confidence falls below the threshold ``Conft``."""

    def __init__(self, Conft=AssertConst.CONFIDENCE_THRESHOLD, agent=None, fallback_policy=None, given_device='cuda'):
        if agent is None:
            self.agent = ResourceAllocationAgent.load_best()
        else:
            self.agent = agent
        # Move the agent to the target device in BOTH branches. Previously a
        # caller-supplied agent was left on whatever device it came from,
        # inconsistent with RLAgentWithFallbackPolicyDvfs, which always
        # calls .to(given_device).
        self.agent.to(given_device)
        self.given_device = given_device
        if fallback_policy is None:
            self.fallback_policy = SimplePolicy()
        else:
            self.fallback_policy = fallback_policy
        self.reward_fn = reward_fn
        self.task_embedding = TaskEmbedding.load_best(device=given_device)
        self.resource_embedding = ResourceEmbedding.load_best(device=given_device)
        self.Conft = Conft  # minimum action probability required to trust the agent
        self.rc_nodes_vec = None  # cached flattened resource-graph embedding

    def reset_sys_graph(self):
        """Invalidate the cached resource embedding (call when the system graph changes)."""
        self.rc_nodes_vec = None

    def step(self, task, workflow, system, methods_state):
        """Place *task* on a VM; returns 1 if the agent acted, 0 if the fallback did."""
        task_graph = workflow.to_digraph(system=system, focused_task=task)
        task_repr = self.task_embedding.run(task_graph)
        # Mean-pool the per-node embeddings into a single task vector.
        task_repr = torch.mean(task_repr, dim=0)
        if self.rc_nodes_vec is None:
            resource_graph = system.to_graph()
            dc_indexes = []
            server_indexes = []
            for i, node in enumerate(resource_graph.nodes):
                # NOTE(review): 'd' prefix with len <= 2 matches at most ten DC
                # ids ('d0'..'d9'); server ids look like 'x_y' -- confirm scheme.
                if node.startswith('d') and len(node) <= 2:
                    dc_indexes.append(i)
                if len(node.split('_')) == 2:
                    server_indexes.append(i)
            rc_nodes_vec = self.resource_embedding.run(resource_graph)
            # Keep only DC and server nodes, flattened into one state segment.
            rc_nodes_vec = rc_nodes_vec[dc_indexes + server_indexes].reshape(-1)
            self.rc_nodes_vec = rc_nodes_vec
        else:
            rc_nodes_vec = self.rc_nodes_vec
        state = np.concatenate([
            task_repr, rc_nodes_vec,
            system.flatten_normalized_electricity_prices,
            methods_state
        ])
        action, prob = self.agent.run(state, given_device=self.given_device)
        if prob >= self.Conft:
            # Confident enough: apply the agent's (DC, VM) action at VF level 0.
            dc_index, vm_index = action
            vm = system.dcs[dc_index].vms[vm_index]
            vm.add_task(task, 0)
            return 1
        else:
            self.fallback_policy.run(task, workflow, system)
            return 0
class RLAgentWithFallbackPolicyDvfs(MDPResourceAllocation):
    """DVFS-aware RL allocator: the agent picks a (DC, VM, VF level) action;
    a heuristic fallback policy is used when the agent's confidence is below
    the threshold ``Conft``."""

    def __init__(self, Conft=AssertConst.CONFIDENCE_THRESHOLD, agent=None, fallback_policy=None, given_device='cuda'):
        if agent is None:
            self.agent = ResourceAllocationAgentDvfs.load_best()
        else:
            self.agent = agent
        self.agent.to(given_device)
        if fallback_policy is None:
            self.fallback_policy = SimplePolicyDvfs()
        else:
            self.fallback_policy = fallback_policy
        self.given_device = given_device
        self.reward_fn = reward_fn
        # NOTE(review): unlike the non-DVFS variant, the embeddings are loaded
        # without a device argument -- confirm they end up on ``given_device``.
        self.task_embedding = TaskEmbeddingDvfs.load_best()
        self.resource_embedding = ResourceEmbeddingDvfs.load_best()
        self.Conft = Conft  # minimum action probability required to trust the agent
        self.rc_nodes_vec = None  # cached flattened resource-graph embedding

    def reset_sys_graph(self):
        # Invalidate the cached resource embedding; call when the system graph changes.
        self.rc_nodes_vec = None

    def step(self, task, workflow, system, methods_state):
        """Allocate *task*; returns 1 when the agent placed it, 0 when the fallback did."""
        task_graph = workflow.to_digraph(system=system, focused_task=task)
        task_repr = self.task_embedding.run(task_graph)
        # Mean-pool the per-node embeddings into a single task vector.
        task_repr = torch.mean(task_repr, dim=0)
        # resource_repr = torch.mean(resource_repr, dim=0)
        if self.rc_nodes_vec is None:
            resource_graph = system.to_graph()
            dc_indexes = []
            server_indexes = []
            for i, node in enumerate(resource_graph.nodes):
                # NOTE(review): 'd' prefix with len <= 2 matches at most ten DC
                # ids ('d0'..'d9'); server ids look like 'x_y' -- confirm scheme.
                if node.startswith('d') and len(node) <= 2:
                    dc_indexes.append(i)
                if len(node.split('_')) == 2:
                    server_indexes.append(i)
            rc_nodes_vec = self.resource_embedding.run(resource_graph)
            # Keep only DC and server nodes, flattened into one state segment.
            rc_nodes_vec = rc_nodes_vec[dc_indexes + server_indexes].reshape(-1)
            self.rc_nodes_vec = rc_nodes_vec
        else:
            rc_nodes_vec = self.rc_nodes_vec
        state = np.concatenate([
            task_repr, rc_nodes_vec,
            system.flatten_normalized_electricity_prices,
            methods_state
        ])
        action, prob = self.agent.run(state, given_device=self.given_device)
        if prob >= self.Conft:
            # Confident enough: apply the agent's (DC, VM, VF level) action.
            dc_index, vm_index, vf_level = action
            vm = system.dcs[dc_index].vms[vm_index]
            vm.add_task(task, vf_level)
            return 1
        else:
            self.fallback_policy.run(task, workflow, system)
            return 0
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/public-artifacts/ecmws-experiments.git
git@gitee.com:public-artifacts/ecmws-experiments.git
public-artifacts
ecmws-experiments
ecmws-experiments
main

搜索帮助