diff --git a/README.md b/README.md index 0cb8170f846d257d09c6c69acc1f463025a15e3f..e99cf005e5d6ccda925c62f3a5836da9ed3d3544 100644 --- a/README.md +++ b/README.md @@ -80,15 +80,6 @@ MindSpeed-RL是基于昇腾生态的强化学习加速框架,旨在为华为 [ - - 指令微调 - Doc - - Qwen2.5-7B
- Qwen2.5-32B
- - Preview - 结果奖励 Doc diff --git a/SECURITYNOTE.md b/SECURITYNOTE.md index 006739e80ff09dbe332dfc73c86fddd35515cb7e..b4d23bf84d8d3218460774cee1cfce7b75c5e465 100644 --- a/SECURITYNOTE.md +++ b/SECURITYNOTE.md @@ -47,7 +47,7 @@ ## 公开接口声明 -MindSpeed-RL 暂时未发布wheel包,无正式对外公开接口,所有功能均通过shell脚本调用。入口脚本皆放置于cli目录下,分别为 train_grpo.py, train_orm.py,train_sft.py, preprocess_data.py, convert_ckpt.py 和 infer_vllm.py。 +MindSpeed-RL 暂时未发布wheel包,无正式对外公开接口,所有功能均通过shell脚本调用。入口脚本皆放置于cli目录下,分别为 train_grpo.py, train_orm.py, preprocess_data.py, convert_ckpt.py 和 infer_vllm.py。 ## 公网地址声明 diff --git a/cli/train_grpo.py b/cli/train_grpo.py index 4c7c91b0444bef080ef828c9ac383c5d46ef4e7e..aeef7bc89f9bbb697f788faa49bd8b9b0a6b96c2 100644 --- a/cli/train_grpo.py +++ b/cli/train_grpo.py @@ -31,6 +31,7 @@ from mindspeed_rl.workers.scheduler.launcher import RayActorGroup from mindspeed_rl.workers.actor_hybrid_worker import ActorHybridWorker from mindspeed_rl.workers.reference_woker import ReferenceWorker from mindspeed_rl.workers.reward_woker import RewardWorker +from mindspeed_rl.workers.integrated_worker import IntegratedWorker cur_file_dir = Path(__file__).absolute().parent.parent logger = Loggers("grpo_train") @@ -45,47 +46,65 @@ def train(config): logger.info('start async initializing ray actor groups') - actor_worker = RayActorGroup( - worker=ActorHybridWorker, - placement_group=None, - megatron_config=actor_config, - rl_config=rl_config, - generate_config=generate_config, - model_provider=gpt_model_provider, - tokenizer=tokenizer, - initialize_func=initialize_megatron, - get_megatron_module=get_megatron_module, - global_batch_size=actor_config.global_batch_size * rl_config.n_samples_per_prompt - ).initialize() - - reference_worker = RayActorGroup( - worker=ReferenceWorker, - placement_group=None, - megatron_config=ref_config, - rl_config=rl_config, - model_provider=gpt_model_provider, - tokenizer=tokenizer, - initialize_func=initialize_megatron, - get_megatron_module=get_megatron_module, - global_batch_size=actor_config.global_batch_size * rl_config.n_samples_per_prompt - ).initialize() - reward_list = [] - if rl_config.reward_resource: - reward_worker = RayActorGroup( - worker=RewardWorker, + if rl_config.use_integrated_worker: + integrated_worker = RayActorGroup( + worker=IntegratedWorker, placement_group=None, - megatron_config=reward_config, + megatron_config=actor_config, rl_config=rl_config, - model_provider=rm_model_provider, + generate_config=generate_config, + model_provider=gpt_model_provider, + tokenizer=tokenizer, + initialize_func=initialize_megatron, + get_megatron_module=get_megatron_module, + global_batch_size=actor_config.global_batch_size * rl_config.n_samples_per_prompt, + ).initialize() + + actor_worker = integrated_worker + reference_worker = integrated_worker + + else: + actor_worker = RayActorGroup( + worker=ActorHybridWorker, + placement_group=None, + megatron_config=actor_config, + rl_config=rl_config, + generate_config=generate_config, + model_provider=gpt_model_provider, tokenizer=tokenizer, initialize_func=initialize_megatron, get_megatron_module=get_megatron_module, global_batch_size=actor_config.global_batch_size * rl_config.n_samples_per_prompt ).initialize() - reward_list.append(reward_worker) + reference_worker = RayActorGroup( + worker=ReferenceWorker, + placement_group=None, + megatron_config=ref_config, + rl_config=rl_config, + model_provider=gpt_model_provider, + tokenizer=tokenizer, + initialize_func=initialize_megatron, + get_megatron_module=get_megatron_module, + 
global_batch_size=actor_config.global_batch_size * rl_config.n_samples_per_prompt + ).initialize() + + if rl_config.reward_resource: + reward_worker = RayActorGroup( + worker=RewardWorker, + placement_group=None, + megatron_config=reward_config, + rl_config=rl_config, + model_provider=rm_model_provider, + tokenizer=tokenizer, + initialize_func=initialize_megatron, + get_megatron_module=get_megatron_module, + global_batch_size=actor_config.global_batch_size * rl_config.n_samples_per_prompt + ).initialize() + + reward_list.append(reward_worker) if rl_config.rule_reward: rule_reward = RuleReward.options(num_cpus=rl_config.num_cpus_for_local_task).remote() @@ -145,11 +164,25 @@ def parse_training_config(config: Dict): """ actor_config = MegatronConfig({**config.get("megatron_training"), **config.get("actor_config")}, config.get('model')) - ref_config = MegatronConfig({**config.get("megatron_training"), **config.get("ref_config")}, - config.get('model')) - reward_config = MegatronConfig({**config.get("megatron_training"), **config.get("reward_config")}, - config.get('model')) rl_config = RLConfig(config.get("rl_config")) + + if rl_config.use_integrated_worker: + if "ref_config" in config: + raise ValueError( + f"ref_config should not be set when use_integrated_worker mode is on.") + ref_config = actor_config + + if "reward_config" in config: + raise ValueError( + f"reward_config should not be set when use_integrated_worker mode is on.") + reward_config = actor_config + + else: + ref_config = MegatronConfig({**config.get("megatron_training"), **config.get("ref_config")}, + config.get('model')) + + reward_config = MegatronConfig({**config.get("megatron_training"), **config.get("reward_config")}, + config.get('model')) generate_config = GenerateConfig(config.get("generate_config")) validate_rl_args(actor_config, ref_config, reward_config, rl_config, generate_config) diff --git a/cli/train_sft.py b/cli/train_sft.py deleted file mode 100644 index 8a5d190bac086c47f5c6dd62df4bda62ba6ac54b..0000000000000000000000000000000000000000 --- a/cli/train_sft.py +++ /dev/null @@ -1,360 +0,0 @@ -# Copyright 2020 The HuggingFace Inc. team. All rights reserved. -# Copyright (c) 2023; NVIDIA CORPORATION. All rights reserved. -# Copyright (c) 2024, HUAWEI CORPORATION. All rights reserved. 
-import sys - -import torch -import hydra -from omegaconf import OmegaConf - -from mindspeed_rl.trainer.sft_trainer import SFTTrainer -from mindspeed_rl.config_cls.megatron_config import MegatronConfig -from mindspeed_rl.datasets.instruction_dataset import InstructionDataset -from mindspeed_rl.datasets.dataloader import InstructionDataLoader -from mindspeed_rl.datasets.build_dataset import build_train_valid_test_datasets -from mindspeed_rl.datasets.utils import build_data_iter, get_train_valid_test_num_samples -from mindspeed_rl.utils import get_tokenizer, Loggers, synchronize_time, seed_all, parse_args_from_config - -logger = Loggers('train_sft') - - -def sft_train(): - from megatron.core import parallel_state - from megatron.core.utils import get_model_config - from megatron.core.enums import ModelType - from megatron.training import get_args - from megatron.training.checkpointing import save_checkpoint - from megatron.training.training import evaluate_and_print_results, setup_model_and_optimizer - from megatron.training.utils import get_batch_on_this_cp_rank - - from mindspeed_llm.training import train - from mindspeed_llm.training.initialize import set_jit_fusion_options - from mindspeed_llm.training.utils import generate_actual_seq_len - - args = get_args() - set_jit_fusion_options() - - start_time = synchronize_time() - logger.info("sft training starting time: {}".format(start_time)) - - model, optimizer, opt_param_scheduler = setup_model_and_optimizer( - gpt_model_provider, ModelType.encoder_or_decoder) - logger.info('after model, optimizer and learning rate scheduler are built') - - model_arch_config = get_model_config(model[0]) - - # build tokenizer - tokenizer = get_tokenizer(args.tokenizer_name_or_path, - prompt_type=args.prompt_type, prompt_type_path=args.prompt_type_path) - logger.info('after tokenizer is built') - - # build dataset - train_valid_test_num_samples = get_train_valid_test_num_samples( - train_samples=args.train_samples, - train_iters=args.train_iters, - global_batch_size=args.global_batch_size, - eval_interval=args.eval_interval, - eval_iters=args.eval_iters, - ) - train_dataset, valid_dataset, test_dataset = build_train_valid_test_datasets( - data_prefix=args.data_path, - splits_string=args.split, - seq_length=args.seq_length + args.num_nextn_predict_layers, - train_valid_test_num_samples=train_valid_test_num_samples, - dataset_cls=InstructionDataset, - tokenizer=tokenizer, - parallel_state=parallel_state, - full_shuffle_instruction_dataset=args.full_shuffle_instruction_dataset, - no_shuffle=args.no_shuffle, - reset_position_ids=args.reset_position_ids, - prompt_type=args.prompt_type, - prompt_type_path=args.prompt_type_path, - seed=args.seed - ) - logger.info('after datasets are built') - - # Backward compatibility, assume fixed batch size. 
- if args.iteration > 0 and args.consumed_train_samples == 0: - if args.train_samples is not None: - raise ValueError('only backward compatiblity support for iteration-based training') - args.consumed_train_samples = args.iteration * args.global_batch_size - if args.iteration > 0 and args.consumed_valid_samples == 0: - if args.train_samples is None: - args.consumed_valid_samples = (args.iteration // args.eval_interval) * \ - args.eval_iters * args.global_batch_size - - # build_dataloader - train_dataloader = None - if train_dataset is not None and len(train_dataset) > 0: - train_dataloader = InstructionDataLoader( - dataset=train_dataset, - parallel_state=parallel_state, - tokenizer=tokenizer, - num_workers=args.num_workers, - tokenizer_padding_side=args.tokenizer_padding_side, - pad_to_multiple_of=args.pad_to_multiple_of if args.variable_seq_lengths else args.seq_length + args.num_nextn_predict_layers, - variable_seq_lengths=args.variable_seq_lengths, - num_nextn_predict_layers=args.num_nextn_predict_layers, - micro_batch_size=args.micro_batch_size, - comsumed_samples=args.consumed_train_samples, - seed=args.seed - ) - - valid_dataloader = None - if valid_dataset is not None and len(valid_dataset) > 0: - valid_dataloader = InstructionDataLoader( - dataset=valid_dataset, - parallel_state=parallel_state, - tokenizer=tokenizer, - num_workers=args.num_workers, - tokenizer_padding_side=args.tokenizer_padding_side, - pad_to_multiple_of=args.pad_to_multiple_of if args.variable_seq_lengths else args.seq_length + args.num_nextn_predict_layers, - variable_seq_lengths=args.variable_seq_lengths, - num_nextn_predict_layers=args.num_nextn_predict_layers, - micro_batch_size=args.micro_batch_size, - comsumed_samples=args.consumed_valid_samples, - seed=args.seed - ) - - test_dataloader = None - if test_dataset is not None and len(test_dataset) > 0: - test_dataloader = InstructionDataLoader( - dataset=test_dataset, - parallel_state=parallel_state, - tokenizer=tokenizer, - num_workers=args.num_workers, - tokenizer_padding_side=args.tokenizer_padding_side, - pad_to_multiple_of=args.pad_to_multiple_of if args.variable_seq_lengths else args.seq_length + args.num_nextn_predict_layers, - variable_seq_lengths=args.variable_seq_lengths, - num_nextn_predict_layers=args.num_nextn_predict_layers, - micro_batch_size=args.micro_batch_size, - comsumed_samples=0, - seed=args.seed - ) - - # Flags to know if we need to do training/validation/testing. 
- do_train = train_dataloader is not None and args.train_iters > 0 - do_valid = valid_dataloader is not None and args.eval_iters > 0 - do_test = test_dataloader is not None and args.eval_iters > 0 - flags = torch.tensor( - [int(do_train), int(do_valid), int(do_test)], - dtype=torch.long, device='cuda') - - torch.distributed.broadcast(flags, 0) - - args.do_train = getattr(args, "do_train", False) or flags[0].item() - args.do_valid = getattr(args, "do_valid", False) or flags[1].item() - args.do_test = getattr(args, "do_test", False) or flags[2].item() - - # build data_iterator - train_data_iterator = [] - valid_data_iterator = [] - test_data_iterator_list = [] - if args.virtual_pipeline_model_parallel_size is not None: - for i in range(len(model)): - parallel_state.set_virtual_pipeline_model_parallel_rank(i) - train_data_iterator.append(build_data_iter(train_dataloader, args.dataloader_type)) - valid_data_iterator.append(build_data_iter(valid_dataloader, args.dataloader_type)) - test_data_iterator_list.append(build_data_iter(test_dataloader, args.dataloader_type)) - else: - train_data_iterator = build_data_iter(train_dataloader, args.dataloader_type) - valid_data_iterator = build_data_iter(valid_dataloader, args.dataloader_type) - test_data_iterator = build_data_iter(test_dataloader, args.dataloader_type) - test_data_iterator_list = [test_data_iterator] - - logger.info('after dataloaders are built') - - # configure Trainer - megatron_modules = { - 'train_func': train, - 'parallel_state': parallel_state, - 'save_checkpoint_func': save_checkpoint, - 'evaluate_fun': evaluate_and_print_results, - "generate_seq_len_fun": generate_actual_seq_len, - 'batch_cp_func': get_batch_on_this_cp_rank - } - - trainer = SFTTrainer( - args=args, - model=model, - optimizer=optimizer, - train_data_iterator=train_data_iterator, - valid_data_iterator=valid_data_iterator, - test_data_iterator_list=test_data_iterator_list, - scheduler=opt_param_scheduler, - process_non_loss_data_func=None, - model_config=model_arch_config, - **megatron_modules - ) - - trainer.train() - - -def gpt_model_provider(pre_process, post_process): - """ - Builds the model. - - If you set the use_mcore_models to True, it will return the mcore GPT model and if not the legacy GPT model. - - Args: - pre_process (bool, optional): Set to true if you need to compute embedings. Defaults to True. - post_process (bool, optional): Set to true if you need to want to compute output logits/loss. - Defaults to True. 
- - - Returns: - Union[GPTModel, megatron.legacy.model.GPTModel]: The returned model - """ - from megatron.training import get_args - from megatron.core.models.gpt import GPTModel - from megatron.core.models.gpt.gpt_layer_specs import get_gpt_layer_local_spec - from megatron.core.transformer.spec_utils import import_module - from megatron.training.arguments import core_transformer_config_from_args - args = get_args() - - logger.info('building GPT model ...') - # Experimental loading arguments from configs - config = core_transformer_config_from_args(args) - - if args.spec is not None: - transformer_layer_spec = import_module(args.spec) - else: - transformer_layer_spec = get_gpt_layer_local_spec(args.num_experts, args.moe_grouped_gemm) - - model = GPTModel( - config=config, - transformer_layer_spec=transformer_layer_spec, - vocab_size=args.padded_vocab_size, - max_sequence_length=args.max_position_embeddings, - pre_process=pre_process, - post_process=post_process, - fp16_lm_cross_entropy=args.fp16_lm_cross_entropy, - parallel_output=True, - share_embeddings_and_output_weights=not args.untie_embeddings_and_output_weights, - position_embedding_type=args.position_embedding_type, - rotary_percent=args.rotary_percent, - seq_len_interpolation_factor=args.rotary_seq_len_interpolation_factor - ) - - return model - - -def initialize_megatron( - extra_args_provider=None, - args_defaults=None, - ignore_unknown_args=False, - allow_no_cuda=False, - skip_mpu_initialization=False, - config=None, -): - """Set global variables, initialize distributed, and - set autoresume and random seeds. - `allow_no_cuda` should not be set unless using megatron for cpu only - data processing. In general this arg should not be set unless you know - what you are doing. - Returns a function to finalize distributed env initialization - (optionally, only when args.lazy_mpu_init == True) - """ - if args_defaults is None: - args_defaults = {} - - origin_sys_argv = sys.argv - sys.argv = [sys.argv[0]] - parse_args_from_config(config) - from mindspeed_llm.training.arguments import parse_args_decorator - import megatron - - parse_args = parse_args_decorator(megatron.training.arguments.parse_args) - args = parse_args(extra_args_provider, ignore_unknown_args) - sys.argv = origin_sys_argv - - if not allow_no_cuda: - if not torch.cuda.is_available(): - raise ValueError("Megatron requires CUDA.") - - from megatron.core import parallel_state - from megatron.training import get_args - from megatron.training.arguments import validate_args - from megatron.training.checkpointing import load_args_from_checkpoint - from megatron.training.global_vars import set_global_variables - from megatron.training.initialize import _set_random_seed, \ - _init_autoresume, _compile_dependencies, \ - _initialize_tp_communicators, _initialize_distributed - - if args.use_checkpoint_args or args_defaults.get("use_checkpoint_args", False): - if args.load is None: - raise ValueError("--use-checkpoints-args requires --load argument.") - load_args_from_checkpoint(args) - - validate_args(args, args_defaults) - - set_global_variables(args) - - if args.use_deter_comp: - seed_all(args.seed) - logger.info("deterministic computing is applied for npu.") - - # torch.distributed initialization - def finish_mpu_init(): - args = get_args() - # Pytorch distributed. - _initialize_distributed() - - # Random seeds for reproducibility. 
- if args.rank == 0: - logger.info("> setting random seeds to {} ...".format(args.seed)) - _set_random_seed(args.seed, args.data_parallel_random_init) - - if skip_mpu_initialization: - return None - - args = get_args() - if args.lazy_mpu_init: - args.use_cpu_initialization = True - # delayed initialization of DDP-related stuff - # We only set basic DDP globals - parallel_state.set_tensor_model_parallel_world_size(args.tensor_model_parallel_size) - # and return function for external DDP manager - # to call when it has DDP initialized - parallel_state.set_tensor_model_parallel_rank(args.rank) - return finish_mpu_init - else: - # Megatron's MPU is the master. Complete initialization right away. - finish_mpu_init() - - # Autoresume. - _init_autoresume() - - # Compile dependencies. - _compile_dependencies() - - if args.tp_comm_overlap: - _initialize_tp_communicators() - - # No continuation function - return None - - -def separate_config_and_parse_args(config): - model_config = config.model - sft_config = config.sft - - OmegaConf.set_struct(model_config, False) - OmegaConf.set_struct(sft_config, False) - - sft_config_dict = OmegaConf.to_container(sft_config, resolve=True) - model_config_dict = OmegaConf.to_container(model_config, resolve=True) - - megatron_config = MegatronConfig(sft_config_dict, model_config_dict) - return megatron_config - - -@hydra.main(config_path='../configs', config_name='sft_qwen25_7b', version_base=None) -def main(config): - megatron_config = separate_config_and_parse_args(config) - initialize_megatron(config=megatron_config) - sft_train() - - -if __name__ == '__main__': - main() diff --git a/configs/r1_zero_qwen25_7b_integrated.yaml b/configs/r1_zero_qwen25_7b_integrated.yaml new file mode 100644 index 0000000000000000000000000000000000000000..e7c4b5f11a175344e8d3282f8caf6e70cec4a135 --- /dev/null +++ b/configs/r1_zero_qwen25_7b_integrated.yaml @@ -0,0 +1,107 @@ +defaults: + - model: + - qwen25_7b + +megatron_training: + model: qwen25_7b + use_fused_rmsnorm: true + use_mcore_models: true + sequence_parallel: true + use_flash_attn: true + no_masked_softmax_fusion: true + attention_softmax_in_fp32: true + no_gradient_accumulation_fusion: true + use_fused_swiglu: true + use_fused_rotary_pos_emb: true + bf16: true + use_distributed_optimizer: true + tokenizer_type: PretrainedFromHF + tokenizer_name_or_path: ./Qwen2.5-7B-Instruct + global_batch_size: 4 + seq_length: 1024 + save_interval: 50 + train_iters: 2 + stage: ray_grpo + attention_dropout: 0.0 + init_method_std: 0.01 + hidden_dropout: 0.0 + distributed_backend: nccl + no_shared_storage: true + variable_seq_lengths: true + dataset_additional_keys: ['labels',] + data_path: ./data + split: 100,0,0 + +actor_config: + model: qwen25_7b + micro_batch_size: 2 + tensor_model_parallel_size: 4 + pipeline_model_parallel_size: 1 + lr: 5e-7 + lr_decay_style: cosine + min_lr: 5e-8 + weight_decay: 0.0 + lr_warmup_fraction: 0.0 + clip_grad: 10000.0 + adam_beta1: 0.9 + adam_beta2: 0.95 + initial_loss_scale: 4096 + finetune: true + load: ./ckpt + save: ./ckpt + no_load_optim: true + no_load_rng: true + +rl_config: + use_integrated_worker: true + blocking: true + update_micro_batch_size: 2 + experience_count: 4 + gamma: 1.0 + lam: 0.95 + adv_estimator: group_norm + kl_penalty: kl + kl_ctrl_type: fixed + init_kl_coef: 0.0001 + mini_batch_size: 256 + max_prompt_length: 2048 + epochs: 1 + clip_ratio: 0.2 + entropy_coeff: 0.001 + n_samples_per_prompt: 8 + rule_reward: true + verifier_function: ["base_acc"] + verifier_weight: [1.0] + 
verifier_parallel: 4 + verifier_timeout: 120 + use_tensorboard: true + actor_resource: + num_npus: 8 + +generate_config: + trust_remote_code: true + offload_train_optimizer: true + offload_train_grad: true + offload_train_param: true + + # 推理时的并行配置 + infer_tensor_parallel_size: 2 + infer_pipeline_parallel_size: 1 + infer_expert_parallel_size: 1 + + # vllm 模型相关设置 + max_num_seqs: 512 + max_model_len: 4096 + dtype: "bfloat16" + gpu_memory_utilization: 0.9 + num_scheduler_steps: 1 + + # 采样配置 + sampling_config: + logprobs: 1 + max_tokens: 2048 + top_p: 0.9 + top_k: 50 + min_p: 0.01 + temperature: 0.8 + detokenize: false diff --git a/configs/sft_pack_qwen25_32b.yaml b/configs/sft_pack_qwen25_32b.yaml deleted file mode 100644 index faf73537091e2caa54f518a9d698fdca8d83810f..0000000000000000000000000000000000000000 --- a/configs/sft_pack_qwen25_32b.yaml +++ /dev/null @@ -1,78 +0,0 @@ -defaults: - - model: - - qwen25_32b - -sft: - # memory_args: - swap_attention: true - recompute_granularity: full - recompute_method: block - recompute_num_layers: 4 - recompute_activation_function: true - - # tune_args: - finetune: true - stage: sft - is_instruction_dataset: true - variable_seq_lengths: false - tokenizer_not_use_fast: true - prompt_type: qwen - - # gpt_args: - reuse_fp32_param: true - norm_epsilon: 1e-6 - micro_batch_size: 2 - global_batch_size: 128 - tokenizer_type: PretrainedFromHF - tokenizer_name_or_path: ./Qwen2.5-32B/ - train_iters: 2000 - lr: 1.25e-6 - lr_decay_style: cosine - min_lr: 1.25e-7 - lr_warmup_fraction: 0.01 - weight_decay: 0.0 - clip_grad: 1.0 - initial_loss_scale: 4096 - use_distributed_optimizer: true - tensor_model_parallel_size: 8 - pipeline_model_parallel_size: 2 - sequence_parallel: true - use_mcore_models: true - use_fused_rmsnorm: true - use_flash_attn: true - no_masked_softmax_fusion: true - no_gradient_accumulation_fusion: true - use_fused_swiglu: true - use_fused_rotary_pos_emb: true - bf16: true - seq_length: 4096 - adam_beta1: 0.9 - adam_beta2: 0.95 - attention_dropout: 0.0 - init_method_std: 0.01 - hidden_dropout: 0.0 - overlap_grad_reduce: true - overlap_param_gather: true - - # data_args: - data_path: ./alpaca/data - split: 100,0,0 - no_shuffle: false - reset_position_ids: true - - # ckpt_args: - no_load_optim: true - no_load_rng: true - no_save_optim: true - no_save_rng: true - seed: 1234 - model: qwen25_32b - load: ./mcore_tp8pp2/ - save: /cache - - # output_args: - log_interval: 1 - save_interval: 2000 - eval_interval: 2000 - eval_iters: 0 - log_throughput: true diff --git a/configs/sft_pack_qwen25_7b.yaml b/configs/sft_pack_qwen25_7b.yaml deleted file mode 100644 index fe1c71107c1f340efac9d9f7b226824199e16b5a..0000000000000000000000000000000000000000 --- a/configs/sft_pack_qwen25_7b.yaml +++ /dev/null @@ -1,71 +0,0 @@ -defaults: - - model: - - qwen25_7b - -sft: - # tune_args: - finetune: true - stage: sft - is_instruction_dataset: true - variable_seq_lengths: false - tokenizer_not_use_fast: true - prompt_type: qwen - - # gpt_args: - norm_epsilon: 1e-6 - micro_batch_size: 2 - global_batch_size: 128 - tokenizer_type: PretrainedFromHF - tokenizer_name_or_path: ./Qwen2.5-7B/ - train_iters: 5000 - lr: 5e-5 - lr_decay_style: cosine - min_lr: 1.25e-7 - lr_warmup_fraction: 0.01 - weight_decay: 1e-1 - clip_grad: 1.0 - initial_loss_scale: 4096 - use_distributed_optimizer: true - tensor_model_parallel_size: 2 - pipeline_model_parallel_size: 2 - sequence_parallel: true - use_mcore_models: true - use_fused_rmsnorm: true - use_flash_attn: true - no_masked_softmax_fusion: true 
- no_gradient_accumulation_fusion: true - use_fused_swiglu: true - use_fused_rotary_pos_emb: true - bf16: true - seq_length: 4096 - adam_beta1: 0.9 - adam_beta2: 0.95 - attention_dropout: 0.0 - init_method_std: 0.01 - hidden_dropout: 0.0 - overlap_grad_reduce: true - overlap_param_gather: true - max_position_embeddings: 4096 - - # data_args: - data_path: ./dataset/data - reset_position_ids: true - split: 100,0,0 - no_shuffle: false - - # ckpt_args: - no_load_optim: true - no_load_rng: true - no_save_optim: true - no_save_rng: true - seed: 1234 - model: qwen25_7b - load: ./ckpt - save: ./ckpt - - # output_args: - log_interval: 1 - save_interval: 5000 - eval_interval: 5000 - eval_iters: 0 - log_throughput: true diff --git a/configs/sft_qwen25_32b.yaml b/configs/sft_qwen25_32b.yaml deleted file mode 100644 index ab0dd81f3bd3c73db5c999ed1ce683dcaeec93d1..0000000000000000000000000000000000000000 --- a/configs/sft_qwen25_32b.yaml +++ /dev/null @@ -1,77 +0,0 @@ -defaults: - - model: - - qwen25_32b - -sft: - # memory_args: - swap_attention: true - recompute_granularity: full - recompute_method: block - recompute_num_layers: 4 - recompute_activation_function: true - - # tune_args: - finetune: true - stage: sft - is_instruction_dataset: true - variable_seq_lengths: true - tokenizer_not_use_fast: true - prompt_type: qwen - - # gpt_args: - reuse_fp32_param: true - norm_epsilon: 1e-6 - micro_batch_size: 4 - global_batch_size: 128 - tokenizer_type: PretrainedFromHF - tokenizer_name_or_path: ./Qwen2.5-32B/ - train_iters: 2000 - lr: 1.25e-6 - lr_decay_style: cosine - min_lr: 1.25e-7 - lr_warmup_fraction: 0.01 - weight_decay: 0.0 - clip_grad: 1.0 - initial_loss_scale: 4096 - use_distributed_optimizer: true - tensor_model_parallel_size: 8 - pipeline_model_parallel_size: 2 - sequence_parallel: true - use_mcore_models: true - use_fused_rmsnorm: true - use_flash_attn: true - no_masked_softmax_fusion: true - no_gradient_accumulation_fusion: true - use_fused_swiglu: true - use_fused_rotary_pos_emb: true - bf16: true - seq_length: 4096 - adam_beta1: 0.9 - adam_beta2: 0.95 - attention_dropout: 0.0 - init_method_std: 0.01 - hidden_dropout: 0.0 - overlap_grad_reduce: true - overlap_param_gather: true - - # data_args: - data_path: ./alpaca/data - split: 100,0,0 - no_shuffle: false - - # ckpt_args: - no_load_optim: true - no_load_rng: true - no_save_optim: true - no_save_rng: true - seed: 1234 - model: qwen25_32b - load: ./mcore_tp8pp2/ - save: /cache - - # output_args: - log_interval: 1 - save_interval: 2000 - eval_interval: 2000 - eval_iters: 0 - log_throughput: true diff --git a/configs/sft_qwen25_7b.yaml b/configs/sft_qwen25_7b.yaml deleted file mode 100644 index 3182007d2b635221d8223cce162b1dd06b4de1a5..0000000000000000000000000000000000000000 --- a/configs/sft_qwen25_7b.yaml +++ /dev/null @@ -1,69 +0,0 @@ -defaults: - - model: - - qwen25_7b - -sft: - # tune_args: - finetune: true - stage: sft - is_instruction_dataset: true - variable_seq_lengths: true - tokenizer_not_use_fast: true - prompt_type: qwen - - # gpt_args: - norm_epsilon: 1e-6 - micro_batch_size: 4 - global_batch_size: 128 - tokenizer_type: PretrainedFromHF - tokenizer_name_or_path: ./Qwen2.5-7B/ - train_iters: 5000 - lr: 5e-5 - lr_decay_style: cosine - min_lr: 1.25e-7 - lr_warmup_fraction: 0.01 - weight_decay: 1e-1 - clip_grad: 1.0 - initial_loss_scale: 4096 - use_distributed_optimizer: true - tensor_model_parallel_size: 2 - pipeline_model_parallel_size: 2 - sequence_parallel: false - use_mcore_models: true - use_fused_rmsnorm: true - use_flash_attn: 
true - no_masked_softmax_fusion: true - no_gradient_accumulation_fusion: true - use_fused_swiglu: true - use_fused_rotary_pos_emb: true - bf16: true - seq_length: 4096 - adam_beta1: 0.9 - adam_beta2: 0.95 - attention_dropout: 0.0 - init_method_std: 0.01 - hidden_dropout: 0.0 - overlap_grad_reduce: true - overlap_param_gather: true - - # data_args: - data_path: ./alpaca/data - split: 100,0,0 - no_shuffle: false - - # ckpt_args: - no_load_optim: true - no_load_rng: true - no_save_optim: true - no_save_rng: true - seed: 1234 - model: qwen25_7b - load: ./tp2pp2/ - save: ./ckpt - - # output_args: - log_interval: 1 - save_interval: 5000 - eval_interval: 5000 - eval_iters: 0 - log_throughput: true diff --git a/docs/algorithms/supervised_finetune.md b/docs/algorithms/supervised_finetune.md deleted file mode 100644 index 1b07cbf5e2eb03e92aec02520add1e993415763a..0000000000000000000000000000000000000000 --- a/docs/algorithms/supervised_finetune.md +++ /dev/null @@ -1,248 +0,0 @@ -# MindSpeed-RL 监督微调 - -## 简介 - -监督微调 SFT(Supervised Fine-Tuning),是一种通过标注数据集对预训练的大型语言模型进行微调的技术。其目的是让模型能够更好地适应特定任务需求。通过在结构化的监督数据上进行训练,SFT -能够让模型更准确地理解和生成内容,例如执行指令、回答问题或提供更精准的对话。 - -在强化学习中,SFT 一般用来初始化 actor 模型和 reference 模型。 - -![image](../../sources/images/instruction_finetune/sft_in_rl.PNG) - -## 使用示例 - -### 准备环境 - -请参考首页[安装指南](../install_guide.md)安装环境和准备代码依赖。 - -### 准备数据 - -#### 数据下载 - -* [单轮对话:Alpaca英文数据集](https://huggingface.co/datasets/tatsu-lab/alpaca) - -Alpaca风格微调数据集下载可以基于网页直接下载,也可以基于命令行下载,比如: - -```bash -cd dataset/ -wget https://huggingface.co/datasets/tatsu-lab/alpaca/resolve/main/data/train-00000-of-00001-a09b74b3ef9c3b56.parquet -cd .. -``` - -#### 数据转换 - -##### pack格式转换 - -> 注意:packing格式转换的seq-length需要和SFT训练参数的seq-length保持一致,如果训练参数需要修改,则数据集也需要重新转换。 - -进入MindSpeed-RL目录,修改 `./configs/datasets/alpaca_instruction_pack.yaml` 文件,并执行如下参考命令 - -``` -bash examples/data/preprocess_data.sh alpaca_instruction_pack -``` - -##### 非pack格式转换 - -进入MindSpeed-RL目录,修改 `./configs/datasets/alpaca_instruction_non_pack.yaml` 文件,并执行如下参考命令 - -``` -bash examples/data/preprocess_data.sh alpaca_instruction_non_pack -``` - -##### 参数说明 - -【--input】 - -可以直接输入到数据集目录或具体文件,如果是目录,则处理全部文件, 支持 .parquet \ .csv \ .json \ .jsonl \ .txt \ .arrow 格式, -同一个文件夹下的数据格式需要保持一致 - -【--map-keys】 - -`--map-keys`参数用于配置字段映射来使用数据集。 - -Alpaca风格示例: - -``` -[ -{ - "instruction": "人类指令(必填)", - "input": "人类输入(选填)", - "output": "模型回答(必填)", - "system": "系统提示词(选填)", - "history": [ - ["第一轮指令(选填)", "第一轮回答(选填)"], - ["第二轮指令(选填)", "第二轮回答(选填)"] - ] -} -] -``` - -对于上面格式的数据,`--map-keys`参数完整应为 - -`'{"prompt":"instruction","query":"input","response":"output","system":"system","history":"history"}'` - -其中参数的key值`"prompt"、"query"、"response"、"system"、"history"` -代表数据集列映射后的属性,在代码中是固定的,不应改变,value值`"instruction"、"input"、"output"、"system"、"history"`对应数据集的列名。 - -考虑到alpaca数据集大部分都是`["instruction", "input", "output"]`型格式,因此我们为key值`["prompt", "query", "response"]` -设置了默认值。因此上面格式`--map-keys`参数可简略为`'{"system": "system","history": "history"}'` - -若数据集中无`system`与`history`列,则`--map-keys`可省略。 - -【--prompt-type】 - -用于指定模型模板,能够让base模型微调后能具备更好的对话能力。`prompt-type` -的可选项可以在[templates](../../configs/model/templates.json)文件内查看。 - -【--handler-name】 - -微调数据预处理Alpaca风格数据集时,应指定为`AlpacaStyleInstructionHandler`,根据`--map-keys`参数提取对应数据的列。 - -【--pack】 - -将数据转为Pack格式。 - -【--seq-length】 - -指定Pack数据集每条数据的长度。 - -【--append-eod】 - -在每个输入序列的末尾添加一个特殊的标记来表示输入序列的结束。 - -【--overwrite-cache】 - -用于控制是否覆盖已存在的缓存分词器。 - -**示例1:** - -``` ---map-keys 
'{"prompt":"notice","query":"question","response":"answer","system":"system_test","history":"histories"}' -``` - -则会提取数据集里的`"notice"、"question"、"answer"、"system_test"、"histories"`列。 - -**示例2:** - -``` ---map-keys '{"history":"histories"}' -``` - -则会提取数据集里的`"instruction"、"input"、"output"、"histories"`列,其中`"instruction"、"input"、"output"`列作为默认值隐式存在。 - -### 权重转换 - -#### 权重下载 - -* [qwen25-7b](https://huggingface.co/Qwen/Qwen2.5-7B/tree/main) -* [qwen25-32b](https://huggingface.co/Qwen/Qwen2.5-32B/tree/main) - -#### hf 转 mcore - -在训练前,需要将 Hugging Face 权重转换成 Mcore 格式。 - -注:这里会调用到 MindSpeed_LLM 仓,进行权重转换时注意按照安装手册中的环境准备步骤,将 mindspeed_llm 放入 MindSpeed-RL 目录下。 - -```bash -# 路径按照真实情况配置 -bash examples/ckpt/ckpt_convert_qwen25_hf2mcore.sh -``` - -##### 配置参数介绍 - -* `use-mcore-models`:启用 MCore 模型; -* `model-type`:指定模型类型,如 GPT; -* `load-model-type`:指定加载模型的类型,如 hf(Hugging Face); -* `save-model-type`:指定保存模型的类型,如 mg; -* `target-tensor-parallel-size`:设置目标张量并行大小; -* `target-pipeline-parallel-size`:设置目标流水线并行大小; -* `add-qkv-bias`:是否进行 QKV 偏置; -* `load-dir`:加载 Hugging Face 权重的路径; -* `save-dir`:保存转换后权重的路径; -* `tokenizer-model`:分词器模型文件的路径; -* `model-type-hf`:指定 Hugging Face 模型类型,如 llama2; -* `params-dtype`:指定参数的数据类型,如 bf16。 - -#### mcore转hf(可选) - -训练结束后,如果需要将生成的mcore格式权重转换回Hugging Face 格式,可以参照以下命令及脚本: - -```bash -# 路径按照真实情况配置 -bash examples/ckpt/ckpt_convert_qwen25_mcore2hf.sh -``` - -##### 配置参数介绍 - -这里的参数与上文基本一致,注意以下几个事项即可: - -1. 权重转换转回hugging-face格式时,tp 和 pp 配置需配置为1; -2. load-model-type参数配置为 mg,save-model-type参数配置为hf; -3. save-dir 路径需要填入原始HF模型路径,新权重会存于HF原始权重文件下的mg2hg目录下,如/qwen2.5_7b_hf/mg2hg/ - -### 开始训练 - -### 单机 - -参考[配置](../../configs/sft_qwen25_7b.yaml) -,根据真实环境填写路径。进入项目目录后通过 [examples/sft/sft_qwen25_7b.sh](../../examples/sft/sft_qwen25_7b.sh) -或者[examples/sft/sft_pack_qwen25_7b.sh](../../examples/sft/sft_pack_qwen25_7b.sh) 启动7B模型训练(单机) - -### 多机 - -参考[配置](../../configs/sft_qwen25_32b.yaml),根据真实环境填写路径。 -进入项目目录后通过 [examples/sft/sft_qwen25_32b.sh](../../examples/sft/sft_qwen25_32b.sh) -或者[examples/sft/sft_pack_qwen25_32b.sh](../../examples/sft/sft_pack_qwen25_32b.sh) 启动32B模型训练(多机) -在运行脚本前需要根据真实环境配置脚本中的环境变量 - -- MASTER_ADDR 主节点的IP -- MASTER_PORT 主节点的端口 -- NNODES 参与训练的节点数 -- NODE_RANK 该节点在集群内对应的RANK -- GLOO_SOCKET_IFNAME 可以通过 ifconfig 命令,找到本机IP对应的网卡名 -- TP_SOCKET_IFNAME,HCCL_SOCKET_IFNAME 可与 GLOO_SOCKET_IFNAME 配置成一样的 - -在所有需要启动的机器内配置好脚本,在命令行统一运行即可启动多机训练。 - -### 配置 - -脚本使用的是configs下的sft_qwen25_7b/sft_qwen25_32b/sft_pack_qwen25_7b/sft_pack_qwen25_32b配置文件 - -在文件内需要根据真实环境配置 - -- tokenizer_name_or_path 需配置为 tokenizer对应路径 -- data_path 需配置为 \/\ 的形式,需要保证加载的 bin - 文件的文件名为`_packed_(.*)_document(.*)` -- tensor_model_parallel_size 张量并行数 -- pipeline_model_parallel_size 流水线并行数 -- 需要保证并行数乘积能整除总卡数 -- 并行配置要与权重转换时转换目标的并行配置一致 - -## 参考集群规模 - - - - - - - - - - - - - - - - - - - - - - - - - - -
-| 实验模型 | 硬件信息 | 集群规模 | 启动脚本 |
-| --- | --- | --- | --- |
-| Qwen25-7B | Atlas 900 A2 PODc | 1x8 | non_pack、pack |
-| Qwen25-32B | Atlas 900 A2 PODc | 2x8 | non_pack、pack |
\ No newline at end of file diff --git a/docs/features/hybrid_actor.md b/docs/features/hybrid_actor.md index e4d9126c0cdff44f9ad03503c8eae8558252b716..077ac36f5408da3b4d3c1d6e0f18548512fb00f5 100644 --- a/docs/features/hybrid_actor.md +++ b/docs/features/hybrid_actor.md @@ -70,22 +70,22 @@ generate_config: def initialize(self): # 初始化分布式环境 self.setup_distributed_rank() - + # 初始化训练态模型及卸载器 self.model, self.optimizer, self.opt_param_scheduler = self._build_model_optimizer() - self.megatron_offloader = MegatronOffLoader(self.optimizer, self.model) - + self.actor_offloader = MegatronOffLoader(self.model, self.optimizer) + # 在初始化推理态模型之前,首先卸载训练态的模型,这样才能让推理模型在初始化时正确计算KV Block的数量 if self.generate_config.offload_train_optimizer: - self.megatron_offloader.offload_optimizer() - if self.generate_config.offload_train_grad: - self.megatron_offloader.offload_grad() - if self.generate_config.offload_train_param: - self.megatron_offloader.offload_train_param() + self.actor_offloader.offload_optimizer() + if self.generate_config.offload_grad: + self.actor_offloader.offload_grad() + if self.generate_config.offload_param: + self.actor_offloader.offload_param() # 初始化推理态模型 self.inference_model = self._build_rollout() - + # 初始化 sharding_manager self.sharding_manager = self._build_sharding_manager() ... @@ -221,44 +221,44 @@ def generate_sequences(self): # mindspeed_rl/workers/resharding/megatron_sharding_manager.py class MegatronShardingManager: - + def reshard_to_train_mode(self): # 卸载推理态相关权重 self.inference_engine.offload_model_weights() self.offload_infer_params() torch.cuda.empty_cache() - + # 重新加载回训练态所需的内容 if self.optimizer_offload: self.megatron_offloader.onload_optimizer() if self.train_param_offload: - self.megatron_offloader.onload_train_param() + self.megatron_offloader.onload_param() if self.grad_offload: self.megatron_offloader.onload_grad() torch.cuda.empty_cache() def reshard_to_infer_mode(self): - + # 卸载训练态所需的相关参数 if self.optimizer_offload: self.megatron_offloader.offload_optimizer() if self.grad_offload: self.megatron_offloader.offload_grad() torch.cuda.empty_cache() - + # 训练态权重要在完成推理权重构建之后才能进行卸载 # 这里是为了对应初始化后第一次推理时,训练态权重不在显存上的情况 if self.train_param_offload: - self.megatron_offloader.onload_train_param() - + self.megatron_offloader.onload_param() + # 根据训练态和推理态的切分策略,完成推理态权重的构建 self.onload_infer_params() infer_params = self.vllm_weight_container.get_infer_params() - + # 开始推理前,将训练态权重进行卸载 if self.train_param_offload: - self.megatron_offloader.offload_train_param() - + self.megatron_offloader.offload_param() + # 将推理态权重从 weight_buffer 绑定到推理引擎上 self.inference_engine.sync_model_weights(infer_params, load_format='megatron') torch.cuda.empty_cache() diff --git a/examples/sft/sft_pack_qwen25_32b.sh b/examples/sft/sft_pack_qwen25_32b.sh deleted file mode 100644 index 63ea2ed78a86a4b9b3645204404548af8d7f720c..0000000000000000000000000000000000000000 --- a/examples/sft/sft_pack_qwen25_32b.sh +++ /dev/null @@ -1,27 +0,0 @@ -#!/bin/bash -export CUDA_DEVICE_MAX_CONNECTIONS=1 -export PYTORCH_NPU_ALLOC_CONF=expandable_segments:True -export GLOO_SOCKET_IFNAME= "Your SOCKET IFNAME" -export TP_SOCKET_IFNAME= "Your SOCKET IFNAME" -export HCCL_SOCKET_IFNAME= "Your SOCKET IFNAME" -export HYDRA_FULL_ERROR=1 - -MASTER_ADDR="localhost" -MASTER_PORT="6060" -NNODES=2 -NODE_RANK=0 -NPUS_PER_NODE=8 -WORLD_SIZE=$(($NPUS_PER_NODE*$NNODES)) - - -DISTRIBUTED_ARGS=" - --nproc_per_node $NPUS_PER_NODE \ - --nnodes $NNODES \ - --node_rank $NODE_RANK \ - --master_addr $MASTER_ADDR \ - --master_port $MASTER_PORT -" - -torchrun 
$DISTRIBUTED_ARGS cli/train_sft.py \ - --config-name sft_pack_qwen25_32b \ - | tee logs/sft_pack_qwen25_32b_rank${NODE_RANK}.log diff --git a/examples/sft/sft_pack_qwen25_7b.sh b/examples/sft/sft_pack_qwen25_7b.sh deleted file mode 100644 index 90edbfd02e67006a3fcd8bfcea1e63d2105c275c..0000000000000000000000000000000000000000 --- a/examples/sft/sft_pack_qwen25_7b.sh +++ /dev/null @@ -1,26 +0,0 @@ -#!/bin/bash -export CUDA_DEVICE_MAX_CONNECTIONS=1 -export PYTORCH_NPU_ALLOC_CONF=expandable_segments:True -export GLOO_SOCKET_IFNAME= "Your SOCKET IFNAME" -export TP_SOCKET_IFNAME= "Your SOCKET IFNAME" -export HCCL_SOCKET_IFNAME= "Your SOCKET IFNAME" -export HYDRA_FULL_ERROR=1 - -GPUS_PER_NODE=8 -MASTER_ADDR=localhost -MASTER_PORT=6004 -NNODES=1 -NODE_RANK=0 -WORLD_SIZE=$(($GPUS_PER_NODE*$NNODES)) - -DISTRIBUTED_ARGS=" - --nproc_per_node $GPUS_PER_NODE \ - --nnodes $NNODES \ - --node_rank $NODE_RANK \ - --master_addr $MASTER_ADDR \ - --master_port $MASTER_PORT -" - -torchrun $DISTRIBUTED_ARGS cli/train_sft.py \ - --config-name sft_pack_qwen25_7b \ - | tee logs/sft_pack_qwen25_7b_rank${NODE_RANK}.log diff --git a/examples/sft/sft_qwen25_32b.sh b/examples/sft/sft_qwen25_32b.sh deleted file mode 100644 index a400d66b5e346f2e7dbf381d646744c738cb032d..0000000000000000000000000000000000000000 --- a/examples/sft/sft_qwen25_32b.sh +++ /dev/null @@ -1,27 +0,0 @@ -#!/bin/bash -export CUDA_DEVICE_MAX_CONNECTIONS=1 -export PYTORCH_NPU_ALLOC_CONF=expandable_segments:True -export GLOO_SOCKET_IFNAME= "Your SOCKET IFNAME" -export TP_SOCKET_IFNAME= "Your SOCKET IFNAME" -export HCCL_SOCKET_IFNAME= "Your SOCKET IFNAME" -export HYDRA_FULL_ERROR=1 - -MASTER_ADDR="localhost" -MASTER_PORT="6060" -NNODES=2 -NODE_RANK=0 -NPUS_PER_NODE=8 -WORLD_SIZE=$(($NPUS_PER_NODE*$NNODES)) - - -DISTRIBUTED_ARGS=" - --nproc_per_node $NPUS_PER_NODE \ - --nnodes $NNODES \ - --node_rank $NODE_RANK \ - --master_addr $MASTER_ADDR \ - --master_port $MASTER_PORT -" - -torchrun $DISTRIBUTED_ARGS cli/train_sft.py \ - --config-name sft_qwen25_32b \ - | tee logs/sft_qwen25_32b_rank${NODE_RANK}.log \ No newline at end of file diff --git a/examples/sft/sft_qwen25_7b.sh b/examples/sft/sft_qwen25_7b.sh deleted file mode 100644 index 5845fbb075c55b35cbd4920ae48764607898cf86..0000000000000000000000000000000000000000 --- a/examples/sft/sft_qwen25_7b.sh +++ /dev/null @@ -1,26 +0,0 @@ -#!/bin/bash -export CUDA_DEVICE_MAX_CONNECTIONS=1 -export PYTORCH_NPU_ALLOC_CONF=expandable_segments:True -export GLOO_SOCKET_IFNAME= "Your SOCKET IFNAME" -export TP_SOCKET_IFNAME= "Your SOCKET IFNAME" -export HCCL_SOCKET_IFNAME= "Your SOCKET IFNAME" -export HYDRA_FULL_ERROR=1 - -GPUS_PER_NODE=8 -MASTER_ADDR=localhost -MASTER_PORT=6005 -NNODES=1 -NODE_RANK=0 -WORLD_SIZE=$(($GPUS_PER_NODE*$NNODES)) - -DISTRIBUTED_ARGS=" - --nproc_per_node $GPUS_PER_NODE \ - --nnodes $NNODES \ - --node_rank $NODE_RANK \ - --master_addr $MASTER_ADDR \ - --master_port $MASTER_PORT -" - -torchrun $DISTRIBUTED_ARGS cli/train_sft.py \ - --config-name sft_qwen25_7b \ - | tee logs/sft_qwen25_7b_rank${NODE_RANK}.log diff --git a/mindspeed_rl/__init__.py b/mindspeed_rl/__init__.py index d8d4ed4a2a1e3011d2cefc1320504e2b039bbb1f..076e803980b2267875ba66b92d0ee40ac8c30473 100644 --- a/mindspeed_rl/__init__.py +++ b/mindspeed_rl/__init__.py @@ -11,7 +11,7 @@ from .models import ( LossFuncFactory, GRPOActorLossFunc, ReferenceLossFunc, RewardLossFunc, Actor, ActorRolloutHybrid, Reference, Reward ) -from .trainer import SFTTrainer, RayGRPOTrainer +from .trainer import RayGRPOTrainer from .workers import ( 
ReferenceWorker, RewardWorker, ActorHybridWorker, RayActorGroup, MegatronShardingManager, RuleReward @@ -27,8 +27,7 @@ __all__ = [ 'InstructionDataset', 'InstructionDataLoader', 'PromptDataset', 'PromptDataLoader', 'build_train_valid_test_datasets', 'get_train_valid_test_num_samples', 'LossFuncFactory', 'GRPOActorLossFunc', 'ReferenceLossFunc', 'RewardLossFunc', - 'Actor', 'ActorRolloutHybrid', 'Reference', 'Reward', - 'SFTTrainer', 'RayGRPOTrainer', + 'Actor', 'ActorRolloutHybrid', 'Reference', 'Reward', 'RayGRPOTrainer', 'get_tokenizer', 'WandbLogger', 'Metric', 'get_batch_metrices_mean', 'num_floating_point_operations', 'seed_all', 'synchronize_time', 'parse_args_from_config', diff --git a/mindspeed_rl/config_cls/rl_config.py b/mindspeed_rl/config_cls/rl_config.py index 650a5db15c1812c6cdf282130b2aa5e3100241a7..994bfc4f9b245ba2efc8effdf0de3a22aa1b926c 100644 --- a/mindspeed_rl/config_cls/rl_config.py +++ b/mindspeed_rl/config_cls/rl_config.py @@ -86,6 +86,7 @@ class RLConfig(BaseConfig): self.n_samples_per_prompt = 1 self.enable_sharding_validate = False self.tp_split_expert = False + self.update_micro_batch_size = None self.use_tensorboard = False self.use_wandb = False @@ -94,5 +95,6 @@ class RLConfig(BaseConfig): self.wandb_save_dir = "" self.blocking = False self.num_cpus_for_local_task = 1 + self.use_integrated_worker = False self.update(config_dict) diff --git a/mindspeed_rl/config_cls/validate_config.py b/mindspeed_rl/config_cls/validate_config.py index bba4ae9657239048e10d203fb1452b657d4989b0..00e400faedf94b31293296ddeba8e517eae2d763 100644 --- a/mindspeed_rl/config_cls/validate_config.py +++ b/mindspeed_rl/config_cls/validate_config.py @@ -1,8 +1,29 @@ # Copyright (c) 2025, HUAWEI CORPORATION. All rights reserved. import os +from mindspeed_rl.config_cls.rl_config import RLConfig +from mindspeed_rl.config_cls.megatron_config import MegatronConfig +from mindspeed_rl.config_cls.generate_config import GenerateConfig + + +def validate_rl_args( + actor_config: MegatronConfig, + ref_config: MegatronConfig, + reward_config: MegatronConfig, + rl_config: RLConfig, + generate_config: GenerateConfig + ): + + # 检查全共卡情况下参数设置 + if rl_config.use_integrated_worker: + if rl_config.reference_resource is not None: + raise ValueError( + f"reference_resource should not be set when use_integrated_worker mode is on.") + rl_config.reference_resource = rl_config.actor_resource + if rl_config.reward_resource is not None: + raise ValueError( + f" Reward model is not supported when use_integrated_worker mode is on.") -def validate_rl_args(actor_config, ref_config, reward_config, rl_config, generate_config): # 校验序列长度与模型最大长度 if generate_config.max_model_len < actor_config.seq_length: raise ValueError( @@ -102,10 +123,11 @@ def validate_rl_args(actor_config, ref_config, reward_config, rl_config, generat rl_config.n_samples_per_prompt, "Reference") - _validate_batch_ratio(reward_config.global_batch_size, - reward_config.micro_batch_size, - rl_config.n_samples_per_prompt, - "Reward") + if rl_config.reward_resource: + _validate_batch_ratio(reward_config.global_batch_size, + reward_config.micro_batch_size, + rl_config.n_samples_per_prompt, + "Reward") # 校验经验计数与全局批次关系 def _validate_experience_ratio(global_batch, experience_count, component): @@ -122,9 +144,10 @@ def validate_rl_args(actor_config, ref_config, reward_config, rl_config, generat rl_config.experience_count_ref, "Reference") - _validate_experience_ratio(reward_config.global_batch_size, - rl_config.experience_count_reward, - "Reward") + if 
rl_config.reward_resource: + _validate_experience_ratio(reward_config.global_batch_size, + rl_config.experience_count_reward, + "Reward") _validate_experience_ratio(reward_config.global_batch_size, rl_config.experience_count_rule_reward, diff --git a/mindspeed_rl/trainer/__init__.py b/mindspeed_rl/trainer/__init__.py index 8ebefa9c0aabc88898b8c88512c9ab2c3f6ee4c3..60eeea726eae67fe92b512dd2f706410b97570bb 100644 --- a/mindspeed_rl/trainer/__init__.py +++ b/mindspeed_rl/trainer/__init__.py @@ -1,7 +1,5 @@ # Copyright (c) 2024, HUAWEI CORPORATION. All rights reserved. - -from .sft_trainer import SFTTrainer from .grpo_trainer_hybrid import RayGRPOTrainer from .orm_trainer import ORMTrainer -__all__ = ['SFTTrainer', 'ORMTrainer', 'RayGRPOTrainer'] +__all__ = ['ORMTrainer', 'RayGRPOTrainer'] diff --git a/mindspeed_rl/trainer/grpo_trainer_hybrid.py b/mindspeed_rl/trainer/grpo_trainer_hybrid.py index 12f74173861ca5b2d94dcab302217c9422e3e21c..079a3f30570d693d626d9cb20a9a6273081bebba 100644 --- a/mindspeed_rl/trainer/grpo_trainer_hybrid.py +++ b/mindspeed_rl/trainer/grpo_trainer_hybrid.py @@ -148,18 +148,18 @@ class RayGRPOTrainer(RayBaseTrainer): # generate sequences self.actor_worker.generate_sequences(blocking=self.blocking) - # compute reference log_prob - self.ref_worker.compute_log_prob(blocking=self.blocking) - # compute rm scores. for reward_worker in self.reward_list: if isinstance(reward_worker, RayActorGroup): reward_worker.compute_rm_score(blocking=self.blocking) else: - self.rule_reward_compute_rm_score(reward_worker, blocking=self.blocking) + self.rule_reward_compute_rm_score(reward_worker, blocking=False) # compute advantages, executed on the driver process - self.compute_advantage(blocking=self.blocking) + self.compute_advantage(blocking=False) + + # compute reference log_prob + self.ref_worker.compute_ref_log_prob(blocking=self.blocking) # compute old log_prob self.actor_worker.compute_log_prob(blocking=self.blocking) diff --git a/mindspeed_rl/trainer/sft_trainer.py b/mindspeed_rl/trainer/sft_trainer.py deleted file mode 100644 index 5d8741fbdd1378743fdc95320ea4b6303d5d9f7c..0000000000000000000000000000000000000000 --- a/mindspeed_rl/trainer/sft_trainer.py +++ /dev/null @@ -1,223 +0,0 @@ -# Copyright (c) 2023; NVIDIA CORPORATION. All rights reserved. -# Copyright (c) 2024, HUAWEI CORPORATION. All rights reserved. -import os -from functools import partial -from abc import ABC - -import torch - -from mindspeed_rl.utils import Loggers -from mindspeed_rl.utils.utils import get_tune_attention_mask -from .utils.training import ( - get_finetune_data_on_this_tp_rank, broadcast_data, average_losses_across_data_parallel_group -) - -logger = Loggers('stf_trainer') - - -class SFTTrainer(ABC): - """ - Trainer to use while training reward model. 
- - Args: - model (torch.nn.Module): the model to train - strategy (Strategy): the strategy to use for training - optim(Optimizer): the optimizer to use for training - train_dataset (RewardDataset): the dataset to use for training - eval_dataset (RewardDataset): the dataset to use for evaluation - batch_size (int, defaults to 1): the batch size while training - max_epochs (int, defaults to 2): the number of epochs to train - optim_kwargs (dict, defaults to {'lr':1e-4}): the kwargs to use while initializing optimizer - """ - - def __init__( - self, - args, - model, - optimizer, - train_data_iterator, - valid_data_iterator, - test_data_iterator_list, - scheduler, - process_non_loss_data_func=None, - model_config=None, - **kwargs - ) -> None: - super().__init__() - self.args = args - self.model = model - self.optimizer = optimizer - self.train_data_iterator = train_data_iterator - self.valid_data_iterator = valid_data_iterator - self.test_data_iterator_list = test_data_iterator_list - self.scheduler = scheduler - self.process_non_loss_data_func = process_non_loss_data_func - self.model_config = model_config - self.train_args = (self.forward_step, self.model, self.optimizer, self.scheduler, self.train_data_iterator, - self.valid_data_iterator, self.process_non_loss_data_func, self.model_config) - self.train_func = kwargs.get('train_func', None) - self.parallel_state = kwargs.get('parallel_state', None) - self.save_checkpoint_func = kwargs.get('save_checkpoint_func', None) - self.evaluate_fun = kwargs.get('evaluate_fun', None) - self.generate_seq_len_fun = kwargs.get('generate_seq_len_fun', None) - self.get_batch_on_this_cp_rank = kwargs.get('batch_cp_func', None) - - def get_batch(self, data_iterator): - """Generate a batch.""" - # Items and their type. - keys = ['input_ids', 'attention_mask', 'labels'] - if self.args.reset_position_ids: - keys += ['position_ids'] - data_type = torch.int64 - - if (not self.parallel_state.is_pipeline_first_stage()) and (not self.parallel_state.is_pipeline_last_stage()): - if self.args.variable_seq_lengths and self.args.pipeline_model_parallel_size > 2: - tokens, attention_mask = get_finetune_data_on_this_tp_rank(data_iterator, - self.parallel_state, - self.args.reset_position_ids, - self.args.tokenizer_padding_side) - return tokens, None, None, attention_mask, None - else: - # Broadcast data. - data_b = broadcast_data(keys, next(data_iterator), data_type) - if self.args.reset_position_ids: - self.generate_seq_len_fun(data_b) - attention_mask_1d = data_b.get('attention_mask').long() - attention_mask = get_tune_attention_mask(attention_mask_1d) - batch = {'attention_mask': attention_mask} - batch = self.get_batch_on_this_cp_rank(batch) - return None, None, None, batch['attention_mask'], None - - # Broadcast data. 
- data_b = broadcast_data(keys, next(data_iterator), data_type, self.parallel_state) - - # Unpack - labels = data_b.get('labels').long() - tokens = data_b.get('input_ids').long() - attention_mask_1d = data_b.get('attention_mask').long() - # ignored label -100 - loss_mask = torch.where(labels == -100, 0, 1) - - if self.args.reset_position_ids: - position_ids = data_b.get('position_ids').long() - self.generate_seq_len_fun(data_b) - batch = { - 'tokens': tokens, - 'labels': labels, - 'loss_mask': loss_mask, - } - batch = self.get_batch_on_this_cp_rank(batch) - batch['attention_mask'] = None - batch['position_ids'] = position_ids - return batch.values() - - attention_mask = get_tune_attention_mask(attention_mask_1d, - tokenizer_padding_side=self.args.tokenizer_padding_side, - reset_attention_mask=self.args.reset_attention_mask - ) - position_ids = None - batch = { - 'tokens': tokens, - 'labels': labels, - 'loss_mask': loss_mask, - 'attention_mask': attention_mask, - 'position_ids': position_ids - } - batch = self.get_batch_on_this_cp_rank(batch) - return batch.values() - - def loss_func(self, input_tensor: torch.Tensor, output_tensor: torch.Tensor): - """Loss function. - - Args: - input_tensor (torch.Tensor): Used to mask out some portions of the loss - output_tensor (torch.Tensor): The tensor with the losses - """ - loss_mask = input_tensor - - losses = output_tensor.float() - loss_mask = loss_mask[..., 1:].view(-1).float() - if self.args.context_parallel_size > 1: - loss = torch.cat([torch.sum(losses.view(-1) * loss_mask).view(1), loss_mask.sum().view(1)]) - torch.distributed.all_reduce(loss, - group=self.parallel_state.get_context_parallel_group()) - loss = loss[0] / loss[1] - else: - loss = torch.sum(losses.view(-1) * loss_mask) / loss_mask.sum() - - # Check individual rank losses are not NaN prior to DP all-reduce. - if self.args.check_for_nan_in_loss_and_grad: - global_rank = torch.distributed.get_rank() - if loss.isnan(): - raise ValueError(f'Rank {global_rank}: found NaN in local forward loss calculation. ' - f'Device: {torch.cuda.current_device()}, node: {os.uname()[1]}') - - # Reduce loss for logging. - averaged_loss = average_losses_across_data_parallel_group([loss], parallel_state=self.parallel_state) - - return loss * self.args.context_parallel_size, {'lm loss': averaged_loss[0]} - - def forward_step(self, data_iterator, model): - """Forward training step. - - Args: - data_iterator : Input data iterator - model (GPTModel): The GPT Model - """ - - # Get the batch. 
- tokens, labels, loss_mask, attention_mask, position_ids = self.get_batch( - data_iterator) - - output_tensor = model(tokens, position_ids, attention_mask, - labels=labels) - - return output_tensor, partial(self.loss_func, loss_mask) - - def train(self): - test_data_iterator = self.test_data_iterator_list[0] - (forward_step_func, model, optimizer, opt_param_scheduler, train_data_iterator, valid_data_iterator, - process_non_loss_data_func, config) = self.train_args - - if not self.args.skip_train: - logger.info('training ...') - - if self.args.dataloader_type == 'cyclic' and self.args.retro_project_dir: - if self.args.retro_cyclic_train_iters is None: - raise ValueError("retro_cyclic_train_iters must be provided.") - self.args.train_iters = self.args.retro_cyclic_train_iters - logger.info("retro cyclic train iters : %d" % self.args.train_iters) - - iteration = 0 - if self.args.do_train and self.args.train_iters > 0: - iteration, num_floating_point_operations_so_far = self.train_func( - *self.train_args) - - logger.info('after training is done') - - if self.args.save and iteration != 0 and iteration % self.args.save_interval != 0: - self.save_checkpoint_func( - iteration, - model, - optimizer, - opt_param_scheduler, - num_floating_point_operations_so_far - ) - else: - logger.info('skipping training (--skip-train is on) ...') - - iteration = self.args.iteration - - if self.args.do_valid: - prefix = f'iteration {iteration} on validation set' - self.evaluate_fun(prefix, forward_step_func, - valid_data_iterator, model, - iteration, process_non_loss_data_func, config, - verbose=True, write_to_tensorboard=not self.args.skip_train) - - if self.args.do_test: - prefix = f'iteration {iteration} on test set' - self.evaluate_fun(prefix, forward_step_func, - test_data_iterator, model, - iteration, process_non_loss_data_func, config, - verbose=True, write_to_tensorboard=not self.args.skip_train) diff --git a/mindspeed_rl/utils/utils.py b/mindspeed_rl/utils/utils.py index bf3bfa06524982226201fa50aa40e9c918897ebf..7ec4ea745273f7ce86b5db45c07c44764af765cc 100644 --- a/mindspeed_rl/utils/utils.py +++ b/mindspeed_rl/utils/utils.py @@ -222,7 +222,7 @@ def metrics_sort(metrics, time_all) -> Dict[str, Tensor]: metrics["timing/all"] = time_all sort_metrics = dict(sorted(metrics.items())) - custom_order = ['timing/all', 'timing/update', 'timing/resharding_to_infer', 'timing/rollout', 'timing/resharding_to_train', 'timing/old_log_p', 'timing/reference_model', 'timing/non_overlap_reference_model'] + custom_order = ['timing/all', 'timing/update', 'timing/rollout', 'timing/old_log_p', 'timing/reference_model', 'timing/non_overlap_reference_model'] special_keys = ['timing/non_overlap_rule_reward', 'timing/non_overlap_reward_model', 'timing/rule_reward', 'timing/reward_model'] keys_to_move = [key for key in sort_metrics.keys() if key in special_keys] remaining_keys = [] @@ -239,12 +239,13 @@ def compute_tps(compute_kwargs, metrics_result, gbs, n_samples, time_all): actor_resource = compute_kwargs.get('actor_resource', {}) reference_resource = compute_kwargs.get('reference_resource', {}) reward_resource = compute_kwargs.get('reward_resource', None) + actor_resource_only = compute_kwargs.get('use_integrated_worker', False) actor_npus = actor_resource.get('num_npus', 0) reference_npus = reference_resource.get('num_npus', 0) reward_npus = reward_resource.get('num_npus', 0) if reward_resource is not None else 0 - world_size = actor_npus + reference_npus + reward_npus + world_size = actor_npus + reference_npus + reward_npus 
if not actor_resource_only else actor_npus tps = (metrics_result['response_length/mean'] + metrics_result['prompt_length/mean']) * gbs * n_samples / world_size / time_all return tps diff --git a/mindspeed_rl/workers/actor_hybrid_worker.py b/mindspeed_rl/workers/actor_hybrid_worker.py index dc292b18a6d3aa535c073654fbc691fd6b358b28..7b9e109bb015f4fc087e323fec71db595d78eb44 100644 --- a/mindspeed_rl/workers/actor_hybrid_worker.py +++ b/mindspeed_rl/workers/actor_hybrid_worker.py @@ -25,8 +25,7 @@ from mindspeed_rl.utils.compute import get_parallel_state from mindspeed_rl.trainer.utils.parallel_state import is_pipeline_last_stage, get_tensor_model_parallel_rank -@ray.remote(resources={"NPU": 0.7}) -class ActorHybridWorker(BaseWorker): +class ActorHybridWorkerBase(BaseWorker): """ ActorHybridWorker class. This class implements the hybrid worker logic for training and inference. @@ -71,13 +70,13 @@ class ActorHybridWorker(BaseWorker): self.setup_distributed_rank() self.model, self.optimizer, self.opt_param_scheduler = self._build_model_optimizer() self._set_no_sync_func() - self.megatron_offloader = MegatronOffLoader(self.optimizer, self.model) + self.actor_offloader = MegatronOffLoader(self.model, self.optimizer) if self.generate_config.offload_train_optimizer: - self.megatron_offloader.offload_optimizer() + self.actor_offloader.offload_optimizer() if self.generate_config.offload_train_grad: - self.megatron_offloader.offload_grad() + self.actor_offloader.offload_grad() if self.generate_config.offload_train_param: - self.megatron_offloader.offload_train_param() + self.actor_offloader.offload_param() self.inference_model = self._build_rollout() self.sharding_manager = self._build_sharding_manager() @@ -112,6 +111,8 @@ class ActorHybridWorker(BaseWorker): return self.args.consumed_train_samples def update(self, kl_ctrl=None): + self.sharding_manager.enter_train_mode() + self.args.curr_iteration = self.iteration experience_consumer_stage = 'actor_train' @@ -151,17 +152,19 @@ class ActorHybridWorker(BaseWorker): self.iteration += 1 + self.sharding_manager.exit_train_mode() + def save_ckpt(self, iteration: int): self.save_checkpoint(iteration, self.model, self.optimizer, self.opt_param_scheduler, self.num_floating_point_operations_so_far) def generate_sequences(self): + self.sharding_manager.enter_infer_mode() experience_consumer_stage = 'actor_rollout' experience_colums = ['prompts', 'prompt_length'] experience_count = self.rl_config.experience_count_actor // self.generate_config.data_parallel_size start_reshard_to_infer = time.time() - self.sharding_manager.reshard_to_infer_mode() end_reshard_to_infer = time.time() ray.get( self.td.update_metrics.remote( @@ -224,19 +227,22 @@ class ActorHybridWorker(BaseWorker): cumulate=True ) ) - start_reshard_to_train = time.time() - self.sharding_manager.reshard_to_train_mode() - end_reshard_to_train = time.time() - ray.get( - self.td.update_metrics.remote( - "timing/resharding_to_train", - value=[round(end_reshard_to_train, 4), round(start_reshard_to_train, 4)], - cumulate=True - ) - ) - self.empty_cache() + generate_end_time = time.time() + parallel_state = get_parallel_state() + use_vllm = True + if is_pipeline_last_stage(parallel_state, use_vllm) and get_tensor_model_parallel_rank(parallel_state, use_vllm) == 0: + ray.get( + self.td.update_metrics.remote( + "end_time/generate", + value=[round(generate_end_time, 4)], + cumulate=True + ) + ) + + self.sharding_manager.exit_infer_mode() def compute_log_prob(self): + self.sharding_manager.enter_forward_mode() 
experience_consumer_stage = 'actor_log_prob' experience_colums = ['input_ids', 'responses', 'response_length', 'prompt_length'] experience_count = self.rl_config.experience_count_actor // self.parallel_state.get_data_parallel_world_size() @@ -261,15 +267,15 @@ class ActorHybridWorker(BaseWorker): end_time = time.time() ray.get( self.td.update_metrics.remote( - "timing/old_log_p", - value=[round(end_time, 4), round(start_time, 4)], + "timing/old_log_p", + value=[round(end_time, 4), round(start_time, 4)], cumulate=True ) ) ray.get( self.td.update_metrics.remote( - "end_time/old_log_p", - value=[round(end_time, 4)], + "end_time/old_log_p", + value=[round(end_time, 4)], cumulate=True ) ) @@ -331,7 +337,7 @@ class ActorHybridWorker(BaseWorker): grad_offload=self.generate_config.offload_train_grad, train_param_offload=self.generate_config.offload_train_param, enable_validate=self.rl_config.enable_sharding_validate, - megatron_offloader=self.megatron_offloader + megatron_offloader=self.actor_offloader ) return sharding_manager @@ -342,4 +348,10 @@ class ActorHybridWorker(BaseWorker): # Megatron requires no_sync_func properly to correctly trigger DP reduce config.no_sync_func = [model_chunk.no_sync for model_chunk in self.model] if len(self.model) == 1: - config.no_sync_func = config.no_sync_func[0] \ No newline at end of file + config.no_sync_func = config.no_sync_func[0] + + + +@ray.remote(resources={"NPU": 0.7}) +class ActorHybridWorker(ActorHybridWorkerBase): + pass \ No newline at end of file diff --git a/mindspeed_rl/workers/integrated_worker.py b/mindspeed_rl/workers/integrated_worker.py new file mode 100644 index 0000000000000000000000000000000000000000..cc0919437183abd72a6a97eee22fddd044d705e9 --- /dev/null +++ b/mindspeed_rl/workers/integrated_worker.py @@ -0,0 +1,112 @@ +# Copyright (c) 2025, HUAWEI CORPORATION. All rights reserved. + +import dataclasses +from typing import Callable + +import ray +import torch + +from mindspeed_rl.config_cls.megatron_config import MegatronConfig +from mindspeed_rl.config_cls.rl_config import RLConfig +from mindspeed_rl.config_cls.generate_config import GenerateConfig +from mindspeed_rl.utils.tokenizer import BaseTokenizer +from mindspeed_rl.workers.resharding.megatron_sharding_manager import MegatronOffLoader + +from mindspeed_rl.workers.actor_hybrid_worker import ActorHybridWorkerBase +from mindspeed_rl.workers.reference_woker import ReferenceWorkerBase +from mindspeed_rl.workers.reward_woker import RewardWorkerBase +from mindspeed_rl.models.reference import Reference +from mindspeed_rl.models.reward import Reward + + +@ray.remote(resources={"NPU": 0.7}) +class IntegratedWorker(ActorHybridWorkerBase, ReferenceWorkerBase, RewardWorkerBase): + """ + IntegratedWorker class. This class implements the integrated worker for the Actor, Reference and Reward Worker. + + Args: + megatron_config: MegatronConfig Configuration for Megatron-LM (e.g., model parallelism settings). + rl_config: RLConfig Configuration for reinforcement learning (e.g., PPO settings). + generate_config: GenerateConfig Configuration for generation/inference (e.g., vLLM settings). + model_provider: Callable Function to provide the model instance. + initialize_func: Callable Function to initialize the model and environment. + tokenizer: BaseTokenizer = None Object to retrieve the tokenizer. + get_megatron_module: Callable = megatron_module from get_megatron_module. + **kwargs: Additional parameters for base class argument passing. 
+ """ + + def __init__( + self, + megatron_config: MegatronConfig, + rl_config: RLConfig, + generate_config: GenerateConfig, + model_provider: Callable, + initialize_func: Callable, + tokenizer: BaseTokenizer = None, + get_megatron_module: Callable = None, + **kwargs + ): + + # We use Actor as main worker, so only do init for Actor here. + ActorHybridWorkerBase.__init__( + self, + megatron_config, + rl_config, + generate_config, + model_provider=model_provider, + initialize_func=initialize_func, + tokenizer=tokenizer, + get_megatron_module=get_megatron_module, + **kwargs + ) + + self.update_micro_batch_size = rl_config.update_micro_batch_size + + self.reference = None + self.ref_model = None + self.ref_manager = None + + + def initialize(self): + + # Based on Actor + ActorHybridWorkerBase.initialize(self) + + # Add Reference + self.ref_model = self.get_model(self.model_provider, self.model_type, wrap_with_ddp=False) + self.load_checkpoint(self.ref_model, None, None) + self.ref_manager = MegatronOffLoader(self.ref_model, wrap_with_ddp=False) + self.ref_manager.offload_param() + self.reference = Reference( + self.ref_model, + beta=self.rl_config.beta, + mini_batch_size=self.rl_config.mini_batch_size, + epochs=self.rl_config.epochs, + shuffle_mini_batch=self.rl_config.shuffle_mini_batch, + generate_config=self.generate_config, + stage=self.megatron_config.stage, + forward_backward_func=self.forward_backward_func, + micro_batch_size=self.megatron_config.micro_batch_size + ) + + def compute_ref_log_prob(self): + self.ref_manager.onload_param() + ReferenceWorkerBase.compute_ref_log_prob(self) + self.ref_manager.offload_param() + + def update(self, kl_ctrl=None): + # set update mbs + update_mbs = self.update_micro_batch_size + mbs = self.actor_hybrid.train_actor.micro_batch_size + + from megatron.training import get_args + args = get_args() + + if update_mbs is not None: + self.actor_hybrid.train_actor.micro_batch_size = update_mbs + args.micro_batch_size = update_mbs + + ActorHybridWorkerBase.update(self, kl_ctrl) + + args.micro_batch_size = mbs + self.actor_hybrid.train_actor.micro_batch_size = mbs diff --git a/mindspeed_rl/workers/reference_woker.py b/mindspeed_rl/workers/reference_woker.py index d813f880491d4894552b87564cef707418f78b91..fd465a7735d83c2848ee61100b43f1ebb4827a2c 100644 --- a/mindspeed_rl/workers/reference_woker.py +++ b/mindspeed_rl/workers/reference_woker.py @@ -17,8 +17,7 @@ from mindspeed_rl.utils.compute import get_parallel_state from mindspeed_rl.trainer.utils.parallel_state import is_pipeline_last_stage, get_tensor_model_parallel_rank -@ray.remote(resources={"NPU": 0.3}) -class ReferenceWorker(BaseWorker): +class ReferenceWorkerBase(BaseWorker): """ ReferenceWorker class. This class implements the worker logic for reference model inference. 
@@ -82,7 +81,7 @@ class ReferenceWorker(BaseWorker): def init_transfer_dock(self, td): self.td = td - def compute_log_prob(self): + def compute_ref_log_prob(self): experience_consumer_stage = 'ref_log_prob' experience_columns = ['input_ids', 'responses', 'response_length', 'prompt_length'] experience_count = self.rl_config.experience_count_ref // self.parallel_state.get_data_parallel_world_size() @@ -126,8 +125,13 @@ class ReferenceWorker(BaseWorker): ref_end_time = time.time() ray.get( self.td.update_metrics.remote( - "end_time/reference", + "end_time/reference", value=[round(ref_end_time, 4)] ) ) self.empty_cache() + + +@ray.remote(resources={"NPU": 0.3}) +class ReferenceWorker(ReferenceWorkerBase): + pass \ No newline at end of file diff --git a/mindspeed_rl/workers/resharding/megatron_sharding_manager.py b/mindspeed_rl/workers/resharding/megatron_sharding_manager.py index d74fd0c13768336baa004f5b1b6f56934b36a07c..c7016e40c72bc358d4228b4e08e76bc7f4afc85c 100644 --- a/mindspeed_rl/workers/resharding/megatron_sharding_manager.py +++ b/mindspeed_rl/workers/resharding/megatron_sharding_manager.py @@ -29,23 +29,20 @@ from mindspeed_rl.workers.resharding.weight_adaptor import get_weight_adaptor class MegatronOffLoader: - def __init__( - self, - optimizer=None, - megatron_model=None - ): + def __init__(self, megatron_model=None, optimizer=None, wrap_with_ddp=True): self.optimizer = optimizer - self.train_model = megatron_model + self.model = megatron_model + self.wrap_with_ddp = wrap_with_ddp self.tensor_to_cpu_states_map = dict() def offload_grad(self): - for model_idx, model in enumerate(self.train_model): + for model in self.model: for buffer in chain(model.buffers, model.expert_parallel_buffers): self.swap_tensors_to_host(buffer.grad_data) def onload_grad(self): - for model_idx, model in enumerate(self.train_model): + for model in self.model: for buffer in chain(model.buffers, model.expert_parallel_buffers): self.swap_tensors_to_device(buffer.grad_data) @@ -73,15 +70,23 @@ class MegatronOffLoader: else: return data - def offload_train_param(self): - for model_idx, model in enumerate(self.train_model): - for buffer in chain(model.buffers, model.expert_parallel_buffers): - self.swap_tensors_to_host(buffer.param_data) + def offload_param(self): + if self.wrap_with_ddp: + for model in self.model: + for buffer in chain(model.buffers, model.expert_parallel_buffers): + self.swap_tensors_to_host(buffer.param_data) + else: + for item in self.model: + item.to('cpu') - def onload_train_param(self): - for model_idx, model in enumerate(self.train_model): - for buffer in chain(model.buffers, model.expert_parallel_buffers): - self.swap_tensors_to_device(buffer.param_data) + def onload_param(self): + if self.wrap_with_ddp: + for model in self.model: + for buffer in chain(model.buffers, model.expert_parallel_buffers): + self.swap_tensors_to_device(buffer.param_data) + else: + for item in self.model: + item.to(torch.cuda.current_device()) def swap_tensors_to_host(self, tensor): if tensor not in self.tensor_to_cpu_states_map: @@ -162,47 +167,106 @@ class MegatronShardingManager: self.inference_engine.offload_model_weights() self.megatron_offloader = megatron_offloader - def __enter__(self): - self.reshard_to_infer_mode() + def offload_infer_params(self): + infer_weight_buffers = self.vllm_weight_container.weight_buffers + for buffer in infer_weight_buffers: + buffer.offload() + + def onload_infer_params(self): + infer_weight_buffers = self.vllm_weight_container.weight_buffers + for buffer in 
infer_weight_buffers: + buffer.onload() + + def enter_infer_mode(self): + """ + Before: + Empty or with training param on NPU. + + After: + Empty. + + Process: + 1. onload training param if needed + 2. onload inference param + 3. do resharding + 4. offload training param + """ + if self.train_param_offload: + self.megatron_offloader.onload_param() + + self.onload_infer_params() + + infer_params = self.vllm_weight_container.get_infer_params() + + if self.train_param_offload: + self.megatron_offloader.offload_param() + + self.inference_engine.sync_model_weights(infer_params, load_format='megatron') + torch.cuda.empty_cache() + + def exit_infer_mode(self): + """ + Before: + With inference param on NPU. - def __exit__(self, exc_type, exc_value, traceback): - self.reshard_to_train_mode() + After: + Empty. - def reshard_to_train_mode(self): + Process: + 1. offload inference param + """ self.inference_engine.offload_model_weights() self.offload_infer_params() torch.cuda.empty_cache() + + def enter_forward_mode(self): + """ + Before: + Empty. + + After: + With training param on NPU. + + Process: + 1. onload training param + """ + if self.train_param_offload: + self.megatron_offloader.onload_param() + torch.cuda.empty_cache() + + def enter_train_mode(self): + """ + Before: + With training param on NPU. + + After: + With training param, optimizer and grad on NPU. + + Process: + 1. onload training optimizer + 2. onload training grad + """ if self.optimizer_offload: self.megatron_offloader.onload_optimizer() - if self.train_param_offload: - self.megatron_offloader.onload_train_param() if self.grad_offload: self.megatron_offloader.onload_grad() - # add empty cache after each compute torch.cuda.empty_cache() - def reshard_to_infer_mode(self): + def exit_train_mode(self): + """ + Before: + With training param, optimizer and grad on NPU. + + After: + With training param on NPU. + + Process: + 1. offload training optimizer + 2. offload training grad + """ if self.optimizer_offload: self.megatron_offloader.offload_optimizer() if self.grad_offload: self.megatron_offloader.offload_grad() torch.cuda.empty_cache() - if self.train_param_offload: - self.megatron_offloader.onload_train_param() - self.onload_infer_params() - infer_params = self.vllm_weight_container.get_infer_params() - if self.train_param_offload: - self.megatron_offloader.offload_train_param() - self.inference_engine.sync_model_weights(infer_params, load_format='megatron') - torch.cuda.empty_cache() - - def offload_infer_params(self): - infer_weight_buffers = self.vllm_weight_container.weight_buffers - for buffer in infer_weight_buffers: - buffer.offload() - - def onload_infer_params(self): - infer_weight_buffers = self.vllm_weight_container.weight_buffers - for buffer in infer_weight_buffers: - buffer.onload() diff --git a/mindspeed_rl/workers/reward_woker.py b/mindspeed_rl/workers/reward_woker.py index 2eb4cd7469eb09e3bd4814861699a338d20174a9..2948d8eabc4982c5badb08271a19841b6861888a 100644 --- a/mindspeed_rl/workers/reward_woker.py +++ b/mindspeed_rl/workers/reward_woker.py @@ -17,8 +17,7 @@ from mindspeed_rl.utils.compute import get_parallel_state from mindspeed_rl.trainer.utils.parallel_state import is_pipeline_last_stage, get_tensor_model_parallel_rank -@ray.remote(resources={"NPU": 0.1}) -class RewardWorker(BaseWorker): +class RewardWorkerBase(BaseWorker): """ RewardWorker class. This class implements the worker logic for reward model training and inference. 
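# --- illustration only, not part of the patch ----------------------------------
# Sketch of the NPU-memory states that the reworked MegatronShardingManager cycles
# through in one GRPO step, matching the enter_*/exit_* calls the workers above now
# make (generate_sequences -> infer mode, compute_log_prob -> forward mode,
# update -> train mode). ShardingManagerStub is a hypothetical stand-in: it only
# tracks which parameter sets are resident, it does not move any tensors.

class ShardingManagerStub:
    def __init__(self):
        self.resident = set()  # what currently occupies NPU memory

    def enter_infer_mode(self):
        # onload + reshard inference weights; training params stay offloaded
        self.resident = {"infer_params"}

    def exit_infer_mode(self):
        self.resident.discard("infer_params")

    def enter_forward_mode(self):
        # training params back on device for the old-log-prob forward pass
        self.resident.add("train_params")

    def enter_train_mode(self):
        # optimizer state and grads join the training params for the update
        self.resident.update({"optimizer", "grads"})

    def exit_train_mode(self):
        self.resident -= {"optimizer", "grads"}


if __name__ == "__main__":
    sm = ShardingManagerStub()
    for step in ("enter_infer_mode", "exit_infer_mode",
                 "enter_forward_mode", "enter_train_mode", "exit_train_mode"):
        getattr(sm, step)()
        print(f"{step:20s} -> resident: {sorted(sm.resident)}")
# --------------------------------------------------------------------------------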
@@ -114,7 +113,7 @@ class RewardWorker(BaseWorker): end_time = time.time() ray.get( self.td.update_metrics.remote( - "timing/reward_model", + "timing/reward_model", value=[round(end_time, 4), round(start_time, 4)], cumulate=True ) @@ -125,8 +124,13 @@ class RewardWorker(BaseWorker): rwd_end_time = time.time() ray.get( self.td.update_metrics.remote( - "end_time/reward_model", + "end_time/reward_model", value=[round(rwd_end_time, 4)] ) ) - self.empty_cache() \ No newline at end of file + self.empty_cache() + + +@ray.remote(resources={"NPU": 0.1}) +class RewardWorker(RewardWorkerBase): + pass \ No newline at end of file diff --git a/mindspeed_rl/workers/scheduler/launcher.py b/mindspeed_rl/workers/scheduler/launcher.py index ea9c4e80d031f0effbf25c17ef91a30820e649fc..2b6d3099d46833b893c59a9be79c293af60f4b5b 100644 --- a/mindspeed_rl/workers/scheduler/launcher.py +++ b/mindspeed_rl/workers/scheduler/launcher.py @@ -35,12 +35,16 @@ from mindspeed_rl.workers.base_worker import BaseWorker from mindspeed_rl.workers.actor_hybrid_worker import ActorHybridWorker from mindspeed_rl.workers.reference_woker import ReferenceWorker from mindspeed_rl.workers.reward_woker import RewardWorker +from mindspeed_rl.workers.integrated_worker import IntegratedWorker def get_rl_resource_by_worker_type(rl_config: RLConfig, worker: Type[BaseWorker]): if (worker.__ray_actor_class__.__name__ == ActorHybridWorker.__ray_actor_class__.__name__): return rl_config.actor_resource + elif (worker.__ray_actor_class__.__name__ == + IntegratedWorker.__ray_actor_class__.__name__): + return rl_config.actor_resource elif (worker.__ray_actor_class__.__name__ == RewardWorker.__ray_actor_class__.__name__): return rl_config.reward_resource @@ -264,6 +268,12 @@ class RayActorGroup: if blocking: ray.get(self.temp_actor_ref_objs) + def compute_ref_log_prob(self, blocking=False): + for actor in self.actor_handlers: + self.temp_actor_ref_objs.append(actor.compute_ref_log_prob.remote()) + if blocking: + ray.get(self.temp_actor_ref_objs) + def compute_rm_score(self, blocking=False): for actor in self.actor_handlers: self.temp_actor_ref_objs.append(actor.compute_rm_score.remote()) diff --git a/tests/st/configs/test_grpo_trainer_qwen25_7b_integrated.yaml b/tests/st/configs/test_grpo_trainer_qwen25_7b_integrated.yaml new file mode 100644 index 0000000000000000000000000000000000000000..2c504160f6291c0b9beb9b405a00c70742f163a9 --- /dev/null +++ b/tests/st/configs/test_grpo_trainer_qwen25_7b_integrated.yaml @@ -0,0 +1,107 @@ +defaults: + - model: + - qwen25_7b + +megatron_training: + model: qwen25_7b + use_fused_rmsnorm: true + use_mcore_models: true + sequence_parallel: true + use_flash_attn: true + no_masked_softmax_fusion: true + attention_softmax_in_fp32: true + no_gradient_accumulation_fusion: true + use_fused_swiglu: true + use_fused_rotary_pos_emb: true + bf16: true + use_distributed_optimizer: true + tokenizer_type: PretrainedFromHF + tokenizer_name_or_path: /data/for_dt/weights/Qwen2.5-7B + global_batch_size: 4 + seq_length: 1024 + save_interval: 50 + train_iters: 1 + stage: ray_grpo + attention_dropout: 0.0 + init_method_std: 0.01 + hidden_dropout: 0.0 + distributed_backend: nccl + no_shared_storage: true + variable_seq_lengths: true + dataset_additional_keys: ['labels',] + data_path: /data/for_dt/datasets/pe-nlp/data + split: 100,0,0 + +actor_config: + model: qwen25_7b + micro_batch_size: 2 + tensor_model_parallel_size: 4 + pipeline_model_parallel_size: 1 + lr: 5e-7 + lr_decay_style: cosine + min_lr: 5e-8 + weight_decay: 0.0 + 
lr_warmup_fraction: 0.0 + clip_grad: 10000.0 + adam_beta1: 0.9 + adam_beta2: 0.95 + initial_loss_scale: 4096 + finetune: true + load: /data/for_dt/weights/Qwen2.5-7B-tp4 + save: ./ckpt + no_load_optim: true + no_load_rng: true + +rl_config: + use_integrated_worker: true + blocking: true + update_micro_batch_size: 2 + experience_count: 4 + gamma: 1.0 + lam: 0.95 + adv_estimator: group_norm + kl_penalty: kl + kl_ctrl_type: fixed + init_kl_coef: 0.0001 + mini_batch_size: 256 + max_prompt_length: 2048 + epochs: 1 + clip_ratio: 0.2 + entropy_coeff: 0.001 + n_samples_per_prompt: 8 + rule_reward: true + verifier_function: ["base_acc"] + verifier_weight: [1.0] + verifier_parallel: 4 + verifier_timeout: 120 + use_tensorboard: true + actor_resource: + num_npus: 8 + +generate_config: + trust_remote_code: true + offload_train_optimizer: true + offload_train_grad: true + offload_train_param: true + + # 推理时的并行配置 + infer_tensor_parallel_size: 2 + infer_pipeline_parallel_size: 1 + infer_expert_parallel_size: 1 + + # vllm 模型相关设置 + max_num_seqs: 512 + max_model_len: 4096 + dtype: "bfloat16" + gpu_memory_utilization: 0.9 + num_scheduler_steps: 1 + + # 采样配置 + sampling_config: + logprobs: 1 + max_tokens: 2048 + top_p: 0.9 + top_k: 50 + min_p: 0.01 + temperature: 0.8 + detokenize: false diff --git a/tests/st/configs/test_sft_trainer_qwen25_7b.yaml b/tests/st/configs/test_sft_trainer_qwen25_7b.yaml deleted file mode 100644 index 53d3e7257a4c19097c63ecc45ce68d6af238cd39..0000000000000000000000000000000000000000 --- a/tests/st/configs/test_sft_trainer_qwen25_7b.yaml +++ /dev/null @@ -1,70 +0,0 @@ -defaults: - - model: - - qwen25_7b - -sft: - # tune_args: - finetune: true - stage: sft - is_instruction_dataset: true - variable_seq_lengths: false - tokenizer_not_use_fast: true - prompt_type: qwen - - # gpt_args: - norm_epsilon: 1e-6 - micro_batch_size: 2 - global_batch_size: 4 - tokenizer_type: PretrainedFromHF - tokenizer_name_or_path: /data/for_dt/tokenizer/Qwen25-7B - train_iters: 5 - lr: 5e-5 - lr_decay_style: cosine - min_lr: 1.25e-7 - lr_warmup_fraction: 0.01 - weight_decay: 1e-1 - clip_grad: 1.0 - initial_loss_scale: 4096 - use_distributed_optimizer: true - tensor_model_parallel_size: 2 - pipeline_model_parallel_size: 2 - sequence_parallel: true - use_mcore_models: true - use_fused_rmsnorm: true - use_flash_attn: true - no_masked_softmax_fusion: true - no_gradient_accumulation_fusion: true - use_fused_swiglu: true - use_fused_rotary_pos_emb: true - bf16: true - seq_length: 4096 - adam_beta1: 0.9 - adam_beta2: 0.95 - attention_dropout: 0.0 - init_method_std: 0.01 - hidden_dropout: 0.0 - overlap_grad_reduce: true - overlap_param_gather: true - - # data_args: - data_path: /data/datasets/pack/alpaca - reset_position_ids: true - split: 100,0,0 - no_shuffle: true - - # ckpt_args: - no_load_optim: true - no_load_rng: true - no_save_optim: true - no_save_rng: true - seed: 1234 - model: qwen25_7b - load: ./ckpt - save: ./ckpt - - # output_args: - log_interval: 1 - save_interval: 5000 - eval_interval: 5000 - eval_iters: 0 - log_throughput: true diff --git a/tests/st/resharding/test_resharding.py b/tests/st/resharding/test_resharding.py index 670a9d2687f515aaac4482644c29d0eabc7305ee..676d4131efd3ce102773542d99c08b027be5a09f 100644 --- a/tests/st/resharding/test_resharding.py +++ b/tests/st/resharding/test_resharding.py @@ -317,7 +317,7 @@ class TestActor(): trust_remote_code=True, megatron_config=megatron_config ) - self.megatron_offloader = MegatronOffLoader(self.optimizer, self.model) + self.megatron_offloader = 
MegatronOffLoader(self.model, self.optimizer) self.sharding_manager = MegatronShardingManager( megatron_model=self.model, model_config=model_config_mock, @@ -342,9 +342,9 @@ class TestActor(): tokenizer = AutoTokenizer.from_pretrained(self.tokenizer_path) input_ids = tokenizer(prompts, padding=True, truncation=True, return_tensors="pt")["input_ids"].tolist() - self.sharding_manager.reshard_to_infer_mode() + self.sharding_manager.enter_infer_mode() outputs = self.inference_engine.generate_sequences(idx_list=input_ids)[0] - self.sharding_manager.reshard_to_train_mode() + self.sharding_manager.exit_infer_mode() rank = torch.distributed.get_rank() for output in outputs: diff --git a/tests/st/sft_trainer/test_module_entry_sft_trainer.sh b/tests/st/sft_trainer/test_module_entry_sft_trainer.sh deleted file mode 100644 index 394f03cfdde2b88f675d11865298b1d98e000cd5..0000000000000000000000000000000000000000 --- a/tests/st/sft_trainer/test_module_entry_sft_trainer.sh +++ /dev/null @@ -1,23 +0,0 @@ -export CUDA_DEVICE_MAX_CONNECTIONS=1 -export HYDRA_FULL_ERROR=1 - -SCRIPT_DIR=$(cd "$(dirname "$0")" && pwd) -export PYTHONPATH=$SCRIPT_DIR/../../..:$PYTHONPATH -PROJECT_PATH=$SCRIPT_DIR/../../.. - -GPUS_PER_NODE=4 -MASTER_ADDR=localhost -MASTER_PORT=6004 -NNODES=1 -NODE_RANK=0 - -DISTRIBUTED_ARGS=" - --nproc_per_node $GPUS_PER_NODE \ - --nnodes $NNODES \ - --node_rank $NODE_RANK \ - --master_addr $MASTER_ADDR \ - --master_port $MASTER_PORT -" - -torchrun $DISTRIBUTED_ARGS "$PROJECT_PATH"/cli/train_sft.py --config-dir="$PROJECT_PATH"/tests/st/configs \ ---config-name=test_sft_trainer_qwen25_7b diff --git a/tests/st/train_engine/test_module_entry_ray_grpo_qwen25_7b_tp4_integrated.sh b/tests/st/train_engine/test_module_entry_ray_grpo_qwen25_7b_tp4_integrated.sh new file mode 100644 index 0000000000000000000000000000000000000000..39708871aa455e7a51ee2f1ba6996f0dadaf684c --- /dev/null +++ b/tests/st/train_engine/test_module_entry_ray_grpo_qwen25_7b_tp4_integrated.sh @@ -0,0 +1,9 @@ +#!/bin/bash +export CUDA_DEVICE_MAX_CONNECTIONS=1 +export HCCL_DETERMINISTIC=True + +SCRIPT_DIR=$(cd "$(dirname "$0")" && pwd) +export PYTHONPATH=$SCRIPT_DIR/../../..:$PYTHONPATH +PROJECT_PATH=$SCRIPT_DIR/../../.. + +python "$PROJECT_PATH"/cli/train_grpo.py --config-dir="$PROJECT_PATH"/tests/st/configs --config-name=test_grpo_trainer_qwen25_7b_integrated \ No newline at end of file
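# --- illustration only, not part of the patch ----------------------------------
# Minimal sketch of the throughput-accounting change made in compute_tps above:
# with use_integrated_worker the actor, reference and reward share one NPU pool,
# so only actor_resource.num_npus counts toward world_size. The helper names and
# the numbers below are made up for this example.

def effective_world_size(actor_npus, reference_npus, reward_npus, integrated):
    return actor_npus if integrated else actor_npus + reference_npus + reward_npus


def tokens_per_second(mean_prompt_len, mean_response_len, gbs, n_samples,
                      world_size, time_all):
    # Same formula as compute_tps: total prompt+response tokens per NPU per second.
    return (mean_response_len + mean_prompt_len) * gbs * n_samples / world_size / time_all


if __name__ == "__main__":
    ws = effective_world_size(actor_npus=8, reference_npus=4, reward_npus=0,
                              integrated=True)  # -> 8, not 12
    print(tokens_per_second(mean_prompt_len=128, mean_response_len=512,
                            gbs=4, n_samples=8, world_size=ws, time_all=30.0))
# --------------------------------------------------------------------------------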