diff --git a/.gitignore b/.gitignore
index f5a996cd55bb06a0a484fea883f37515b0a38767..8aa198fc7c211f14c83ead26b577016dc6c91d77 100644
--- a/.gitignore
+++ b/.gitignore
@@ -157,4 +157,7 @@ cython_debug/
/kernel*/
/logs/
/model_from_hf/
-/model_weights/
\ No newline at end of file
+/model_weights/
+/outputs/
+/vllm/
+/vllm_ascend/
\ No newline at end of file
diff --git a/README.md b/README.md
index f3a5d3ed3aa1c190b2fb1335c9ca895ac093f6c5..81fe4c908f535cdf78135f4842c5583c3cfc0d28 100644
--- a/README.md
+++ b/README.md
@@ -54,7 +54,11 @@ MindSpeed RL是基于昇腾生态的强化学习加速框架,旨在为华为 [
      <td>vllm</td>
-      <td>0.7.3</td>
+      <td>main</td>
+  </tr>
+  <tr>
+      <td>vllm-ascend</td>
+      <td>main</td>
diff --git a/cli/infer_vllm.py b/cli/infer_vllm.py
deleted file mode 100644
index a0af648b7e9118fa03434404f5230f95cd6cf3a9..0000000000000000000000000000000000000000
--- a/cli/infer_vllm.py
+++ /dev/null
@@ -1,244 +0,0 @@
-import os
-import argparse
-import logging
-
-import tensordict
-import torch
-import torch_npu
-from torch_npu.contrib import transfer_to_npu
-import vllm.distributed.parallel_state as ps
-
-from mindspeed_rl.models.rollout.vllm_engine import VLLMInferEngine
-from mindspeed_rl.utils.loggers import Loggers
-
-logger = Loggers(
- name="vllm_engine_inference",
-)
-
-
-def get_args():
- parser = argparse.ArgumentParser()
- group = parser.add_argument_group(title='inference args')
- group.add_argument('--tokenizer-name-or-path', type=str,
- help="Huggingface config path.")
- group.add_argument('--load-format', type=str,
- choices=["auto", "megatron"], default="auto",
- help="Vllm weight load format, support auto from huggingface and from megatron format.")
- group.add_argument('--load', type=str,
- default=None,
- help="Vllm weight path for megatron load format.")
- group.add_argument('--tensor-parallel-size', type=int,
- default=1,
- help="infer tensor parallel size")
- group.add_argument('--query', type=str, default="Write an essay about the importance of higher education.",
- help='Input query.')
- group.add_argument('--task', type=str,
- choices=["generation", "chat"], default="chat",
- help='Inference task, generation or chat.')
- group.add_argument('--gpu-memory-utilization', type=float, default=0.9,
- help='Device memory ratio allocated for vllm.')
-
- group = parser.add_argument_group(title='distributed')
- group.add_argument('--distributed-backend', default='nccl',
- choices=['nccl', 'gloo'],
- help='Which backend to use for distributed training.')
- group.add_argument('--local-rank', type=int, default=int(os.getenv('LOCAL_RANK', '0')),
- help='Local rank passed from distributed launcher.')
- group.add_argument('--prompt-type', type=str, default=None,
- choices=['default', 'empty', 'trl', 'qwen', 'qwen_r1', "qwen_math_r1", 'llama3', 'mistral', 'mixtral', 'gemma', 'llama2',
- 'alpaca', 'deepseek2', 'deepseek2-lite', 'minicpm3', 'cpm', 'baichuan2', 'deepseek3'],
- help='Which template to use for constructing prompts in training/inference.' 'e.g., "qwen"')
- group.add_argument('--prompt-type-path', type=str, default=None,
- help='Path to the json file of templates.')
- group = parser.add_argument_group(title='sampling params')
- group.add_argument('--num-completions', type=int, default=1,
- help='Number of output sequences to return for the given prompt.')
- group.add_argument('--logprobs', type=int, default=1,
- help='Number of log probabilities to return per output token.')
- group.add_argument('--max-tokens', type=int, default=128,
- help='Maximum number of tokens to generate per output sequence.')
- group.add_argument('--top-p', type=float, default=1.0,
- help='Float that controls the cumulative probability of the top tokens to consider.')
- group.add_argument('--top-k', type=int, default=-1,
- help='Integer that controls the number of top tokens to consider. Set to -1 to consider all tokens.')
- group.add_argument('--temperature', type=float, default=1.0,
- help='Float that controls the randomness of the sampling.')
- return parser.parse_args()
-
-
-def process_outputs(outputs):
- res = ""
- for output in outputs:
- prompt = output.prompt
- generated_text = output.outputs[0].text
- res = res + f"Prompt: {prompt!r}\nGenerated Text: {generated_text!r}\n"
- res = res + "-" * 80
- return res
-
-
-def main():
- logger.info("start vllm_engine inference")
- args = get_args()
-
- sampling_config = {
- "num_completions": args.num_completions, # 每个输入提示生成的独立完成项数量
- "logprobs": args.logprobs, # 返回的 top token 的对数概率数量
- "max_tokens": args.max_tokens, # 生成输出的最大 token 数量
- "top_p": args.top_p, # 核采样的累积概率阈值
- "top_k": args.top_k, # 采样时考虑的最高概率 token 的数量
- "temperature": args.temperature, # 控制预测随机性的温度参数
- "detokenize": True # 是否将生成的 token 转换回可读字符串
- }
-
- inference_engine = VLLMInferEngine(
- megatron_config=None,
- sampling_config=sampling_config,
- train_expert_parallel_size=1,
- infer_expert_parallel_size=1,
- tokenizer_name_or_path=args.tokenizer_name_or_path,
- prompt_type=args.prompt_type,
- prompt_type_path=args.prompt_type_path,
- train_tensor_parallel_size=args.tensor_parallel_size,
- train_pipeline_parallel_size=1,
- infer_tensor_parallel_size=args.tensor_parallel_size,
- infer_pipeline_parallel_size=1,
- max_num_seqs=1,
- gpu_memory_utilization=args.gpu_memory_utilization,
- trust_remote_code=True,
- load_format=args.load_format
- )
-
- if args.load_format == "megatron":
- tp_rank = ps._TP.rank_in_group
- weights_path = os.path.join(args.load, f"iter_0000001/mp_rank_{tp_rank:02}/model_optim_rng.pt")
-
- actor_weights = torch.load(weights_path)['model']
- actor_weights = replace_state_dict_name(
- actor_weights,
- vllm_dict=inference_engine.model.state_dict(),
- arch=inference_engine.model.__class__.__name__)
- logger.info("sync_model_weights")
- inference_engine.sync_model_weights(actor_weights)
-
- logger.info("init_cache_engine")
- inference_engine.init_cache_engine()
-
- if args.task == "chat":
- chat_task(inference_engine, args.query)
- elif args.task == "generation":
- generate_task(inference_engine, args.query)
-
-
-def chat_task(inference_engine, query):
- conversation = [
- {
- "role": "user",
- "content": query,
- },
- ]
- outputs = inference_engine.chat(conversation)
- res = process_outputs(outputs)
- logger.info('Query: {}'.format(query))
- logger.info('Responses:\n{}'.format(res))
-
-
-def generate_task(inference_engine, query):
- outputs = inference_engine.llm.generate(
- prompts=[query],
- sampling_params=inference_engine.sampling_params,
- )
- res = process_outputs(outputs)
- logger.info('Query: {}'.format(query))
- logger.info('Responses:\n{}'.format(res))
-
-
-def replace_state_dict_name(state_dict, vllm_dict, arch=None):
- params_mapping = [
- # (megatron core gpt model name, vllm model name)
- ("embedding.word_embeddings", "model.embed_tokens"),
- ("self_attention.linear_qkv", "self_attn.qkv_proj"),
- ("self_attention.linear_proj", "self_attn.o_proj"),
- ("input_layernorm", "input_layernorm"),
- ("pre_mlp_layernorm", "post_attention_layernorm"),
- ("mlp.linear_fc1", "mlp.gate_up_proj"),
- ("mlp.linear_fc2", "mlp.down_proj"),
- ("decoder.final_layernorm", "model.norm"),
- ("output_layer", "lm_head"),
- # Deepseek add
- ("self_attention.linear_qb", "self_attn.q_b_proj"),
- ("self_attention.linear_kvb", "self_attn.kv_b_proj"),
- ("mlp.router.weight", "mlp.gate.weight"),
- ("mlp.router.expert_bias", "mlp.gate.e_score_correction_bias"),
- ("mlp.shared_experts.linear_fc1", "mlp.shared_experts.gate_up_proj"),
- ("mlp.shared_experts.linear_fc2", "mlp.shared_experts.down_proj"),
- ("mlp.experts.weight1", "mlp.experts.w13_weight"),
- ("mlp.experts.weight2", "mlp.experts.w2_weight"),
- ("self_attention.q_layernorm", "self_attn.q_a_layernorm"),
- ("self_attention.k_layernorm", "self_attn.kv_a_layernorm"),
- ]
-
-
- new_state_dict = {}
- for name, loaded_weight in state_dict.items():
- if "_extra_state" in name:
- continue
- if "Deepseek" in arch:
- name = _replace_name_m2v_deepseek(name, params_mapping)
- else:
- name = _replace_name_m2v(name, params_mapping)
-
- # the router bias in raw weight in fp32
- if "e_score_correction_bias" in name:
- loaded_weight = loaded_weight.to(vllm_dict[name].dtype)
-
- # to adapter 'copy_' in megatron weight loader to save memory
- if "mlp.experts" in name:
- loaded_weight = loaded_weight.view(vllm_dict[name].shape)
-
- new_state_dict[name] = loaded_weight
- return new_state_dict
-
-
-def _replace_name_m2v(name, name_mapping):
- """
- Transfer state dict names from megatron to vllm.
- """
- for m_name, v_name in name_mapping:
- if m_name not in name:
- continue
- if "layers" in name: # deal with decoder layers
- name = name.replace("decoder", "model")
- name_list = name.split(".")
- if "layer_norm_weight" in name_list or "layer_norm_bias" in name_list:
- param_name_list = name_list[:3]
- param_name_list.append(v_name)
- param_name = ".".join(param_name_list)
- else:
- param_name_list = name_list[:3]
- weight_or_bias = name_list[-1]
- param_name_list.append(v_name)
- param_name_list.append(weight_or_bias)
- param_name = ".".join(param_name_list)
- return param_name
- else:
- param_name = name.replace(m_name, v_name)
- return param_name
- return name
-
-
-def _replace_name_m2v_deepseek(name, name_mapping):
- """
- Transfer state dict names from megatron to vllm.
- """
- for m_name, v_name in name_mapping:
- if m_name not in name:
- continue
- if "layers" in name: # deal with decoder layers
- name = name.replace("decoder", "model")
- param_name = name.replace(m_name, v_name)
- return param_name
- return name
-
-
-if __name__ == "__main__":
- main()
diff --git a/configs/envs/runtime_env.yaml b/configs/envs/runtime_env.yaml
index 1525a2f02aa7778be9fbbc37baca1e6468c98cba..7ac3d617a32ee76ae3008875a1d8253408323c66 100644
--- a/configs/envs/runtime_env.yaml
+++ b/configs/envs/runtime_env.yaml
@@ -8,6 +8,15 @@ env_vars:
HCCL_IF_BASE_PORT: '48000'
CUDA_DEVICE_MAX_CONNECTIONS: '1'
HYDRA_FULL_ERROR: '1'
+ VLLM_DP_SIZE: '1'
+ HCCL_BUFFSIZE: '256'
+ VLLM_USE_V1: '1'
+ VLLM_VERSION: '0.9.0'
+ VLLM_ENABLE_GRAPH_MODE: '0'
+ VLLM_ENABLE_MC2: '0'
+ HCCL_OP_EXPANSION_MODE: "AIV"
+ VLLM_ENABLE_TOPK_OPTIMZE: "1"
+
# GLOO_SOCKET_IFNAME: "Your SOCKET IFNAME"
# TP_SOCKET_IFNAME: "Your SOCKET IFNAME"
# HCCL_SOCKET_IFNAME: "Your SOCKET IFNAME"
\ No newline at end of file
diff --git a/configs/grpo_qwen25_32b_A3.yaml b/configs/grpo_qwen25_32b_A3.yaml
index 69da5f52e943a4dca21adfe16524ef1dcf6a16d7..c48e1a930d498dddf14b10253523f1c321c95df7 100644
--- a/configs/grpo_qwen25_32b_A3.yaml
+++ b/configs/grpo_qwen25_32b_A3.yaml
@@ -79,6 +79,7 @@ rl_config:
guarantee_order: true
generate_config:
+  enforce_eager: true
trust_remote_code: true
offload_train_optimizer: true
offload_train_grad: true
diff --git a/configs/grpo_qwen25_7b_A3.yaml b/configs/grpo_qwen25_7b_A3.yaml
index 1bcab0b53d53f64401dbde0d9d4b198255e348ae..f452fc342b47fc16a3a04fa201c14cf68d28d824 100644
--- a/configs/grpo_qwen25_7b_A3.yaml
+++ b/configs/grpo_qwen25_7b_A3.yaml
@@ -81,6 +81,7 @@ rl_config:
num_npus: 16
generate_config:
+  enforce_eager: true
trust_remote_code: true
offload_train_optimizer: true
offload_train_grad: true
diff --git a/docs/install_guide.md b/docs/install_guide.md
index 6c0071944f0e463ba0a41da11d1866abed764c2d..6272a7097530dafbc14c2172354ff0c7a9a1243f 100644
--- a/docs/install_guide.md
+++ b/docs/install_guide.md
@@ -37,16 +37,16 @@ pip install apex-0.1.dev*.whl
```shell
git clone https://github.com/vllm-project/vllm.git
cd vllm
-git checkout v0.7.3
+git checkout 5bc1ad6cee754405464a9957e86cf3a9302e4986
pip install -r requirements-build.txt
VLLM_TARGET_DEVICE=empty pip install .
```
### vllm_ascend安装
```shell
-git clone -b v0.7.3-dev https://github.com/vllm-project/vllm-ascend.git
+git clone -b main https://github.com/vllm-project/vllm-ascend.git
cd vllm-ascend
-git checkout 0713836e95fe993feefe334945b5b273e4add1f1
+git checkout 908a851
pip install -e .
```
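A quick way to confirm that the pinned builds above were picked up (a minimal sketch; the distribution names `vllm` and `vllm-ascend` are assumed from the repositories being installed):

```python
# Minimal post-install check (illustrative; assumes the wheels register the
# distribution names "vllm" and "vllm-ascend").
from importlib import metadata

for dist in ("vllm", "vllm-ascend"):
    try:
        print(f"{dist}: {metadata.version(dist)}")
    except metadata.PackageNotFoundError:
        print(f"{dist}: not installed")
```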
diff --git a/examples/infer/infer_vllm_deepseekv3_671b.sh b/examples/infer/infer_vllm_deepseekv3_671b.sh
deleted file mode 100644
index 4bbbbca299efe9fe7e2e1ce5f4d3d5654a92294b..0000000000000000000000000000000000000000
--- a/examples/infer/infer_vllm_deepseekv3_671b.sh
+++ /dev/null
@@ -1,32 +0,0 @@
-#!/bin/bash
-
-export GLOO_SOCKET_IFNAME="Your SOCKET IFNAME"
-export TP_SOCKET_IFNAME="Your SOCKET IFNAME"
-export CUDA_DEVICE_MAX_CONNECTIONS=1
-
-GPUS_PER_NODE=8
-MASTER_ADDR="host ip"
-MASTER_PORT=6001
-NNODES=4
-NODE_RANK="node rank"
-
-DISTRIBUTED_ARGS="
- --nproc_per_node $GPUS_PER_NODE \
- --nnodes $NNODES \
- --node_rank $NODE_RANK \
- --master_addr $MASTER_ADDR \
- --master_port $MASTER_PORT
-"
-
-INFER_ARGS="
- --tokenizer-name-or-path 'your huggingface config path' \
- --load-format megatron \
- --load 'megatron weight path' \
- --tensor-parallel-size 32 \
- --task chat \
-"
-
-torchrun $DISTRIBUTED_ARGS cli/infer_vllm.py \
- $INFER_ARGS \
- --query "Write an essay about the importance of higher education." \
- --distributed-backend nccl
diff --git a/mindspeed_rl/config_cls/generate_config.py b/mindspeed_rl/config_cls/generate_config.py
index 15df52ea631dcb5e898a8266545f22115c11ae42..3c13f23d8207e982514495832cb72b68a3505443 100644
--- a/mindspeed_rl/config_cls/generate_config.py
+++ b/mindspeed_rl/config_cls/generate_config.py
@@ -25,6 +25,9 @@ class GenerateConfig(BaseConfig):
dtype: Data type for model weights. Default is "bfloat16".
gpu_memory_utilization: GPU memory utilization factor. Default is 0.5.
+        enforce_eager: Whether to always use eager-mode PyTorch. If True, the ACL graph is disabled and the model always executes in eager mode.
+            If False, ACL graph and eager execution are used in a hybrid mode for maximal performance and flexibility. Default is False.
+
sampling_config: Configuration for text generation sampling. Default values are set for various sampling parameters.
- num_completions: The number of independent completions to generate for each input prompt. Default is 1.
- logprobs: The number of top tokens to return log probabilities for. Default is 1.
@@ -72,6 +75,7 @@ class GenerateConfig(BaseConfig):
self.enable_prefix_caching = False
self.num_scheduler_steps = 1
+ self.enforce_eager = False
# 采样配置的默认值,用于生成文本时的采样策略设置
self.sampling_config = {
diff --git a/mindspeed_rl/config_cls/megatron_config.py b/mindspeed_rl/config_cls/megatron_config.py
index fe15a82241fb734675f5497b2e48a870bc6557a2..44d7ceadd586affcbd7598da2ab25c73e17f5ce4 100644
--- a/mindspeed_rl/config_cls/megatron_config.py
+++ b/mindspeed_rl/config_cls/megatron_config.py
@@ -119,6 +119,7 @@ class MegatronConfig(BaseConfig):
tensor_model_parallel_size: Size of tensor model parallelism (default: 1)
pipeline_model_parallel_size: Size of pipeline model parallelism (default: 1)
expert_model_parallel_size: Degree of expert model parallelism (default: 1)
+        num_layers_per_virtual_pipeline_stage: Number of layers per virtual pipeline (VPP) stage (default: None)
lr: Learning rate (default: None)
lr_decay_style: Learning rate decay style (default: 'linear')
min_lr: Minimum learning rate (default: 0.0)
@@ -198,6 +199,7 @@ class MegatronConfig(BaseConfig):
reuse_fp32_param: The distributed training optimizer frees up 'param copies of FP32 to save memory. (default: False)
moe_tp_extend_ep: use tp group to extend experts parallelism instead of sharding weight tensor of experts in tp group
moe_alltoall_overlap_comm: moe_alltoall_overlap_comm
+        noop_layers: String specifying no-op (placeholder) layers, e.g. a comma-separated list of layer indices (default: None)
'''
def __init__(self, training_config: Dict, model_config: Dict):
@@ -305,6 +307,7 @@ class MegatronConfig(BaseConfig):
self.tensor_model_parallel_size = 1
self.pipeline_model_parallel_size = 1
self.expert_model_parallel_size = 1
+ self.num_layers_per_virtual_pipeline_stage = None
self.lr = None
self.lr_decay_style = 'linear'
self.min_lr = 0.0
@@ -358,5 +361,6 @@ class MegatronConfig(BaseConfig):
self.overlap_param_gather = False
self.recompute_activation_function = False
self.swap_attention = False
+ self.noop_layers = None
self.update(training_config, model_config)
diff --git a/mindspeed_rl/config_cls/validate_config.py b/mindspeed_rl/config_cls/validate_config.py
index e037bcd842ea71f8c4f005154eab0785fd9cdd9e..dbeba666c060e0c3b8c26917f6d879f4e3dd1a7a 100644
--- a/mindspeed_rl/config_cls/validate_config.py
+++ b/mindspeed_rl/config_cls/validate_config.py
@@ -29,6 +29,7 @@ def validate_rl_args(
raise ValueError(
f"integrated_mode_config should not be set when use_integrated_worker mode is off.")
+
# 校验序列长度与模型最大长度
if generate_config.max_model_len < actor_config.seq_length:
raise ValueError(
diff --git a/mindspeed_rl/models/base/base_inference_engine.py b/mindspeed_rl/models/base/base_inference_engine.py
index 5719cd8099588cd791a381ada13be941bc2ffe2a..81a1971628a67d8e9717669a3c48c5005ad1e4ff 100644
--- a/mindspeed_rl/models/base/base_inference_engine.py
+++ b/mindspeed_rl/models/base/base_inference_engine.py
@@ -17,6 +17,7 @@ class BaseInferEngine(ABC):
prompt_type: str = None,
prompt_type_path: str = None,
train_expert_parallel_size: int = 1,
+ train_context_parallel_size: int = 1,
infer_tensor_parallel_size: int = 8,
infer_pipeline_parallel_size: int = 1,
infer_expert_parallel_size: int = 1,
@@ -34,6 +35,7 @@ class BaseInferEngine(ABC):
train_tensor_parallel_size (int): Tensor parallel size during training.
train_pipeline_parallel_size (int): Pipeline parallel size during training.
train_expert_parallel_size (int): Expert parallel size during training.
+ train_context_parallel_size (int): Context parallel size during training.
infer_tensor_parallel_size (int): Tensor parallel size during inference.
infer_pipeline_parallel_size (int): Pipeline parallel size during inference.
infer_expert_parallel_size (int): Expert parallel size during inference.
@@ -49,6 +51,7 @@ class BaseInferEngine(ABC):
self.train_tensor_parallel_size = train_tensor_parallel_size
self.train_pipeline_parallel_size = train_pipeline_parallel_size
self.train_expert_parallel_size = train_expert_parallel_size
+ self.train_context_parallel_size = train_context_parallel_size
self.infer_tensor_parallel_size = infer_tensor_parallel_size
self.infer_pipeline_parallel_size = infer_pipeline_parallel_size
self.infer_expert_parallel_size = infer_expert_parallel_size
diff --git a/mindspeed_rl/models/base/base_training_engine.py b/mindspeed_rl/models/base/base_training_engine.py
index 55a4b6b46c57e924bd7ae912a5276277381b99cb..3c3d97e0735c888bc441dd4f3aef43baaf096126 100644
--- a/mindspeed_rl/models/base/base_training_engine.py
+++ b/mindspeed_rl/models/base/base_training_engine.py
@@ -92,6 +92,9 @@ class BaseTrainingEngine(ABC):
shuffle_mini_batch=self.shuffle_mini_batch)
n_micro_batch = len(batches)
seq_len = batches[0]['input_ids'].shape[1]
+ data_iter = iter(batches)
+ if len(self.model) > 1:
+ data_iter = [iter(batches) for _ in self.model]
self.loss_func.add_loss_meta_info(self.get_loss_meta_func())
@@ -104,7 +107,7 @@ class BaseTrainingEngine(ABC):
# batch should be a list of batches inside micro-batches
losses_reduced = self.forward_backward_func(
forward_step_func=forward_step,
- data_iterator=iter(batches),
+ data_iterator=data_iter,
model=self.model,
num_microbatches=n_micro_batch,
seq_length=seq_len,
diff --git a/mindspeed_rl/models/rollout/vllm_adapter/megatron_weight_loaders.py b/mindspeed_rl/models/rollout/vllm_adapter/megatron_weight_loaders.py
index a6264f29f466b73a13a3d78801b282c67ed1556d..3323579eb3debd927b2cb7396fca012da586cc21 100644
--- a/mindspeed_rl/models/rollout/vllm_adapter/megatron_weight_loaders.py
+++ b/mindspeed_rl/models/rollout/vllm_adapter/megatron_weight_loaders.py
@@ -7,13 +7,6 @@ import torch
import torch.nn as nn
from transformers.configuration_utils import PretrainedConfig
-from vllm.model_executor.layers.linear import (
- ColumnParallelLinear, MergedColumnParallelLinear, QKVParallelLinear,
- RowParallelLinear, ReplicatedLinear)
-from vllm.model_executor.layers.fused_moe.layer import FusedMoE
-from vllm.model_executor.layers.vocab_parallel_embedding import ParallelLMHead, VocabParallelEmbedding
-from vllm.model_executor.models import ModelRegistry
-
class InferParallelConfig:
def __init__(self, infer_tensor_parallel_size: int, infer_pipeline_parallel_size: int, infer_expert_parallel_size: int):
@@ -88,14 +81,15 @@ def deepseek_megatron_weight_loader(actor_weights: Dict, vllm_model: nn.Module,
if name not in params_dict.keys():
raise ValueError(f"unexpected key {name} in deepseek_megatron_weight_loader")
if "mlp.experts.w13_weight" in name:
- loaded_weight.copy_(loaded_weight.view(hf_config.n_routed_experts, hf_config.hidden_size, -1).transpose(2, 1).contiguous())
+ loaded_weight.copy_(loaded_weight.view(hf_config.n_routed_experts // infer_paralle_config.infer_expert_parallel_size, hf_config.hidden_size, -1).transpose(2, 1).contiguous())
if "mlp.experts.w2_weight" in name:
- loaded_weight.copy_(loaded_weight.view(hf_config.n_routed_experts, -1, hf_config.hidden_size).transpose(2, 1).contiguous())
+ loaded_weight.copy_(loaded_weight.view(hf_config.n_routed_experts // infer_paralle_config.infer_expert_parallel_size, -1, hf_config.hidden_size).transpose(2, 1).contiguous())
load_single_weight(params_dict, name, loaded_weight)
return vllm_model
def _get_model_weight_loader(arch: str):
+ from vllm.model_executor.models import ModelRegistry
if arch in MODEL_MEGATRON_WEIGHT_LOADER_REGISTRY:
return MODEL_MEGATRON_WEIGHT_LOADER_REGISTRY[arch]
raise ValueError(f"Model architectures {arch} are not supported for now. "
@@ -146,6 +140,23 @@ def load_single_weight(params_dict, name, loaded_weight):
def update_megatron_weight_loader():
+ from vllm.model_executor.layers.linear import (
+ ColumnParallelLinear, MergedColumnParallelLinear, QKVParallelLinear,
+ RowParallelLinear, ReplicatedLinear)
+ from vllm.model_executor.layers.fused_moe.layer import FusedMoE
+ from vllm.model_executor.layers.vocab_parallel_embedding import ParallelLMHead, VocabParallelEmbedding
+
+ LAYER_WEIGHT_MEGATRON_LOADER_REGISTRY = {
+ ColumnParallelLinear: parallel_weight_loader,
+ MergedColumnParallelLinear: parallel_weight_loader,
+ QKVParallelLinear: parallel_weight_loader,
+ RowParallelLinear: parallel_weight_loader,
+ VocabParallelEmbedding: parallel_weight_loader,
+ ParallelLMHead: parallel_weight_loader,
+ ReplicatedLinear: parallel_weight_loader,
+ FusedMoE: parallel_weight_loader
+ }
+
for layer_class, weight_loader in LAYER_WEIGHT_MEGATRON_LOADER_REGISTRY.items():
layer_class.weight_loader = weight_loader
@@ -176,16 +187,6 @@ MODEL_MEGATRON_WEIGHT_LOADER_REGISTRY = {
"LlamaForCausalLM": llama_megatron_core_weight_loader,
"Qwen2ForCausalLM": qwen_megatron_weight_loader,
"DeepseekV3ForCausalLM": deepseek_megatron_weight_loader,
-}
-
-
-LAYER_WEIGHT_MEGATRON_LOADER_REGISTRY = {
- ColumnParallelLinear: parallel_weight_loader,
- MergedColumnParallelLinear: parallel_weight_loader,
- QKVParallelLinear: parallel_weight_loader,
- RowParallelLinear: parallel_weight_loader,
- VocabParallelEmbedding: parallel_weight_loader,
- ParallelLMHead: parallel_weight_loader,
- ReplicatedLinear: parallel_weight_loader,
- FusedMoE: parallel_weight_loader
+ "DeepseekV2ForCausalLM": deepseek_megatron_weight_loader,
+ "CustomDeepseekV3ForCausalLM": deepseek_megatron_weight_loader,
}
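The expert-weight branch above now views the fused Megatron buffer per *local* expert (`n_routed_experts // infer_expert_parallel_size`) before swapping the last two axes into vLLM's fused-MoE layout. A toy illustration of that reshape with made-up sizes (not real DeepSeek dimensions):

```python
import torch

# Toy shapes only: 8 routed experts split across EP=2, hidden=4, ffn=6.
n_routed_experts, infer_ep, hidden, ffn = 8, 2, 4, 6
local_experts = n_routed_experts // infer_ep

# Fused w1/w3 ("w13") buffer as stored for the local experts.
flat = torch.randn(local_experts * hidden * 2 * ffn)

# The reshape used for w13_weight: view per local expert, then swap
# (hidden, 2*ffn) -> (2*ffn, hidden) to match vLLM's fused-MoE layout.
w13 = flat.view(local_experts, hidden, -1).transpose(2, 1).contiguous()
print(w13.shape)  # torch.Size([4, 12, 4])
```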
diff --git a/mindspeed_rl/models/rollout/vllm_adapter/vllm_parallel_state.py b/mindspeed_rl/models/rollout/vllm_adapter/vllm_parallel_state.py
index abcd38be61970ae4c03cd6cb2f74ec8bf029b09b..0de4687dbbcc6aa10df8574ad4d8a80c4a109dca 100644
--- a/mindspeed_rl/models/rollout/vllm_adapter/vllm_parallel_state.py
+++ b/mindspeed_rl/models/rollout/vllm_adapter/vllm_parallel_state.py
@@ -4,11 +4,18 @@
"""Model and data parallel groups."""
import os
+import re
+import socket
+import subprocess
+from datetime import timedelta
from typing import Optional
import torch
-import torch.distributed
+import torch.distributed as dist
import vllm.distributed.parallel_state as ps
+import vllm_ascend.distributed.parallel_state as ascend_ps
+import vllm.envs as envs
+from vllm.config import get_current_vllm_config
from vllm.distributed.parallel_state import (
get_pp_group,
@@ -17,6 +24,10 @@ from vllm.distributed.parallel_state import (
init_model_parallel_group,
)
+from mindspeed_rl.utils.loggers import Loggers
+
+logger = Loggers(__name__)
+
"""
This version is strongly tied with Megatron to implement HybridEngine and weight sharing between vllm and Megatron.
@@ -31,6 +42,12 @@ _DEVICE_MESH = None
_TP = None
# Pipeline model parallel group that the current rank belongs to.
_PP = None
+# Expert model parallel group that the current rank belongs to.
+_EP = None
+# Expert tensor model parallel group that the current rank belongs to.
+_ETP = None
+# Data model parallel group that the current rank belongs to.
+_DP = None
# Tensor model parallel group
_TP_GROUP_RANKS = None
@@ -42,16 +59,20 @@ def get_vllm_tp_group_ranks():
# This method is for initializing the ParallelGroup when using HybridEngine
def initialize_parallel_state(
- distributed_init_method: str = "env://",
- backend: str = "hccl",
- infer_tensor_model_parallel_size: int = 1,
- train_tensor_model_parallel_size: int = 1,
- infer_pipeline_model_parallel_size: int = 1,
- train_pipeline_model_parallel_size: int = 1
+ distributed_init_method: str = "env://",
+ backend: str = "hccl",
+ infer_tensor_model_parallel_size: int = 1,
+ train_tensor_model_parallel_size: int = 1,
+ infer_pipeline_model_parallel_size: int = 1,
+ train_pipeline_model_parallel_size: int = 1,
+ infer_expert_tensor_parallel_size: int = 1,
+ train_expert_tensor_parallel_size: int = 1,
+ train_expert_model_parallel_size: int = 1,
+ infer_expert_model_parallel_size: int = 1,
+ train_context_model_parallel_size: int = 1,
):
os.environ["TORCH_NCCL_AVOID_RECORD_STREAMS"] = "1"
-
# NOTE(sgm): Modify for verl, Env vars will be set by TORCHRUN.
rank = int(os.getenv("RANK", "-1"))
local_rank = int(os.getenv("LOCAL_RANK", "0"))
@@ -60,6 +81,8 @@ def initialize_parallel_state(
world_size = int(os.getenv("WORLD_SIZE", "-1"))
if world_size == -1:
raise ValueError("The world_size is set to -1, not initialized by TORCHRUN")
+ config = get_current_vllm_config()
+ config.parallel_config.tensor_parallel_size = infer_tensor_model_parallel_size
init_distributed_environment(world_size, rank, distributed_init_method, local_rank, backend)
if torch.distributed.get_world_size() > 1:
# NOTE: build a sepearate inference group with infer tp & micro dp
@@ -67,54 +90,29 @@ def initialize_parallel_state(
infer_tensor_model_parallel_size=infer_tensor_model_parallel_size,
train_tensor_model_parallel_size=train_tensor_model_parallel_size,
infer_pipeline_model_parallel_size=infer_pipeline_model_parallel_size,
- train_pipeline_model_parallel_size=train_pipeline_model_parallel_size
+ train_pipeline_model_parallel_size=train_pipeline_model_parallel_size,
+ infer_expert_tensor_parallel_size=infer_expert_tensor_parallel_size,
+ train_expert_tensor_parallel_size=train_expert_tensor_parallel_size,
+ train_expert_model_parallel_size=train_expert_model_parallel_size,
+ infer_expert_model_parallel_size=infer_expert_model_parallel_size,
+ train_context_model_parallel_size=train_context_model_parallel_size
)
else:
initialize_model_parallel(infer_tensor_model_parallel_size, infer_pipeline_model_parallel_size, backend)
-def ensure_model_parallel_initialized(
- tensor_model_parallel_size: int,
- pipeline_model_parallel_size: int = 1,
- backend: Optional[str] = None,
-) -> None:
- """Helper to initialize model parallel groups if they are not initialized,
- or ensure tensor-parallel and pipeline-parallel sizes are equal to expected
- values if the model parallel groups are initialized.
- """
- # get the backend of _DEVICE_WORLD_GROUP
- backend = backend or torch.distributed.get_backend(get_world_group().device_group)
- if not model_parallel_is_initialized():
- initialize_model_parallel(tensor_model_parallel_size, pipeline_model_parallel_size, backend)
- return
-
- current_tp_size = get_tensor_model_parallel_world_size()
- if current_tp_size != tensor_model_parallel_size:
- raise ValueError(
- "tensor parallel group already initialized, but of unexpected size: "
- f"{current_tp_size=} vs. "
- f"{tensor_model_parallel_size=}"
- )
- pp_world_size = get_pp_group().world_size
- if pp_world_size != pipeline_model_parallel_size:
- raise ValueError(
- "pipeline parallel group already initialized, but of unexpected size: "
- f"{pp_world_size=} vs. "
- f"{pipeline_model_parallel_size=}"
- )
-
-
-def model_parallel_is_initialized():
- """Check if tensor and pipeline parallel groups are initialized."""
- return ps._TP is not None
- # and _PIPELINE_MODEL_PARALLEL_GROUP is not None)
-
-
def initialize_model_parallel_for_vllm(
- infer_tensor_model_parallel_size: int,
- train_tensor_model_parallel_size: int = 1,
- infer_pipeline_model_parallel_size: int = 1,
- train_pipeline_model_parallel_size: int = 1
+ infer_tensor_model_parallel_size: int,
+ train_tensor_model_parallel_size: int = 1,
+ infer_pipeline_model_parallel_size: int = 1,
+ train_pipeline_model_parallel_size: int = 1,
+ infer_expert_tensor_parallel_size: int = 1,
+ train_expert_tensor_parallel_size: int = 1,
+ train_expert_model_parallel_size: int = 1,
+ infer_expert_model_parallel_size: int = 1,
+ train_context_model_parallel_size: int = 1,
+ num_process: int = 1,
+        rebuild_EP_group: bool = False
) -> None:
# Get world size and rank. Ensure some consistencies.
@@ -149,8 +147,10 @@ def initialize_model_parallel_for_vllm(
Returns: list of group_lists
[[g0, g1], [g2, g3], [g4, g5], [g6, g7]]
'''
- if ((world_size // (train_tensor_model_parallel_size * train_pipeline_model_parallel_size)) * train_tensor_model_parallel_size < infer_tensor_model_parallel_size or
- ((world_size // (train_tensor_model_parallel_size * train_pipeline_model_parallel_size)) * train_tensor_model_parallel_size) % infer_tensor_model_parallel_size != 0):
+ if ((world_size // (
+ train_tensor_model_parallel_size * train_pipeline_model_parallel_size)) * train_tensor_model_parallel_size < infer_tensor_model_parallel_size or
+ ((world_size // (
+ train_tensor_model_parallel_size * train_pipeline_model_parallel_size)) * train_tensor_model_parallel_size) % infer_tensor_model_parallel_size != 0):
raise ValueError(
f"Can't split train tp size {train_tensor_model_parallel_size} to infer tp size {infer_tensor_model_parallel_size} "
f"with train dp size {(world_size // (train_tensor_model_parallel_size * train_pipeline_model_parallel_size))}.")
@@ -179,16 +179,13 @@ def initialize_model_parallel_for_vllm(
[[g0, g2], [g1, g3], [g4, g6], [g5, g7]]
'''
if train_tensor_model_parallel_size < infer_tensor_model_parallel_size or train_tensor_model_parallel_size % infer_tensor_model_parallel_size != 0:
- raise ValueError(f"Can't gather train tp size {train_tensor_model_parallel_size} to infer tp size {infer_tensor_model_parallel_size}")
+ raise ValueError(
+ f"Can't gather train tp size {train_tensor_model_parallel_size} to infer tp size {infer_tensor_model_parallel_size}")
num_tensor_model_parallel_groups = world_size // infer_tensor_model_parallel_size
- num_tensor_model_parallel_groups_per_train_tp = train_tensor_model_parallel_size // infer_tensor_model_parallel_size
group_ranks = []
- for i in range(num_tensor_model_parallel_groups // num_tensor_model_parallel_groups_per_train_tp):
- start = train_tensor_model_parallel_size * i
- end = train_tensor_model_parallel_size * (i + 1)
- for j in range(num_tensor_model_parallel_groups_per_train_tp):
- ranks = list(range(start + j, end, num_tensor_model_parallel_groups_per_train_tp))
- group_ranks.append(ranks)
+ for i in range(num_tensor_model_parallel_groups):
+ ranks = list(range(i * infer_tensor_model_parallel_size, (i + 1) * infer_tensor_model_parallel_size))
+ group_ranks.append(ranks)
return group_ranks
@@ -201,9 +198,11 @@ def initialize_model_parallel_for_vllm(
_TP_GROUP_RANKS = tp_group_ranks
return tp_group_ranks
+ tp_group_ranks = get_tp_group_ranks()
+ logger.info(f"TP rank: {tp_group_ranks}")
_TP = init_model_parallel_group(
- group_ranks=get_tp_group_ranks(),
+ group_ranks=tp_group_ranks,
local_rank=get_world_group().local_rank,
backend=backend,
use_message_queue_broadcaster=True,
@@ -218,16 +217,134 @@ def initialize_model_parallel_for_vllm(
ranks = list(range(i, world_size, num_pipeline_model_parallel_groups))
group_ranks.append(ranks)
# pipeline parallel does not need custom allreduce
+ logger.info(f"PP rank: {group_ranks}")
_PP = init_model_parallel_group(
group_ranks, get_world_group().local_rank, backend,
)
ps._PP = _PP # for verl
+ data_parallel_size = 1
+ from vllm.config import get_current_vllm_config
+ config = get_current_vllm_config()
+
+ if config is not None:
+ data_parallel_size = config.parallel_config.data_parallel_size
+
+ num_expert_parallel_groups: int = infer_expert_tensor_parallel_size
+ num_expert_tensor_parallel_groups: int = world_size // infer_expert_tensor_parallel_size
+
+ num_rank_per_process = world_size // num_process
+ all_ranks = list(range(world_size))
+
+ global _EP
+ assert _EP is None, ("expert parallel group is already initialized")
+ group_ranks = []
+
+    if rebuild_EP_group:
+        # Rebuild the groups (remap training-time EP groups to the inference layout)
+ tensor_model_parallel_size = train_tensor_model_parallel_size
+ context_parallel_size = train_context_model_parallel_size
+ expert_model_parallel_size = train_expert_model_parallel_size
+ train_data_parallel_size = world_size // tensor_model_parallel_size // train_pipeline_model_parallel_size
+ tensor_and_data_group_size_with_cp: int = tensor_model_parallel_size * train_data_parallel_size * context_parallel_size
+ num_tensor_and_data_groups_with_cp: int = world_size // tensor_and_data_group_size_with_cp
+ num_expert_groups: int = train_data_parallel_size * context_parallel_size // expert_model_parallel_size
+ tensor_and_expert_group_size = tensor_model_parallel_size * expert_model_parallel_size
+ all_tensor_and_expert_group_ranks = []
+ for i in range(num_tensor_and_data_groups_with_cp):
+ for j in range(num_expert_groups):
+ start_rank = i * tensor_and_data_group_size_with_cp + j * tensor_and_expert_group_size
+ end_rank = i * tensor_and_data_group_size_with_cp + (j + 1) * tensor_and_expert_group_size
+ ranks = range(start_rank, end_rank)
+ all_tensor_and_expert_group_ranks.append(list(ranks))
+ train_all_tensor_and_expert_group_ranks_tensor = torch.tensor(all_tensor_and_expert_group_ranks)
+        # Transpose the training-time EP groups according to the inference EP size
+ infer_actual_expert_model_parallel_size = infer_tensor_model_parallel_size * infer_expert_model_parallel_size
+ experts_memory_expend_N = infer_actual_expert_model_parallel_size // tensor_and_expert_group_size
+ ep_group_num = world_size // tensor_and_expert_group_size
+ group_ranks = []
+ for i in range(0, ep_group_num, experts_memory_expend_N):
+ per_ep_group = train_all_tensor_and_expert_group_ranks_tensor[i:i + experts_memory_expend_N]
+ per_ep_group_T = per_ep_group.T
+ ranks = per_ep_group_T.reshape(-1).tolist()
+ group_ranks.append(ranks)
+ logger.info(f"EP rank: {group_ranks}")
+
+ else:
+        # Preserve the original rank ordering
+ group_ranks = []
+ tensor_model_parallel_size = infer_tensor_model_parallel_size
+ context_parallel_size = 1
+ expert_model_parallel_size = infer_expert_model_parallel_size
+ infer_data_parallel_size = world_size // tensor_model_parallel_size // infer_pipeline_model_parallel_size
+ tensor_and_data_group_size_with_cp: int = tensor_model_parallel_size * infer_data_parallel_size * context_parallel_size
+ num_tensor_and_data_groups_with_cp: int = world_size // tensor_and_data_group_size_with_cp
+ num_expert_groups: int = infer_data_parallel_size * context_parallel_size // expert_model_parallel_size
+ tensor_and_expert_group_size = tensor_model_parallel_size * expert_model_parallel_size
+ group_ranks = []
+ for i in range(num_tensor_and_data_groups_with_cp):
+ for j in range(num_expert_groups):
+ start_rank = i * tensor_and_data_group_size_with_cp + j * tensor_and_expert_group_size
+ end_rank = i * tensor_and_data_group_size_with_cp + (j + 1) * tensor_and_expert_group_size
+ ranks = range(start_rank, end_rank)
+ group_ranks.append(list(ranks))
+ logger.info(f"EP rank: {group_ranks}")
+
+ ascend_ps._EP = init_model_parallel_group(group_ranks,
+ get_world_group().local_rank,
+ backend,
+ group_name="ep")
+
+ global _ETP
+ assert _ETP is None, (
+ "expert tensor parallel group is already initialized")
+
+ group_ranks = []
+ for i in range(num_expert_tensor_parallel_groups):
+ ranks = list(range(i * infer_expert_tensor_parallel_size,
+ (i + 1) * infer_expert_tensor_parallel_size))
+ group_ranks.append(ranks)
+ logger.info(f"ETP rank: {group_ranks}")
+
+ ascend_ps._ETP = init_model_parallel_group(group_ranks,
+ get_world_group().local_rank,
+ backend,
+ group_name="etp")
+
+ if data_parallel_size > 1:
+ global _DP
+ assert _DP is None, ("data parallel group is already initialized")
+ dp_group_ranks = torch.tensor(tp_group_ranks).transpose(0, 1).reshape(-1, data_parallel_size).unbind(0)
+ group_ranks = [x.tolist() for x in dp_group_ranks]
+ logger.info(f"DP rank: {group_ranks}")
+
+ ps._DP = init_model_parallel_group(group_ranks,
+ get_world_group().local_rank,
+ backend,
+ group_name="dp")
+
+ os.environ["VLLM_DP_RANK"] = str(ps._DP.rank_in_group)
+ envs.VLLM_DP_RANK = int(os.environ["VLLM_DP_RANK"])
+ ip_list = get_cluster_info()
+
+ for index, group_rank in enumerate(group_ranks):
+ if torch.distributed.get_rank() in group_rank:
+ os.environ["VLLM_DP_MASTER_PORT"] = str(
+ int(os.environ.get("MASTER_PORT")) + 1 + index)
+ os.environ["VLLM_DP_MASTER_IP"] = ip_list[group_rank[0]]
+
+ envs.VLLM_DP_MASTER_IP = os.environ["VLLM_DP_MASTER_IP"]
+ envs.VLLM_DP_MASTER_PORT = int(os.environ["VLLM_DP_MASTER_PORT"])
+ os.environ["VLLM_PORT"] = os.environ["VLLM_DP_MASTER_PORT"]
+ envs.VLLM_PORT = envs.VLLM_DP_MASTER_PORT
+
+ logger.info(f"rank: {torch.distributed.get_rank()}>>>>>>VLLM_DP_MASTER_IP: {envs.VLLM_DP_MASTER_IP}, VLLM_DP_MASTER_PORT: {envs.VLLM_DP_MASTER_PORT}")
+
def initialize_model_parallel(
- tensor_model_parallel_size: int = 1,
- pipeline_model_parallel_size: int = 1,
- backend: Optional[str] = None,
+ tensor_model_parallel_size: int = 1,
+ pipeline_model_parallel_size: int = 1,
+ backend: Optional[str] = None,
) -> None:
"""
NOTE: This method is a hack from the open-sourced version without
@@ -260,8 +377,6 @@ def initialize_model_parallel(
world_size: int = torch.distributed.get_world_size()
backend = backend or torch.distributed.get_backend(ps.get_world_group().device_group)
-
-
num_tensor_model_parallel_groups: int = world_size // tensor_model_parallel_size
global _TP
if _TP is not None:
@@ -295,3 +410,60 @@ def initialize_model_parallel(
ps._PP = _PP # for verl
+
+def get_cluster_info():
+    # Make sure the distributed environment has been initialized
+ if not dist.is_initialized():
+ raise RuntimeError("Distributed environment not initialized")
+
+ world_size = dist.get_world_size()
+
+    # Get the IP address of the current node
+ ip_address = _get_current_node_ip()
+
+    # Gather the IP addresses of all ranks
+ ip_list = [None] * world_size
+ dist.all_gather_object(ip_list, ip_address)
+
+ return ip_list
+
+
+def _get_current_node_ip() -> str:
+ try:
+        # Create a UDP socket (used only to discover the local interface)
+ with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
+            # Connect to an external address (no actual traffic is sent)
+            s.connect(("8.8.8.8", 80))  # Google DNS server
+ local_ip = s.getsockname()[0]
+ except Exception:
+ local_ip = _get_ip_by_ifname()
+ if not local_ip:
+            # If that also fails, fall back to scanning the host's interfaces
+ local_ip = "127.0.0.1"
+ hostname = socket.gethostname()
+ for addr in socket.getaddrinfo(hostname, None):
+ ip = addr[4][0]
+ if not ip.startswith("::"):
+ local_ip = ip
+ break
+ return local_ip
+
+
+def _get_ip_by_ifname():
+ """
+    Get the IPv4 address of a network interface by name (e.g. eth0, en0).
+    Return the IP string, or None on failure.
+ """
+ try:
+        # Run the ifconfig command and capture its output
+        ifname = os.environ.get("HCCL_SOCKET_IFNAME", "")
+ if ifname:
+ output = subprocess.check_output(["ifconfig", ifname], stderr=subprocess.STDOUT).decode()
+            # Regex-match IPv4 addresses (excluding 127.0.0.1)
+ matches = re.findall(r'inet (?:addr:)?((?:\d{1,3}\.){3}\d{1,3})', output)
+ for ip in matches:
+ if ip != "127.0.0.1":
+ return ip
+ return None
+ except subprocess.CalledProcessError:
+ return None
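The EP-rebuild branch above stacks consecutive training TP×EP groups, transposes them, and flattens the result, so each inference EP group interleaves ranks from several training groups. A toy sketch of that regrouping with eight ranks and made-up parallel sizes:

```python
import torch

# 8 ranks, training TP*EP group size 2, inference TP*EP group size 4
# (so each inference EP group is built from expand_n = 2 training groups).
train_ep_groups = torch.tensor([[0, 1], [2, 3], [4, 5], [6, 7]])
expand_n = 2

infer_ep_groups = []
for i in range(0, len(train_ep_groups), expand_n):
    chunk = train_ep_groups[i:i + expand_n]      # e.g. [[0, 1], [2, 3]]
    infer_ep_groups.append(chunk.T.reshape(-1).tolist())

print(infer_ep_groups)  # [[0, 2, 1, 3], [4, 6, 5, 7]]
```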
diff --git a/mindspeed_rl/models/rollout/vllm_engine.py b/mindspeed_rl/models/rollout/vllm_engine.py
index 649f36988d74ad77ac2ee53db87346b295bf12d0..c4fff6f6d21f75c97e4549aa7a16ec72b5b98012 100644
--- a/mindspeed_rl/models/rollout/vllm_engine.py
+++ b/mindspeed_rl/models/rollout/vllm_engine.py
@@ -9,10 +9,10 @@ import gc
import ray
import torch
import torch.distributed
+import torch_npu
+from torch_npu.contrib import transfer_to_npu
from torch.nn.utils.rnn import pad_sequence
from transformers import AutoConfig
-from vllm import LLM, SamplingParams
-from mindspeed_rl.utils.utils import mstx_timer_decorator
def dummy_compile(*compile_args, **compile_kwargs):
@@ -23,14 +23,14 @@ def dummy_compile(*compile_args, **compile_kwargs):
return wrapper
return decorate
-
-torch.jit.script = dummy_compile
torch.compile = dummy_compile
+torch.jit.script = dummy_compile
+from vllm import LLM, SamplingParams
+from vllm.v1.core.kv_cache_utils import get_kv_cache_config, unify_kv_cache_configs
from mindspeed_rl.utils.loggers import Loggers
from mindspeed_rl.models.base.base_inference_engine import BaseInferEngine
-from mindspeed_rl.config_cls.megatron_config import MegatronConfig
from mindspeed_rl.models.rollout.vllm_adapter.vllm_parallel_state import initialize_parallel_state
from mindspeed_rl.models.rollout.vllm_adapter.megatron_weight_loaders import (
load_megatron_weights,
@@ -49,11 +49,12 @@ class VLLMInferEngine(BaseInferEngine):
train_tensor_parallel_size: int,
train_pipeline_parallel_size: int,
train_expert_parallel_size: int,
+ train_context_parallel_size: int,
infer_tensor_parallel_size: int,
infer_pipeline_parallel_size: int,
infer_expert_parallel_size: int,
- megatron_config: MegatronConfig,
sampling_config: dict,
+ infer_expert_tensor_parallel_size: int = 1,
prompt_type: str = None,
prompt_type_path: str = None,
enable_prefix_caching: bool = False,
@@ -64,6 +65,7 @@ class VLLMInferEngine(BaseInferEngine):
gpu_memory_utilization: float = 0.5,
trust_remote_code: bool = True,
load_format: str = "megatron",
+ enforce_eager: bool = False,
**kwargs
):
"""
@@ -74,10 +76,12 @@ class VLLMInferEngine(BaseInferEngine):
train_tensor_parallel_size (int): Tensor parallel size during training.
train_pipeline_parallel_size (int): Pipeline parallel size during training.
train_expert_parallel_size (int): Expert parallel size during training.
+ train_context_parallel_size (int): Context parallel size during training.
infer_tensor_parallel_size (int): Tensor parallel size during inference.
infer_pipeline_parallel_size (int): Pipeline parallel size during inference.
infer_expert_parallel_size (int): Expert parallel size during inference.
sampling_config (dict): Configuration for text generation sampling.
+ infer_expert_tensor_parallel_size (int): Expert tensor parallel size during inference.
enable_prefix_caching (bool): Whether to enable prefix caching.
num_scheduler_steps (int): Num scheduler steps. Default is 1.
max_num_seqs (int): Maximum number of sequences to process simultaneously. Default is 1.
@@ -95,6 +99,7 @@ class VLLMInferEngine(BaseInferEngine):
train_tensor_parallel_size=train_tensor_parallel_size,
train_pipeline_parallel_size=train_pipeline_parallel_size,
train_expert_parallel_size=train_expert_parallel_size,
+ train_context_parallel_size=train_context_parallel_size,
infer_tensor_parallel_size=infer_tensor_parallel_size,
infer_pipeline_parallel_size=infer_pipeline_parallel_size,
infer_expert_parallel_size=infer_expert_parallel_size,
@@ -105,6 +110,12 @@ class VLLMInferEngine(BaseInferEngine):
trust_remote_code=trust_remote_code
)
# Additional initialization logic for VLLMInferEngine
+
+ torch.compile = dummy_compile
+ # vLLM Ascend must be patched in advance
+ from vllm_ascend.patch import platform
+ from vllm_ascend.patch import worker
+
# Initialize sampling parameters from SamplingConfig
self.sampling_config = sampling_config
try:
@@ -112,13 +123,11 @@ class VLLMInferEngine(BaseInferEngine):
n=sampling_config.get('num_completions', 1),
logprobs=sampling_config.get('logprobs', 1),
max_tokens=sampling_config.get('max_tokens', 128),
- best_of=sampling_config.get('best_of', 2),
top_p=sampling_config.get('top_p', 1.0),
top_k=sampling_config.get('top_k', 50),
min_p=sampling_config.get('min_p', 0.0),
temperature=sampling_config.get('temperature', 0.2),
- detokenize=sampling_config.get('detokenize', False),
- seed=sampling_config.get('seed', None)
+ detokenize=sampling_config.get('detokenize', False)
)
except Exception as e:
raise ValueError(f"Error creating SamplingParams from dictionary") from e
@@ -139,7 +148,6 @@ class VLLMInferEngine(BaseInferEngine):
# Initialize parallel state if tensor parallel size is specified
if train_tensor_parallel_size is not None:
- num_tp_per_train_tp = train_tensor_parallel_size // infer_tensor_parallel_size
os.environ['CUDA_TIMER_STREAM_KAFKA_ENABLE'] = '0'
os.environ['MEGATRON_IMPORT_TIMERS'] = '0'
initialize_parallel_state(
@@ -147,32 +155,39 @@ class VLLMInferEngine(BaseInferEngine):
train_tensor_model_parallel_size=train_tensor_parallel_size,
infer_pipeline_model_parallel_size=infer_pipeline_parallel_size,
train_pipeline_model_parallel_size=train_pipeline_parallel_size,
+ train_expert_model_parallel_size=train_expert_parallel_size,
+ infer_expert_model_parallel_size=infer_expert_parallel_size,
+ train_context_model_parallel_size=train_context_parallel_size
)
if load_format == "megatron":
update_megatron_weight_loader()
- torch.jit.script = dummy_compile
- torch.compile = dummy_compile
-
# Initialize the LLM engine
self.llm = LLM(
+ seed=1234,
model=tokenizer_name_or_path,
trust_remote_code=trust_remote_code,
tensor_parallel_size=infer_tensor_parallel_size,
- load_format="dummy" if load_format == "megatron" else "auto",
+ load_format='dummy' if load_format == 'megatron' else load_format,
distributed_executor_backend="external_launcher",
enable_prefix_caching=enable_prefix_caching,
num_scheduler_steps=num_scheduler_steps,
dtype=dtype,
- enforce_eager=False,
+ enforce_eager=enforce_eager,
skip_tokenizer_init=False,
gpu_memory_utilization=gpu_memory_utilization,
max_num_seqs=max_num_seqs,
- max_model_len=max_model_len
+ max_model_len=max_model_len,
+ additional_config={
+ 'expert_tensor_parallel_size': infer_expert_tensor_parallel_size,
+ 'enable_graph_mode': int(os.environ.get('VLLM_ENABLE_GRAPH_MODE', '0')),
+ 'ascend_scheduler_config': {},
+ }
)
self.model = self.llm.llm_engine.model_executor.driver_worker.worker.model_runner.get_model()
+ self.kv_cache_configs = None
self.cpu_model = {}
for name, params in self.model.named_parameters():
@@ -180,14 +195,61 @@ class VLLMInferEngine(BaseInferEngine):
if load_format == "megatron":
self.free_cache_engine()
+ if os.environ['VLLM_USE_V1'] == '1':
+ self._initialize_kv_caches(self.llm.llm_engine.vllm_config)
self.offload_model_weights()
+ from vllm.config import VllmConfig
+
+ def _initialize_kv_caches(self, vllm_config: VllmConfig):
+
+ # Get all kv cache needed by the model
+ kv_cache_specs = self.llm.llm_engine.engine_core.engine_core.model_executor.get_kv_cache_specs()
+
+ # Profiles the peak memory usage of the model to determine how much
+ # memory can be allocated for kv cache.
+ available_gpu_memory = self.llm.llm_engine.engine_core.engine_core.model_executor.determine_available_memory()
+
+ assert len(kv_cache_specs) == len(available_gpu_memory)
+ # Get the kv cache tensor size
+ self.kv_cache_configs = [
+ get_kv_cache_config(vllm_config, kv_cache_spec_one_worker,
+ available_gpu_memory_one_worker)
+ for kv_cache_spec_one_worker, available_gpu_memory_one_worker in
+ zip(kv_cache_specs, available_gpu_memory)
+ ]
+
+ # Since we use a shared centralized controller, we need the
+ # `kv_cache_config` to be consistent across all workers to make sure
+ # all the memory operators can be applied to all workers.
+ unify_kv_cache_configs(self.kv_cache_configs)
+
+ # All workers have the same kv_cache_config except layer names, so use
+ # an arbitrary one to initialize the scheduler.
+ assert all([
+ cfg.num_blocks == self.kv_cache_configs[0].num_blocks
+ for cfg in self.kv_cache_configs
+ ])
+
def init_cache_engine(self):
- if self.llm.llm_engine.model_executor.driver_worker.worker.cache_engine is None:
- self.llm.llm_engine.model_executor.driver_worker.worker._init_cache_engine()
+ if os.environ['VLLM_USE_V1'] == '1':
+ worker = self.llm.llm_engine.model_executor.driver_worker.worker
+ if not worker.model_runner.kv_caches:
+                # The v1 engine uses an explicit KV-cache initialization method
+ self.llm.llm_engine.engine_core.engine_core.model_executor.initialize_from_config(
+ self.kv_cache_configs)
+ else:
+ if self.llm.llm_engine.model_executor.driver_worker.worker.cache_engine is None:
+ self.llm.llm_engine.model_executor.driver_worker.worker._init_cache_engine()
def free_cache_engine(self):
- ctx = self.llm.llm_engine.model_executor.driver_worker.worker.compilation_config.static_forward_context
+ if os.environ['VLLM_USE_V1'] == '1':
+ worker = self.llm.llm_engine.model_executor.driver_worker.worker
+
+ ctx = worker.model_runner.vllm_config.compilation_config.static_forward_context
+
+ else:
+ ctx = self.llm.llm_engine.model_executor.driver_worker.worker.compilation_config.static_forward_context
from vllm.attention import AttentionType
layer_need_kv_cache = []
@@ -201,10 +263,14 @@ class VLLMInferEngine(BaseInferEngine):
for _ in range(pipeline_parallel_size):
kv_cache.append(torch.tensor([]))
ctx[layer_name].kv_cache = kv_cache
-
- self.llm.llm_engine.model_executor.driver_worker.worker.cache_engine = None
- self.llm.llm_engine.model_executor.driver_worker.worker.gpu_cache = None
-
+ if os.environ['VLLM_USE_V1'] == '1':
+ worker = self.llm.llm_engine.model_executor.driver_worker.worker
+
+            # Clear the cache engine
+ worker.model_runner.kv_caches = []
+ else:
+ self.llm.llm_engine.model_executor.driver_worker.worker.cache_engine = None
+ self.llm.llm_engine.model_executor.driver_worker.worker.gpu_cache = None
if hasattr(self.model.model.layers[0].self_attn, "attn"):
for i in range(self.model.model.start_layer, self.model.model.end_layer):
attn_impl = self.model.model.layers[i].self_attn.attn.impl
@@ -212,18 +278,26 @@ class VLLMInferEngine(BaseInferEngine):
attn_impl.key_cache = None
attn_impl.value_cache = None
- self.gpu_cache = None
-
gc.collect()
torch.cuda.empty_cache()
+
def offload_model_weights(self):
for name, params in self.model.named_parameters():
params.data = self.cpu_model[name]
+ if hasattr(self.model.model.layers[-1].self_attn, "mla_attn"):
+ for i in range(self.model.model.start_layer, self.model.model.end_layer):
+ mla = self.model.model.layers[i].self_attn.mla_attn.impl
+ if hasattr(mla, "w_kc"):
+ mla.w_kc = None
+ mla.w_vc = None
+ if hasattr(mla, "W_UV"):
+ mla.W_UV = None
+ mla.W_UK_T = None
def sync_model_weights(self, params, load_format='megatron'):
infer_parallel_config = InferParallelConfig(self.infer_tensor_parallel_size, self.infer_pipeline_parallel_size,
- self.infer_expert_parallel_size)
+ self.infer_expert_parallel_size * self.infer_tensor_parallel_size)
load_megatron_weights(params,
self.model,
infer_parallel_config,
@@ -237,9 +311,12 @@ class VLLMInferEngine(BaseInferEngine):
if hasattr(mla, "w_kc"):
mla.w_kc = None
mla.w_vc = None
+ if hasattr(mla, "W_UV"):
+ mla.W_UV = None
+ mla.W_UK_T = None
+ mla.process_weights_after_loading(None)
@torch.no_grad()
- @mstx_timer_decorator
def generate_sequences(self, idx_list, **kwargs):
self.init_cache_engine()
with self.update_sampling_params(**kwargs):
@@ -325,4 +402,4 @@ def get_local_rank() -> int:
# Default to 0 (for testing or single-process scenarios)
logger.warning("Warning: Unable to determine local rank. Defaulting to 0.")
- return 0
+ return 0
\ No newline at end of file
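The `dummy_compile` assignments above neutralize `torch.compile` and `torch.jit.script` before vLLM is imported, so code paths that request compilation simply run eagerly on NPU. A schematic, self-contained stand-in (the name `noop_compile` and the callable check are illustrative, not the exact definition in this patch):

```python
import torch

# No-op replacement for torch.compile / torch.jit.script: whether used as a
# bare decorator or with keyword arguments, it returns the function unchanged.
def noop_compile(*compile_args, **compile_kwargs):
    if compile_args and callable(compile_args[0]):
        return compile_args[0]
    def decorate(func):
        return func
    return decorate

torch.compile = noop_compile
torch.jit.script = noop_compile

@torch.compile
def double(x):
    return x * 2

print(double(torch.tensor([1, 2])))  # tensor([2, 4]) -- executed eagerly
```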
diff --git a/mindspeed_rl/utils/tokenizer.py b/mindspeed_rl/utils/tokenizer.py
index eb4e9a7dbe702dec16a726368f76299fa5e1abb0..73c75f902ba47e420f4f3f6cd38e252897e66787 100644
--- a/mindspeed_rl/utils/tokenizer.py
+++ b/mindspeed_rl/utils/tokenizer.py
@@ -259,8 +259,8 @@ class _HuggingFaceTokenizer(BaseTokenizer):
def tokenize(self, text):
return self.tokenizer(text).input_ids
- def detokenize(self, token_ids):
- return self.tokenizer.decode(token_ids)
+ def detokenize(self, token_ids, **kwargs):
+ return self.tokenizer.decode(token_ids, **kwargs)
@property
def eod(self):
diff --git a/mindspeed_rl/workers/actor_hybrid_worker.py b/mindspeed_rl/workers/actor_hybrid_worker.py
index bd5d3f80d710dcc90d9b73fef90172db5488e8e1..7970079ab6d1a6ae30aa6ee96ed1959cd10e8319 100644
--- a/mindspeed_rl/workers/actor_hybrid_worker.py
+++ b/mindspeed_rl/workers/actor_hybrid_worker.py
@@ -84,6 +84,9 @@ class ActorHybridWorkerBase(BaseWorker):
self.inference_model = self._build_rollout()
self.sharding_manager = self._build_sharding_manager()
+
+ if self.generate_config.offload_train_param:
+ self.actor_offloader.onload_param()
self.actor_hybrid = ActorRolloutHybrid(
self.model,
@@ -172,7 +175,7 @@ class ActorHybridWorkerBase(BaseWorker):
self.args.consumed_train_samples += self.megatron_config.global_batch_size // self.rl_config.n_samples_per_prompt
self.num_floating_point_operations_so_far += num_floating_point_operations(self.args,
self.megatron_config.global_batch_size)
- if self.parallel_state.is_pipeline_last_stage() and self.parallel_state.get_tensor_model_parallel_rank() == 0:
+ if self.parallel_state.is_pipeline_last_stage(ignore_virtual=True) and self.parallel_state.get_tensor_model_parallel_rank() == 0:
ray.get(self.td.update_metrics.remote(value=metrics, cumulate=True))
ray.get(
self.td.update_metrics.remote(
@@ -352,6 +355,7 @@ class ActorHybridWorkerBase(BaseWorker):
train_tensor_parallel_size=self.megatron_config.tensor_model_parallel_size,
train_pipeline_parallel_size=self.megatron_config.pipeline_model_parallel_size,
train_expert_parallel_size=self.megatron_config.expert_model_parallel_size,
+ train_context_parallel_size=self.megatron_config.context_parallel_size,
infer_tensor_parallel_size=self.generate_config.infer_tensor_parallel_size,
infer_pipeline_parallel_size=self.generate_config.infer_pipeline_parallel_size,
infer_expert_parallel_size=self.generate_config.infer_expert_parallel_size,
@@ -363,9 +367,9 @@ class ActorHybridWorkerBase(BaseWorker):
max_model_len=self.generate_config.max_model_len,
dtype=self.generate_config.dtype,
gpu_memory_utilization=self.generate_config.gpu_memory_utilization,
- trust_remote_code=self.generate_config.trust_remote_code
+ trust_remote_code=self.generate_config.trust_remote_code,
+ enforce_eager=self.generate_config.enforce_eager,
)
-
return rollout
def _build_sharding_manager(self):
@@ -385,7 +389,8 @@ class ActorHybridWorkerBase(BaseWorker):
grad_offload=self.generate_config.offload_train_grad,
train_param_offload=self.generate_config.offload_train_param,
enable_validate=self.rl_config.enable_sharding_validate,
- megatron_offloader=self.actor_offloader
+ megatron_offloader=self.actor_offloader,
+ noop_layers=self.megatron_config.noop_layers
)
return sharding_manager
diff --git a/mindspeed_rl/workers/resharding/megatron_sharding_manager.py b/mindspeed_rl/workers/resharding/megatron_sharding_manager.py
index d9f7026e962dff336b747a69b2483cf9e0e6cff6..1f98553351010291cd7a92abd2bac5856a481814 100644
--- a/mindspeed_rl/workers/resharding/megatron_sharding_manager.py
+++ b/mindspeed_rl/workers/resharding/megatron_sharding_manager.py
@@ -21,20 +21,27 @@ Manager used to shard weight and offload/onload optimizer from training stage to
from itertools import chain
from collections import defaultdict
+import os
import torch
-import torch.distributed
+import torch.distributed as dist
+import vllm.distributed.parallel_state as ps
+from mindspeed_rl.utils.loggers import Loggers
from mindspeed_rl.workers.resharding.vllm_weight_container import MegatronStyleVllmWeightContainer
from mindspeed_rl.workers.resharding.weight_adaptor import get_weight_adaptor
from mindspeed_rl.utils.utils import mstx_timer_decorator
+logger = Loggers(
+    name="megatron_sharding_manager",
+)
+
+
class MegatronOffLoader:
def __init__(self, megatron_model=None, optimizer=None, wrap_with_ddp=True):
self.optimizer = optimizer
self.model = megatron_model
self.wrap_with_ddp = wrap_with_ddp
-
self.tensor_to_cpu_states_map = dict()
@mstx_timer_decorator
@@ -51,18 +58,29 @@ class MegatronOffLoader:
@mstx_timer_decorator
def offload_optimizer(self):
- for param_group in self.optimizer.optimizer.param_groups:
- for param in param_group['params']:
- param.data = param.data.to("cpu", non_blocking=False)
- self.optimizer.optimizer.state = self._move_to_device(self.optimizer.optimizer.state, "cpu")
+ if hasattr(self.optimizer, "chained_optimizers"):
+ optimizers = self.optimizer.chained_optimizers
+ else:
+ optimizers = [self.optimizer]
+ for optimizer in optimizers:
+ for param_group in optimizer.optimizer.param_groups:
+ for param in param_group['params']:
+ param.data = param.data.to("cpu", non_blocking=False)
+ optimizer.optimizer.state = self._move_to_device(optimizer.optimizer.state,
+ "cpu")
@mstx_timer_decorator
def onload_optimizer(self):
- for param_group in self.optimizer.optimizer.param_groups:
- for param in param_group['params']:
- param.data = param.data.to(torch.cuda.current_device(), non_blocking=False)
- self.optimizer.optimizer.state = self._move_to_device(self.optimizer.optimizer.state,
- torch.cuda.current_device())
+ if hasattr(self.optimizer, "chained_optimizers"):
+ optimizers = self.optimizer.chained_optimizers
+ else:
+ optimizers = [self.optimizer]
+ for optimizer in optimizers:
+ for param_group in optimizer.optimizer.param_groups:
+ for param in param_group['params']:
+ param.data = param.data.to(torch.cuda.current_device(), non_blocking=False)
+ optimizer.optimizer.state = self._move_to_device(optimizer.optimizer.state,
+ torch.cuda.current_device())
@mstx_timer_decorator
def _move_to_device(self, data, device):
@@ -133,7 +151,8 @@ class MegatronShardingManager:
num_layer_list=None,
moe_tp_extend_ep=None,
parallel_state=None,
- megatron_offloader=None
+ megatron_offloader=None,
+ noop_layers=None
):
"""Megatron Sharding Manager initialization.
@@ -169,13 +188,13 @@ class MegatronShardingManager:
moe_tp_extend_ep=moe_tp_extend_ep,
parallel_state=parallel_state,
weight_adaptor=self.weight_adaptor,
- enable_validate=enable_validate)
+ enable_validate=enable_validate,
+ noop_layers=noop_layers)
self.optimizer_offload = optimizer_offload
self.grad_offload = grad_offload
self.train_param_offload = train_param_offload
self.enable_validate = enable_validate
- self.use_distributed_optimizer = self.optimizer.config.use_distributed_optimizer
self.inference_engine.offload_model_weights()
self.megatron_offloader = megatron_offloader
@@ -206,8 +225,6 @@ class MegatronShardingManager:
3. do resharding
4. offload training param
"""
- if self.train_param_offload:
- self.megatron_offloader.onload_param()
self.onload_infer_params()
@@ -215,9 +232,9 @@ class MegatronShardingManager:
if self.train_param_offload:
self.megatron_offloader.offload_param()
-
self.inference_engine.sync_model_weights(infer_params, load_format='megatron')
+
@mstx_timer_decorator
def exit_infer_mode(self):
"""
diff --git a/mindspeed_rl/workers/resharding/memory_buffer.py b/mindspeed_rl/workers/resharding/memory_buffer.py
index 022a94690bee4da6df387bc198be3d36036203f9..98106d40715c053968609ab96b36d2f0ce9f66c8 100644
--- a/mindspeed_rl/workers/resharding/memory_buffer.py
+++ b/mindspeed_rl/workers/resharding/memory_buffer.py
@@ -71,7 +71,7 @@ class MemoryBuffer:
if param_name not in self.tensor_indices:
raise KeyError(f"Parameter {param_name} not found in the buffer.")
- start_index, shape = self.tensor_indices[param_name]
+ start_index, shape = self.tensor_indices[param_name] # tensor_indices: weight_name -> (start_index, shape)
return self.get(shape, start_index)
@@ -82,6 +82,15 @@ def calc_padded_numel(shape: torch.Size, dtype: torch.dtype):
return (numel + align_numel - 1) // align_numel * align_numel
+# Build the buffer for the enlarged EP: construct an experts_weight_buffer_meta from the expert entries
+def get_weight_buffer_meta_from_buffer(weight_buffer_meta) -> Dict[str, Dict]:
+ experts_weight_buffer_meta = {}
+ for name, meta_info in sorted(weight_buffer_meta.items()):
+ if "mlp.experts" in name:
+ experts_weight_buffer_meta[name] = meta_info
+ return experts_weight_buffer_meta
+
+
def build_memory_buffer(weight_buffer_meta: Dict[str, Dict]) -> Dict[torch.dtype, MemoryBuffer]:
"""Build the memory buffer given weight_buffer_meta
@@ -123,8 +132,61 @@ def build_memory_buffer(weight_buffer_meta: Dict[str, Dict]) -> Dict[torch.dtype
return memory_buffers
+def build_experts_memory_buffer(experts_weight_buffer_meta: Dict[str, Dict], experts_memory_expend_N) -> Dict[torch.dtype, MemoryBuffer]:
+ """Build the experts memory buffer given experts_weight_buffer_meta
+
+ Args:
+ experts_weight_buffer_meta: mapping from expert weight name to a dict of shape and dtype; each slot is enlarged experts_memory_expend_N times
+
+ Returns: a large memory buffer for each dtype that can hold all the tensors
+
+ """
+ experts_memory_buffers = {}
+ total_numel_map = {} # map from dtype to the total numel
+
+ for _, meta_info in sorted(experts_weight_buffer_meta.items()):
+ shape = meta_info['shape']
+ shape = torch.Size([experts_memory_expend_N, shape[0], shape[1], shape[2]])
+ dtype = meta_info['dtype']
+
+ if not isinstance(shape, torch.Size):
+ raise TypeError("Shape must be an instance of torch.Size")
+ if not isinstance(dtype, torch.dtype):
+ raise TypeError("dtype must be an instance of torch.dtype")
+ if dtype not in total_numel_map:
+ total_numel_map[dtype] = 0
+
+ tmp_numel = calc_padded_numel(shape, dtype)
+ total_numel_map[dtype] += tmp_numel
+
+
+ for dtype, total_numel in total_numel_map.items():
+ # Create a buffer for each dtype with the total numel
+ experts_memory_buffers[dtype] = MemoryBuffer(total_numel, total_numel, dtype)
+
+ # Now, insert each tensor's index and shape for later retrieval by name
+ current_index_map = {} # This keeps track of the current memory index for each dtype
+ for name, meta_info in sorted(experts_weight_buffer_meta.items()):
+ shape = meta_info['shape']
+ shape = torch.Size([experts_memory_expend_N, shape[0], shape[1], shape[2]])
+ dtype = meta_info['dtype']
+ buffer = experts_memory_buffers[dtype]
+ tensor_size = calc_padded_numel(shape, dtype)
+
+ start_index = current_index_map.get(dtype, 0)
+ current_index_map[dtype] = start_index + tensor_size
+
+ buffer.tensor_indices[name] = (start_index, shape)
+
+ return experts_memory_buffers
+
+
def build_model_weight_buffer(model: nn.Module, names_per_pp: List[str], get_weight_buffer_meta):
- memory_buffers = [ModelWeightBuffer(model, weight_names, get_weight_buffer_meta) for weight_names in names_per_pp]
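+ # names_per_pp now nests one name list per vpp stage inside each pp rank;
+ # flatten the stages so each pp rank still gets a single ModelWeightBuffer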
+ combined_names_per_pp = [[] for _ in names_per_pp]
+ for pp_rank, vpp_stages in enumerate(names_per_pp):
+ for weight_names_per_stage in vpp_stages:
+ combined_names_per_pp[pp_rank].extend(weight_names_per_stage)
+ memory_buffers = [ModelWeightBuffer(model, weight_names, get_weight_buffer_meta) for weight_names in combined_names_per_pp]
return memory_buffers
@@ -139,7 +201,7 @@ class ModelWeightBuffer:
self.weight_buffer_meta = self.get_weight_buffer_meta(self.model, weight_names)
self.weight_names = list(self.weight_buffer_meta.keys())
self.memory_buffers = None
- # self.memory_buffers = build_memory_buffer(self.weight_buffer_meta)
+
def __getitem__(self, weight_name: str) -> torch.Tensor:
return self.get_weight_by_name(weight_name)
diff --git a/mindspeed_rl/workers/resharding/vllm_weight_container.py b/mindspeed_rl/workers/resharding/vllm_weight_container.py
index a783e90663e8d0241f617cb15b292043173c7ff5..15664ee375f9f4b5feab383a7631b70b5d9ae096 100644
--- a/mindspeed_rl/workers/resharding/vllm_weight_container.py
+++ b/mindspeed_rl/workers/resharding/vllm_weight_container.py
@@ -25,12 +25,16 @@ import torch.distributed as dist
import numpy as np
from torch.distributed import new_group
+import vllm.distributed.parallel_state as ps
-from mindspeed_rl.workers.resharding.memory_buffer import build_model_weight_buffer
+from mindspeed_rl.workers.resharding.memory_buffer import build_model_weight_buffer, calc_padded_numel
import mindspeed_rl.workers.resharding.utils
from mindspeed_rl.workers.resharding.utils import get_tensor_parallel_partition_dim, tp_md5_validate, \
update_md5_by_rank, compute_md5, validate_md5, _build_infer_param_dict, get_tp_allgather_group, \
get_tp_allgather_world_size, is_tensor_parallel_param, get_tp_group, is_fake_tp_param
+from mindspeed_rl.utils.loggers import Loggers
+
+logger = Loggers(__name__)
class MegatronStyleVllmWeightContainer:
@@ -42,7 +46,8 @@ class MegatronStyleVllmWeightContainer:
moe_tp_extend_ep=False,
parallel_state=None,
weight_adaptor=None,
- enable_validate=False) -> None:
+ enable_validate=False,
+ noop_layers=None) -> None:
""" Megatron style vllm weight container.
Arguments:
@@ -64,16 +69,26 @@ class MegatronStyleVllmWeightContainer:
self.megatron_model = megatron_model
self.parallel_state = parallel_state
self.weight_adaptor = weight_adaptor
- self._num_hidden_layers = self.model_config.num_hidden_layers
+ self._num_hidden_layers = self.model_config.num_hidden_layers # HF layer count from config.json under the tokenizer path
+ self._noop_layers = None
+ if noop_layers is not None:
+ self._noop_layers = [int(layer_idx) for layer_idx in noop_layers.split(',')]
+ self._num_hidden_layers += len(self._noop_layers)
# pp configs
self._pp_rank = self.parallel_state.get_pipeline_model_parallel_rank()
self._pp_group = self.parallel_state.get_pipeline_model_parallel_group()
self._pp_size = self.parallel_state.get_pipeline_model_parallel_world_size()
+ self._world_size = dist.get_world_size()
+ self.pp_group_size = self._world_size // self._pp_size
+ ## vpp
self._num_layer_list = self._build_num_layer_list(num_layer_list)
- self._vpp_size = self.parallel_state._VIRTUAL_PIPELINE_MODEL_PARALLEL_RANK if self.parallel_state._VIRTUAL_PIPELINE_MODEL_PARALLEL_RANK else 1
- self._vpp_rank = self.parallel_state._VIRTUAL_PIPELINE_MODEL_PARALLEL_WORLD_SIZE if self.parallel_state._VIRTUAL_PIPELINE_MODEL_PARALLEL_WORLD_SIZE else 0
-
+ self._vpp_rank = self.parallel_state._VIRTUAL_PIPELINE_MODEL_PARALLEL_RANK if self.parallel_state._VIRTUAL_PIPELINE_MODEL_PARALLEL_RANK else 0
+ self._vpp_size = self.parallel_state._VIRTUAL_PIPELINE_MODEL_PARALLEL_WORLD_SIZE if self.parallel_state._VIRTUAL_PIPELINE_MODEL_PARALLEL_WORLD_SIZE else 1
+ self._vpp_layer_list = self._build_vpp_layer_list(self._num_layer_list)
+ ## _noop_layers
+ self._global2local_map = self._build_global2local_map(self._vpp_layer_list, self._vpp_size, self._noop_layers) if self._noop_layers is not None else None
+
# tp configs
self._tp_size = self.parallel_state.get_tensor_model_parallel_world_size()
self._tp_group = self.parallel_state.get_tensor_model_parallel_group()
@@ -97,7 +112,11 @@ class MegatronStyleVllmWeightContainer:
self._infer_ep_size = infer_expert_parallel_size
self.moe_tp_extend_ep = moe_tp_extend_ep
- self._world_size = dist.get_world_size()
+ # TODO: infer_expert_tensor_parallel_size and num_process are fixed to 1 for now.
+ self.infer_expert_tensor_parallel_size = 1
+ self.num_process = 1
+ self._infer_ep_size = self._infer_ep_size * self._infer_tp_size
+ self.experts_memory_expend_N = self._infer_ep_size // self._ep_size
# validate parallel configs
self._validate_parallel_config()
@@ -116,10 +135,9 @@ class MegatronStyleVllmWeightContainer:
def _validate_parallel_config(self):
if self._infer_pp_size != 1:
raise ValueError("infer_pp_size != 1 not supported yet")
- if self._infer_ep_size != 1:
- raise ValueError("infer_ep_size != 1 not supported yet")
- if self._ep_size > 1 and self._ep_size != self._infer_tp_size:
- raise ValueError("For training EP, supports EP -> TP only currently.")
+
+ if self._infer_ep_size % self._ep_size != 0:
+ raise ValueError("The training expert size should be divisibled by the inference expert size.")
if self._ep_size > 1 and not self.moe_tp_extend_ep:
raise ValueError("To enable training EP, you need to enable moe_tp_extend_ep and use GroupedMLP.")
if self._pp_size < self._infer_pp_size:
@@ -149,6 +167,12 @@ class MegatronStyleVllmWeightContainer:
self._update_weight_buffers_intra_pp()
self._update_weight_buffers_inter_pp()
+
+ # Preconditions for running _update_weight_buffers_ep and _send_receive_experts
+ if self.moe_tp_extend_ep and self._infer_ep_size >= self._ep_size:
+ self._update_weight_buffers_ep()
+ self._send_receive_experts()
+
params = self._get_all_params()
params = _build_infer_param_dict(params=params)
@@ -161,27 +185,55 @@ class MegatronStyleVllmWeightContainer:
raise ValueError("num_layers % pp_size == 0, please specify num_layer_list")
return [self._num_hidden_layers // self._pp_size for _ in range(self._pp_size)]
+ def _build_vpp_layer_list(self, num_layer_list):
+ if self._vpp_size <= 1:
+ return num_layer_list
+ for layers_in_pp_rank in num_layer_list:
+ if layers_in_pp_rank % self._vpp_size != 0:
+ raise ValueError("num_layers_per_pp % vpp_size != 0, please specify pp_size and vpp_size")
+ return [int(layers_in_pp_rank / self._vpp_size) for layers_in_pp_rank in num_layer_list]
+
+ def _build_global2local_map(self, layer_list, vpp_size, noop_layers):
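+ # Builds a lookup from the hf/vLLM layer index (noop layers excluded from the numbering)
+ # to the local layer index inside the Megatron model chunk that owns it.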
+ stage_layers_num = sum(layer_list)
+ glb2local_map = []
+ for vpp_rank in range(vpp_size):
+ start_layer = vpp_rank * stage_layers_num
+ for _, layers_in_vpp_rank in enumerate(layer_list):
+ layer_idx_list = [
+ layer_idx for layer_idx in range(start_layer, start_layer + layers_in_vpp_rank)
+ if layer_idx not in noop_layers
+ ]
+ glb2local_map += [layer_idx % layers_in_vpp_rank for layer_idx in layer_idx_list]
+ start_layer += layers_in_vpp_rank
+
+ return glb2local_map
+
def _unwrap_megatron_model(self, model):
"""
Remove consecutive 'module.' prefixes from the model based on the state_dict's first key.
This method only removes 'module.' from the beginning of the key and ignores other occurrences.
"""
- model = model[0]
- first_key = list(dict(model.named_parameters()).keys())[0]
- while first_key.startswith("module."):
- model = model.module
- first_key = first_key[len("module."):] # 更新键,去掉一个module.
- return model
+ unwrapped_model = []
+ for model_chunk in model:
+ first_key = list(dict(model_chunk.named_parameters()).keys())[0]
+ while first_key.startswith("module."):
+ model_chunk = model_chunk.module
+ first_key = first_key[len("module."):]
+ unwrapped_model.append(model_chunk)
+ return unwrapped_model
def _init_weight_buffers(self):
"""
Build buffers from vllm state dict. Totally build train pp_size buffers, each buffer corresponds to a pack of megatron weight.
Return a list of buffers, and a reference dict megatron_param_name->buffer.
"""
- vllm_names = list(dict(self.vllm_model.named_parameters()).keys())
- self.weight_names_per_pp = self.weight_adaptor.get_weight_names_per_pp(self._num_layer_list, vllm_names)
+ vllm_names = list(dict(self.vllm_model.named_parameters()).keys()) # all vLLM weight names; split per pp/vpp stage below
+ self.weight_names_per_pp = self.weight_adaptor.get_weight_names_per_pp(self._vpp_layer_list, vllm_names,
+ sum(self._num_layer_list), self._vpp_size, self._noop_layers)
+
self.weight_buffers = build_model_weight_buffer(self.vllm_model, self.weight_names_per_pp,
- self.weight_adaptor.get_weight_buffer_meta)
+ self.weight_adaptor.get_weight_buffer_meta
+ )
def trans_ep_params_to_tp(self, megatron_param, name):
"""
@@ -264,7 +316,11 @@ class MegatronStyleVllmWeightContainer:
async_op=False
)
total_experts = self.num_local_experts * tp_size
- return torch.cat(output_tensor_list, dim=1).reshape(hidden_size, total_experts, -1).permute(1, 0, 2)
+ res = torch.cat(output_tensor_list, dim=1).reshape(hidden_size, total_experts, -1)
+ if 'weight2' in name:
+ return res.permute(1, 2, 0).contiguous()
+ return res.permute(1, 0, 2).contiguous()
+
def _update_weight_buffers_intra_pp(self):
"""
@@ -281,35 +337,107 @@ class MegatronStyleVllmWeightContainer:
return infer_param
pp_rank = self._pp_rank
- weight_buffer = self.weight_buffers[pp_rank]
+ weight_names = self.weight_names_per_pp[pp_rank]
+ weight_names_meta = self.weight_adaptor.convert_weight_name_meta(weight_names)
true_megatron_model = self._unwrap_megatron_model(self.megatron_model)
- normal_layer_func = partial(self.weight_adaptor.global2local_layer, num_layer_list=self._num_layer_list)
- name_pairs = sorted(list(set([(name, self.weight_adaptor.replace_name_i2t(normal_layer_func(name)))
- for name in weight_buffer.weight_names])))
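+ # name_pairs now holds (hf_name, vpp_rank, megatron_name) triples so each param can be
+ # looked up in the matching vpp model chunk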
+ normal_layer_func = partial(self.weight_adaptor.global2local_layer, num_layer_list=self._vpp_layer_list, global2local_map=self._global2local_map)
+ name_pairs = sorted(list(set([(name, vpp_rank, self.weight_adaptor.replace_name_i2t(normal_layer_func(name, vpp_rank=vpp_rank)))
+ for vpp_rank, names_per_vpp in enumerate(weight_names_meta) for name in names_per_vpp])))
+
if self.enable_validate:
self.origin_params_for_md5 = hashlib.md5()
self.infer_params_for_md5 = [hashlib.md5() for _ in range(get_tp_allgather_world_size())]
- for hf_name, megatron_name in name_pairs:
+
+ # Check that the linear_fc1 and linear_fc2 weight shapes satisfy the expected relation (fc1 holds the gate and up projections, so it is twice the size of fc2). Models that do not satisfy this are not supported.
+ for _, vpp_rank, megatron_name in name_pairs:
if megatron_name.endswith("linear_fc1.weight"):
fc2_name = megatron_name.replace("linear_fc1", "linear_fc2")
- megatron_param_fc1 = dict(true_megatron_model.named_parameters())[megatron_name]
- megatron_param_fc2 = dict(true_megatron_model.named_parameters())[fc2_name]
+ megatron_param_fc1 = dict(true_megatron_model[vpp_rank].named_parameters())[megatron_name]
+ megatron_param_fc2 = dict(true_megatron_model[vpp_rank].named_parameters())[fc2_name]
if megatron_param_fc1.shape[0] * megatron_param_fc1.shape[1] != megatron_param_fc2.shape[0] * \
megatron_param_fc2.shape[1] * 2:
raise ValueError("Only implemented for Llama model which linear_fc1 contains gate and up params.")
- megatron_params_dict = dict(true_megatron_model.named_buffers())
- megatron_params_dict.update(true_megatron_model.named_parameters())
- for hf_name, megatron_name in name_pairs:
- megatron_param = megatron_params_dict[megatron_name]
- param = _transfer_from_megatron_division(megatron_param, megatron_name)
- weight_buffer.copy_by_name(hf_name, param)
+ weight_buffer = self.weight_buffers[pp_rank]
+ megatron_params_dict = {}
+ for vpp_rank in range(self._vpp_size):
+ megatron_params_dict.update({vpp_rank: dict(true_megatron_model[vpp_rank].named_buffers())})
+ megatron_params_dict[vpp_rank].update(true_megatron_model[vpp_rank].named_parameters())
+
+ for hf_name, vpp_rank, megatron_name in name_pairs:
+ # expert weights are copied later via the experts-buffer path (_update_weight_buffers_ep)
+ if (self._infer_ep_size > 1 or self._ep_size > 1) and "mlp.experts" in megatron_name:
+ continue
+ megatron_param = megatron_params_dict[vpp_rank][megatron_name]
+ param = _transfer_from_megatron_division(megatron_param, megatron_name)
+ weight_buffer.copy_by_name(hf_name, param)
# tp md5 validate
if self.enable_validate:
tp_md5_validate(self.infer_params_for_md5, self.origin_params_for_md5,
f"rank[{self._rank}] tp params allgather")
+ def _update_weight_buffers_ep(self):
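+ # Rough flow: every pp rank in turn builds a temporary experts buffer whose slots are
+ # experts_memory_expend_N times larger, fills it from its own Megatron expert params,
+ # broadcasts it across the pp group, and each rank keeps the slice matching its position
+ # in the enlarged inference EP group.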
+ # Build temporary experts_memory_buffers
+ for cur_pp_rank in range(self._pp_size):
+ pp_rank = self._pp_rank
+ from mindspeed_rl.workers.resharding.memory_buffer import build_experts_memory_buffer, get_weight_buffer_meta_from_buffer
+ # Step 1: set up a temporary experts buffer for the current pp rank
+ combined_names_per_pp = []
+ vpp_stages = self.weight_names_per_pp[cur_pp_rank]
+ for weight_names_per_stage in vpp_stages:
+ combined_names_per_pp.extend(weight_names_per_stage)
+ self.weight_buffer_meta = self.weight_adaptor.get_weight_buffer_meta(self.vllm_model, combined_names_per_pp)
+ self.experts_weight_buffer_meta = get_weight_buffer_meta_from_buffer(self.weight_buffer_meta)
+ self.experts_memory_buffers = build_experts_memory_buffer(self.experts_weight_buffer_meta, self.experts_memory_expend_N)
+
+ # Step 2: copy the corresponding expert weights into the experts buffer
+ if cur_pp_rank == pp_rank:
+ weight_names = self.weight_names_per_pp[pp_rank]
+ weight_names_meta = self.weight_adaptor.convert_weight_name_meta(weight_names)
+ normal_layer_func = partial(self.weight_adaptor.global2local_layer, num_layer_list=self._vpp_layer_list, global2local_map=self._global2local_map)
+ name_pairs = sorted(list(set([(name, vpp_rank, self.weight_adaptor.replace_name_i2t(normal_layer_func(name, vpp_rank=vpp_rank)))
+ for vpp_rank, names_per_vpp in enumerate(weight_names_meta) for name in names_per_vpp])))
+ true_megatron_model = self._unwrap_megatron_model(self.megatron_model)
+
+ megatron_params_dict = {}
+ # collect all weights of the current pp rank
+ for vpp_rank in range(self._vpp_size):
+ megatron_params_dict.update({vpp_rank: dict(true_megatron_model[vpp_rank].named_buffers())})
+ megatron_params_dict[vpp_rank].update(true_megatron_model[vpp_rank].named_parameters())
+
+ for hf_name, vpp_rank, megatron_name in name_pairs:
+ if((self._infer_ep_size > 1 or self._ep_size > 1) and "mlp.experts" in megatron_name):
+ megatron_param = megatron_params_dict[vpp_rank][megatron_name]
+ dtype = self.experts_weight_buffer_meta[hf_name]['dtype']
+ self.experts_memory_buffers[dtype].copy_by_name(hf_name, megatron_param)
+
+ # Step 3: the following steps can be reused for every pp rank
+ global_src = dist.get_global_rank(group=self._pp_group, group_rank=cur_pp_rank)
+
+ # broadcast the expert weights held in the experts memory buffer
+ for dtype, experts_memory_buffer in self.experts_memory_buffers.items():
+ dist.broadcast(tensor=experts_memory_buffer.data, src=global_src, group=self._pp_group, async_op=False)
+ pp_group_rank = self._rank // self.pp_group_size
+
+ # for each tensor of this dtype, pick the slice belonging to this rank
+ for name, tensor_indices_value in sorted(experts_memory_buffer.tensor_indices.items()):
+ shape = tensor_indices_value[1] # shape already carries the extra experts_memory_expend_N leading dim
+ index = pp_group_rank % self.experts_memory_expend_N
+ experts_tensor = experts_memory_buffer.get_by_name(name)
+ experts_tensor_reshape = experts_tensor.view(shape)
+ weight_tensor_infer = experts_tensor_reshape[index]
+ self.weight_buffers[cur_pp_rank].copy_by_name(name, weight_tensor_infer)
+
+ # release the experts buffer
+ experts_memory_buffer = None
+ self.experts_memory_buffers[dtype] = None
+
+ # the per-dtype buffers were already released above; drop the container reference too
+ # so the temporary experts buffers can be garbage collected
+ self.experts_memory_buffers = None
+
+
def _update_weight_buffers_inter_pp(self):
"""
Update weight buffers by gathering weights from other pp stage.
@@ -328,6 +456,36 @@ class MegatronStyleVllmWeightContainer:
dist.broadcast(md5_tensor_src, group=self._pp_group, src=global_src, async_op=False)
validate_md5(md5_tensor_src, md5_tensor, f"rank[{self._rank}] pp resharding params")
+
+ def get_expert_router(self, cur_rank, train_tp_ep_size, infer_tp_ep_size, world_size):
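+ # Pairs each rank with the rank it must swap expert weights with when the EP group grows
+ # from train_tp_ep_size to infer_tp_ep_size. Example (values assumed for illustration):
+ # train_tp_ep_size=2, infer_tp_ep_size=4, world_size=8 gives src_router=[0, 2, 1, 3] for
+ # ranks [0, 1, 2, 3], so rank 1 exchanges its expert shard with rank 2 while ranks 0 and 3
+ # keep their own.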
+ for tp_ep_group_id in range(world_size // infer_tp_ep_size):
+ tp_ep_group = [i for i in range(tp_ep_group_id * infer_tp_ep_size, (tp_ep_group_id + 1) * infer_tp_ep_size)]
+ if cur_rank in tp_ep_group:
+ self.INFER_TP_EP_GROUP = tp_ep_group
+ stride = infer_tp_ep_size // train_tp_ep_size
+ dev_array = np.array(self.INFER_TP_EP_GROUP).reshape(stride, train_tp_ep_size)
+ src_router = np.squeeze(dev_array.transpose().reshape(1, infer_tp_ep_size)).tolist()
+ src = src_router[cur_rank % infer_tp_ep_size]
+ dst = self.INFER_TP_EP_GROUP[src_router.index(cur_rank)]
+ return src, dst
+
+ def _send_receive_experts(self):
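+ # For every "mlp.experts" tensor, send the local copy to dst_rank and overwrite it with the
+ # copy received from src_rank (the pairing comes from get_expert_router).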
+ cur_rank = dist.get_rank()
+ src_rank, dst_rank = self.get_expert_router(cur_rank, self._ep_size, self._infer_ep_size, self._world_size)
+ for cur_pp_rank in range(self._pp_size):
+ for memory_buffer in self.weight_buffers[cur_pp_rank].memory_buffers.values():
+ for name in sorted(memory_buffer.tensor_indices.keys()):
+ if "mlp.experts" in name:
+ # send our copy to dst_rank and receive the replacement from src_rank
+ tensor_to_send = memory_buffer.get_by_name(name)
+ tensor_to_replace = torch.empty_like(tensor_to_send)
+ send_op = dist.P2POp(dist.isend, tensor_to_send, dst_rank)
+ recv_op = dist.P2POp(dist.irecv, tensor_to_replace, src_rank)
+ reqs = dist.batch_isend_irecv([send_op, recv_op])
+ for req in reqs:
+ req.wait()
+ memory_buffer.copy_by_name(name, tensor_to_replace)
+
def _get_all_params(self):
"""Get all the parameters of the models in all pp ranks
@@ -353,7 +511,7 @@ class MegatronStyleVllmWeightContainer:
return
if self._tp_size % self._infer_tp_size != 0:
raise ValueError("self._tp_size must be divisible by self._infer_tp_size")
- tp_allgather_size = self._tp_size // self._infer_tp_size
+ tp_allgather_size = self._tp_size
if mindspeed_rl.workers.resharding.utils._TP_ALLGATHER_GROUP is not None:
raise RuntimeError("Group for allgather tensor model parallel weight is already initialized")
num_groups = self._world_size // tp_allgather_size
@@ -432,7 +590,7 @@ class MegatronStyleVllmWeightContainer:
2. split train_tp params into groups (size: infer_tp_size)
3. return the corresponding param from group based on infer tp rank
"""
- if self._infer_tp_size <= self._tp_size:
+ if self._infer_tp_size <= self._tp_size or is_fake_tp_param(name, self.moe_tp_extend_ep):
return param
tp_group = get_tp_group()
@@ -494,6 +652,9 @@ class MegatronStyleVllmWeightContainer:
torch.distributed.all_gather(infer_param, param, group=tp_allgather_group)
if self.enable_validate:
update_md5_by_rank(infer_param, param, self.origin_params_for_md5, self.infer_params_for_md5)
- infer_param = self._default_tp_concat_fn(name, param, infer_param)
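+ # the allgather group now spans the full training TP, so keep only the 1/infer_tp slice
+ # owned by this rank's inference-TP position before concatenating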
+ part_len = len(infer_param) // self._infer_tp_size
+ start = self._rank % self._infer_tp_size
+ part_param = infer_param[part_len * start:part_len * (start + 1)]
+ infer_param = self._default_tp_concat_fn(name, param, part_param)
return infer_param
diff --git a/mindspeed_rl/workers/resharding/weight_adaptor.py b/mindspeed_rl/workers/resharding/weight_adaptor.py
index 19ea46c2857a0bdacb46cffe919cbedee2c516a0..9a333a94f63c5ac6ca14966370cce5ab0cdb631f 100644
--- a/mindspeed_rl/workers/resharding/weight_adaptor.py
+++ b/mindspeed_rl/workers/resharding/weight_adaptor.py
@@ -42,6 +42,7 @@ class MegatronVLLMWeightAdaptor(BaseWeightAdaptor):
def __init__(self, model_config):
super(MegatronVLLMWeightAdaptor, self).__init__()
self.model_config = model_config
+ self.meta_info = None
self.params_mapping = [
# (megatron core gpt model name, vllm model name)
("embedding.word_embeddings", "model.embed_tokens"),
@@ -92,6 +93,8 @@ class MegatronVLLMWeightAdaptor(BaseWeightAdaptor):
"""
pass
+ def convert_weight_name_meta(self, weight_names):
+ return weight_names
def get_weight_buffer_meta(self, model, valid_names=None):
weight_buffer_meta = {}
@@ -103,11 +106,12 @@ class MegatronVLLMWeightAdaptor(BaseWeightAdaptor):
return weight_buffer_meta
@staticmethod
- def global2local_layer(name, num_layer_list):
+ def global2local_layer(name, num_layer_list, vpp_rank=0, global2local_map=None):
"""
Transform the model name in each model_chunk in global space to local space
"""
layer_name = 'layers'
+ num_layer_offset = vpp_rank * sum(num_layer_list)
if layer_name in name: # belong to an intermediate layer
split_name = name.split('.')
@@ -122,12 +126,15 @@ class MegatronVLLMWeightAdaptor(BaseWeightAdaptor):
raise ValueError(f'split_name = {split_name}')
# increment layer_num_idx by layer_offset
- global_idx = int(split_name[layer_num_idx])
- for layers_in_pp in num_layer_list:
- global_idx -= layers_in_pp
- if global_idx < 0:
- local_index = global_idx + layers_in_pp
- break
+ if global2local_map is None:
+ global_idx = int(split_name[layer_num_idx]) - num_layer_offset
+ for layers_in_pp in num_layer_list:
+ global_idx -= layers_in_pp
+ if global_idx < 0:
+ local_index = global_idx + layers_in_pp
+ break
+ else:
+ local_index = global2local_map[int(split_name[layer_num_idx])]
split_name[layer_num_idx] = str(local_index)
name = '.'.join(split_name) # weight name in inference_tp_model
@@ -135,15 +142,27 @@ class MegatronVLLMWeightAdaptor(BaseWeightAdaptor):
return name
@staticmethod
- def get_weight_names_per_pp(layer_list, vllm_names):
+ def get_weight_names_per_pp(layer_list, vllm_names, layers_num=None, vpp_size=0, noop_layers=None):
+ ## add protection for default kwargs
+ if not layers_num:
+ if vpp_size > 0:
+ ValueError(f"layers_num is required with vpp_size = {vpp_size}")
+ layers_num = sum(layer_list)
- end_layer = sum(layer_list) - 1
+ end_layer = layers_num - 1
- def get_weight_names_in_range(layer_range, names: list, layer_name='layers') -> list:
+ def get_weight_names_in_range(layer_range, names: list, noop_layers=None, layer_name='layers') -> list:
"""
Extract weights in a given range and also include the weights before and after the range as needed.
"""
start, end = layer_range
+
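+ # drop noop layers from the range and shift the remaining global indices down by the
+ # number of noop layers before them, so they line up with the contiguous vLLM layer numbering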
+ layer_idx_list = [layer_idx for layer_idx in range(start, end + 1)]
+ if noop_layers:
+ layer_idx_list = [
+ layer_idx - sum(1 for i in noop_layers if i <= layer_idx) for layer_idx in layer_idx_list if
+ layer_idx not in noop_layers
+ ]
last_layer_index = end_layer
names_in_range = []
@@ -160,7 +179,7 @@ class MegatronVLLMWeightAdaptor(BaseWeightAdaptor):
match = re.match(r'.*\.layers\.(\d+)', name)
if match:
layer_num = int(match.group(1))
- if start <= layer_num <= end:
+ if layer_num in layer_idx_list:
names_in_range.append(name)
# add names after decode layers
@@ -172,13 +191,18 @@ class MegatronVLLMWeightAdaptor(BaseWeightAdaptor):
break
return names_in_range
- pp_layers_range = []
- start_layer = 0
- for layers_in_pp_rank in layer_list:
- pp_layers_range.append((start_layer, start_layer + layers_in_pp_rank - 1))
- start_layer += layers_in_pp_rank
- weight_names_per_pp = [get_weight_names_in_range(layer_range, vllm_names) for layer_range in pp_layers_range]
- return weight_names_per_pp
+ stage_layers_num = sum(layer_list)
+ weight_names_per_vpp_combined = [[] for _ in layer_list]
+ for vpp_rank in range(vpp_size):
+ start_layer = vpp_rank * stage_layers_num
+ for pp_rank, layers_in_vpp_rank in enumerate(layer_list):
+ vpp_layers_range = (start_layer, start_layer + layers_in_vpp_rank - 1)
+ weight_names_per_vpp = get_weight_names_in_range(vpp_layers_range, vllm_names, noop_layers)
+ weight_names_per_vpp_combined[pp_rank].append(weight_names_per_vpp)
+
+ start_layer += layers_in_vpp_rank
+
+ return weight_names_per_vpp_combined
class DeepSeekMVWeightAdaptor(MegatronVLLMWeightAdaptor):
@@ -187,6 +211,8 @@ class DeepSeekMVWeightAdaptor(MegatronVLLMWeightAdaptor):
"""
def __init__(self, model_config):
super(DeepSeekMVWeightAdaptor, self).__init__(model_config)
+ self.meta_info = {'replace': {'kv_a_proj_with_mqa': 'qkv_proj'},
+ 'delete': ['q_a_proj']}
self.params_mapping = [
# (megatron core gpt model name, vllm model name)
("embedding.word_embeddings", "model.embed_tokens"),
@@ -216,16 +242,48 @@ class DeepSeekMVWeightAdaptor(MegatronVLLMWeightAdaptor):
if valid_names and name not in valid_names:
continue
if 'kv_a_proj_with_mqa' in name:
- q_param = dict(model.named_parameters()).get(name.replace('kv_a_proj_with_mqa', 'q_a_proj'))
+ # concatenate the kv_a_proj_with_mqa and q_a_proj tensors, and register the concatenated shape under qkv_proj in place of the original kv_a_proj_with_mqa entry
+ q_param = dict(model.named_parameters()).get(name.replace('kv_a_proj_with_mqa', 'q_a_proj' if self.model_config.q_lora_rank else "q_proj"))
qkv_param_shape = torch.cat([q_param, param], dim=0).shape
qkv_name = name.replace('kv_a_proj_with_mqa', 'qkv_proj')
weight_buffer_meta[qkv_name] = {'shape': qkv_param_shape, 'dtype': param.dtype}
- elif 'q_a_proj' in name:
+ elif 'q_a_proj' in name or 'q_proj' in name:
continue
else:
weight_buffer_meta[name] = {'shape': param.shape, 'dtype': param.dtype}
return weight_buffer_meta
+ def convert_weight_name_meta(self, weight_names):
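+ # Applies meta_info to the (possibly nested per-vpp) name lists: names matching a 'replace'
+ # key are rewritten (e.g. kv_a_proj_with_mqa -> qkv_proj), names matching a 'delete' key
+ # (q_a_proj) are dropped, and everything else is kept unchanged.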
+ if not self.meta_info:
+ return weight_names
+
+ weight_names_meta = list()
+ for elements in weight_names:
+ if isinstance(elements, list):
+ tmp_weight_names_meta = self.convert_weight_name_meta(elements)
+ weight_names_meta.append(tmp_weight_names_meta)
+ else:
+ converted = False
+ if not converted and 'replace' in self.meta_info:
+ for key, value in self.meta_info['replace'].items():
+ if key in elements:
+ qkv_name = elements.replace(key, value)
+ weight_names_meta.append(qkv_name)
+ converted = True
+ break
+
+ if not converted and 'delete' in self.meta_info:
+ for key in self.meta_info['delete']:
+ if key in elements:
+ converted = True
+ break
+
+ if not converted:
+ weight_names_meta.append(elements)
+
+ return weight_names_meta
+
+
class QwenMVWeightAdaptor(MegatronVLLMWeightAdaptor):
"""
@@ -239,6 +297,7 @@ WEIGHT_ADAPTOR_REGISTRY = {
"Qwen2ForCausalLM": QwenMVWeightAdaptor,
"DeepseekV3ForCausalLM": DeepSeekMVWeightAdaptor,
"DeepseekV2ForCausalLM": DeepSeekMVWeightAdaptor,
+ "CustomDeepseekV3ForCausalLM": DeepSeekMVWeightAdaptor,
}
diff --git a/tests/st/configs/test_grpo_trainer_qwen25_7b_integrated.yaml b/tests/st/configs/test_grpo_trainer_qwen25_7b_integrated.yaml
index 362dce0e53c69a4735af4d8e9e502dff3496783b..66420287bc1cc6b9f6b2690dfee4fb2ec4b4aaa3 100644
--- a/tests/st/configs/test_grpo_trainer_qwen25_7b_integrated.yaml
+++ b/tests/st/configs/test_grpo_trainer_qwen25_7b_integrated.yaml
@@ -85,6 +85,7 @@ rl_config:
num_npus: 8
generate_config:
+ enforce_eager: true
gen_micro_batch_size: 16
trust_remote_code: true
offload_train_optimizer: true
diff --git a/tests/st/infer_engine/test_module_entry_vllm_engine.sh b/tests/st/infer_engine/test_module_entry_vllm_engine.sh
deleted file mode 100644
index 8704a666899dd4af8fbe8ff5e65294e6555a4580..0000000000000000000000000000000000000000
--- a/tests/st/infer_engine/test_module_entry_vllm_engine.sh
+++ /dev/null
@@ -1,24 +0,0 @@
-#!/bin/bash
-
-export CUDA_DEVICE_MAX_CONNECTIONS=1
-# 获取脚本的绝对路径
-SCRIPT_DIR=$(cd "$(dirname "$0")" && pwd)
-export PYTHONPATH=$SCRIPT_DIR/../../..:$PYTHONPATH
-
-GPUS_PER_NODE=1
-MASTER_ADDR=localhost
-MASTER_PORT=6555
-NNODES=1
-NODE_RANK=0
-
-DISTRIBUTED_ARGS="
- --nproc_per_node $GPUS_PER_NODE \
- --nnodes $NNODES \
- --node_rank $NODE_RANK \
- --master_addr $MASTER_ADDR \
- --master_port $MASTER_PORT
-"
-echo "start test_vllm_engine st"
-
-torchrun $DISTRIBUTED_ARGS $SCRIPT_DIR/test_vllm_engine.py --distribute-backend nccl
-torchrun $DISTRIBUTED_ARGS $SCRIPT_DIR/test_vllm_engine_multistep_decode.py --distribute-backend nccl
\ No newline at end of file
diff --git a/tests/st/infer_engine/test_vllm_engine.py b/tests/st/infer_engine/test_vllm_engine.py
deleted file mode 100644
index a4536dbfe765246cdd2a8ba9678b683abf6de9f9..0000000000000000000000000000000000000000
--- a/tests/st/infer_engine/test_vllm_engine.py
+++ /dev/null
@@ -1,140 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-# Copyright (c) Huawei Technologies Co., Ltd.2023-2025. All rights reserved.
-import os
-import logging
-
-import tensordict
-import torch
-from torch_npu.contrib import transfer_to_npu
-
-from mindspeed_rl.models.rollout.vllm_engine import VLLMInferEngine
-from mindspeed_rl.config_cls.megatron_config import MegatronConfig
-from mindspeed_rl.utils.loggers import Loggers
-
-tokenizer_name_or_path = "/data/for_dt/tokenizer/Llama-3.2-1B-Instruct/"
-weights_path = "/data/for_dt/weights/Llama-3.2-1B-tp1pp1/iter_0000001/mp_rank_00/model_optim_rng.pt"
-megatron_dict = {"num_attention_heads": 32,
- "tensor_model_parallel_size": 1,
- "num_query_groups": 8,
- "group_query_attention": True}
-sampling_config = {
- "num_completions": 1, # 每个输入提示生成的独立完成项数量
- "logprobs": 1, # 返回的 top token 的对数概率数量
- "max_tokens": 128, # 生成输出的最大 token 数量
- "best_of": 2, # 内部生成候选完成项的数量,从中选择最佳的一个
- "top_p": 1.0, # 核采样的累积概率阈值
- "top_k": 50, # 采样时考虑的最高概率 token 的数量
- "min_p": 0.0, # token 选择的最小概率阈值
- "temperature": 0.2, # 控制预测随机性的温度参数
- "detokenize": False # 是否将生成的 token 转换回可读字符串
-}
-
-
-def main():
- logger = Loggers(
- name="test_vllm_engine",
- )
- logger.info("start test_vllm_engine")
-
- conversation = [
- {
- "role": "system",
- "content": "You are a helpful assistant"
- },
- {
- "role": "user",
- "content": "Hello"
- },
- {
- "role": "assistant",
- "content": "Hello! How can I assist you today?"
- },
- {
- "role": "user",
- "content": "Write an essay about the importance of higher education.",
- },
- ]
-
- logger.info("load megatron weight")
- megatron_st = torch.load(weights_path)
- actor_weights = megatron_st['model']
-
- # 配置初始化所需的参数
-
- train_tensor_parallel_size = 1
- train_pipeline_parallel_size = 1
- infer_tensor_parallel_size = 1
- infer_pipeline_parallel_size = 1
- train_expert_parallel_size = 1
- infer_expert_parallel_size = 1
- max_num_seqs = 256
- trust_remote_code = True
-
- logger.info("enter vllmInferEngine")
-
- megatron_config = MegatronConfig(megatron_dict, {})
- megatron_config.num_attention_heads = 32
- megatron_config.tensor_model_parallel_size = 1
- megatron_config.num_query_groups = 8
- megatron_config.num_key_value_heads = 8
- megatron_config.group_query_attention = True
- # 初始化 VLLMInferEngine 实例
- inference_engine = VLLMInferEngine(
- megatron_config=megatron_config,
- sampling_config=sampling_config,
- train_expert_parallel_size=train_expert_parallel_size,
- infer_expert_parallel_size=infer_expert_parallel_size,
- tokenizer_name_or_path=tokenizer_name_or_path,
- train_tensor_parallel_size=train_tensor_parallel_size,
- train_pipeline_parallel_size=train_pipeline_parallel_size,
- infer_tensor_parallel_size=infer_tensor_parallel_size,
- infer_pipeline_parallel_size=infer_pipeline_parallel_size,
- max_num_seqs=max_num_seqs,
- trust_remote_code=trust_remote_code
- )
-
- logger.info("model inited")
- inference_engine.free_cache_engine()
- torch.cuda.empty_cache()
- logger.info("free_cache")
-
- inference_engine.offload_model_weights()
- logger.info("offload_model")
- torch.cuda.empty_cache()
- logger.info("empty_cache")
-
- logger.info("enter sync_model_weights")
- inference_engine.sync_model_weights(actor_weights)
-
- logger.info("enter init_cache_engine")
- inference_engine.init_cache_engine()
-
- logger.info("=" * 80)
- logger.info("start chat")
- outputs = inference_engine.chat(conversation)
- logger.info("chat result is ", outputs)
-
- idx_list = []
- idx_list_per_step = []
- for i in range(2):
- for j in range(4):
- tokens = torch.randint(100, (10,))
- idx_list_per_step.append(tokens.view(-1).cpu().numpy().tolist())
- idx_list.extend(idx_list_per_step)
- idx_list_per_step = []
- logger.info(len(idx_list), [len(i) for i in idx_list])
-
- logger.info("start test generate_sequences ")
- outputs = inference_engine.generate_sequences(
- idx_list=idx_list,
- )
- logger.info("generate_sequences output is:")
- logger.info(outputs[0])
-
- logger.info("input")
- logger.info(idx_list[0])
-
-
-if __name__ == "__main__":
- main()
diff --git a/tests/st/infer_engine/test_vllm_engine_multistep_decode.py b/tests/st/infer_engine/test_vllm_engine_multistep_decode.py
deleted file mode 100644
index d99d0eb4bb997f29f403503ce2921ebf334d9d86..0000000000000000000000000000000000000000
--- a/tests/st/infer_engine/test_vllm_engine_multistep_decode.py
+++ /dev/null
@@ -1,141 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-# Copyright (c) Huawei Technologies Co., Ltd.2023-2025. All rights reserved.
-import os
-import logging
-
-import tensordict
-import torch
-from torch_npu.contrib import transfer_to_npu
-
-from mindspeed_rl.models.rollout.vllm_engine import VLLMInferEngine
-from mindspeed_rl.config_cls.megatron_config import MegatronConfig
-from mindspeed_rl.utils.loggers import Loggers
-
-tokenizer_name_or_path = "/data/for_dt/tokenizer/Llama-3.2-1B-Instruct/"
-weights_path = "/data/for_dt/weights/Llama-3.2-1B-tp1pp1/iter_0000001/mp_rank_00/model_optim_rng.pt"
-megatron_dict = {"num_attention_heads": 32,
- "tensor_model_parallel_size": 1,
- "num_query_groups": 8,
- "group_query_attention": True}
-sampling_config = {
- "num_completions": 1, # 每个输入提示生成的独立完成项数量
- "logprobs": 1, # 返回的 top token 的对数概率数量
- "max_tokens": 128, # 生成输出的最大 token 数量
- "best_of": 2, # 内部生成候选完成项的数量,从中选择最佳的一个
- "top_p": 1.0, # 核采样的累积概率阈值
- "top_k": 50, # 采样时考虑的最高概率 token 的数量
- "min_p": 0.0, # token 选择的最小概率阈值
- "temperature": 0.2, # 控制预测随机性的温度参数
- "detokenize": False # 是否将生成的 token 转换回可读字符串
-}
-
-
-def main():
- logger = Loggers(
- name="test_vllm_engine_multistep_decode",
- )
- logger.info("start test_vllm_engine_multistep_decode")
-
- conversation = [
- {
- "role": "system",
- "content": "You are a helpful assistant"
- },
- {
- "role": "user",
- "content": "Hello"
- },
- {
- "role": "assistant",
- "content": "Hello! How can I assist you today?"
- },
- {
- "role": "user",
- "content": "Write an essay about the importance of higher education.",
- },
- ]
-
- logger.info("load megatron weight")
- megatron_st = torch.load(weights_path)
- actor_weights = megatron_st['model']
-
- # 配置初始化所需的参数
-
- train_tensor_parallel_size = 1
- train_pipeline_parallel_size = 1
- infer_tensor_parallel_size = 1
- infer_pipeline_parallel_size = 1
- train_expert_parallel_size = 1
- infer_expert_parallel_size = 1
- max_num_seqs = 256
- trust_remote_code = True
-
- logger.info("enter vllmInferEngine")
-
- megatron_config = MegatronConfig(megatron_dict, {})
- megatron_config.num_attention_heads = 32
- megatron_config.tensor_model_parallel_size = 1
- megatron_config.num_query_groups = 8
- megatron_config.num_key_value_heads = 8
- megatron_config.group_query_attention = True
- # 初始化 VLLMInferEngine 实例
- inference_engine = VLLMInferEngine(
- megatron_config=megatron_config,
- sampling_config=sampling_config,
- train_expert_parallel_size=train_expert_parallel_size,
- infer_expert_parallel_size=infer_expert_parallel_size,
- tokenizer_name_or_path=tokenizer_name_or_path,
- train_tensor_parallel_size=train_tensor_parallel_size,
- train_pipeline_parallel_size=train_pipeline_parallel_size,
- infer_tensor_parallel_size=infer_tensor_parallel_size,
- infer_pipeline_parallel_size=infer_pipeline_parallel_size,
- max_num_seqs=max_num_seqs,
- trust_remote_code=trust_remote_code,
- num_scheduler_steps=8, # 8 decode steps
- )
-
- logger.info("model inited")
- inference_engine.free_cache_engine()
- torch.cuda.empty_cache()
- logger.info("free_cache")
-
- inference_engine.offload_model_weights()
- logger.info("offload_model")
- torch.cuda.empty_cache()
- logger.info("empty_cache")
-
- logger.info("enter sync_model_weights")
- inference_engine.sync_model_weights(actor_weights)
-
- logger.info("enter init_cache_engine")
- inference_engine.init_cache_engine()
-
- logger.info("=" * 80)
- logger.info("start chat")
- outputs = inference_engine.chat(conversation)
- logger.info("chat result is ", outputs)
-
- idx_list = []
- idx_list_per_step = []
- for i in range(2):
- for j in range(4):
- tokens = torch.randint(100, (10,))
- idx_list_per_step.append(tokens.view(-1).cpu().numpy().tolist())
- idx_list.extend(idx_list_per_step)
- idx_list_per_step = []
- logger.info(len(idx_list), [len(i) for i in idx_list])
-
- logger.info("start test generate_sequences ")
- outputs = inference_engine.generate_sequences(
- idx_list=idx_list,
- )
- logger.info("generate_sequences output is:")
- logger.info(outputs[0])
-
- logger.info("input")
- logger.info(idx_list[0])
-
-
-if __name__ == "__main__":
- main()
diff --git a/tests/st/resharding/test_module_entry_resharding.sh b/tests/st/resharding/test_module_entry_resharding.sh
index dfe90827e0160142eff31d8f4b54adea485c8c70..100e60627f61a1b5abf0dea44924fa2ba15276a9 100644
--- a/tests/st/resharding/test_module_entry_resharding.sh
+++ b/tests/st/resharding/test_module_entry_resharding.sh
@@ -1,45 +1,54 @@
-#!/bin/bash
+# #!/bin/bash
-export CUDA_DEVICE_MAX_CONNECTIONS=1
-SCRIPT_DIR=$(cd "$(dirname "$0")" && pwd)
-export PYTHONPATH=$SCRIPT_DIR/../../..:$PYTHONPATH
-GPUS_PER_NODE=8
-MASTER_ADDR=localhost
-MASTER_PORT=6555
-NNODES=1
-NODE_RANK=0
-WORLD_SIZE=$(($GPUS_PER_NODE*$NNODES))
+# export CUDA_DEVICE_MAX_CONNECTIONS=1
+# SCRIPT_DIR=$(cd "$(dirname "$0")" && pwd)
+# export PYTHONPATH=$SCRIPT_DIR/../../..:$PYTHONPATH
+# export VLLM_DP_SIZE=1
+# export HCCL_BUFFSIZE=256
+# export VLLM_USE_V1=1
+# export VLLM_VERSION=0.9.0
+# export VLLM_ENABLE_GRAPH_MODE=0
+# export VLLM_ENABLE_MC2=0
+# export HCCL_OP_EXPANSION_MODE="AIV"
+# export VLLM_ENABLE_TOPK_OPTIMZE=1
-DISTRIBUTED_ARGS="
- --nproc_per_node $GPUS_PER_NODE \
- --nnodes $NNODES \
- --node_rank $NODE_RANK \
- --master_addr $MASTER_ADDR \
- --master_port $MASTER_PORT
-"
-PYTHON_ARGS="
- --model-path "/data/for_dt/weights/Qwen2.5-7B-mg" \
- --tokenizer-path "/data/for_dt/weights/Qwen2.5-7B" \
- --train-tp 4 \
- --train-pp 2 \
- --train-ep 1 \
- --infer-tp 2 \
- --infer-pp 1 \
- --infer-ep 1
-"
-PYTHON_ARGS_new="
- --model-path "/data/for_dt/weights/Qwen2.5-7B-tp2pp2" \
- --tokenizer-path "/data/for_dt/weights/Qwen2.5-7B" \
- --train-tp 2 \
- --train-pp 2 \
- --train-ep 1 \
- --infer-tp 4 \
- --infer-pp 1 \
- --infer-ep 1
-"
+# GPUS_PER_NODE=8
+# MASTER_ADDR=localhost
+# MASTER_PORT=6555
+# NNODES=1
+# NODE_RANK=0
+# WORLD_SIZE=$(($GPUS_PER_NODE*$NNODES))
-echo "start test_resharding st"
+# DISTRIBUTED_ARGS="
+# --nproc_per_node $GPUS_PER_NODE \
+# --nnodes $NNODES \
+# --node_rank $NODE_RANK \
+# --master_addr $MASTER_ADDR \
+# --master_port $MASTER_PORT
+# "
+# PYTHON_ARGS="
+# --model-path "/data/for_dt/weights/Qwen2.5-7B-mg" \
+# --tokenizer-path "/data/for_dt/weights/Qwen2.5-7B" \
+# --train-tp 4 \
+# --train-pp 2 \
+# --train-ep 1 \
+# --infer-tp 2 \
+# --infer-pp 1 \
+# --infer-ep 1
+# "
+# PYTHON_ARGS_new="
+# --model-path "/data/for_dt/weights/Qwen2.5-7B-tp2pp2" \
+# --tokenizer-path "/data/for_dt/weights/Qwen2.5-7B" \
+# --train-tp 2 \
+# --train-pp 2 \
+# --train-ep 1 \
+# --infer-tp 4 \
+# --infer-pp 1 \
+# --infer-ep 1
+# "
-torchrun $DISTRIBUTED_ARGS $SCRIPT_DIR/test_resharding.py $PYTHON_ARGS
+# echo "start test_resharding st"
-torchrun $DISTRIBUTED_ARGS $SCRIPT_DIR/test_resharding.py $PYTHON_ARGS_new
\ No newline at end of file
+# torchrun $DISTRIBUTED_ARGS $SCRIPT_DIR/test_resharding.py $PYTHON_ARGS
+
+# torchrun $DISTRIBUTED_ARGS $SCRIPT_DIR/test_resharding.py $PYTHON_ARGS_new
\ No newline at end of file
diff --git a/tests/st/resharding/test_resharding.py b/tests/st/resharding/test_resharding.py
index 676d4131efd3ce102773542d99c08b027be5a09f..42134b7d8f95af3e9614ad2549d4b766d65d505d 100644
--- a/tests/st/resharding/test_resharding.py
+++ b/tests/st/resharding/test_resharding.py
@@ -306,6 +306,7 @@ class TestActor():
train_tensor_parallel_size=args.train_tp,
train_pipeline_parallel_size=args.train_pp,
train_expert_parallel_size=args.train_ep,
+ train_context_parallel_size=args.train_cp,
infer_tensor_parallel_size=args.infer_tp,
infer_pipeline_parallel_size=args.infer_pp,
infer_expert_parallel_size=args.infer_ep,
@@ -315,6 +316,7 @@ class TestActor():
dtype="bfloat16",
gpu_memory_utilization=0.6,
trust_remote_code=True,
+ enforce_eager=True,
megatron_config=megatron_config
)
self.megatron_offloader = MegatronOffLoader(self.model, self.optimizer)
@@ -364,6 +366,7 @@ def parse_args():
parser.add_argument("--train-tp", type=int, default=2)
parser.add_argument("--train-pp", type=int, default=2)
parser.add_argument("--train-ep", type=int, default=1)
+ parser.add_argument("--train_cp", type=int, default=1)
parser.add_argument("--infer-tp", type=int, default=4)
parser.add_argument("--infer-pp", type=int, default=1)
parser.add_argument("--infer-ep", type=int, default=1)