diff --git a/docs/api/api_python/core/mindformers.core.ProfileMonitor.rst b/docs/api/api_python/core/mindformers.core.ProfileMonitor.rst
index 63d66540105832ce101723fa5da5774a29b1d9fd..e644ea7de7e2cc74f3a276c73d933c0808f7bf53 100644
--- a/docs/api/api_python/core/mindformers.core.ProfileMonitor.rst
+++ b/docs/api/api_python/core/mindformers.core.ProfileMonitor.rst
@@ -1,7 +1,7 @@
 mindformers.core.ProfileMonitor
 ===============================
 
-.. py:class:: mindformers.core.ProfileMonitor(start_step=1, stop_step=10, output_path=None, start_profile=True, profile_communication=False, profile_memory=True, config=None, **kwargs)
+.. py:class:: mindformers.core.ProfileMonitor(start_step=1, stop_step=10, output_path=None, start_profile=True, profile_rank_ids=None, profile_pipeline=False, profile_communication=False, profile_memory=False, profiler_level=0, with_stack=False, data_simplification=True, config=None, **kwargs)
 
     监控训练过程的性能分析回调函数。
 
@@ -11,5 +11,13 @@ mindformers.core.ProfileMonitor
         - **output_path** (str) - 保存profiling生成文件的文件夹路径。默认值: ``None`` 。
         - **start_profile** (str) - 是否打开profiling功能。默认值: ``True`` 。
         - **profile_communication** (str) - 在分布式训练期间是否收集通信性能数据。默认值: ``False`` 。
-        - **profile_memory** (str) - 是否收集张量的内存数据。默认值: ``True`` 。
+        - **profile_memory** (str) - 是否收集张量的内存数据。默认值: ``False`` 。
+        - **profile_rank_ids** (list) - 指定rank ids开启profiling。默认值: ``None``,即该配置不生效,所有rank id均开启profiling。
+        - **profile_pipeline** (bool) - 是否按流水线并行每个stage的其中一张卡开启profiling。默认值: ``False`` 。
+        - **profiler_level** (int) - 采集profiling数据的级别(0, 1, 2)。默认值: ``0`` 。
+          - ``0`` - 最精简的采集性能数据级别,只采集计算类算子耗时数据和通信类大算子基础数据。
+          - ``1`` - 在level0基础上,额外采集CANN层AscendCL数据、AICORE性能数据以及通信类小算子数据。
+          - ``2`` - 在level1基础上,额外采集CANN层中图编译等级为O2和Runtime数据。
+        - **with_stack** (bool) - 是否收集Python侧的调用栈数据。默认值: ``False`` 。
+        - **data_simplification** (bool) - 是否开启数据精简,开启后将在导出profiling数据后删除FRAMEWORK目录以及其他多余数据。默认值: ``True`` 。
         - **config** (dict) - 配置项,用于对相关配置信息进行profiling,比如并行配置。默认值: ``None`` 。
\ No newline at end of file
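Before the implementation diff below, a minimal usage sketch of the parameters documented above may help. The import path and argument names come from this patch; the concrete values (rank list, output path, level) are illustrative assumptions only:

```python
# Illustrative only: exercising the new ProfileMonitor arguments from this patch.
# The chosen values are assumptions, not recommended defaults.
from mindformers.core import ProfileMonitor

profile_callback = ProfileMonitor(
    start_step=1,                 # step at which data collection starts
    stop_step=10,                 # step at which data collection stops
    output_path="./profile_dir",  # results go to <output_path>/profile/rank_<id>
    profile_rank_ids=[0, 4],      # only these ranks instantiate a Profiler
    profile_pipeline=False,       # True would profile one card per pipeline stage
    profiler_level=1,             # 0/1/2, mapped to mindspore.profiler.ProfilerLevel
    with_stack=False,             # collect Python-side call stacks
    data_simplification=True,     # delete the FRAMEWORK directory after export
)
# profile_callback is then passed to the training loop like any other callback.
```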
diff --git a/mindformers/core/callback/callback.py b/mindformers/core/callback/callback.py
index 66dc9a36c48eed3145dcb32e38fbd9548cd5e57d..d58d515fe4920cd4c21ba90a60399dea66d13521 100644
--- a/mindformers/core/callback/callback.py
+++ b/mindformers/core/callback/callback.py
@@ -32,12 +32,13 @@ from mindspore.train.serialization import _get_merged_param_data
 from mindspore.nn.cell import Cell
 from mindspore.ops.operations.comm_ops import Broadcast
 from mindspore.common import jit
+from mindspore.profiler import ProfilerLevel
 from mindformers.tools.register import MindFormerRegister, MindFormerModuleType
 from mindformers.tools.cloud_adapter.cloud_adapter import Local2ObsMonitor
 from mindformers.tools.logger import logger
 from mindformers.tools.utils import get_output_root_path, get_output_subpath, get_remote_save_url, check_in_modelarts,\
-    get_real_rank, get_real_group_size
+    get_real_rank, get_real_group_size, get_pipeline_rank_ids
 
 __all__ = ['ObsMonitor', 'MFLossMonitor', 'CheckpointMonitor', 'SummaryMonitor', 'ProfileMonitor', 'EvalCallBack']
 
 
@@ -722,46 +723,75 @@ class ProfileMonitor(Callback):
         stop_step (int): The step to stop profiling. Default: 10.
         output_path (str): The result of profiling will be saved in this path. Default: None.
         start_profile (str): Whether to enable profiling. Default: True.
+        profile_rank_ids (list): Specify rank ids to enable profiling. Default: None (all rank ids are enabled).
+        profile_pipeline (bool): Whether to enable profiling on one card of each pipeline stage. Default: False.
         profile_communication (str): Whether to collect communication performance data during multi-device
            training. Default: False.
-        profile_memory (str): Whether to collect Tensor memory data. Default: True.
+        profile_memory (str): Whether to collect Tensor memory data. Default: False.
         config (dict): Configuration items, used to profile relevant configuration information,
            such as parallel configuration. Default: None.
+        profiler_level (int): Collection level of profiling data (0, 1, 2). Default: 0.
+
+            - 0: The most streamlined level of performance data collection,
+              only collecting execution time data for computational operators and
+              basic data for large communication operators.
+            - 1: In addition to level 0, extra data is collected for CANN layer AscendCL,
+              AICORE performance data, and small communication operators.
+            - 2: In addition to level 1, extra data is collected for graph compile level O2
+              and Runtime in the CANN layer.
+
+        with_stack (bool): Whether to collect Python-side stack trace data. Default: False.
+        data_simplification (bool): Whether to enable data simplification, which will delete the FRAMEWORK directory
+            and other extraneous data after exporting profiling data. Default: True.
 
     Examples:
         >>> from mindformers.core import ProfileMonitor
         >>> monitor = ProfileMonitor(output_path='./profile_dir')
     """
 
-    def __init__(self, start_step=1, stop_step=10,
-                 output_path=None, start_profile=True,
-                 profile_communication=False, profile_memory=True, config=None, **kwargs):
+    def __init__(self, start_step=1, stop_step=10, output_path=None,
+                 start_profile=True, profile_rank_ids=None, profile_pipeline=False,
+                 profile_communication=False, profile_memory=False, config=None,
+                 profiler_level=0, with_stack=False, data_simplification=True, **kwargs):
         super(ProfileMonitor, self).__init__()
         self.start_step = start_step
         self.stop_step = stop_step
         self.start_profile = start_profile
+        self.profile_rank_ids = profile_rank_ids
+        self.profile_pipeline = profile_pipeline
         self.profile_communication = profile_communication
+        self.profiler_level = self._get_profiler_level(profiler_level)
+        self.profiler = None
 
         if profile_communication and not start_profile:
             raise ValueError("When profile_communication is True, start_profile must also be True")
 
         rank_id = get_real_rank()
-        if not output_path:
-            output_path = get_output_subpath('profile', rank_id)
-        else:
-            output_path = os.path.join(output_path, 'profile', 'rank_{}'.format(rank_id))
-        logger.info("Profile save path: %s", output_path)
+        self.pipeline_rank_ids = get_pipeline_rank_ids() if self.profile_pipeline else None
+        if self.pipeline_rank_ids == [-1]:
+            raise ValueError("Device num should be divisible by pipeline stage num.")
 
-        if ms.get_context("device_target") == "GPU" and profile_memory:
-            logger.warning("The parameter profile_memory is not supported on GPU currently, so is changed to False. ")
-            profile_memory = False
-
-        self.profiler = Profiler(
-            start_profile=start_profile, output_path=output_path,
-            profile_communication=profile_communication, profile_memory=profile_memory, **kwargs)
-        self._record_metadata(config)
-        self.run_context = None
-        self.output_path = output_path
+        if self._is_profile_required(rank_id):
+            if not output_path:
+                output_path = get_output_subpath('profile', rank_id)
+            else:
+                output_path = os.path.join(output_path, 'profile', 'rank_{}'.format(rank_id))
+            logger.info("Profile save path: %s", output_path)
+
+            if ms.get_context("device_target") == "GPU" and profile_memory:
+                logger.warning("The parameter profile_memory is not supported on GPU currently, "
+                               "so is changed to False. ")
+                profile_memory = False
+
+            self.profiler = Profiler(
+                start_profile=start_profile, output_path=output_path,
+                profile_communication=profile_communication, profile_memory=profile_memory,
+                profiler_level=self.profiler_level, with_stack=with_stack,
+                data_simplification=data_simplification, **kwargs
+            )
+            self._record_metadata(config)
+            self.run_context = None
+            self.output_path = output_path
 
     def step_begin(self, run_context):
         """
@@ -772,7 +802,7 @@ class ProfileMonitor(Callback):
         """
         cb_params = run_context.original_args()
         step_num = cb_params.cur_step_num
-        if step_num == self.start_step and not self.start_profile:
+        if step_num == self.start_step and not self.start_profile and self.profiler:
             self.profiler.start()
 
     def step_end(self, run_context):
         """
@@ -784,7 +814,7 @@ class ProfileMonitor(Callback):
         """
         cb_params = run_context.original_args()
         step_num = cb_params.cur_step_num
-        if step_num == self.stop_step:
+        if step_num == self.stop_step and self.profiler:
             self.profiler.stop()
             self.profiler.analyse()
             logger.info("End of Profiling, please view the profile data under %s and analyze it using mindinsight."
@@ -817,6 +847,42 @@ class ProfileMonitor(Callback):
         except AttributeError as e:
             logger.warning("Profiler failed to record distributed args, %s", e)
 
+    def _is_profile_required(self, rank_id):
+        """
+        Determine whether the current rank id needs to enable profiling.
+
+        Args:
+            rank_id (int): current rank id.
+        """
+        if not self.profile_rank_ids and not self.pipeline_rank_ids:
+            return True
+
+        profile_ids = self.profile_rank_ids if isinstance(self.profile_rank_ids, list) else []
+        pipeline_ids = self.pipeline_rank_ids if isinstance(self.pipeline_rank_ids, list) else []
+
+        if rank_id in profile_ids or rank_id in pipeline_ids:
+            return True
+
+        return False
+
+    @staticmethod
+    def _get_profiler_level(level):
+        """
+        Obtain the profiler level based on the integer level value.
+
+        Args:
+            level (int): the value of profiler_level in the MindFormers config.
+        """
+        if level is None:
+            return ProfilerLevel.Level0
+
+        max_level = len(ProfilerLevel.__members__) - 1
+        if level < 0 or level > max_level:
+            logger.warning("Invalid profiler_level: %s, return None.", level)
+            return None
+        profiler_level = getattr(ProfilerLevel, f"Level{level}")
+        return profiler_level
+
 
 @MindFormerRegister.register(MindFormerModuleType.CALLBACK)
 class EvalCallBack(Callback):
diff --git a/mindformers/tools/utils.py b/mindformers/tools/utils.py
index f4c383120d500646974566c255f155011841a0b3..81bbc08a5e20641fce5230f84705a24db8ceeb72 100644
--- a/mindformers/tools/utils.py
+++ b/mindformers/tools/utils.py
@@ -29,10 +29,10 @@ try:
 except ImportError:
     fcntl = None
 
+import mindspore as ms
 from mindspore import Tensor, context
 from mindspore._checkparam import args_type_check
-from mindspore.communication import get_group_size, get_rank
-import mindspore.communication.comm_func as comm_func
+from mindspore.communication import get_group_size, get_rank, comm_func
 
 PARALLEL_MODE = {'DATA_PARALLEL': context.ParallelMode.DATA_PARALLEL,
                  'SEMI_AUTO_PARALLEL': context.ParallelMode.SEMI_AUTO_PARALLEL,
@@ -692,3 +692,22 @@ def barrier_world(action: str = None):
 
     logger.info("Now barriered...")
     comm_func.barrier()
+
+
+def get_pipeline_rank_ids():
+    """Calculate the first rank id of each pipeline stage and return them as a list.
+
+    Returns:
+        pipeline_rank_ids: a list of pipeline rank ids, or
+            an invalid value (-1) if the pipeline stage configuration is invalid.
+    """
+    device_num = get_real_group_size()
+    current_stage_num = ms.get_auto_parallel_context('pipeline_stages')
+
+    if device_num % current_stage_num != 0:
+        return [-1]
+
+    devices_per_stage = device_num // current_stage_num
+    pipeline_rank_ids = [i * devices_per_stage for i in range(current_stage_num)]
+
+    return pipeline_rank_ids
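A quick worked example of what `get_pipeline_rank_ids` computes (device and stage counts are illustrative; in the real helper they come from the communication group and the auto-parallel context):

```python
# 8 devices split into 2 pipeline stages: each stage owns 8 // 2 = 4 consecutive
# ranks, so the first rank of each stage is selected.
device_num, stage_num = 8, 2
devices_per_stage = device_num // stage_num
print([i * devices_per_stage for i in range(stage_num)])  # [0, 4]

# 8 devices with 3 stages is not an even split (8 % 3 != 0), so the helper
# returns the sentinel [-1] and ProfileMonitor turns that into a ValueError.
print(8 % 3 != 0)  # True -> get_pipeline_rank_ids() would return [-1]
```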
diff --git a/mindformers/trainer/trainer.py b/mindformers/trainer/trainer.py
index c0fb5155bfb3bd9f2a3719c952122b36d88035ee..19bbc1b63db7606df25abe3ba18d3d0581c775c1 100644
--- a/mindformers/trainer/trainer.py
+++ b/mindformers/trainer/trainer.py
@@ -1120,9 +1120,14 @@ class Trainer:
                 start_step=self.config.profile_start_step,
                 stop_step=self.config.profile_stop_step,
                 start_profile=start_profile,
+                profile_rank_ids=self.config.profile_rank_ids,
+                profile_pipeline=self.config.profile_pipeline,
                 profile_communication=profile_communication,
                 profile_memory=self.config.profile_memory,
                 output_path=self.config.profile_output,
+                profiler_level=self.config.profiler_level,
+                with_stack=self.config.with_stack,
+                data_simplification=self.config.data_simplification,
                 config=self.config)
             logger.warning(
                 "Please reduce the data sample size with 'num_samples' in MindSpore data format according to "
diff --git a/tests/st/test_ut/test_tools/__init__.py b/tests/st/test_ut/test_tools/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..d83e968c55c91675408606d34b560e985d788de9
--- /dev/null
+++ b/tests/st/test_ut/test_tools/__init__.py
@@ -0,0 +1,15 @@
+# Copyright 2024 Huawei Technologies Co., Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============================================================================
+"""test tools"""
diff --git a/tests/st/test_ut/test_tools/test_utils.py b/tests/st/test_ut/test_tools/test_utils.py
new file mode 100644
index 0000000000000000000000000000000000000000..35c01ec61c95bf7090e7816c014dcb3c35db34a1
--- /dev/null
+++ b/tests/st/test_ut/test_tools/test_utils.py
@@ -0,0 +1,53 @@
+# Copyright 2024 Huawei Technologies Co., Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============================================================================
+"""test utils"""
+from unittest import mock
+
+import pytest
+
+from mindformers.tools.utils import get_pipeline_rank_ids
+
+
+class TestGetPipelineRankMethod:
+    """A test class for testing tools utils method."""
+
+    @pytest.mark.run(order=1)
+    @mock.patch('mindformers.tools.utils.get_real_group_size')
+    @mock.patch('mindformers.tools.utils.ms.get_auto_parallel_context')
+    def test_get_pipeline_rank_ids_with_valid(self, mock_get_parallel_stage_num, mock_get_real_group_size):
+        """test get pipeline rank ids in normal condition."""
+        mock_get_real_group_size.return_value = 8
+        mock_get_parallel_stage_num.return_value = 2
+
+        test_rank_ids = get_pipeline_rank_ids()
+        expected_ids = [0, 4]
+
+        assert len(test_rank_ids) == len(expected_ids)
+        assert test_rank_ids == expected_ids
+
+    @pytest.mark.run(order=2)
+    @mock.patch('mindformers.tools.utils.get_real_group_size')
+    @mock.patch('mindformers.tools.utils.ms.get_auto_parallel_context')
+    def test_get_pipeline_rank_ids_with_invalid(self, mock_get_parallel_stage_num, mock_get_real_group_size):
+        """test get pipeline rank ids in invalid condition."""
+        mock_get_real_group_size.return_value = 8
+        mock_get_parallel_stage_num.return_value = 3
+
+        test_rank_ids = get_pipeline_rank_ids()
+
+        expected_ids = [-1]
+
+        assert len(test_rank_ids) == len(expected_ids)
+        assert test_rank_ids == expected_ids
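Finally, a hedged sketch of how these options are expected to reach the Trainer: the key names below match the `self.config.*` attributes read in the trainer.py hunk, but representing them as a flat dict (and the sample values) is an assumption for illustration only:

```python
# Assumed flat view of the profiling-related config entries forwarded to
# ProfileMonitor by Trainer; keys mirror the trainer.py hunk, values are samples.
profiling_options = {
    "profile_start_step": 1,
    "profile_stop_step": 10,
    "profile_output": "./output",
    "profile_memory": False,
    "profile_rank_ids": [0, 4],   # new in this patch
    "profile_pipeline": False,    # new in this patch
    "profiler_level": 0,          # new in this patch
    "with_stack": False,          # new in this patch
    "data_simplification": True,  # new in this patch
}
```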