From bfdbe51a2273a5e1f97677fe24e80c50ef067db8 Mon Sep 17 00:00:00 2001 From: Mingkai Chan Date: Fri, 27 Dec 2024 16:06:16 +0800 Subject: [PATCH] add patcher --- .gitignore | 3 +- docs/get_started/patcher.md | 62 ++++ mx_driving/__init__.py | 3 +- mx_driving/ops/multi_scale_deformable_attn.py | 1 + mx_driving/patcher/__init__.py | 69 +++++ mx_driving/patcher/brake.py | 49 ++++ mx_driving/patcher/distribute.py | 9 + mx_driving/patcher/mmcv.py | 48 +++ mx_driving/patcher/mmdet.py | 121 ++++++++ mx_driving/patcher/mmdet3d.py | 98 +++++++ mx_driving/patcher/numpy.py | 8 + mx_driving/patcher/optimizer.py | 277 ++++++++++++++++++ mx_driving/patcher/patcher.py | 89 ++++++ mx_driving/patcher/profiler.py | 72 +++++ mx_driving/patcher/tensor.py | 21 ++ 15 files changed, 928 insertions(+), 2 deletions(-) create mode 100644 docs/get_started/patcher.md create mode 100644 mx_driving/patcher/__init__.py create mode 100644 mx_driving/patcher/brake.py create mode 100644 mx_driving/patcher/distribute.py create mode 100644 mx_driving/patcher/mmcv.py create mode 100644 mx_driving/patcher/mmdet.py create mode 100644 mx_driving/patcher/mmdet3d.py create mode 100644 mx_driving/patcher/numpy.py create mode 100644 mx_driving/patcher/optimizer.py create mode 100644 mx_driving/patcher/patcher.py create mode 100644 mx_driving/patcher/profiler.py create mode 100644 mx_driving/patcher/tensor.py diff --git a/.gitignore b/.gitignore index 5a97690d..142087ec 100644 --- a/.gitignore +++ b/.gitignore @@ -4,4 +4,5 @@ __pycache__/ cmake-build-debug build *.egg-info/ -.vscode/ \ No newline at end of file +.vscode/ +CMakeFiles \ No newline at end of file diff --git a/docs/get_started/patcher.md b/docs/get_started/patcher.md new file mode 100644 index 00000000..527b9167 --- /dev/null +++ b/docs/get_started/patcher.md @@ -0,0 +1,62 @@ +# 快速迁移模型 +mx_driving 提供了 `Patcher` monkey_patch 类来帮助用户快速迁移模型。 + +## 1. 解决mmcv系列版本冲突问题 +假设你安装了mmcv 1.7.2,但是mmdet3d 需要 mmcv <= 1.7.0, 而mmdet 需要 mmcv <= 1.6.0, 这时候你就可以使用 mx_driving 来解决这个问题。 + +```python +from mx_driving.patcher import patch_mmcv_version +patch_mmcv_version("1.6.0") +``` +注意,你可能需要在import mmcv 之前调用 `patch_mmcv_version` 函数,否则仍然可能会出现版本冲突问题。 + +## 2. 使用默认patcher +mx_driving 提供了一个默认的patcher,可以帮助用户快速迁移模型。 + +```python +from mx_driving.patcher import default_patcher_builder +with default_patcher_builder.build() as patcher: + # train model here +``` + +## 3. 使用自定义patcher +你也可以使用 `PatcherBuilder` 类来创建一个自定义patcher。 + +```python +from mx_driving.patcher import PatcherBuilder, Patch +patcher_builder = PatcherBuilder() +patcher_builder.add_module_patch("torch", Patch(index)) +with patcher_builder.build() as patcher: + # train model here +``` + +## 4. 使用profiler +mx_driving 提供了一个profiler,可以帮助开发者快速定位性能瓶颈。 + +```python +with default_patcher_builder.with_profiling(path="/path/to/save/profiler/result", level=0).build() as patcher: + # train model here +``` +level 0: 最小膨胀,只记录NPU活动 +level 1: 记录NPU和CPU活动 +level 2: 记录NPU和CPU活动,并打印调用栈 + +## 5. 禁用某个patch + +```python +with default_patcher_builder.disable_patches("msda", "index").build() as patcher: + # train model here +``` +## 6. 支持特性 +- [x] 支持一键迁移npu(默认关闭私有格式) +- [x] 支持mmcv系列版本冲突问题 +- [x] 支持自定义patcher +- [x] 支持profiler +- [x] 支持禁用patch +- [x] 支持DeformConv2d +- [x] 支持ModulatedDeformConv2d +- [x] 支持MultiScaleDeformableAttnFunction +- [x] 支持bool index改写masked_select +- [x] 支持Resnet优化 +- [x] 支持提前终止训练 + diff --git a/mx_driving/__init__.py b/mx_driving/__init__.py index d7a434aa..e85a3be7 100644 --- a/mx_driving/__init__.py +++ b/mx_driving/__init__.py @@ -1,7 +1,7 @@ import os import mx_driving._C - +from .patcher import default_patcher_builder, patch_mmcv_version from .modules.roi_point_pool_3d import RoIPointPool3d from .modules.sparse_conv import SparseConv3d, SparseInverseConv3d, SubMConv3d from .modules.sparse_modules import SparseConvTensor, SparseModule, SparseSequential @@ -93,6 +93,7 @@ __all__ = [ "npu_dynamic_scatter", "npu_max_pool2d", "npu_nms3d", + "MultiScaleDeformableAttnFunction", "npu_points_in_box", "npu_points_in_box_all", "points_in_box", diff --git a/mx_driving/ops/multi_scale_deformable_attn.py b/mx_driving/ops/multi_scale_deformable_attn.py index 4c7ffef6..d4b82f28 100644 --- a/mx_driving/ops/multi_scale_deformable_attn.py +++ b/mx_driving/ops/multi_scale_deformable_attn.py @@ -25,6 +25,7 @@ class MultiScaleDeformableAttnFunction(Function): value_level_start_index: torch.Tensor, sampling_locations: torch.Tensor, attention_weights: torch.Tensor, + **kwargs, ) -> torch.Tensor: value_spatial_shapes = value_spatial_shapes.int() value_level_start_index = value_level_start_index.int() diff --git a/mx_driving/patcher/__init__.py b/mx_driving/patcher/__init__.py new file mode 100644 index 00000000..f1fa9e11 --- /dev/null +++ b/mx_driving/patcher/__init__.py @@ -0,0 +1,69 @@ +# Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved. +""" +This module is used to patch the code to support mx_driving. +Usage: + - with default config, mx_driving will be applied to all the code that using mmcv and torch. + ```python + from mx_driving.patcher import default_patcher_builder + with default_patcher_builder.build() as patcher: + # train model here + ``` + - if you want to use mx_driving with other frameworks, you can use the following code to customize the patcher. + + ```python + from mx_driving.patcher import PatcherBuilder, Patcher, Patch + from mx_driving.patcher.mmcv import msda + from mx_driving.patcher.tensor import index + from mx_driving.patcher.mmcv import patch_mmcv_version + if __name__ == "__main__": + patcher_builder = PatcherBuilder() + patcher_builder.add_module_patch("mmcv.ops", Patch(msda)) + patcher_builder.add_module_patch("torch", Patch(index)) + + with patcher_builder.build() as patcher: + # train model here + ``` + +""" + +from mx_driving.patcher.distribute import ddp +from mx_driving.patcher.mmcv import dc, mdc, msda, patch_mmcv_version +from mx_driving.patcher.mmdet import pseudo_sampler, resnet_add_relu, resnet_maxpool +from mx_driving.patcher.mmdet3d import nuscences_dataset, nuscences_metric +from mx_driving.patcher.numpy import numpy_type +from mx_driving.patcher.optimizer import optimizer_hooks, optimizer_wrapper +from mx_driving.patcher.patcher import Patch, Patcher, PatcherBuilder +from mx_driving.patcher.tensor import index + + +default_patcher_builder = ( + PatcherBuilder() + .add_module_patch("mmcv.ops", Patch(msda), Patch(dc), Patch(mdc)) + .add_module_patch("torch", Patch(index)) + .add_module_patch("numpy", Patch(numpy_type)) + .add_module_patch("mmdet.core.bbox.samplers", Patch(pseudo_sampler)) + .add_module_patch("mmcv.parallel", Patch(ddp)) + .add_module_patch("mmdet.models.backbones.resnet", Patch(resnet_add_relu), Patch(resnet_maxpool)) + .add_module_patch("mmdet3d.datasets.nuscenes_dataset", Patch(nuscences_dataset)) + .add_module_patch("mmdet3d.evaluation.metrics", Patch(nuscences_metric)) +) + +__all__ = [ + "default_patcher_builder", + "msda", + "deform_conv2d", + "modulated_deform_conv2d", + "index", + "PatcherBuilder", + "Patcher", + "Patch", + "patch_mmcv_version", + "pseudo_sampler", + "numpy_type", + "ddp", + "resnet_add_relu", + "resnet_maxpool", + "nuscences_dataset", + "nuscences_metric", + "optimizer", +] diff --git a/mx_driving/patcher/brake.py b/mx_driving/patcher/brake.py new file mode 100644 index 00000000..90f08f64 --- /dev/null +++ b/mx_driving/patcher/brake.py @@ -0,0 +1,49 @@ +# Copyright (c) OpenMMLab. All rights reserved. +# Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved. +import sys +import time +from types import ModuleType +from typing import Dict + + +def brake(runner: ModuleType, options: Dict): + when_iter = options["when_iter"] + if not isinstance(when_iter, int): + raise ValueError(f"when_iter must be an integer, but got {type(when_iter)}") + + def train(self, data_loader, **kwargs): + self.model.train() + self.mode = "train" + self.data_loader = data_loader + self._max_iters = self._max_epochs * len(data_loader) + self.call_hook("before_train_epoch") + time.sleep(2) + + for i, data_batch in enumerate(data_loader): + self.data_batch = data_batch + self._inner_iter = i + self.call_hook("before_train_iter") + self.run_iter(data_batch, train_mode=True, **kwargs) + self.call_hook("after_train_iter") + del self.data_batch + self._iter += 1 + if self._iter == when_iter: + sys.exit(0) + self.call_hook("after_train_epoch") + self._epoch += 1 + + def run_epoch(self) -> None: + self.runner.call_hook("before_train_epoch") + self.runner.model.train() + for idx, data_batch in enumerate(self.data_loader): + self.run_iter(idx, data_batch) + if self._iter == when_iter: + sys.exit(0) + + self.runner.call_hook("after_train_epoch") + self.runner._epoch += 1 + + if hasattr(runner, "EpochBasedRunner"): + runner.EpochBasedRunner.train = train + elif hasattr(runner, "EpochBasedTrainLoop"): + runner.EpochBasedTrainLoop.run_epoch = run_epoch diff --git a/mx_driving/patcher/distribute.py b/mx_driving/patcher/distribute.py new file mode 100644 index 00000000..c56e158f --- /dev/null +++ b/mx_driving/patcher/distribute.py @@ -0,0 +1,9 @@ +# Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved. +from types import ModuleType +from typing import Dict + + +def ddp(mmcvparraller: ModuleType, options: Dict): + if hasattr(mmcvparraller, "distributed"): + import mmcv + mmcvparraller.distributed.MMDistributedDataParallel = mmcv.device.npu.NPUDistributedDataParallel \ No newline at end of file diff --git a/mx_driving/patcher/mmcv.py b/mx_driving/patcher/mmcv.py new file mode 100644 index 00000000..a31bc761 --- /dev/null +++ b/mx_driving/patcher/mmcv.py @@ -0,0 +1,48 @@ +# Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved. +import importlib +from types import ModuleType +from typing import Dict + + +def patch_mmcv_version(expected_version: str): + try: + mmcv = importlib.import_module("mmcv") + origin_version = mmcv.__version__ + if origin_version == expected_version: + return + mmcv.__version__ = expected_version + try: + # fix mmdet stupid compatibility check + importlib.import_module("mmdet") + importlib.import_module("mmdet3d") + except ImportError: + return + finally: + # restore mmcv version + mmcv.__version__ = origin_version + except ImportError: + return + + +def msda(mmcvops: ModuleType, options: Dict): + from mx_driving import MultiScaleDeformableAttnFunction, multi_scale_deformable_attn + + if hasattr(mmcvops, "multi_scale_deformable_attn"): + mmcvops.multi_scale_deformable_attn.MultiScaleDeformableAttnFunction = MultiScaleDeformableAttnFunction + mmcvops.multi_scale_deformable_attn.multi_scale_deformable_attn = multi_scale_deformable_attn + + +def dc(mmcvops: ModuleType, options: Dict): + from mx_driving import DeformConv2dFunction, deform_conv2d + + if hasattr(mmcvops, "deform_conv"): + mmcvops.deform_conv.DeformConv2dFunction = DeformConv2dFunction + mmcvops.deform_conv.deform_conv2d = deform_conv2d + + +def mdc(mmcvops: ModuleType, options: Dict): + from mx_driving import ModulatedDeformConv2dFunction, modulated_deform_conv2d + + if hasattr(mmcvops, "modulated_deform_conv"): + mmcvops.modulated_deform_conv.ModulatedDeformConv2dFunction = ModulatedDeformConv2dFunction + mmcvops.modulated_deform_conv.modulated_deform_conv2d = modulated_deform_conv2d diff --git a/mx_driving/patcher/mmdet.py b/mx_driving/patcher/mmdet.py new file mode 100644 index 00000000..d3d961c8 --- /dev/null +++ b/mx_driving/patcher/mmdet.py @@ -0,0 +1,121 @@ +# Copyright (c) OpenMMLab. All rights reserved. +# Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved. +from types import ModuleType +from typing import Dict + + +def pseudo_sampler(mmdetsamplers: ModuleType, options: Dict): + if hasattr(mmdetsamplers, "pseudo_sampler"): + + def sample(self, assign_result, bboxes, gt_bboxes, *args, **kwargs): + import torch + + pos_inds = torch.squeeze(assign_result.gt_inds > 0, -1) + neg_inds = torch.squeeze(assign_result.gt_inds == 0, -1) + gt_flags = bboxes.new_zeros(bboxes.shape[0], dtype=torch.uint8) + sampling_result = mmdetsamplers.sampling_result.SamplingResult( + pos_inds, neg_inds, bboxes, gt_bboxes, assign_result, gt_flags + ) + return sampling_result + + mmdetsamplers.pseudo_sampler.PseudoSampler.sample = sample + + +def resnet_add_relu(mmdetresnet: ModuleType, options: Dict): + if hasattr(mmdetresnet, "BasicBlock"): + from mx_driving import npu_add_relu + import torch.utils.checkpoint as cp + + def forward(self, x): + def _inner_forward(x): + identity = x + out = self.conv1(x) + out = self.norm1(out) + out = self.relu(out) + + out = self.conv2(out) + out = self.norm2(out) + + if self.downsample is not None: + identity = self.downsample(x) + out = npu_add_relu(out, identity) + + return out + + if self.with_cp and x.requires_grad: + out = cp.checkpoint(_inner_forward, x) + else: + out = _inner_forward(x) + + return out + + mmdetresnet.BasicBlock.forward = forward + + if hasattr(mmdetresnet, "Bottleneck"): + + def forward(self, x): + """Forward function.""" + + def _inner_forward(x): + identity = x + out = self.conv1(x) + out = self.norm1(out) + out = self.relu(out) + + if self.with_plugins: + out = self.forward_plugin(out, self.after_conv1_plugin_names) + + out = self.conv2(out) + out = self.norm2(out) + out = self.relu(out) + + if self.with_plugins: + out = self.forward_plugin(out, self.after_conv2_plugin_names) + + out = self.conv3(out) + out = self.norm3(out) + + if self.with_plugins: + out = self.forward_plugin(out, self.after_conv3_plugin_names) + + if self.downsample is not None: + identity = self.downsample(x) + + out = npu_add_relu(out, identity) + + return out + + if self.with_cp and x.requires_grad: + out = cp.checkpoint(_inner_forward, x) + else: + out = _inner_forward(x) + + return out + + mmdetresnet.Bottleneck.forward = forward + + +def resnet_maxpool(mmdetresnet: ModuleType, options: Dict): + if hasattr(mmdetresnet, "ResNet"): + from mx_driving import npu_max_pool2d + + def forward(self, x): + if self.deep_stem: + x = self.stem(x) + else: + x = self.conv1(x) + x = self.norm1(x) + x = self.relu(x) + if x.requires_grad: + x = self.maxpool(x) + else: + x = npu_max_pool2d(x, 3, 2, 1) + out = [] + for i, layer_name in enumerate(self.res_layers): + res_layer = getattr(self, layer_name) + x = res_layer(x) + if i in self.out_indices: + out.append(x) + return tuple(out) + + mmdetresnet.ResNet.forward = forward diff --git a/mx_driving/patcher/mmdet3d.py b/mx_driving/patcher/mmdet3d.py new file mode 100644 index 00000000..7dc2644e --- /dev/null +++ b/mx_driving/patcher/mmdet3d.py @@ -0,0 +1,98 @@ +# Copyright (c) OpenMMLab. All rights reserved. +# Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved. +from types import ModuleType +from typing import Dict, List, Tuple, Union + + +def nuscences_dataset(mmdet3ddatasets: ModuleType, options: Dict): + if hasattr(mmdet3ddatasets, "output_to_nusc_box"): + import numpy as np + import pyquaternion + from nuscenes.utils.data_classes import Box as NuScenesBox + + def output_to_nusc_box(detection, with_velocity=True): + box3d = detection["boxes_3d"] + scores = detection["scores_3d"].numpy() + labels = detection["labels_3d"].numpy() + + box_gravity_center = box3d.gravity_center.numpy() + box_dims = box3d.dims.numpy() + box_yaw = box3d.yaw.numpy() + box_yaw = -box_yaw - np.pi / 2 + + box_list = [] + for i in range(len(box3d)): + quat = pyquaternion.Quaternion(axis=[0, 0, 1], radians=box_yaw[i]) + if with_velocity: + velocity = (*box3d.tensor[i, 7:9], 0.0) + else: + velocity = (0, 0, 0) + box = NuScenesBox( + box_gravity_center[i], box_dims[i], quat, label=labels[i], score=scores[i], velocity=velocity + ) + box_list.append(box) + return box_list + + mmdet3ddatasets.output_to_nusc_box = output_to_nusc_box + + +def nuscences_metric(mmdet3dmetrics: ModuleType, options: Dict): + if hasattr(mmdet3dmetrics, "output_to_nusc_box"): + import numpy as np + import pyquaternion + from nuscenes.utils.data_classes import Box as NuScenesBox + from mmdet3d.structures import CameraInstance3DBoxes, LiDARInstance3DBoxes + + def output_to_nusc_box(detection: dict) -> Tuple[List[NuScenesBox], Union[np.ndarray, None]]: + bbox3d = detection["bboxes_3d"] + scores = detection["scores_3d"].numpy() + labels = detection["labels_3d"].numpy() + attrs = None + if "attr_labels" in detection: + attrs = detection["attr_labels"].numpy() + + box_gravity_center = bbox3d.gravity_center.numpy() + box_dims = bbox3d.dims.numpy() + box_yaw = bbox3d.yaw.numpy() + + box_list = [] + + if isinstance(bbox3d, LiDARInstance3DBoxes): + box_yaw = -box_yaw - np.pi / 2 + for i in range(len(bbox3d)): + quat = pyquaternion.Quaternion(axis=[0, 0, 1], radians=box_yaw[i]) + velocity = (*bbox3d.tensor[i, 7:9], 0.0) + box = NuScenesBox( + box_gravity_center[i], + box_dims[i], + quat, + label=labels[i], + score=scores[i], + velocity=velocity, + ) + box_list.append(box) + elif isinstance(bbox3d, CameraInstance3DBoxes): + # our Camera coordinate system -> nuScenes box coordinate system + # convert the dim/rot to nuscbox convention + nus_box_dims = box_dims[:, [2, 0, 1]] + nus_box_yaw = -box_yaw + for i in range(len(bbox3d)): + q1 = pyquaternion.Quaternion(axis=[0, 0, 1], radians=nus_box_yaw[i]) + q2 = pyquaternion.Quaternion(axis=[1, 0, 0], radians=np.pi / 2) + quat = q2 * q1 + velocity = (bbox3d.tensor[i, 7], 0.0, bbox3d.tensor[i, 8]) + box = NuScenesBox( + box_gravity_center[i], + nus_box_dims[i], + quat, + label=labels[i], + score=scores[i], + velocity=velocity, + ) + box_list.append(box) + else: + raise NotImplementedError(f"Do not support convert {type(bbox3d)} bboxes " "to standard NuScenesBoxes.") + + return box_list, attrs + + mmdet3dmetrics.output_to_nusc_box = output_to_nusc_box diff --git a/mx_driving/patcher/numpy.py b/mx_driving/patcher/numpy.py new file mode 100644 index 00000000..912b803f --- /dev/null +++ b/mx_driving/patcher/numpy.py @@ -0,0 +1,8 @@ +# Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved. +from types import ModuleType +from typing import Dict + + +def numpy_type(np: ModuleType, options: Dict): + if not hasattr(np, "bool"): + np.bool = bool diff --git a/mx_driving/patcher/optimizer.py b/mx_driving/patcher/optimizer.py new file mode 100644 index 00000000..1ac0478a --- /dev/null +++ b/mx_driving/patcher/optimizer.py @@ -0,0 +1,277 @@ +# Copyright (c) OpenMMLab. All rights reserved. +# Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved. +from types import ModuleType +from typing import Dict, Optional, Union + +from torch import Tensor +import torch.nn as nn + + +def optimizer_hooks(mmcvhooks: ModuleType, options: Dict): + """ + Patch mmcv hooks to support gradient accumulation and fp16 training. + mmcv 1.x required. + patch module: "mmcv.runner.hooks" + """ + if hasattr(mmcvhooks, "optimizer"): + import logging + from mmcv.runner.hooks import HOOKS, Hook + from mmcv.utils import _BatchNorm + from torch.npu.amp import GradScaler + from mmcv.runner.fp16_utils import wrap_fp16_model + + @HOOKS.register_module(force=True) + class OptimizerHook(Hook): + def __init__(self, grad_clip: Optional[dict] = None, detect_anomalous_params: bool = False): + self.grad_clip = grad_clip + self.detect_anomalous_params = detect_anomalous_params + + def clip_grads(self, params, runner): + params = list(filter(lambda p: p.requires_grad and p.grad is not None, params)) + if len(params) > 0: + return runner.optimizer.clip_grad_norm_fused_(**self.grad_clip) + return None + + def after_train_iter(self, runner): + runner.optimizer.zero_grad() + if self.detect_anomalous_params: + self.detect_anomalous_parameters(runner.outputs["loss"], runner) + runner.outputs["loss"].backward() + + if self.grad_clip is not None: + grad_norm = self.clip_grads(runner.model.parameters(), runner) + if grad_norm is not None: + runner.log_buffer.update({"grad_norm": float(grad_norm)}, runner.outputs["num_samples"]) + runner.optimizer.step() + + def detect_anomalous_parameters(self, loss: Tensor, runner) -> None: + logger = runner.logger + parameters_in_graph = set() + visited = set() + + def traverse(grad_fn): + if grad_fn is None: + return + if grad_fn not in visited: + visited.add(grad_fn) + if hasattr(grad_fn, "variable"): + parameters_in_graph.add(grad_fn.variable) + parents = grad_fn.next_functions + if parents is not None: + for parent in parents: + grad_fn = parent[0] + traverse(grad_fn) + + traverse(loss.grad_fn) + for n, p in runner.model.named_parameters(): + if p not in parameters_in_graph and p.requires_grad: + logger.log( + level=logging.ERROR, + msg=f"{n} with shape {p.size()} is not " f"in the computational graph \n", + ) + + @HOOKS.register_module(force=True) + class GradientCumulativeOptimizerHook(OptimizerHook): + def __init__(self, cumulative_iters: int = 1, **kwargs): + super().__init__(**kwargs) + + if not isinstance(cumulative_iters, int) or cumulative_iters <= 0: + raise ValueError( + f"cumulative_iters only accepts positive int, but got " f"{type(cumulative_iters)} instead." + ) + + self.cumulative_iters = cumulative_iters + self.divisible_iters = 0 + self.remainder_iters = 0 + self.initialized = False + + def has_batch_norm(self, module: nn.Module) -> bool: + if isinstance(module, _BatchNorm): + return True + for m in module.children(): + if self.has_batch_norm(m): + return True + return False + + def _init(self, runner): + if runner.iter % self.cumulative_iters != 0: + runner.logger.warning( + "Resume iter number is not divisible by cumulative_iters in " + "GradientCumulativeOptimizerHook, which means the gradient of " + "some iters is lost and the result may be influenced slightly." + ) + + if self.has_batch_norm(runner.model) and self.cumulative_iters > 1: + runner.logger.warning( + "GradientCumulativeOptimizerHook may slightly decrease " + "performance if the model has BatchNorm layers." + ) + + self.divisible_iters = runner.max_iters // self.cumulative_iters * self.cumulative_iters + self.remainder_iters = runner.max_iters - self.divisible_iters + + self.initialized = True + + def _get_loss_factor(self, runner): + """Get loss division factor for the current iteration.""" + if runner.iter < runner.max_iters - self.remainder_iters: + loss_factor = self.cumulative_iters + else: + loss_factor = self.remainder_iters + runner.logger.warning( + f"Loss will be divided by {loss_factor} in the last " + f"{self.remainder_iters} iterations because they are not " + f"enough for {self.cumulative_iters} cumulative_iters." + ) + if loss_factor <= 0: + raise ValueError("loss_factor should be larger than 0.") + + return loss_factor + + def after_train_iter(self, runner): + if not self.initialized: + self._init(runner) + + loss = runner.outputs["loss"] / self._get_loss_factor(runner) + loss.backward() + + if self.every_n_iters(runner, self.cumulative_iters) or self.is_last_iter(runner): + + if self.grad_clip is not None: + grad_norm = self.clip_grads(runner.model.parameters(), runner) + if grad_norm is not None: + # Add grad norm to the logger + runner.log_buffer.update({"grad_norm": float(grad_norm)}, runner.outputs["num_samples"]) + runner.optimizer.step() + runner.optimizer.zero_grad() + + @HOOKS.register_module(force=True) + class Fp16OptimizerHook(OptimizerHook): + + def __init__( + self, + grad_clip: Optional[dict] = None, + coalesce: bool = True, + bucket_size_mb: int = -1, + loss_scale: Union[float, str, dict] = 512.0, + distributed: bool = True, + ): + self.grad_clip = grad_clip + self.coalesce = coalesce + self.bucket_size_mb = bucket_size_mb + self.distributed = distributed + self._scale_update_param = None + if loss_scale == "dynamic": + self.loss_scaler = GradScaler() + elif isinstance(loss_scale, float): + self._scale_update_param = loss_scale + self.loss_scaler = GradScaler(init_scale=loss_scale) + elif isinstance(loss_scale, dict): + self.loss_scaler = GradScaler(**loss_scale) + else: + raise ValueError("loss_scale must be of type float, dict, or " f'"dynamic", got {loss_scale}') + + def before_run(self, runner) -> None: + """Preparing steps before Mixed Precision Training.""" + # wrap model mode to fp16 + wrap_fp16_model(runner.model) + # resume from state dict + if "fp16" in runner.meta and "loss_scaler" in runner.meta["fp16"]: + scaler_state_dict = runner.meta["fp16"]["loss_scaler"] + self.loss_scaler.load_state_dict(scaler_state_dict) + + def copy_grads_to_fp32(self, fp16_net: nn.Module, fp32_weights: Tensor) -> None: + """Copy gradients from fp16 model to fp32 weight copy.""" + for fp32_param, fp16_param in zip(fp32_weights, fp16_net.parameters()): + if fp16_param.grad is not None: + if fp32_param.grad is None: + fp32_param.grad = fp32_param.data.new(fp32_param.size()) + fp32_param.grad.copy_(fp16_param.grad) + + def copy_params_to_fp16(self, fp16_net: nn.Module, fp32_weights: Tensor) -> None: + """Copy updated params from fp32 weight copy to fp16 model.""" + for fp16_param, fp32_param in zip(fp16_net.parameters(), fp32_weights): + fp16_param.data.copy_(fp32_param.data) + + def after_train_iter(self, runner) -> None: + runner.model.zero_grad() + runner.optimizer.zero_grad() + + self.loss_scaler.scale(runner.outputs["loss"]).backward() + self.loss_scaler.unscale_(runner.optimizer) + # grad clip + if self.grad_clip is not None: + grad_norm = self.clip_grads(runner.model.parameters(), runner) + if grad_norm is not None: + # Add grad norm to the logger + runner.log_buffer.update({"grad_norm": float(grad_norm)}, runner.outputs["num_samples"]) + # backward and update scaler + self.loss_scaler.step(runner.optimizer) + self.loss_scaler.update(self._scale_update_param) + + # save state_dict of loss_scaler + runner.meta.setdefault("fp16", {})["loss_scaler"] = self.loss_scaler.state_dict() + + @HOOKS.register_module(force=True) + class GradientCumulativeFp16OptimizerHook(GradientCumulativeOptimizerHook, Fp16OptimizerHook): + """Fp16 optimizer Hook (using PyTorch's implementation) implements + multi-iters gradient cumulating. + + If you are using PyTorch >= 1.6, torch.cuda.amp is used as the backend, + to take care of the optimization procedure. + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + def after_train_iter(self, runner) -> None: + if not self.initialized: + self._init(runner) + + loss = runner.outputs["loss"] / self._get_loss_factor(runner) + self.loss_scaler.scale(loss).backward() + + if self.every_n_iters(runner, self.cumulative_iters) or self.is_last_iter(runner): + + # copy fp16 grads in the model to fp32 params in the optimizer + self.loss_scaler.unscale_(runner.optimizer) + + if self.grad_clip is not None: + grad_norm = self.clip_grads(runner.model.parameters(), runner) + if grad_norm is not None: + # Add grad norm to the logger + runner.log_buffer.update({"grad_norm": float(grad_norm)}, runner.outputs["num_samples"]) + + # backward and update scaler + self.loss_scaler.step(runner.optimizer) + self.loss_scaler.update(self._scale_update_param) + + # save state_dict of loss_scaler + runner.meta.setdefault("fp16", {})["loss_scaler"] = self.loss_scaler.state_dict() + + # clear grads + runner.model.zero_grad() + runner.optimizer.zero_grad() + + +def optimizer_wrapper(mmcvoptwrapper: ModuleType, options: Dict): + """ + Patch mmcv optimizer wrapper to support gradient clipping. + mmcv 2.x required. + patch module: "mmcv.optim.optimizer.optimizer_wrapper" + """ + if hasattr(mmcvoptwrapper, "OptimWrapper"): + OptimWrapper = mmcvoptwrapper.OptimWrapper + orig_init = OptimWrapper.__init__ + + def _get_clip_func(optimizer): + def clip_func(params, **kwargs): + return optimizer.clip_grad_norm_fused_(**kwargs) + + return clip_func + + def new_init(self, *args, **kwargs): + orig_init(self, *args, **kwargs) + self.clip_grads = _get_clip_func(self.optimizer) + + OptimWrapper.__init__ = new_init diff --git a/mx_driving/patcher/patcher.py b/mx_driving/patcher/patcher.py new file mode 100644 index 00000000..004b4b2a --- /dev/null +++ b/mx_driving/patcher/patcher.py @@ -0,0 +1,89 @@ +# Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved. +import importlib +import warnings +from typing import Callable, Dict, List, Optional, Set + +from mx_driving.patcher.profiler import profiler +from mx_driving.patcher.brake import brake + + +class Patch: + def __init__(self, func: Callable, options: Optional[Dict] = None, priority: int = 0): + self.func = func + self.name = func.__name__ + self.options = {} if options is None else options + self.priority = priority + self.is_applied = False + + def __lt__(self, other): + return self.priority < other.priority + + +class Patcher: + def __init__(self, module_patches: Dict[str, List[Patch]], blacklist: Set[str]): + self.modules = [] + self.module_patches = module_patches + self.blacklist = blacklist + for module_name in module_patches: + try: + module = importlib.import_module(module_name) + self.modules.append(module) + except ModuleNotFoundError: + warnings.warn(f"Module {module_name} not found") + continue + + def apply(self): + for module in self.modules: + for patch in self.module_patches[module.__name__]: + if patch.name in self.blacklist or patch.is_applied: + continue + try: + patch.func(module, patch.options) + patch.is_applied = True + print(f"Applied patch {patch.name} to module {module.__name__}") + except Exception as e: + warnings.warn(f"Failed to apply patch {patch.name} to module {module.__name__}: {e}") + + def transfer_to_npu(self): + import torch + import torch_npu + from torch_npu.contrib import transfer_to_npu + + torch.npu.config.allow_internal_format = False + + def __enter__(self): + self.transfer_to_npu() + self.apply() + + def __exit__(self, exc_type, exc_value, traceback): + pass + + +class PatcherBuilder: + def __init__(self): + self.module_patches = {} + self.blacklist: Set[str] = set() + + def add_module_patch(self, module_name: str, *patches: Patch) -> "PatcherBuilder": + if module_name not in self.module_patches: + self.module_patches[module_name] = [] + self.module_patches[module_name].extend(patches) + self.module_patches[module_name].sort() + return self + + def disable_patches(self, *patch_names: str) -> "PatcherBuilder": + self.blacklist.update(patch_names) + return self + + def with_profiling(self, path: str, level: int = 0) -> "PatcherBuilder": + return self.add_module_patch( + "mmcv.runner", Patch(profiler, {"profiling_path": path, "profiling_level": level}) + ).add_module_patch("mmengine.runner", Patch(profiler, {"profiling_path": path, "profiling_level": level})) + + def brake_at(self, when_iter: int) -> "PatcherBuilder": + return self.add_module_patch("mmcv.runner", Patch(brake, {"when_iter": when_iter})).add_module_patch( + "mmengine.runner", Patch(brake, {"when_iter": when_iter}) + ) + + def build(self): + return Patcher(self.module_patches, self.blacklist) diff --git a/mx_driving/patcher/profiler.py b/mx_driving/patcher/profiler.py new file mode 100644 index 00000000..97618897 --- /dev/null +++ b/mx_driving/patcher/profiler.py @@ -0,0 +1,72 @@ +# Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved. +import time +from types import ModuleType +from typing import Dict + + +def profiler(runner: ModuleType, options: Dict): + import torch_npu + + path = options["profiling_path"] + level = options["profiling_level"] + + activities = ( + [torch_npu.profiler.ProfilerActivity.NPU] + if level == 0 + else [ + torch_npu.profiler.ProfilerActivity.NPU, + torch_npu.profiler.ProfilerActivity.CPU, + ] + ) + profiler_level = torch_npu.profiler.ProfilerLevel.Level0 if level == 0 else torch_npu.profiler.ProfilerLevel.Level1 + + def train(self, data_loader, **kwargs): + self.model.train() + self.mode = "train" + self.data_loader = data_loader + self._max_iters = self._max_epochs * len(data_loader) + self.call_hook("before_train_epoch") + time.sleep(2) + with torch_npu.profiler.profile( + activities=activities, + with_stack=level == 2, + record_shapes=level > 0, + profile_memory=level == 2, + schedule=torch_npu.profiler.schedule(wait=1, warmup=1, active=1, repeat=1, skip_first=20), + experimental_config=torch_npu.profiler._ExperimentalConfig(profiler_level=profiler_level), + on_trace_ready=torch_npu.profiler.tensorboard_trace_handler(path), + ) as prof: + for i, data_batch in enumerate(data_loader): + self.data_batch = data_batch + self._inner_iter = i + self.call_hook("before_train_iter") + self.run_iter(data_batch, train_mode=True, **kwargs) + self.call_hook("after_train_iter") + del self.data_batch + self._iter += 1 + prof.step() + self.call_hook("after_train_epoch") + self._epoch += 1 + + def run_epoch(self) -> None: + self.runner.call_hook("before_train_epoch") + self.runner.model.train() + with torch_npu.profiler.profile( + activities=activities, + with_stack=level == 2, + record_shapes=level > 0, + profile_memory=level == 2, + schedule=torch_npu.profiler.schedule(wait=1, warmup=1, active=1, repeat=1, skip_first=20), + experimental_config=torch_npu.profiler._ExperimentalConfig(profiler_level=profiler_level), + on_trace_ready=torch_npu.profiler.tensorboard_trace_handler(path), + ) as prof: + for idx, data_batch in enumerate(self.data_loader): + self.run_iter(idx, data_batch) + + self.runner.call_hook("after_train_epoch") + self.runner._epoch += 1 + + if hasattr(runner, "EpochBasedRunner"): + runner.EpochBasedRunner.train = train + elif hasattr(runner, "EpochBasedTrainLoop"): + runner.EpochBasedTrainLoop.run_epoch = run_epoch diff --git a/mx_driving/patcher/tensor.py b/mx_driving/patcher/tensor.py new file mode 100644 index 00000000..082ddc24 --- /dev/null +++ b/mx_driving/patcher/tensor.py @@ -0,0 +1,21 @@ +# Copyright (c) Huawei Technologies Co., Ltd. 2024. All rights reserved. +from types import ModuleType +from typing import Dict + + +# rewrite torch.Tensor.__getitem__ to support boolean indexing +def index(torch: ModuleType, options: Dict): + fn = torch.Tensor.__getitem__ + + def new_fn(self, indices): + # check if indices is a boolean tensor + if not isinstance(indices, torch.Tensor) or indices.dtype != torch.bool or indices.dim() != 1: + return fn(self, indices) + if self.dim() == 1: + return torch.masked_select(self, indices) + if self.dim() == 2 and self.shape[0] == indices.shape[0]: + indices = indices.unsqueeze(1).expand(self.shape) + return torch.masked_select(self, indices).view(-1, self.shape[1]) + return fn(self, indices) # fallback to the original function + + torch.Tensor.__getitem__ = new_fn -- Gitee