diff --git a/.gitignore b/.gitignore index f5a996cd55bb06a0a484fea883f37515b0a38767..8aa198fc7c211f14c83ead26b577016dc6c91d77 100644 --- a/.gitignore +++ b/.gitignore @@ -157,4 +157,7 @@ cython_debug/ /kernel*/ /logs/ /model_from_hf/ -/model_weights/ \ No newline at end of file +/model_weights/ +/outputs/ +/vllm/ +/vllm_ascend/ \ No newline at end of file diff --git a/README.md b/README.md index f3a5d3ed3aa1c190b2fb1335c9ca895ac093f6c5..81fe4c908f535cdf78135f4842c5583c3cfc0d28 100644 --- a/README.md +++ b/README.md @@ -54,7 +54,11 @@ MindSpeed RL是基于昇腾生态的强化学习加速框架,旨在为华为 [ vllm - 0.7.3 + main + + + vllm-ascend + main diff --git a/cli/infer_vllm.py b/cli/infer_vllm.py deleted file mode 100644 index a0af648b7e9118fa03434404f5230f95cd6cf3a9..0000000000000000000000000000000000000000 --- a/cli/infer_vllm.py +++ /dev/null @@ -1,244 +0,0 @@ -import os -import argparse -import logging - -import tensordict -import torch -import torch_npu -from torch_npu.contrib import transfer_to_npu -import vllm.distributed.parallel_state as ps - -from mindspeed_rl.models.rollout.vllm_engine import VLLMInferEngine -from mindspeed_rl.utils.loggers import Loggers - -logger = Loggers( - name="vllm_engine_inference", -) - - -def get_args(): - parser = argparse.ArgumentParser() - group = parser.add_argument_group(title='inference args') - group.add_argument('--tokenizer-name-or-path', type=str, - help="Huggingface config path.") - group.add_argument('--load-format', type=str, - choices=["auto", "megatron"], default="auto", - help="Vllm weight load format, support auto from huggingface and from megatron format.") - group.add_argument('--load', type=str, - default=None, - help="Vllm weight path for megatron load format.") - group.add_argument('--tensor-parallel-size', type=int, - default=1, - help="infer tensor parallel size") - group.add_argument('--query', type=str, default="Write an essay about the importance of higher education.", - help='Input query.') - group.add_argument('--task', type=str, - choices=["generation", "chat"], default="chat", - help='Inference task, generation or chat.') - group.add_argument('--gpu-memory-utilization', type=float, default=0.9, - help='Device memory ratio allocated for vllm.') - - group = parser.add_argument_group(title='distributed') - group.add_argument('--distributed-backend', default='nccl', - choices=['nccl', 'gloo'], - help='Which backend to use for distributed training.') - group.add_argument('--local-rank', type=int, default=int(os.getenv('LOCAL_RANK', '0')), - help='Local rank passed from distributed launcher.') - group.add_argument('--prompt-type', type=str, default=None, - choices=['default', 'empty', 'trl', 'qwen', 'qwen_r1', "qwen_math_r1", 'llama3', 'mistral', 'mixtral', 'gemma', 'llama2', - 'alpaca', 'deepseek2', 'deepseek2-lite', 'minicpm3', 'cpm', 'baichuan2', 'deepseek3'], - help='Which template to use for constructing prompts in training/inference.' 
'e.g., "qwen"') - group.add_argument('--prompt-type-path', type=str, default=None, - help='Path to the json file of templates.') - group = parser.add_argument_group(title='sampling params') - group.add_argument('--num-completions', type=int, default=1, - help='Number of output sequences to return for the given prompt.') - group.add_argument('--logprobs', type=int, default=1, - help='Number of log probabilities to return per output token.') - group.add_argument('--max-tokens', type=int, default=128, - help='Maximum number of tokens to generate per output sequence.') - group.add_argument('--top-p', type=float, default=1.0, - help='Float that controls the cumulative probability of the top tokens to consider.') - group.add_argument('--top-k', type=int, default=-1, - help='Integer that controls the number of top tokens to consider. Set to -1 to consider all tokens.') - group.add_argument('--temperature', type=float, default=1.0, - help='Float that controls the randomness of the sampling.') - return parser.parse_args() - - -def process_outputs(outputs): - res = "" - for output in outputs: - prompt = output.prompt - generated_text = output.outputs[0].text - res = res + f"Prompt: {prompt!r}\nGenerated Text: {generated_text!r}\n" - res = res + "-" * 80 - return res - - -def main(): - logger.info("start vllm_engine inference") - args = get_args() - - sampling_config = { - "num_completions": args.num_completions, # 每个输入提示生成的独立完成项数量 - "logprobs": args.logprobs, # 返回的 top token 的对数概率数量 - "max_tokens": args.max_tokens, # 生成输出的最大 token 数量 - "top_p": args.top_p, # 核采样的累积概率阈值 - "top_k": args.top_k, # 采样时考虑的最高概率 token 的数量 - "temperature": args.temperature, # 控制预测随机性的温度参数 - "detokenize": True # 是否将生成的 token 转换回可读字符串 - } - - inference_engine = VLLMInferEngine( - megatron_config=None, - sampling_config=sampling_config, - train_expert_parallel_size=1, - infer_expert_parallel_size=1, - tokenizer_name_or_path=args.tokenizer_name_or_path, - prompt_type=args.prompt_type, - prompt_type_path=args.prompt_type_path, - train_tensor_parallel_size=args.tensor_parallel_size, - train_pipeline_parallel_size=1, - infer_tensor_parallel_size=args.tensor_parallel_size, - infer_pipeline_parallel_size=1, - max_num_seqs=1, - gpu_memory_utilization=args.gpu_memory_utilization, - trust_remote_code=True, - load_format=args.load_format - ) - - if args.load_format == "megatron": - tp_rank = ps._TP.rank_in_group - weights_path = os.path.join(args.load, f"iter_0000001/mp_rank_{tp_rank:02}/model_optim_rng.pt") - - actor_weights = torch.load(weights_path)['model'] - actor_weights = replace_state_dict_name( - actor_weights, - vllm_dict=inference_engine.model.state_dict(), - arch=inference_engine.model.__class__.__name__) - logger.info("sync_model_weights") - inference_engine.sync_model_weights(actor_weights) - - logger.info("init_cache_engine") - inference_engine.init_cache_engine() - - if args.task == "chat": - chat_task(inference_engine, args.query) - elif args.task == "generation": - generate_task(inference_engine, args.query) - - -def chat_task(inference_engine, query): - conversation = [ - { - "role": "user", - "content": query, - }, - ] - outputs = inference_engine.chat(conversation) - res = process_outputs(outputs) - logger.info('Query: {}'.format(query)) - logger.info('Responses:\n{}'.format(res)) - - -def generate_task(inference_engine, query): - outputs = inference_engine.llm.generate( - prompts=[query], - sampling_params=inference_engine.sampling_params, - ) - res = process_outputs(outputs) - logger.info('Query: {}'.format(query)) 
- logger.info('Responses:\n{}'.format(res)) - - -def replace_state_dict_name(state_dict, vllm_dict, arch=None): - params_mapping = [ - # (megatron core gpt model name, vllm model name) - ("embedding.word_embeddings", "model.embed_tokens"), - ("self_attention.linear_qkv", "self_attn.qkv_proj"), - ("self_attention.linear_proj", "self_attn.o_proj"), - ("input_layernorm", "input_layernorm"), - ("pre_mlp_layernorm", "post_attention_layernorm"), - ("mlp.linear_fc1", "mlp.gate_up_proj"), - ("mlp.linear_fc2", "mlp.down_proj"), - ("decoder.final_layernorm", "model.norm"), - ("output_layer", "lm_head"), - # Deepseek add - ("self_attention.linear_qb", "self_attn.q_b_proj"), - ("self_attention.linear_kvb", "self_attn.kv_b_proj"), - ("mlp.router.weight", "mlp.gate.weight"), - ("mlp.router.expert_bias", "mlp.gate.e_score_correction_bias"), - ("mlp.shared_experts.linear_fc1", "mlp.shared_experts.gate_up_proj"), - ("mlp.shared_experts.linear_fc2", "mlp.shared_experts.down_proj"), - ("mlp.experts.weight1", "mlp.experts.w13_weight"), - ("mlp.experts.weight2", "mlp.experts.w2_weight"), - ("self_attention.q_layernorm", "self_attn.q_a_layernorm"), - ("self_attention.k_layernorm", "self_attn.kv_a_layernorm"), - ] - - - new_state_dict = {} - for name, loaded_weight in state_dict.items(): - if "_extra_state" in name: - continue - if "Deepseek" in arch: - name = _replace_name_m2v_deepseek(name, params_mapping) - else: - name = _replace_name_m2v(name, params_mapping) - - # the router bias in raw weight in fp32 - if "e_score_correction_bias" in name: - loaded_weight = loaded_weight.to(vllm_dict[name].dtype) - - # to adapter 'copy_' in megatron weight loader to save memory - if "mlp.experts" in name: - loaded_weight = loaded_weight.view(vllm_dict[name].shape) - - new_state_dict[name] = loaded_weight - return new_state_dict - - -def _replace_name_m2v(name, name_mapping): - """ - Transfer state dict names from megatron to vllm. - """ - for m_name, v_name in name_mapping: - if m_name not in name: - continue - if "layers" in name: # deal with decoder layers - name = name.replace("decoder", "model") - name_list = name.split(".") - if "layer_norm_weight" in name_list or "layer_norm_bias" in name_list: - param_name_list = name_list[:3] - param_name_list.append(v_name) - param_name = ".".join(param_name_list) - else: - param_name_list = name_list[:3] - weight_or_bias = name_list[-1] - param_name_list.append(v_name) - param_name_list.append(weight_or_bias) - param_name = ".".join(param_name_list) - return param_name - else: - param_name = name.replace(m_name, v_name) - return param_name - return name - - -def _replace_name_m2v_deepseek(name, name_mapping): - """ - Transfer state dict names from megatron to vllm. 
- """ - for m_name, v_name in name_mapping: - if m_name not in name: - continue - if "layers" in name: # deal with decoder layers - name = name.replace("decoder", "model") - param_name = name.replace(m_name, v_name) - return param_name - return name - - -if __name__ == "__main__": - main() diff --git a/configs/envs/runtime_env.yaml b/configs/envs/runtime_env.yaml index 1525a2f02aa7778be9fbbc37baca1e6468c98cba..7ac3d617a32ee76ae3008875a1d8253408323c66 100644 --- a/configs/envs/runtime_env.yaml +++ b/configs/envs/runtime_env.yaml @@ -8,6 +8,15 @@ env_vars: HCCL_IF_BASE_PORT: '48000' CUDA_DEVICE_MAX_CONNECTIONS: '1' HYDRA_FULL_ERROR: '1' + VLLM_DP_SIZE: '1' + HCCL_BUFFSIZE: '256' + VLLM_USE_V1: '1' + VLLM_VERSION: '0.9.0' + VLLM_ENABLE_GRAPH_MODE: '0' + VLLM_ENABLE_MC2: '0' + HCCL_OP_EXPANSION_MODE: "AIV" + VLLM_ENABLE_TOPK_OPTIMZE: "1" + # GLOO_SOCKET_IFNAME: "Your SOCKET IFNAME" # TP_SOCKET_IFNAME: "Your SOCKET IFNAME" # HCCL_SOCKET_IFNAME: "Your SOCKET IFNAME" \ No newline at end of file diff --git a/configs/grpo_qwen25_32b_A3.yaml b/configs/grpo_qwen25_32b_A3.yaml index 69da5f52e943a4dca21adfe16524ef1dcf6a16d7..c48e1a930d498dddf14b10253523f1c321c95df7 100644 --- a/configs/grpo_qwen25_32b_A3.yaml +++ b/configs/grpo_qwen25_32b_A3.yaml @@ -79,6 +79,7 @@ rl_config: guarantee_order: true generate_config: + enforce_eager: True trust_remote_code: true offload_train_optimizer: true offload_train_grad: true diff --git a/configs/grpo_qwen25_7b_A3.yaml b/configs/grpo_qwen25_7b_A3.yaml index 1bcab0b53d53f64401dbde0d9d4b198255e348ae..f452fc342b47fc16a3a04fa201c14cf68d28d824 100644 --- a/configs/grpo_qwen25_7b_A3.yaml +++ b/configs/grpo_qwen25_7b_A3.yaml @@ -81,6 +81,7 @@ rl_config: num_npus: 16 generate_config: + enforce_eager: True trust_remote_code: true offload_train_optimizer: true offload_train_grad: true diff --git a/docs/install_guide.md b/docs/install_guide.md index 6c0071944f0e463ba0a41da11d1866abed764c2d..6272a7097530dafbc14c2172354ff0c7a9a1243f 100644 --- a/docs/install_guide.md +++ b/docs/install_guide.md @@ -37,16 +37,16 @@ pip install apex-0.1.dev*.whl ```shell git clone https://github.com/vllm-project/vllm.git cd vllm -git checkout v0.7.3 +git checkout 5bc1ad6cee754405464a9957e86cf3a9302e4986 pip install -r requirements-build.txt VLLM_TARGET_DEVICE=empty pip install . ``` ### vllm_ascend安装 ```shell -git clone -b v0.7.3-dev https://github.com/vllm-project/vllm-ascend.git +git clone -b main https://github.com/vllm-project/vllm-ascend.git cd vllm-ascend -git checkout 0713836e95fe993feefe334945b5b273e4add1f1 +git checkout 908a851 pip install -e . 
``` diff --git a/examples/infer/infer_vllm_deepseekv3_671b.sh b/examples/infer/infer_vllm_deepseekv3_671b.sh deleted file mode 100644 index 4bbbbca299efe9fe7e2e1ce5f4d3d5654a92294b..0000000000000000000000000000000000000000 --- a/examples/infer/infer_vllm_deepseekv3_671b.sh +++ /dev/null @@ -1,32 +0,0 @@ -#!/bin/bash - -export GLOO_SOCKET_IFNAME="Your SOCKET IFNAME" -export TP_SOCKET_IFNAME="Your SOCKET IFNAME" -export CUDA_DEVICE_MAX_CONNECTIONS=1 - -GPUS_PER_NODE=8 -MASTER_ADDR="host ip" -MASTER_PORT=6001 -NNODES=4 -NODE_RANK="node rank" - -DISTRIBUTED_ARGS=" - --nproc_per_node $GPUS_PER_NODE \ - --nnodes $NNODES \ - --node_rank $NODE_RANK \ - --master_addr $MASTER_ADDR \ - --master_port $MASTER_PORT -" - -INFER_ARGS=" - --tokenizer-name-or-path 'your huggingface config path' \ - --load-format megatron \ - --load 'megatron weight path' \ - --tensor-parallel-size 32 \ - --task chat \ -" - -torchrun $DISTRIBUTED_ARGS cli/infer_vllm.py \ - $INFER_ARGS \ - --query "Write an essay about the importance of higher education." \ - --distributed-backend nccl diff --git a/mindspeed_rl/config_cls/generate_config.py b/mindspeed_rl/config_cls/generate_config.py index 15df52ea631dcb5e898a8266545f22115c11ae42..3c13f23d8207e982514495832cb72b68a3505443 100644 --- a/mindspeed_rl/config_cls/generate_config.py +++ b/mindspeed_rl/config_cls/generate_config.py @@ -25,6 +25,9 @@ class GenerateConfig(BaseConfig): dtype: Data type for model weights. Default is "bfloat16". gpu_memory_utilization: GPU memory utilization factor. Default is 0.5. + enforce_eager: Whether to always use eager-mode PyTorch. If True, we will disable ACL graph and always execute the model in eager mode. + If False, we will use ACL graph and eager execution in hybrid for maximal performance and flexibility. + sampling_config: Configuration for text generation sampling. Default values are set for various sampling parameters. - num_completions: The number of independent completions to generate for each input prompt. Default is 1. - logprobs: The number of top tokens to return log probabilities for. Default is 1. @@ -72,6 +75,7 @@ class GenerateConfig(BaseConfig): self.enable_prefix_caching = False self.num_scheduler_steps = 1 + self.enforce_eager = False # 采样配置的默认值,用于生成文本时的采样策略设置 self.sampling_config = { diff --git a/mindspeed_rl/config_cls/megatron_config.py b/mindspeed_rl/config_cls/megatron_config.py index fe15a82241fb734675f5497b2e48a870bc6557a2..44d7ceadd586affcbd7598da2ab25c73e17f5ce4 100644 --- a/mindspeed_rl/config_cls/megatron_config.py +++ b/mindspeed_rl/config_cls/megatron_config.py @@ -119,6 +119,7 @@ class MegatronConfig(BaseConfig): tensor_model_parallel_size: Size of tensor model parallelism (default: 1) pipeline_model_parallel_size: Size of pipeline model parallelism (default: 1) expert_model_parallel_size: Degree of expert model parallelism (default: 1) + num_layers_per_virtual_pipeline_stage: Degree of vpp (default: None) lr: Learning rate (default: None) lr_decay_style: Learning rate decay style (default: 'linear') min_lr: Minimum learning rate (default: 0.0) @@ -198,6 +199,7 @@ class MegatronConfig(BaseConfig): reuse_fp32_param: The distributed training optimizer frees up 'param copies of FP32 to save memory. 
(default: False) moe_tp_extend_ep: use tp group to extend experts parallelism instead of sharding weight tensor of experts in tp group moe_alltoall_overlap_comm: moe_alltoall_overlap_comm + noop_layers: noop layers string ''' def __init__(self, training_config: Dict, model_config: Dict): @@ -305,6 +307,7 @@ class MegatronConfig(BaseConfig): self.tensor_model_parallel_size = 1 self.pipeline_model_parallel_size = 1 self.expert_model_parallel_size = 1 + self.num_layers_per_virtual_pipeline_stage = None self.lr = None self.lr_decay_style = 'linear' self.min_lr = 0.0 @@ -358,5 +361,6 @@ class MegatronConfig(BaseConfig): self.overlap_param_gather = False self.recompute_activation_function = False self.swap_attention = False + self.noop_layers = None self.update(training_config, model_config) diff --git a/mindspeed_rl/config_cls/validate_config.py b/mindspeed_rl/config_cls/validate_config.py index e037bcd842ea71f8c4f005154eab0785fd9cdd9e..dbeba666c060e0c3b8c26917f6d879f4e3dd1a7a 100644 --- a/mindspeed_rl/config_cls/validate_config.py +++ b/mindspeed_rl/config_cls/validate_config.py @@ -29,6 +29,7 @@ def validate_rl_args( raise ValueError( f"integrated_mode_config should not be set when use_integrated_worker mode is off.") + # 校验序列长度与模型最大长度 if generate_config.max_model_len < actor_config.seq_length: raise ValueError( diff --git a/mindspeed_rl/models/base/base_inference_engine.py b/mindspeed_rl/models/base/base_inference_engine.py index 5719cd8099588cd791a381ada13be941bc2ffe2a..81a1971628a67d8e9717669a3c48c5005ad1e4ff 100644 --- a/mindspeed_rl/models/base/base_inference_engine.py +++ b/mindspeed_rl/models/base/base_inference_engine.py @@ -17,6 +17,7 @@ class BaseInferEngine(ABC): prompt_type: str = None, prompt_type_path: str = None, train_expert_parallel_size: int = 1, + train_context_parallel_size: int = 1, infer_tensor_parallel_size: int = 8, infer_pipeline_parallel_size: int = 1, infer_expert_parallel_size: int = 1, @@ -34,6 +35,7 @@ class BaseInferEngine(ABC): train_tensor_parallel_size (int): Tensor parallel size during training. train_pipeline_parallel_size (int): Pipeline parallel size during training. train_expert_parallel_size (int): Expert parallel size during training. + train_context_parallel_size (int): Context parallel size during training. infer_tensor_parallel_size (int): Tensor parallel size during inference. infer_pipeline_parallel_size (int): Pipeline parallel size during inference. infer_expert_parallel_size (int): Expert parallel size during inference. 
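For illustration, a minimal sketch of how the new train_context_parallel_size field is threaded through to the concrete vLLM engine; the model path and all parallel sizes below are hypothetical, and the real call site is _build_rollout in actor_hybrid_worker.py.

# Hypothetical example: constructing the rollout engine with the new CP field.
from mindspeed_rl.models.rollout.vllm_engine import VLLMInferEngine

engine = VLLMInferEngine(
    tokenizer_name_or_path="Qwen/Qwen2.5-7B",   # hypothetical HF config path
    train_tensor_parallel_size=4,
    train_pipeline_parallel_size=1,
    train_expert_parallel_size=1,
    train_context_parallel_size=2,              # new field: CP size used during training
    infer_tensor_parallel_size=4,
    infer_pipeline_parallel_size=1,
    infer_expert_parallel_size=1,
    sampling_config={"max_tokens": 128, "temperature": 1.0},
    max_num_seqs=1,
    gpu_memory_utilization=0.9,
    trust_remote_code=True,
    load_format="megatron",
    enforce_eager=True,
)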
@@ -49,6 +51,7 @@ class BaseInferEngine(ABC): self.train_tensor_parallel_size = train_tensor_parallel_size self.train_pipeline_parallel_size = train_pipeline_parallel_size self.train_expert_parallel_size = train_expert_parallel_size + self.train_context_parallel_size = train_context_parallel_size self.infer_tensor_parallel_size = infer_tensor_parallel_size self.infer_pipeline_parallel_size = infer_pipeline_parallel_size self.infer_expert_parallel_size = infer_expert_parallel_size diff --git a/mindspeed_rl/models/base/base_training_engine.py b/mindspeed_rl/models/base/base_training_engine.py index 55a4b6b46c57e924bd7ae912a5276277381b99cb..3c3d97e0735c888bc441dd4f3aef43baaf096126 100644 --- a/mindspeed_rl/models/base/base_training_engine.py +++ b/mindspeed_rl/models/base/base_training_engine.py @@ -92,6 +92,9 @@ class BaseTrainingEngine(ABC): shuffle_mini_batch=self.shuffle_mini_batch) n_micro_batch = len(batches) seq_len = batches[0]['input_ids'].shape[1] + data_iter = iter(batches) + if len(self.model) > 1: + data_iter = [iter(batches) for _ in self.model] self.loss_func.add_loss_meta_info(self.get_loss_meta_func()) @@ -104,7 +107,7 @@ class BaseTrainingEngine(ABC): # batch should be a list of batches inside micro-batches losses_reduced = self.forward_backward_func( forward_step_func=forward_step, - data_iterator=iter(batches), + data_iterator=data_iter, model=self.model, num_microbatches=n_micro_batch, seq_length=seq_len, diff --git a/mindspeed_rl/models/rollout/vllm_adapter/megatron_weight_loaders.py b/mindspeed_rl/models/rollout/vllm_adapter/megatron_weight_loaders.py index a6264f29f466b73a13a3d78801b282c67ed1556d..3323579eb3debd927b2cb7396fca012da586cc21 100644 --- a/mindspeed_rl/models/rollout/vllm_adapter/megatron_weight_loaders.py +++ b/mindspeed_rl/models/rollout/vllm_adapter/megatron_weight_loaders.py @@ -7,13 +7,6 @@ import torch import torch.nn as nn from transformers.configuration_utils import PretrainedConfig -from vllm.model_executor.layers.linear import ( - ColumnParallelLinear, MergedColumnParallelLinear, QKVParallelLinear, - RowParallelLinear, ReplicatedLinear) -from vllm.model_executor.layers.fused_moe.layer import FusedMoE -from vllm.model_executor.layers.vocab_parallel_embedding import ParallelLMHead, VocabParallelEmbedding -from vllm.model_executor.models import ModelRegistry - class InferParallelConfig: def __init__(self, infer_tensor_parallel_size: int, infer_pipeline_parallel_size: int, infer_expert_parallel_size: int): @@ -88,14 +81,15 @@ def deepseek_megatron_weight_loader(actor_weights: Dict, vllm_model: nn.Module, if name not in params_dict.keys(): raise ValueError(f"unexpected key {name} in deepseek_megatron_weight_loader") if "mlp.experts.w13_weight" in name: - loaded_weight.copy_(loaded_weight.view(hf_config.n_routed_experts, hf_config.hidden_size, -1).transpose(2, 1).contiguous()) + loaded_weight.copy_(loaded_weight.view(hf_config.n_routed_experts // infer_paralle_config.infer_expert_parallel_size, hf_config.hidden_size, -1).transpose(2, 1).contiguous()) if "mlp.experts.w2_weight" in name: - loaded_weight.copy_(loaded_weight.view(hf_config.n_routed_experts, -1, hf_config.hidden_size).transpose(2, 1).contiguous()) + loaded_weight.copy_(loaded_weight.view(hf_config.n_routed_experts // infer_paralle_config.infer_expert_parallel_size, -1, hf_config.hidden_size).transpose(2, 1).contiguous()) load_single_weight(params_dict, name, loaded_weight) return vllm_model def _get_model_weight_loader(arch: str): + from vllm.model_executor.models import ModelRegistry if arch 
in MODEL_MEGATRON_WEIGHT_LOADER_REGISTRY: return MODEL_MEGATRON_WEIGHT_LOADER_REGISTRY[arch] raise ValueError(f"Model architectures {arch} are not supported for now. " @@ -146,6 +140,23 @@ def load_single_weight(params_dict, name, loaded_weight): def update_megatron_weight_loader(): + from vllm.model_executor.layers.linear import ( + ColumnParallelLinear, MergedColumnParallelLinear, QKVParallelLinear, + RowParallelLinear, ReplicatedLinear) + from vllm.model_executor.layers.fused_moe.layer import FusedMoE + from vllm.model_executor.layers.vocab_parallel_embedding import ParallelLMHead, VocabParallelEmbedding + + LAYER_WEIGHT_MEGATRON_LOADER_REGISTRY = { + ColumnParallelLinear: parallel_weight_loader, + MergedColumnParallelLinear: parallel_weight_loader, + QKVParallelLinear: parallel_weight_loader, + RowParallelLinear: parallel_weight_loader, + VocabParallelEmbedding: parallel_weight_loader, + ParallelLMHead: parallel_weight_loader, + ReplicatedLinear: parallel_weight_loader, + FusedMoE: parallel_weight_loader + } + for layer_class, weight_loader in LAYER_WEIGHT_MEGATRON_LOADER_REGISTRY.items(): layer_class.weight_loader = weight_loader @@ -176,16 +187,6 @@ MODEL_MEGATRON_WEIGHT_LOADER_REGISTRY = { "LlamaForCausalLM": llama_megatron_core_weight_loader, "Qwen2ForCausalLM": qwen_megatron_weight_loader, "DeepseekV3ForCausalLM": deepseek_megatron_weight_loader, -} - - -LAYER_WEIGHT_MEGATRON_LOADER_REGISTRY = { - ColumnParallelLinear: parallel_weight_loader, - MergedColumnParallelLinear: parallel_weight_loader, - QKVParallelLinear: parallel_weight_loader, - RowParallelLinear: parallel_weight_loader, - VocabParallelEmbedding: parallel_weight_loader, - ParallelLMHead: parallel_weight_loader, - ReplicatedLinear: parallel_weight_loader, - FusedMoE: parallel_weight_loader + "DeepseekV2ForCausalLM": deepseek_megatron_weight_loader, + "CustomDeepseekV3ForCausalLM": deepseek_megatron_weight_loader, } diff --git a/mindspeed_rl/models/rollout/vllm_adapter/vllm_parallel_state.py b/mindspeed_rl/models/rollout/vllm_adapter/vllm_parallel_state.py index abcd38be61970ae4c03cd6cb2f74ec8bf029b09b..0de4687dbbcc6aa10df8574ad4d8a80c4a109dca 100644 --- a/mindspeed_rl/models/rollout/vllm_adapter/vllm_parallel_state.py +++ b/mindspeed_rl/models/rollout/vllm_adapter/vllm_parallel_state.py @@ -4,11 +4,18 @@ """Model and data parallel groups.""" import os +import re +import socket +import subprocess +from datetime import timedelta from typing import Optional import torch -import torch.distributed +import torch.distributed as dist import vllm.distributed.parallel_state as ps +import vllm_ascend.distributed.parallel_state as ascend_ps +import vllm.envs as envs +from vllm.config import get_current_vllm_config from vllm.distributed.parallel_state import ( get_pp_group, @@ -17,6 +24,10 @@ from vllm.distributed.parallel_state import ( init_model_parallel_group, ) +from mindspeed_rl.utils.loggers import Loggers + +logger = Loggers(__name__) + """ This version is strongly tied with Megatron to implement HybridEngine and weight sharing between vllm and Megatron. @@ -31,6 +42,12 @@ _DEVICE_MESH = None _TP = None # Pipeline model parallel group that the current rank belongs to. _PP = None +# Expert model parallel group that the current rank belongs to. +_EP = None +# Expert tensor model parallel group that the current rank belongs to. +_ETP = None +# Data model parallel group that the current rank belongs to. 
+_DP = None # Tensor model parallel group _TP_GROUP_RANKS = None @@ -42,16 +59,20 @@ def get_vllm_tp_group_ranks(): # This method is for initializing the ParallelGroup when using HybridEngine def initialize_parallel_state( - distributed_init_method: str = "env://", - backend: str = "hccl", - infer_tensor_model_parallel_size: int = 1, - train_tensor_model_parallel_size: int = 1, - infer_pipeline_model_parallel_size: int = 1, - train_pipeline_model_parallel_size: int = 1 + distributed_init_method: str = "env://", + backend: str = "hccl", + infer_tensor_model_parallel_size: int = 1, + train_tensor_model_parallel_size: int = 1, + infer_pipeline_model_parallel_size: int = 1, + train_pipeline_model_parallel_size: int = 1, + infer_expert_tensor_parallel_size: int = 1, + train_expert_tensor_parallel_size: int = 1, + train_expert_model_parallel_size: int = 1, + infer_expert_model_parallel_size: int = 1, + train_context_model_parallel_size: int = 1, ): os.environ["TORCH_NCCL_AVOID_RECORD_STREAMS"] = "1" - # NOTE(sgm): Modify for verl, Env vars will be set by TORCHRUN. rank = int(os.getenv("RANK", "-1")) local_rank = int(os.getenv("LOCAL_RANK", "0")) @@ -60,6 +81,8 @@ def initialize_parallel_state( world_size = int(os.getenv("WORLD_SIZE", "-1")) if world_size == -1: raise ValueError("The world_size is set to -1, not initialized by TORCHRUN") + config = get_current_vllm_config() + config.parallel_config.tensor_parallel_size = infer_tensor_model_parallel_size init_distributed_environment(world_size, rank, distributed_init_method, local_rank, backend) if torch.distributed.get_world_size() > 1: # NOTE: build a sepearate inference group with infer tp & micro dp @@ -67,54 +90,29 @@ def initialize_parallel_state( infer_tensor_model_parallel_size=infer_tensor_model_parallel_size, train_tensor_model_parallel_size=train_tensor_model_parallel_size, infer_pipeline_model_parallel_size=infer_pipeline_model_parallel_size, - train_pipeline_model_parallel_size=train_pipeline_model_parallel_size + train_pipeline_model_parallel_size=train_pipeline_model_parallel_size, + infer_expert_tensor_parallel_size=infer_expert_tensor_parallel_size, + train_expert_tensor_parallel_size=train_expert_tensor_parallel_size, + train_expert_model_parallel_size=train_expert_model_parallel_size, + infer_expert_model_parallel_size=infer_expert_model_parallel_size, + train_context_model_parallel_size=train_context_model_parallel_size ) else: initialize_model_parallel(infer_tensor_model_parallel_size, infer_pipeline_model_parallel_size, backend) -def ensure_model_parallel_initialized( - tensor_model_parallel_size: int, - pipeline_model_parallel_size: int = 1, - backend: Optional[str] = None, -) -> None: - """Helper to initialize model parallel groups if they are not initialized, - or ensure tensor-parallel and pipeline-parallel sizes are equal to expected - values if the model parallel groups are initialized. - """ - # get the backend of _DEVICE_WORLD_GROUP - backend = backend or torch.distributed.get_backend(get_world_group().device_group) - if not model_parallel_is_initialized(): - initialize_model_parallel(tensor_model_parallel_size, pipeline_model_parallel_size, backend) - return - - current_tp_size = get_tensor_model_parallel_world_size() - if current_tp_size != tensor_model_parallel_size: - raise ValueError( - "tensor parallel group already initialized, but of unexpected size: " - f"{current_tp_size=} vs. 
" - f"{tensor_model_parallel_size=}" - ) - pp_world_size = get_pp_group().world_size - if pp_world_size != pipeline_model_parallel_size: - raise ValueError( - "pipeline parallel group already initialized, but of unexpected size: " - f"{pp_world_size=} vs. " - f"{pipeline_model_parallel_size=}" - ) - - -def model_parallel_is_initialized(): - """Check if tensor and pipeline parallel groups are initialized.""" - return ps._TP is not None - # and _PIPELINE_MODEL_PARALLEL_GROUP is not None) - - def initialize_model_parallel_for_vllm( - infer_tensor_model_parallel_size: int, - train_tensor_model_parallel_size: int = 1, - infer_pipeline_model_parallel_size: int = 1, - train_pipeline_model_parallel_size: int = 1 + infer_tensor_model_parallel_size: int, + train_tensor_model_parallel_size: int = 1, + infer_pipeline_model_parallel_size: int = 1, + train_pipeline_model_parallel_size: int = 1, + infer_expert_tensor_parallel_size: int = 1, + train_expert_tensor_parallel_size: int = 1, + train_expert_model_parallel_size: int = 1, + infer_expert_model_parallel_size: int = 1, + train_context_model_parallel_size: int = 1, + num_process: int = 1, + rebulid_EP_group: bool = False ) -> None: # Get world size and rank. Ensure some consistencies. @@ -149,8 +147,10 @@ def initialize_model_parallel_for_vllm( Returns: list of group_lists [[g0, g1], [g2, g3], [g4, g5], [g6, g7]] ''' - if ((world_size // (train_tensor_model_parallel_size * train_pipeline_model_parallel_size)) * train_tensor_model_parallel_size < infer_tensor_model_parallel_size or - ((world_size // (train_tensor_model_parallel_size * train_pipeline_model_parallel_size)) * train_tensor_model_parallel_size) % infer_tensor_model_parallel_size != 0): + if ((world_size // ( + train_tensor_model_parallel_size * train_pipeline_model_parallel_size)) * train_tensor_model_parallel_size < infer_tensor_model_parallel_size or + ((world_size // ( + train_tensor_model_parallel_size * train_pipeline_model_parallel_size)) * train_tensor_model_parallel_size) % infer_tensor_model_parallel_size != 0): raise ValueError( f"Can't split train tp size {train_tensor_model_parallel_size} to infer tp size {infer_tensor_model_parallel_size} " f"with train dp size {(world_size // (train_tensor_model_parallel_size * train_pipeline_model_parallel_size))}.") @@ -179,16 +179,13 @@ def initialize_model_parallel_for_vllm( [[g0, g2], [g1, g3], [g4, g6], [g5, g7]] ''' if train_tensor_model_parallel_size < infer_tensor_model_parallel_size or train_tensor_model_parallel_size % infer_tensor_model_parallel_size != 0: - raise ValueError(f"Can't gather train tp size {train_tensor_model_parallel_size} to infer tp size {infer_tensor_model_parallel_size}") + raise ValueError( + f"Can't gather train tp size {train_tensor_model_parallel_size} to infer tp size {infer_tensor_model_parallel_size}") num_tensor_model_parallel_groups = world_size // infer_tensor_model_parallel_size - num_tensor_model_parallel_groups_per_train_tp = train_tensor_model_parallel_size // infer_tensor_model_parallel_size group_ranks = [] - for i in range(num_tensor_model_parallel_groups // num_tensor_model_parallel_groups_per_train_tp): - start = train_tensor_model_parallel_size * i - end = train_tensor_model_parallel_size * (i + 1) - for j in range(num_tensor_model_parallel_groups_per_train_tp): - ranks = list(range(start + j, end, num_tensor_model_parallel_groups_per_train_tp)) - group_ranks.append(ranks) + for i in range(num_tensor_model_parallel_groups): + ranks = list(range(i * infer_tensor_model_parallel_size, (i + 1) * 
infer_tensor_model_parallel_size)) + group_ranks.append(ranks) return group_ranks @@ -201,9 +198,11 @@ def initialize_model_parallel_for_vllm( _TP_GROUP_RANKS = tp_group_ranks return tp_group_ranks + tp_group_ranks = get_tp_group_ranks() + logger.info(f"TP rank: {tp_group_ranks}") _TP = init_model_parallel_group( - group_ranks=get_tp_group_ranks(), + group_ranks=tp_group_ranks, local_rank=get_world_group().local_rank, backend=backend, use_message_queue_broadcaster=True, @@ -218,16 +217,134 @@ def initialize_model_parallel_for_vllm( ranks = list(range(i, world_size, num_pipeline_model_parallel_groups)) group_ranks.append(ranks) # pipeline parallel does not need custom allreduce + logger.info(f"PP rank: {group_ranks}") _PP = init_model_parallel_group( group_ranks, get_world_group().local_rank, backend, ) ps._PP = _PP # for verl + data_parallel_size = 1 + from vllm.config import get_current_vllm_config + config = get_current_vllm_config() + + if config is not None: + data_parallel_size = config.parallel_config.data_parallel_size + + num_expert_parallel_groups: int = infer_expert_tensor_parallel_size + num_expert_tensor_parallel_groups: int = world_size // infer_expert_tensor_parallel_size + + num_rank_per_process = world_size // num_process + all_ranks = list(range(world_size)) + + global _EP + assert _EP is None, ("expert parallel group is already initialized") + group_ranks = [] + + if rebulid_EP_group: + # 重新建组 + tensor_model_parallel_size = train_tensor_model_parallel_size + context_parallel_size = train_context_model_parallel_size + expert_model_parallel_size = train_expert_model_parallel_size + train_data_parallel_size = world_size // tensor_model_parallel_size // train_pipeline_model_parallel_size + tensor_and_data_group_size_with_cp: int = tensor_model_parallel_size * train_data_parallel_size * context_parallel_size + num_tensor_and_data_groups_with_cp: int = world_size // tensor_and_data_group_size_with_cp + num_expert_groups: int = train_data_parallel_size * context_parallel_size // expert_model_parallel_size + tensor_and_expert_group_size = tensor_model_parallel_size * expert_model_parallel_size + all_tensor_and_expert_group_ranks = [] + for i in range(num_tensor_and_data_groups_with_cp): + for j in range(num_expert_groups): + start_rank = i * tensor_and_data_group_size_with_cp + j * tensor_and_expert_group_size + end_rank = i * tensor_and_data_group_size_with_cp + (j + 1) * tensor_and_expert_group_size + ranks = range(start_rank, end_rank) + all_tensor_and_expert_group_ranks.append(list(ranks)) + train_all_tensor_and_expert_group_ranks_tensor = torch.tensor(all_tensor_and_expert_group_ranks) + # 将训练态的EPG按照推理EP进行转置 + infer_actual_expert_model_parallel_size = infer_tensor_model_parallel_size * infer_expert_model_parallel_size + experts_memory_expend_N = infer_actual_expert_model_parallel_size // tensor_and_expert_group_size + ep_group_num = world_size // tensor_and_expert_group_size + group_ranks = [] + for i in range(0, ep_group_num, experts_memory_expend_N): + per_ep_group = train_all_tensor_and_expert_group_ranks_tensor[i:i + experts_memory_expend_N] + per_ep_group_T = per_ep_group.T + ranks = per_ep_group_T.reshape(-1).tolist() + group_ranks.append(ranks) + logger.info(f"EP rank: {group_ranks}") + + else: + # 保序 + group_ranks = [] + tensor_model_parallel_size = infer_tensor_model_parallel_size + context_parallel_size = 1 + expert_model_parallel_size = infer_expert_model_parallel_size + infer_data_parallel_size = world_size // tensor_model_parallel_size // 
infer_pipeline_model_parallel_size + tensor_and_data_group_size_with_cp: int = tensor_model_parallel_size * infer_data_parallel_size * context_parallel_size + num_tensor_and_data_groups_with_cp: int = world_size // tensor_and_data_group_size_with_cp + num_expert_groups: int = infer_data_parallel_size * context_parallel_size // expert_model_parallel_size + tensor_and_expert_group_size = tensor_model_parallel_size * expert_model_parallel_size + group_ranks = [] + for i in range(num_tensor_and_data_groups_with_cp): + for j in range(num_expert_groups): + start_rank = i * tensor_and_data_group_size_with_cp + j * tensor_and_expert_group_size + end_rank = i * tensor_and_data_group_size_with_cp + (j + 1) * tensor_and_expert_group_size + ranks = range(start_rank, end_rank) + group_ranks.append(list(ranks)) + logger.info(f"EP rank: {group_ranks}") + + ascend_ps._EP = init_model_parallel_group(group_ranks, + get_world_group().local_rank, + backend, + group_name="ep") + + global _ETP + assert _ETP is None, ( + "expert tensor parallel group is already initialized") + + group_ranks = [] + for i in range(num_expert_tensor_parallel_groups): + ranks = list(range(i * infer_expert_tensor_parallel_size, + (i + 1) * infer_expert_tensor_parallel_size)) + group_ranks.append(ranks) + logger.info(f"ETP rank: {group_ranks}") + + ascend_ps._ETP = init_model_parallel_group(group_ranks, + get_world_group().local_rank, + backend, + group_name="etp") + + if data_parallel_size > 1: + global _DP + assert _DP is None, ("data parallel group is already initialized") + dp_group_ranks = torch.tensor(tp_group_ranks).transpose(0, 1).reshape(-1, data_parallel_size).unbind(0) + group_ranks = [x.tolist() for x in dp_group_ranks] + logger.info(f"DP rank: {group_ranks}") + + ps._DP = init_model_parallel_group(group_ranks, + get_world_group().local_rank, + backend, + group_name="dp") + + os.environ["VLLM_DP_RANK"] = str(ps._DP.rank_in_group) + envs.VLLM_DP_RANK = int(os.environ["VLLM_DP_RANK"]) + ip_list = get_cluster_info() + + for index, group_rank in enumerate(group_ranks): + if torch.distributed.get_rank() in group_rank: + os.environ["VLLM_DP_MASTER_PORT"] = str( + int(os.environ.get("MASTER_PORT")) + 1 + index) + os.environ["VLLM_DP_MASTER_IP"] = ip_list[group_rank[0]] + + envs.VLLM_DP_MASTER_IP = os.environ["VLLM_DP_MASTER_IP"] + envs.VLLM_DP_MASTER_PORT = int(os.environ["VLLM_DP_MASTER_PORT"]) + os.environ["VLLM_PORT"] = os.environ["VLLM_DP_MASTER_PORT"] + envs.VLLM_PORT = envs.VLLM_DP_MASTER_PORT + + logger.info(f"rank: {torch.distributed.get_rank()}>>>>>>VLLM_DP_MASTER_IP: {envs.VLLM_DP_MASTER_IP}, VLLM_DP_MASTER_PORT: {envs.VLLM_DP_MASTER_PORT}") + def initialize_model_parallel( - tensor_model_parallel_size: int = 1, - pipeline_model_parallel_size: int = 1, - backend: Optional[str] = None, + tensor_model_parallel_size: int = 1, + pipeline_model_parallel_size: int = 1, + backend: Optional[str] = None, ) -> None: """ NOTE: This method is a hack from the open-sourced version without @@ -260,8 +377,6 @@ def initialize_model_parallel( world_size: int = torch.distributed.get_world_size() backend = backend or torch.distributed.get_backend(ps.get_world_group().device_group) - - num_tensor_model_parallel_groups: int = world_size // tensor_model_parallel_size global _TP if _TP is not None: @@ -295,3 +410,60 @@ def initialize_model_parallel( ps._PP = _PP # for verl + +def get_cluster_info(): + # 确保分布式环境已初始化 + if not dist.is_initialized(): + raise RuntimeError("Distributed environment not initialized") + + world_size = 
dist.get_world_size() + + # 获取当前节点的IP地址 + ip_address = _get_current_node_ip() + + # 收集所有rank的IP地址 + ip_list = [None] * world_size + dist.all_gather_object(ip_list, ip_address) + + return ip_list + + +def _get_current_node_ip() -> str: + try: + # 创建一个 UDP 套接字(仅用于获取接口信息) + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: + # 连接到一个外部地址(无需真实通信) + s.connect(("8.8.8.8", 80)) # Google DNS 服务器 + local_ip = s.getsockname()[0] + except Exception: + local_ip = _get_ip_by_ifname() + if not local_ip: + # 如果失败,回退到遍历接口 + local_ip = "127.0.0.1" + hostname = socket.gethostname() + for addr in socket.getaddrinfo(hostname, None): + ip = addr[4][0] + if not ip.startswith("::"): + local_ip = ip + break + return local_ip + + +def _get_ip_by_ifname(): + """ + 通过接口名称(如 eth0、en0)获取 IPv4 地址 + 返回 IP 字符串,失败返回 None + """ + try: + # 执行 ifconfig 命令并捕获输出 + ifname = os.environ.get("HCCL_SOCKET_IFNAME", 0) + if ifname: + output = subprocess.check_output(["ifconfig", ifname], stderr=subprocess.STDOUT).decode() + # 正则匹配 IPv4 地址(排除 127.0.0.1) + matches = re.findall(r'inet (?:addr:)?((?:\d{1,3}\.){3}\d{1,3})', output) + for ip in matches: + if ip != "127.0.0.1": + return ip + return None + except subprocess.CalledProcessError: + return None diff --git a/mindspeed_rl/models/rollout/vllm_engine.py b/mindspeed_rl/models/rollout/vllm_engine.py index 649f36988d74ad77ac2ee53db87346b295bf12d0..c4fff6f6d21f75c97e4549aa7a16ec72b5b98012 100644 --- a/mindspeed_rl/models/rollout/vllm_engine.py +++ b/mindspeed_rl/models/rollout/vllm_engine.py @@ -9,10 +9,10 @@ import gc import ray import torch import torch.distributed +import torch_npu +from torch_npu.contrib import transfer_to_npu from torch.nn.utils.rnn import pad_sequence from transformers import AutoConfig -from vllm import LLM, SamplingParams -from mindspeed_rl.utils.utils import mstx_timer_decorator def dummy_compile(*compile_args, **compile_kwargs): @@ -23,14 +23,14 @@ def dummy_compile(*compile_args, **compile_kwargs): return wrapper return decorate - -torch.jit.script = dummy_compile torch.compile = dummy_compile +torch.jit.script = dummy_compile +from vllm import LLM, SamplingParams +from vllm.v1.core.kv_cache_utils import get_kv_cache_config, unify_kv_cache_configs from mindspeed_rl.utils.loggers import Loggers from mindspeed_rl.models.base.base_inference_engine import BaseInferEngine -from mindspeed_rl.config_cls.megatron_config import MegatronConfig from mindspeed_rl.models.rollout.vllm_adapter.vllm_parallel_state import initialize_parallel_state from mindspeed_rl.models.rollout.vllm_adapter.megatron_weight_loaders import ( load_megatron_weights, @@ -49,11 +49,12 @@ class VLLMInferEngine(BaseInferEngine): train_tensor_parallel_size: int, train_pipeline_parallel_size: int, train_expert_parallel_size: int, + train_context_parallel_size: int, infer_tensor_parallel_size: int, infer_pipeline_parallel_size: int, infer_expert_parallel_size: int, - megatron_config: MegatronConfig, sampling_config: dict, + infer_expert_tensor_parallel_size: int = 1, prompt_type: str = None, prompt_type_path: str = None, enable_prefix_caching: bool = False, @@ -64,6 +65,7 @@ class VLLMInferEngine(BaseInferEngine): gpu_memory_utilization: float = 0.5, trust_remote_code: bool = True, load_format: str = "megatron", + enforce_eager: bool = False, **kwargs ): """ @@ -74,10 +76,12 @@ class VLLMInferEngine(BaseInferEngine): train_tensor_parallel_size (int): Tensor parallel size during training. train_pipeline_parallel_size (int): Pipeline parallel size during training. 
train_expert_parallel_size (int): Expert parallel size during training. + train_context_parallel_size (int): Context parallel size during training. infer_tensor_parallel_size (int): Tensor parallel size during inference. infer_pipeline_parallel_size (int): Pipeline parallel size during inference. infer_expert_parallel_size (int): Expert parallel size during inference. sampling_config (dict): Configuration for text generation sampling. + infer_expert_tensor_parallel_size (int): Expert tensor parallel size during inference. enable_prefix_caching (bool): Whether to enable prefix caching. num_scheduler_steps (int): Num scheduler steps. Default is 1. max_num_seqs (int): Maximum number of sequences to process simultaneously. Default is 1. @@ -95,6 +99,7 @@ class VLLMInferEngine(BaseInferEngine): train_tensor_parallel_size=train_tensor_parallel_size, train_pipeline_parallel_size=train_pipeline_parallel_size, train_expert_parallel_size=train_expert_parallel_size, + train_context_parallel_size=train_context_parallel_size, infer_tensor_parallel_size=infer_tensor_parallel_size, infer_pipeline_parallel_size=infer_pipeline_parallel_size, infer_expert_parallel_size=infer_expert_parallel_size, @@ -105,6 +110,12 @@ class VLLMInferEngine(BaseInferEngine): trust_remote_code=trust_remote_code ) # Additional initialization logic for VLLMInferEngine + + torch.compile = dummy_compile + # vLLM Ascend must be patched in advance + from vllm_ascend.patch import platform + from vllm_ascend.patch import worker + # Initialize sampling parameters from SamplingConfig self.sampling_config = sampling_config try: @@ -112,13 +123,11 @@ class VLLMInferEngine(BaseInferEngine): n=sampling_config.get('num_completions', 1), logprobs=sampling_config.get('logprobs', 1), max_tokens=sampling_config.get('max_tokens', 128), - best_of=sampling_config.get('best_of', 2), top_p=sampling_config.get('top_p', 1.0), top_k=sampling_config.get('top_k', 50), min_p=sampling_config.get('min_p', 0.0), temperature=sampling_config.get('temperature', 0.2), - detokenize=sampling_config.get('detokenize', False), - seed=sampling_config.get('seed', None) + detokenize=sampling_config.get('detokenize', False) ) except Exception as e: raise ValueError(f"Error creating SamplingParams from dictionary") from e @@ -139,7 +148,6 @@ class VLLMInferEngine(BaseInferEngine): # Initialize parallel state if tensor parallel size is specified if train_tensor_parallel_size is not None: - num_tp_per_train_tp = train_tensor_parallel_size // infer_tensor_parallel_size os.environ['CUDA_TIMER_STREAM_KAFKA_ENABLE'] = '0' os.environ['MEGATRON_IMPORT_TIMERS'] = '0' initialize_parallel_state( @@ -147,32 +155,39 @@ class VLLMInferEngine(BaseInferEngine): train_tensor_model_parallel_size=train_tensor_parallel_size, infer_pipeline_model_parallel_size=infer_pipeline_parallel_size, train_pipeline_model_parallel_size=train_pipeline_parallel_size, + train_expert_model_parallel_size=train_expert_parallel_size, + infer_expert_model_parallel_size=infer_expert_parallel_size, + train_context_model_parallel_size=train_context_parallel_size ) if load_format == "megatron": update_megatron_weight_loader() - torch.jit.script = dummy_compile - torch.compile = dummy_compile - # Initialize the LLM engine self.llm = LLM( + seed=1234, model=tokenizer_name_or_path, trust_remote_code=trust_remote_code, tensor_parallel_size=infer_tensor_parallel_size, - load_format="dummy" if load_format == "megatron" else "auto", + load_format='dummy' if load_format == 'megatron' else load_format, 
distributed_executor_backend="external_launcher", enable_prefix_caching=enable_prefix_caching, num_scheduler_steps=num_scheduler_steps, dtype=dtype, - enforce_eager=False, + enforce_eager=enforce_eager, skip_tokenizer_init=False, gpu_memory_utilization=gpu_memory_utilization, max_num_seqs=max_num_seqs, - max_model_len=max_model_len + max_model_len=max_model_len, + additional_config={ + 'expert_tensor_parallel_size': infer_expert_tensor_parallel_size, + 'enable_graph_mode': int(os.environ.get('VLLM_ENABLE_GRAPH_MODE', '0')), + 'ascend_scheduler_config': {}, + } ) self.model = self.llm.llm_engine.model_executor.driver_worker.worker.model_runner.get_model() + self.kv_cache_configs = None self.cpu_model = {} for name, params in self.model.named_parameters(): @@ -180,14 +195,61 @@ class VLLMInferEngine(BaseInferEngine): if load_format == "megatron": self.free_cache_engine() + if os.environ['VLLM_USE_V1'] == '1': + self._initialize_kv_caches(self.llm.llm_engine.vllm_config) self.offload_model_weights() + from vllm.config import VllmConfig + + def _initialize_kv_caches(self, vllm_config: VllmConfig): + + # Get all kv cache needed by the model + kv_cache_specs = self.llm.llm_engine.engine_core.engine_core.model_executor.get_kv_cache_specs() + + # Profiles the peak memory usage of the model to determine how much + # memory can be allocated for kv cache. + available_gpu_memory = self.llm.llm_engine.engine_core.engine_core.model_executor.determine_available_memory() + + assert len(kv_cache_specs) == len(available_gpu_memory) + # Get the kv cache tensor size + self.kv_cache_configs = [ + get_kv_cache_config(vllm_config, kv_cache_spec_one_worker, + available_gpu_memory_one_worker) + for kv_cache_spec_one_worker, available_gpu_memory_one_worker in + zip(kv_cache_specs, available_gpu_memory) + ] + + # Since we use a shared centralized controller, we need the + # `kv_cache_config` to be consistent across all workers to make sure + # all the memory operators can be applied to all workers. + unify_kv_cache_configs(self.kv_cache_configs) + + # All workers have the same kv_cache_config except layer names, so use + # an arbitrary one to initialize the scheduler. 
+ assert all([ + cfg.num_blocks == self.kv_cache_configs[0].num_blocks + for cfg in self.kv_cache_configs + ]) + def init_cache_engine(self): - if self.llm.llm_engine.model_executor.driver_worker.worker.cache_engine is None: - self.llm.llm_engine.model_executor.driver_worker.worker._init_cache_engine() + if os.environ['VLLM_USE_V1'] == '1': + worker = self.llm.llm_engine.model_executor.driver_worker.worker + if not worker.model_runner.kv_caches: + # v1 使用显式初始化方法 + self.llm.llm_engine.engine_core.engine_core.model_executor.initialize_from_config( + self.kv_cache_configs) + else: + if self.llm.llm_engine.model_executor.driver_worker.worker.cache_engine is None: + self.llm.llm_engine.model_executor.driver_worker.worker._init_cache_engine() def free_cache_engine(self): - ctx = self.llm.llm_engine.model_executor.driver_worker.worker.compilation_config.static_forward_context + if os.environ['VLLM_USE_V1'] == '1': + worker = self.llm.llm_engine.model_executor.driver_worker.worker + + ctx = worker.model_runner.vllm_config.compilation_config.static_forward_context + + else: + ctx = self.llm.llm_engine.model_executor.driver_worker.worker.compilation_config.static_forward_context from vllm.attention import AttentionType layer_need_kv_cache = [] @@ -201,10 +263,14 @@ class VLLMInferEngine(BaseInferEngine): for _ in range(pipeline_parallel_size): kv_cache.append(torch.tensor([])) ctx[layer_name].kv_cache = kv_cache - - self.llm.llm_engine.model_executor.driver_worker.worker.cache_engine = None - self.llm.llm_engine.model_executor.driver_worker.worker.gpu_cache = None - + if os.environ['VLLM_USE_V1'] == '1': + worker = self.llm.llm_engine.model_executor.driver_worker.worker + + # 清理缓存引擎 + worker.model_runner.kv_caches = [] + else: + self.llm.llm_engine.model_executor.driver_worker.worker.cache_engine = None + self.llm.llm_engine.model_executor.driver_worker.worker.gpu_cache = None if hasattr(self.model.model.layers[0].self_attn, "attn"): for i in range(self.model.model.start_layer, self.model.model.end_layer): attn_impl = self.model.model.layers[i].self_attn.attn.impl @@ -212,18 +278,26 @@ class VLLMInferEngine(BaseInferEngine): attn_impl.key_cache = None attn_impl.value_cache = None - self.gpu_cache = None - gc.collect() torch.cuda.empty_cache() + def offload_model_weights(self): for name, params in self.model.named_parameters(): params.data = self.cpu_model[name] + if hasattr(self.model.model.layers[-1].self_attn, "mla_attn"): + for i in range(self.model.model.start_layer, self.model.model.end_layer): + mla = self.model.model.layers[i].self_attn.mla_attn.impl + if hasattr(mla, "w_kc"): + mla.w_kc = None + mla.w_vc = None + if hasattr(mla, "W_UV"): + mla.W_UV = None + mla.W_UK_T = None def sync_model_weights(self, params, load_format='megatron'): infer_parallel_config = InferParallelConfig(self.infer_tensor_parallel_size, self.infer_pipeline_parallel_size, - self.infer_expert_parallel_size) + self.infer_expert_parallel_size * self.infer_tensor_parallel_size) load_megatron_weights(params, self.model, infer_parallel_config, @@ -237,9 +311,12 @@ class VLLMInferEngine(BaseInferEngine): if hasattr(mla, "w_kc"): mla.w_kc = None mla.w_vc = None + if hasattr(mla, "W_UV"): + mla.W_UV = None + mla.W_UK_T = None + mla.process_weights_after_loading(None) @torch.no_grad() - @mstx_timer_decorator def generate_sequences(self, idx_list, **kwargs): self.init_cache_engine() with self.update_sampling_params(**kwargs): @@ -325,4 +402,4 @@ def get_local_rank() -> int: # Default to 0 (for testing or single-process 
scenarios) logger.warning("Warning: Unable to determine local rank. Defaulting to 0.") - return 0 + return 0 \ No newline at end of file diff --git a/mindspeed_rl/utils/tokenizer.py b/mindspeed_rl/utils/tokenizer.py index eb4e9a7dbe702dec16a726368f76299fa5e1abb0..73c75f902ba47e420f4f3f6cd38e252897e66787 100644 --- a/mindspeed_rl/utils/tokenizer.py +++ b/mindspeed_rl/utils/tokenizer.py @@ -259,8 +259,8 @@ class _HuggingFaceTokenizer(BaseTokenizer): def tokenize(self, text): return self.tokenizer(text).input_ids - def detokenize(self, token_ids): - return self.tokenizer.decode(token_ids) + def detokenize(self, token_ids, **kwargs): + return self.tokenizer.decode(token_ids, **kwargs) @property def eod(self): diff --git a/mindspeed_rl/workers/actor_hybrid_worker.py b/mindspeed_rl/workers/actor_hybrid_worker.py index bd5d3f80d710dcc90d9b73fef90172db5488e8e1..7970079ab6d1a6ae30aa6ee96ed1959cd10e8319 100644 --- a/mindspeed_rl/workers/actor_hybrid_worker.py +++ b/mindspeed_rl/workers/actor_hybrid_worker.py @@ -84,6 +84,9 @@ class ActorHybridWorkerBase(BaseWorker): self.inference_model = self._build_rollout() self.sharding_manager = self._build_sharding_manager() + + if self.generate_config.offload_train_param: + self.actor_offloader.onload_param() self.actor_hybrid = ActorRolloutHybrid( self.model, @@ -172,7 +175,7 @@ class ActorHybridWorkerBase(BaseWorker): self.args.consumed_train_samples += self.megatron_config.global_batch_size // self.rl_config.n_samples_per_prompt self.num_floating_point_operations_so_far += num_floating_point_operations(self.args, self.megatron_config.global_batch_size) - if self.parallel_state.is_pipeline_last_stage() and self.parallel_state.get_tensor_model_parallel_rank() == 0: + if self.parallel_state.is_pipeline_last_stage(ignore_virtual=True) and self.parallel_state.get_tensor_model_parallel_rank() == 0: ray.get(self.td.update_metrics.remote(value=metrics, cumulate=True)) ray.get( self.td.update_metrics.remote( @@ -352,6 +355,7 @@ class ActorHybridWorkerBase(BaseWorker): train_tensor_parallel_size=self.megatron_config.tensor_model_parallel_size, train_pipeline_parallel_size=self.megatron_config.pipeline_model_parallel_size, train_expert_parallel_size=self.megatron_config.expert_model_parallel_size, + train_context_parallel_size=self.megatron_config.context_parallel_size, infer_tensor_parallel_size=self.generate_config.infer_tensor_parallel_size, infer_pipeline_parallel_size=self.generate_config.infer_pipeline_parallel_size, infer_expert_parallel_size=self.generate_config.infer_expert_parallel_size, @@ -363,9 +367,9 @@ class ActorHybridWorkerBase(BaseWorker): max_model_len=self.generate_config.max_model_len, dtype=self.generate_config.dtype, gpu_memory_utilization=self.generate_config.gpu_memory_utilization, - trust_remote_code=self.generate_config.trust_remote_code + trust_remote_code=self.generate_config.trust_remote_code, + enforce_eager=self.generate_config.enforce_eager, ) - return rollout def _build_sharding_manager(self): @@ -385,7 +389,8 @@ class ActorHybridWorkerBase(BaseWorker): grad_offload=self.generate_config.offload_train_grad, train_param_offload=self.generate_config.offload_train_param, enable_validate=self.rl_config.enable_sharding_validate, - megatron_offloader=self.actor_offloader + megatron_offloader=self.actor_offloader, + noop_layers=self.megatron_config.noop_layers ) return sharding_manager diff --git a/mindspeed_rl/workers/resharding/megatron_sharding_manager.py b/mindspeed_rl/workers/resharding/megatron_sharding_manager.py index 
d9f7026e962dff336b747a69b2483cf9e0e6cff6..1f98553351010291cd7a92abd2bac5856a481814 100644 --- a/mindspeed_rl/workers/resharding/megatron_sharding_manager.py +++ b/mindspeed_rl/workers/resharding/megatron_sharding_manager.py @@ -21,20 +21,27 @@ Manager used to shard weight and offload/onload optimizer from training stage to from itertools import chain from collections import defaultdict +import os import torch -import torch.distributed +import torch.distributed as dist +import vllm.distributed.parallel_state as ps +from mindspeed_rl.utils.loggers import Loggers from mindspeed_rl.workers.resharding.vllm_weight_container import MegatronStyleVllmWeightContainer from mindspeed_rl.workers.resharding.weight_adaptor import get_weight_adaptor from mindspeed_rl.utils.utils import mstx_timer_decorator +logger = Loggers( + name="vllm_engine_inference", +) + + class MegatronOffLoader: def __init__(self, megatron_model=None, optimizer=None, wrap_with_ddp=True): self.optimizer = optimizer self.model = megatron_model self.wrap_with_ddp = wrap_with_ddp - self.tensor_to_cpu_states_map = dict() @mstx_timer_decorator @@ -51,18 +58,29 @@ class MegatronOffLoader: @mstx_timer_decorator def offload_optimizer(self): - for param_group in self.optimizer.optimizer.param_groups: - for param in param_group['params']: - param.data = param.data.to("cpu", non_blocking=False) - self.optimizer.optimizer.state = self._move_to_device(self.optimizer.optimizer.state, "cpu") + if hasattr(self.optimizer, "chained_optimizers"): + optimizers = self.optimizer.chained_optimizers + else: + optimizers = [self.optimizer] + for optimizer in optimizers: + for param_group in optimizer.optimizer.param_groups: + for param in param_group['params']: + param.data = param.data.to("cpu", non_blocking=False) + optimizer.optimizer.state = self._move_to_device(optimizer.optimizer.state, + "cpu") @mstx_timer_decorator def onload_optimizer(self): - for param_group in self.optimizer.optimizer.param_groups: - for param in param_group['params']: - param.data = param.data.to(torch.cuda.current_device(), non_blocking=False) - self.optimizer.optimizer.state = self._move_to_device(self.optimizer.optimizer.state, - torch.cuda.current_device()) + if hasattr(self.optimizer, "chained_optimizers"): + optimizers = self.optimizer.chained_optimizers + else: + optimizers = [self.optimizer] + for optimizer in optimizers: + for param_group in optimizer.optimizer.param_groups: + for param in param_group['params']: + param.data = param.data.to(torch.cuda.current_device(), non_blocking=False) + optimizer.optimizer.state = self._move_to_device(optimizer.optimizer.state, + torch.cuda.current_device()) @mstx_timer_decorator def _move_to_device(self, data, device): @@ -133,7 +151,8 @@ class MegatronShardingManager: num_layer_list=None, moe_tp_extend_ep=None, parallel_state=None, - megatron_offloader=None + megatron_offloader=None, + noop_layers=None ): """Megatron Sharding Manager initialization. 
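As a reference for the offload path added above, a minimal standalone sketch of the chained-optimizer handling, assuming only that Megatron's ChainedOptimizer exposes chained_optimizers and that each wrapped optimizer exposes .optimizer.param_groups (movement of the optimizer state dict is omitted for brevity):

def _unwrap_optimizers(optimizer):
    # A ChainedOptimizer bundles several optimizers; a plain optimizer is treated as a list of one.
    if hasattr(optimizer, "chained_optimizers"):
        return optimizer.chained_optimizers
    return [optimizer]


def offload_optimizer_params(optimizer, device="cpu"):
    # Move every parameter referenced by the optimizer(s) to `device`,
    # mirroring MegatronOffLoader.offload_optimizer / onload_optimizer above.
    for opt in _unwrap_optimizers(optimizer):
        for param_group in opt.optimizer.param_groups:
            for param in param_group["params"]:
                param.data = param.data.to(device, non_blocking=False)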
@@ -169,13 +188,13 @@ class MegatronShardingManager: moe_tp_extend_ep=moe_tp_extend_ep, parallel_state=parallel_state, weight_adaptor=self.weight_adaptor, - enable_validate=enable_validate) + enable_validate=enable_validate, + noop_layers=noop_layers) self.optimizer_offload = optimizer_offload self.grad_offload = grad_offload self.train_param_offload = train_param_offload self.enable_validate = enable_validate - self.use_distributed_optimizer = self.optimizer.config.use_distributed_optimizer self.inference_engine.offload_model_weights() self.megatron_offloader = megatron_offloader @@ -206,8 +225,6 @@ class MegatronShardingManager: 3. do resharding 4. offload training param """ - if self.train_param_offload: - self.megatron_offloader.onload_param() self.onload_infer_params() @@ -215,9 +232,9 @@ class MegatronShardingManager: if self.train_param_offload: self.megatron_offloader.offload_param() - self.inference_engine.sync_model_weights(infer_params, load_format='megatron') + @mstx_timer_decorator def exit_infer_mode(self): """ diff --git a/mindspeed_rl/workers/resharding/memory_buffer.py b/mindspeed_rl/workers/resharding/memory_buffer.py index 022a94690bee4da6df387bc198be3d36036203f9..98106d40715c053968609ab96b36d2f0ce9f66c8 100644 --- a/mindspeed_rl/workers/resharding/memory_buffer.py +++ b/mindspeed_rl/workers/resharding/memory_buffer.py @@ -71,7 +71,7 @@ class MemoryBuffer: if param_name not in self.tensor_indices: raise KeyError(f"Parameter {param_name} not found in the buffer.") - start_index, shape = self.tensor_indices[param_name] + start_index, shape = self.tensor_indices[param_name] # weight_name -- index shape return self.get(shape, start_index) @@ -82,6 +82,15 @@ def calc_padded_numel(shape: torch.Size, dtype: torch.dtype): return (numel + align_numel - 1) // align_numel * align_numel +# 构建EP增大的buffer———构造一个experts_weight_buffer_meta +def get_weight_buffer_meta_from_buffer(weight_buffer_meta) -> Dict[str, Dict]: + experts_weight_buffer_meta = {} + for name, meta_info in sorted(weight_buffer_meta.items()): + if "mlp.experts" in name: + experts_weight_buffer_meta[name] = meta_info + return experts_weight_buffer_meta + + def build_memory_buffer(weight_buffer_meta: Dict[str, Dict]) -> Dict[torch.dtype, MemoryBuffer]: """Build the memory buffer given weight_buffer_meta @@ -123,8 +132,61 @@ def build_memory_buffer(weight_buffer_meta: Dict[str, Dict]) -> Dict[torch.dtype return memory_buffers +def build_experts_memory_buffer(experts_weight_buffer_meta: Dict[str, Dict], experts_memory_expend_N) -> Dict[torch.dtype, MemoryBuffer]: + """Build the experts memory buffer given experts_weight_buffer_meta + + Args: + weight_buffer_meta: contains mapping from name to a dictionary containing shape and dtype of the tensors + + Returns: a large memory buffer for each dtype that can hold all the tensors + + """ + experts_memory_buffers = {} + total_numel_map = {} # map from dtype to the total numel + + for _, meta_info in sorted(experts_weight_buffer_meta.items()): + shape = meta_info['shape'] + shape = torch.Size([experts_memory_expend_N, shape[0], shape[1], shape[2]]) + dtype = meta_info['dtype'] + + if not isinstance(shape, torch.Size): + raise TypeError("Shape must be an instance of torch.Size") + if not isinstance(dtype, torch.dtype): + raise TypeError("dtype must be an instance of torch.dtype") + if dtype not in total_numel_map: + total_numel_map[dtype] = 0 + + tmp_numel = calc_padded_numel(shape, dtype) + total_numel_map[dtype] += tmp_numel + + + for dtype, total_numel in 
total_numel_map.items(): + # Create a buffer for each dtype with the total numel + experts_memory_buffers[dtype] = MemoryBuffer(total_numel, total_numel, dtype) + + # Now, insert each tensor's index and shape for later retrieval by name + current_index_map = {} # This keeps track of the current memory index for each dtype + for name, meta_info in sorted(experts_weight_buffer_meta.items()): + shape = meta_info['shape'] + shape = torch.Size([experts_memory_expend_N, shape[0], shape[1], shape[2]]) + dtype = meta_info['dtype'] + buffer = experts_memory_buffers[dtype] + tensor_size = calc_padded_numel(shape, dtype) + + start_index = current_index_map.get(dtype, 0) + current_index_map[dtype] = start_index + tensor_size + + buffer.tensor_indices[name] = (start_index, shape) + + return experts_memory_buffers + + def build_model_weight_buffer(model: nn.Module, names_per_pp: List[str], get_weight_buffer_meta): - memory_buffers = [ModelWeightBuffer(model, weight_names, get_weight_buffer_meta) for weight_names in names_per_pp] + combined_names_per_pp = [[] for _ in names_per_pp] + for pp_rank, vpp_stages in enumerate(names_per_pp): + for weight_names_per_stage in vpp_stages: + combined_names_per_pp[pp_rank].extend(weight_names_per_stage) + memory_buffers = [ModelWeightBuffer(model, weight_names, get_weight_buffer_meta) for weight_names in combined_names_per_pp] return memory_buffers @@ -139,7 +201,7 @@ class ModelWeightBuffer: self.weight_buffer_meta = self.get_weight_buffer_meta(self.model, weight_names) self.weight_names = list(self.weight_buffer_meta.keys()) self.memory_buffers = None - # self.memory_buffers = build_memory_buffer(self.weight_buffer_meta) + def __getitem__(self, weight_name: str) -> torch.Tensor: return self.get_weight_by_name(weight_name) diff --git a/mindspeed_rl/workers/resharding/vllm_weight_container.py b/mindspeed_rl/workers/resharding/vllm_weight_container.py index a783e90663e8d0241f617cb15b292043173c7ff5..15664ee375f9f4b5feab383a7631b70b5d9ae096 100644 --- a/mindspeed_rl/workers/resharding/vllm_weight_container.py +++ b/mindspeed_rl/workers/resharding/vllm_weight_container.py @@ -25,12 +25,16 @@ import torch.distributed as dist import numpy as np from torch.distributed import new_group +import vllm.distributed.parallel_state as ps -from mindspeed_rl.workers.resharding.memory_buffer import build_model_weight_buffer +from mindspeed_rl.workers.resharding.memory_buffer import build_model_weight_buffer, calc_padded_numel import mindspeed_rl.workers.resharding.utils from mindspeed_rl.workers.resharding.utils import get_tensor_parallel_partition_dim, tp_md5_validate, \ update_md5_by_rank, compute_md5, validate_md5, _build_infer_param_dict, get_tp_allgather_group, \ get_tp_allgather_world_size, is_tensor_parallel_param, get_tp_group, is_fake_tp_param +from mindspeed_rl.utils.loggers import Loggers + +logger = Loggers(__name__) class MegatronStyleVllmWeightContainer: @@ -42,7 +46,8 @@ class MegatronStyleVllmWeightContainer: moe_tp_extend_ep=False, parallel_state=None, weight_adaptor=None, - enable_validate=False) -> None: + enable_validate=False, + noop_layers=None) -> None: """ Megatron style vllm weight container. 
    Arguments:
@@ -64,16 +69,26 @@ class MegatronStyleVllmWeightContainer:
         self.megatron_model = megatron_model
         self.parallel_state = parallel_state
         self.weight_adaptor = weight_adaptor
-        self._num_hidden_layers = self.model_config.num_hidden_layers
+        self._num_hidden_layers = self.model_config.num_hidden_layers  # HF layer count, read from config.json under the tokenizer path
+        self._noop_layers = None
+        if noop_layers is not None:
+            self._noop_layers = [int(layer_idx) for layer_idx in noop_layers.split(',')]
+            self._num_hidden_layers += len(self._noop_layers)
         # pp configs
         self._pp_rank = self.parallel_state.get_pipeline_model_parallel_rank()
         self._pp_group = self.parallel_state.get_pipeline_model_parallel_group()
         self._pp_size = self.parallel_state.get_pipeline_model_parallel_world_size()
+        self._world_size = dist.get_world_size()
+        self.pp_group_size = self._world_size // self._pp_size
+        ## vpp
         self._num_layer_list = self._build_num_layer_list(num_layer_list)
-        self._vpp_size = self.parallel_state._VIRTUAL_PIPELINE_MODEL_PARALLEL_RANK if self.parallel_state._VIRTUAL_PIPELINE_MODEL_PARALLEL_RANK else 1
-        self._vpp_rank = self.parallel_state._VIRTUAL_PIPELINE_MODEL_PARALLEL_WORLD_SIZE if self.parallel_state._VIRTUAL_PIPELINE_MODEL_PARALLEL_WORLD_SIZE else 0
-
+        self._vpp_rank = self.parallel_state._VIRTUAL_PIPELINE_MODEL_PARALLEL_RANK if self.parallel_state._VIRTUAL_PIPELINE_MODEL_PARALLEL_RANK else 0
+        self._vpp_size = self.parallel_state._VIRTUAL_PIPELINE_MODEL_PARALLEL_WORLD_SIZE if self.parallel_state._VIRTUAL_PIPELINE_MODEL_PARALLEL_WORLD_SIZE else 1
+        self._vpp_layer_list = self._build_vpp_layer_list(self._num_layer_list)
+        ## _noop_layers
+        self._global2local_map = self._build_global2local_map(self._vpp_layer_list, self._vpp_size, self._noop_layers) if self._noop_layers is not None else None
+
         # tp configs
         self._tp_size = self.parallel_state.get_tensor_model_parallel_world_size()
         self._tp_group = self.parallel_state.get_tensor_model_parallel_group()
@@ -97,7 +112,11 @@ class MegatronStyleVllmWeightContainer:
         self._infer_ep_size = infer_expert_parallel_size
         self.moe_tp_extend_ep = moe_tp_extend_ep
-        self._world_size = dist.get_world_size()
+        # TODO: infer_expert_tensor_parallel_size and num_process are fixed for now.
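With noop layers, the global (HuggingFace-style) layer index space contains placeholder layers that own no Megatron weights, so `_num_hidden_layers` is padded and a global-to-local index map is precomputed. A small self-contained sketch of the mapping built by `_build_global2local_map` below; the layer counts and the noop index in the example are invented for illustration:

```python
def build_global2local_map(layer_list, vpp_size, noop_layers):
    """For every non-noop global layer index (walked in global order), record its
    local index inside the owning (vpp chunk, pp rank) model chunk."""
    stage_layers = sum(layer_list)  # layers covered by one vpp sweep over all pp ranks
    global2local = []
    for vpp_rank in range(vpp_size):
        start = vpp_rank * stage_layers
        for layers_in_chunk in layer_list:
            for g in range(start, start + layers_in_chunk):
                if g not in noop_layers:
                    global2local.append(g % layers_in_chunk)
            start += layers_in_chunk
    return global2local

# 2 pp ranks with 2 layers per vpp chunk, vpp_size=2, and global layer 3 marked noop:
print(build_global2local_map([2, 2], 2, [3]))  # -> [0, 1, 0, 0, 1, 0, 1]
```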
+        self.infer_expert_tensor_parallel_size = 1
+        self.num_process = 1
+        self._infer_ep_size = self._infer_ep_size * self._infer_tp_size
+        self.experts_memory_expend_N = self._infer_ep_size // self._ep_size

         # validate parallel configs
         self._validate_parallel_config()
@@ -116,10 +135,9 @@
     def _validate_parallel_config(self):
         if self._infer_pp_size != 1:
             raise ValueError("infer_pp_size != 1 not supported yet")
-        if self._infer_ep_size != 1:
-            raise ValueError("infer_ep_size != 1 not supported yet")
-        if self._ep_size > 1 and self._ep_size != self._infer_tp_size:
-            raise ValueError("For training EP, supports EP -> TP only currently.")
+
+        if self._infer_ep_size % self._ep_size != 0:
+            raise ValueError("The inference expert parallel size must be divisible by the training expert parallel size.")
         if self._ep_size > 1 and not self.moe_tp_extend_ep:
             raise ValueError("To enable training EP, you need to enable moe_tp_extend_ep and use GroupedMLP.")
         if self._pp_size < self._infer_pp_size:
@@ -149,6 +167,12 @@
         self._update_weight_buffers_intra_pp()
         self._update_weight_buffers_inter_pp()
+
+        # precondition for running _update_weight_buffers_ep + _send_receive_experts
+        if(self.moe_tp_extend_ep and self._infer_ep_size >= self._ep_size):
+            self._update_weight_buffers_ep()
+            self._send_receive_experts()
+
         params = self._get_all_params()
         params = _build_infer_param_dict(params=params)
@@ -161,27 +185,55 @@
            raise ValueError("num_layers % pp_size == 0, please specify num_layer_list")
        return [self._num_hidden_layers // self._pp_size for _ in range(self._pp_size)]

+    def _build_vpp_layer_list(self, num_layer_list):
+        if self._vpp_size <= 1:
+            return num_layer_list
+        for layers_in_pp_rank in num_layer_list:
+            if layers_in_pp_rank % self._vpp_size != 0:
+                raise ValueError("num_layers_per_pp % vpp_size != 0, please specify pp_size and vpp_size")
+        return [int(layers_in_pp_rank / self._vpp_size) for layers_in_pp_rank in num_layer_list]
+
+    def _build_global2local_map(self, layer_list, vpp_size, noop_layers):
+        stage_layers_num = sum(layer_list)
+        glb2local_map = []
+        for vpp_rank in range(vpp_size):
+            start_layer = vpp_rank * stage_layers_num
+            for _, layers_in_vpp_rank in enumerate(layer_list):
+                layer_idx_list = [
+                    layer_idx for layer_idx in range(start_layer, start_layer + layers_in_vpp_rank)
+                    if layer_idx not in noop_layers
+                ]
+                glb2local_map += [layer_idx % layers_in_vpp_rank for layer_idx in layer_idx_list]
+                start_layer += layers_in_vpp_rank
+
+        return glb2local_map
+
     def _unwrap_megatron_model(self, model):
         """
        Remove consecutive 'module.' prefixes from the model based on the state_dict's first key.
        This method only removes 'module.' from the beginning of the key and ignores other occurrences.
        """
-        model = model[0]
-        first_key = list(dict(model.named_parameters()).keys())[0]
-        while first_key.startswith("module."):
-            model = model.module
-            first_key = first_key[len("module."):]  # 更新键,去掉一个module.
-        return model
+        unwraped_model = []
+        for model_chunk in model:
+            first_key = list(dict(model_chunk.named_parameters()).keys())[0]
+            while first_key.startswith("module."):
+                model_chunk = model_chunk.module
+                first_key = first_key[len("module."):]
+            unwraped_model.append(model_chunk)
+        return unwraped_model

     def _init_weight_buffers(self):
         """
        Build buffers from vllm state dict. Totally build train pp_size buffers, each buffer corresponds to a pack of megatron weight.
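The validation above reflects how inference expert parallelism is derived in this patch: inference TP is folded into EP (`self._infer_ep_size *= self._infer_tp_size`), and the result must be an integer multiple of the training EP so each training expert shard can be expanded by a whole factor (`experts_memory_expend_N`). A hedged arithmetic sketch; the function name and the example sizes are illustrative only:

```python
def expert_expansion_factor(train_ep, infer_tp, infer_ep):
    """Fold inference TP into EP and return the per-shard expansion factor."""
    effective_infer_ep = infer_ep * infer_tp
    if effective_infer_ep % train_ep != 0:
        raise ValueError("inference EP (x TP) must be divisible by the training EP")
    return effective_infer_ep // train_ep  # plays the role of experts_memory_expend_N

# e.g. training EP 4, inference TP 2 x EP 4 -> each expert buffer is expanded 2x
print(expert_expansion_factor(train_ep=4, infer_tp=2, infer_ep=4))  # -> 2
```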
        Return a list of buffers, and a reference dict megatron_param_name->buffer.
        """
-        vllm_names = list(dict(self.vllm_model.named_parameters()).keys())
-        self.weight_names_per_pp = self.weight_adaptor.get_weight_names_per_pp(self._num_layer_list, vllm_names)
+        vllm_names = list(dict(self.vllm_model.named_parameters()).keys())  # vLLM weight names, used to derive the per-pp-stage name lists
+        self.weight_names_per_pp = self.weight_adaptor.get_weight_names_per_pp(self._vpp_layer_list, vllm_names,
+                                                                               sum(self._num_layer_list), self._vpp_size, self._noop_layers)
+
         self.weight_buffers = build_model_weight_buffer(self.vllm_model, self.weight_names_per_pp,
-                                                        self.weight_adaptor.get_weight_buffer_meta)
+                                                        self.weight_adaptor.get_weight_buffer_meta
+                                                        )

     def trans_ep_params_to_tp(self, megatron_param, name):
         """
@@ -264,7 +316,11 @@
             async_op=False
         )
         total_experts = self.num_local_experts * tp_size
-        return torch.cat(output_tensor_list, dim=1).reshape(hidden_size, total_experts, -1).permute(1, 0, 2)
+        res = torch.cat(output_tensor_list, dim=1).reshape(hidden_size, total_experts, -1)
+        if 'weight2' in name:
+            return res.permute(1, 2, 0).contiguous()
+        return res.permute(1, 0, 2).contiguous()
+

     def _update_weight_buffers_intra_pp(self):
         """
@@ -281,35 +337,107 @@
             return infer_param

         pp_rank = self._pp_rank
-        weight_buffer = self.weight_buffers[pp_rank]
+        weight_names = self.weight_names_per_pp[pp_rank]
+        weight_names_meta = self.weight_adaptor.convert_weight_name_meta(weight_names)
         true_megatron_model = self._unwrap_megatron_model(self.megatron_model)
-        normal_layer_func = partial(self.weight_adaptor.global2local_layer, num_layer_list=self._num_layer_list)
-        name_pairs = sorted(list(set([(name, self.weight_adaptor.replace_name_i2t(normal_layer_func(name)))
-                                      for name in weight_buffer.weight_names])))
+        normal_layer_func = partial(self.weight_adaptor.global2local_layer, num_layer_list=self._vpp_layer_list, global2local_map=self._global2local_map)
+        name_pairs = sorted(list(set([(name, vpp_rank, self.weight_adaptor.replace_name_i2t(normal_layer_func(name, vpp_rank=vpp_rank)))
+                                      for vpp_rank, names_per_vpp in enumerate(weight_names_meta) for name in names_per_vpp])))
+
         if self.enable_validate:
             self.origin_params_for_md5 = hashlib.md5()
             self.infer_params_for_md5 = [hashlib.md5() for _ in range(get_tp_allgather_world_size())]
-        for hf_name, megatron_name in name_pairs:
+
+        # Check that linear_fc1 and linear_fc2 have the expected shape relationship (fc1 holds the gate and up projections, so it is twice the size of fc2); models that do not satisfy this are not supported.
+        for _, vpp_rank, megatron_name in name_pairs:
             if megatron_name.endswith("linear_fc1.weight"):
                 fc2_name = megatron_name.replace("linear_fc1", "linear_fc2")
-                megatron_param_fc1 = dict(true_megatron_model.named_parameters())[megatron_name]
-                megatron_param_fc2 = dict(true_megatron_model.named_parameters())[fc2_name]
+                megatron_param_fc1 = dict(true_megatron_model[vpp_rank].named_parameters())[megatron_name]
+                megatron_param_fc2 = dict(true_megatron_model[vpp_rank].named_parameters())[fc2_name]
                 if megatron_param_fc1.shape[0] * megatron_param_fc1.shape[1] != megatron_param_fc2.shape[0] * \
                         megatron_param_fc2.shape[1] * 2:
                     raise ValueError("Only implemented for Llama model which linear_fc1 contains gate and up params.")
-        megatron_params_dict = dict(true_megatron_model.named_buffers())
-        megatron_params_dict.update(true_megatron_model.named_parameters())
-        for hf_name, megatron_name in name_pairs:
-            megatron_param = megatron_params_dict[megatron_name]
-            param = _transfer_from_megatron_division(megatron_param, megatron_name)
-            weight_buffer.copy_by_name(hf_name, param)
+        weight_buffer = self.weight_buffers[pp_rank]
+        megatron_params_dict = {}
+        for vpp_rank in range(self._vpp_size):
+            megatron_params_dict.update({vpp_rank: dict(true_megatron_model[vpp_rank].named_buffers())})
+            megatron_params_dict[vpp_rank].update(true_megatron_model[vpp_rank].named_parameters())
+
+        for hf_name, vpp_rank, megatron_name in name_pairs:
+            if((self._infer_ep_size > 1 or self._ep_size > 1) and "mlp.experts" in megatron_name):
+                pass
+            else:
+                megatron_param = megatron_params_dict[vpp_rank][megatron_name]
+                param = _transfer_from_megatron_division(megatron_param, megatron_name)
+                weight_buffer.copy_by_name(hf_name, param)

         # tp md5 validate
         if self.enable_validate:
             tp_md5_validate(self.infer_params_for_md5, self.origin_params_for_md5, f"rank[{self._rank}] tp params allgather")

+    def _update_weight_buffers_ep(self):
+        # build the temporary experts_memory_buffers
+        for cur_pp_rank in range(self._pp_size):
+            pp_rank = self._pp_rank
+            from mindspeed_rl.workers.resharding.memory_buffer import build_experts_memory_buffer, get_weight_buffer_meta_from_buffer
+            # Step 1: set up a temporary experts buffer for the current pp rank
+            combined_names_per_pp = []
+            vpp_stages = self.weight_names_per_pp[cur_pp_rank]
+            for weight_names_per_stage in vpp_stages:
+                combined_names_per_pp.extend(weight_names_per_stage)
+            self.weight_buffer_meta = self.weight_adaptor.get_weight_buffer_meta(self.vllm_model, combined_names_per_pp)
+            self.experts_weight_buffer_meta = get_weight_buffer_meta_from_buffer(self.weight_buffer_meta)
+            self.experts_memory_buffers = build_experts_memory_buffer(self.experts_weight_buffer_meta, self.experts_memory_expend_N)
+
+            # Step 2: copy the matching expert weights from the Megatron model into the experts buffer
+            if(cur_pp_rank == pp_rank):
+                weight_names = self.weight_names_per_pp[pp_rank]
+                weight_names_meta = self.weight_adaptor.convert_weight_name_meta(weight_names)
+                normal_layer_func = partial(self.weight_adaptor.global2local_layer, num_layer_list=self._vpp_layer_list, global2local_map=self._global2local_map)
+                name_pairs = sorted(list(set([(name, vpp_rank, self.weight_adaptor.replace_name_i2t(normal_layer_func(name, vpp_rank=vpp_rank)))
+                                              for vpp_rank, names_per_vpp in enumerate(weight_names_meta) for name in names_per_vpp])))
+                true_megatron_model = self._unwrap_megatron_model(self.megatron_model)
+
+                megatron_params_dict = {}
+                # collect all weights of the current pp stage
+                for vpp_rank in range(self._vpp_size):
+                    megatron_params_dict.update({vpp_rank: dict(true_megatron_model[vpp_rank].named_buffers())})
+                    megatron_params_dict[vpp_rank].update(true_megatron_model[vpp_rank].named_parameters())
+
+                for hf_name, vpp_rank, megatron_name in name_pairs:
+                    if((self._infer_ep_size > 1 or self._ep_size > 1) and "mlp.experts" in megatron_name):
+                        megatron_param = megatron_params_dict[vpp_rank][megatron_name]
+                        dtype = self.experts_weight_buffer_meta[hf_name]['dtype']
+                        self.experts_memory_buffers[dtype].copy_by_name(hf_name, megatron_param)
+
+            # Step 3: the remaining steps are shared across pp ranks
+            global_src = dist.get_global_rank(group=self._pp_group, group_rank=cur_pp_rank)
+
+            # broadcast the expert weights held in the experts memory buffers
+            for dtype, experts_memory_buffer in self.experts_memory_buffers.items():
+                dist.broadcast(tensor=experts_memory_buffer.data, src=global_src, group=self._pp_group, async_op=False)
+                pp_group_rank = self._rank // self.pp_group_size
+
+                # pick this rank's expert slice for each tensor of this dtype
+                for name, tensor_indices_value in sorted(experts_memory_buffer.tensor_indices.items()):
+                    shape = tensor_indices_value[1]  # expanded by the factor experts_memory_expend_N
+                    index = pp_group_rank % self.experts_memory_expend_N
+                    experts_tensor = experts_memory_buffer.get_by_name(name)
+                    experts_tensor_reshape = experts_tensor.view(shape)
+                    weight_tensor_infer = experts_tensor_reshape[index]
+                    self.weight_buffers[cur_pp_rank].copy_by_name(name, weight_tensor_infer)
+
+                # release the experts buffer for this dtype
+                experts_memory_buffer = None
+                self.experts_memory_buffers[dtype] = None
+
+            for memory_buffer in self.experts_memory_buffers.values():
+                memory_buffer = None
+            self.experts_memory_buffers = None
+
+
     def _update_weight_buffers_inter_pp(self):
         """
         Update weight buffers by gathering weights from other pp stage.
@@ -328,6 +456,36 @@
                 dist.broadcast(md5_tensor_src, group=self._pp_group, src=global_src, async_op=False)
                 validate_md5(md5_tensor_src, md5_tensor, f"rank[{self._rank}] pp resharding params")
+
+    def get_expert_router(self, cur_rank, train_tp_ep_size, infer_tp_ep_size, world_size):
+        for tp_ep_group_id in range(world_size // infer_tp_ep_size):
+            tp_ep_group = [i for i in range(tp_ep_group_id * infer_tp_ep_size, (tp_ep_group_id + 1) * infer_tp_ep_size)]
+            if cur_rank in tp_ep_group:
+                self.INFER_TP_EP_GROUP = tp_ep_group
+        stride = infer_tp_ep_size // train_tp_ep_size
+        dev_array = np.array(self.INFER_TP_EP_GROUP).reshape(stride, train_tp_ep_size)
+        src_router = np.squeeze(dev_array.transpose().reshape(1, infer_tp_ep_size)).tolist()
+        src = src_router[cur_rank % infer_tp_ep_size]
+        dst = self.INFER_TP_EP_GROUP[src_router.index(cur_rank)]
+        return src, dst
+
+    def _send_receive_experts(self):
+        cur_rank = dist.get_rank()
+        src_rank, dst_rank = self.get_expert_router(cur_rank, self._ep_size, self._infer_ep_size, self._world_size)
+        for cur_pp_rank in range(self._pp_size):
+            for memory_buffer in self.weight_buffers[cur_pp_rank].memory_buffers.values():
+                for name in sorted(memory_buffer.tensor_indices.keys()):
+                    if "mlp.experts" in name:
+                        # exchange this expert shard: send ours, receive the one this rank needs
+                        tensor_to_send = memory_buffer.get_by_name(name)
+                        tensor_to_replace = torch.empty_like(tensor_to_send)
+                        send_op = dist.P2POp(dist.isend, tensor_to_send, dst_rank)
+                        recv_op = dist.P2POp(dist.irecv, tensor_to_replace, src_rank)
+                        reqs = dist.batch_isend_irecv([send_op, recv_op])
+                        for req in reqs:
+                            req.wait()
+                        memory_buffer.copy_by_name(name, tensor_to_replace)
+
     def _get_all_params(self):
         """Get all the parameters of the models in all pp ranks

@@ -353,7 +511,7 @@
             return
         if self._tp_size % self._infer_tp_size != 0:
             raise ValueError("self._tp_size must be divisible by self._infer_tp_size")
-        tp_allgather_size = self._tp_size // self._infer_tp_size
+        tp_allgather_size = self._tp_size
         if mindspeed_rl.workers.resharding.utils._TP_ALLGATHER_GROUP is not None:
             raise RuntimeError("Group for allgather tensor model parallel weight is already initialized")
         num_groups = self._world_size // tp_allgather_size
@@ -432,7 +590,7 @@
        2. split train_tp params into groups (size: infer_tp_size)
        3. return the corresponding param from group based on infer tp rank
        """
-        if self._infer_tp_size <= self._tp_size:
+        if self._infer_tp_size <= self._tp_size or is_fake_tp_param(name, self.moe_tp_extend_ep):
            return param

        tp_group = get_tp_group()
@@ -494,6 +652,9 @@
        torch.distributed.all_gather(infer_param, param, group=tp_allgather_group)
        if self.enable_validate:
            update_md5_by_rank(infer_param, param, self.origin_params_for_md5, self.infer_params_for_md5)
-        infer_param = self._default_tp_concat_fn(name, param, infer_param)
+        part_len = len(infer_param) // self._infer_tp_size
+        start = self._rank % self._infer_tp_size
+        part_param = infer_param[part_len * start:part_len * (start + 1)]
+        infer_param = self._default_tp_concat_fn(name, param, part_param)

        return infer_param
diff --git a/mindspeed_rl/workers/resharding/weight_adaptor.py b/mindspeed_rl/workers/resharding/weight_adaptor.py
index 19ea46c2857a0bdacb46cffe919cbedee2c516a0..9a333a94f63c5ac6ca14966370cce5ab0cdb631f 100644
--- a/mindspeed_rl/workers/resharding/weight_adaptor.py
+++ b/mindspeed_rl/workers/resharding/weight_adaptor.py
@@ -42,6 +42,7 @@ class MegatronVLLMWeightAdaptor(BaseWeightAdaptor):
     def __init__(self, model_config):
         super(MegatronVLLMWeightAdaptor, self).__init__()
         self.model_config = model_config
+        self.meta_info = None
         self.params_mapping = [
             # (megatron core gpt model name, vllm model name)
             ("embedding.word_embeddings", "model.embed_tokens"),
@@ -92,6 +93,8 @@
         """
         pass

+    def convert_weight_name_meta(self, weight_names):
+        return weight_names

     def get_weight_buffer_meta(self, model, valid_names=None):
         weight_buffer_meta = {}
@@ -103,11 +106,12 @@
         return weight_buffer_meta

     @staticmethod
-    def global2local_layer(name, num_layer_list):
+    def global2local_layer(name, num_layer_list, vpp_rank=0, global2local_map=None):
         """
         Transform the model name in each model_chunk in global space to local space
         """
         layer_name = 'layers'
+        num_layer_offset = vpp_rank * sum(num_layer_list)

         if layer_name in name:  # belong to an intermediate layer
             split_name = name.split('.')
@@ -122,12 +126,15 @@
                 raise ValueError(f'split_name = {split_name}')

             # increment layer_num_idx by layer_offset
-            global_idx = int(split_name[layer_num_idx])
-            for layers_in_pp in num_layer_list:
-                global_idx -= layers_in_pp
-                if global_idx < 0:
-                    local_index = global_idx + layers_in_pp
-                    break
+            if global2local_map is None:
+                global_idx = int(split_name[layer_num_idx]) - num_layer_offset
+                for layers_in_pp in num_layer_list:
+                    global_idx -= layers_in_pp
+                    if global_idx < 0:
+                        local_index = global_idx + layers_in_pp
+                        break
+            else:
+                local_index = global2local_map[int(split_name[layer_num_idx])]

             split_name[layer_num_idx] = str(local_index)
             name = '.'.join(split_name)  # weight name in inference_tp_model
@@ -135,15 +142,27 @@
         return name

     @staticmethod
-    def get_weight_names_per_pp(layer_list, vllm_names):
+    def get_weight_names_per_pp(layer_list, vllm_names, layers_num=None, vpp_size=0, noop_layers=None):
+        ## add protection for default kwargs
+        if not layers_num:
+            if vpp_size > 0:
+                raise ValueError(f"layers_num is required with vpp_size = {vpp_size}")
+            layers_num = sum(layer_list)

-        end_layer = sum(layer_list) - 1
+        end_layer = layers_num - 1

-        def get_weight_names_in_range(layer_range, names: list, layer_name='layers') -> list:
+
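The resharding change in `vllm_weight_container.py` above now allgathers across the full training TP group and then keeps only the slice of shards that maps to this rank's inference TP partition before concatenation. A rough sketch of that allgather-then-slice step, assuming an already initialized process group; the concat dimension and the function name are illustrative, not the repo's API:

```python
import torch
import torch.distributed as dist

def reshard_tp_param(param, tp_group, infer_tp_size, rank):
    """Gather all training-TP shards, then keep the contiguous slice owned by
    this rank's inference-TP partition and concatenate it."""
    world = dist.get_world_size(group=tp_group)
    gathered = [torch.empty_like(param) for _ in range(world)]
    dist.all_gather(gathered, param, group=tp_group)
    part_len = len(gathered) // infer_tp_size
    start = rank % infer_tp_size
    part = gathered[part_len * start: part_len * (start + 1)]
    return torch.cat(part, dim=0)  # the real concat dim depends on the parameter
```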
def get_weight_names_in_range(layer_range, names: list, noop_layers=None, layer_name='layers') -> list: """ Extract weights in a given range and also include the weights before and after the range as needed. """ start, end = layer_range + + layer_idx_list = [layer_idx for layer_idx in range(start, end + 1)] + if noop_layers: + layer_idx_list = [ + layer_idx - sum(1 for i in noop_layers if i <= layer_idx) for layer_idx in layer_idx_list if + layer_idx not in noop_layers + ] last_layer_index = end_layer names_in_range = [] @@ -160,7 +179,7 @@ class MegatronVLLMWeightAdaptor(BaseWeightAdaptor): match = re.match(r'.*\.layers\.(\d+)', name) if match: layer_num = int(match.group(1)) - if start <= layer_num <= end: + if layer_num in layer_idx_list: names_in_range.append(name) # add names after decode layers @@ -172,13 +191,18 @@ class MegatronVLLMWeightAdaptor(BaseWeightAdaptor): break return names_in_range - pp_layers_range = [] - start_layer = 0 - for layers_in_pp_rank in layer_list: - pp_layers_range.append((start_layer, start_layer + layers_in_pp_rank - 1)) - start_layer += layers_in_pp_rank - weight_names_per_pp = [get_weight_names_in_range(layer_range, vllm_names) for layer_range in pp_layers_range] - return weight_names_per_pp + stage_layers_num = sum(layer_list) + weight_names_per_vpp_combined = [[] for _ in layer_list] + for vpp_rank in range(vpp_size): + start_layer = vpp_rank * stage_layers_num + for pp_rank, layers_in_vpp_rank in enumerate(layer_list): + vpp_layers_range = (start_layer, start_layer + layers_in_vpp_rank - 1) + weight_names_per_vpp = get_weight_names_in_range(vpp_layers_range, vllm_names, noop_layers) + weight_names_per_vpp_combined[pp_rank].append(weight_names_per_vpp) + + start_layer += layers_in_vpp_rank + + return weight_names_per_vpp_combined class DeepSeekMVWeightAdaptor(MegatronVLLMWeightAdaptor): @@ -187,6 +211,8 @@ class DeepSeekMVWeightAdaptor(MegatronVLLMWeightAdaptor): """ def __init__(self, model_config): super(DeepSeekMVWeightAdaptor, self).__init__(model_config) + self.meta_info = {'replace': {'kv_a_proj_with_mqa': 'qkv_proj'}, + 'delete': ['q_a_proj']} self.params_mapping = [ # (megatron core gpt model name, vllm model name) ("embedding.word_embeddings", "model.embed_tokens"), @@ -216,16 +242,48 @@ class DeepSeekMVWeightAdaptor(MegatronVLLMWeightAdaptor): if valid_names and name not in valid_names: continue if 'kv_a_proj_with_mqa' in name: - q_param = dict(model.named_parameters()).get(name.replace('kv_a_proj_with_mqa', 'q_a_proj')) + # 将kv_a_proj_with_mqa和q_a_proj的tensor拼接,并用qkv_proj和拼接的结果替换掉原来kv_a_proj_with_mqa的对应部分 + q_param = dict(model.named_parameters()).get(name.replace('kv_a_proj_with_mqa', 'q_a_proj' if self.model_config.q_lora_rank else "q_proj")) qkv_param_shape = torch.cat([q_param, param], dim=0).shape qkv_name = name.replace('kv_a_proj_with_mqa', 'qkv_proj') weight_buffer_meta[qkv_name] = {'shape': qkv_param_shape, 'dtype': param.dtype} - elif 'q_a_proj' in name: + elif 'q_a_proj' in name or 'q_proj' in name: continue else: weight_buffer_meta[name] = {'shape': param.shape, 'dtype': param.dtype} return weight_buffer_meta + def convert_weight_name_meta(self, weight_names): + if not self.meta_info: + return weight_names + + weight_names_meta = list() + for elements in weight_names: + if isinstance(elements, list): + tmp_weight_names_meta = self.convert_weight_name_meta(elements) + weight_names_meta.append(tmp_weight_names_meta) + else: + converted = False + if not converted and 'replace' in self.meta_info: + for key, value in 
self.meta_info['replace'].items(): + if key in elements: + qkv_name = elements.replace(key, value) + weight_names_meta.append(qkv_name) + converted = True + break + + if not converted and 'delete' in self.meta_info: + for key in self.meta_info['delete']: + if key in elements: + converted = True + break + + if not converted: + weight_names_meta.append(elements) + + return weight_names_meta + + class QwenMVWeightAdaptor(MegatronVLLMWeightAdaptor): """ @@ -239,6 +297,7 @@ WEIGHT_ADAPTOR_REGISTRY = { "Qwen2ForCausalLM": QwenMVWeightAdaptor, "DeepseekV3ForCausalLM": DeepSeekMVWeightAdaptor, "DeepseekV2ForCausalLM": DeepSeekMVWeightAdaptor, + "CustomDeepseekV3ForCausalLM": DeepSeekMVWeightAdaptor, } diff --git a/tests/st/configs/test_grpo_trainer_qwen25_7b_integrated.yaml b/tests/st/configs/test_grpo_trainer_qwen25_7b_integrated.yaml index 362dce0e53c69a4735af4d8e9e502dff3496783b..66420287bc1cc6b9f6b2690dfee4fb2ec4b4aaa3 100644 --- a/tests/st/configs/test_grpo_trainer_qwen25_7b_integrated.yaml +++ b/tests/st/configs/test_grpo_trainer_qwen25_7b_integrated.yaml @@ -85,6 +85,7 @@ rl_config: num_npus: 8 generate_config: + enforce_eager: True gen_micro_batch_size: 16 trust_remote_code: true offload_train_optimizer: true diff --git a/tests/st/infer_engine/test_module_entry_vllm_engine.sh b/tests/st/infer_engine/test_module_entry_vllm_engine.sh deleted file mode 100644 index 8704a666899dd4af8fbe8ff5e65294e6555a4580..0000000000000000000000000000000000000000 --- a/tests/st/infer_engine/test_module_entry_vllm_engine.sh +++ /dev/null @@ -1,24 +0,0 @@ -#!/bin/bash - -export CUDA_DEVICE_MAX_CONNECTIONS=1 -# 获取脚本的绝对路径 -SCRIPT_DIR=$(cd "$(dirname "$0")" && pwd) -export PYTHONPATH=$SCRIPT_DIR/../../..:$PYTHONPATH - -GPUS_PER_NODE=1 -MASTER_ADDR=localhost -MASTER_PORT=6555 -NNODES=1 -NODE_RANK=0 - -DISTRIBUTED_ARGS=" - --nproc_per_node $GPUS_PER_NODE \ - --nnodes $NNODES \ - --node_rank $NODE_RANK \ - --master_addr $MASTER_ADDR \ - --master_port $MASTER_PORT -" -echo "start test_vllm_engine st" - -torchrun $DISTRIBUTED_ARGS $SCRIPT_DIR/test_vllm_engine.py --distribute-backend nccl -torchrun $DISTRIBUTED_ARGS $SCRIPT_DIR/test_vllm_engine_multistep_decode.py --distribute-backend nccl \ No newline at end of file diff --git a/tests/st/infer_engine/test_vllm_engine.py b/tests/st/infer_engine/test_vllm_engine.py deleted file mode 100644 index a4536dbfe765246cdd2a8ba9678b683abf6de9f9..0000000000000000000000000000000000000000 --- a/tests/st/infer_engine/test_vllm_engine.py +++ /dev/null @@ -1,140 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -# Copyright (c) Huawei Technologies Co., Ltd.2023-2025. All rights reserved. 
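For DeepSeek-style checkpoints, the adaptor above fuses `kv_a_proj_with_mqa` with `q_a_proj` into a single `qkv_proj` buffer and drops the absorbed name, driven by the `meta_info` replace/delete rules. A simplified, self-contained sketch of that name conversion (flat list only; the real `convert_weight_name_meta` also recurses into nested per-vpp lists):

```python
def convert_weight_names(names, replace=None, delete=None):
    replace = replace or {"kv_a_proj_with_mqa": "qkv_proj"}
    delete = delete or ["q_a_proj"]
    out = []
    for name in names:
        if any(key in name for key in delete):
            continue  # this weight is absorbed into the fused buffer
        for key, value in replace.items():
            if key in name:
                name = name.replace(key, value)
                break
        out.append(name)
    return out

print(convert_weight_names([
    "model.layers.0.self_attn.kv_a_proj_with_mqa.weight",
    "model.layers.0.self_attn.q_a_proj.weight",
    "model.layers.0.mlp.gate_proj.weight",
]))
# -> ['model.layers.0.self_attn.qkv_proj.weight', 'model.layers.0.mlp.gate_proj.weight']
```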
-import os -import logging - -import tensordict -import torch -from torch_npu.contrib import transfer_to_npu - -from mindspeed_rl.models.rollout.vllm_engine import VLLMInferEngine -from mindspeed_rl.config_cls.megatron_config import MegatronConfig -from mindspeed_rl.utils.loggers import Loggers - -tokenizer_name_or_path = "/data/for_dt/tokenizer/Llama-3.2-1B-Instruct/" -weights_path = "/data/for_dt/weights/Llama-3.2-1B-tp1pp1/iter_0000001/mp_rank_00/model_optim_rng.pt" -megatron_dict = {"num_attention_heads": 32, - "tensor_model_parallel_size": 1, - "num_query_groups": 8, - "group_query_attention": True} -sampling_config = { - "num_completions": 1, # 每个输入提示生成的独立完成项数量 - "logprobs": 1, # 返回的 top token 的对数概率数量 - "max_tokens": 128, # 生成输出的最大 token 数量 - "best_of": 2, # 内部生成候选完成项的数量,从中选择最佳的一个 - "top_p": 1.0, # 核采样的累积概率阈值 - "top_k": 50, # 采样时考虑的最高概率 token 的数量 - "min_p": 0.0, # token 选择的最小概率阈值 - "temperature": 0.2, # 控制预测随机性的温度参数 - "detokenize": False # 是否将生成的 token 转换回可读字符串 -} - - -def main(): - logger = Loggers( - name="test_vllm_engine", - ) - logger.info("start test_vllm_engine") - - conversation = [ - { - "role": "system", - "content": "You are a helpful assistant" - }, - { - "role": "user", - "content": "Hello" - }, - { - "role": "assistant", - "content": "Hello! How can I assist you today?" - }, - { - "role": "user", - "content": "Write an essay about the importance of higher education.", - }, - ] - - logger.info("load megatron weight") - megatron_st = torch.load(weights_path) - actor_weights = megatron_st['model'] - - # 配置初始化所需的参数 - - train_tensor_parallel_size = 1 - train_pipeline_parallel_size = 1 - infer_tensor_parallel_size = 1 - infer_pipeline_parallel_size = 1 - train_expert_parallel_size = 1 - infer_expert_parallel_size = 1 - max_num_seqs = 256 - trust_remote_code = True - - logger.info("enter vllmInferEngine") - - megatron_config = MegatronConfig(megatron_dict, {}) - megatron_config.num_attention_heads = 32 - megatron_config.tensor_model_parallel_size = 1 - megatron_config.num_query_groups = 8 - megatron_config.num_key_value_heads = 8 - megatron_config.group_query_attention = True - # 初始化 VLLMInferEngine 实例 - inference_engine = VLLMInferEngine( - megatron_config=megatron_config, - sampling_config=sampling_config, - train_expert_parallel_size=train_expert_parallel_size, - infer_expert_parallel_size=infer_expert_parallel_size, - tokenizer_name_or_path=tokenizer_name_or_path, - train_tensor_parallel_size=train_tensor_parallel_size, - train_pipeline_parallel_size=train_pipeline_parallel_size, - infer_tensor_parallel_size=infer_tensor_parallel_size, - infer_pipeline_parallel_size=infer_pipeline_parallel_size, - max_num_seqs=max_num_seqs, - trust_remote_code=trust_remote_code - ) - - logger.info("model inited") - inference_engine.free_cache_engine() - torch.cuda.empty_cache() - logger.info("free_cache") - - inference_engine.offload_model_weights() - logger.info("offload_model") - torch.cuda.empty_cache() - logger.info("empty_cache") - - logger.info("enter sync_model_weights") - inference_engine.sync_model_weights(actor_weights) - - logger.info("enter init_cache_engine") - inference_engine.init_cache_engine() - - logger.info("=" * 80) - logger.info("start chat") - outputs = inference_engine.chat(conversation) - logger.info("chat result is ", outputs) - - idx_list = [] - idx_list_per_step = [] - for i in range(2): - for j in range(4): - tokens = torch.randint(100, (10,)) - idx_list_per_step.append(tokens.view(-1).cpu().numpy().tolist()) - idx_list.extend(idx_list_per_step) - 
idx_list_per_step = [] - logger.info(len(idx_list), [len(i) for i in idx_list]) - - logger.info("start test generate_sequences ") - outputs = inference_engine.generate_sequences( - idx_list=idx_list, - ) - logger.info("generate_sequences output is:") - logger.info(outputs[0]) - - logger.info("input") - logger.info(idx_list[0]) - - -if __name__ == "__main__": - main() diff --git a/tests/st/infer_engine/test_vllm_engine_multistep_decode.py b/tests/st/infer_engine/test_vllm_engine_multistep_decode.py deleted file mode 100644 index d99d0eb4bb997f29f403503ce2921ebf334d9d86..0000000000000000000000000000000000000000 --- a/tests/st/infer_engine/test_vllm_engine_multistep_decode.py +++ /dev/null @@ -1,141 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -# Copyright (c) Huawei Technologies Co., Ltd.2023-2025. All rights reserved. -import os -import logging - -import tensordict -import torch -from torch_npu.contrib import transfer_to_npu - -from mindspeed_rl.models.rollout.vllm_engine import VLLMInferEngine -from mindspeed_rl.config_cls.megatron_config import MegatronConfig -from mindspeed_rl.utils.loggers import Loggers - -tokenizer_name_or_path = "/data/for_dt/tokenizer/Llama-3.2-1B-Instruct/" -weights_path = "/data/for_dt/weights/Llama-3.2-1B-tp1pp1/iter_0000001/mp_rank_00/model_optim_rng.pt" -megatron_dict = {"num_attention_heads": 32, - "tensor_model_parallel_size": 1, - "num_query_groups": 8, - "group_query_attention": True} -sampling_config = { - "num_completions": 1, # 每个输入提示生成的独立完成项数量 - "logprobs": 1, # 返回的 top token 的对数概率数量 - "max_tokens": 128, # 生成输出的最大 token 数量 - "best_of": 2, # 内部生成候选完成项的数量,从中选择最佳的一个 - "top_p": 1.0, # 核采样的累积概率阈值 - "top_k": 50, # 采样时考虑的最高概率 token 的数量 - "min_p": 0.0, # token 选择的最小概率阈值 - "temperature": 0.2, # 控制预测随机性的温度参数 - "detokenize": False # 是否将生成的 token 转换回可读字符串 -} - - -def main(): - logger = Loggers( - name="test_vllm_engine_multistep_decode", - ) - logger.info("start test_vllm_engine_multistep_decode") - - conversation = [ - { - "role": "system", - "content": "You are a helpful assistant" - }, - { - "role": "user", - "content": "Hello" - }, - { - "role": "assistant", - "content": "Hello! How can I assist you today?" 
- }, - { - "role": "user", - "content": "Write an essay about the importance of higher education.", - }, - ] - - logger.info("load megatron weight") - megatron_st = torch.load(weights_path) - actor_weights = megatron_st['model'] - - # 配置初始化所需的参数 - - train_tensor_parallel_size = 1 - train_pipeline_parallel_size = 1 - infer_tensor_parallel_size = 1 - infer_pipeline_parallel_size = 1 - train_expert_parallel_size = 1 - infer_expert_parallel_size = 1 - max_num_seqs = 256 - trust_remote_code = True - - logger.info("enter vllmInferEngine") - - megatron_config = MegatronConfig(megatron_dict, {}) - megatron_config.num_attention_heads = 32 - megatron_config.tensor_model_parallel_size = 1 - megatron_config.num_query_groups = 8 - megatron_config.num_key_value_heads = 8 - megatron_config.group_query_attention = True - # 初始化 VLLMInferEngine 实例 - inference_engine = VLLMInferEngine( - megatron_config=megatron_config, - sampling_config=sampling_config, - train_expert_parallel_size=train_expert_parallel_size, - infer_expert_parallel_size=infer_expert_parallel_size, - tokenizer_name_or_path=tokenizer_name_or_path, - train_tensor_parallel_size=train_tensor_parallel_size, - train_pipeline_parallel_size=train_pipeline_parallel_size, - infer_tensor_parallel_size=infer_tensor_parallel_size, - infer_pipeline_parallel_size=infer_pipeline_parallel_size, - max_num_seqs=max_num_seqs, - trust_remote_code=trust_remote_code, - num_scheduler_steps=8, # 8 decode steps - ) - - logger.info("model inited") - inference_engine.free_cache_engine() - torch.cuda.empty_cache() - logger.info("free_cache") - - inference_engine.offload_model_weights() - logger.info("offload_model") - torch.cuda.empty_cache() - logger.info("empty_cache") - - logger.info("enter sync_model_weights") - inference_engine.sync_model_weights(actor_weights) - - logger.info("enter init_cache_engine") - inference_engine.init_cache_engine() - - logger.info("=" * 80) - logger.info("start chat") - outputs = inference_engine.chat(conversation) - logger.info("chat result is ", outputs) - - idx_list = [] - idx_list_per_step = [] - for i in range(2): - for j in range(4): - tokens = torch.randint(100, (10,)) - idx_list_per_step.append(tokens.view(-1).cpu().numpy().tolist()) - idx_list.extend(idx_list_per_step) - idx_list_per_step = [] - logger.info(len(idx_list), [len(i) for i in idx_list]) - - logger.info("start test generate_sequences ") - outputs = inference_engine.generate_sequences( - idx_list=idx_list, - ) - logger.info("generate_sequences output is:") - logger.info(outputs[0]) - - logger.info("input") - logger.info(idx_list[0]) - - -if __name__ == "__main__": - main() diff --git a/tests/st/resharding/test_module_entry_resharding.sh b/tests/st/resharding/test_module_entry_resharding.sh index dfe90827e0160142eff31d8f4b54adea485c8c70..100e60627f61a1b5abf0dea44924fa2ba15276a9 100644 --- a/tests/st/resharding/test_module_entry_resharding.sh +++ b/tests/st/resharding/test_module_entry_resharding.sh @@ -1,45 +1,54 @@ -#!/bin/bash +# #!/bin/bash -export CUDA_DEVICE_MAX_CONNECTIONS=1 -SCRIPT_DIR=$(cd "$(dirname "$0")" && pwd) -export PYTHONPATH=$SCRIPT_DIR/../../..:$PYTHONPATH -GPUS_PER_NODE=8 -MASTER_ADDR=localhost -MASTER_PORT=6555 -NNODES=1 -NODE_RANK=0 -WORLD_SIZE=$(($GPUS_PER_NODE*$NNODES)) +# export CUDA_DEVICE_MAX_CONNECTIONS=1 +# SCRIPT_DIR=$(cd "$(dirname "$0")" && pwd) +# export PYTHONPATH=$SCRIPT_DIR/../../..:$PYTHONPATH +# export VLLM_DP_SIZE=1 +# export HCCL_BUFFSIZE=256 +# export VLLM_USE_V1=1 +# export VLLM_VERSION=0.9.0 +# export VLLM_ENABLE_GRAPH_MODE=0 
+# export VLLM_ENABLE_MC2=0 +# export HCCL_OP_EXPANSION_MODE="AIV" +# export VLLM_ENABLE_TOPK_OPTIMZE=1 -DISTRIBUTED_ARGS=" - --nproc_per_node $GPUS_PER_NODE \ - --nnodes $NNODES \ - --node_rank $NODE_RANK \ - --master_addr $MASTER_ADDR \ - --master_port $MASTER_PORT -" -PYTHON_ARGS=" - --model-path "/data/for_dt/weights/Qwen2.5-7B-mg" \ - --tokenizer-path "/data/for_dt/weights/Qwen2.5-7B" \ - --train-tp 4 \ - --train-pp 2 \ - --train-ep 1 \ - --infer-tp 2 \ - --infer-pp 1 \ - --infer-ep 1 -" -PYTHON_ARGS_new=" - --model-path "/data/for_dt/weights/Qwen2.5-7B-tp2pp2" \ - --tokenizer-path "/data/for_dt/weights/Qwen2.5-7B" \ - --train-tp 2 \ - --train-pp 2 \ - --train-ep 1 \ - --infer-tp 4 \ - --infer-pp 1 \ - --infer-ep 1 -" +# GPUS_PER_NODE=8 +# MASTER_ADDR=localhost +# MASTER_PORT=6555 +# NNODES=1 +# NODE_RANK=0 +# WORLD_SIZE=$(($GPUS_PER_NODE*$NNODES)) -echo "start test_resharding st" +# DISTRIBUTED_ARGS=" +# --nproc_per_node $GPUS_PER_NODE \ +# --nnodes $NNODES \ +# --node_rank $NODE_RANK \ +# --master_addr $MASTER_ADDR \ +# --master_port $MASTER_PORT +# " +# PYTHON_ARGS=" +# --model-path "/data/for_dt/weights/Qwen2.5-7B-mg" \ +# --tokenizer-path "/data/for_dt/weights/Qwen2.5-7B" \ +# --train-tp 4 \ +# --train-pp 2 \ +# --train-ep 1 \ +# --infer-tp 2 \ +# --infer-pp 1 \ +# --infer-ep 1 +# " +# PYTHON_ARGS_new=" +# --model-path "/data/for_dt/weights/Qwen2.5-7B-tp2pp2" \ +# --tokenizer-path "/data/for_dt/weights/Qwen2.5-7B" \ +# --train-tp 2 \ +# --train-pp 2 \ +# --train-ep 1 \ +# --infer-tp 4 \ +# --infer-pp 1 \ +# --infer-ep 1 +# " -torchrun $DISTRIBUTED_ARGS $SCRIPT_DIR/test_resharding.py $PYTHON_ARGS +# echo "start test_resharding st" -torchrun $DISTRIBUTED_ARGS $SCRIPT_DIR/test_resharding.py $PYTHON_ARGS_new \ No newline at end of file +# torchrun $DISTRIBUTED_ARGS $SCRIPT_DIR/test_resharding.py $PYTHON_ARGS + +# torchrun $DISTRIBUTED_ARGS $SCRIPT_DIR/test_resharding.py $PYTHON_ARGS_new \ No newline at end of file diff --git a/tests/st/resharding/test_resharding.py b/tests/st/resharding/test_resharding.py index 676d4131efd3ce102773542d99c08b027be5a09f..42134b7d8f95af3e9614ad2549d4b766d65d505d 100644 --- a/tests/st/resharding/test_resharding.py +++ b/tests/st/resharding/test_resharding.py @@ -306,6 +306,7 @@ class TestActor(): train_tensor_parallel_size=args.train_tp, train_pipeline_parallel_size=args.train_pp, train_expert_parallel_size=args.train_ep, + train_context_parallel_size=args.train_cp, infer_tensor_parallel_size=args.infer_tp, infer_pipeline_parallel_size=args.infer_pp, infer_expert_parallel_size=args.infer_ep, @@ -315,6 +316,7 @@ class TestActor(): dtype="bfloat16", gpu_memory_utilization=0.6, trust_remote_code=True, + enforce_eager=True, megatron_config=megatron_config ) self.megatron_offloader = MegatronOffLoader(self.model, self.optimizer) @@ -364,6 +366,7 @@ def parse_args(): parser.add_argument("--train-tp", type=int, default=2) parser.add_argument("--train-pp", type=int, default=2) parser.add_argument("--train-ep", type=int, default=1) + parser.add_argument("--train_cp", type=int, default=1) parser.add_argument("--infer-tp", type=int, default=4) parser.add_argument("--infer-pp", type=int, default=1) parser.add_argument("--infer-ep", type=int, default=1)
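As a closing illustration of the resharding path exercised by this test, the `_send_receive_experts` step added in `vllm_weight_container.py` pairs every rank with a source and a destination from `get_expert_router` and swaps expert shards with batched asynchronous P2P ops. A minimal sketch of that exchange, assuming `torch.distributed` is already initialized and the routing has been computed elsewhere:

```python
import torch
import torch.distributed as dist

def exchange_expert_shard(tensor, src_rank, dst_rank):
    """Send our expert shard to dst_rank and receive the shard this rank should
    own for inference from src_rank, using batched async point-to-point ops."""
    recv_buf = torch.empty_like(tensor)
    ops = [dist.P2POp(dist.isend, tensor, dst_rank),
           dist.P2POp(dist.irecv, recv_buf, src_rank)]
    for req in dist.batch_isend_irecv(ops):
        req.wait()
    return recv_buf
```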