diff --git a/rl-plugin/verl_npu/workers/rollout/vllm_rollout/vllm_rollout_spmd.py b/rl-plugin/verl_npu/workers/rollout/vllm_rollout/vllm_rollout_spmd.py
index ea2e5161a9c5e5a41dd85665dbf558075d86408d..480cdaf51dbec373c40ee158d03b7f04ff5f5165 100644
--- a/rl-plugin/verl_npu/workers/rollout/vllm_rollout/vllm_rollout_spmd.py
+++ b/rl-plugin/verl_npu/workers/rollout/vllm_rollout/vllm_rollout_spmd.py
@@ -13,7 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import gc
 import os
 from copy import deepcopy
 
@@ -23,10 +22,8 @@ import vllm.envs as envs
 from omegaconf import DictConfig, OmegaConf
 from vllm import LLM, SamplingParams
 from vllm.distributed import parallel_state as vllm_ps
-from verl.utils.device import get_device_name, get_torch_device
 from verl.workers.rollout.base import BaseRollout
 from verl.workers.rollout.vllm_rollout.vllm_rollout_spmd import vLLMRollout
-from verl.workers.sharding_manager.hybrid_tp_config import HybridTPConfig
 
 from verl_npu.patch_util import NPUPatchHelper
 
@@ -46,13 +43,6 @@ class vLLMRolloutPatch(NPUPatchHelper[vLLMRollout]):
         super(BaseRollout, self).__init__()
         self.config = config
 
