diff --git a/torch_npu/csrc/distributed/ProcessGroupHCCL.cpp b/torch_npu/csrc/distributed/ProcessGroupHCCL.cpp
index e8a9f5a2834b9de7e81c98e96f1f54e8399c2f1b..00d14f2f414e99025186c4618dce8521e0e35063 100644
--- a/torch_npu/csrc/distributed/ProcessGroupHCCL.cpp
+++ b/torch_npu/csrc/distributed/ProcessGroupHCCL.cpp
@@ -1757,7 +1757,7 @@ c10::intrusive_ptr ProcessGroupHCCL::collective(
     // No need to detect recv. batch_isend_irecv inputs is incorrect, need require special treatments.
     if (c10_npu::model_state().get_model_mode() == c10_npu::ModelMode::L_TRAIN
         && c10_npu::option::OptionsManager::GetSilenceCheckFlag() != c10_npu::option::CHECK_CLOSE
-        && opType != c10d::OpType::RECV && opType != c10d::OpType::UNKNOWN) {
+        && opType != c10d::OpType::RECV && opType != c10d::OpType::UNKNOWN && opType != c10d::OpType::BROADCAST) {
         for (const auto i : c10::irange(inputs.size())) {
             npuGuard.set_index(devices[i].index());
             c10_npu::NPUStreamGuard guard(hcclStreams[i]);
@@ -2044,6 +2044,20 @@ c10::intrusive_ptr ProcessGroupHCCL::broadcast(
             return HCCL_SUCCESS;
         },
+        [&](std::vector& hcclStreams, c10::intrusive_ptr&) {
+            // Only need to detect the src rank.
+            if (c10_npu::model_state().get_model_mode() == c10_npu::ModelMode::L_TRAIN
+                && c10_npu::option::OptionsManager::GetSilenceCheckFlag() != c10_npu::option::CHECK_CLOSE) {
+                const std::vector& ranks = groupRanks();
+                if (opts.rootRank == ranks[rank_]) {
+                    for (const auto i : c10::irange(tensors.size())) {
+                        c10_npu::NPUStreamGuard guard(hcclStreams[0]);
+                        silenceCheck(tensors[i], c10d::OpType::BROADCAST);
+                    }
+                }
+            }
+        },
+        [&](std::vector& hcclStreams, c10::intrusive_ptr&) {},
         c10d::OpType::BROADCAST);
 }
diff --git a/torch_npu/csrc/profiler/profiler_mgr.cpp b/torch_npu/csrc/profiler/profiler_mgr.cpp
index a6200222979b4316c5b8a3d1d8b60f48b68313e3..477b072ee4f734e7e6988ff4e57f26d9f0c05cc7 100644
--- a/torch_npu/csrc/profiler/profiler_mgr.cpp
+++ b/torch_npu/csrc/profiler/profiler_mgr.cpp
@@ -74,7 +74,6 @@ void ProfilerMgr::EnableMsProfiler(uint32_t *deviceIdList, uint32_t deviceNum, a
 void ProfilerMgr::Start(const NpuTraceConfig &npu_config, bool cpu_trace)
 {
-    c10_npu::npuSynchronizeDevice();
     if (npu_trace_.load() == true) {
         aclprofAicoreMetrics aic_metrics = ACL_AICORE_NONE;
         auto level_iter = trace_level_map_.find(npu_config.trace_level);
diff --git a/torch_npu/profiler/analysis/_profiling_parser.py b/torch_npu/profiler/analysis/_profiling_parser.py
index e4f7b23790fc1a8fdb6a219c5ea1b9f8d3bda0b3..36a57c28bfe020a741999f2646773bf33104d4db 100644
--- a/torch_npu/profiler/analysis/_profiling_parser.py
+++ b/torch_npu/profiler/analysis/_profiling_parser.py
@@ -68,7 +68,7 @@ class ProfilingParser:
             return
         if not ProfilerPathManager.get_cann_path(self._profiler_path):
             return
-        if not CannPackageManager.cann_package_support_export_db():
+        if not CannPackageManager.SUPPORT_EXPORT_DB:
             raise RuntimeError("Current CANN package version does not support export db. "
                                "If you want to export db, you can install supported CANN package version.")
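
Note: `ProfilingParser` now consults a flag computed once (`CannPackageManager.SUPPORT_EXPORT_DB`, defined in the next file) instead of probing `msprof --help` on every call. A minimal sketch of a lazy variant of the same idea, memoizing on first use rather than at import time; the function name is illustrative, not part of torch_npu:

```python
import functools
import shutil
import subprocess

@functools.lru_cache(maxsize=1)
def supports_export_db() -> bool:
    # Probe once, cache the answer for the life of the process.
    msprof = shutil.which("msprof")
    if not msprof:
        return False
    proc = subprocess.run([msprof, "--help"], capture_output=True, text=True)
    return proc.returncode == 0 and "--type" in proc.stdout
```

Computing the flag at import time, as the patch does, keeps call sites trivially cheap, at the cost of running the subprocess even in sessions that never export a db.
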
" "If you want to export db, you can install supported CANN package version.") diff --git a/torch_npu/profiler/analysis/prof_common_func/_cann_package_manager.py b/torch_npu/profiler/analysis/prof_common_func/_cann_package_manager.py index 228ae69525dc7ea597079fe01c30e6b774dab469..8086cbb2e0dd9be148610025b457bbfc60ee765a 100644 --- a/torch_npu/profiler/analysis/prof_common_func/_cann_package_manager.py +++ b/torch_npu/profiler/analysis/prof_common_func/_cann_package_manager.py @@ -6,21 +6,23 @@ from ._constant import print_error_msg __all__ = [] +def check_cann_package_support_export_db() -> bool: + err_msg = "Failed to check if current CANN package version support export db!" + try: + msprof_path = shutil.which("msprof") + if not msprof_path: + print_error_msg(f"{err_msg} msprof command not found!") + return False + COMMAND_SUCCESS = 0 + completed_process = subprocess.run([msprof_path, "--help"], capture_output=True, shell=False, text=True) + if completed_process.returncode != COMMAND_SUCCESS: + print_error_msg(f"{err_msg} Failed to run command: msprof --help!") + return False + return "--type" in completed_process.stdout + except Exception: + print_error_msg(err_msg) + return False + + class CannPackageManager: - @classmethod - def cann_package_support_export_db(cls) -> bool: - err_msg = "Failed to check if current CANN package version support export db!" - try: - msprof_path = shutil.which("msprof") - if not msprof_path: - print_error_msg(f"{err_msg} msprof command not found!") - raise RuntimeError(f"{err_msg} msprof command not found!") - - COMMAND_SUCCESS = 0 - completed_process = subprocess.run([msprof_path, "--help"], capture_output=True, shell=False, text=True) - if completed_process.returncode != COMMAND_SUCCESS: - print_error_msg(f"{err_msg} Failed to run command: msprof --help!") - raise RuntimeError(f"{err_msg} Failed to run command: msprof --help!") - return "--type" in completed_process.stdout - except Exception as err: - raise RuntimeError(err_msg) from err + SUPPORT_EXPORT_DB = check_cann_package_support_export_db() diff --git a/torch_npu/profiler/analysis/prof_common_func/_db_manager.py b/torch_npu/profiler/analysis/prof_common_func/_db_manager.py index 4256823fc533bb1a64caf00cc24e2956dedc2965..df9b7416cdc1b727f278c5d5a8324254316e309d 100644 --- a/torch_npu/profiler/analysis/prof_common_func/_db_manager.py +++ b/torch_npu/profiler/analysis/prof_common_func/_db_manager.py @@ -38,7 +38,7 @@ class DbManager: if os.path.exists(db_path): FileManager.check_db_file_vaild(db_path) try: - conn = sqlite3.connect(db_path, timeout=cls.MAX_TIMEOUT) + conn = sqlite3.connect(db_path, timeout=cls.MAX_TIMEOUT, check_same_thread=False) except sqlite3.Error as err: return EmptyClass("emoty conn"), EmptyClass("empty curs") diff --git a/torch_npu/profiler/analysis/prof_common_func/_id_manager.py b/torch_npu/profiler/analysis/prof_common_func/_id_manager.py index 841816c83489f2afc70fe678b3d21ea911d57db9..7c78cf5d03471f9f37005c80094d1bc5f3d26a48 100644 --- a/torch_npu/profiler/analysis/prof_common_func/_id_manager.py +++ b/torch_npu/profiler/analysis/prof_common_func/_id_manager.py @@ -31,6 +31,7 @@ class Str2IdManager: return data for k, v in self._str_id_map.items(): data.append([v, k]) + self._str_id_map.clear() return data diff --git a/torch_npu/profiler/analysis/prof_config/_parser_deps_config.py b/torch_npu/profiler/analysis/prof_config/_parser_deps_config.py index 95cdb9016798f3faf63dd531411c444e6e632e10..c729681ee09e0d32dceef549475b7d47b8ca1d5b 100644 --- 
diff --git a/torch_npu/profiler/analysis/prof_config/_parser_deps_config.py b/torch_npu/profiler/analysis/prof_config/_parser_deps_config.py
index 95cdb9016798f3faf63dd531411c444e6e632e10..c729681ee09e0d32dceef549475b7d47b8ca1d5b 100644
--- a/torch_npu/profiler/analysis/prof_config/_parser_deps_config.py
+++ b/torch_npu/profiler/analysis/prof_config/_parser_deps_config.py
@@ -55,19 +55,19 @@ class ParserDepsConfig:
                                             Constant.DEPS: [Constant.TREE_BUILD_PARSER]},
         Constant.DB_PARSER: {Constant.MODE: ConcurrentMode.PTHREAD,
                              Constant.DEPS: [Constant.CANN_EXPORT_PARSER]},
-        Constant.FWK_API_DB_PARSER: {Constant.MODE: ConcurrentMode.SUB_PROCESS,
+        Constant.FWK_API_DB_PARSER: {Constant.MODE: ConcurrentMode.PTHREAD,
                                      Constant.DEPS: [Constant.DB_PARSER]},
-        Constant.MEMORY_DB_PARSER: {Constant.MODE: ConcurrentMode.SUB_PROCESS,
-                                    Constant.DEPS: [Constant.DB_PARSER, Constant.MEMORY_PREPARE]},
-        Constant.STEP_INFO_DB_PARSER: {Constant.MODE: ConcurrentMode.SUB_PROCESS,
-                                       Constant.DEPS: [Constant.DB_PARSER, Constant.TREE_BUILD_PARSER]},
-        Constant.COMMUNICATION_DB_PARSER: {Constant.MODE: ConcurrentMode.SUB_PROCESS,
+        Constant.MEMORY_DB_PARSER: {Constant.MODE: ConcurrentMode.PTHREAD,
+                                    Constant.DEPS: [Constant.DB_PARSER, Constant.MEMORY_PREPARE, Constant.FWK_API_DB_PARSER]},
+        Constant.STEP_INFO_DB_PARSER: {Constant.MODE: ConcurrentMode.PTHREAD,
+                                       Constant.DEPS: [Constant.DB_PARSER, Constant.TREE_BUILD_PARSER, Constant.MEMORY_DB_PARSER]},
+        Constant.COMMUNICATION_DB_PARSER: {Constant.MODE: ConcurrentMode.PTHREAD,
                                            Constant.DEPS: [Constant.DB_PARSER, Constant.CANN_ANALYZE_PARSER,
                                                            Constant.STEP_INFO_DB_PARSER]},
-        Constant.TRACE_STEP_TIME_DB_PARSER: {Constant.MODE: ConcurrentMode.SUB_PROCESS,
-                                             Constant.DEPS: [Constant.DB_PARSER, Constant.STEP_INFO_DB_PARSER]},
-        Constant.GC_RECORD_DB_PARSER: {Constant.MODE: ConcurrentMode.SUB_PROCESS,
-                                       Constant.DEPS: [Constant.DB_PARSER]}
+        Constant.TRACE_STEP_TIME_DB_PARSER: {Constant.MODE: ConcurrentMode.PTHREAD,
+                                             Constant.DEPS: [Constant.DB_PARSER, Constant.STEP_INFO_DB_PARSER, Constant.COMMUNICATION_DB_PARSER]},
+        Constant.GC_RECORD_DB_PARSER: {Constant.MODE: ConcurrentMode.PTHREAD,
+                                       Constant.DEPS: [Constant.DB_PARSER, Constant.STEP_INFO_DB_PARSER]}
     }
 
     ONLY_FWK_CONFIG = {
@@ -78,10 +78,10 @@ class ParserDepsConfig:
         Constant.CANN_EXPORT_PARSER: {Constant.MODE: ConcurrentMode.SUB_PROCESS, Constant.DEPS: []},
         Constant.DB_PARSER: {Constant.MODE: ConcurrentMode.PTHREAD,
                              Constant.DEPS: [Constant.CANN_EXPORT_PARSER]},
-        Constant.FWK_API_DB_PARSER: {Constant.MODE: ConcurrentMode.SUB_PROCESS,
+        Constant.FWK_API_DB_PARSER: {Constant.MODE: ConcurrentMode.PTHREAD,
                                      Constant.DEPS: [Constant.DB_PARSER]},
-        Constant.MEMORY_DB_PARSER: {Constant.MODE: ConcurrentMode.SUB_PROCESS,
-                                    Constant.DEPS: [Constant.DB_PARSER]},
-        Constant.GC_RECORD_DB_PARSER: {Constant.MODE: ConcurrentMode.SUB_PROCESS,
-                                       Constant.DEPS: [Constant.DB_PARSER]}
+        Constant.MEMORY_DB_PARSER: {Constant.MODE: ConcurrentMode.PTHREAD,
+                                    Constant.DEPS: [Constant.DB_PARSER, Constant.FWK_API_DB_PARSER]},
+        Constant.GC_RECORD_DB_PARSER: {Constant.MODE: ConcurrentMode.PTHREAD,
+                                       Constant.DEPS: [Constant.DB_PARSER, Constant.MEMORY_DB_PARSER]}
     }
diff --git a/torch_npu/profiler/profiler_interface.py b/torch_npu/profiler/profiler_interface.py
index 68a7a59e7834e69023bc0d75c89bf724be744d7f..ea5144813b1a5efabd5670c2668c86364eb2247d 100644
--- a/torch_npu/profiler/profiler_interface.py
+++ b/torch_npu/profiler/profiler_interface.py
@@ -148,7 +148,7 @@ class _ProfInterface:
             print_warn_msg("Experimental config will not be uesd while ProfilerActivity.NPU is not set.")
 
         if ProfilerActivity.NPU in self.activities and self.experimental_config.export_type == Constant.Db:
-            if not CannPackageManager.cann_package_support_export_db():
+            if not CannPackageManager.SUPPORT_EXPORT_DB:
                 raise RuntimeError("Current cann package does not support export db. "
                                    "If you want to export db, you can install supported CANN package version.")
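
Note: with every DB parser moved off `SUB_PROCESS`, the added `DEPS` edges are what keep the `PTHREAD` tasks from touching the shared SQLite file concurrently; the table is effectively a dependency DAG. A quick sketch of how such a table resolves to an execution order, with keys shortened for illustration:

```python
from graphlib import TopologicalSorter  # Python 3.9+

# Illustrative shape of the DEPS graph: node -> list of predecessors.
config = {
    "db_parser": [],
    "fwk_api_db_parser": ["db_parser"],
    "memory_db_parser": ["db_parser", "fwk_api_db_parser"],
    "gc_record_db_parser": ["db_parser", "memory_db_parser"],
}
order = list(TopologicalSorter(config).static_order())
print(order)  # e.g. ['db_parser', 'fwk_api_db_parser', 'memory_db_parser', 'gc_record_db_parser']
```
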
raise RuntimeError("Current cann package does not support export db. " "If you want to export db, you can install supported CANN package version.") diff --git a/torch_npu/utils/_step.py b/torch_npu/utils/_step.py index 960c0afff2703c9885986627f7a6a19326679b6e..06ee394243067c34b7864c9a99e3d2b6493cedfc 100644 --- a/torch_npu/utils/_step.py +++ b/torch_npu/utils/_step.py @@ -47,7 +47,7 @@ perf_dump_enable = False IS_IN_BACKWARD = 0 -def input_hook(idx, asd_flag): +def input_hook(idx, asd_flag, first_module_id, first_module_name, module_name): def hook(grad): global IS_IN_BACKWARD @@ -57,17 +57,29 @@ def input_hook(idx, asd_flag): else: IS_IN_BACKWARD = IS_IN_BACKWARD & 2 # 011 & 010 = 010 + rankprefix = "" + if torch.distributed.is_available() and torch.distributed.is_initialized(): + rankprefix = f"[rank{torch.distributed.get_rank()}]" + if rankprefix == "[rank0]": + print(f"{rankprefix} SilentCheckv2: input_hook, IS_IN_BACKWARD is {IS_IN_BACKWARD}, first_module_id is {first_module_id}, first_module_name is {first_module_name}, module_name is {module_name}") if not IS_IN_BACKWARD: torch_npu._C._npu_set_call_state("forward") return return hook -def output_hook(grad): - global IS_IN_BACKWARD - IS_IN_BACKWARD = 3 # 011 - torch_npu._C._npu_set_call_state("backward") - return grad +def output_hook(first_module_id, first_module_name, module_name): + def hook1(grad): + global IS_IN_BACKWARD + IS_IN_BACKWARD = 3 # 011 + rankprefix = "" + if torch.distributed.is_available() and torch.distributed.is_initialized(): + rankprefix = f"[rank{torch.distributed.get_rank()}]" + if rankprefix == "[rank0]": + print(f"{rankprefix} SilentCheckv2: output_hook, IS_IN_BACKWARD is {IS_IN_BACKWARD}, first_module_id is {first_module_id}, first_module_name is {first_module_name}, module_name is {module_name}") + torch_npu._C._npu_set_call_state("backward") + return grad + return hook1 def _is_inner_module(module): @@ -87,20 +99,27 @@ class SilentCheckState: self.input_hook_flag = False self.is_training = False self.first_module_id = "" + self.first_module_name = "" self.first_weight = None self.last_weight = None self.last_tensor = None self.last_tensor_id = None self.first_tensor_id = None - def init_module_info(self, module_id, training): + def init_module_info(self, module_id, training, module_name): self.first_module_id = module_id self.first_forward = False self.is_training = training + self.first_module_name = module_name if self.is_training: torch_npu._C._npu_set_module_train_state("train") else: torch_npu._C._npu_set_module_train_state("infer") + rankprefix = "" + if torch.distributed.is_available() and torch.distributed.is_initialized(): + rankprefix = f"[rank{torch.distributed.get_rank()}]" + if rankprefix == "[rank0]": + print(f"{rankprefix} SilentCheckv2: init_module_info, first_module_id is {module_id}, is_training is {training}, module_name is {module_name}") def check_tensor_dtype(self, tensor): if not self.dtype_support: @@ -122,20 +141,20 @@ class SilentCheckState: self.first_weight = param break - def register_input_hook_before_call(self, asd_flag, *args): + def register_input_hook_before_call(self, asd_flag, module, *args): # Search the first tensor (if the first tensor is input) if self.is_training and not self.input_hook_flag: for x in args: if isinstance(x, torch.Tensor) and x.requires_grad: - x.register_hook(input_hook(self.first_module_id, asd_flag)) + x.register_hook(input_hook(self.first_module_id, asd_flag, self.first_module_id, self.first_module_name, module._get_name())) self.input_hook_flag 
@@ -122,20 +141,20 @@ class SilentCheckState:
                 self.first_weight = param
                 break
 
-    def register_input_hook_before_call(self, asd_flag, *args):
+    def register_input_hook_before_call(self, asd_flag, module, *args):
         # Search the first tensor (if the first tensor is input)
         if self.is_training and not self.input_hook_flag:
             for x in args:
                 if isinstance(x, torch.Tensor) and x.requires_grad:
-                    x.register_hook(input_hook(self.first_module_id, asd_flag))
+                    x.register_hook(input_hook(self.first_module_id, asd_flag, self.first_module_id, self.first_module_name, module._get_name()))
                     self.input_hook_flag = True
                     break
 
-    def register_input_hook_after_call(self, output):
+    def register_input_hook_after_call(self, output, module):
         # Search the first tensor (if the first tensor is output of an inner module)
         if not self.input_hook_flag:
             if isinstance(output, torch.Tensor) and output.requires_grad:
-                output.register_hook(input_hook(self.first_module_id, asd_enable))
+                output.register_hook(input_hook(self.first_module_id, asd_enable, self.first_module_id, self.first_module_name, module._get_name()))
                 self.input_hook_flag = True
                 self.first_tensor_id = id(output)
 
@@ -152,19 +171,19 @@ class SilentCheckState:
             self.last_tensor_id = id(output)
             self.last_tensor = output
 
-    def init_all_hook(self, asd_flag):
+    def init_all_hook(self, asd_flag, module):
         if self.is_training:
             # Otherwise, there is only one weight in the outer module
             if self.first_tensor_id != self.last_tensor_id:
                 if self.last_tensor is not None:
-                    self.last_tensor.register_hook(output_hook)
+                    self.last_tensor.register_hook(output_hook(self.first_module_id, self.first_module_name, module._get_name()))
             if self.last_weight_hook_handles.get(self.first_module_id, None) is None:
                 if self.last_weight is not None:
-                    last_weight_handle = self.last_weight.register_hook(output_hook)
+                    last_weight_handle = self.last_weight.register_hook(output_hook(self.first_module_id, self.first_module_name, module._get_name()))
                     self.last_weight_hook_handles[self.first_module_id] = last_weight_handle
             if self.weight_hook_handles.get(self.first_module_id, None) is None:
                 if self.first_weight is not None:
-                    first_weight_handle = self.first_weight.register_hook(input_hook("", asd_flag))
+                    first_weight_handle = self.first_weight.register_hook(input_hook("", asd_flag, self.first_module_id, self.first_module_name, module._get_name()))
                     self.weight_hook_handles[self.first_module_id] = first_weight_handle
         self.init_marks[self.first_module_id] = True
 
@@ -285,7 +304,7 @@ def _custom_call(self, *args, **kwargs):
 
     if asd_enable and not IS_IN_BACKWARD:
         if silent_check.first_forward:
-            silent_check.init_module_info(id(self), self.training)
+            silent_check.init_module_info(id(self), self.training, self._get_name())
             self.outer = True
 
         if silent_check.is_training and not silent_check.init_marks.get(silent_check.first_module_id, False):
@@ -301,7 +320,7 @@ def _custom_call(self, *args, **kwargs):
                 warnings.warn(f"Warning: Module has unsupported dtype tensor, silent check will be closed.")
 
         # Search the first tensor (if the first tensor is input)
-        silent_check.register_input_hook_before_call(asd_enable, *args)
+        silent_check.register_input_hook_before_call(asd_enable, self, *args)
 
     tmp = original_call(self, *args, **kwargs)
 
@@ -310,7 +329,7 @@ def _custom_call(self, *args, **kwargs):
             silent_check.search_first_weight(self)
 
             # Search the first tensor (if the first tensor is output of an inner module)
-            silent_check.register_input_hook_after_call(tmp)
+            silent_check.register_input_hook_after_call(tmp, self)
 
             # Search the last weight (only in inner module)
             silent_check.search_last_weight(self)
@@ -325,7 +344,7 @@ def _custom_call(self, *args, **kwargs):
 
     if asd_enable and not IS_IN_BACKWARD:
         if hasattr(self, "outer") and self.outer:
-            silent_check.init_all_hook(asd_enable)
+            silent_check.init_all_hook(asd_enable, self)
             silent_check.init_param()
         self.outer = False
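
Note: `input_hook` and `output_hook` are now hook factories: each returns a closure that captures identifying context (module id and name) at registration time, so the per-gradient callback can report which module it belongs to. A self-contained sketch of the pattern, independent of torch_npu:

```python
import torch

def make_grad_hook(module_name):
    # The closure captures module_name, so the callback can report which
    # module's tensor it was registered on when backward reaches it.
    def hook(grad):
        print(f"backward reached {module_name}, grad shape {tuple(grad.shape)}")
        return grad  # returning grad unchanged keeps autograd behavior intact
    return hook

x = torch.ones(2, requires_grad=True)
x.register_hook(make_grad_hook("input x"))
(x * 3).sum().backward()
```
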
@@ -345,6 +364,20 @@ def _parse_perf_config():
     return config_dict
 
 
+original_broadcast = torch.distributed.broadcast
+
+
+def _broadcast(tensor, src, group=None, async_op=False):
+    global silent_check
+    global IS_IN_BACKWARD
+    rankprefix = ""
+    if torch.distributed.is_available() and torch.distributed.is_initialized():
+        rankprefix = f"[rank{torch.distributed.get_rank()}]"
+    if rankprefix == "[rank0]":
+        print(f"{rankprefix} SilentCheckv2: broadcast, IS_IN_BACKWARD is {IS_IN_BACKWARD}, first_module_id is {silent_check.first_module_id}, is_training is {silent_check.is_training}, first_module_name is {silent_check.first_module_name}")
+    return original_broadcast(tensor, src, group, async_op)
+
+
 def add_perf_dump_patch():
     global perf_dump_enable
     global asd_enable
@@ -366,3 +399,4 @@ def add_perf_dump_patch():
 
     if perf_dump_enable or asd_enable:
         Module.__call__ = _custom_call
+        torch.distributed.broadcast = _broadcast
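
Note: `_broadcast` returns the result of `original_broadcast`, so callers using `async_op=True` still receive the async work handle. A hedged sketch of the same monkey-patch written generically; the wrapper name is illustrative:

```python
import functools
import torch.distributed as dist

_original_broadcast = dist.broadcast

@functools.wraps(_original_broadcast)
def broadcast_with_logging(tensor, src, group=None, async_op=False):
    # ... pre-call instrumentation would go here ...
    # Forward all arguments and preserve the return value (the work handle
    # when async_op=True).
    return _original_broadcast(tensor, src, group=group, async_op=async_op)

dist.broadcast = broadcast_with_logging
```

`functools.wraps` keeps the wrapper's name and docstring consistent with the original, which helps any code that introspects `torch.distributed.broadcast` after patching.
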