0 Star 1 Fork 1

public-artifacts/ecmws-experiments

加入 Gitee
与超过 1400万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
comparing.py 4.56 KB
一键复制 编辑 原始数据 按行查看 历史
victor puscarschi 提交于 2025-02-10 17:39 +08:00 . init
import multiprocessing as mp
import os
import pickle
import random
import time
import torch.multiprocessing
from functools import partial
from os import path as op
from wfcommons import GenomeRecipe, EpigenomicsRecipe, MontageRecipe
from baseline.baseline_scheduler import ECWSD, DEWS
from resources.system import System
from schedulers.random_scheduler import RandomScheduler
from schedulers.scheduler_proto import Scheduler
from schedulers.the_scheduler import ECMWS
from tasks.workflow import Workflow
torch.multiprocessing.set_start_method('spawn', force=True)
def get_results_folder():
return op.join(op.dirname(__file__), '../results/comparing')
def get_data_folder():
return op.join(op.dirname(__file__), '../data/tasks')
def run(workflows, scheduler: Scheduler = RandomScheduler(), termination_time=3600 * 24, slot_duration=600):
system = System.default_system()
while system.clock < termination_time:
duration = min(slot_duration, (termination_time - system.clock))
remaining_seconds = duration
while remaining_seconds:
remaining_seconds = system.process(remaining_seconds)
system.clock += duration
arrived_workflows = []
while workflows and workflows[0].submit_time <= system.clock:
arrived_workflows.append(workflows[0])
workflows = workflows[1:]
if arrived_workflows:
scheduler.run(arrived_workflows, system)
if not workflows:
break
for dc in system.dcs:
for server in dc.servers:
for vm in server.vms:
system.electricity_cost += sum([te.electricity_cost for te in vm.tasks])
return system.electricity_cost
def run_scheduler(cls_list, workflows_egs, seed=9527, num_instances=1):
result = {
cls: [] for cls in cls_list
}
rng = random.Random(seed)
for eg in workflows_egs:
recipe, N, n, rho, workflows = eg
system = System.default_system()
for i in range(num_instances):
for cls in cls_list:
for workflow in workflows:
workflow.reset(rng=rng, rho=rho, system=system, reset_arrivals=True, reset_data=True)
scheduler = cls.load_best()
start = time.time()
cost = run(workflows, scheduler)
end = time.time()
violations = 0
for workflow in workflows:
eft = max([task.finish_time for task in workflow.tasks])
if eft > workflow.deadline:
violations += 1
result[cls].append((recipe, cost, violations, end - start, N, n, rho, cls.__name__))
return result
def run_scheduler_on_chunk(chunk_file, cls_list, num_instances=1):
with open(chunk_file, 'rb') as f:
workflows_eg = pickle.load(f)
return run_scheduler(cls_list, workflows_eg, num_instances=num_instances)
def save_tuples_to_disk(tuples, folder_name, chunk_size=100):
os.makedirs(folder_name, exist_ok=True)
for i in range(0, len(tuples), chunk_size):
with open(os.path.join(folder_name, f'chunk_{i // chunk_size}.pkl'), 'wb') as f:
pickle.dump(tuples[i:i + chunk_size], f)
if __name__ == '__main__':
schedulers = [
ECMWS,
ECWSD,
DEWS,
]
run_scheduler_partial = partial(run_scheduler_on_chunk, cls_list=schedulers, num_instances=2)
recipes = [
MontageRecipe,
EpigenomicsRecipe,
GenomeRecipe
]
workflows_list = Workflow.make_workflows(recipes=recipes,
Ns=(40, 60, 80, 100, 120), ns=(100, 150, 200),
rhos=(0.2, 0.4, 0.6, 0.8, 1.0), return_info=True,
build_system=System.default_system, reset_data=True, init_switch=True)
chunk_size = 1
chunks_folder = op.join(get_data_folder(), 'workflow_chunks_comp')
save_tuples_to_disk(workflows_list, chunks_folder, chunk_size=chunk_size)
chunk_files = [os.path.join(chunks_folder, f) for f in os.listdir(chunks_folder) if f.endswith('.pkl')]
with mp.Pool(96) as pool:
results = pool.map(run_scheduler_partial, chunk_files)
scheduler2result = {
ECMWS: [],
ECWSD: [],
DEWS: []
}
for result in results:
for key in scheduler2result:
scheduler2result[key].extend(result[key])
for scheduler, result in scheduler2result.items():
with open(op.join(get_results_folder(), f'{scheduler.__name__}_comp.pkl'), 'wb') as f:
pickle.dump(result, f)
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/public-artifacts/ecmws-experiments.git
git@gitee.com:public-artifacts/ecmws-experiments.git
public-artifacts
ecmws-experiments
ecmws-experiments
main

搜索帮助