代码拉取完成,页面将自动刷新
"""
This script converts a cluster performance directory generated by MindInsight r1.4 or an earlier version
into the new cluster performance directory layout.
"""
import os
import shutil
import logging
import json
import re
import argparse
class TransformClusterProfilerFile:
    """Copy the profiler files of an old-layout cluster directory into a single ``profiler`` directory.

    The source directory is read as follows:

    * ``host_ips_mapping.txt``: whitespace separated lines whose first two columns are
      ``host_ip host_mapping_ip``.
    * the first ``*.json`` file found directly in the source directory is used as the rank table. It is
      read as ``{"server_list": [{"server_id": ..., "device": [{"device_id": ..., "rank_id": ...}]}]}``.
    * ``cluster_profiler/<host_mapping_ip>/profiler/``: the profiler files of one host.

    Every regular file of a host's profiler directory is copied to ``<dst_cluster_profiler_dir>/profiler``
    with each run of digits in its name replaced by the rank id.

    Missing or malformed inputs are logged through ``logging`` and treated as empty instead of raising.

    Args:
        src_cluster_profiler_dir (str): Old-layout cluster directory to read.
        dst_cluster_profiler_dir (str): Directory under which the ``profiler`` directory is created.
    """

    _host_ips_mapping_filename = 'host_ips_mapping.txt'
    # Matches a whole run of digits so a multi-digit id is replaced once rather than once per digit.
    _digits_pattern = re.compile(r'\d+')

    def __init__(self, src_cluster_profiler_dir, dst_cluster_profiler_dir):
        self._src_cluster_profiler_dir = src_cluster_profiler_dir
        self._dst_cluster_profiler_dir = os.path.join(dst_cluster_profiler_dir, 'profiler')
        # Order matters: the two lookups below read self._host_ips_mapping_info.
        self._host_ips_mapping_info = self._get_host_ips_mapping_info()
        self._host_ips_dir = self._get_host_ips_dir()
        self._host_device_rank_relation = self._get_host_device_rank_relation()

    def execute(self):
        """Create the destination directory and copy the files of every known rank into it."""
        os.makedirs(self._dst_cluster_profiler_dir, exist_ok=True)
        for host_mapping_ip, _device_id, rank_id in self._host_device_rank_relation:
            self._transform_cluster_profiler_file(host_mapping_ip, rank_id)

    def _transform_cluster_profiler_file(self, host_ip, rank_id):
        """Copy the files of one host directory, renaming them for ``rank_id``.

        Args:
            host_ip (str): Name of the host directory below ``cluster_profiler``.
            rank_id (str | int): Rank id written into the copied file names.
        """
        profiler_dir = os.path.join(self._src_cluster_profiler_dir, 'cluster_profiler', host_ip, 'profiler')
        if not os.path.isdir(profiler_dir):
            logging.error('Did not find profiler dir : %s', profiler_dir)
            return
        if rank_id is None:
            logging.error('Did not find rank id for profiler dir : %s', profiler_dir)
            return

        # The rank table may store rank ids as JSON numbers; a callable replacement also keeps
        # re.sub from interpreting backslashes or group references in the replacement text.
        rank_text = str(rank_id)
        # NOTE(review): every file of the host directory is copied for each rank of that host. If one
        # host directory holds files of several devices, copies made for the same rank can overwrite
        # each other -- confirm against the r1.4 directory layout.
        with os.scandir(profiler_dir) as entries:
            for entry in entries:
                if not entry.is_file():
                    continue
                new_file_name = self._digits_pattern.sub(lambda _match: rank_text, entry.name)
                shutil.copyfile(entry.path, os.path.join(self._dst_cluster_profiler_dir, new_file_name))

    def _get_host_ips_mapping_info(self):
        """Read the host ip mapping file.

        Returns:
            list[list[str]]: ``[host_ip, host_mapping_ip]`` pairs in file order; empty when the file is
            missing. Lines with fewer than two columns are ignored.
        """
        file_path = os.path.join(self._src_cluster_profiler_dir, self._host_ips_mapping_filename)
        if not os.path.isfile(file_path):
            logging.error('Did not find host_ips_mapping file: %s', file_path)
            return []

        host_ips_mapping_info = []
        with open(file_path, 'r', encoding='utf-8') as src_file:
            for line in src_file:
                columns = line.split()
                if len(columns) > 1:
                    # columns[0]: host_ip, columns[1]: host_mapping_ip
                    host_ips_mapping_info.append(columns[:2])
        return host_ips_mapping_info

    def _get_host_ips_dir(self):
        """List the host directories below ``cluster_profiler`` that are named in the mapping file.

        Returns:
            list[str]: Names of the non-symlink sub-directories whose name is a known host mapping ip;
            empty when ``cluster_profiler`` is missing.
        """
        target_dir_path = os.path.join(self._src_cluster_profiler_dir, 'cluster_profiler')
        if not os.path.isdir(target_dir_path):
            logging.error('Did not find cluster_profiler dir : %s', target_dir_path)
            return []

        host_mapping_ips = {mapping[1] for mapping in self._host_ips_mapping_info}
        with os.scandir(target_dir_path) as entries:
            return [
                entry.name for entry in entries
                if not entry.is_symlink() and entry.is_dir() and entry.name in host_mapping_ips
            ]

    def _get_host_device_rank_relation(self):
        """Build the host / device / rank relation from the rank table file.

        Returns:
            list[list]: ``[host_mapping_ip, device_id, rank_id]`` entries in rank table order. The list
            is empty when the rank table is missing or is not valid JSON. Servers without an entry in
            the host ip mapping are logged and skipped.
        """
        rank_table_file_path = self._get_rank_table_file_path()
        if not os.path.isfile(rank_table_file_path):
            logging.error('Did not find rank table file under %s', self._src_cluster_profiler_dir)
            return []

        with open(rank_table_file_path, 'r', encoding='utf-8') as file:
            try:
                relation_info = json.load(file)
            except json.JSONDecodeError as err:
                logging.exception(err)
                return []
        if not isinstance(relation_info, dict):
            logging.error('Rank table file has an unexpected format: %s', rank_table_file_path)
            return []

        # The first mapping of a host ip wins when the mapping file lists it more than once.
        host_ip_to_mapping_ip = {}
        for host_ip, host_mapping_ip in self._host_ips_mapping_info:
            host_ip_to_mapping_ip.setdefault(host_ip, host_mapping_ip)

        host_device_rank_relation = []
        for server_info in relation_info.get("server_list") or []:
            server_id = server_info.get("server_id")
            host_mapping_ip = host_ip_to_mapping_ip.get(server_id)
            if host_mapping_ip is None:
                logging.error('Did not find host mapping ip of server: %s', server_id)
                continue
            for device_info in server_info.get("device") or []:
                host_device_rank_relation.append(
                    [host_mapping_ip, device_info.get("device_id"), device_info.get("rank_id")])
        return host_device_rank_relation

    def _get_rank_table_file_path(self):
        """Find the rank table file.

        Returns:
            str: Path of the first non-symlink ``*.json`` file found directly in the source directory,
            or ``''`` when there is none or the source directory does not exist.
        """
        target_dir_path = self._src_cluster_profiler_dir
        if not os.path.isdir(target_dir_path):
            return ''
        with os.scandir(target_dir_path) as entries:
            for entry in entries:
                if entry.is_symlink():
                    continue
                if entry.is_file() and entry.name.endswith('.json'):
                    return os.path.join(target_dir_path, entry.name)
        return ''
if __name__ == '__main__':
    # Command line entry point: read the old layout from --src_dir and write the converted files
    # below --dst_dir. Both options are required so that a missing one is reported as a usage
    # error by argparse instead of failing later inside os.path.join with a TypeError.
    parser = argparse.ArgumentParser()
    parser.add_argument('--src_dir', type=str, required=True, help='the cluster directory path to be converted')
    parser.add_argument('--dst_dir', type=str, required=True, help='converted cluster directory path')
    args = parser.parse_args()
    print("Start conversion...")
    transform_file = TransformClusterProfilerFile(args.src_dir, args.dst_dir)
    transform_file.execute()
    print("Conversion completed.")
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。