"""Compare schedulers (ECMWS vs. the ECWSD / DEWS baselines) on generated workflow sets."""
import multiprocessing as mp
import os
import pickle
import random
import time
import torch.multiprocessing
from functools import partial
from os import path as op
from wfcommons import GenomeRecipe, EpigenomicsRecipe, MontageRecipe
from baseline.baseline_scheduler import ECWSD, DEWS
from resources.system import System
from schedulers.random_scheduler import RandomScheduler
from schedulers.scheduler_proto import Scheduler
from schedulers.the_scheduler import ECMWS
from tasks.workflow import Workflow
torch.multiprocessing.set_start_method('spawn', force=True)
def get_results_folder():
    """Return the directory where comparison results are written."""
    base_dir = op.dirname(__file__)
    return op.join(base_dir, '../results/comparing')
def get_data_folder():
    """Return the directory holding the pickled task/workflow data."""
    base_dir = op.dirname(__file__)
    return op.join(base_dir, '../data/tasks')
def run(workflows, scheduler: Scheduler = None, termination_time=3600 * 24, slot_duration=600):
    """Simulate scheduling `workflows` on the default system and return the total electricity cost.

    Args:
        workflows: workflows sorted by `submit_time`; consumed as they arrive.
        scheduler: scheduler instance to use; ``None`` means a fresh
            ``RandomScheduler()``.  (The original default ``RandomScheduler()``
            was built once at import time and shared across every call — a
            mutable-default bug that leaks scheduler state between runs.)
        termination_time: simulated wall-clock horizon in seconds.
        slot_duration: length of one scheduling slot in seconds.

    Returns:
        The accumulated `system.electricity_cost` after the simulation.
    """
    if scheduler is None:
        scheduler = RandomScheduler()
    system = System.default_system()
    while system.clock < termination_time:
        # Never simulate past the termination horizon.
        duration = min(slot_duration, (termination_time - system.clock))
        remaining_seconds = duration
        # `system.process` returns the unspent part of the slot; loop until
        # the whole slot has been consumed.
        while remaining_seconds:
            remaining_seconds = system.process(remaining_seconds)
        system.clock += duration
        # Hand all workflows whose submit time has passed to the scheduler.
        arrived_workflows = []
        while workflows and workflows[0].submit_time <= system.clock:
            arrived_workflows.append(workflows[0])
            workflows = workflows[1:]
        if arrived_workflows:
            scheduler.run(arrived_workflows, system)
        # All workflows have been submitted; stop early.
        if not workflows:
            break
    # Aggregate per-task electricity cost over every VM in the system.
    for dc in system.dcs:
        for server in dc.servers:
            for vm in server.vms:
                system.electricity_cost += sum([te.electricity_cost for te in vm.tasks])
    return system.electricity_cost
def run_scheduler(cls_list, workflows_egs, seed=9527, num_instances=1):
    """Evaluate every scheduler class in `cls_list` on each workflow example group.

    For each (recipe, N, n, rho, workflows) group, every scheduler is run
    `num_instances` times; per run the tuple
    (recipe, cost, deadline_violations, wall_time, N, n, rho, class_name)
    is appended under its scheduler class.

    Returns:
        Dict mapping scheduler class -> list of result tuples.
    """
    results = {scheduler_cls: [] for scheduler_cls in cls_list}
    rng = random.Random(seed)
    for example in workflows_egs:
        recipe, N, n, rho, workflows = example
        system = System.default_system()
        for _ in range(num_instances):
            for scheduler_cls in cls_list:
                # Re-randomize arrivals and data before every run so each
                # scheduler sees a fresh (but reproducibly seeded) instance.
                for wf in workflows:
                    wf.reset(rng=rng, rho=rho, system=system, reset_arrivals=True, reset_data=True)
                scheduler = scheduler_cls.load_best()
                started_at = time.time()
                cost = run(workflows, scheduler)
                elapsed = time.time() - started_at
                # A workflow violates its SLA when its latest task finishes
                # after the deadline.
                violations = 0
                for wf in workflows:
                    eft = max(task.finish_time for task in wf.tasks)
                    if eft > wf.deadline:
                        violations += 1
                results[scheduler_cls].append(
                    (recipe, cost, violations, elapsed, N, n, rho, scheduler_cls.__name__))
    return results
def run_scheduler_on_chunk(chunk_file, cls_list, num_instances=1):
    """Unpickle one chunk of workflow example groups and evaluate it.

    NOTE(review): `pickle.load` is only safe here because the chunks are
    produced by this same pipeline — never point it at untrusted files.
    """
    with open(chunk_file, 'rb') as fh:
        example_groups = pickle.load(fh)
    return run_scheduler(cls_list, example_groups, num_instances=num_instances)
def save_tuples_to_disk(tuples, folder_name, chunk_size=100):
    """Pickle `tuples` into sequential `chunk_<k>.pkl` files under `folder_name`.

    The folder is created if missing; chunk k holds items
    [k * chunk_size, (k + 1) * chunk_size).
    """
    os.makedirs(folder_name, exist_ok=True)
    chunk_index = 0
    for start in range(0, len(tuples), chunk_size):
        target = os.path.join(folder_name, f'chunk_{chunk_index}.pkl')
        with open(target, 'wb') as fh:
            pickle.dump(tuples[start:start + chunk_size], fh)
        chunk_index += 1
if __name__ == '__main__':
    # Schedulers under comparison: ECMWS plus the ECWSD / DEWS baselines.
    schedulers = [
        ECMWS,
        ECWSD,
        DEWS,
    ]
    run_scheduler_partial = partial(run_scheduler_on_chunk, cls_list=schedulers, num_instances=2)
    recipes = [
        MontageRecipe,
        EpigenomicsRecipe,
        GenomeRecipe
    ]
    workflows_list = Workflow.make_workflows(recipes=recipes,
                                             Ns=(40, 60, 80, 100, 120), ns=(100, 150, 200),
                                             rhos=(0.2, 0.4, 0.6, 0.8, 1.0), return_info=True,
                                             build_system=System.default_system, reset_data=True, init_switch=True)
    # One example group per chunk so each pool worker gets an independent task.
    chunk_size = 1
    chunks_folder = op.join(get_data_folder(), 'workflow_chunks_comp')
    save_tuples_to_disk(workflows_list, chunks_folder, chunk_size=chunk_size)
    chunk_files = [os.path.join(chunks_folder, f) for f in os.listdir(chunks_folder) if f.endswith('.pkl')]
    with mp.Pool(96) as pool:
        results = pool.map(run_scheduler_partial, chunk_files)
    # Merge per-chunk results keyed by scheduler class.  Built from
    # `schedulers` (instead of a hand-duplicated literal) so the two lists
    # cannot drift apart when a scheduler is added or removed.
    scheduler2result = {cls: [] for cls in schedulers}
    for result in results:
        for key in scheduler2result:
            scheduler2result[key].extend(result[key])
    # Fix: the results directory was never created anywhere, so writing into
    # it raised FileNotFoundError on a fresh checkout.
    os.makedirs(get_results_folder(), exist_ok=True)
    for scheduler, result in scheduler2result.items():
        with open(op.join(get_results_folder(), f'{scheduler.__name__}_comp.pkl'), 'wb') as f:
            pickle.dump(result, f)
# End of script.