From c1630ab9bb89bf22168a9b8d35537b84081a780d Mon Sep 17 00:00:00 2001 From: fanglanyue Date: Fri, 29 Aug 2025 11:06:55 +0800 Subject: [PATCH] msprof-analyze use msmonitor.db --- .../cluster_analyse/analysis/base_analysis.py | 1 + .../cluster_analyse/cluster_analysis.py | 44 +-- .../data_preprocessor.py | 70 ++++ .../mindspore_data_preprocessor.py | 24 +- .../msprof_data_preprocessor.py | 4 +- .../prof_data_allocate.py | 207 ++++++++++ .../pytorch_data_preprocessor.py | 27 +- .../recipes/base_recipe_analysis.py | 52 +-- .../communication_matrix_sum.py | 2 +- .../recipes/freq_analysis/freq_analysis.py | 2 +- .../recipes/slow_rank/slow_rank.py | 2 +- .../compare_backend/utils/args_manager.py | 23 +- .../msprof_analyze/prof_common/constant.py | 2 + .../test_data_preprocessor.py | 90 +++++ .../test_prof_data_allocate.py | 352 ++++++++++++++++++ .../test_pytorch_data_preprocessor.py | 5 +- .../recipes/test_base_recipe_analysis.py | 5 +- 17 files changed, 808 insertions(+), 104 deletions(-) create mode 100644 profiler/msprof_analyze/cluster_analyse/cluster_data_preprocess/prof_data_allocate.py create mode 100644 profiler/msprof_analyze/test/ut/cluster_analyse/cluster_data_preprocess/test_data_preprocessor.py create mode 100644 profiler/msprof_analyze/test/ut/cluster_analyse/cluster_data_preprocess/test_prof_data_allocate.py diff --git a/profiler/msprof_analyze/cluster_analyse/analysis/base_analysis.py b/profiler/msprof_analyze/cluster_analyse/analysis/base_analysis.py index 0d14af769..f72e80644 100644 --- a/profiler/msprof_analyze/cluster_analyse/analysis/base_analysis.py +++ b/profiler/msprof_analyze/cluster_analyse/analysis/base_analysis.py @@ -30,6 +30,7 @@ class BaseAnalysis: self.cluster_analysis_output_path = param.get(Constant.CLUSTER_ANALYSIS_OUTPUT_PATH) self.data_map = param.get(Constant.DATA_MAP) self.data_type = param.get(Constant.DATA_TYPE) + self.prof_type = param.get(Constant.PROFILING_TYPE) self.communication_ops = [] self.collective_group_dict = param.get(Constant.COMM_DATA_DICT, {}).get(Constant.COLLECTIVE_GROUP) self.comm_ops_struct = {} diff --git a/profiler/msprof_analyze/cluster_analyse/cluster_analysis.py b/profiler/msprof_analyze/cluster_analyse/cluster_analysis.py index fb1b1ccba..e96c9dc3b 100644 --- a/profiler/msprof_analyze/cluster_analyse/cluster_analysis.py +++ b/profiler/msprof_analyze/cluster_analyse/cluster_analysis.py @@ -20,9 +20,7 @@ import sys sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) from msprof_analyze.cluster_analyse.analysis.analysis_facade import AnalysisFacade -from msprof_analyze.cluster_analyse.cluster_data_preprocess.pytorch_data_preprocessor import PytorchDataPreprocessor -from msprof_analyze.cluster_analyse.cluster_data_preprocess.mindspore_data_preprocessor import MindsporeDataPreprocessor -from msprof_analyze.cluster_analyse.cluster_data_preprocess.msprof_data_preprocessor import MsprofDataPreprocessor +from msprof_analyze.cluster_analyse.cluster_data_preprocess.prof_data_allocate import ProfDataAllocate from msprof_analyze.cluster_analyse.communication_group.communication_group_generator import CommunicationGroupGenerator from msprof_analyze.prof_common.additional_args_manager import AdditionalArgsManager from msprof_analyze.prof_common.constant import Constant @@ -70,35 +68,11 @@ class Interface: return self.collection_path def allocate_prof_data(self): - ascend_pt_dirs = [] - ascend_ms_dirs = [] - prof_dirs = [] - for root, dirs, _ in PathManager.limited_depth_walk(self.collection_path): - for dir_name in dirs: - if dir_name.endswith(self.ASCEND_PT): - ascend_pt_dirs.append(os.path.join(root, dir_name)) - if dir_name.endswith(self.ASCEND_MS): - ascend_ms_dirs.append(os.path.join(root, dir_name)) - if dir_name.startswith(self.PROF): - prof_dirs.append(os.path.join(root, dir_name)) - pytorch_processor = PytorchDataPreprocessor(ascend_pt_dirs) - pt_data_map = pytorch_processor.get_data_map() - pt_data_type = pytorch_processor.get_data_type() - ms_processor = MindsporeDataPreprocessor(ascend_ms_dirs) - ms_data_map = ms_processor.get_data_map() - ms_data_type = ms_processor.get_data_type() - if pt_data_map and ms_data_map: - logger.error("Can not analyze pytorch and mindspore meantime.") + allocator = ProfDataAllocate(self.collection_path) + if not allocator.allocate_prof_data(): return {} - if pt_data_map: - return {Constant.DATA_MAP: pt_data_map, Constant.DATA_TYPE: pt_data_type, Constant.IS_MSPROF: False} - if ms_data_map: - return {Constant.DATA_MAP: ms_data_map, Constant.DATA_TYPE: ms_data_type, Constant.IS_MSPROF: False, - Constant.IS_MINDSPORE: True} - msprof_processor = MsprofDataPreprocessor(prof_dirs) - prof_data_map = msprof_processor.get_data_map() - prof_data_type = msprof_processor.get_data_type() - return {Constant.DATA_MAP: prof_data_map, Constant.DATA_TYPE: prof_data_type, Constant.IS_MSPROF: True} + return {Constant.DATA_MAP: allocator.data_map, Constant.DATA_TYPE: allocator.data_type, + Constant.PROFILING_TYPE: allocator.prof_type} def run(self): PathManager.check_input_directory_path(self.collection_path) @@ -106,7 +80,8 @@ class Interface: PathManager.check_path_owner_consistent([self.collection_path, self.cluster_analysis_output_path]) data_dict = self.allocate_prof_data() - data_map, data_type = data_dict.get(Constant.DATA_MAP), data_dict.get(Constant.DATA_TYPE) + data_map, data_type, prof_type = (data_dict.get(Constant.DATA_MAP), data_dict.get(Constant.DATA_TYPE), + data_dict.get(Constant.PROFILING_TYPE)) if not data_map: logger.warning("Can not get rank info or profiling data.") return @@ -120,8 +95,9 @@ class Interface: Constant.ANALYSIS_MODE: self.analysis_mode, Constant.DATA_MAP: data_map, Constant.DATA_TYPE: data_type, - Constant.IS_MSPROF: data_dict.get(Constant.IS_MSPROF, False), - Constant.IS_MINDSPORE: data_dict.get(Constant.IS_MINDSPORE, False), + Constant.PROFILING_TYPE: data_dict.get(Constant.PROFILING_TYPE), + Constant.IS_MSPROF: prof_type == Constant.MSPROF, + Constant.IS_MINDSPORE: prof_type == Constant.MINDSPORE, Constant.CLUSTER_ANALYSIS_OUTPUT_PATH: self.cluster_analysis_output_path }) if self.analysis_mode in COMM_FEATURE_LIST: diff --git a/profiler/msprof_analyze/cluster_analyse/cluster_data_preprocess/data_preprocessor.py b/profiler/msprof_analyze/cluster_analyse/cluster_data_preprocess/data_preprocessor.py index c227d669a..c4574ad94 100644 --- a/profiler/msprof_analyze/cluster_analyse/cluster_data_preprocess/data_preprocessor.py +++ b/profiler/msprof_analyze/cluster_analyse/cluster_data_preprocess/data_preprocessor.py @@ -16,20 +16,78 @@ import os import re from abc import abstractmethod +from msprof_analyze.prof_common.constant import Constant +from msprof_analyze.prof_common.logger import get_logger + +logger = get_logger() + class DataPreprocessor: PROFILER_INFO_HEAD = 'profiler_info_' PROFILER_INFO_EXTENSION = '.json' + TIME_POSITION_DICT = { + Constant.PYTORCH: -3, + Constant.MINDSPORE: -3, + Constant.MSMONITOR: -2, + Constant.MSPROF: -2 + } + PROFILING_DIR_FORMAT = { + Constant.PYTORCH: "{worker_name}_{timestamp}_ascend_pt", + Constant.MINDSPORE: "{worker_name}_{timestamp}_ascend_ms", + Constant.MSPROF: "PROF_{number}_{timestamp}_{string}", + Constant.MSMONITOR: "msmonitor_{pid}_{timestamp}_{rank_id}.db" + } def __init__(self, path_list: list): self.path_list = path_list self.data_map = {} + self.data_type = None @property @abstractmethod def db_pattern(self): pass + @staticmethod + def postprocess_data_map(data_map, prof_type): + if not data_map: + return {} + + timestamp_position = DataPreprocessor.TIME_POSITION_DICT.get(prof_type, None) + if timestamp_position is None: + logger.error(f'Unsupported profiling type: {prof_type}. ' + f'Unable to determine timestamp position for path processing.') + return {} + + valid_data_map = {} + invalid_ranks = [] + + for rank_id, path_list in data_map.items(): + if not path_list: + continue + if len(path_list) == 1: + valid_data_map[rank_id] = path_list[0] + continue + + # 处理多个路径的情况,选择时间戳最新的路径 + try: + sorted_paths = sorted(path_list, key=lambda x: int(x.split('_')[timestamp_position]), reverse=True) + latest_path = sorted_paths[0] + valid_data_map[rank_id] = latest_path + logger.info(f"Rank {rank_id}: Multiple profiling paths detected. " + f"Selected latest timestamp path: {latest_path}") + except Exception as e: + invalid_ranks.append(rank_id) + + if invalid_ranks: + logger.warning( + "Failed to process multiple profiling paths for some ranks. " + f"Affected rank_id: {invalid_ranks}. " + f"Expected path formats: {DataPreprocessor.PROFILING_DIR_FORMAT.get(prof_type)}" + ) + + return valid_data_map + @abstractmethod def get_data_map(self): pass @@ -46,6 +104,18 @@ class DataPreprocessor: return rank_id return -1 + def get_data_type(self): + if self.data_type is not None: + return self.data_type + data_type_record = set() + for _, dir_name in self.data_map.items(): + ascend_profiler_output = os.path.join(dir_name, Constant.ASCEND_PROFILER_OUTPUT) + data_type = Constant.DB if self._check_db_type(ascend_profiler_output) else Constant.TEXT + data_type_record.add(data_type) + if len(data_type_record) == 1: + return data_type_record.pop() + return Constant.INVALID + def _check_db_type(self, dir_name): for file_name in os.listdir(dir_name): if re.match(self.db_pattern, file_name): diff --git a/profiler/msprof_analyze/cluster_analyse/cluster_data_preprocess/mindspore_data_preprocessor.py b/profiler/msprof_analyze/cluster_analyse/cluster_data_preprocess/mindspore_data_preprocessor.py index 328d0fa9b..16411996b 100644 --- a/profiler/msprof_analyze/cluster_analyse/cluster_data_preprocess/mindspore_data_preprocessor.py +++ b/profiler/msprof_analyze/cluster_analyse/cluster_data_preprocess/mindspore_data_preprocessor.py @@ -43,27 +43,19 @@ class MindsporeDataPreprocessor(DataPreprocessor): return "" def get_data_map(self) -> dict: + unknown_rank_paths = [] rank_id_map = defaultdict(list) for dir_name in self.path_list: rank_id = self.get_rank_id(dir_name) if rank_id < 0: - logger.error("fail to get rankid or rankid invalid.") + unknown_rank_paths.append(dir_name) continue ascend_profiler_output = os.path.join(dir_name, Constant.ASCEND_PROFILER_OUTPUT) if os.path.exists(ascend_profiler_output) and os.path.isdir(ascend_profiler_output): - data_type = Constant.DB if self._check_db_type(ascend_profiler_output) else Constant.TEXT - self.data_type.add(data_type) rank_id_map[rank_id].append(dir_name) - logger.debug(f"rank_id: {rank_id}, data_type: {data_type}, directory: {dir_name}") - try: - for (rank_id, dir_list) in rank_id_map.items(): - dir_list.sort(key=lambda x: x.split('_')[-3]) - self.data_map[rank_id] = dir_list[0] - except Exception as e: - raise RuntimeError("Found invalid directory name!") from e - return self.data_map - - def get_data_type(self): - if len(self.data_type) == 1: - return self.data_type.pop() - return Constant.INVALID + self.data_map = self.postprocess_data_map(rank_id_map, Constant.MINDSPORE) + if unknown_rank_paths: + logger.warning(f"Failed to get rank_id for some paths." + f"Affected paths: {unknown_rank_paths}\n" + "Expected to get rank_id from profiler_info_{rank_id}.json") + return self.data_map \ No newline at end of file diff --git a/profiler/msprof_analyze/cluster_analyse/cluster_data_preprocess/msprof_data_preprocessor.py b/profiler/msprof_analyze/cluster_analyse/cluster_data_preprocess/msprof_data_preprocessor.py index b212d7ce1..b8e989c90 100644 --- a/profiler/msprof_analyze/cluster_analyse/cluster_data_preprocess/msprof_data_preprocessor.py +++ b/profiler/msprof_analyze/cluster_analyse/cluster_data_preprocess/msprof_data_preprocessor.py @@ -88,9 +88,7 @@ class MsprofDataPreprocessor(DataPreprocessor): prof_data_uid[(host_id, device_id)].append(dir_name) if prof_data_rank: - for rank_id, dir_list in prof_data_rank.items(): - dir_list.sort(key=lambda x: x.split('_')[-2]) - self.data_map[rank_id] = dir_list[0] + self.data_map = self.postprocess_data_map(prof_data_rank, Constant.MSPROF) else: ordered_keys = sorted(prof_data_uid.keys(), key=lambda x: (x[0], x[1])) rank_id = 0 diff --git a/profiler/msprof_analyze/cluster_analyse/cluster_data_preprocess/prof_data_allocate.py b/profiler/msprof_analyze/cluster_analyse/cluster_data_preprocess/prof_data_allocate.py new file mode 100644 index 000000000..9db3e8137 --- /dev/null +++ b/profiler/msprof_analyze/cluster_analyse/cluster_data_preprocess/prof_data_allocate.py @@ -0,0 +1,207 @@ +# Copyright (c) 2025, Huawei Technologies Co., Ltd. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +from collections import defaultdict +import re +from typing import List, Dict + +from msprof_analyze.cluster_analyse.cluster_data_preprocess.data_preprocessor import DataPreprocessor +from msprof_analyze.cluster_analyse.cluster_data_preprocess.mindspore_data_preprocessor import MindsporeDataPreprocessor +from msprof_analyze.cluster_analyse.cluster_data_preprocess.pytorch_data_preprocessor import PytorchDataPreprocessor +from msprof_analyze.cluster_analyse.cluster_data_preprocess.msprof_data_preprocessor import MsprofDataPreprocessor +from msprof_analyze.prof_common.path_manager import PathManager +from msprof_analyze.prof_common.constant import Constant +from msprof_analyze.prof_common.logger import get_logger + +logger = get_logger() + + +class ProfDataAllocate: + + DB_PATTERNS = { + Constant.PYTORCH: re.compile(r'^ascend_pytorch_profiler(?:_\d+)?\.db$'), + Constant.MINDSPORE: re.compile(r'^ascend_mindspore_profiler(?:_\d+)?\.db$'), + Constant.MSPROF: re.compile(r'^msprof_\d{14}\.db$'), + Constant.MSMONITOR: re.compile(r'^msmonitor_(\d+)_\d{17}_(-1|\d+)\.db$') + } + + ASCEND_PT = "ascend_pt" + ASCEND_MS = "ascend_ms" + PROF = "PROF_" + + def __init__(self, profiling_path): + self.profiling_path = profiling_path + self.data_type = "" + self.data_map = {} + self.prof_type = "" + + self._msmonitor_data_map = {} + + @staticmethod + def match_file_pattern_in_dir(dir_name, file_pattern): + for file_name in os.listdir(dir_name): + if file_pattern.match(file_name): + return file_name + return "" + + @staticmethod + def _extract_rank_id_from_profiler_db(file_name: str, prof_type: str): + """从profiler_db文件名中提取rank_id,传入的file_name已经过正则匹配""" + try: + if prof_type in [Constant.PYTORCH, Constant.MINDSPORE, Constant.MSMONITOR]: + return int(file_name.strip(".db").split("_")[-1]) + else: + logger.error(f"Unsupported prof_type {prof_type}. Can not extract rank_id from profile db.") + return None + except (IndexError, ValueError): + return None + + @staticmethod + def _postprocess_data_maps(data_maps: Dict): + """后处理数据映射""" + return ( + DataPreprocessor.postprocess_data_map(data_maps[Constant.PYTORCH], Constant.PYTORCH), + DataPreprocessor.postprocess_data_map(data_maps[Constant.MINDSPORE], Constant.MINDSPORE), + DataPreprocessor.postprocess_data_map(data_maps[Constant.MSMONITOR], Constant.MSMONITOR) + ) + + def allocate_prof_data(self): + if self.allocate_db_prof_data() and self.prof_type in [Constant.PYTORCH, Constant.MINDSPORE]: + return True + if self.allocate_text_prof_data(): + return True + if self._msmonitor_data_map: + self._set_prof_data(Constant.MSMONITOR, Constant.DB, self._msmonitor_data_map) + return True + logger.error(f"Failed to allocate profiling data!") + return False + + def allocate_db_prof_data(self): + data_maps = { + Constant.PYTORCH: defaultdict(list), + Constant.MINDSPORE: defaultdict(list), + Constant.MSMONITOR: defaultdict(list) + } + # 处理输入路径,搜索路径下所有文件夹与文件,max_depth=10 + for root, dirs, files in PathManager.limited_depth_walk(self.profiling_path): + self._scan_dirs_for_profiler_db(root, dirs, data_maps) + self._scan_files_for_msmonitor_db(root, files, data_maps[Constant.MSMONITOR]) + + # 处理输入路径为msmonitor db文件的情况 + if os.path.isfile(self.profiling_path): + root, file_name = os.path.split(self.profiling_path) + self._scan_files_for_msmonitor_db(root, [file_name], data_maps[Constant.MSMONITOR]) + + # data_map: Dict[int, List[str]] --> Dict[int, str] 卡号路径一一对应 + pytorch_data_map, mindspore_data_map, msmonitor_data_map = self._postprocess_data_maps(data_maps) + + if not (pytorch_data_map or mindspore_data_map or msmonitor_data_map): + return False + + # 检查是否多种类型文件同时存在 + if pytorch_data_map and mindspore_data_map: + logger.error(f"Can not analysis pytorch and mindspore at the same time!") + self.prof_type = Constant.INVALID + return False + + # 确定采集类型prof_type + if msmonitor_data_map: + self._msmonitor_data_map = msmonitor_data_map + if pytorch_data_map: + self._set_prof_data(Constant.PYTORCH, Constant.DB, pytorch_data_map) + if mindspore_data_map: + self._set_prof_data(Constant.MINDSPORE, Constant.DB, mindspore_data_map) + return True + + def allocate_text_prof_data(self): + if self.prof_type == Constant.INVALID: + return False + + ascend_pt_dirs = [] + ascend_ms_dirs = [] + prof_dirs = [] + + def classify_dir(root, dir_name): + """根据文件夹名称分类""" + dir_path = os.path.join(root, dir_name) + if dir_name.endswith(self.ASCEND_PT): + ascend_pt_dirs.append(dir_path) + elif dir_name.endswith(self.ASCEND_MS): + ascend_ms_dirs.append(dir_path) + elif dir_name.startswith(self.PROF): + prof_dirs.append(dir_path) + + # 单独处理输入路径 + parent_dir = os.path.dirname(self.profiling_path) + current_dir_name = os.path.basename(self.profiling_path) + classify_dir(parent_dir, current_dir_name) + # 递归处理子路径 + for root, dirs, _ in PathManager.limited_depth_walk(self.profiling_path): + for dir_name in dirs: + classify_dir(root, dir_name) + + pytorch_processor = PytorchDataPreprocessor(ascend_pt_dirs) + pt_data_map = pytorch_processor.get_data_map() + ms_processor = MindsporeDataPreprocessor(ascend_ms_dirs) + ms_data_map = ms_processor.get_data_map() + if pt_data_map and ms_data_map: + logger.error("Can not analyze pytorch and mindspore meantime.") + self.prof_type = Constant.INVALID + return False + if pt_data_map: + self._set_prof_data(Constant.PYTORCH, Constant.TEXT, pt_data_map) + return True + if ms_data_map: + self._set_prof_data(Constant.MINDSPORE, Constant.TEXT, ms_data_map) + return True + + # 统一处理msprof数据 + msprof_processor = MsprofDataPreprocessor(prof_dirs) + msprof_data_map = msprof_processor.get_data_map() + msprof_data_type = msprof_processor.get_data_type() + if msprof_data_map and msprof_data_type != Constant.INVALID: + self._set_prof_data(Constant.MSPROF, msprof_data_type, msprof_data_map) + return True + return False + + def _scan_dirs_for_profiler_db(self, root: str, dirs: List[str], data_maps: Dict): + if Constant.ASCEND_PROFILER_OUTPUT not in dirs: + return + profiler_dir = os.path.join(root, Constant.ASCEND_PROFILER_OUTPUT) + for prof_type in [Constant.PYTORCH, Constant.MINDSPORE]: + file_name = self.match_file_pattern_in_dir(profiler_dir, self.DB_PATTERNS[prof_type]) + if not file_name: + continue + rank_id = self._extract_rank_id_from_profiler_db(file_name, prof_type) + if rank_id is not None: + data_maps[prof_type][rank_id].append(root) + + def _scan_files_for_msmonitor_db(self, root: str, files: List[str], msmonitor_map: Dict): + msmonitor_pattern = self.DB_PATTERNS[Constant.MSMONITOR] + for file_name in files: + if file_name.endswith(".db") and msmonitor_pattern.match(file_name): + rank_id = self._extract_rank_id_from_profiler_db(file_name, Constant.MSMONITOR) + if rank_id is not None: + msmonitor_map[rank_id].append(os.path.join(root, file_name)) + + def _set_prof_data(self, prof_type, data_type, data_map): + if prof_type != Constant.MSMONITOR and self._msmonitor_data_map: + logger.warning(f"Find {prof_type} and msmonitor data at the same time! Just analysis {prof_type} data!") + self.prof_type = prof_type + self.data_type = data_type + self.data_map = data_map + + diff --git a/profiler/msprof_analyze/cluster_analyse/cluster_data_preprocess/pytorch_data_preprocessor.py b/profiler/msprof_analyze/cluster_analyse/cluster_data_preprocess/pytorch_data_preprocessor.py index b79249336..482b9fd32 100644 --- a/profiler/msprof_analyze/cluster_analyse/cluster_data_preprocess/pytorch_data_preprocessor.py +++ b/profiler/msprof_analyze/cluster_analyse/cluster_data_preprocess/pytorch_data_preprocessor.py @@ -27,34 +27,31 @@ class PytorchDataPreprocessor(DataPreprocessor): def __init__(self, path_list: list): super().__init__(path_list) - self.data_type = set() @property def db_pattern(self): return r'^ascend_pytorch_profiler(?:_\d+)?\.db$' def get_data_map(self) -> dict: + unknown_rank_paths = [] rank_id_map = defaultdict(list) for dir_name in self.path_list: rank_id = self.get_rank_id(dir_name) if rank_id < 0: - logger.error("fail to get rankid or rankid invalid.") + unknown_rank_paths.append(dir_name) continue ascend_profiler_output = os.path.join(dir_name, Constant.ASCEND_PROFILER_OUTPUT) if os.path.exists(ascend_profiler_output) and os.path.isdir(ascend_profiler_output): - data_type = Constant.DB if self._check_db_type(ascend_profiler_output) else Constant.TEXT - self.data_type.add(data_type) rank_id_map[rank_id].append(dir_name) - logger.debug(f"rank_id: {rank_id}, data_type: {data_type}, directory: {dir_name}") - try: - for (rank_id, dir_list) in rank_id_map.items(): - dir_list.sort(key=lambda x: x.split('_')[-3]) - self.data_map[rank_id] = dir_list[0] - except Exception as e: - raise RuntimeError("Found invalid directory name!") from e + self.data_map = self.postprocess_data_map(rank_id_map, Constant.PYTORCH) + if unknown_rank_paths: + logger.warning(f"Failed to get rank_id for some paths." + f"Affected paths: {unknown_rank_paths}\n" + "Expected to get rank_id from profiler_info_{rank_id}.json") return self.data_map - def get_data_type(self): - if len(self.data_type) == 1: - return self.data_type.pop() - return Constant.INVALID + + + + + diff --git a/profiler/msprof_analyze/cluster_analyse/recipes/base_recipe_analysis.py b/profiler/msprof_analyze/cluster_analyse/recipes/base_recipe_analysis.py index ee4cc567e..c0fe0b20b 100644 --- a/profiler/msprof_analyze/cluster_analyse/recipes/base_recipe_analysis.py +++ b/profiler/msprof_analyze/cluster_analyse/recipes/base_recipe_analysis.py @@ -49,8 +49,7 @@ class BaseRecipeAnalysis(ABC): self._recipe_name = params.get(Constant.RECIPE_NAME, "") self._parallel_mode = params.get(Constant.PARALLEL_MODE, "") self._export_type = params.get(Constant.EXPORT_TYPE, "") - self._is_msprof = params.get(Constant.IS_MSPROF) - self._is_mindspore = params.get(Constant.IS_MINDSPORE) + self._prof_type = params.get(Constant.PROFILING_TYPE) self._cluster_analysis_output_path = os.path.join( params.get(Constant.CLUSTER_ANALYSIS_OUTPUT_PATH, self._collection_dir), Constant.CLUSTER_ANALYSIS_OUTPUT) self._output_path = self._cluster_analysis_output_path if self._export_type == "db" else os.path.join( @@ -115,21 +114,23 @@ class BaseRecipeAnalysis(ABC): ) def dump_data(self, data, file_name, table_name=None, index=True, custom_db_path=None): + if not isinstance(data, pd.DataFrame): + logger.error(f"Unknown dump data type: {type(data)}, expected pandas DataFrame") + return + if data.empty: + logger.warning(f"Empty DataFrame. Skip data dump!") + return if table_name: result_db = custom_db_path if custom_db_path else os.path.join(self.output_path, file_name) + logger.info(f"Exporting data to database: {result_db}, table: {table_name}") conn, cursor = DBManager.create_connect_db(result_db) - if isinstance(data, pd.DataFrame): - data.to_sql(table_name, conn, if_exists='replace', index=index) - else: - logger.error(f"Unknown dump data type: {type(data)}") + data.to_sql(table_name, conn, if_exists='replace', index=index) DBManager.destroy_db_connect(conn, cursor) else: result_csv = os.path.join(self.output_path, file_name) - if isinstance(data, pd.DataFrame): - data = convert_unit(data, self.DB_UNIT, self.UNIT) - FileManager.create_csv_from_dataframe(result_csv, data, index=index) - else: - logger.error(f"Unknown dump data type: {type(data)}") + logger.info(f"Exporting data to CSV file: {result_csv}") + data = convert_unit(data, self.DB_UNIT, self.UNIT) + FileManager.create_csv_from_dataframe(result_csv, data, index=index) def create_notebook(self, filename, notebook_template_dir=None, replace_dict=None): if notebook_template_dir is None: @@ -241,10 +242,12 @@ class BaseRecipeAnalysis(ABC): else: logger.warning(f"Profiler DB file not found, rank id: {rank_id}, db path: {profiler_db_path}.") - if os.path.exists(analysis_db_path): - db_path_dict[Constant.ANALYSIS_DB_PATH] = analysis_db_path - else: - logger.warning(f"Analysis DB file not found, rank id: {rank_id}, db path: {analysis_db_path}.") + if self._prof_type != Constant.MSMONITOR: + if os.path.exists(analysis_db_path): + db_path_dict[Constant.ANALYSIS_DB_PATH] = analysis_db_path + else: + logger.warning(f"Analysis DB file not found, rank id: {rank_id}, db path: {analysis_db_path}.") + if db_path_dict.get(Constant.PROFILER_DB_PATH): db_paths.append(db_path_dict) if invalid_rank_id: @@ -252,19 +255,26 @@ class BaseRecipeAnalysis(ABC): return db_paths def _get_profiler_db_path(self, rank_id, data_path): - if self._is_msprof: + if self._prof_type == Constant.MSPROF: db_path = MsprofDataPreprocessor.get_msprof_profiler_db_path(data_path) return db_path if db_path else os.path.join(data_path, "msprof_xx.db") - if self._is_mindspore: + if self._prof_type == Constant.MINDSPORE: return os.path.join(data_path, Constant.SINGLE_OUTPUT, f"ascend_mindspore_profiler_{rank_id}.db") - return os.path.join(data_path, Constant.SINGLE_OUTPUT, f"ascend_pytorch_profiler_{rank_id}.db") + if self._prof_type == Constant.PYTORCH: + return os.path.join(data_path, Constant.SINGLE_OUTPUT, f"ascend_pytorch_profiler_{rank_id}.db") + if self._prof_type == Constant.MSMONITOR: + return data_path + return "" def _get_analysis_db_path(self, data_path): - if self._is_msprof: + if self._prof_type == Constant.MSPROF: return os.path.join(data_path, Constant.ANALYZE_DIR, "communication_analyzer.db") - if self._is_mindspore: + if self._prof_type == Constant.MINDSPORE: return os.path.join(data_path, Constant.SINGLE_OUTPUT, "communication_analyzer.db") - return os.path.join(data_path, Constant.SINGLE_OUTPUT, "analysis.db") + if self._prof_type == Constant.PYTORCH: + return os.path.join(data_path, Constant.SINGLE_OUTPUT, "analysis.db") + return "" + def _get_step_range(self, db_path): step_range = {} diff --git a/profiler/msprof_analyze/cluster_analyse/recipes/communication_matrix_sum/communication_matrix_sum.py b/profiler/msprof_analyze/cluster_analyse/recipes/communication_matrix_sum/communication_matrix_sum.py index 8b91626fe..82a4f87e9 100644 --- a/profiler/msprof_analyze/cluster_analyse/recipes/communication_matrix_sum/communication_matrix_sum.py +++ b/profiler/msprof_analyze/cluster_analyse/recipes/communication_matrix_sum/communication_matrix_sum.py @@ -201,7 +201,7 @@ class CommMatrixSum(BaseRecipeAnalysis): data_service = DatabaseService(analysis_db_path, {}) data_service.add_table_for_query(TableConstant.TABLE_COMM_ANALYZER_MATRIX) matrix_data = data_service.query_data().get(TableConstant.TABLE_COMM_ANALYZER_MATRIX) - if self._is_msprof or self._is_mindspore: + if self._prof_type in [Constant.MSPROF, Constant.MINDSPORE]: matrix_data = self._trans_msprof_matrix_data(matrix_data) result_data[self.MATRIX_DATA] = matrix_data return result_data diff --git a/profiler/msprof_analyze/cluster_analyse/recipes/freq_analysis/freq_analysis.py b/profiler/msprof_analyze/cluster_analyse/recipes/freq_analysis/freq_analysis.py index 985288ca4..f729512e7 100644 --- a/profiler/msprof_analyze/cluster_analyse/recipes/freq_analysis/freq_analysis.py +++ b/profiler/msprof_analyze/cluster_analyse/recipes/freq_analysis/freq_analysis.py @@ -40,7 +40,7 @@ class FreqAnalysis(BaseRecipeAnalysis): return os.path.basename(os.path.dirname(__file__)) def reducer_func(self, mapper_res): - if self._is_msprof: + if self._prof_type == Constant.MSPROF: logger.warning("Freq analysis do not support msprof db now.") return mapper_res = list(filter(lambda res: res[0] is not None, mapper_res)) diff --git a/profiler/msprof_analyze/cluster_analyse/recipes/slow_rank/slow_rank.py b/profiler/msprof_analyze/cluster_analyse/recipes/slow_rank/slow_rank.py index 618385839..06643df24 100644 --- a/profiler/msprof_analyze/cluster_analyse/recipes/slow_rank/slow_rank.py +++ b/profiler/msprof_analyze/cluster_analyse/recipes/slow_rank/slow_rank.py @@ -103,7 +103,7 @@ class SlowRankAnalysis(BaseRecipeAnalysis): return concated_df def run(self, context): - if self._is_msprof: + if self._prof_type == Constant.MSPROF: logger.warning("Slow rank analysis do not support msprof db now.") return diff --git a/profiler/msprof_analyze/compare_tools/compare_backend/utils/args_manager.py b/profiler/msprof_analyze/compare_tools/compare_backend/utils/args_manager.py index 84e853ef3..5ca442a14 100644 --- a/profiler/msprof_analyze/compare_tools/compare_backend/utils/args_manager.py +++ b/profiler/msprof_analyze/compare_tools/compare_backend/utils/args_manager.py @@ -124,16 +124,25 @@ class ArgsManager: @classmethod def parse_profiling_path(cls, file_path: str): PathManager.input_path_common_check(file_path) + # 处理输入为单个文件的情况 if os.path.isfile(file_path): (split_file_path, split_file_name) = os.path.split(file_path) (shot_name, extension) = os.path.splitext(split_file_name) - if extension != ".json": + if extension == ".json": + json_type = FileManager.check_json_type(file_path) + return { + Constant.PROFILING_TYPE: json_type, Constant.PROFILING_PATH: file_path, + Constant.TRACE_PATH: file_path + } + elif extension == ".db": + if shot_name.startswith(("ascend_pytorch_profiler", "ascend_mindspore_profiler", "msmonitor")): + return { + Constant.PROFILING_TYPE: Constant.NPU, Constant.PROFILING_PATH: file_path, + Constant.PROFILER_DB_PATH: file_path + } + else: msg = f"Invalid profiling path suffix: {file_path}" raise RuntimeError(msg) - json_type = FileManager.check_json_type(file_path) - return { - Constant.PROFILING_TYPE: json_type, Constant.PROFILING_PATH: file_path, Constant.TRACE_PATH: file_path - } path_dict = {} sub_dirs = os.listdir(file_path) @@ -146,8 +155,8 @@ class ArgsManager: profiler_output = ascend_output if os.path.isdir(ascend_output) else file_path sub_dirs = os.listdir(profiler_output) for sub_dir in sub_dirs: - if sub_dir.startswith(("ascend_pytorch_profiler", "ascend_mindspore_profiler")) and sub_dir.endswith( - ".db"): + if (sub_dir.startswith(("ascend_pytorch_profiler", "ascend_mindspore_profiler", "msmonitor")) + and sub_dir.endswith(".db")): db_path = os.path.join(profiler_output, sub_dir) path_dict.update({Constant.PROFILING_TYPE: Constant.NPU, Constant.PROFILING_PATH: file_path, Constant.PROFILER_DB_PATH: db_path, Constant.ASCEND_OUTPUT_PATH: profiler_output}) diff --git a/profiler/msprof_analyze/prof_common/constant.py b/profiler/msprof_analyze/prof_common/constant.py index df2215529..07fe6579e 100644 --- a/profiler/msprof_analyze/prof_common/constant.py +++ b/profiler/msprof_analyze/prof_common/constant.py @@ -409,6 +409,8 @@ class Constant(object): MINDSPORE_VERSION = "mindspore_version" PYTORCH = "pytorch" MINDSPORE = "mindspore" + MSPROF = "msprof" + MSMONITOR = "msmonitor" # node type MODULE_TYPE = 0 diff --git a/profiler/msprof_analyze/test/ut/cluster_analyse/cluster_data_preprocess/test_data_preprocessor.py b/profiler/msprof_analyze/test/ut/cluster_analyse/cluster_data_preprocess/test_data_preprocessor.py new file mode 100644 index 000000000..7bbf493cd --- /dev/null +++ b/profiler/msprof_analyze/test/ut/cluster_analyse/cluster_data_preprocess/test_data_preprocessor.py @@ -0,0 +1,90 @@ +# Copyright (c) 2025, Huawei Technologies Co., Ltd. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import os +import shutil +import unittest +from unittest.mock import patch + +from msprof_analyze.prof_common.constant import Constant +from msprof_analyze.cluster_analyse.cluster_data_preprocess.data_preprocessor import DataPreprocessor + + +class TestDataPreprocessor(unittest.TestCase): + + def test_postprocess_data_map_when_legal_pytorch_map_then_return_valid_data_map(self): + """输入pytorch_data_map中有rank对应多个路径,返回最新的路径""" + data_map = { + 0: ["./cluster_data/ubuntu_00000_202509010000_ascend_pt", + "./cluster_data/ubuntu_11111_202508310000_ascend_pt"], + 1: ["./cluster_data/ubuntu_00001_202509010000_ascend_pt", + "./cluster_data/ubuntu_11112_202508310000_ascend_pt"] + } + res = DataPreprocessor.postprocess_data_map(data_map, Constant.PYTORCH) + self.assertEqual(len(res), 2) + self.assertEqual(res.get(0), "./cluster_data/ubuntu_00000_202509010000_ascend_pt") + self.assertEqual(res.get(1), "./cluster_data/ubuntu_00001_202509010000_ascend_pt") + + @patch('msprof_analyze.cluster_analyse.cluster_data_preprocess.data_preprocessor.logger') + def test_postprocess_data_map_when_part_legal_msprof_map_then_valid_data_map_and_logger_warning(self, mock_logger): + """输入msprof_data_map中有个别rank文件名不符合要求,返回剩余部分并有logger warning""" + data_map = { + 0: ["./cluster_data/PROF_00000_202509010000_aaaaa", + "./cluster_data/PROF_device_0"], + 1: ["./cluster_data/PROF_00001_202509010001_bbbbb", + "./cluster_data/PROF_00002_202508310000_ccccc"] + } + res = DataPreprocessor.postprocess_data_map(data_map, Constant.MSPROF) + self.assertEqual(len(res), 1) + self.assertEqual(res.get(1), "./cluster_data/PROF_00001_202509010001_bbbbb") + + expected_message = ( + 'Failed to process multiple profiling paths for some ranks. ' + 'Affected rank_id: [0]. ' + 'Expected path formats: PROF_{number}_{timestamp}_{string}' + ) + mock_logger.warning.assert_called_once_with(expected_message) + + def test_postprocess_data_map_when_illegal_msmonitor_map_then_return_empty(self): + """输入msmonitor_data_map中所有rank文件名不符合要求,返回空字典""" + data_map = { + 0: ["./cluster_data/msmonitor_new_1.db", + "./cluster_data/msmonitor_old_1.db"] + } + res = DataPreprocessor.postprocess_data_map(data_map, Constant.MSMONITOR) + self.assertEqual(res, {}) + + def test_postprocess_data_map_when_illegal_prof_type_then_return_empty(self): + """输入不合法的prof_type,返回空字典""" + data_map = { + 0: ["./cluster_data/ubuntu_00000_202509010000_ascend_pt", + "./cluster_data/ubuntu_11111_202508310000_ascend_pt"], + 1: ["./cluster_data/ubuntu_00001_202509010000_ascend_pt", + "./cluster_data/ubuntu_11112_202508310000_ascend_pt"] + } + res = DataPreprocessor.postprocess_data_map(data_map, Constant.UNKNOWN) + self.assertEqual(res, {}) + + def test_postprocess_mindspore_data_map_when_only_one_prof_path_then_return_valid_data_map(self): + """输入mindspore_data_map中rank路径一一对应,返回有效的data_map""" + data_map = { + 0: ["./cluster_data/ubuntu_00000_202509010000_ascend_ms"], + 1: ["./cluster_data/ubuntu_00001_202509010000_ascend_ms"] + } + res = DataPreprocessor.postprocess_data_map(data_map, Constant.PYTORCH) + self.assertEqual(len(res), 2) + self.assertEqual(res.get(0), "./cluster_data/ubuntu_00000_202509010000_ascend_ms") + self.assertEqual(res.get(1), "./cluster_data/ubuntu_00001_202509010000_ascend_ms") \ No newline at end of file diff --git a/profiler/msprof_analyze/test/ut/cluster_analyse/cluster_data_preprocess/test_prof_data_allocate.py b/profiler/msprof_analyze/test/ut/cluster_analyse/cluster_data_preprocess/test_prof_data_allocate.py new file mode 100644 index 000000000..aa749b856 --- /dev/null +++ b/profiler/msprof_analyze/test/ut/cluster_analyse/cluster_data_preprocess/test_prof_data_allocate.py @@ -0,0 +1,352 @@ +# Copyright (c) 2025, Huawei Technologies Co., Ltd. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import shutil +import unittest +from unittest.mock import patch, Mock +from collections import defaultdict + +from msprof_analyze.prof_common.constant import Constant +from msprof_analyze.cluster_analyse.cluster_data_preprocess.prof_data_allocate import ProfDataAllocate + + +NAMESPACE = "msprof_analyze.cluster_analyse.cluster_data_preprocess.prof_data_allocate." + + +class TestProfDataAllocate(unittest.TestCase): + """Test cases for ProfDataAllocate class""" + + TEST_DIR = os.path.join(os.path.dirname(__file__), 'TEST_PROF_DATA_ALLOCATE') + + def setUp(self): + """Set up test environment""" + if os.path.exists(self.TEST_DIR): + shutil.rmtree(self.TEST_DIR) + os.makedirs(self.TEST_DIR) + + def tearDown(self): + """Clean up test environment""" + if os.path.exists(self.TEST_DIR): + shutil.rmtree(self.TEST_DIR) + + def test_init_when_given_profiling_path_then_initialize_correctly(self): + """Test initialization with profiling path""" + profiling_path = "/test/path" + allocator = ProfDataAllocate(profiling_path) + + self.assertEqual(allocator.profiling_path, profiling_path) + self.assertEqual(allocator.data_type, "") + self.assertEqual(allocator.data_map, {}) + self.assertEqual(allocator.prof_type, "") + self.assertEqual(allocator._msmonitor_data_map, {}) + + def test_match_file_pattern_in_dir_when_file_exists_then_return_filename(self): + """Test matching file pattern when file exists""" + test_dir = os.path.join(self.TEST_DIR, "test_dir") + os.makedirs(test_dir) + + # Create test file + test_file = os.path.join(test_dir, "ascend_pytorch_profiler_1.db") + with open(test_file, 'w') as f: + f.write("test") + + pattern = ProfDataAllocate.DB_PATTERNS[Constant.PYTORCH] + result = ProfDataAllocate.match_file_pattern_in_dir(test_dir, pattern) + + self.assertEqual(result, "ascend_pytorch_profiler_1.db") + + def test_match_file_pattern_in_dir_when_file_not_exists_then_return_empty_string(self): + """Test matching file pattern when file doesn't exist""" + test_dir = os.path.join(self.TEST_DIR, "test_dir") + os.makedirs(test_dir) + + pattern = ProfDataAllocate.DB_PATTERNS[Constant.PYTORCH] + result = ProfDataAllocate.match_file_pattern_in_dir(test_dir, pattern) + + self.assertEqual(result, "") + + def test_extract_rank_id_from_profiler_db_when_pytorch_file_then_return_rank_id(self): + """Test extracting rank ID from PyTorch profiler DB filename""" + file_name = "ascend_pytorch_profiler_1.db" + prof_type = Constant.PYTORCH + + result = ProfDataAllocate._extract_rank_id_from_profiler_db(file_name, prof_type) + + self.assertEqual(result, 1) + + def test_extract_rank_id_from_profiler_db_when_mindspore_file_then_return_rank_id(self): + """Test extracting rank ID from MindSpore profiler DB filename""" + file_name = "ascend_mindspore_profiler_2.db" + prof_type = Constant.MINDSPORE + + result = ProfDataAllocate._extract_rank_id_from_profiler_db(file_name, prof_type) + + self.assertEqual(result, 2) + + def test_extract_rank_id_from_profiler_db_when_msmonitor_file_then_return_rank_id(self): + """Test extracting rank ID from MSMonitor profiler DB filename""" + file_name = "msmonitor_1234567_20250101120000000_1.db" + prof_type = Constant.MSMONITOR + + result = ProfDataAllocate._extract_rank_id_from_profiler_db(file_name, prof_type) + + self.assertEqual(result, 1) + + def test_extract_rank_id_from_profiler_db_when_invalid_format_then_return_none(self): + """Test extracting rank ID from invalid filename format""" + file_name = "invalid_filename.db" + prof_type = Constant.PYTORCH + + result = ProfDataAllocate._extract_rank_id_from_profiler_db(file_name, prof_type) + + self.assertIsNone(result) + + def test_extract_rank_id_from_profiler_db_when_unsupported_prof_type_then_return_none(self): + """Test extracting rank ID from unsupported profiler type""" + file_name = "test.db" + prof_type = "unsupported" + + result = ProfDataAllocate._extract_rank_id_from_profiler_db(file_name, prof_type) + + self.assertIsNone(result) + + @patch(NAMESPACE + 'ProfDataAllocate.allocate_db_prof_data') + @patch(NAMESPACE + 'ProfDataAllocate.allocate_text_prof_data') + def test_allocate_prof_data_when_db_allocation_succeeds_then_return_true(self, mock_text_alloc, mock_db_alloc): + """Test prof data allocation when DB allocation succeeds""" + allocator = ProfDataAllocate(self.TEST_DIR) + mock_db_alloc.return_value = True + allocator.prof_type = Constant.PYTORCH + + result = allocator.allocate_prof_data() + + self.assertTrue(result) + mock_db_alloc.assert_called_once() + mock_text_alloc.assert_not_called() + + @patch(NAMESPACE + 'ProfDataAllocate.allocate_db_prof_data') + @patch(NAMESPACE + 'ProfDataAllocate.allocate_text_prof_data') + def test_allocate_prof_data_when_db_fails_text_succeeds_then_return_true(self, mock_text_alloc, mock_db_alloc): + """Test prof data allocation when DB fails but text allocation succeeds""" + allocator = ProfDataAllocate(self.TEST_DIR) + mock_db_alloc.return_value = False + mock_text_alloc.return_value = True + + result = allocator.allocate_prof_data() + + self.assertTrue(result) + mock_db_alloc.assert_called_once() + mock_text_alloc.assert_called_once() + + @patch(NAMESPACE + 'ProfDataAllocate.allocate_db_prof_data') + @patch(NAMESPACE + 'ProfDataAllocate.allocate_text_prof_data') + def test_allocate_prof_data_when_both_fail_then_return_false(self, mock_text_alloc, mock_db_alloc): + """Test prof data allocation when both DB and text allocation fail""" + allocator = ProfDataAllocate(self.TEST_DIR) + mock_db_alloc.return_value = False + mock_text_alloc.return_value = False + allocator._msmonitor_data_map = {} + + result = allocator.allocate_prof_data() + + self.assertFalse(result) + mock_db_alloc.assert_called_once() + mock_text_alloc.assert_called_once() + + @patch(NAMESPACE + 'ProfDataAllocate.allocate_db_prof_data') + @patch(NAMESPACE + 'ProfDataAllocate.allocate_text_prof_data') + def test_allocate_prof_data_when_msmonitor_data_exists_then_return_true(self, mock_text_alloc, mock_db_alloc): + """Test prof data allocation when MSMonitor data exists""" + allocator = ProfDataAllocate(self.TEST_DIR) + mock_db_alloc.return_value = False + mock_text_alloc.return_value = False + allocator._msmonitor_data_map = {1: ["/path/to/file.db"]} + + result = allocator.allocate_prof_data() + + self.assertTrue(result) + self.assertEqual(allocator.prof_type, Constant.MSMONITOR) + self.assertEqual(allocator.data_type, Constant.DB) + self.assertEqual(allocator.data_map, {1: ["/path/to/file.db"]}) + + @patch('msprof_analyze.prof_common.path_manager.PathManager.limited_depth_walk') + @patch(NAMESPACE + 'ProfDataAllocate.match_file_pattern_in_dir') + def test_allocate_db_prof_data_when_pytorch_data_exists_then_return_true(self, mock_match_file, mock_walk): + """Test DB prof data allocation when PyTorch data exists""" + # Mock directory structure + mock_walk.return_value = [ + (self.TEST_DIR, ["ASCEND_PROFILER_OUTPUT"], []), + (os.path.join(self.TEST_DIR, "ASCEND_PROFILER_OUTPUT"), [], ["ascend_pytorch_profiler_1.db"]) + ] + mock_match_file.side_effect = ["ascend_pytorch_profiler_1.db", ""] + + allocator = ProfDataAllocate(self.TEST_DIR) + result = allocator.allocate_db_prof_data() + + self.assertTrue(result) + self.assertEqual(allocator.prof_type, Constant.PYTORCH) + self.assertEqual(allocator.data_type, Constant.DB) + + @patch('msprof_analyze.prof_common.path_manager.PathManager.limited_depth_walk') + @patch(NAMESPACE + 'ProfDataAllocate.match_file_pattern_in_dir') + def test_allocate_db_prof_data_when_msmonitor_data_exists_then_return_true(self, mock_match_file, mock_walk): + """Test DB prof data allocation when MSMonitor data exists""" + # Mock directory structure + mock_walk.return_value = [ + (self.TEST_DIR, [], ["msmonitor_1234567_20250101120000000_1.db"]) + ] + mock_match_file.return_value = "msmonitor_1234567_20250101120000000_1.db" + + allocator = ProfDataAllocate(self.TEST_DIR) + result = allocator.allocate_db_prof_data() + + self.assertTrue(result) + self.assertIn(1, allocator._msmonitor_data_map) + + @patch('msprof_analyze.prof_common.path_manager.PathManager.limited_depth_walk') + @patch(NAMESPACE + 'ProfDataAllocate.match_file_pattern_in_dir') + def test_allocate_db_prof_data_when_both_pytorch_and_mindspore_then_return_false(self, mock_match_file, mock_walk): + """Test DB prof data allocation when both PyTorch and MindSpore data exist""" + # Mock directory structure with both types + mock_walk.return_value = [ + (self.TEST_DIR, ["ASCEND_PROFILER_OUTPUT"], []), + (os.path.join(self.TEST_DIR, "ASCEND_PROFILER_OUTPUT"), [], ["ascend_pytorch_profiler_1.db", + "ascend_mindspore_profiler_2.db"]) + ] + mock_match_file.side_effect = ["ascend_pytorch_profiler_1.db", "ascend_mindspore_profiler_2.db"] + + allocator = ProfDataAllocate(self.TEST_DIR) + result = allocator.allocate_db_prof_data() + + self.assertFalse(result) + self.assertEqual(allocator.prof_type, Constant.INVALID) + + @patch('msprof_analyze.prof_common.path_manager.PathManager.limited_depth_walk') + def test_allocate_db_prof_data_when_no_data_exists_then_return_false(self, mock_walk): + """Test DB prof data allocation when no data exists""" + mock_walk.return_value = [(self.TEST_DIR, [], [])] + + allocator = ProfDataAllocate(self.TEST_DIR) + result = allocator.allocate_db_prof_data() + + self.assertFalse(result) + + def test_allocate_text_prof_data_when_pytorch_text_data_exists_then_return_true(self): + """Test text prof data allocation when PyTorch text data exists""" + pytorch_dir = os.path.join(self.TEST_DIR, "test_ascend_pt") + os.makedirs(pytorch_dir) + os.makedirs(os.path.join(pytorch_dir, Constant.ASCEND_PROFILER_OUTPUT)) + profiler_info = os.path.join(pytorch_dir, "profiler_info_1.json") + with open(profiler_info, 'w') as f: + f.write("profiler_info") + + allocator = ProfDataAllocate(self.TEST_DIR) + result = allocator.allocate_text_prof_data() + + self.assertTrue(result) + self.assertEqual(allocator.prof_type, Constant.PYTORCH) + self.assertEqual(allocator.data_type, Constant.TEXT) + + def test_allocate_text_prof_data_when_both_pytorch_and_mindspore_text_exist_then_return_false(self,): + """Test text prof data allocation when both PyTorch and MindSpore text data exist""" + pytorch_dir = os.path.join(self.TEST_DIR, "test_ascend_pt") + mindspore_dir = os.path.join(self.TEST_DIR, "test_ascend_ms") + dir_list = [pytorch_dir, os.path.join(pytorch_dir, Constant.ASCEND_PROFILER_OUTPUT), + mindspore_dir, os.path.join(mindspore_dir, Constant.ASCEND_PROFILER_OUTPUT)] + for path in dir_list: + os.makedirs(path) + profiler_info_list = [os.path.join(pytorch_dir, "profiler_info_1.json"), + os.path.join(mindspore_dir, "profiler_info_0.json")] + for profiler_info in profiler_info_list: + with open(profiler_info, 'w') as f: + f.write("profiler_info") + + allocator = ProfDataAllocate(self.TEST_DIR) + result = allocator.allocate_text_prof_data() + + self.assertFalse(result) + self.assertEqual(allocator.prof_type, Constant.INVALID) + + @patch('msprof_analyze.prof_common.path_manager.PathManager.limited_depth_walk') + @patch(NAMESPACE + 'MsprofDataPreprocessor') + def test_allocate_text_prof_data_when_msprof_data_exists_then_return_true(self, mock_msprof, mock_walk): + """Test text prof data allocation when MSPROF data exists""" + # Mock directory structure + mock_walk.return_value = [ + (self.TEST_DIR, ["PROF_001_20250101_test"], []) + ] + + # Mock MSPROF processor + mock_msprof_processor = Mock() + mock_msprof_processor.get_data_map.return_value = {1: self.TEST_DIR} + mock_msprof_processor.get_data_type.return_value = Constant.DB + mock_msprof.return_value = mock_msprof_processor + + allocator = ProfDataAllocate(self.TEST_DIR) + result = allocator.allocate_text_prof_data() + + self.assertTrue(result) + self.assertEqual(allocator.prof_type, Constant.MSPROF) + self.assertEqual(allocator.data_type, Constant.DB) + + def test_allocate_text_prof_data_when_invalid_prof_type_then_return_false(self): + """Test text prof data allocation when prof type is invalid""" + allocator = ProfDataAllocate(self.TEST_DIR) + allocator.prof_type = Constant.INVALID + + result = allocator.allocate_text_prof_data() + + self.assertFalse(result) + + def test_set_prof_data_when_given_valid_data_then_set_correctly(self): + """Test setting prof data with valid input""" + allocator = ProfDataAllocate(self.TEST_DIR) + prof_type = Constant.PYTORCH + data_type = Constant.DB + data_map = {1: ["/path/to/data"]} + + allocator._set_prof_data(prof_type, data_type, data_map) + + self.assertEqual(allocator.prof_type, prof_type) + self.assertEqual(allocator.data_type, data_type) + self.assertEqual(allocator.data_map, data_map) + + @patch(NAMESPACE + 'logger') + def test_set_prof_data_when_msmonitor_data_exists_then_log_warning(self, mock_logger): + """Test setting prof data when MSMonitor data already exists""" + allocator = ProfDataAllocate(self.TEST_DIR) + allocator._msmonitor_data_map = {1: ["/path/to/msmonitor.db"]} + allocator._set_prof_data(Constant.PYTORCH, Constant.DB, {1: ["/path/to/data"]}) + mock_logger.warning.assert_called_once() + self.assertEqual(allocator.prof_type, Constant.PYTORCH) + self.assertEqual(allocator.data_type, Constant.DB) + + def test_set_prof_data_when_msmonitor_prof_type_then_set_msmonitor_data(self): + """Test setting prof data when prof type is MSMonitor""" + allocator = ProfDataAllocate(self.TEST_DIR) + prof_type = Constant.MSMONITOR + data_type = Constant.DB + data_map = {1: ["/path/to/msmonitor.db"]} + + allocator._set_prof_data(prof_type, data_type, data_map) + + self.assertEqual(allocator.prof_type, prof_type) + self.assertEqual(allocator.data_type, data_type) + self.assertEqual(allocator.data_map, data_map) + + +if __name__ == '__main__': + unittest.main() diff --git a/profiler/msprof_analyze/test/ut/cluster_analyse/cluster_data_preprocess/test_pytorch_data_preprocessor.py b/profiler/msprof_analyze/test/ut/cluster_analyse/cluster_data_preprocess/test_pytorch_data_preprocessor.py index 176d1c85d..a3d58920d 100644 --- a/profiler/msprof_analyze/test/ut/cluster_analyse/cluster_data_preprocess/test_pytorch_data_preprocessor.py +++ b/profiler/msprof_analyze/test/ut/cluster_analyse/cluster_data_preprocess/test_pytorch_data_preprocessor.py @@ -49,7 +49,6 @@ class TestPytorchDataPreprocessor(unittest.TestCase): self.dirs = [os.path.join(self.DIR_PATH, filename) for filename in os.listdir(self.DIR_PATH)] - def tearDown(self) -> None: shutil.rmtree(self.DIR_PATH) @@ -75,5 +74,5 @@ class TestPytorchDataPreprocessor(unittest.TestCase): self.assertEqual(len(ret), 2) self.assertIn(1, ret.keys()) self.assertIn(2, ret.keys()) - self.assertIn(os.path.join(self.DIR_PATH, 'worker1_11111111_ascend_pt'), ret.values()) - self.assertIn(os.path.join(self.DIR_PATH, 'worker2_11111112_ascend_pt'), ret.values()) + self.assertIn(os.path.join(self.DIR_PATH, 'worker1_11111112_ascend_pt'), ret.values()) + self.assertIn(os.path.join(self.DIR_PATH, 'worker2_11111113_ascend_pt'), ret.values()) diff --git a/profiler/msprof_analyze/test/ut/cluster_analyse/recipes/test_base_recipe_analysis.py b/profiler/msprof_analyze/test/ut/cluster_analyse/recipes/test_base_recipe_analysis.py index e31db347d..bd19d49ff 100644 --- a/profiler/msprof_analyze/test/ut/cluster_analyse/recipes/test_base_recipe_analysis.py +++ b/profiler/msprof_analyze/test/ut/cluster_analyse/recipes/test_base_recipe_analysis.py @@ -34,6 +34,7 @@ class TestBaseRecipeAnalysis(unittest.TestCase): Constant.RECIPE_NAME: 'test_recipe', Constant.PARALLEL_MODE: 'parallel', Constant.EXPORT_TYPE: 'csv', + Constant.PROFILING_TYPE: Constant.PYTORCH, Constant.IS_MSPROF: False, Constant.IS_MINDSPORE: False, Constant.CLUSTER_ANALYSIS_OUTPUT_PATH: '/tmp/to/output', @@ -215,7 +216,7 @@ class TestBaseRecipeAnalysis(unittest.TestCase): self.assertEqual(result, os.path.join('test_path', Constant.SINGLE_OUTPUT, 'ascend_pytorch_profiler_0.db')) # 测试 MindSpore 情况 - self.analysis._is_mindspore = True + self.analysis._prof_type = Constant.MINDSPORE result = self.analysis._get_profiler_db_path(0, 'test_path') self.assertEqual(result, os.path.join('test_path', Constant.SINGLE_OUTPUT, 'ascend_mindspore_profiler_0.db')) @@ -226,7 +227,7 @@ class TestBaseRecipeAnalysis(unittest.TestCase): self.assertEqual(result, os.path.join('test_path', Constant.SINGLE_OUTPUT, 'analysis.db')) # 测试 MindSpore 情况 - self.analysis._is_mindspore = True + self.analysis._prof_type = Constant.MINDSPORE result = self.analysis._get_analysis_db_path('test_path') self.assertEqual(result, os.path.join('test_path', Constant.SINGLE_OUTPUT, 'communication_analyzer.db')) -- Gitee