From 1fd564886439f0051bfa643fd0dea00bcec1f655 Mon Sep 17 00:00:00 2001 From: wiyr Date: Mon, 12 Jun 2023 10:39:51 +0800 Subject: [PATCH 1/2] added func code --- ascendspeed/__init__.py | 13 +- ascendspeed/arguments.py | 45 +++- ascendspeed/checkpointing.py | 8 +- ascendspeed/data/gpt_dataset.py | 13 +- ascendspeed/data/indexed_dataset.py | 1 + ascendspeed/global_vars.py | 95 +-------- ascendspeed/model/__init__.py | 1 + ascendspeed/model/enums.py | 8 +- ascendspeed/model/fused_layer_norm.py | 40 ++-- ascendspeed/timers.py | 283 ++++++++++++++++++++++++++ ascendspeed/training.py | 11 +- 11 files changed, 398 insertions(+), 120 deletions(-) create mode 100644 ascendspeed/timers.py diff --git a/ascendspeed/__init__.py b/ascendspeed/__init__.py index 3d247c581..75f6ce1a9 100644 --- a/ascendspeed/__init__.py +++ b/ascendspeed/__init__.py @@ -13,11 +13,13 @@ # See the License for the specific language governing permissions and # limitations under the License. import os +import torch import copy +import torch_npu from functools import wraps - -import torch +from torch_npu.contrib import transfer_to_npu from torch import distributed as dist + from deepspeed.accelerator import get_accelerator from .package_info import ( __description__, @@ -53,17 +55,20 @@ def wrapper_type(fn): return decorated -torch.Tensor.type = wrapper_type(torch.Tensor.type) # fix for torch.Tensor.type() +torch.Tensor.type = wrapper_type(torch.Tensor.type) def wrapper_dist_long2int(fn): @wraps(fn) def wrapper(*args, **kwargs): + if not args: + args = (kwargs.pop('tensor'),) + if args[0].dtype == torch.long and not kwargs.get('async_op', False): new_args = list(copy.deepcopy(args)) new_args[0] = new_args[0].int() fn(*new_args, **kwargs) args[0].copy_(new_args[0].long()) - return None + return return fn(*args, **kwargs) return wrapper diff --git a/ascendspeed/arguments.py b/ascendspeed/arguments.py index 1c73aab9a..6941e3762 100644 --- a/ascendspeed/arguments.py +++ b/ascendspeed/arguments.py @@ -20,6 +20,7 @@ import os import torch import deepspeed +from ascendspeed.enums import PositionEmbeddingType def parse_args(extra_args_provider=None, defaults={}, @@ -229,6 +230,15 @@ def parse_args(extra_args_provider=None, defaults={}, assert args.encoder_seq_length is not None args.seq_length = args.encoder_seq_length + if args.position_embedding_type == PositionEmbeddingType.absolute or args.position_embedding_type == PositionEmbeddingType.alibi: + assert args.max_position_embeddings is not None + if args.seq_length is not None: + assert args.max_position_embeddings >= args.seq_length + if args.decoder_seq_length is not None: + assert args.max_position_embeddings >= args.decoder_seq_length + else: + assert args.max_position_embeddings is None + if args.seq_length is not None: assert args.max_position_embeddings >= args.seq_length if args.decoder_seq_length is not None: @@ -307,12 +317,21 @@ def _add_network_size_args(parser): 'attention. This is set to ' ' args.hidden_size // args.num_attention_heads ' 'if not provided.') + group.add_argument('--embed-layernorm', action='store_true', + help='use layernorm for embedding') group.add_argument('--max-position-embeddings', type=int, default=None, help='Maximum number of position embeddings to use. ' 'This is the size of position embedding.') + group.add_argument('--position-embedding-type', type=lambda x: PositionEmbeddingType[x], + choices=list(PositionEmbeddingType), default=PositionEmbeddingType.absolute, + help='Define position embedding type ("absolute" | "rotary" | "alibi"). 
"absolute" by default.') group.add_argument('--make-vocab-size-divisible-by', type=int, default=128, help='Pad the vocab size to be divisible by this value.' 'This is added for computational efficieny reasons.') + group.add_argument('--pad-vocab-size-to', type=int, default=None, + help='Pad the vocab size to this value.' + 'This value must be greater than the initial size of the tokenizer,' + 'needs to be divisible by TP size and `make-vocab-size-divisible-by`.') group.add_argument('--layernorm-epsilon', type=float, default=1e-5, help='Layer norm epsilon.') group.add_argument('--apply-residual-connection-post-layernorm', @@ -340,6 +359,24 @@ def _add_logging_args(parser): help='If set, calculate and log parameters norm.') group.add_argument('--log-num-zeros-in-grad', action='store_true', help='If set, calculate and log the number of zeros in gradient.') + group.add_argument('--timing-log-level', type=int, + default=0, choices=range(0,3), + help='Granularity level to measure and report timing. ' + ' 0: report only iteration time and make sure timing ' + ' does not introduce extra overhead.' + ' 1: report timing for operations that are executed ' + ' very limited times (basically once) during ' + ' each iteration (such as gradient all-reduce) ' + ' 2: report timing for operations that migh be ' + ' executed numerous times during each iteration. ' + 'Note that setting the level to 1 or 2 might ' + 'cause increase in iteration time.') + group.add_argument('--timing-log-option', type=str, default='minmax', + choices=['max', 'minmax', 'all'], + help='Options for logging timing:' + ' max: report the max timing across all ranks' + ' minmax: report min and max timings across all ranks' + ' all: report timings of all ranks.') group.add_argument('--tensorboard-log-interval', type=int, default=1, help='Report to tensorboard interval.') group.add_argument('--tensorboard-queue-size', type=int, default=1000, @@ -740,8 +777,12 @@ def _add_data_args(parser): default=None, choices=['BertWordPieceLowerCase', 'BertWordPieceCase', - 'GPT2BPETokenizer'], + 'GPT2BPETokenizer', + 'PretrainedFromHF'], help='What type of tokenizer to use.') + group.add_argument("--tokenizer-name-or-path", type=str, default=None, + help="Name or path of the huggingface tokenizer.") + group.add_argument('--data-impl', type=str, default='infer', choices=['lazy', 'cached', 'mmap', 'infer'], help='Implementation of indexed datasets.') @@ -752,6 +793,8 @@ def _add_data_args(parser): 'end-of-document token.') group.add_argument('--eod-mask-loss', action='store_true', help='Mask loss for the end of document tokens.') + group.add_argument('--loss-on-targets-only', action='store_true', + help='Mask loss on input sequence.') group.add_argument('--train-data-exact-num-epochs', type=int, default=None, help='When building the train dataset, force it to be ' 'an exact number of epochs of the raw data') diff --git a/ascendspeed/checkpointing.py b/ascendspeed/checkpointing.py index bae3257a9..afd488cb6 100644 --- a/ascendspeed/checkpointing.py +++ b/ascendspeed/checkpointing.py @@ -21,6 +21,7 @@ import sys import numpy as np from deepspeed.accelerator import get_accelerator import torch +from ascendspeed.enums import PositionEmbeddingType from ascendspeed import (get_args, @@ -63,7 +64,12 @@ def check_checkpoint_args(checkpoint_args): _compare('num_layers') _compare('hidden_size') _compare('num_attention_heads') - _compare('max_position_embeddings') + + _compare('position_embedding_type') + # with alibi we can change `max_position_embeddings` + if 
args.position_embedding_type != PositionEmbeddingType.alibi: + _compare('max_position_embeddings') + if args.vocab_file: _compare('make_vocab_size_divisible_by') _compare('padded_vocab_size') diff --git a/ascendspeed/data/gpt_dataset.py b/ascendspeed/data/gpt_dataset.py index 9f69bdcd7..62743177f 100644 --- a/ascendspeed/data/gpt_dataset.py +++ b/ascendspeed/data/gpt_dataset.py @@ -220,20 +220,11 @@ def _build_index_mappings(name, data_prefix, documents, sizes, sample_idx_filename = _filename + '_sample_idx.npy' shuffle_idx_filename = _filename + '_shuffle_idx.npy' - if name == 'train': - # force to use certain index files - if args.train_doc_idx_path is not None: - doc_idx_filename = args.train_doc_idx_path - if args.train_sample_idx_path is not None: - sample_idx_filename = args.train_sample_idx_path - if args.train_shuffle_idx_path is not None: - shuffle_idx_filename = args.train_shuffle_idx_path - # Build the indexed mapping if not exist. if is_rank_0(): if (not os.path.isfile(doc_idx_filename)) or \ - (not os.path.isfile(sample_idx_filename)) or \ - (not os.path.isfile(shuffle_idx_filename)): + (not os.path.isfile(sample_idx_filename)) or \ + (not os.path.isfile(shuffle_idx_filename)): print_rank_0(' > WARNING: could not find index map files, building ' 'the indices on rank 0 ...') diff --git a/ascendspeed/data/indexed_dataset.py b/ascendspeed/data/indexed_dataset.py index e62d1987e..320c6a096 100644 --- a/ascendspeed/data/indexed_dataset.py +++ b/ascendspeed/data/indexed_dataset.py @@ -11,6 +11,7 @@ # An empty sentence no longer separates documents. # Some of the fixes/improvements are adopted from +# https://github.com/bigscience-workshop/AscendSpeed/blob/main/megatron/data/indexed_dataset.py from functools import lru_cache import os diff --git a/ascendspeed/global_vars.py b/ascendspeed/global_vars.py index 6e63b88b8..27ec1f28e 100644 --- a/ascendspeed/global_vars.py +++ b/ascendspeed/global_vars.py @@ -17,7 +17,6 @@ import os import sys -import time import torch @@ -25,6 +24,8 @@ from ascendspeed.tokenizer import build_tokenizer from .arguments import parse_args from .microbatches import build_num_microbatches_calculator from deepspeed.accelerator import get_accelerator +from .timers import Timers + _GLOBAL_ARGS = None _GLOBAL_NUM_MICROBATCHES_CALCULATOR = None _GLOBAL_TOKENIZER = None @@ -83,11 +84,11 @@ def set_global_variables(extra_args_provider=None, args_defaults={}, defaults=args_defaults, ignore_unknown_args=ignore_unknown_args) _build_num_microbatches_calculator(args) - if args.vocab_file: + if args.vocab_file or args.tokenizer_name_or_path: _ = _build_tokenizer(args) _set_tensorboard_writer(args) _set_adlr_autoresume(args) - _set_timers() + _set_timers(args) def _parse_args(extra_args_provider=None, defaults={}, @@ -163,11 +164,11 @@ def _set_adlr_autoresume(args): _GLOBAL_ADLR_AUTORESUME = AutoResume -def _set_timers(): +def _set_timers(args): """Initialize timers.""" global _GLOBAL_TIMERS _ensure_var_is_not_initialized(_GLOBAL_TIMERS, 'timers') - _GLOBAL_TIMERS = Timers() + _GLOBAL_TIMERS = Timers(args.timing_log_level, args.timing_log_option) def _ensure_var_is_initialized(var, name): @@ -177,86 +178,4 @@ def _ensure_var_is_initialized(var, name): def _ensure_var_is_not_initialized(var, name): """Make sure the input variable is not None.""" - assert var is None, '{} is already initialized.'.format(name) - - -class _Timer: - """Timer.""" - - def __init__(self, name): - self.name_ = name - self.elapsed_ = 0.0 - self.started_ = False - self.start_time = time.time() - - 
def start(self): - """Start the timer.""" - assert not self.started_, 'timer has already been started' - get_accelerator().synchronize() - self.start_time = time.time() - self.started_ = True - - def stop(self): - """Stop the timer.""" - assert self.started_, 'timer is not started' - get_accelerator().synchronize() - self.elapsed_ += (time.time() - self.start_time) - self.started_ = False - - def reset(self): - """Reset timer.""" - self.elapsed_ = 0.0 - self.started_ = False - - def elapsed(self, reset=True): - """Calculate the elapsed time.""" - started_ = self.started_ - # If the timing in progress, end it first. - if self.started_: - self.stop() - # Get the elapsed time. - elapsed_ = self.elapsed_ - # Reset the elapsed time - if reset: - self.reset() - # If timing was in progress, set it back. - if started_: - self.start() - return elapsed_ - - -class Timers: - """Group of timers.""" - - def __init__(self): - self.timers = {} - - def __call__(self, name): - if name not in self.timers: - self.timers[name] = _Timer(name) - return self.timers[name] - - def write(self, names, writer, iteration, normalizer=1.0, reset=False): - """Write timers to a tensorboard writer""" - # currently when using add_scalars, - # torch.utils.add_scalars makes each timer its own run, which - # polutes the runs list, so we just add each as a scalar - assert normalizer > 0.0 - for name in names: - value = self.timers[name].elapsed(reset=reset) / normalizer - writer.add_scalar(name + '-time', value, iteration) - - def log(self, names, normalizer=1.0, reset=True): - """Log a group of timers.""" - assert normalizer > 0.0 - string = 'time (ms)' - for name in names: - elapsed_time = self.timers[name].elapsed( - reset=reset) * 1000.0 / normalizer - string += ' | {}: {:.2f}'.format(name, elapsed_time) - if torch.distributed.is_initialized(): - if torch.distributed.get_rank() == ( - torch.distributed.get_world_size() - 1): - print(string, flush=True) - else: - print(string, flush=True) + assert var is None, '{} is already initialized.'.format(name) \ No newline at end of file diff --git a/ascendspeed/model/__init__.py b/ascendspeed/model/__init__.py index 3033f6a48..22b822a7b 100644 --- a/ascendspeed/model/__init__.py +++ b/ascendspeed/model/__init__.py @@ -22,3 +22,4 @@ from .gpt_model import GPTModel, GPTModelPipe from .llama_model import LlamaModel, LlamaModelPipe from .language_model import get_language_model from .module import Float16Module +from .enums import ModelType diff --git a/ascendspeed/model/enums.py b/ascendspeed/model/enums.py index b6992fefa..c4f4e27f0 100644 --- a/ascendspeed/model/enums.py +++ b/ascendspeed/model/enums.py @@ -15,6 +15,10 @@ import enum +class ModelType(enum.Enum): + encoder_or_decoder = 1 + encoder_and_decoder = 2 + class LayerType(enum.Enum): encoder = 1 decoder = 2 @@ -25,4 +29,6 @@ class AttnType(enum.Enum): class AttnMaskType(enum.Enum): padding = 1 - causal = 2 + causal = 2 # Overrides `attention_mask` to be a lower triangular matrix + prefix = 3 + custom = 4 # Forces one to pass an `attention_mask` that's 1 if we need to mask. Tensor that can be broadcast to [micro_batch_size, n_head, seq_length, seq_length] diff --git a/ascendspeed/model/fused_layer_norm.py b/ascendspeed/model/fused_layer_norm.py index c7af4df47..f6e47098f 100644 --- a/ascendspeed/model/fused_layer_norm.py +++ b/ascendspeed/model/fused_layer_norm.py @@ -18,12 +18,19 @@ with some changes. 
""" import numbers +from ascendspeed.mpu.utils import make_viewless_tensor import torch from torch.nn.parameter import Parameter from torch.nn import init import importlib from torch.nn import functional as F +try: + from apex.contrib.layer_norm.layer_norm import FastLayerNormFN + HAVE_PERSIST_LAYER_NORM = True +except: + HAVE_PERSIST_LAYER_NORM = False + global fused_mix_prec_layer_norm_cuda fused_mix_prec_layer_norm_cuda = None @@ -62,21 +69,30 @@ class FusedLayerNormAffineFunction(torch.autograd.Function): class MixedFusedLayerNorm(torch.nn.Module): - def __init__(self, normalized_shape, eps=1e-5): - super(MixedFusedLayerNorm, self).__init__() - - if isinstance(normalized_shape, numbers.Integral): - normalized_shape = (normalized_shape,) - self.normalized_shape = torch.Size(normalized_shape) - self.eps = eps - self.weight = Parameter(torch.Tensor(*normalized_shape)) - self.bias = Parameter(torch.Tensor(*normalized_shape)) - self.reset_parameters() - + def __init__(self, normalized_shape, eps=1e-5, no_persist_layer_norm=True, sequence_parallel=False): + super(MixedFusedLayerNorm, self).__init__() + if isinstance(normalized_shape, numbers.Integral): + normalized_shape = (normalized_shape,) + self.normalized_shape = torch.Size(normalized_shape) + self.eps = eps + self.weight = torch.nn.parameter.Parameter(torch.Tensor(*normalized_shape)) + self.bias = torch.nn.parameter.Parameter(torch.Tensor(*normalized_shape)) + self.reset_parameters() + self.no_persist_layer_norm = True + self.sequence_parallel = sequence_parallel + + # set sequence parallelism flag on weight and bias parameters + setattr(self.weight, 'sequence_parallel', self.sequence_parallel) + setattr(self.bias, 'sequence_parallel', self.sequence_parallel) def reset_parameters(self): init.ones_(self.weight) init.zeros_(self.bias) def forward(self, input): - return torch.nn.functional.layer_norm(input, self.normalized_shape, self.weight, self.bias, self.eps) + if self.no_persist_layer_norm: + return torch.nn.functional.layer_norm(input, self.normalized_shape, self.weight, self.bias, self.eps) + else: + output = FastLayerNormFN.apply(input, self.weight, self.bias, self.eps) + output = make_viewless_tensor(inp=output, requires_grad=input.requires_grad, keep_graph=True) + return output \ No newline at end of file diff --git a/ascendspeed/timers.py b/ascendspeed/timers.py new file mode 100644 index 000000000..03810cbd2 --- /dev/null +++ b/ascendspeed/timers.py @@ -0,0 +1,283 @@ +# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved. + +"""Megatron timers.""" + +from abc import ABC +from abc import abstractmethod +import time + +import torch + + + +class TimerBase(ABC): + + def __init__(self, name): + self.name = name + + @abstractmethod + def start(self, barrier=False): + pass + + @abstractmethod + def stop(self, barrier=False): + pass + + @abstractmethod + def reset(self): + pass + + @abstractmethod + def elapsed(self, reset=True, barrier=False): + pass + + + +class DummyTimer(TimerBase): + + def __init__(self): + super().__init__('dummy timer') + + def start(self, barrier=False): + return + + def stop(self, barrier=False): + return + + def reset(self): + return + + def elapsed(self, reset=True, barrier=False): + raise Exception('dummy timer should not be used to ' + 'calculate elapsed time') + + + +class Timer(TimerBase): + """ + Comment on using `barrier`: If this flag is passed, then all + the caller processes will wait till all reach the timing routine. 
+ It is up to the user to make sure all the ranks in `barrier_group` + call it otherwise, it will result in a hang. + Comment on `barrier_group`: By default it is set to None which + in torch distributed land, it will result in the global communicator. + """ + + def __init__(self, name): + super().__init__(name) + self._elapsed = 0.0 + self._started = False + # Note that None will default to the global process group + self._barrier_group = None + self._start_time = time.time() + + + def set_barrier_group(self, barrier_group): + self._barrier_group = barrier_group + + + def start(self, barrier=False): + """Start the timer.""" + assert not self._started, 'timer has already been started' + if barrier: + torch.distributed.barrier(group=self._barrier_group) + torch.cuda.synchronize() + self._start_time = time.time() + self._started = True + + + def stop(self, barrier=False): + """Stop the timer.""" + assert self._started, 'timer is not started' + if barrier: + torch.distributed.barrier(group=self._barrier_group) + torch.cuda.synchronize() + self._elapsed += (time.time() - self._start_time) + self._started = False + + + def reset(self): + """Reset timer.""" + self._elapsed = 0.0 + self._started = False + + + def elapsed(self, reset=True, barrier=False): + """Calculate the elapsed time.""" + _started = self._started + # If the timing in progress, end it first. + if self._started: + self.stop(barrier=barrier) + # Get the elapsed time. + _elapsed = self._elapsed + # Reset the elapsed time + if reset: + self.reset() + # If timing was in progress, set it back. + if _started: + self.start(barrier=barrier) + return _elapsed + + + +class Timers: + """Group of timers.""" + + def __init__(self, log_level, log_option): + self._log_level = log_level + self._log_option = log_option + self._timers = {} + self._log_levels = {} + self._dummy_timer = DummyTimer() + self._max_log_level = 2 + + + def __call__(self, name, log_level=None): + if name not in self._timers: + self._timers[name] = Timer(name=name) + return self._timers[name] + + + def _get_elapsed_time_all_ranks(self, names, reset, barrier): + """ + Assumptions: + - All the ranks call this function. + - `names` are identical on all ranks. + If the above assumptions are not met, calling this function will + result in hang. + Arguments: + - names: list of timer names + - reset: reset the timer after recording the elapsed time + - barrier: if set, do a global barrier before time measurments + """ + + # First make sure all the callers are in sync. + if barrier: + torch.distributed.barrier() + + world_size = torch.distributed.get_world_size() + rank = torch.distributed.get_rank() + + # Here we can use gather on the rank we want to print the + # timing, however, there is no gather_base support in + # pytorch yet. It is simpler to deal with a single tensor + # and since we are only gathering a small amount of data, + # it should be ok to use all-gather instead of gather. + rank_name_to_time = torch.zeros((world_size, len(names)), + dtype=torch.float, + device=torch.cuda.current_device()) + for i, name in enumerate(names): + if name in self._timers: + # Here we don't need to pass the barrier flag as all + # the processes are already in sync. This avoids the + # issue of different timers having different barrier + # groups inside their class. + rank_name_to_time[rank, i] = self._timers[name].elapsed( + reset=reset) + + # See the note above for why we are not using gather. 
+ torch.distributed._all_gather_base(rank_name_to_time.view(-1), + rank_name_to_time[rank, :].view(-1)) + + return rank_name_to_time + + + def _get_global_min_max_time(self, names, reset, barrier, normalizer): + """Report only min and max times across all ranks.""" + + rank_name_to_time = self._get_elapsed_time_all_ranks(names, reset, + barrier) + name_to_min_max_time = {} + for i, name in enumerate(names): + rank_to_time = rank_name_to_time[:, i] + # filter out the ones we did not have any timings for + rank_to_time = rank_to_time[rank_to_time > 0.0] + # If the timer exists: + if rank_to_time.numel() > 0: + name_to_min_max_time[name] = ( + rank_to_time.min().item() / normalizer, + rank_to_time.max().item() / normalizer) + return name_to_min_max_time + + + def _get_global_min_max_time_string(self, names, reset, barrier, + normalizer, max_only): + name_to_min_max_time = self._get_global_min_max_time( + names, reset, barrier, normalizer) + if not name_to_min_max_time: + return None + output_string = '(min, max) time across ranks (ms):' + for name in name_to_min_max_time: + min_time, max_time = name_to_min_max_time[name] + if max_only: + output_string += '\n {}: {:.2f}'.format( + (name+' ').ljust(48, '.'), max_time) + else: + output_string += '\n {}: ({:.2f}, {:.2f})'.format( + (name+' ').ljust(48, '.'), min_time, max_time) + return output_string + + + def _get_all_ranks_time_string(self, names, reset, barrier, normalizer): + """Report times across all ranks.""" + rank_name_to_time = self._get_elapsed_time_all_ranks(names, reset, + barrier) + + output_string = 'times across ranks (ms):' + no_reported_timing = True + for i, name in enumerate(names): + not_yet_found = True + for rank in range(torch.distributed.get_world_size()): + if rank_name_to_time[rank, i] > 0: + no_reported_timing = False + if not_yet_found: + not_yet_found = False + output_string += '\n {}:'.format(name) + output_string += '\n rank {:2d}: {:.2f}'.format( + rank, rank_name_to_time[rank, i] / normalizer) + if no_reported_timing: + return None + return output_string + + + def log(self, names, rank=None, normalizer=1.0, reset=True, barrier=False): + """Log a group of timers.""" + + # Print. + assert normalizer > 0.0 + if self._log_option in ['max', 'minmax']: + max_only = False + if self._log_option == 'max': + max_only = True + output_string = self._get_global_min_max_time_string( + names, reset, barrier, normalizer/1000.0, max_only) + elif self._log_option == 'all': + output_string = self._get_all_ranks_time_string(names, + reset, barrier, + normalizer/1000.0) + else: + raise Exception('unknown timing log option {}'.format( + self._log_option)) + + # If no input rank is provided, log on last rank. + if rank is None: + rank = torch.distributed.get_world_size() - 1 + if rank == torch.distributed.get_rank() and output_string is not None: + print(output_string, flush=True) + + + def write(self, names, writer, iteration, normalizer=1.0, + reset=False, barrier=False): + """Write timers to a tensorboard writer + Note that we only report maximum time across ranks to tensorboard. 
+ """ + # currently when using add_scalars, + # torch.utils.add_scalars makes each timer its own run, which + # polutes the runs list, so we just add each as a scalar + assert normalizer > 0.0 + name_to_min_max_time = self._get_global_min_max_time( + names, reset, barrier, normalizer) + if writer is not None: + for name in name_to_min_max_time: + _, max_time = name_to_min_max_time[name] + writer.add_scalar(name + '-time', max_time, iteration) diff --git a/ascendspeed/training.py b/ascendspeed/training.py index 91747429f..e1a0aa190 100644 --- a/ascendspeed/training.py +++ b/ascendspeed/training.py @@ -185,6 +185,8 @@ def pretrain(train_valid_test_dataset_provider, print_rank_0('training ...') iteration = 0 + save_checkpoint(iteration, model, optimizer, lr_scheduler) + if args.do_train and args.train_iters > 0: iteration = train(forward_step_func, model, optimizer, lr_scheduler, @@ -328,7 +330,9 @@ def get_model(model_provider_func): # GPU allocation. for model_module in model: - model_module.to(get_accelerator().current_device_name()) + device_name = get_accelerator().current_device_name() + print_rank_0(f"model to {device_name}") + model_module.to(device_name) # Fp16 conversion. @@ -490,6 +494,7 @@ def setup_model_and_optimizer(model_provider_func, teacher=False, # Number of train/valid/test samples. if args.train_samples: train_samples = args.train_samples + update_train_iters(args) else: train_samples = args.train_iters * args.global_batch_size # eval_iters and test_iters here are not actually used, only for @@ -523,6 +528,7 @@ def setup_model_and_optimizer(model_provider_func, teacher=False, lr_scheduler=lr_scheduler, mpu=mpu if args.no_pipeline_parallel else None ) + assert model.fp16_enabled() == args.fp16, "megatron fp16 config does not match deepspeed" if isinstance(model, deepspeed.PipelineEngine): # hack to get batch_fn from pretrain_gpt.py model.set_batch_fn(model.module._megatron_batch_fn) @@ -729,7 +735,7 @@ def training_log(loss_dict, total_loss_dict, learning_rate, iteration, timers_to_log = [] def add_to_logging(name): - if name in timers.timers: + if name in timers._timers: timers_to_log.append(name) add_to_logging('forward-compute') add_to_logging('forward-recv') @@ -1274,6 +1280,7 @@ def build_train_valid_test_data_iterators( # Number of train/valid/test samples. 
if args.train_samples: train_samples = args.train_samples + update_train_iters(args) else: train_samples = args.train_iters * args.global_batch_size eval_iters = (args.train_iters // args.eval_interval + 1) * \ -- Gitee From 712c04cf0c1d9c5786426cb03977a537d5992336 Mon Sep 17 00:00:00 2001 From: wiyr Date: Mon, 12 Jun 2023 11:50:46 +0800 Subject: [PATCH 2/2] added fused softmax, tokenizer, language, utils for bloom --- ascendspeed/__init__.py | 8 +- ascendspeed/arguments.py | 8 +- ascendspeed/checkpointing.py | 1 - ascendspeed/data/gpt_dataset.py | 13 ++- ascendspeed/model/enums.py | 4 +- ascendspeed/model/fused_layer_norm.py | 56 +++++------ ascendspeed/model/fused_softmax.py | 127 +++++++++++++++++++------ ascendspeed/model/language_model.py | 61 +++++++----- ascendspeed/model/llama_model.py | 2 +- ascendspeed/p2p_communication.py | 2 +- ascendspeed/tokenizer/tokenizer.py | 131 ++++++++++++++++++++++++-- ascendspeed/utils.py | 52 ++++++++-- requirements.txt | 3 +- 13 files changed, 357 insertions(+), 111 deletions(-) diff --git a/ascendspeed/__init__.py b/ascendspeed/__init__.py index 75f6ce1a9..3d00f00fb 100644 --- a/ascendspeed/__init__.py +++ b/ascendspeed/__init__.py @@ -13,13 +13,13 @@ # See the License for the specific language governing permissions and # limitations under the License. import os -import torch import copy -import torch_npu from functools import wraps + +import torch +import torch_npu from torch_npu.contrib import transfer_to_npu from torch import distributed as dist - from deepspeed.accelerator import get_accelerator from .package_info import ( __description__, @@ -68,7 +68,7 @@ def wrapper_dist_long2int(fn): new_args[0] = new_args[0].int() fn(*new_args, **kwargs) args[0].copy_(new_args[0].long()) - return + return None return fn(*args, **kwargs) return wrapper diff --git a/ascendspeed/arguments.py b/ascendspeed/arguments.py index 6941e3762..89c1b8fad 100644 --- a/ascendspeed/arguments.py +++ b/ascendspeed/arguments.py @@ -230,9 +230,10 @@ def parse_args(extra_args_provider=None, defaults={}, assert args.encoder_seq_length is not None args.seq_length = args.encoder_seq_length - if args.position_embedding_type == PositionEmbeddingType.absolute or args.position_embedding_type == PositionEmbeddingType.alibi: + if (args.position_embedding_type == PositionEmbeddingType.absolute or + args.position_embedding_type == PositionEmbeddingType.alibi): assert args.max_position_embeddings is not None - if args.seq_length is not None: + if not args.seq_length: assert args.max_position_embeddings >= args.seq_length if args.decoder_seq_length is not None: assert args.max_position_embeddings >= args.decoder_seq_length @@ -318,7 +319,7 @@ def _add_network_size_args(parser): ' args.hidden_size // args.num_attention_heads ' 'if not provided.') group.add_argument('--embed-layernorm', action='store_true', - help='use layernorm for embedding') + help='Use layernorm for embedding.') group.add_argument('--max-position-embeddings', type=int, default=None, help='Maximum number of position embeddings to use. 
' 'This is the size of position embedding.') @@ -782,7 +783,6 @@ def _add_data_args(parser): help='What type of tokenizer to use.') group.add_argument("--tokenizer-name-or-path", type=str, default=None, help="Name or path of the huggingface tokenizer.") - group.add_argument('--data-impl', type=str, default='infer', choices=['lazy', 'cached', 'mmap', 'infer'], help='Implementation of indexed datasets.') diff --git a/ascendspeed/checkpointing.py b/ascendspeed/checkpointing.py index afd488cb6..008e42e72 100644 --- a/ascendspeed/checkpointing.py +++ b/ascendspeed/checkpointing.py @@ -64,7 +64,6 @@ def check_checkpoint_args(checkpoint_args): _compare('num_layers') _compare('hidden_size') _compare('num_attention_heads') - _compare('position_embedding_type') # with alibi we can change `max_position_embeddings` if args.position_embedding_type != PositionEmbeddingType.alibi: diff --git a/ascendspeed/data/gpt_dataset.py b/ascendspeed/data/gpt_dataset.py index 62743177f..9f69bdcd7 100644 --- a/ascendspeed/data/gpt_dataset.py +++ b/ascendspeed/data/gpt_dataset.py @@ -220,11 +220,20 @@ def _build_index_mappings(name, data_prefix, documents, sizes, sample_idx_filename = _filename + '_sample_idx.npy' shuffle_idx_filename = _filename + '_shuffle_idx.npy' + if name == 'train': + # force to use certain index files + if args.train_doc_idx_path is not None: + doc_idx_filename = args.train_doc_idx_path + if args.train_sample_idx_path is not None: + sample_idx_filename = args.train_sample_idx_path + if args.train_shuffle_idx_path is not None: + shuffle_idx_filename = args.train_shuffle_idx_path + # Build the indexed mapping if not exist. if is_rank_0(): if (not os.path.isfile(doc_idx_filename)) or \ - (not os.path.isfile(sample_idx_filename)) or \ - (not os.path.isfile(shuffle_idx_filename)): + (not os.path.isfile(sample_idx_filename)) or \ + (not os.path.isfile(shuffle_idx_filename)): print_rank_0(' > WARNING: could not find index map files, building ' 'the indices on rank 0 ...') diff --git a/ascendspeed/model/enums.py b/ascendspeed/model/enums.py index c4f4e27f0..4f8f9a396 100644 --- a/ascendspeed/model/enums.py +++ b/ascendspeed/model/enums.py @@ -31,4 +31,6 @@ class AttnMaskType(enum.Enum): padding = 1 causal = 2 # Overrides `attention_mask` to be a lower triangular matrix prefix = 3 - custom = 4 # Forces one to pass an `attention_mask` that's 1 if we need to mask. Tensor that can be broadcast to [micro_batch_size, n_head, seq_length, seq_length] + # Forces one to pass an `attention_mask` that's 1 if we need to mask. 
+ # Tensor that can be broadcast to [micro_batch_size, n_head, seq_length, seq_length] + custom = 4 diff --git a/ascendspeed/model/fused_layer_norm.py b/ascendspeed/model/fused_layer_norm.py index f6e47098f..94f9e7b16 100644 --- a/ascendspeed/model/fused_layer_norm.py +++ b/ascendspeed/model/fused_layer_norm.py @@ -28,7 +28,7 @@ from torch.nn import functional as F try: from apex.contrib.layer_norm.layer_norm import FastLayerNormFN HAVE_PERSIST_LAYER_NORM = True -except: +except ImportError: HAVE_PERSIST_LAYER_NORM = False global fused_mix_prec_layer_norm_cuda @@ -69,30 +69,30 @@ class FusedLayerNormAffineFunction(torch.autograd.Function): class MixedFusedLayerNorm(torch.nn.Module): - def __init__(self, normalized_shape, eps=1e-5, no_persist_layer_norm=True, sequence_parallel=False): - super(MixedFusedLayerNorm, self).__init__() - if isinstance(normalized_shape, numbers.Integral): - normalized_shape = (normalized_shape,) - self.normalized_shape = torch.Size(normalized_shape) - self.eps = eps - self.weight = torch.nn.parameter.Parameter(torch.Tensor(*normalized_shape)) - self.bias = torch.nn.parameter.Parameter(torch.Tensor(*normalized_shape)) - self.reset_parameters() - self.no_persist_layer_norm = True - self.sequence_parallel = sequence_parallel - - # set sequence parallelism flag on weight and bias parameters - setattr(self.weight, 'sequence_parallel', self.sequence_parallel) - setattr(self.bias, 'sequence_parallel', self.sequence_parallel) - - def reset_parameters(self): - init.ones_(self.weight) - init.zeros_(self.bias) - - def forward(self, input): - if self.no_persist_layer_norm: - return torch.nn.functional.layer_norm(input, self.normalized_shape, self.weight, self.bias, self.eps) - else: - output = FastLayerNormFN.apply(input, self.weight, self.bias, self.eps) - output = make_viewless_tensor(inp=output, requires_grad=input.requires_grad, keep_graph=True) - return output \ No newline at end of file + def __init__(self, normalized_shape, eps=1e-5, no_persist_layer_norm=True, sequence_parallel=False): + super(MixedFusedLayerNorm, self).__init__() + if isinstance(normalized_shape, numbers.Integral): + normalized_shape = (normalized_shape,) + self.normalized_shape = torch.Size(normalized_shape) + self.eps = eps + self.weight = torch.nn.parameter.Parameter(torch.Tensor(*normalized_shape)) + self.bias = torch.nn.parameter.Parameter(torch.Tensor(*normalized_shape)) + self.reset_parameters() + self.no_persist_layer_norm = True + self.sequence_parallel = sequence_parallel + + # set sequence parallelism flag on weight and bias parameters + setattr(self.weight, 'sequence_parallel', self.sequence_parallel) + setattr(self.bias, 'sequence_parallel', self.sequence_parallel) + + def reset_parameters(self): + init.ones_(self.weight) + init.zeros_(self.bias) + + def forward(self, input): + if self.no_persist_layer_norm: + return torch.nn.functional.layer_norm(input, self.normalized_shape, self.weight, self.bias, self.eps) + else: + output = FastLayerNormFN.apply(input, self.weight, self.bias, self.eps) + output = make_viewless_tensor(inp=output, requires_grad=input.requires_grad, keep_graph=True) + return output \ No newline at end of file diff --git a/ascendspeed/model/fused_softmax.py b/ascendspeed/model/fused_softmax.py index d8328ebb2..2ba027b5a 100644 --- a/ascendspeed/model/fused_softmax.py +++ b/ascendspeed/model/fused_softmax.py @@ -15,10 +15,88 @@ import torch import torch_npu +import torch.nn as nn from ascendspeed.model.enums import AttnMaskType -class 
NPUFusedScaleMaskSoftmax(torch.nn.Module): +class ScaledUpperTriangMaskedSoftmax(torch.autograd.Function): + """ + Fused operation which performs following three operations in sequence + 1. Scale the tensor. + 2. Apply upper triangular mask (typically used in gpt models). + 3. Perform softmax. + """ + + @staticmethod + def forward(ctx, inputs, scale): + import scaled_upper_triang_masked_softmax_cuda + + scale_t = torch.tensor([scale]) + + softmax_results = scaled_upper_triang_masked_softmax_cuda.forward( + inputs, scale_t[0] + ) + ctx.save_for_backward(softmax_results, scale_t) + return softmax_results + + @staticmethod + def backward(ctx, output_grads): + import scaled_upper_triang_masked_softmax_cuda + + softmax_results, scale_t = ctx.saved_tensors + + input_grads = scaled_upper_triang_masked_softmax_cuda.backward( + output_grads, softmax_results, scale_t[0] + ) + return input_grads, None + + +class ScaledMaskedSoftmax(torch.autograd.Function): + """ + Fused operation which performs following three operations in sequence + 1. Scale the tensor. + 2. Apply the mask. + 3. Perform softmax. + """ + + @staticmethod + def forward(ctx, inputs, mask, scale): + import scaled_masked_softmax_cuda + + scale_t = torch.tensor([scale]) + + softmax_results = scaled_masked_softmax_cuda.forward( + inputs, mask, scale_t[0] + ) + ctx.save_for_backward(softmax_results, scale_t) + return softmax_results + + @staticmethod + def backward(ctx, output_grads): + import scaled_masked_softmax_cuda + + softmax_results, scale_t = ctx.saved_tensors + + input_grads = scaled_masked_softmax_cuda.backward( + output_grads, softmax_results, scale_t[0] + ) + return input_grads, None, None + + +class NPUFusedScaleMaskSoftmax(nn.Module): + """ + fused operation: scaling + mask + softmax + + Arguments: + input_in_fp16: flag to indicate if input in fp16 data format. + input_in_bf16: flag to indicate if input in bf16 data format. + attn_mask_type: attention mask type (pad or causal) + scaled_masked_softmax_fusion: flag to indicate user want to use softmax fusion + mask_func: mask function to be applied. + softmax_in_fp32: if true, softmax in performed at fp32 precision. + scale: scaling factor used in input tensor scaling. 
+ """ + def __init__( self, input_in_fp16, @@ -28,7 +106,7 @@ class NPUFusedScaleMaskSoftmax(torch.nn.Module): mask_func, softmax_in_fp32, scale, - ): + ): super(NPUFusedScaleMaskSoftmax, self).__init__() self.input_in_fp16 = input_in_fp16 self.input_in_bf16 = input_in_bf16 @@ -41,8 +119,6 @@ class NPUFusedScaleMaskSoftmax(torch.nn.Module): self.mask_func = mask_func self.softmax_in_fp32 = softmax_in_fp32 self.scale = scale - self.mask_tri = None - p = torch.npu.get_device_properties(0) if torch.npu.is_available() else None assert ( self.scale is None or softmax_in_fp32 @@ -52,28 +128,22 @@ class NPUFusedScaleMaskSoftmax(torch.nn.Module): # [b, np, sq, sk] assert input_.dim() == 4 - if torch.npu.is_available(): + if self.is_kernel_available(mask, *input_.size()): return self.npu_forward_fused_softmax(input_, mask) - - return self.npu_forward_torch_softmax(input_, mask) - - def npu_forward_fused_softmax(self, input_, mask): - if self.softmax_in_fp32: - input_ = input_.float() - - if self.scale is None: - self.scale = 1.0 - - if self.attn_mask_type == AttnMaskType.causal: - if self.mask_tri is None: - self.mask_tri = torch.triu(torch.ones(input_.shape, device=input_.device), diagonal=1).bool() - probs = torch_npu.npu_scaled_masked_softmax(input_, self.mask_tri, self.scale, False) else: - probs = torch_npu.npu_scaled_masked_softmax(input_, mask, self.scale, False) + return self.npu_forward_torch_softmax(input_, mask) - probs = probs.half() + def is_kernel_available(self, mask, b, np, sq, sk): + return ( + self.scaled_masked_softmax_fusion # user want to fuse + and self.input_in_float16 # input must be fp16 + and 32 < sk <= 2048 # sk must be 32 ~ 2048 + and sq % 16 == 0 # sq must be divisor of 16 + and sk % 16 == 0 # sk must be divisor of 16 + ) - return probs + def npu_forward_fused_softmax(self, input_, mask): + return torch_npu.npu_scaled_masked_softmax(input_, mask, self.scale, False) def npu_forward_torch_softmax(self, input_, mask): if self.input_in_float16 and self.softmax_in_fp32: @@ -81,14 +151,15 @@ class NPUFusedScaleMaskSoftmax(torch.nn.Module): if self.scale is not None: input_ = input_ * self.scale - - if self.attn_mask_type == AttnMaskType.causal: - mask_tri = torch.triu(torch.ones(input_.shape, device=input_.device), diagonal=1).bool() - mask_output = self.mask_func(input_, mask_tri) - else: - mask_output = self.mask_func(input_, mask) if mask is not None else input_ + mask_output = self.mask_func(input_, mask) if mask is not None else input_ probs = torch.nn.Softmax(dim=-1)(mask_output) if self.input_in_float16 and self.softmax_in_fp32: probs = probs.half() if self.input_in_fp16 else probs.bfloat16() return probs + + @staticmethod + def get_batch_per_block(sq, sk, b, np): + import scaled_masked_softmax_cuda + + return scaled_masked_softmax_cuda.get_batch_per_block(sq, sk, b, np) \ No newline at end of file diff --git a/ascendspeed/model/language_model.py b/ascendspeed/model/language_model.py index 99e1e9b82..cecc1d10f 100644 --- a/ascendspeed/model/language_model.py +++ b/ascendspeed/model/language_model.py @@ -20,9 +20,9 @@ import torch.nn.functional as F from ascendspeed import get_args from ascendspeed import mpu -from .module import MegatronModule - +from ascendspeed.enums import PositionEmbeddingType from ascendspeed.model.enums import LayerType, AttnMaskType +from ascendspeed.model.module import MegatronModule from ascendspeed.model.transformer import ParallelTransformer from ascendspeed.model.utils import get_linear_layer from ascendspeed.model.utils import 
init_method_normal, scaled_init_method_normal @@ -139,11 +139,17 @@ class Embedding(MegatronModule): self._word_embeddings_key = 'word_embeddings' # Position embedding (serial). - self.position_embeddings = torch.nn.Embedding( - max_sequence_length, self.hidden_size) - self._position_embeddings_key = 'position_embeddings' - # Initialize the position embeddings. - self.init_method(self.position_embeddings.weight) + self.position_embedding_type = args.position_embedding_type + if self.position_embedding_type == PositionEmbeddingType.absolute: + max_position_embeddings = args.max_position_embeddings + assert max_position_embeddings is not None + self.position_embeddings = torch.nn.Embedding( + max_position_embeddings, self.hidden_size) + self._position_embeddings_key = 'position_embeddings' + # Initialize the position embeddings. + self.init_method(self.position_embeddings.weight) + else: + self.position_embeddings = None # Token type embedding. # Add this as an optional field that can be added through @@ -181,8 +187,14 @@ class Embedding(MegatronModule): def forward(self, input_ids, position_ids, tokentype_ids=None): # Embeddings. words_embeddings = self.word_embeddings(input_ids) - position_embeddings = self.position_embeddings(position_ids) - embeddings = words_embeddings + position_embeddings + embeddings = words_embeddings + + if self.position_embedding_type == PositionEmbeddingType.absolute: + assert self.position_embeddings is not None + embeddings = embeddings + self.position_embeddings(position_ids) + else: + assert self.position_embeddings is None + if tokentype_ids is not None: assert self.tokentype_embeddings is not None embeddings = embeddings + self.tokentype_embeddings(tokentype_ids) @@ -201,9 +213,10 @@ class Embedding(MegatronModule): state_dict_ = {} state_dict_[self._word_embeddings_key] \ = self.word_embeddings.state_dict(destination, prefix, keep_vars) - state_dict_[self._position_embeddings_key] \ - = self.position_embeddings.state_dict( - destination, prefix, keep_vars) + if self.position_embeddings == PositionEmbeddingType.absolute: + state_dict_[self._position_embeddings_key] \ + = self.position_embeddings.state_dict( + destination, prefix, keep_vars) if self.num_tokentypes > 0: state_dict_[self._tokentype_embeddings_key] \ = self.tokentype_embeddings.state_dict( @@ -227,16 +240,17 @@ class Embedding(MegatronModule): self.word_embeddings.load_state_dict(state_dict_, strict=strict) # Position embedding. - if self._position_embeddings_key in state_dict: - state_dict_ = state_dict[self._position_embeddings_key] - else: - # for backward compatibility. - state_dict_ = {} - for key in state_dict.keys(): - if 'position_embeddings' in key: - state_dict_[key.split('position_embeddings.')[1]] \ - = state_dict[key] - self.position_embeddings.load_state_dict(state_dict_, strict=strict) + if self.position_embedding_type == PositionEmbeddingType.absolute: + if self._position_embeddings_key in state_dict: + state_dict_ = state_dict[self._position_embeddings_key] + else: + # for backward compatibility. + state_dict_ = {} + for key in state_dict.keys(): + if 'position_embeddings' in key: + state_dict_[key.split('position_embeddings.')[1]] \ + = state_dict[key] + self.position_embeddings.load_state_dict(state_dict_, strict=strict) # Tokentype embedding. 
if self.num_tokentypes > 0: @@ -256,7 +270,6 @@ class Embedding(MegatronModule): print('***WARNING*** expected tokentype embeddings in the ' 'checkpoint but could not find it', flush=True) - class EmbeddingPipe(Embedding): def forward(self, inputs, **kwargs): @@ -274,7 +287,7 @@ class EmbeddingPipe(Embedding): tokentype_ids = inputs[3] else: tokentype_ids = None - + embeddings = super().forward(input_ids, position_ids, tokentype_ids=tokentype_ids) # If cmd args has attn_mask, we don't forward it as an activation. diff --git a/ascendspeed/model/llama_model.py b/ascendspeed/model/llama_model.py index eaccb0590..af5b70541 100644 --- a/ascendspeed/model/llama_model.py +++ b/ascendspeed/model/llama_model.py @@ -953,4 +953,4 @@ class LlamaModel(MegatronModule): loss = mpu.vocab_parallel_cross_entropy(hidden_states.float(), labels) return loss - return hidden_states + return hidden_states \ No newline at end of file diff --git a/ascendspeed/p2p_communication.py b/ascendspeed/p2p_communication.py index 501cab6d2..4406d5736 100644 --- a/ascendspeed/p2p_communication.py +++ b/ascendspeed/p2p_communication.py @@ -261,4 +261,4 @@ def send_forward_backward_recv_forward_backward( recv_next=recv_next) if timers is not None: timers('forward-backward-send-forward-backward-recv').stop() - return input_tensor, output_tensor_grad + return input_tensor, output_tensor_grad \ No newline at end of file diff --git a/ascendspeed/tokenizer/tokenizer.py b/ascendspeed/tokenizer/tokenizer.py index 13085a81c..3787d039d 100644 --- a/ascendspeed/tokenizer/tokenizer.py +++ b/ascendspeed/tokenizer/tokenizer.py @@ -17,7 +17,7 @@ from abc import ABC from abc import abstractmethod - +from transformers import AutoTokenizer from .bert_tokenization import FullTokenizer as FullBertTokenizer from .gpt2_tokenization import GPT2Tokenizer @@ -29,7 +29,7 @@ def build_tokenizer(args): flush=True) # Select and instantiate the tokenizer. - assert args.vocab_file is not None + assert args.vocab_file is not None or args.tokenizer_type == 'PretrainedFromHF' if args.tokenizer_type == 'BertWordPieceLowerCase': tokenizer = _BertWordPieceTokenizer(vocab_file=args.vocab_file, lower_case=True, @@ -41,6 +41,21 @@ def build_tokenizer(args): elif args.tokenizer_type == 'GPT2BPETokenizer': assert args.merge_file is not None tokenizer = _GPT2BPETokenizer(args.vocab_file, args.merge_file) + elif args.tokenizer_type == "PretrainedFromHF": + assert args.tokenizer_name_or_path is not None + + # prevent transformers from logging info and warnings on each rank + import transformers + import logging + if args.rank == 0: + transformers.utils.logging.set_verbosity(logging.INFO) + else: + # shut the warnings on replicas + transformers.utils.logging.set_verbosity(logging.ERROR) + + if args.rank == 0: + print(" vocab file is un-used. 
loading tokenizer from pre-trained model") + tokenizer = _AutoTokenizer(args.tokenizer_name_or_path, vocab_extra_ids=args.vocab_extra_ids) else: raise NotImplementedError('{} tokenizer is not ' 'implemented.'.format(args.tokenizer_type)) @@ -53,14 +68,26 @@ def build_tokenizer(args): def _vocab_size_with_padding(orig_vocab_size, args): - """Pad vocab size so it is divisible by model parallel size and - still having GPU friendly size.""" - - after = orig_vocab_size - multiple = args.make_vocab_size_divisible_by * \ - args.tensor_model_parallel_size - while (after % multiple) != 0: - after += 1 + """Apply the requested rules to change the size of the vocabulary""" + if args.pad_vocab_size_to is not None: + if args.pad_vocab_size_to < orig_vocab_size: + raise ValueError( + f"You asked to pad the vocabulary to {args.pad_vocab_size_to} when the initial vocabulary size is " + f"{orig_vocab_size}. You can only pad to a higher value." + ) + + if args.make_vocab_size_divisible_by is not None and \ + (args.pad_vocab_size_to % args.make_vocab_size_divisible_by) != 0: + raise ValueError(f"{args.pad_vocab_size_to} is not divisible by {args.make_vocab_size_divisible_by}") + + after = args.pad_vocab_size_to + else: + # Pad vocab size so it is divisible by model parallel size and still having GPU friendly size. + after = orig_vocab_size + multiple = args.make_vocab_size_divisible_by * \ + args.tensor_model_parallel_size + while (after % multiple) != 0: + after += 1 if args.rank == 0: print(' > padded vocab (size: {}) with {} dummy tokens ' '(new size: {})'.format( @@ -289,3 +316,87 @@ class _GPT2BPETokenizer(AbstractTokenizer): @property def eod(self): return self.eod_id + + +class _AutoTokenizer(AbstractTokenizer): + """AutoTokenizer for Hf Pretrained model loading.""" + + def __init__(self, tokenizer_name_or_path, vocab_extra_ids): + name = tokenizer_name_or_path + super().__init__(name) + hf_tokenizer_kwargs = {} + if vocab_extra_ids > 0: + # TODO @thomasw21 we might need to concatenate to a pre-existing list? + hf_tokenizer_kwargs["additional_special_tokens"] = [f"" for _id in range(vocab_extra_ids)] + self.tokenizer = AutoTokenizer.from_pretrained(tokenizer_name_or_path, **hf_tokenizer_kwargs) + self.encoder = self.tokenizer.get_vocab() + self.decoder = {v: k for k, v in self.encoder.items()} + + @property + def vocab_size(self): + return len(self.tokenizer) # vocab_size doesn't contain additional tokens + + @property + def vocab(self): + # TODO @thomasw21 make sure that special tokens don't collapse with vocab tokens. 
+ return { + **{special_token: self.tokenizer.convert_tokens_to_ids(special_token) + for special_token in self.tokenizer.additional_special_tokens}, + **self.tokenizer.vocab, + } + + @property + def inv_vocab(self): + return {v: k for k, v in self.vocab.items()} + + def tokenize(self, text): + return self.tokenizer.encode(text) + + def detokenize(self, token_ids): + return self.tokenizer.decode(token_ids) + + @property + def eod(self): + # TODO @thomasw21 might conflict with + return self.eos + + @property + def cls(self): + candidate = self.tokenizer.cls_token_id + return self._check_token_candidate(candidate) + + @property + def sep(self): + candidate = self.tokenizer.sep_token_id + return self._check_token_candidate(candidate) + + @property + def pad(self): + candidate = self.tokenizer.pad_token_id + return self._check_token_candidate(candidate) + + @property + def mask(self): + candidate = self.tokenizer.mask_token_id + return self._check_token_candidate(candidate) + + @property + def bos(self): + raise NotImplementedError("Missing ") + + @property + def eos(self): + # TODO @thomasw21 might conflict with the notion of + candidate = self.tokenizer.eos_token_id + return self._check_token_candidate(candidate) + + @property + def additional_special_tokens_ids(self): + """ All the additional special tokens you may want to use (list of strings).""" + return self.tokenizer.additional_special_tokens_ids + + @staticmethod + def _check_token_candidate(candidate): + if candidate is None: + raise AttributeError("Token doesn't exist") + return candidate \ No newline at end of file diff --git a/ascendspeed/utils.py b/ascendspeed/utils.py index 6382b132c..f168d5d41 100644 --- a/ascendspeed/utils.py +++ b/ascendspeed/utils.py @@ -33,6 +33,7 @@ from ascendspeed.model.module import param_is_not_shared from ascendspeed.mpu.layers import param_is_not_tensor_parallel_duplicate from ascendspeed import get_num_microbatches from deepspeed.accelerator import get_accelerator + def unwrap_model(model, module_instances=(torchDDP)): return_list = True if not isinstance(model, list): @@ -155,14 +156,23 @@ def get_ltor_masks_and_position_ids(data, eod_token, reset_position_ids, reset_attention_mask, - eod_mask_loss): - """Build masks and position id for left to right model.""" + eod_mask_loss, + prefix_indices=None, + loss_on_targets_only=False): + """ + Build masks and position id for left to right model. + :param prefix_indices: argument can have multiple types: + - None signifies that the model is fully autoregressive. + - List[int] the argument holds all prefix indices that split a row into an input and a target + - List[List[int]] the argument holds all prefix indices that split documents between input and target. + :param loss_on_targets_only: bool to determine if we should mask loss on prefix. + """ # Extract batch size and sequence length. micro_batch_size, seq_length = data.size() # Attention mask (lower triangular). - if reset_attention_mask: + if reset_attention_mask or prefix_indices is not None: att_mask_batch = micro_batch_size else: att_mask_batch = 1 @@ -183,12 +193,20 @@ def get_ltor_masks_and_position_ids(data, if reset_position_ids: position_ids = position_ids.clone() - if reset_position_ids or reset_attention_mask: + if reset_position_ids or reset_attention_mask or prefix_indices is not None: # Loop through the batches: for b in range(micro_batch_size): # Find indecies where EOD token is. 
eod_index = position_ids[b, data[b] == eod_token] + + # If the last eod token is not the last token of the sequence, we suppose that there is a partial document + # We treat this case as if we add an eod token at the end of the sequence. + if data[b][-1] != eod_token: + eod_index = torch.cat( + (eod_index, torch.tensor([len(data[b])], dtype=eod_index.dtype, device=eod_index.device)) + ) + # Detach indecies from positions if going to modify positions. if reset_position_ids: eod_index = eod_index.clone() @@ -197,13 +215,35 @@ def get_ltor_masks_and_position_ids(data, prev_index = 0 for j in range(eod_index.size()[0]): i = eod_index[j] - # Mask attention loss. + if reset_attention_mask: + # Prevent cross document interactions. attention_mask[b, 0, (i + 1):, :(i + 1)] = 0 + + # Prefix lm per document. + if prefix_indices: + assert isinstance(prefix_indices[b], list), \ + (f"prefix for a row has to be document specific, " + "and consequently return a list, got {prefix_indices[b]}") + attention_mask[b, 0, prev_index: prefix_indices[b][j], prev_index: prefix_indices[b][j]] = 1 + if loss_on_targets_only: + # Last token of the prefix should predict the prefix_index id + loss_mask[b, prev_index: prefix_indices[b][j] - 1] = 0.0 + # Reset positions. if reset_position_ids: position_ids[b, (i + 1):] -= (i + 1 - prev_index) - prev_index = i + 1 + + prev_index = i + 1 + + # Prefix lm per row. + if prefix_indices is not None and (reset_attention_mask is False): + assert isinstance(prefix_indices[b], int), \ + f"prefix for a row has to be row specific, and consequently return an int, got {prefix_indices[b]}" + attention_mask[b, 0, :prefix_indices[b], :prefix_indices[b]] = 1 + if loss_on_targets_only: + # Last token of the prefix should predict the prefix_index id + loss_mask[b, :prefix_indices[b] - 1] = 0.0 # Convert attention mask to binary: attention_mask = (attention_mask < 0.5) diff --git a/requirements.txt b/requirements.txt index 03e58aec2..0e2c615a8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,4 +8,5 @@ scipy attrs expecttest pytest -wrapt_timeout_decorator \ No newline at end of file +wrapt_timeout_decorator +transformers \ No newline at end of file -- Gitee
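
Illustrative usage of the `Timers` class introduced in `ascendspeed/timers.py` by the first commit — a minimal sketch, not part of the patch itself. It assumes `torch.distributed` has already been initialized with a device-backed backend (e.g. HCCL on NPU or NCCL on GPU under a `torchrun` launch) and that an accelerator is visible through `torch.cuda`/`torch_npu`; the timer name `iteration-time` is a hypothetical example, while `forward-compute` matches a name that `training.py` already logs.

    import torch
    from ascendspeed.timers import Timers

    # Assumed to run inside an already-initialized distributed job, e.g.
    #   torch.distributed.init_process_group(backend='nccl')  # or HCCL via torch_npu
    timers = Timers(log_level=1, log_option='minmax')

    # start(barrier=True) makes all ranks wait at a barrier before timing begins,
    # so the measured window is aligned across ranks.
    timers('iteration-time').start(barrier=True)

    timers('forward-compute').start()
    # ... forward pass of one micro-batch would run here ...
    timers('forward-compute').stop()

    timers('iteration-time').stop()

    # With log_option='minmax' this prints "(min, max) time across ranks (ms)"
    # on the last rank; reset=True clears the accumulated elapsed times.
    timers.log(['iteration-time', 'forward-compute'], reset=True)

    # write() reports only the max time across ranks to a tensorboard writer:
    #   writer = torch.utils.tensorboard.SummaryWriter(log_dir='./runs')
    #   timers.write(['iteration-time'], writer, iteration=1)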