diff --git a/README.md b/README.md
index 0cb8170f846d257d09c6c69acc1f463025a15e3f..e99cf005e5d6ccda925c62f3a5836da9ed3d3544 100644
--- a/README.md
+++ b/README.md
@@ -80,15 +80,6 @@ MindSpeed-RL是基于昇腾生态的强化学习加速框架,旨在为华为 [
-
- 指令微调 |
- Doc |
-
- Qwen2.5-7B
- Qwen2.5-32B
- |
- Preview |
-
结果奖励 |
Doc |
diff --git a/SECURITYNOTE.md b/SECURITYNOTE.md
index 006739e80ff09dbe332dfc73c86fddd35515cb7e..b4d23bf84d8d3218460774cee1cfce7b75c5e465 100644
--- a/SECURITYNOTE.md
+++ b/SECURITYNOTE.md
@@ -47,7 +47,7 @@
## 公开接口声明
-MindSpeed-RL 暂时未发布wheel包,无正式对外公开接口,所有功能均通过shell脚本调用。入口脚本皆放置于cli目录下,分别为 train_grpo.py, train_orm.py,train_sft.py, preprocess_data.py, convert_ckpt.py 和 infer_vllm.py。
+MindSpeed-RL 暂时未发布wheel包,无正式对外公开接口,所有功能均通过shell脚本调用。入口脚本皆放置于cli目录下,分别为 train_grpo.py, train_orm.py, preprocess_data.py, convert_ckpt.py 和 infer_vllm.py。
## 公网地址声明
diff --git a/cli/train_grpo.py b/cli/train_grpo.py
index 4c7c91b0444bef080ef828c9ac383c5d46ef4e7e..aeef7bc89f9bbb697f788faa49bd8b9b0a6b96c2 100644
--- a/cli/train_grpo.py
+++ b/cli/train_grpo.py
@@ -31,6 +31,7 @@ from mindspeed_rl.workers.scheduler.launcher import RayActorGroup
from mindspeed_rl.workers.actor_hybrid_worker import ActorHybridWorker
from mindspeed_rl.workers.reference_woker import ReferenceWorker
from mindspeed_rl.workers.reward_woker import RewardWorker
+from mindspeed_rl.workers.integrated_worker import IntegratedWorker
cur_file_dir = Path(__file__).absolute().parent.parent
logger = Loggers("grpo_train")
@@ -45,47 +46,65 @@ def train(config):
logger.info('start async initializing ray actor groups')
- actor_worker = RayActorGroup(
- worker=ActorHybridWorker,
- placement_group=None,
- megatron_config=actor_config,
- rl_config=rl_config,
- generate_config=generate_config,
- model_provider=gpt_model_provider,
- tokenizer=tokenizer,
- initialize_func=initialize_megatron,
- get_megatron_module=get_megatron_module,
- global_batch_size=actor_config.global_batch_size * rl_config.n_samples_per_prompt
- ).initialize()
-
- reference_worker = RayActorGroup(
- worker=ReferenceWorker,
- placement_group=None,
- megatron_config=ref_config,
- rl_config=rl_config,
- model_provider=gpt_model_provider,
- tokenizer=tokenizer,
- initialize_func=initialize_megatron,
- get_megatron_module=get_megatron_module,
- global_batch_size=actor_config.global_batch_size * rl_config.n_samples_per_prompt
- ).initialize()
-
reward_list = []
- if rl_config.reward_resource:
- reward_worker = RayActorGroup(
- worker=RewardWorker,
+ if rl_config.use_integrated_worker:
+ integrated_worker = RayActorGroup(
+ worker=IntegratedWorker,
placement_group=None,
- megatron_config=reward_config,
+ megatron_config=actor_config,
rl_config=rl_config,
- model_provider=rm_model_provider,
+ generate_config=generate_config,
+ model_provider=gpt_model_provider,
+ tokenizer=tokenizer,
+ initialize_func=initialize_megatron,
+ get_megatron_module=get_megatron_module,
+ global_batch_size=actor_config.global_batch_size * rl_config.n_samples_per_prompt,
+ ).initialize()
+
+ actor_worker = integrated_worker
+ reference_worker = integrated_worker
+
+ else:
+ actor_worker = RayActorGroup(
+ worker=ActorHybridWorker,
+ placement_group=None,
+ megatron_config=actor_config,
+ rl_config=rl_config,
+ generate_config=generate_config,
+ model_provider=gpt_model_provider,
tokenizer=tokenizer,
initialize_func=initialize_megatron,
get_megatron_module=get_megatron_module,
global_batch_size=actor_config.global_batch_size * rl_config.n_samples_per_prompt
).initialize()
- reward_list.append(reward_worker)
+ reference_worker = RayActorGroup(
+ worker=ReferenceWorker,
+ placement_group=None,
+ megatron_config=ref_config,
+ rl_config=rl_config,
+ model_provider=gpt_model_provider,
+ tokenizer=tokenizer,
+ initialize_func=initialize_megatron,
+ get_megatron_module=get_megatron_module,
+ global_batch_size=actor_config.global_batch_size * rl_config.n_samples_per_prompt
+ ).initialize()
+
+ if rl_config.reward_resource:
+ reward_worker = RayActorGroup(
+ worker=RewardWorker,
+ placement_group=None,
+ megatron_config=reward_config,
+ rl_config=rl_config,
+ model_provider=rm_model_provider,
+ tokenizer=tokenizer,
+ initialize_func=initialize_megatron,
+ get_megatron_module=get_megatron_module,
+ global_batch_size=actor_config.global_batch_size * rl_config.n_samples_per_prompt
+ ).initialize()
+
+ reward_list.append(reward_worker)
if rl_config.rule_reward:
rule_reward = RuleReward.options(num_cpus=rl_config.num_cpus_for_local_task).remote()
@@ -145,11 +164,25 @@ def parse_training_config(config: Dict):
"""
actor_config = MegatronConfig({**config.get("megatron_training"), **config.get("actor_config")},
config.get('model'))
- ref_config = MegatronConfig({**config.get("megatron_training"), **config.get("ref_config")},
- config.get('model'))
- reward_config = MegatronConfig({**config.get("megatron_training"), **config.get("reward_config")},
- config.get('model'))
rl_config = RLConfig(config.get("rl_config"))
+
+ if rl_config.use_integrated_worker:
+ if "ref_config" in config:
+ raise ValueError(
+                "ref_config should not be set when use_integrated_worker mode is on.")
+ ref_config = actor_config
+
+ if "reward_config" in config:
+ raise ValueError(
+                "reward_config should not be set when use_integrated_worker mode is on.")
+ reward_config = actor_config
+
+ else:
+ ref_config = MegatronConfig({**config.get("megatron_training"), **config.get("ref_config")},
+ config.get('model'))
+
+ reward_config = MegatronConfig({**config.get("megatron_training"), **config.get("reward_config")},
+ config.get('model'))
generate_config = GenerateConfig(config.get("generate_config"))
validate_rl_args(actor_config, ref_config, reward_config, rl_config, generate_config)
diff --git a/cli/train_sft.py b/cli/train_sft.py
deleted file mode 100644
index 8a5d190bac086c47f5c6dd62df4bda62ba6ac54b..0000000000000000000000000000000000000000
--- a/cli/train_sft.py
+++ /dev/null
@@ -1,360 +0,0 @@
-# Copyright 2020 The HuggingFace Inc. team. All rights reserved.
-# Copyright (c) 2023; NVIDIA CORPORATION. All rights reserved.
-# Copyright (c) 2024, HUAWEI CORPORATION. All rights reserved.
-import sys
-
-import torch
-import hydra
-from omegaconf import OmegaConf
-
-from mindspeed_rl.trainer.sft_trainer import SFTTrainer
-from mindspeed_rl.config_cls.megatron_config import MegatronConfig
-from mindspeed_rl.datasets.instruction_dataset import InstructionDataset
-from mindspeed_rl.datasets.dataloader import InstructionDataLoader
-from mindspeed_rl.datasets.build_dataset import build_train_valid_test_datasets
-from mindspeed_rl.datasets.utils import build_data_iter, get_train_valid_test_num_samples
-from mindspeed_rl.utils import get_tokenizer, Loggers, synchronize_time, seed_all, parse_args_from_config
-
-logger = Loggers('train_sft')
-
-
-def sft_train():
- from megatron.core import parallel_state
- from megatron.core.utils import get_model_config
- from megatron.core.enums import ModelType
- from megatron.training import get_args
- from megatron.training.checkpointing import save_checkpoint
- from megatron.training.training import evaluate_and_print_results, setup_model_and_optimizer
- from megatron.training.utils import get_batch_on_this_cp_rank
-
- from mindspeed_llm.training import train
- from mindspeed_llm.training.initialize import set_jit_fusion_options
- from mindspeed_llm.training.utils import generate_actual_seq_len
-
- args = get_args()
- set_jit_fusion_options()
-
- start_time = synchronize_time()
- logger.info("sft training starting time: {}".format(start_time))
-
- model, optimizer, opt_param_scheduler = setup_model_and_optimizer(
- gpt_model_provider, ModelType.encoder_or_decoder)
- logger.info('after model, optimizer and learning rate scheduler are built')
-
- model_arch_config = get_model_config(model[0])
-
- # build tokenizer
- tokenizer = get_tokenizer(args.tokenizer_name_or_path,
- prompt_type=args.prompt_type, prompt_type_path=args.prompt_type_path)
- logger.info('after tokenizer is built')
-
- # build dataset
- train_valid_test_num_samples = get_train_valid_test_num_samples(
- train_samples=args.train_samples,
- train_iters=args.train_iters,
- global_batch_size=args.global_batch_size,
- eval_interval=args.eval_interval,
- eval_iters=args.eval_iters,
- )
- train_dataset, valid_dataset, test_dataset = build_train_valid_test_datasets(
- data_prefix=args.data_path,
- splits_string=args.split,
- seq_length=args.seq_length + args.num_nextn_predict_layers,
- train_valid_test_num_samples=train_valid_test_num_samples,
- dataset_cls=InstructionDataset,
- tokenizer=tokenizer,
- parallel_state=parallel_state,
- full_shuffle_instruction_dataset=args.full_shuffle_instruction_dataset,
- no_shuffle=args.no_shuffle,
- reset_position_ids=args.reset_position_ids,
- prompt_type=args.prompt_type,
- prompt_type_path=args.prompt_type_path,
- seed=args.seed
- )
- logger.info('after datasets are built')
-
- # Backward compatibility, assume fixed batch size.
- if args.iteration > 0 and args.consumed_train_samples == 0:
- if args.train_samples is not None:
- raise ValueError('only backward compatiblity support for iteration-based training')
- args.consumed_train_samples = args.iteration * args.global_batch_size
- if args.iteration > 0 and args.consumed_valid_samples == 0:
- if args.train_samples is None:
- args.consumed_valid_samples = (args.iteration // args.eval_interval) * \
- args.eval_iters * args.global_batch_size
-
- # build_dataloader
- train_dataloader = None
- if train_dataset is not None and len(train_dataset) > 0:
- train_dataloader = InstructionDataLoader(
- dataset=train_dataset,
- parallel_state=parallel_state,
- tokenizer=tokenizer,
- num_workers=args.num_workers,
- tokenizer_padding_side=args.tokenizer_padding_side,
- pad_to_multiple_of=args.pad_to_multiple_of if args.variable_seq_lengths else args.seq_length + args.num_nextn_predict_layers,
- variable_seq_lengths=args.variable_seq_lengths,
- num_nextn_predict_layers=args.num_nextn_predict_layers,
- micro_batch_size=args.micro_batch_size,
- comsumed_samples=args.consumed_train_samples,
- seed=args.seed
- )
-
- valid_dataloader = None
- if valid_dataset is not None and len(valid_dataset) > 0:
- valid_dataloader = InstructionDataLoader(
- dataset=valid_dataset,
- parallel_state=parallel_state,
- tokenizer=tokenizer,
- num_workers=args.num_workers,
- tokenizer_padding_side=args.tokenizer_padding_side,
- pad_to_multiple_of=args.pad_to_multiple_of if args.variable_seq_lengths else args.seq_length + args.num_nextn_predict_layers,
- variable_seq_lengths=args.variable_seq_lengths,
- num_nextn_predict_layers=args.num_nextn_predict_layers,
- micro_batch_size=args.micro_batch_size,
- comsumed_samples=args.consumed_valid_samples,
- seed=args.seed
- )
-
- test_dataloader = None
- if test_dataset is not None and len(test_dataset) > 0:
- test_dataloader = InstructionDataLoader(
- dataset=test_dataset,
- parallel_state=parallel_state,
- tokenizer=tokenizer,
- num_workers=args.num_workers,
- tokenizer_padding_side=args.tokenizer_padding_side,
- pad_to_multiple_of=args.pad_to_multiple_of if args.variable_seq_lengths else args.seq_length + args.num_nextn_predict_layers,
- variable_seq_lengths=args.variable_seq_lengths,
- num_nextn_predict_layers=args.num_nextn_predict_layers,
- micro_batch_size=args.micro_batch_size,
- comsumed_samples=0,
- seed=args.seed
- )
-
- # Flags to know if we need to do training/validation/testing.
- do_train = train_dataloader is not None and args.train_iters > 0
- do_valid = valid_dataloader is not None and args.eval_iters > 0
- do_test = test_dataloader is not None and args.eval_iters > 0
- flags = torch.tensor(
- [int(do_train), int(do_valid), int(do_test)],
- dtype=torch.long, device='cuda')
-
- torch.distributed.broadcast(flags, 0)
-
- args.do_train = getattr(args, "do_train", False) or flags[0].item()
- args.do_valid = getattr(args, "do_valid", False) or flags[1].item()
- args.do_test = getattr(args, "do_test", False) or flags[2].item()
-
- # build data_iterator
- train_data_iterator = []
- valid_data_iterator = []
- test_data_iterator_list = []
- if args.virtual_pipeline_model_parallel_size is not None:
- for i in range(len(model)):
- parallel_state.set_virtual_pipeline_model_parallel_rank(i)
- train_data_iterator.append(build_data_iter(train_dataloader, args.dataloader_type))
- valid_data_iterator.append(build_data_iter(valid_dataloader, args.dataloader_type))
- test_data_iterator_list.append(build_data_iter(test_dataloader, args.dataloader_type))
- else:
- train_data_iterator = build_data_iter(train_dataloader, args.dataloader_type)
- valid_data_iterator = build_data_iter(valid_dataloader, args.dataloader_type)
- test_data_iterator = build_data_iter(test_dataloader, args.dataloader_type)
- test_data_iterator_list = [test_data_iterator]
-
- logger.info('after dataloaders are built')
-
- # configure Trainer
- megatron_modules = {
- 'train_func': train,
- 'parallel_state': parallel_state,
- 'save_checkpoint_func': save_checkpoint,
- 'evaluate_fun': evaluate_and_print_results,
- "generate_seq_len_fun": generate_actual_seq_len,
- 'batch_cp_func': get_batch_on_this_cp_rank
- }
-
- trainer = SFTTrainer(
- args=args,
- model=model,
- optimizer=optimizer,
- train_data_iterator=train_data_iterator,
- valid_data_iterator=valid_data_iterator,
- test_data_iterator_list=test_data_iterator_list,
- scheduler=opt_param_scheduler,
- process_non_loss_data_func=None,
- model_config=model_arch_config,
- **megatron_modules
- )
-
- trainer.train()
-
-
-def gpt_model_provider(pre_process, post_process):
- """
- Builds the model.
-
- If you set the use_mcore_models to True, it will return the mcore GPT model and if not the legacy GPT model.
-
- Args:
- pre_process (bool, optional): Set to true if you need to compute embedings. Defaults to True.
- post_process (bool, optional): Set to true if you need to want to compute output logits/loss.
- Defaults to True.
-
-
- Returns:
- Union[GPTModel, megatron.legacy.model.GPTModel]: The returned model
- """
- from megatron.training import get_args
- from megatron.core.models.gpt import GPTModel
- from megatron.core.models.gpt.gpt_layer_specs import get_gpt_layer_local_spec
- from megatron.core.transformer.spec_utils import import_module
- from megatron.training.arguments import core_transformer_config_from_args
- args = get_args()
-
- logger.info('building GPT model ...')
- # Experimental loading arguments from configs
- config = core_transformer_config_from_args(args)
-
- if args.spec is not None:
- transformer_layer_spec = import_module(args.spec)
- else:
- transformer_layer_spec = get_gpt_layer_local_spec(args.num_experts, args.moe_grouped_gemm)
-
- model = GPTModel(
- config=config,
- transformer_layer_spec=transformer_layer_spec,
- vocab_size=args.padded_vocab_size,
- max_sequence_length=args.max_position_embeddings,
- pre_process=pre_process,
- post_process=post_process,
- fp16_lm_cross_entropy=args.fp16_lm_cross_entropy,
- parallel_output=True,
- share_embeddings_and_output_weights=not args.untie_embeddings_and_output_weights,
- position_embedding_type=args.position_embedding_type,
- rotary_percent=args.rotary_percent,
- seq_len_interpolation_factor=args.rotary_seq_len_interpolation_factor
- )
-
- return model
-
-
-def initialize_megatron(
- extra_args_provider=None,
- args_defaults=None,
- ignore_unknown_args=False,
- allow_no_cuda=False,
- skip_mpu_initialization=False,
- config=None,
-):
- """Set global variables, initialize distributed, and
- set autoresume and random seeds.
- `allow_no_cuda` should not be set unless using megatron for cpu only
- data processing. In general this arg should not be set unless you know
- what you are doing.
- Returns a function to finalize distributed env initialization
- (optionally, only when args.lazy_mpu_init == True)
- """
- if args_defaults is None:
- args_defaults = {}
-
- origin_sys_argv = sys.argv
- sys.argv = [sys.argv[0]]
- parse_args_from_config(config)
- from mindspeed_llm.training.arguments import parse_args_decorator
- import megatron
-
- parse_args = parse_args_decorator(megatron.training.arguments.parse_args)
- args = parse_args(extra_args_provider, ignore_unknown_args)
- sys.argv = origin_sys_argv
-
- if not allow_no_cuda:
- if not torch.cuda.is_available():
- raise ValueError("Megatron requires CUDA.")
-
- from megatron.core import parallel_state
- from megatron.training import get_args
- from megatron.training.arguments import validate_args
- from megatron.training.checkpointing import load_args_from_checkpoint
- from megatron.training.global_vars import set_global_variables
- from megatron.training.initialize import _set_random_seed, \
- _init_autoresume, _compile_dependencies, \
- _initialize_tp_communicators, _initialize_distributed
-
- if args.use_checkpoint_args or args_defaults.get("use_checkpoint_args", False):
- if args.load is None:
- raise ValueError("--use-checkpoints-args requires --load argument.")
- load_args_from_checkpoint(args)
-
- validate_args(args, args_defaults)
-
- set_global_variables(args)
-
- if args.use_deter_comp:
- seed_all(args.seed)
- logger.info("deterministic computing is applied for npu.")
-
- # torch.distributed initialization
- def finish_mpu_init():
- args = get_args()
- # Pytorch distributed.
- _initialize_distributed()
-
- # Random seeds for reproducibility.
- if args.rank == 0:
- logger.info("> setting random seeds to {} ...".format(args.seed))
- _set_random_seed(args.seed, args.data_parallel_random_init)
-
- if skip_mpu_initialization:
- return None
-
- args = get_args()
- if args.lazy_mpu_init:
- args.use_cpu_initialization = True
- # delayed initialization of DDP-related stuff
- # We only set basic DDP globals
- parallel_state.set_tensor_model_parallel_world_size(args.tensor_model_parallel_size)
- # and return function for external DDP manager
- # to call when it has DDP initialized
- parallel_state.set_tensor_model_parallel_rank(args.rank)
- return finish_mpu_init
- else:
- # Megatron's MPU is the master. Complete initialization right away.
- finish_mpu_init()
-
- # Autoresume.
- _init_autoresume()
-
- # Compile dependencies.
- _compile_dependencies()
-
- if args.tp_comm_overlap:
- _initialize_tp_communicators()
-
- # No continuation function
- return None
-
-
-def separate_config_and_parse_args(config):
- model_config = config.model
- sft_config = config.sft
-
- OmegaConf.set_struct(model_config, False)
- OmegaConf.set_struct(sft_config, False)
-
- sft_config_dict = OmegaConf.to_container(sft_config, resolve=True)
- model_config_dict = OmegaConf.to_container(model_config, resolve=True)
-
- megatron_config = MegatronConfig(sft_config_dict, model_config_dict)
- return megatron_config
-
-
-@hydra.main(config_path='../configs', config_name='sft_qwen25_7b', version_base=None)
-def main(config):
- megatron_config = separate_config_and_parse_args(config)
- initialize_megatron(config=megatron_config)
- sft_train()
-
-
-if __name__ == '__main__':
- main()
diff --git a/configs/r1_zero_qwen25_7b_integrated.yaml b/configs/r1_zero_qwen25_7b_integrated.yaml
new file mode 100644
index 0000000000000000000000000000000000000000..e7c4b5f11a175344e8d3282f8caf6e70cec4a135
--- /dev/null
+++ b/configs/r1_zero_qwen25_7b_integrated.yaml
@@ -0,0 +1,107 @@
+defaults:
+ - model:
+ - qwen25_7b
+
+megatron_training:
+ model: qwen25_7b
+ use_fused_rmsnorm: true
+ use_mcore_models: true
+ sequence_parallel: true
+ use_flash_attn: true
+ no_masked_softmax_fusion: true
+ attention_softmax_in_fp32: true
+ no_gradient_accumulation_fusion: true
+ use_fused_swiglu: true
+ use_fused_rotary_pos_emb: true
+ bf16: true
+ use_distributed_optimizer: true
+ tokenizer_type: PretrainedFromHF
+ tokenizer_name_or_path: ./Qwen2.5-7B-Instruct
+ global_batch_size: 4
+ seq_length: 1024
+ save_interval: 50
+ train_iters: 2
+ stage: ray_grpo
+ attention_dropout: 0.0
+ init_method_std: 0.01
+ hidden_dropout: 0.0
+ distributed_backend: nccl
+ no_shared_storage: true
+ variable_seq_lengths: true
+ dataset_additional_keys: ['labels',]
+ data_path: ./data
+ split: 100,0,0
+
+actor_config:
+ model: qwen25_7b
+ micro_batch_size: 2
+ tensor_model_parallel_size: 4
+ pipeline_model_parallel_size: 1
+ lr: 5e-7
+ lr_decay_style: cosine
+ min_lr: 5e-8
+ weight_decay: 0.0
+ lr_warmup_fraction: 0.0
+ clip_grad: 10000.0
+ adam_beta1: 0.9
+ adam_beta2: 0.95
+ initial_loss_scale: 4096
+ finetune: true
+ load: ./ckpt
+ save: ./ckpt
+ no_load_optim: true
+ no_load_rng: true
+
+rl_config:
+ use_integrated_worker: true
+ blocking: true
+ update_micro_batch_size: 2
+ experience_count: 4
+ gamma: 1.0
+ lam: 0.95
+ adv_estimator: group_norm
+ kl_penalty: kl
+ kl_ctrl_type: fixed
+ init_kl_coef: 0.0001
+ mini_batch_size: 256
+ max_prompt_length: 2048
+ epochs: 1
+ clip_ratio: 0.2
+ entropy_coeff: 0.001
+ n_samples_per_prompt: 8
+ rule_reward: true
+ verifier_function: ["base_acc"]
+ verifier_weight: [1.0]
+ verifier_parallel: 4
+ verifier_timeout: 120
+ use_tensorboard: true
+ actor_resource:
+ num_npus: 8
+
+generate_config:
+ trust_remote_code: true
+ offload_train_optimizer: true
+ offload_train_grad: true
+ offload_train_param: true
+
+ # 推理时的并行配置
+ infer_tensor_parallel_size: 2
+ infer_pipeline_parallel_size: 1
+ infer_expert_parallel_size: 1
+
+ # vllm 模型相关设置
+ max_num_seqs: 512
+ max_model_len: 4096
+ dtype: "bfloat16"
+ gpu_memory_utilization: 0.9
+ num_scheduler_steps: 1
+
+ # 采样配置
+ sampling_config:
+ logprobs: 1
+ max_tokens: 2048
+ top_p: 0.9
+ top_k: 50
+ min_p: 0.01
+ temperature: 0.8
+ detokenize: false
diff --git a/configs/sft_pack_qwen25_32b.yaml b/configs/sft_pack_qwen25_32b.yaml
deleted file mode 100644
index faf73537091e2caa54f518a9d698fdca8d83810f..0000000000000000000000000000000000000000
--- a/configs/sft_pack_qwen25_32b.yaml
+++ /dev/null
@@ -1,78 +0,0 @@
-defaults:
- - model:
- - qwen25_32b
-
-sft:
- # memory_args:
- swap_attention: true
- recompute_granularity: full
- recompute_method: block
- recompute_num_layers: 4
- recompute_activation_function: true
-
- # tune_args:
- finetune: true
- stage: sft
- is_instruction_dataset: true
- variable_seq_lengths: false
- tokenizer_not_use_fast: true
- prompt_type: qwen
-
- # gpt_args:
- reuse_fp32_param: true
- norm_epsilon: 1e-6
- micro_batch_size: 2
- global_batch_size: 128
- tokenizer_type: PretrainedFromHF
- tokenizer_name_or_path: ./Qwen2.5-32B/
- train_iters: 2000
- lr: 1.25e-6
- lr_decay_style: cosine
- min_lr: 1.25e-7
- lr_warmup_fraction: 0.01
- weight_decay: 0.0
- clip_grad: 1.0
- initial_loss_scale: 4096
- use_distributed_optimizer: true
- tensor_model_parallel_size: 8
- pipeline_model_parallel_size: 2
- sequence_parallel: true
- use_mcore_models: true
- use_fused_rmsnorm: true
- use_flash_attn: true
- no_masked_softmax_fusion: true
- no_gradient_accumulation_fusion: true
- use_fused_swiglu: true
- use_fused_rotary_pos_emb: true
- bf16: true
- seq_length: 4096
- adam_beta1: 0.9
- adam_beta2: 0.95
- attention_dropout: 0.0
- init_method_std: 0.01
- hidden_dropout: 0.0
- overlap_grad_reduce: true
- overlap_param_gather: true
-
- # data_args:
- data_path: ./alpaca/data
- split: 100,0,0
- no_shuffle: false
- reset_position_ids: true
-
- # ckpt_args:
- no_load_optim: true
- no_load_rng: true
- no_save_optim: true
- no_save_rng: true
- seed: 1234
- model: qwen25_32b
- load: ./mcore_tp8pp2/
- save: /cache
-
- # output_args:
- log_interval: 1
- save_interval: 2000
- eval_interval: 2000
- eval_iters: 0
- log_throughput: true
diff --git a/configs/sft_pack_qwen25_7b.yaml b/configs/sft_pack_qwen25_7b.yaml
deleted file mode 100644
index fe1c71107c1f340efac9d9f7b226824199e16b5a..0000000000000000000000000000000000000000
--- a/configs/sft_pack_qwen25_7b.yaml
+++ /dev/null
@@ -1,71 +0,0 @@
-defaults:
- - model:
- - qwen25_7b
-
-sft:
- # tune_args:
- finetune: true
- stage: sft
- is_instruction_dataset: true
- variable_seq_lengths: false
- tokenizer_not_use_fast: true
- prompt_type: qwen
-
- # gpt_args:
- norm_epsilon: 1e-6
- micro_batch_size: 2
- global_batch_size: 128
- tokenizer_type: PretrainedFromHF
- tokenizer_name_or_path: ./Qwen2.5-7B/
- train_iters: 5000
- lr: 5e-5
- lr_decay_style: cosine
- min_lr: 1.25e-7
- lr_warmup_fraction: 0.01
- weight_decay: 1e-1
- clip_grad: 1.0
- initial_loss_scale: 4096
- use_distributed_optimizer: true
- tensor_model_parallel_size: 2
- pipeline_model_parallel_size: 2
- sequence_parallel: true
- use_mcore_models: true
- use_fused_rmsnorm: true
- use_flash_attn: true
- no_masked_softmax_fusion: true
- no_gradient_accumulation_fusion: true
- use_fused_swiglu: true
- use_fused_rotary_pos_emb: true
- bf16: true
- seq_length: 4096
- adam_beta1: 0.9
- adam_beta2: 0.95
- attention_dropout: 0.0
- init_method_std: 0.01
- hidden_dropout: 0.0
- overlap_grad_reduce: true
- overlap_param_gather: true
- max_position_embeddings: 4096
-
- # data_args:
- data_path: ./dataset/data
- reset_position_ids: true
- split: 100,0,0
- no_shuffle: false
-
- # ckpt_args:
- no_load_optim: true
- no_load_rng: true
- no_save_optim: true
- no_save_rng: true
- seed: 1234
- model: qwen25_7b
- load: ./ckpt
- save: ./ckpt
-
- # output_args:
- log_interval: 1
- save_interval: 5000
- eval_interval: 5000
- eval_iters: 0
- log_throughput: true
diff --git a/configs/sft_qwen25_32b.yaml b/configs/sft_qwen25_32b.yaml
deleted file mode 100644
index ab0dd81f3bd3c73db5c999ed1ce683dcaeec93d1..0000000000000000000000000000000000000000
--- a/configs/sft_qwen25_32b.yaml
+++ /dev/null
@@ -1,77 +0,0 @@
-defaults:
- - model:
- - qwen25_32b
-
-sft:
- # memory_args:
- swap_attention: true
- recompute_granularity: full
- recompute_method: block
- recompute_num_layers: 4
- recompute_activation_function: true
-
- # tune_args:
- finetune: true
- stage: sft
- is_instruction_dataset: true
- variable_seq_lengths: true
- tokenizer_not_use_fast: true
- prompt_type: qwen
-
- # gpt_args:
- reuse_fp32_param: true
- norm_epsilon: 1e-6
- micro_batch_size: 4
- global_batch_size: 128
- tokenizer_type: PretrainedFromHF
- tokenizer_name_or_path: ./Qwen2.5-32B/
- train_iters: 2000
- lr: 1.25e-6
- lr_decay_style: cosine
- min_lr: 1.25e-7
- lr_warmup_fraction: 0.01
- weight_decay: 0.0
- clip_grad: 1.0
- initial_loss_scale: 4096
- use_distributed_optimizer: true
- tensor_model_parallel_size: 8
- pipeline_model_parallel_size: 2
- sequence_parallel: true
- use_mcore_models: true
- use_fused_rmsnorm: true
- use_flash_attn: true
- no_masked_softmax_fusion: true
- no_gradient_accumulation_fusion: true
- use_fused_swiglu: true
- use_fused_rotary_pos_emb: true
- bf16: true
- seq_length: 4096
- adam_beta1: 0.9
- adam_beta2: 0.95
- attention_dropout: 0.0
- init_method_std: 0.01
- hidden_dropout: 0.0
- overlap_grad_reduce: true
- overlap_param_gather: true
-
- # data_args:
- data_path: ./alpaca/data
- split: 100,0,0
- no_shuffle: false
-
- # ckpt_args:
- no_load_optim: true
- no_load_rng: true
- no_save_optim: true
- no_save_rng: true
- seed: 1234
- model: qwen25_32b
- load: ./mcore_tp8pp2/
- save: /cache
-
- # output_args:
- log_interval: 1
- save_interval: 2000
- eval_interval: 2000
- eval_iters: 0
- log_throughput: true
diff --git a/configs/sft_qwen25_7b.yaml b/configs/sft_qwen25_7b.yaml
deleted file mode 100644
index 3182007d2b635221d8223cce162b1dd06b4de1a5..0000000000000000000000000000000000000000
--- a/configs/sft_qwen25_7b.yaml
+++ /dev/null
@@ -1,69 +0,0 @@
-defaults:
- - model:
- - qwen25_7b
-
-sft:
- # tune_args:
- finetune: true
- stage: sft
- is_instruction_dataset: true
- variable_seq_lengths: true
- tokenizer_not_use_fast: true
- prompt_type: qwen
-
- # gpt_args:
- norm_epsilon: 1e-6
- micro_batch_size: 4
- global_batch_size: 128
- tokenizer_type: PretrainedFromHF
- tokenizer_name_or_path: ./Qwen2.5-7B/
- train_iters: 5000
- lr: 5e-5
- lr_decay_style: cosine
- min_lr: 1.25e-7
- lr_warmup_fraction: 0.01
- weight_decay: 1e-1
- clip_grad: 1.0
- initial_loss_scale: 4096
- use_distributed_optimizer: true
- tensor_model_parallel_size: 2
- pipeline_model_parallel_size: 2
- sequence_parallel: false
- use_mcore_models: true
- use_fused_rmsnorm: true
- use_flash_attn: true
- no_masked_softmax_fusion: true
- no_gradient_accumulation_fusion: true
- use_fused_swiglu: true
- use_fused_rotary_pos_emb: true
- bf16: true
- seq_length: 4096
- adam_beta1: 0.9
- adam_beta2: 0.95
- attention_dropout: 0.0
- init_method_std: 0.01
- hidden_dropout: 0.0
- overlap_grad_reduce: true
- overlap_param_gather: true
-
- # data_args:
- data_path: ./alpaca/data
- split: 100,0,0
- no_shuffle: false
-
- # ckpt_args:
- no_load_optim: true
- no_load_rng: true
- no_save_optim: true
- no_save_rng: true
- seed: 1234
- model: qwen25_7b
- load: ./tp2pp2/
- save: ./ckpt
-
- # output_args:
- log_interval: 1
- save_interval: 5000
- eval_interval: 5000
- eval_iters: 0
- log_throughput: true
diff --git a/docs/algorithms/supervised_finetune.md b/docs/algorithms/supervised_finetune.md
deleted file mode 100644
index 1b07cbf5e2eb03e92aec02520add1e993415763a..0000000000000000000000000000000000000000
--- a/docs/algorithms/supervised_finetune.md
+++ /dev/null
@@ -1,248 +0,0 @@
-# MindSpeed-RL 监督微调
-
-## 简介
-
-监督微调 SFT(Supervised Fine-Tuning),是一种通过标注数据集对预训练的大型语言模型进行微调的技术。其目的是让模型能够更好地适应特定任务需求。通过在结构化的监督数据上进行训练,SFT
-能够让模型更准确地理解和生成内容,例如执行指令、回答问题或提供更精准的对话。
-
-在强化学习中,SFT 一般用来初始化 actor 模型和 reference 模型。
-
-
-
-## 使用示例
-
-### 准备环境
-
-请参考首页[安装指南](../install_guide.md)安装环境和准备代码依赖。
-
-### 准备数据
-
-#### 数据下载
-
-* [单轮对话:Alpaca英文数据集](https://huggingface.co/datasets/tatsu-lab/alpaca)
-
-Alpaca风格微调数据集下载可以基于网页直接下载,也可以基于命令行下载,比如:
-
-```bash
-cd dataset/
-wget https://huggingface.co/datasets/tatsu-lab/alpaca/resolve/main/data/train-00000-of-00001-a09b74b3ef9c3b56.parquet
-cd ..
-```
-
-#### 数据转换
-
-##### pack格式转换
-
-> 注意:packing格式转换的seq-length需要和SFT训练参数的seq-length保持一致,如果训练参数需要修改,则数据集也需要重新转换。
-
-进入MindSpeed-RL目录,修改 `./configs/datasets/alpaca_instruction_pack.yaml` 文件,并执行如下参考命令
-
-```
-bash examples/data/preprocess_data.sh alpaca_instruction_pack
-```
-
-##### 非pack格式转换
-
-进入MindSpeed-RL目录,修改 `./configs/datasets/alpaca_instruction_non_pack.yaml` 文件,并执行如下参考命令
-
-```
-bash examples/data/preprocess_data.sh alpaca_instruction_non_pack
-```
-
-##### 参数说明
-
-【--input】
-
-可以直接输入到数据集目录或具体文件,如果是目录,则处理全部文件, 支持 .parquet \ .csv \ .json \ .jsonl \ .txt \ .arrow 格式,
-同一个文件夹下的数据格式需要保持一致
-
-【--map-keys】
-
-`--map-keys`参数用于配置字段映射来使用数据集。
-
-Alpaca风格示例:
-
-```
-[
-{
- "instruction": "人类指令(必填)",
- "input": "人类输入(选填)",
- "output": "模型回答(必填)",
- "system": "系统提示词(选填)",
- "history": [
- ["第一轮指令(选填)", "第一轮回答(选填)"],
- ["第二轮指令(选填)", "第二轮回答(选填)"]
- ]
-}
-]
-```
-
-对于上面格式的数据,`--map-keys`参数完整应为
-
-`'{"prompt":"instruction","query":"input","response":"output","system":"system","history":"history"}'`
-
-其中参数的key值`"prompt"、"query"、"response"、"system"、"history"`
-代表数据集列映射后的属性,在代码中是固定的,不应改变,value值`"instruction"、"input"、"output"、"system"、"history"`对应数据集的列名。
-
-考虑到alpaca数据集大部分都是`["instruction", "input", "output"]`型格式,因此我们为key值`["prompt", "query", "response"]`
-设置了默认值。因此上面格式`--map-keys`参数可简略为`'{"system": "system","history": "history"}'`
-
-若数据集中无`system`与`history`列,则`--map-keys`可省略。
-
-【--prompt-type】
-
-用于指定模型模板,能够让base模型微调后能具备更好的对话能力。`prompt-type`
-的可选项可以在[templates](../../configs/model/templates.json)文件内查看。
-
-【--handler-name】
-
-微调数据预处理Alpaca风格数据集时,应指定为`AlpacaStyleInstructionHandler`,根据`--map-keys`参数提取对应数据的列。
-
-【--pack】
-
-将数据转为Pack格式。
-
-【--seq-length】
-
-指定Pack数据集每条数据的长度。
-
-【--append-eod】
-
-在每个输入序列的末尾添加一个特殊的标记来表示输入序列的结束。
-
-【--overwrite-cache】
-
-用于控制是否覆盖已存在的缓存分词器。
-
-**示例1:**
-
-```
---map-keys '{"prompt":"notice","query":"question","response":"answer","system":"system_test","history":"histories"}'
-```
-
-则会提取数据集里的`"notice"、"question"、"answer"、"system_test"、"histories"`列。
-
-**示例2:**
-
-```
---map-keys '{"history":"histories"}'
-```
-
-则会提取数据集里的`"instruction"、"input"、"output"、"histories"`列,其中`"instruction"、"input"、"output"`列作为默认值隐式存在。
-
-### 权重转换
-
-#### 权重下载
-
-* [qwen25-7b](https://huggingface.co/Qwen/Qwen2.5-7B/tree/main)
-* [qwen25-32b](https://huggingface.co/Qwen/Qwen2.5-32B/tree/main)
-
-#### hf 转 mcore
-
-在训练前,需要将 Hugging Face 权重转换成 Mcore 格式。
-
-注:这里会调用到 MindSpeed_LLM 仓,进行权重转换时注意按照安装手册中的环境准备步骤,将 mindspeed_llm 放入 MindSpeed-RL 目录下。
-
-```bash
-# 路径按照真实情况配置
-bash examples/ckpt/ckpt_convert_qwen25_hf2mcore.sh
-```
-
-##### 配置参数介绍
-
-* `use-mcore-models`:启用 MCore 模型;
-* `model-type`:指定模型类型,如 GPT;
-* `load-model-type`:指定加载模型的类型,如 hf(Hugging Face);
-* `save-model-type`:指定保存模型的类型,如 mg;
-* `target-tensor-parallel-size`:设置目标张量并行大小;
-* `target-pipeline-parallel-size`:设置目标流水线并行大小;
-* `add-qkv-bias`:是否进行 QKV 偏置;
-* `load-dir`:加载 Hugging Face 权重的路径;
-* `save-dir`:保存转换后权重的路径;
-* `tokenizer-model`:分词器模型文件的路径;
-* `model-type-hf`:指定 Hugging Face 模型类型,如 llama2;
-* `params-dtype`:指定参数的数据类型,如 bf16。
-
-#### mcore转hf(可选)
-
-训练结束后,如果需要将生成的mcore格式权重转换回Hugging Face 格式,可以参照以下命令及脚本:
-
-```bash
-# 路径按照真实情况配置
-bash examples/ckpt/ckpt_convert_qwen25_mcore2hf.sh
-```
-
-##### 配置参数介绍
-
-这里的参数与上文基本一致,注意以下几个事项即可:
-
-1. 权重转换转回hugging-face格式时,tp 和 pp 配置需配置为1;
-2. load-model-type参数配置为 mg,save-model-type参数配置为hf;
-3. save-dir 路径需要填入原始HF模型路径,新权重会存于HF原始权重文件下的mg2hg目录下,如/qwen2.5_7b_hf/mg2hg/
-
-### 开始训练
-
-### 单机
-
-参考[配置](../../configs/sft_qwen25_7b.yaml)
-,根据真实环境填写路径。进入项目目录后通过 [examples/sft/sft_qwen25_7b.sh](../../examples/sft/sft_qwen25_7b.sh)
-或者[examples/sft/sft_pack_qwen25_7b.sh](../../examples/sft/sft_pack_qwen25_7b.sh) 启动7B模型训练(单机)
-
-### 多机
-
-参考[配置](../../configs/sft_qwen25_32b.yaml),根据真实环境填写路径。
-进入项目目录后通过 [examples/sft/sft_qwen25_32b.sh](../../examples/sft/sft_qwen25_32b.sh)
-或者[examples/sft/sft_pack_qwen25_32b.sh](../../examples/sft/sft_pack_qwen25_32b.sh) 启动32B模型训练(多机)
-在运行脚本前需要根据真实环境配置脚本中的环境变量
-
-- MASTER_ADDR 主节点的IP
-- MASTER_PORT 主节点的端口
-- NNODES 参与训练的节点数
-- NODE_RANK 该节点在集群内对应的RANK
-- GLOO_SOCKET_IFNAME 可以通过 ifconfig 命令,找到本机IP对应的网卡名
-- TP_SOCKET_IFNAME,HCCL_SOCKET_IFNAME 可与 GLOO_SOCKET_IFNAME 配置成一样的
-
-在所有需要启动的机器内配置好脚本,在命令行统一运行即可启动多机训练。
-
-### 配置
-
-脚本使用的是configs下的sft_qwen25_7b/sft_qwen25_32b/sft_pack_qwen25_7b/sft_pack_qwen25_32b配置文件
-
-在文件内需要根据真实环境配置
-
-- tokenizer_name_or_path 需配置为 tokenizer对应路径
-- data_path 需配置为 \/\ 的形式,需要保证加载的 bin
- 文件的文件名为`_packed_(.*)_document(.*)`
-- tensor_model_parallel_size 张量并行数
-- pipeline_model_parallel_size 流水线并行数
-- 需要保证并行数乘积能整除总卡数
-- 并行配置要与权重转换时转换目标的并行配置一致
-
-## 参考集群规模
-
-
-
-
- 实验模型 |
- 硬件信息 |
- 集群规模 |
- 启动脚本 |
-
-
-
-
- Qwen25-7B |
- Atlas 900 A2 PODc |
- 1x8 |
- non_pack |
-
- pack |
-
- Qwen25-32B |
- Atlas 900 A2 PODc |
- 2x8 |
- non_pack |
-
- pack |
-
-
\ No newline at end of file
diff --git a/docs/features/hybrid_actor.md b/docs/features/hybrid_actor.md
index e4d9126c0cdff44f9ad03503c8eae8558252b716..077ac36f5408da3b4d3c1d6e0f18548512fb00f5 100644
--- a/docs/features/hybrid_actor.md
+++ b/docs/features/hybrid_actor.md
@@ -70,22 +70,22 @@ generate_config:
def initialize(self):
# 初始化分布式环境
self.setup_distributed_rank()
-
+
# 初始化训练态模型及卸载器
self.model, self.optimizer, self.opt_param_scheduler = self._build_model_optimizer()
- self.megatron_offloader = MegatronOffLoader(self.optimizer, self.model)
-
+ self.actor_offloader = MegatronOffLoader(self.model, self.optimizer)
+
# 在初始化推理态模型之前,首先卸载训练态的模型,这样才能让推理模型在初始化时正确计算KV Block的数量
if self.generate_config.offload_train_optimizer:
- self.megatron_offloader.offload_optimizer()
- if self.generate_config.offload_train_grad:
- self.megatron_offloader.offload_grad()
- if self.generate_config.offload_train_param:
- self.megatron_offloader.offload_train_param()
+ self.actor_offloader.offload_optimizer()
+ if self.generate_config.offload_grad:
+ self.actor_offloader.offload_grad()
+ if self.generate_config.offload_param:
+ self.actor_offloader.offload_param()
# 初始化推理态模型
self.inference_model = self._build_rollout()
-
+
# 初始化 sharding_manager
self.sharding_manager = self._build_sharding_manager()
...
@@ -221,44 +221,44 @@ def generate_sequences(self):
# mindspeed_rl/workers/resharding/megatron_sharding_manager.py
class MegatronShardingManager:
-
+
def reshard_to_train_mode(self):
# 卸载推理态相关权重
self.inference_engine.offload_model_weights()
self.offload_infer_params()
torch.cuda.empty_cache()
-
+
# 重新加载回训练态所需的内容
if self.optimizer_offload:
self.megatron_offloader.onload_optimizer()
if self.train_param_offload:
- self.megatron_offloader.onload_train_param()
+ self.megatron_offloader.onload_param()
if self.grad_offload:
self.megatron_offloader.onload_grad()
torch.cuda.empty_cache()
def reshard_to_infer_mode(self):
-
+
# 卸载训练态所需的相关参数
if self.optimizer_offload:
self.megatron_offloader.offload_optimizer()
if self.grad_offload:
self.megatron_offloader.offload_grad()
torch.cuda.empty_cache()
-
+
# 训练态权重要在完成推理权重构建之后才能进行卸载
# 这里是为了对应初始化后第一次推理时,训练态权重不在显存上的情况
if self.train_param_offload:
- self.megatron_offloader.onload_train_param()
-
+ self.megatron_offloader.onload_param()
+
# 根据训练态和推理态的切分策略,完成推理态权重的构建
self.onload_infer_params()
infer_params = self.vllm_weight_container.get_infer_params()
-
+
# 开始推理前,将训练态权重进行卸载
if self.train_param_offload:
- self.megatron_offloader.offload_train_param()
-
+ self.megatron_offloader.offload_param()
+
# 将推理态权重从 weight_buffer 绑定到推理引擎上
self.inference_engine.sync_model_weights(infer_params, load_format='megatron')
torch.cuda.empty_cache()
diff --git a/examples/sft/sft_pack_qwen25_32b.sh b/examples/sft/sft_pack_qwen25_32b.sh
deleted file mode 100644
index 63ea2ed78a86a4b9b3645204404548af8d7f720c..0000000000000000000000000000000000000000
--- a/examples/sft/sft_pack_qwen25_32b.sh
+++ /dev/null
@@ -1,27 +0,0 @@
-#!/bin/bash
-export CUDA_DEVICE_MAX_CONNECTIONS=1
-export PYTORCH_NPU_ALLOC_CONF=expandable_segments:True
-export GLOO_SOCKET_IFNAME= "Your SOCKET IFNAME"
-export TP_SOCKET_IFNAME= "Your SOCKET IFNAME"
-export HCCL_SOCKET_IFNAME= "Your SOCKET IFNAME"
-export HYDRA_FULL_ERROR=1
-
-MASTER_ADDR="localhost"
-MASTER_PORT="6060"
-NNODES=2
-NODE_RANK=0
-NPUS_PER_NODE=8
-WORLD_SIZE=$(($NPUS_PER_NODE*$NNODES))
-
-
-DISTRIBUTED_ARGS="
- --nproc_per_node $NPUS_PER_NODE \
- --nnodes $NNODES \
- --node_rank $NODE_RANK \
- --master_addr $MASTER_ADDR \
- --master_port $MASTER_PORT
-"
-
-torchrun $DISTRIBUTED_ARGS cli/train_sft.py \
- --config-name sft_pack_qwen25_32b \
- | tee logs/sft_pack_qwen25_32b_rank${NODE_RANK}.log
diff --git a/examples/sft/sft_pack_qwen25_7b.sh b/examples/sft/sft_pack_qwen25_7b.sh
deleted file mode 100644
index 90edbfd02e67006a3fcd8bfcea1e63d2105c275c..0000000000000000000000000000000000000000
--- a/examples/sft/sft_pack_qwen25_7b.sh
+++ /dev/null
@@ -1,26 +0,0 @@
-#!/bin/bash
-export CUDA_DEVICE_MAX_CONNECTIONS=1
-export PYTORCH_NPU_ALLOC_CONF=expandable_segments:True
-export GLOO_SOCKET_IFNAME= "Your SOCKET IFNAME"
-export TP_SOCKET_IFNAME= "Your SOCKET IFNAME"
-export HCCL_SOCKET_IFNAME= "Your SOCKET IFNAME"
-export HYDRA_FULL_ERROR=1
-
-GPUS_PER_NODE=8
-MASTER_ADDR=localhost
-MASTER_PORT=6004
-NNODES=1
-NODE_RANK=0
-WORLD_SIZE=$(($GPUS_PER_NODE*$NNODES))
-
-DISTRIBUTED_ARGS="
- --nproc_per_node $GPUS_PER_NODE \
- --nnodes $NNODES \
- --node_rank $NODE_RANK \
- --master_addr $MASTER_ADDR \
- --master_port $MASTER_PORT
-"
-
-torchrun $DISTRIBUTED_ARGS cli/train_sft.py \
- --config-name sft_pack_qwen25_7b \
- | tee logs/sft_pack_qwen25_7b_rank${NODE_RANK}.log
diff --git a/examples/sft/sft_qwen25_32b.sh b/examples/sft/sft_qwen25_32b.sh
deleted file mode 100644
index a400d66b5e346f2e7dbf381d646744c738cb032d..0000000000000000000000000000000000000000
--- a/examples/sft/sft_qwen25_32b.sh
+++ /dev/null
@@ -1,27 +0,0 @@
-#!/bin/bash
-export CUDA_DEVICE_MAX_CONNECTIONS=1
-export PYTORCH_NPU_ALLOC_CONF=expandable_segments:True
-export GLOO_SOCKET_IFNAME= "Your SOCKET IFNAME"
-export TP_SOCKET_IFNAME= "Your SOCKET IFNAME"
-export HCCL_SOCKET_IFNAME= "Your SOCKET IFNAME"
-export HYDRA_FULL_ERROR=1
-
-MASTER_ADDR="localhost"
-MASTER_PORT="6060"
-NNODES=2
-NODE_RANK=0
-NPUS_PER_NODE=8
-WORLD_SIZE=$(($NPUS_PER_NODE*$NNODES))
-
-
-DISTRIBUTED_ARGS="
- --nproc_per_node $NPUS_PER_NODE \
- --nnodes $NNODES \
- --node_rank $NODE_RANK \
- --master_addr $MASTER_ADDR \
- --master_port $MASTER_PORT
-"
-
-torchrun $DISTRIBUTED_ARGS cli/train_sft.py \
- --config-name sft_qwen25_32b \
- | tee logs/sft_qwen25_32b_rank${NODE_RANK}.log
\ No newline at end of file
diff --git a/examples/sft/sft_qwen25_7b.sh b/examples/sft/sft_qwen25_7b.sh
deleted file mode 100644
index 5845fbb075c55b35cbd4920ae48764607898cf86..0000000000000000000000000000000000000000
--- a/examples/sft/sft_qwen25_7b.sh
+++ /dev/null
@@ -1,26 +0,0 @@
-#!/bin/bash
-export CUDA_DEVICE_MAX_CONNECTIONS=1
-export PYTORCH_NPU_ALLOC_CONF=expandable_segments:True
-export GLOO_SOCKET_IFNAME= "Your SOCKET IFNAME"
-export TP_SOCKET_IFNAME= "Your SOCKET IFNAME"
-export HCCL_SOCKET_IFNAME= "Your SOCKET IFNAME"
-export HYDRA_FULL_ERROR=1
-
-GPUS_PER_NODE=8
-MASTER_ADDR=localhost
-MASTER_PORT=6005
-NNODES=1
-NODE_RANK=0
-WORLD_SIZE=$(($GPUS_PER_NODE*$NNODES))
-
-DISTRIBUTED_ARGS="
- --nproc_per_node $GPUS_PER_NODE \
- --nnodes $NNODES \
- --node_rank $NODE_RANK \
- --master_addr $MASTER_ADDR \
- --master_port $MASTER_PORT
-"
-
-torchrun $DISTRIBUTED_ARGS cli/train_sft.py \
- --config-name sft_qwen25_7b \
- | tee logs/sft_qwen25_7b_rank${NODE_RANK}.log
diff --git a/mindspeed_rl/__init__.py b/mindspeed_rl/__init__.py
index d8d4ed4a2a1e3011d2cefc1320504e2b039bbb1f..076e803980b2267875ba66b92d0ee40ac8c30473 100644
--- a/mindspeed_rl/__init__.py
+++ b/mindspeed_rl/__init__.py
@@ -11,7 +11,7 @@ from .models import (
LossFuncFactory, GRPOActorLossFunc, ReferenceLossFunc, RewardLossFunc,
Actor, ActorRolloutHybrid, Reference, Reward
)
-from .trainer import SFTTrainer, RayGRPOTrainer
+from .trainer import RayGRPOTrainer
from .workers import (
ReferenceWorker, RewardWorker, ActorHybridWorker,
RayActorGroup, MegatronShardingManager, RuleReward
@@ -27,8 +27,7 @@ __all__ = [
'InstructionDataset', 'InstructionDataLoader', 'PromptDataset', 'PromptDataLoader',
'build_train_valid_test_datasets', 'get_train_valid_test_num_samples',
'LossFuncFactory', 'GRPOActorLossFunc', 'ReferenceLossFunc', 'RewardLossFunc',
- 'Actor', 'ActorRolloutHybrid', 'Reference', 'Reward',
- 'SFTTrainer', 'RayGRPOTrainer',
+ 'Actor', 'ActorRolloutHybrid', 'Reference', 'Reward', 'RayGRPOTrainer',
'get_tokenizer', 'WandbLogger', 'Metric',
'get_batch_metrices_mean', 'num_floating_point_operations',
'seed_all', 'synchronize_time', 'parse_args_from_config',
diff --git a/mindspeed_rl/config_cls/rl_config.py b/mindspeed_rl/config_cls/rl_config.py
index 650a5db15c1812c6cdf282130b2aa5e3100241a7..994bfc4f9b245ba2efc8effdf0de3a22aa1b926c 100644
--- a/mindspeed_rl/config_cls/rl_config.py
+++ b/mindspeed_rl/config_cls/rl_config.py
@@ -86,6 +86,7 @@ class RLConfig(BaseConfig):
self.n_samples_per_prompt = 1
self.enable_sharding_validate = False
self.tp_split_expert = False
+ self.update_micro_batch_size = None
self.use_tensorboard = False
self.use_wandb = False
@@ -94,5 +95,6 @@ class RLConfig(BaseConfig):
self.wandb_save_dir = ""
self.blocking = False
self.num_cpus_for_local_task = 1
+ self.use_integrated_worker = False
self.update(config_dict)
diff --git a/mindspeed_rl/config_cls/validate_config.py b/mindspeed_rl/config_cls/validate_config.py
index bba4ae9657239048e10d203fb1452b657d4989b0..00e400faedf94b31293296ddeba8e517eae2d763 100644
--- a/mindspeed_rl/config_cls/validate_config.py
+++ b/mindspeed_rl/config_cls/validate_config.py
@@ -1,8 +1,29 @@
# Copyright (c) 2025, HUAWEI CORPORATION. All rights reserved.
import os
+from mindspeed_rl.config_cls.rl_config import RLConfig
+from mindspeed_rl.config_cls.megatron_config import MegatronConfig
+from mindspeed_rl.config_cls.generate_config import GenerateConfig
+
+
+def validate_rl_args(
+ actor_config: MegatronConfig,
+ ref_config: MegatronConfig,
+ reward_config: MegatronConfig,
+ rl_config: RLConfig,
+ generate_config: GenerateConfig
+ ):
+
+ # 检查全共卡情况下参数设置
+ if rl_config.use_integrated_worker:
+ if rl_config.reference_resource is not None:
+ raise ValueError(
+                "reference_resource should not be set when use_integrated_worker mode is on.")
+ rl_config.reference_resource = rl_config.actor_resource
+ if rl_config.reward_resource is not None:
+ raise ValueError(
+                "Reward model is not supported when use_integrated_worker mode is on.")
-def validate_rl_args(actor_config, ref_config, reward_config, rl_config, generate_config):
# 校验序列长度与模型最大长度
if generate_config.max_model_len < actor_config.seq_length:
raise ValueError(
@@ -102,10 +123,11 @@ def validate_rl_args(actor_config, ref_config, reward_config, rl_config, generat
rl_config.n_samples_per_prompt,
"Reference")
- _validate_batch_ratio(reward_config.global_batch_size,
- reward_config.micro_batch_size,
- rl_config.n_samples_per_prompt,
- "Reward")
+ if rl_config.reward_resource:
+ _validate_batch_ratio(reward_config.global_batch_size,
+ reward_config.micro_batch_size,
+ rl_config.n_samples_per_prompt,
+ "Reward")
# 校验经验计数与全局批次关系
def _validate_experience_ratio(global_batch, experience_count, component):
@@ -122,9 +144,10 @@ def validate_rl_args(actor_config, ref_config, reward_config, rl_config, generat
rl_config.experience_count_ref,
"Reference")
- _validate_experience_ratio(reward_config.global_batch_size,
- rl_config.experience_count_reward,
- "Reward")
+ if rl_config.reward_resource:
+ _validate_experience_ratio(reward_config.global_batch_size,
+ rl_config.experience_count_reward,
+ "Reward")
_validate_experience_ratio(reward_config.global_batch_size,
rl_config.experience_count_rule_reward,
diff --git a/mindspeed_rl/trainer/__init__.py b/mindspeed_rl/trainer/__init__.py
index 8ebefa9c0aabc88898b8c88512c9ab2c3f6ee4c3..60eeea726eae67fe92b512dd2f706410b97570bb 100644
--- a/mindspeed_rl/trainer/__init__.py
+++ b/mindspeed_rl/trainer/__init__.py
@@ -1,7 +1,5 @@
# Copyright (c) 2024, HUAWEI CORPORATION. All rights reserved.
-
-from .sft_trainer import SFTTrainer
from .grpo_trainer_hybrid import RayGRPOTrainer
from .orm_trainer import ORMTrainer
-__all__ = ['SFTTrainer', 'ORMTrainer', 'RayGRPOTrainer']
+__all__ = ['ORMTrainer', 'RayGRPOTrainer']
diff --git a/mindspeed_rl/trainer/grpo_trainer_hybrid.py b/mindspeed_rl/trainer/grpo_trainer_hybrid.py
index 12f74173861ca5b2d94dcab302217c9422e3e21c..079a3f30570d693d626d9cb20a9a6273081bebba 100644
--- a/mindspeed_rl/trainer/grpo_trainer_hybrid.py
+++ b/mindspeed_rl/trainer/grpo_trainer_hybrid.py
@@ -148,18 +148,18 @@ class RayGRPOTrainer(RayBaseTrainer):
# generate sequences
self.actor_worker.generate_sequences(blocking=self.blocking)
- # compute reference log_prob
- self.ref_worker.compute_log_prob(blocking=self.blocking)
-
# compute rm scores.
for reward_worker in self.reward_list:
if isinstance(reward_worker, RayActorGroup):
reward_worker.compute_rm_score(blocking=self.blocking)
else:
- self.rule_reward_compute_rm_score(reward_worker, blocking=self.blocking)
+ self.rule_reward_compute_rm_score(reward_worker, blocking=False)
# compute advantages, executed on the driver process
- self.compute_advantage(blocking=self.blocking)
+ self.compute_advantage(blocking=False)
+
+ # compute reference log_prob
+ self.ref_worker.compute_ref_log_prob(blocking=self.blocking)
# compute old log_prob
self.actor_worker.compute_log_prob(blocking=self.blocking)
diff --git a/mindspeed_rl/trainer/sft_trainer.py b/mindspeed_rl/trainer/sft_trainer.py
deleted file mode 100644
index 5d8741fbdd1378743fdc95320ea4b6303d5d9f7c..0000000000000000000000000000000000000000
--- a/mindspeed_rl/trainer/sft_trainer.py
+++ /dev/null
@@ -1,223 +0,0 @@
-# Copyright (c) 2023; NVIDIA CORPORATION. All rights reserved.
-# Copyright (c) 2024, HUAWEI CORPORATION. All rights reserved.
-import os
-from functools import partial
-from abc import ABC
-
-import torch
-
-from mindspeed_rl.utils import Loggers
-from mindspeed_rl.utils.utils import get_tune_attention_mask
-from .utils.training import (
- get_finetune_data_on_this_tp_rank, broadcast_data, average_losses_across_data_parallel_group
-)
-
-logger = Loggers('stf_trainer')
-
-
-class SFTTrainer(ABC):
- """
- Trainer to use while training reward model.
-
- Args:
- model (torch.nn.Module): the model to train
- strategy (Strategy): the strategy to use for training
- optim(Optimizer): the optimizer to use for training
- train_dataset (RewardDataset): the dataset to use for training
- eval_dataset (RewardDataset): the dataset to use for evaluation
- batch_size (int, defaults to 1): the batch size while training
- max_epochs (int, defaults to 2): the number of epochs to train
- optim_kwargs (dict, defaults to {'lr':1e-4}): the kwargs to use while initializing optimizer
- """
-
- def __init__(
- self,
- args,
- model,
- optimizer,
- train_data_iterator,
- valid_data_iterator,
- test_data_iterator_list,
- scheduler,
- process_non_loss_data_func=None,
- model_config=None,
- **kwargs
- ) -> None:
- super().__init__()
- self.args = args
- self.model = model
- self.optimizer = optimizer
- self.train_data_iterator = train_data_iterator
- self.valid_data_iterator = valid_data_iterator
- self.test_data_iterator_list = test_data_iterator_list
- self.scheduler = scheduler
- self.process_non_loss_data_func = process_non_loss_data_func
- self.model_config = model_config
- self.train_args = (self.forward_step, self.model, self.optimizer, self.scheduler, self.train_data_iterator,
- self.valid_data_iterator, self.process_non_loss_data_func, self.model_config)
- self.train_func = kwargs.get('train_func', None)
- self.parallel_state = kwargs.get('parallel_state', None)
- self.save_checkpoint_func = kwargs.get('save_checkpoint_func', None)
- self.evaluate_fun = kwargs.get('evaluate_fun', None)
- self.generate_seq_len_fun = kwargs.get('generate_seq_len_fun', None)
- self.get_batch_on_this_cp_rank = kwargs.get('batch_cp_func', None)
-
- def get_batch(self, data_iterator):
- """Generate a batch."""
- # Items and their type.
- keys = ['input_ids', 'attention_mask', 'labels']
- if self.args.reset_position_ids:
- keys += ['position_ids']
- data_type = torch.int64
-
- if (not self.parallel_state.is_pipeline_first_stage()) and (not self.parallel_state.is_pipeline_last_stage()):
- if self.args.variable_seq_lengths and self.args.pipeline_model_parallel_size > 2:
- tokens, attention_mask = get_finetune_data_on_this_tp_rank(data_iterator,
- self.parallel_state,
- self.args.reset_position_ids,
- self.args.tokenizer_padding_side)
- return tokens, None, None, attention_mask, None
- else:
- # Broadcast data.
- data_b = broadcast_data(keys, next(data_iterator), data_type)
- if self.args.reset_position_ids:
- self.generate_seq_len_fun(data_b)
- attention_mask_1d = data_b.get('attention_mask').long()
- attention_mask = get_tune_attention_mask(attention_mask_1d)
- batch = {'attention_mask': attention_mask}
- batch = self.get_batch_on_this_cp_rank(batch)
- return None, None, None, batch['attention_mask'], None
-
- # Broadcast data.
- data_b = broadcast_data(keys, next(data_iterator), data_type, self.parallel_state)
-
- # Unpack
- labels = data_b.get('labels').long()
- tokens = data_b.get('input_ids').long()
- attention_mask_1d = data_b.get('attention_mask').long()
- # ignored label -100
- loss_mask = torch.where(labels == -100, 0, 1)
-
- if self.args.reset_position_ids:
- position_ids = data_b.get('position_ids').long()
- self.generate_seq_len_fun(data_b)
- batch = {
- 'tokens': tokens,
- 'labels': labels,
- 'loss_mask': loss_mask,
- }
- batch = self.get_batch_on_this_cp_rank(batch)
- batch['attention_mask'] = None
- batch['position_ids'] = position_ids
- return batch.values()
-
- attention_mask = get_tune_attention_mask(attention_mask_1d,
- tokenizer_padding_side=self.args.tokenizer_padding_side,
- reset_attention_mask=self.args.reset_attention_mask
- )
- position_ids = None
- batch = {
- 'tokens': tokens,
- 'labels': labels,
- 'loss_mask': loss_mask,
- 'attention_mask': attention_mask,
- 'position_ids': position_ids
- }
- batch = self.get_batch_on_this_cp_rank(batch)
- return batch.values()
-
- def loss_func(self, input_tensor: torch.Tensor, output_tensor: torch.Tensor):
- """Loss function.
-
- Args:
- input_tensor (torch.Tensor): Used to mask out some portions of the loss
- output_tensor (torch.Tensor): The tensor with the losses
- """
- loss_mask = input_tensor
-
- losses = output_tensor.float()
- loss_mask = loss_mask[..., 1:].view(-1).float()
- if self.args.context_parallel_size > 1:
- loss = torch.cat([torch.sum(losses.view(-1) * loss_mask).view(1), loss_mask.sum().view(1)])
- torch.distributed.all_reduce(loss,
- group=self.parallel_state.get_context_parallel_group())
- loss = loss[0] / loss[1]
- else:
- loss = torch.sum(losses.view(-1) * loss_mask) / loss_mask.sum()
-
- # Check individual rank losses are not NaN prior to DP all-reduce.
- if self.args.check_for_nan_in_loss_and_grad:
- global_rank = torch.distributed.get_rank()
- if loss.isnan():
- raise ValueError(f'Rank {global_rank}: found NaN in local forward loss calculation. '
- f'Device: {torch.cuda.current_device()}, node: {os.uname()[1]}')
-
- # Reduce loss for logging.
- averaged_loss = average_losses_across_data_parallel_group([loss], parallel_state=self.parallel_state)
-
- return loss * self.args.context_parallel_size, {'lm loss': averaged_loss[0]}
-
- def forward_step(self, data_iterator, model):
- """Forward training step.
-
- Args:
- data_iterator : Input data iterator
- model (GPTModel): The GPT Model
- """
-
- # Get the batch.
- tokens, labels, loss_mask, attention_mask, position_ids = self.get_batch(
- data_iterator)
-
- output_tensor = model(tokens, position_ids, attention_mask,
- labels=labels)
-
- return output_tensor, partial(self.loss_func, loss_mask)
-
- def train(self):
- test_data_iterator = self.test_data_iterator_list[0]
- (forward_step_func, model, optimizer, opt_param_scheduler, train_data_iterator, valid_data_iterator,
- process_non_loss_data_func, config) = self.train_args
-
- if not self.args.skip_train:
- logger.info('training ...')
-
- if self.args.dataloader_type == 'cyclic' and self.args.retro_project_dir:
- if self.args.retro_cyclic_train_iters is None:
- raise ValueError("retro_cyclic_train_iters must be provided.")
- self.args.train_iters = self.args.retro_cyclic_train_iters
- logger.info("retro cyclic train iters : %d" % self.args.train_iters)
-
- iteration = 0
- if self.args.do_train and self.args.train_iters > 0:
- iteration, num_floating_point_operations_so_far = self.train_func(
- *self.train_args)
-
- logger.info('after training is done')
-
- if self.args.save and iteration != 0 and iteration % self.args.save_interval != 0:
- self.save_checkpoint_func(
- iteration,
- model,
- optimizer,
- opt_param_scheduler,
- num_floating_point_operations_so_far
- )
- else:
- logger.info('skipping training (--skip-train is on) ...')
-
- iteration = self.args.iteration
-
- if self.args.do_valid:
- prefix = f'iteration {iteration} on validation set'
- self.evaluate_fun(prefix, forward_step_func,
- valid_data_iterator, model,
- iteration, process_non_loss_data_func, config,
- verbose=True, write_to_tensorboard=not self.args.skip_train)
-
- if self.args.do_test:
- prefix = f'iteration {iteration} on test set'
- self.evaluate_fun(prefix, forward_step_func,
- test_data_iterator, model,
- iteration, process_non_loss_data_func, config,
- verbose=True, write_to_tensorboard=not self.args.skip_train)
diff --git a/mindspeed_rl/utils/utils.py b/mindspeed_rl/utils/utils.py
index bf3bfa06524982226201fa50aa40e9c918897ebf..7ec4ea745273f7ce86b5db45c07c44764af765cc 100644
--- a/mindspeed_rl/utils/utils.py
+++ b/mindspeed_rl/utils/utils.py
@@ -222,7 +222,7 @@ def metrics_sort(metrics, time_all) -> Dict[str, Tensor]:
metrics["timing/all"] = time_all
sort_metrics = dict(sorted(metrics.items()))
- custom_order = ['timing/all', 'timing/update', 'timing/resharding_to_infer', 'timing/rollout', 'timing/resharding_to_train', 'timing/old_log_p', 'timing/reference_model', 'timing/non_overlap_reference_model']
+ custom_order = ['timing/all', 'timing/update', 'timing/rollout', 'timing/old_log_p', 'timing/reference_model', 'timing/non_overlap_reference_model']
special_keys = ['timing/non_overlap_rule_reward', 'timing/non_overlap_reward_model', 'timing/rule_reward', 'timing/reward_model']
keys_to_move = [key for key in sort_metrics.keys() if key in special_keys]
remaining_keys = []
@@ -239,12 +239,13 @@ def compute_tps(compute_kwargs, metrics_result, gbs, n_samples, time_all):
actor_resource = compute_kwargs.get('actor_resource', {})
reference_resource = compute_kwargs.get('reference_resource', {})
reward_resource = compute_kwargs.get('reward_resource', None)
+ actor_resource_only = compute_kwargs.get('use_integrated_worker', False)
actor_npus = actor_resource.get('num_npus', 0)
reference_npus = reference_resource.get('num_npus', 0)
reward_npus = reward_resource.get('num_npus', 0) if reward_resource is not None else 0
- world_size = actor_npus + reference_npus + reward_npus
+ world_size = actor_npus + reference_npus + reward_npus if not actor_resource_only else actor_npus
tps = (metrics_result['response_length/mean'] + metrics_result['prompt_length/mean']) * gbs * n_samples / world_size / time_all
return tps
diff --git a/mindspeed_rl/workers/actor_hybrid_worker.py b/mindspeed_rl/workers/actor_hybrid_worker.py
index dc292b18a6d3aa535c073654fbc691fd6b358b28..7b9e109bb015f4fc087e323fec71db595d78eb44 100644
--- a/mindspeed_rl/workers/actor_hybrid_worker.py
+++ b/mindspeed_rl/workers/actor_hybrid_worker.py
@@ -25,8 +25,7 @@ from mindspeed_rl.utils.compute import get_parallel_state
from mindspeed_rl.trainer.utils.parallel_state import is_pipeline_last_stage, get_tensor_model_parallel_rank
-@ray.remote(resources={"NPU": 0.7})
-class ActorHybridWorker(BaseWorker):
+class ActorHybridWorkerBase(BaseWorker):
"""
ActorHybridWorker class. This class implements the hybrid worker logic for training and inference.
@@ -71,13 +70,13 @@ class ActorHybridWorker(BaseWorker):
self.setup_distributed_rank()
self.model, self.optimizer, self.opt_param_scheduler = self._build_model_optimizer()
self._set_no_sync_func()
- self.megatron_offloader = MegatronOffLoader(self.optimizer, self.model)
+ self.actor_offloader = MegatronOffLoader(self.model, self.optimizer)
if self.generate_config.offload_train_optimizer:
- self.megatron_offloader.offload_optimizer()
+ self.actor_offloader.offload_optimizer()
if self.generate_config.offload_train_grad:
- self.megatron_offloader.offload_grad()
+ self.actor_offloader.offload_grad()
if self.generate_config.offload_train_param:
- self.megatron_offloader.offload_train_param()
+ self.actor_offloader.offload_param()
self.inference_model = self._build_rollout()
self.sharding_manager = self._build_sharding_manager()
@@ -112,6 +111,8 @@ class ActorHybridWorker(BaseWorker):
return self.args.consumed_train_samples
def update(self, kl_ctrl=None):
+ self.sharding_manager.enter_train_mode()
+
self.args.curr_iteration = self.iteration
experience_consumer_stage = 'actor_train'
@@ -151,17 +152,19 @@ class ActorHybridWorker(BaseWorker):
self.iteration += 1
+ self.sharding_manager.exit_train_mode()
+
def save_ckpt(self, iteration: int):
self.save_checkpoint(iteration, self.model, self.optimizer, self.opt_param_scheduler,
self.num_floating_point_operations_so_far)
def generate_sequences(self):
+ self.sharding_manager.enter_infer_mode()
experience_consumer_stage = 'actor_rollout'
experience_colums = ['prompts', 'prompt_length']
experience_count = self.rl_config.experience_count_actor // self.generate_config.data_parallel_size
start_reshard_to_infer = time.time()
- self.sharding_manager.reshard_to_infer_mode()
end_reshard_to_infer = time.time()
ray.get(
self.td.update_metrics.remote(
@@ -224,19 +227,22 @@ class ActorHybridWorker(BaseWorker):
cumulate=True
)
)
- start_reshard_to_train = time.time()
- self.sharding_manager.reshard_to_train_mode()
- end_reshard_to_train = time.time()
- ray.get(
- self.td.update_metrics.remote(
- "timing/resharding_to_train",
- value=[round(end_reshard_to_train, 4), round(start_reshard_to_train, 4)],
- cumulate=True
- )
- )
- self.empty_cache()
+ generate_end_time = time.time()
+ parallel_state = get_parallel_state()
+ use_vllm = True
+ if is_pipeline_last_stage(parallel_state, use_vllm) and get_tensor_model_parallel_rank(parallel_state, use_vllm) == 0:
+ ray.get(
+ self.td.update_metrics.remote(
+ "end_time/generate",
+ value=[round(generate_end_time, 4)],
+ cumulate=True
+ )
+ )
+
+ self.sharding_manager.exit_infer_mode()
def compute_log_prob(self):
+ self.sharding_manager.enter_forward_mode()
experience_consumer_stage = 'actor_log_prob'
experience_colums = ['input_ids', 'responses', 'response_length', 'prompt_length']
experience_count = self.rl_config.experience_count_actor // self.parallel_state.get_data_parallel_world_size()
@@ -261,15 +267,15 @@ class ActorHybridWorker(BaseWorker):
end_time = time.time()
ray.get(
self.td.update_metrics.remote(
- "timing/old_log_p",
- value=[round(end_time, 4), round(start_time, 4)],
+ "timing/old_log_p",
+ value=[round(end_time, 4), round(start_time, 4)],
cumulate=True
)
)
ray.get(
self.td.update_metrics.remote(
- "end_time/old_log_p",
- value=[round(end_time, 4)],
+ "end_time/old_log_p",
+ value=[round(end_time, 4)],
cumulate=True
)
)
@@ -331,7 +337,7 @@ class ActorHybridWorker(BaseWorker):
grad_offload=self.generate_config.offload_train_grad,
train_param_offload=self.generate_config.offload_train_param,
enable_validate=self.rl_config.enable_sharding_validate,
- megatron_offloader=self.megatron_offloader
+ megatron_offloader=self.actor_offloader
)
return sharding_manager
@@ -342,4 +348,10 @@ class ActorHybridWorker(BaseWorker):
# Megatron requires no_sync_func properly to correctly trigger DP reduce
config.no_sync_func = [model_chunk.no_sync for model_chunk in self.model]
if len(self.model) == 1:
- config.no_sync_func = config.no_sync_func[0]
\ No newline at end of file
+ config.no_sync_func = config.no_sync_func[0]
+
+
+
+@ray.remote(resources={"NPU": 0.7})
+class ActorHybridWorker(ActorHybridWorkerBase):
+ pass
\ No newline at end of file
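
Splitting each worker into a plain base class plus a thin @ray.remote wrapper is what lets the new IntegratedWorker below inherit the actor, reference and reward logic while being scheduled as a single Ray actor. A minimal sketch of the pattern with hypothetical class names (not the actual MindSpeed-RL workers):

# Illustrative only: the structure mirrors the base-class/remote-wrapper split above,
# but WorkerBase/StandaloneWorker are hypothetical names.
import ray

class WorkerBase:
    # All of the worker logic lives here, with no Ray resource annotation.
    def do_work(self):
        return "work done"

@ray.remote(resources={"NPU": 0.7})
class StandaloneWorker(WorkerBase):
    # Standalone deployment: logic unchanged, only the resource tag is attached.
    pass

if __name__ == "__main__":
    ray.init(resources={"NPU": 1})
    worker = StandaloneWorker.remote()
    print(ray.get(worker.do_work.remote()))  # -> "work done"
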
diff --git a/mindspeed_rl/workers/integrated_worker.py b/mindspeed_rl/workers/integrated_worker.py
new file mode 100644
index 0000000000000000000000000000000000000000..cc0919437183abd72a6a97eee22fddd044d705e9
--- /dev/null
+++ b/mindspeed_rl/workers/integrated_worker.py
@@ -0,0 +1,112 @@
+# Copyright (c) 2025, HUAWEI CORPORATION. All rights reserved.
+
+import dataclasses
+from typing import Callable
+
+import ray
+import torch
+
+from mindspeed_rl.config_cls.megatron_config import MegatronConfig
+from mindspeed_rl.config_cls.rl_config import RLConfig
+from mindspeed_rl.config_cls.generate_config import GenerateConfig
+from mindspeed_rl.utils.tokenizer import BaseTokenizer
+from mindspeed_rl.workers.resharding.megatron_sharding_manager import MegatronOffLoader
+
+from mindspeed_rl.workers.actor_hybrid_worker import ActorHybridWorkerBase
+from mindspeed_rl.workers.reference_woker import ReferenceWorkerBase
+from mindspeed_rl.workers.reward_woker import RewardWorkerBase
+from mindspeed_rl.models.reference import Reference
+from mindspeed_rl.models.reward import Reward
+
+
+@ray.remote(resources={"NPU": 0.7})
+class IntegratedWorker(ActorHybridWorkerBase, ReferenceWorkerBase, RewardWorkerBase):
+ """
+    IntegratedWorker class. This class implements an integrated worker that co-locates the Actor, Reference and Reward workers.
+
+ Args:
+ megatron_config: MegatronConfig Configuration for Megatron-LM (e.g., model parallelism settings).
+ rl_config: RLConfig Configuration for reinforcement learning (e.g., PPO settings).
+ generate_config: GenerateConfig Configuration for generation/inference (e.g., vLLM settings).
+ model_provider: Callable Function to provide the model instance.
+ initialize_func: Callable Function to initialize the model and environment.
+ tokenizer: BaseTokenizer = None Object to retrieve the tokenizer.
+        get_megatron_module: Callable = None Function to retrieve the Megatron modules.
+ **kwargs: Additional parameters for base class argument passing.
+ """
+
+ def __init__(
+ self,
+ megatron_config: MegatronConfig,
+ rl_config: RLConfig,
+ generate_config: GenerateConfig,
+ model_provider: Callable,
+ initialize_func: Callable,
+ tokenizer: BaseTokenizer = None,
+ get_megatron_module: Callable = None,
+ **kwargs
+ ):
+
+        # The Actor serves as the main worker, so only the Actor's initialization runs here.
+ ActorHybridWorkerBase.__init__(
+ self,
+ megatron_config,
+ rl_config,
+ generate_config,
+ model_provider=model_provider,
+ initialize_func=initialize_func,
+ tokenizer=tokenizer,
+ get_megatron_module=get_megatron_module,
+ **kwargs
+ )
+
+ self.update_micro_batch_size = rl_config.update_micro_batch_size
+
+ self.reference = None
+ self.ref_model = None
+ self.ref_manager = None
+
+
+ def initialize(self):
+
+ # Based on Actor
+ ActorHybridWorkerBase.initialize(self)
+
+ # Add Reference
+ self.ref_model = self.get_model(self.model_provider, self.model_type, wrap_with_ddp=False)
+ self.load_checkpoint(self.ref_model, None, None)
+ self.ref_manager = MegatronOffLoader(self.ref_model, wrap_with_ddp=False)
+ self.ref_manager.offload_param()
+ self.reference = Reference(
+ self.ref_model,
+ beta=self.rl_config.beta,
+ mini_batch_size=self.rl_config.mini_batch_size,
+ epochs=self.rl_config.epochs,
+ shuffle_mini_batch=self.rl_config.shuffle_mini_batch,
+ generate_config=self.generate_config,
+ stage=self.megatron_config.stage,
+ forward_backward_func=self.forward_backward_func,
+ micro_batch_size=self.megatron_config.micro_batch_size
+ )
+
+ def compute_ref_log_prob(self):
+ self.ref_manager.onload_param()
+ ReferenceWorkerBase.compute_ref_log_prob(self)
+ self.ref_manager.offload_param()
+
+ def update(self, kl_ctrl=None):
+        # Temporarily switch to the update micro batch size; the original value is restored after the update.
+ update_mbs = self.update_micro_batch_size
+ mbs = self.actor_hybrid.train_actor.micro_batch_size
+
+ from megatron.training import get_args
+ args = get_args()
+
+ if update_mbs is not None:
+ self.actor_hybrid.train_actor.micro_batch_size = update_mbs
+ args.micro_batch_size = update_mbs
+
+ ActorHybridWorkerBase.update(self, kl_ctrl)
+
+ args.micro_batch_size = mbs
+ self.actor_hybrid.train_actor.micro_batch_size = mbs
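
update() above temporarily swaps in update_micro_batch_size and puts the original value back afterwards. A minimal sketch of that override-and-restore pattern with hypothetical names, written as a context manager with try/finally added for illustration (the code above restores the value inline instead):

# Hypothetical sketch of a temporary micro-batch-size override; not MindSpeed-RL API.
from contextlib import contextmanager

@contextmanager
def override_micro_batch_size(holder, new_mbs):
    old_mbs = holder.micro_batch_size
    if new_mbs is not None:
        holder.micro_batch_size = new_mbs
    try:
        yield holder
    finally:
        # Restore the original value even if the wrapped step raises.
        holder.micro_batch_size = old_mbs

class _TrainActorStub:
    micro_batch_size = 2

actor = _TrainActorStub()
with override_micro_batch_size(actor, 4):
    assert actor.micro_batch_size == 4  # the update step would run here
assert actor.micro_batch_size == 2      # original value restored afterwards
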
diff --git a/mindspeed_rl/workers/reference_woker.py b/mindspeed_rl/workers/reference_woker.py
index d813f880491d4894552b87564cef707418f78b91..fd465a7735d83c2848ee61100b43f1ebb4827a2c 100644
--- a/mindspeed_rl/workers/reference_woker.py
+++ b/mindspeed_rl/workers/reference_woker.py
@@ -17,8 +17,7 @@ from mindspeed_rl.utils.compute import get_parallel_state
from mindspeed_rl.trainer.utils.parallel_state import is_pipeline_last_stage, get_tensor_model_parallel_rank
-@ray.remote(resources={"NPU": 0.3})
-class ReferenceWorker(BaseWorker):
+class ReferenceWorkerBase(BaseWorker):
"""
ReferenceWorker class. This class implements the worker logic for reference model inference.
@@ -82,7 +81,7 @@ class ReferenceWorker(BaseWorker):
def init_transfer_dock(self, td):
self.td = td
- def compute_log_prob(self):
+ def compute_ref_log_prob(self):
experience_consumer_stage = 'ref_log_prob'
experience_columns = ['input_ids', 'responses', 'response_length', 'prompt_length']
experience_count = self.rl_config.experience_count_ref // self.parallel_state.get_data_parallel_world_size()
@@ -126,8 +125,13 @@ class ReferenceWorker(BaseWorker):
ref_end_time = time.time()
ray.get(
self.td.update_metrics.remote(
- "end_time/reference",
+ "end_time/reference",
value=[round(ref_end_time, 4)]
)
)
self.empty_cache()
+
+
+@ray.remote(resources={"NPU": 0.3})
+class ReferenceWorker(ReferenceWorkerBase):
+ pass
\ No newline at end of file
diff --git a/mindspeed_rl/workers/resharding/megatron_sharding_manager.py b/mindspeed_rl/workers/resharding/megatron_sharding_manager.py
index d74fd0c13768336baa004f5b1b6f56934b36a07c..c7016e40c72bc358d4228b4e08e76bc7f4afc85c 100644
--- a/mindspeed_rl/workers/resharding/megatron_sharding_manager.py
+++ b/mindspeed_rl/workers/resharding/megatron_sharding_manager.py
@@ -29,23 +29,20 @@ from mindspeed_rl.workers.resharding.weight_adaptor import get_weight_adaptor
class MegatronOffLoader:
- def __init__(
- self,
- optimizer=None,
- megatron_model=None
- ):
+ def __init__(self, megatron_model=None, optimizer=None, wrap_with_ddp=True):
self.optimizer = optimizer
- self.train_model = megatron_model
+ self.model = megatron_model
+ self.wrap_with_ddp = wrap_with_ddp
self.tensor_to_cpu_states_map = dict()
def offload_grad(self):
- for model_idx, model in enumerate(self.train_model):
+ for model in self.model:
for buffer in chain(model.buffers, model.expert_parallel_buffers):
self.swap_tensors_to_host(buffer.grad_data)
def onload_grad(self):
- for model_idx, model in enumerate(self.train_model):
+ for model in self.model:
for buffer in chain(model.buffers, model.expert_parallel_buffers):
self.swap_tensors_to_device(buffer.grad_data)
@@ -73,15 +70,23 @@ class MegatronOffLoader:
else:
return data
- def offload_train_param(self):
- for model_idx, model in enumerate(self.train_model):
- for buffer in chain(model.buffers, model.expert_parallel_buffers):
- self.swap_tensors_to_host(buffer.param_data)
+ def offload_param(self):
+ if self.wrap_with_ddp:
+ for model in self.model:
+ for buffer in chain(model.buffers, model.expert_parallel_buffers):
+ self.swap_tensors_to_host(buffer.param_data)
+ else:
+ for item in self.model:
+ item.to('cpu')
- def onload_train_param(self):
- for model_idx, model in enumerate(self.train_model):
- for buffer in chain(model.buffers, model.expert_parallel_buffers):
- self.swap_tensors_to_device(buffer.param_data)
+ def onload_param(self):
+ if self.wrap_with_ddp:
+ for model in self.model:
+ for buffer in chain(model.buffers, model.expert_parallel_buffers):
+ self.swap_tensors_to_device(buffer.param_data)
+ else:
+ for item in self.model:
+ item.to(torch.cuda.current_device())
def swap_tensors_to_host(self, tensor):
if tensor not in self.tensor_to_cpu_states_map:
@@ -162,47 +167,106 @@ class MegatronShardingManager:
self.inference_engine.offload_model_weights()
self.megatron_offloader = megatron_offloader
- def __enter__(self):
- self.reshard_to_infer_mode()
+ def offload_infer_params(self):
+ infer_weight_buffers = self.vllm_weight_container.weight_buffers
+ for buffer in infer_weight_buffers:
+ buffer.offload()
+
+ def onload_infer_params(self):
+ infer_weight_buffers = self.vllm_weight_container.weight_buffers
+ for buffer in infer_weight_buffers:
+ buffer.onload()
+
+ def enter_infer_mode(self):
+ """
+ Before:
+ Empty or with training param on NPU.
+
+ After:
+            With inference param on NPU.
+
+ Process:
+ 1. onload training param if needed
+ 2. onload inference param
+ 3. do resharding
+            4. offload training param, then sync resharded weights into the inference engine
+ """
+ if self.train_param_offload:
+ self.megatron_offloader.onload_param()
+
+ self.onload_infer_params()
+
+ infer_params = self.vllm_weight_container.get_infer_params()
+
+ if self.train_param_offload:
+ self.megatron_offloader.offload_param()
+
+ self.inference_engine.sync_model_weights(infer_params, load_format='megatron')
+ torch.cuda.empty_cache()
+
+ def exit_infer_mode(self):
+ """
+ Before:
+ With inference param on NPU.
- def __exit__(self, exc_type, exc_value, traceback):
- self.reshard_to_train_mode()
+ After:
+ Empty.
- def reshard_to_train_mode(self):
+ Process:
+            1. offload inference engine weights and inference param
+ """
self.inference_engine.offload_model_weights()
self.offload_infer_params()
torch.cuda.empty_cache()
+
+ def enter_forward_mode(self):
+ """
+ Before:
+ Empty.
+
+ After:
+ With training param on NPU.
+
+ Process:
+            1. onload training param if needed
+ """
+ if self.train_param_offload:
+ self.megatron_offloader.onload_param()
+ torch.cuda.empty_cache()
+
+ def enter_train_mode(self):
+ """
+ Before:
+ With training param on NPU.
+
+ After:
+ With training param, optimizer and grad on NPU.
+
+ Process:
+ 1. onload training optimizer
+ 2. onload training grad
+ """
if self.optimizer_offload:
self.megatron_offloader.onload_optimizer()
- if self.train_param_offload:
- self.megatron_offloader.onload_train_param()
if self.grad_offload:
self.megatron_offloader.onload_grad()
- # add empty cache after each compute
torch.cuda.empty_cache()
- def reshard_to_infer_mode(self):
+ def exit_train_mode(self):
+ """
+ Before:
+ With training param, optimizer and grad on NPU.
+
+ After:
+ With training param on NPU.
+
+ Process:
+ 1. offload training optimizer
+ 2. offload training grad
+ """
if self.optimizer_offload:
self.megatron_offloader.offload_optimizer()
if self.grad_offload:
self.megatron_offloader.offload_grad()
torch.cuda.empty_cache()
- if self.train_param_offload:
- self.megatron_offloader.onload_train_param()
- self.onload_infer_params()
- infer_params = self.vllm_weight_container.get_infer_params()
- if self.train_param_offload:
- self.megatron_offloader.offload_train_param()
- self.inference_engine.sync_model_weights(infer_params, load_format='megatron')
- torch.cuda.empty_cache()
-
- def offload_infer_params(self):
- infer_weight_buffers = self.vllm_weight_container.weight_buffers
- for buffer in infer_weight_buffers:
- buffer.offload()
-
- def onload_infer_params(self):
- infer_weight_buffers = self.vllm_weight_container.weight_buffers
- for buffer in infer_weight_buffers:
- buffer.onload()
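
The context-manager style resharding (__enter__/__exit__) is replaced by explicit enter_*/exit_* transitions that the workers call at the start and end of each phase. Below is a hypothetical driver showing the intended ordering, assuming a sharding_manager exposing the methods added above (try/finally is illustrative; the real workers call the transitions inline):

# Hypothetical driver: shows the phase ordering implied by the docstrings above.
def rollout_and_train_step(sharding_manager, generate_fn, log_prob_fn, update_fn):
    # Rollout: only inference weights are resident on the NPU.
    sharding_manager.enter_infer_mode()
    try:
        sequences = generate_fn()
    finally:
        sharding_manager.exit_infer_mode()

    # Old log-probs: training params on the NPU, optimizer/grad buffers still offloaded.
    sharding_manager.enter_forward_mode()
    log_probs = log_prob_fn(sequences)

    # Update: optimizer state and grad buffers are onloaded only for this phase.
    sharding_manager.enter_train_mode()
    try:
        update_fn(sequences, log_probs)
    finally:
        sharding_manager.exit_train_mode()
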
diff --git a/mindspeed_rl/workers/reward_woker.py b/mindspeed_rl/workers/reward_woker.py
index 2eb4cd7469eb09e3bd4814861699a338d20174a9..2948d8eabc4982c5badb08271a19841b6861888a 100644
--- a/mindspeed_rl/workers/reward_woker.py
+++ b/mindspeed_rl/workers/reward_woker.py
@@ -17,8 +17,7 @@ from mindspeed_rl.utils.compute import get_parallel_state
from mindspeed_rl.trainer.utils.parallel_state import is_pipeline_last_stage, get_tensor_model_parallel_rank
-@ray.remote(resources={"NPU": 0.1})
-class RewardWorker(BaseWorker):
+class RewardWorkerBase(BaseWorker):
"""
RewardWorker class. This class implements the worker logic for reward model training and inference.
@@ -114,7 +113,7 @@ class RewardWorker(BaseWorker):
end_time = time.time()
ray.get(
self.td.update_metrics.remote(
- "timing/reward_model",
+ "timing/reward_model",
value=[round(end_time, 4), round(start_time, 4)],
cumulate=True
)
@@ -125,8 +124,13 @@ class RewardWorker(BaseWorker):
rwd_end_time = time.time()
ray.get(
self.td.update_metrics.remote(
- "end_time/reward_model",
+ "end_time/reward_model",
value=[round(rwd_end_time, 4)]
)
)
- self.empty_cache()
\ No newline at end of file
+ self.empty_cache()
+
+
+@ray.remote(resources={"NPU": 0.1})
+class RewardWorker(RewardWorkerBase):
+ pass
\ No newline at end of file
diff --git a/mindspeed_rl/workers/scheduler/launcher.py b/mindspeed_rl/workers/scheduler/launcher.py
index ea9c4e80d031f0effbf25c17ef91a30820e649fc..2b6d3099d46833b893c59a9be79c293af60f4b5b 100644
--- a/mindspeed_rl/workers/scheduler/launcher.py
+++ b/mindspeed_rl/workers/scheduler/launcher.py
@@ -35,12 +35,16 @@ from mindspeed_rl.workers.base_worker import BaseWorker
from mindspeed_rl.workers.actor_hybrid_worker import ActorHybridWorker
from mindspeed_rl.workers.reference_woker import ReferenceWorker
from mindspeed_rl.workers.reward_woker import RewardWorker
+from mindspeed_rl.workers.integrated_worker import IntegratedWorker
def get_rl_resource_by_worker_type(rl_config: RLConfig, worker: Type[BaseWorker]):
if (worker.__ray_actor_class__.__name__ ==
ActorHybridWorker.__ray_actor_class__.__name__):
return rl_config.actor_resource
+ elif (worker.__ray_actor_class__.__name__ ==
+ IntegratedWorker.__ray_actor_class__.__name__):
+ return rl_config.actor_resource
elif (worker.__ray_actor_class__.__name__ ==
RewardWorker.__ray_actor_class__.__name__):
return rl_config.reward_resource
@@ -264,6 +268,12 @@ class RayActorGroup:
if blocking:
ray.get(self.temp_actor_ref_objs)
+ def compute_ref_log_prob(self, blocking=False):
+ for actor in self.actor_handlers:
+ self.temp_actor_ref_objs.append(actor.compute_ref_log_prob.remote())
+ if blocking:
+ ray.get(self.temp_actor_ref_objs)
+
def compute_rm_score(self, blocking=False):
for actor in self.actor_handlers:
self.temp_actor_ref_objs.append(actor.compute_rm_score.remote())
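
compute_ref_log_prob follows the same fan-out convention as the other group methods: schedule one remote call per actor handle, collect the object refs, and only block on them when requested. A standalone sketch of that convention with hypothetical names (not RayActorGroup itself):

# Hypothetical fan-out helper; mirrors the schedule-then-optionally-block convention above.
import ray

@ray.remote
class _WorkerStub:
    def compute_ref_log_prob(self):
        return 0.0

def fan_out(handlers, pending_refs, blocking=False):
    for actor in handlers:
        pending_refs.append(actor.compute_ref_log_prob.remote())
    if blocking:
        ray.get(pending_refs)  # wait for every worker before returning

if __name__ == "__main__":
    ray.init()
    workers = [_WorkerStub.remote() for _ in range(2)]
    refs = []
    fan_out(workers, refs, blocking=True)
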
diff --git a/tests/st/configs/test_grpo_trainer_qwen25_7b_integrated.yaml b/tests/st/configs/test_grpo_trainer_qwen25_7b_integrated.yaml
new file mode 100644
index 0000000000000000000000000000000000000000..2c504160f6291c0b9beb9b405a00c70742f163a9
--- /dev/null
+++ b/tests/st/configs/test_grpo_trainer_qwen25_7b_integrated.yaml
@@ -0,0 +1,107 @@
+defaults:
+ - model:
+ - qwen25_7b
+
+megatron_training:
+ model: qwen25_7b
+ use_fused_rmsnorm: true
+ use_mcore_models: true
+ sequence_parallel: true
+ use_flash_attn: true
+ no_masked_softmax_fusion: true
+ attention_softmax_in_fp32: true
+ no_gradient_accumulation_fusion: true
+ use_fused_swiglu: true
+ use_fused_rotary_pos_emb: true
+ bf16: true
+ use_distributed_optimizer: true
+ tokenizer_type: PretrainedFromHF
+ tokenizer_name_or_path: /data/for_dt/weights/Qwen2.5-7B
+ global_batch_size: 4
+ seq_length: 1024
+ save_interval: 50
+ train_iters: 1
+ stage: ray_grpo
+ attention_dropout: 0.0
+ init_method_std: 0.01
+ hidden_dropout: 0.0
+ distributed_backend: nccl
+ no_shared_storage: true
+ variable_seq_lengths: true
+ dataset_additional_keys: ['labels',]
+ data_path: /data/for_dt/datasets/pe-nlp/data
+ split: 100,0,0
+
+actor_config:
+ model: qwen25_7b
+ micro_batch_size: 2
+ tensor_model_parallel_size: 4
+ pipeline_model_parallel_size: 1
+ lr: 5e-7
+ lr_decay_style: cosine
+ min_lr: 5e-8
+ weight_decay: 0.0
+ lr_warmup_fraction: 0.0
+ clip_grad: 10000.0
+ adam_beta1: 0.9
+ adam_beta2: 0.95
+ initial_loss_scale: 4096
+ finetune: true
+ load: /data/for_dt/weights/Qwen2.5-7B-tp4
+ save: ./ckpt
+ no_load_optim: true
+ no_load_rng: true
+
+rl_config:
+ use_integrated_worker: true
+ blocking: true
+ update_micro_batch_size: 2
+ experience_count: 4
+ gamma: 1.0
+ lam: 0.95
+ adv_estimator: group_norm
+ kl_penalty: kl
+ kl_ctrl_type: fixed
+ init_kl_coef: 0.0001
+ mini_batch_size: 256
+ max_prompt_length: 2048
+ epochs: 1
+ clip_ratio: 0.2
+ entropy_coeff: 0.001
+ n_samples_per_prompt: 8
+ rule_reward: true
+ verifier_function: ["base_acc"]
+ verifier_weight: [1.0]
+ verifier_parallel: 4
+ verifier_timeout: 120
+ use_tensorboard: true
+ actor_resource:
+ num_npus: 8
+
+generate_config:
+ trust_remote_code: true
+ offload_train_optimizer: true
+ offload_train_grad: true
+ offload_train_param: true
+
+  # Parallel configuration for inference
+ infer_tensor_parallel_size: 2
+ infer_pipeline_parallel_size: 1
+ infer_expert_parallel_size: 1
+
+  # vLLM model settings
+ max_num_seqs: 512
+ max_model_len: 4096
+ dtype: "bfloat16"
+ gpu_memory_utilization: 0.9
+ num_scheduler_steps: 1
+
+  # Sampling configuration
+ sampling_config:
+ logprobs: 1
+ max_tokens: 2048
+ top_p: 0.9
+ top_k: 50
+ min_p: 0.01
+ temperature: 0.8
+ detokenize: false
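
In this integrated-worker config only actor_resource carries NPUs; the reference model reuses the actor pool, so no reference_resource or reward_resource block is declared. A hypothetical sanity check for such configs (illustrative only; the rule it asserts is an assumption drawn from this test config, not a documented constraint of the framework):

# Hypothetical config check; requires PyYAML.
import yaml

def check_integrated_config(path):
    with open(path, "r", encoding="utf-8") as f:
        cfg = yaml.safe_load(f)
    rl = cfg.get("rl_config", {})
    if rl.get("use_integrated_worker", False):
        assert "actor_resource" in rl, "integrated mode still needs actor_resource"
        for key in ("reference_resource", "reward_resource"):
            # Assumption: separate resource pools are not used in integrated mode.
            assert key not in rl, f"{key} is unused when use_integrated_worker is true"
    return rl

# Example (path as added in this patch):
# check_integrated_config("tests/st/configs/test_grpo_trainer_qwen25_7b_integrated.yaml")
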
diff --git a/tests/st/configs/test_sft_trainer_qwen25_7b.yaml b/tests/st/configs/test_sft_trainer_qwen25_7b.yaml
deleted file mode 100644
index 53d3e7257a4c19097c63ecc45ce68d6af238cd39..0000000000000000000000000000000000000000
--- a/tests/st/configs/test_sft_trainer_qwen25_7b.yaml
+++ /dev/null
@@ -1,70 +0,0 @@
-defaults:
- - model:
- - qwen25_7b
-
-sft:
- # tune_args:
- finetune: true
- stage: sft
- is_instruction_dataset: true
- variable_seq_lengths: false
- tokenizer_not_use_fast: true
- prompt_type: qwen
-
- # gpt_args:
- norm_epsilon: 1e-6
- micro_batch_size: 2
- global_batch_size: 4
- tokenizer_type: PretrainedFromHF
- tokenizer_name_or_path: /data/for_dt/tokenizer/Qwen25-7B
- train_iters: 5
- lr: 5e-5
- lr_decay_style: cosine
- min_lr: 1.25e-7
- lr_warmup_fraction: 0.01
- weight_decay: 1e-1
- clip_grad: 1.0
- initial_loss_scale: 4096
- use_distributed_optimizer: true
- tensor_model_parallel_size: 2
- pipeline_model_parallel_size: 2
- sequence_parallel: true
- use_mcore_models: true
- use_fused_rmsnorm: true
- use_flash_attn: true
- no_masked_softmax_fusion: true
- no_gradient_accumulation_fusion: true
- use_fused_swiglu: true
- use_fused_rotary_pos_emb: true
- bf16: true
- seq_length: 4096
- adam_beta1: 0.9
- adam_beta2: 0.95
- attention_dropout: 0.0
- init_method_std: 0.01
- hidden_dropout: 0.0
- overlap_grad_reduce: true
- overlap_param_gather: true
-
- # data_args:
- data_path: /data/datasets/pack/alpaca
- reset_position_ids: true
- split: 100,0,0
- no_shuffle: true
-
- # ckpt_args:
- no_load_optim: true
- no_load_rng: true
- no_save_optim: true
- no_save_rng: true
- seed: 1234
- model: qwen25_7b
- load: ./ckpt
- save: ./ckpt
-
- # output_args:
- log_interval: 1
- save_interval: 5000
- eval_interval: 5000
- eval_iters: 0
- log_throughput: true
diff --git a/tests/st/resharding/test_resharding.py b/tests/st/resharding/test_resharding.py
index 670a9d2687f515aaac4482644c29d0eabc7305ee..676d4131efd3ce102773542d99c08b027be5a09f 100644
--- a/tests/st/resharding/test_resharding.py
+++ b/tests/st/resharding/test_resharding.py
@@ -317,7 +317,7 @@ class TestActor():
trust_remote_code=True,
megatron_config=megatron_config
)
- self.megatron_offloader = MegatronOffLoader(self.optimizer, self.model)
+ self.megatron_offloader = MegatronOffLoader(self.model, self.optimizer)
self.sharding_manager = MegatronShardingManager(
megatron_model=self.model,
model_config=model_config_mock,
@@ -342,9 +342,9 @@ class TestActor():
tokenizer = AutoTokenizer.from_pretrained(self.tokenizer_path)
input_ids = tokenizer(prompts, padding=True, truncation=True, return_tensors="pt")["input_ids"].tolist()
- self.sharding_manager.reshard_to_infer_mode()
+ self.sharding_manager.enter_infer_mode()
outputs = self.inference_engine.generate_sequences(idx_list=input_ids)[0]
- self.sharding_manager.reshard_to_train_mode()
+ self.sharding_manager.exit_infer_mode()
rank = torch.distributed.get_rank()
for output in outputs:
diff --git a/tests/st/sft_trainer/test_module_entry_sft_trainer.sh b/tests/st/sft_trainer/test_module_entry_sft_trainer.sh
deleted file mode 100644
index 394f03cfdde2b88f675d11865298b1d98e000cd5..0000000000000000000000000000000000000000
--- a/tests/st/sft_trainer/test_module_entry_sft_trainer.sh
+++ /dev/null
@@ -1,23 +0,0 @@
-export CUDA_DEVICE_MAX_CONNECTIONS=1
-export HYDRA_FULL_ERROR=1
-
-SCRIPT_DIR=$(cd "$(dirname "$0")" && pwd)
-export PYTHONPATH=$SCRIPT_DIR/../../..:$PYTHONPATH
-PROJECT_PATH=$SCRIPT_DIR/../../..
-
-GPUS_PER_NODE=4
-MASTER_ADDR=localhost
-MASTER_PORT=6004
-NNODES=1
-NODE_RANK=0
-
-DISTRIBUTED_ARGS="
- --nproc_per_node $GPUS_PER_NODE \
- --nnodes $NNODES \
- --node_rank $NODE_RANK \
- --master_addr $MASTER_ADDR \
- --master_port $MASTER_PORT
-"
-
-torchrun $DISTRIBUTED_ARGS "$PROJECT_PATH"/cli/train_sft.py --config-dir="$PROJECT_PATH"/tests/st/configs \
---config-name=test_sft_trainer_qwen25_7b
diff --git a/tests/st/train_engine/test_module_entry_ray_grpo_qwen25_7b_tp4_integrated.sh b/tests/st/train_engine/test_module_entry_ray_grpo_qwen25_7b_tp4_integrated.sh
new file mode 100644
index 0000000000000000000000000000000000000000..39708871aa455e7a51ee2f1ba6996f0dadaf684c
--- /dev/null
+++ b/tests/st/train_engine/test_module_entry_ray_grpo_qwen25_7b_tp4_integrated.sh
@@ -0,0 +1,9 @@
+#!/bin/bash
+export CUDA_DEVICE_MAX_CONNECTIONS=1
+export HCCL_DETERMINISTIC=True
+
+SCRIPT_DIR=$(cd "$(dirname "$0")" && pwd)
+export PYTHONPATH=$SCRIPT_DIR/../../..:$PYTHONPATH
+PROJECT_PATH=$SCRIPT_DIR/../../..
+
+python "$PROJECT_PATH"/cli/train_grpo.py --config-dir="$PROJECT_PATH"/tests/st/configs --config-name=test_grpo_trainer_qwen25_7b_integrated
\ No newline at end of file