-        # create HybridTPConfig
-        self.hybrid_tp_config = HybridTPConfig.from_dict_config(
-            self.config.get("hybrid_tp", {}),
-        )
-
-        print(f"[NPU Patch] hybrid_tp_config is : {self.hybrid_tp_config if self.hybrid_tp_config else '{}'}")
-
         tensor_parallel_size = self.config.get("tensor_model_parallel_size", 1)
         if tensor_parallel_size > torch.distributed.get_world_size():
             raise ValueError(
@@ -133,23 +123,6 @@ class vLLMRolloutPatch(NPUPatchHelper[vLLMRollout]):
             self._init_dp_env(config)
             enable_infer_ep = True
 
-        # Extract hybrid TP config for additional_config
-        additional_config = {}
-        if self.hybrid_tp_config.enabled:
-            # Extract tp_size values from hybrid_tp_config
-            if self.hybrid_tp_config.qkv_proj_tp_size is not None:
-                additional_config["qkvproj_tensor_parallel_size"] = self.hybrid_tp_config.qkv_proj_tp_size
-            if self.hybrid_tp_config.o_proj_tp_size is not None:
-                additional_config["oproj_tensor_parallel_size"] = self.hybrid_tp_config.o_proj_tp_size
-            if self.hybrid_tp_config.lm_head_tp_size is not None:
-                additional_config["lmhead_tensor_parallel_size"] = self.hybrid_tp_config.lm_head_tp_size
-
-        print(f"[NPU Patch] vLLM additional_config: {additional_config if additional_config else '{}'}")
-
-        # Add additional_config to engine_kwargs if not empty
-        if additional_config:
-            engine_kwargs["additional_config"] = additional_config
-
         self.inference_engine = LLM(
             model=model_path,
             enable_sleep_mode=config.free_cache_engine,
diff --git a/rl-plugin/verl_npu/workers/sharding_manager/__init__.py b/rl-plugin/verl_npu/workers/sharding_manager/__init__.py
deleted file mode 100644
index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000
diff --git a/rl-plugin/verl_npu/workers/sharding_manager/hybrid_tp_config.py b/rl-plugin/verl_npu/workers/sharding_manager/hybrid_tp_config.py
deleted file mode 100644
index f472a1779267ed41abdf8ec87b9cf913eb01fb85..0000000000000000000000000000000000000000
--- a/rl-plugin/verl_npu/workers/sharding_manager/hybrid_tp_config.py
+++ /dev/null
@@ -1,164 +0,0 @@
-# Copyright (c) 2025, HUAWEI CORPORATION. All rights reserved.
-# Copyright 2025 Snowflake Inc.
-# SPDX-License-Identifier: Apache-2.0
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from dataclasses import dataclass
-from typing import Dict, List, Optional
-
-from omegaconf import DictConfig
-
-
-@dataclass
-class HybridTPConfig:
-    """Configuration for hybrid TP strategy.
-
-    This class defines the configuration for applying different TP sizes
-    to different layers of the model (qkv_proj, o_proj, lm_head).
-    """
-
-    # Whether to enable hybrid TP strategy
-    enabled: bool = False
-
-    # TP size for qkv_proj layer (attention q/k/v projection)
-    # None means follow external tensor_model_parallel_size
-    qkv_proj_tp_size: Optional[int] = None
-
-    # TP size for o_proj layer (attention output projection)
-    # None means follow external tensor_model_parallel_size
-    o_proj_tp_size: Optional[int] = None
-
-    # TP size for lm_head layer
-    # None means follow external tensor_model_parallel_size
-    lm_head_tp_size: Optional[int] = None
-
-    # Custom layer name mappings for non-standard models
-    custom_layer_mappings: Optional[Dict[str, List[str]]] = None
-
-    # Tp size from rollout config
-    external_tp_size: Optional[int] = None
-
-    def __post_init__(self):
-        """Post-initialization validation."""
-
-    def _validate_custom_mappings(self):
-        """validate custom mappings"""
-        required_layers = ["attention_output", "lm_head"]
-        for layer in required_layers:
-            if layer not in self.custom_layer_mappings:
-                raise ValueError(f"Custom layer mappings must include '{layer}'")
-
-
-    @classmethod
-    def from_dict_config(cls, config: DictConfig, external_tp_size: Optional[int] = None) -> "HybridTPConfig":
-        """Create HybridTPConfig from DictConfig.
-
-        Args:
-            config: DictConfig containing hybrid_tp configuration
-            external_tp_size: External tensor_model_parallel_size
-
-        Returns:
-            HybridTPConfig instance
-        """
-        if not config or not config.get("enabled", False):
-            return cls(enabled=False)
-
-        qkv_proj_tp_size = config.get("qkvproj_tensor_parallel_size")
-        if qkv_proj_tp_size is None and external_tp_size is not None:
-            qkv_proj_tp_size = external_tp_size
-
-        # Get TP sizes, use external_tp_size as default if not specified
-        o_proj_tp_size = config.get("oproj_tensor_parallel_size")
-        if o_proj_tp_size is None and external_tp_size is not None:
-            o_proj_tp_size = external_tp_size
-
-        lm_head_tp_size = config.get("lmhead_tensor_parallel_size")
-        if lm_head_tp_size is None and external_tp_size is not None:
-            lm_head_tp_size = external_tp_size
-
-        custom_layer_mappings = config.get("custom_layer_mappings")
-
-        return cls(
-            enabled=True,
-            qkv_proj_tp_size=qkv_proj_tp_size,
-            o_proj_tp_size=o_proj_tp_size,
-            lm_head_tp_size=lm_head_tp_size,
-            external_tp_size=external_tp_size,
-            custom_layer_mappings=custom_layer_mappings,
-        )
-
-
-
-    def validate(self) -> bool:
-        """Validate config fields correctness"""
-        if not self.enabled:
-            return True
-
-        if not self.is_hybrid_enabled():
-            return True
-
-        # basic tp size check
-        for tp_size in [self.o_proj_tp_size, self.qkv_proj_tp_size, self.lm_head_tp_size]:
-            if tp_size is not None:
-                if tp_size <= 0:
-                    raise ValueError(f"TP size must be positive, got {tp_size}")
-
-        if self.custom_layer_mappings:
-            self._validate_custom_mappings()
-
-        return True
-
-    def get_tp_size_for_layer(self, layer_name: str) -> int:
-        """Get TP size for a specific layer.
-
-        Args:
-            layer_name: Name of the layer
-            external_tp_size: External tensor_model_parallel_size
-
-        Returns:
-            TP size for the layer
-        """
-        if not self.enabled:
-            return self.external_tp_size
-
-        # Apply custom layer mappings
-        mapped_name = self.custom_layer_mappings.get(layer_name, layer_name)
-
-        # Determine TP size based on layer type
-        if "o_proj" in mapped_name or "self_attn" in mapped_name:
-            return self.o_proj_tp_size
-        elif "lm_head" in mapped_name or "output_layer" in mapped_name:
-            return self.lm_head_tp_size
-        else:
-            return self.external_tp_size
-
-    def get_tp_size_for_layer_type(self, layer_type: str) -> int:
-        """Get tp size for layer type"""
-        size_map = {
-            "attention_output": self.o_proj_tp_size,
-            "lm_head": self.lm_head_tp_size
-        }
-        return size_map.get(layer_type) or self.external_tp_size
-
-    def is_hybrid_enabled(self) -> bool:
-        """Check if hybrid TP is enabled and at least one layer has different TP size."""
-        if not self.enabled:
-            return False
-
-        # Check if at least one layer has different TP size
-        # This will be checked against external_tp_size in the processor
-        return any([
-            self.o_proj_tp_size is not None,
-            self.lm_head_tp_size is not None
-        ])
\ No newline at end of file