From 86dba34669abf69c62726600fad9502947fd8dbc Mon Sep 17 00:00:00 2001 From: Siddharth Singh Date: Wed, 29 Jun 2022 12:26:17 +0500 Subject: [PATCH 01/29] add tensor parallelism --- megatron/model/transformer.py | 19 +++++++-- megatron/mpu/__init__.py | 4 ++ megatron/mpu/initialize.py | 8 ++++ megatron/mpu/layers.py | 31 ++++++++++++-- megatron/mpu/mappings.py | 78 ++++++++++++++++++++++++++++++++++- 5 files changed, 132 insertions(+), 8 deletions(-) diff --git a/megatron/model/transformer.py b/megatron/model/transformer.py index 06a10b8f941..5abe42c956c 100644 --- a/megatron/model/transformer.py +++ b/megatron/model/transformer.py @@ -117,7 +117,8 @@ class ParallelAttention(MegatronModule): def __init__(self, init_method, output_layer_init_method, layer_number, attention_type=AttnType.self_attn, - attn_mask_type=AttnMaskType.padding): + attn_mask_type=AttnMaskType.padding, + reduce_scatter=False): super(ParallelAttention, self).__init__() args = get_args() self.fp16 = args.fp16 @@ -188,7 +189,8 @@ def __init__(self, init_method, args.hidden_size, input_is_parallel=True, init_method=output_layer_init_method, - skip_bias_add=True) + skip_bias_add=True, + reduce_scatter=reduce_scatter) if deepspeed.checkpointing.is_configured(): global get_cuda_rng_tracker, checkpoint @@ -422,7 +424,8 @@ def __init__(self, init_method, output_layer_init_method, output_layer_init_method, layer_number, attention_type=AttnType.self_attn, - attn_mask_type=self_attn_mask_type) + attn_mask_type=self_attn_mask_type, + reduce_scatter = (num_experts > 1)) self.hidden_dropout = args.hidden_dropout self.bias_dropout_fusion = args.bias_dropout_fusion @@ -503,6 +506,10 @@ def forward(self, hidden_states, attention_mask, bias_dropout_add_func = get_bias_dropout_add(self.training) # re-enable torch grad to enable fused optimization. + + if self.num_experts > 1: + residual = mpu.get_residual_for_reduce_scatter(residual) + with torch.enable_grad(): layernorm_input = bias_dropout_add_func( attention_output, @@ -550,6 +557,9 @@ def forward(self, hidden_states, attention_mask, else: residual = layernorm_input + # if self.num_experts > 1: + # residual = mpu.get_residual_for_reduce_scatter(residual) + # re-enable torch grad to enable fused optimization. with torch.enable_grad(): #if self.num_experts <= 1: @@ -561,6 +571,9 @@ def forward(self, hidden_states, attention_mask, #else: # output = mlp_output + residual + if self.num_experts > 1: + output = mpu.all_gather_from_tensor_model_parallel_region(output) + if get_key_value: output = [output, presents] diff --git a/megatron/mpu/__init__.py b/megatron/mpu/__init__.py index 32bb5fc3dd6..a60327fc2e4 100644 --- a/megatron/mpu/__init__.py +++ b/megatron/mpu/__init__.py @@ -42,6 +42,7 @@ from .initialize import initialize_model_parallel from .initialize import model_parallel_is_initialized from .initialize import get_model_parallel_world_size, get_model_parallel_rank +#from .initialize import get_expert_model_parallel_world_size, get_expert_data_parallel_world_size, get_expert_data_parallel_rank from .layers import ColumnParallelLinear from .layers import RowParallelLinear @@ -54,6 +55,8 @@ from .mappings import gather_from_tensor_model_parallel_region from .mappings import reduce_from_tensor_model_parallel_region from .mappings import scatter_to_tensor_model_parallel_region +from .mappings import reduce_scatter_from_tensor_model_parallel_region +from .mappings import all_gather_from_tensor_model_parallel_region from .random import checkpoint from .random import get_cuda_rng_tracker @@ -65,3 +68,4 @@ from .utils import divide from .utils import split_tensor_along_last_dim +from .layers import get_residual_for_reduce_scatter diff --git a/megatron/mpu/initialize.py b/megatron/mpu/initialize.py index c24d1179ada..b2af4e8c32d 100644 --- a/megatron/mpu/initialize.py +++ b/megatron/mpu/initialize.py @@ -231,6 +231,14 @@ def get_tensor_model_parallel_world_size(): return _MPU_TENSOR_MODEL_PARALLEL_WORLD_SIZE return torch.distributed.get_world_size(group=get_tensor_model_parallel_group()) +# def get_expert_model_parallel_world_size(): +# return 1 + +# def get_expert_data_parallel_world_size(): +# return dist.get_world_size() + +# def get_expert_data_parallel_rank(): +# return dist.get_rank() def get_model_parallel_world_size(): assert get_pipeline_model_parallel_world_size() == 1, "legacy get_model_parallel_world_size is only supported if PP is disabled" diff --git a/megatron/mpu/layers.py b/megatron/mpu/layers.py index c837160a371..b1c14fa056b 100644 --- a/megatron/mpu/layers.py +++ b/megatron/mpu/layers.py @@ -31,6 +31,7 @@ from .mappings import gather_from_tensor_model_parallel_region from .mappings import reduce_from_tensor_model_parallel_region from .mappings import scatter_to_tensor_model_parallel_region +from .mappings import reduce_scatter_from_tensor_model_parallel_region from .random import get_cuda_rng_tracker from .utils import divide from .utils import split_tensor_along_last_dim @@ -130,6 +131,16 @@ def _initialize_affine_weight_cpu(weight, output_size, input_size, return master_weight return None +def get_residual_for_reduce_scatter(residual): + if get_tensor_model_parallel_world_size() == 1: + return residual + + total_chunks = get_tensor_model_parallel_world_size() + this_chunk = get_tensor_model_parallel_rank() + assert residual.shape[0] % total_chunks == 0 + chunk_size = residual.shape[0]// total_chunks + + return residual[this_chunk*chunk_size:(this_chunk+1)*chunk_size] class VocabParallelEmbedding(torch.nn.Module): """Embedding parallelized in the vocabulary dimension. @@ -330,7 +341,8 @@ def __init__(self, input_size, output_size, bias=True, input_is_parallel=False, init_method=init.xavier_normal_, stride=1, keep_master_weight_for_test=False, - skip_bias_add=False, MOE=False, MoE_mp_size=1): + skip_bias_add=False, MOE=False, MoE_mp_size=1, + reduce_scatter=False): super(RowParallelLinear, self).__init__() # Keep input parameters @@ -339,6 +351,7 @@ def __init__(self, input_size, output_size, bias=True, self.input_is_parallel = input_is_parallel # Divide the weight matrix along the last dimension. world_size = MoE_mp_size if MOE else get_tensor_model_parallel_world_size() + self.input_size_per_partition = divide(input_size, world_size) self.skip_bias_add = skip_bias_add @@ -346,7 +359,13 @@ def __init__(self, input_size, output_size, bias=True, # Note: torch.nn.functional.linear performs XA^T + b and as a result # we allocate the transpose. # Initialize weight. + + if reduce_scatter: + assert not MOE, "reduce scatter is exclusively for self-attention outer projection" + self.reduce_scatter = reduce_scatter + args = get_args() + if args.use_cpu_initialization: self.weight = Parameter(torch.empty(self.output_size, self.input_size_per_partition, @@ -376,7 +395,6 @@ def __init__(self, input_size, output_size, bias=True, self.register_parameter('bias', None) - def forward(self, input_): # Set up backprop all-reduce. if self.input_is_parallel: @@ -385,8 +403,13 @@ def forward(self, input_): input_parallel = scatter_to_tensor_model_parallel_region(input_) # Matrix multiply. output_parallel = F.linear(input_parallel, self.weight) - # All-reduce across all the partitions. - output_ = reduce_from_tensor_model_parallel_region(output_parallel) + + if self.reduce_scatter: # do nothing, reduce scatter will be called after layer norm + output_ = reduce_scatter_from_tensor_model_parallel_region(output_parallel) + else: + # All-reduce across all the partitions. + output_ = reduce_from_tensor_model_parallel_region(output_parallel) + if not self.skip_bias_add: output = output_ + self.bias if self.bias is not None else output_ output_bias = None diff --git a/megatron/mpu/mappings.py b/megatron/mpu/mappings.py index 821d9acfecd..f3ff977f8ab 100644 --- a/megatron/mpu/mappings.py +++ b/megatron/mpu/mappings.py @@ -31,6 +31,48 @@ def _reduce(input_): return input_ +def _reduce_scatter(input_): + """Reduce scatter the input tensor across model parallel group.""" + + # Bypass the function if we are using only 1 GPU. + if get_tensor_model_parallel_world_size()==1: + return input_ + + # All-reduce. + input_ = input_.contiguous() + total_chunks = get_tensor_model_parallel_world_size() + this_chunk = get_tensor_model_parallel_rank() + + assert input_.shape[0] % total_chunks == 0 + + chunk_size = input_.shape[0]// total_chunks + + input_list = [input_[i*chunk_size:(i+1)*chunk_size] for i in range(total_chunks)] + + output = torch.zeros_like(input_list[this_chunk]) + torch.distributed.reduce_scatter(output, input_list, group=get_tensor_model_parallel_group()) + + return output + +def _gather_first_dim(input_): + """Gather tensors and concatinate along the first dimension.""" + + world_size = get_tensor_model_parallel_world_size() + # Bypass the function if we are using only 1 GPU. + if world_size==1: + return input_ + + # Size and dimension. + rank = get_tensor_model_parallel_rank() + + tensor_list = [torch.empty_like(input_) for _ in range(world_size)] + tensor_list[rank] = input_ + torch.distributed.all_gather(tensor_list, input_, group=get_tensor_model_parallel_group()) + + # Note: torch.cat already creates a contiguous tensor. + output = torch.cat(tensor_list, dim=0).contiguous() + + return output def _split(input_): """Split the tensor along its last dimension and keep the @@ -136,6 +178,35 @@ def forward(ctx, input_): def backward(ctx, grad_output): return _split(grad_output) +class _ReduceScatterFromModelParallelRegion(torch.autograd.Function): + """Reduce scatter output of self attention for MoE""" + + @staticmethod + def symbolic(graph, input_): + return _reduce_scatter(input_) + + @staticmethod + def forward(ctx, input_): + return _reduce_scatter(input_) + + @staticmethod + def backward(ctx, grad_output): + return _gather_first_dim(grad_output) + +class _AllGatherFromModelParallelRegion(torch.autograd.Function): + """Reduce scatter output of self attention for MoE""" + + @staticmethod + def symbolic(graph, input_): + return _gather_first_dim(input_) + + @staticmethod + def forward(ctx, input_): + return _gather_first_dim(input_) + + @staticmethod + def backward(ctx, grad_output): + return _reduce_scatter(grad_output) # ----------------- # Helper functions. @@ -152,6 +223,11 @@ def reduce_from_tensor_model_parallel_region(input_): def scatter_to_tensor_model_parallel_region(input_): return _ScatterToModelParallelRegion.apply(input_) - def gather_from_tensor_model_parallel_region(input_): return _GatherFromModelParallelRegion.apply(input_) + +def reduce_scatter_from_tensor_model_parallel_region(input_): + return _ReduceScatterFromModelParallelRegion.apply(input_) + +def all_gather_from_tensor_model_parallel_region(input_): + return _AllGatherFromModelParallelRegion.apply(input_) \ No newline at end of file From aa08fc1a7148a0e43a0f3b40e9ade2f4b72a87ca Mon Sep 17 00:00:00 2001 From: Siddharth Singh Date: Fri, 1 Jul 2022 06:24:53 +0500 Subject: [PATCH 02/29] TP for non-experts, drop token before a2a --- megatron/model/transformer.py | 20 +++++------ megatron/mpu/__init__.py | 3 +- megatron/mpu/layers.py | 36 ++++++++------------ megatron/mpu/mappings.py | 63 ++++++++++++++++++++++++++--------- 4 files changed, 72 insertions(+), 50 deletions(-) diff --git a/megatron/model/transformer.py b/megatron/model/transformer.py index 5abe42c956c..1bef1629968 100644 --- a/megatron/model/transformer.py +++ b/megatron/model/transformer.py @@ -59,7 +59,7 @@ class ParallelMLP(MegatronModule): applied. """ - def __init__(self, init_method, output_layer_init_method, MOE=False, MoE_mp_size=1): + def __init__(self, init_method, output_layer_init_method, MOE=False, MoE_mp_size=1, ): super(ParallelMLP, self).__init__() args = get_args() @@ -118,7 +118,7 @@ def __init__(self, init_method, output_layer_init_method, layer_number, attention_type=AttnType.self_attn, attn_mask_type=AttnMaskType.padding, - reduce_scatter=False): + ): super(ParallelAttention, self).__init__() args = get_args() self.fp16 = args.fp16 @@ -190,7 +190,7 @@ def __init__(self, init_method, input_is_parallel=True, init_method=output_layer_init_method, skip_bias_add=True, - reduce_scatter=reduce_scatter) + ) if deepspeed.checkpointing.is_configured(): global get_cuda_rng_tracker, checkpoint @@ -425,7 +425,7 @@ def __init__(self, init_method, output_layer_init_method, layer_number, attention_type=AttnType.self_attn, attn_mask_type=self_attn_mask_type, - reduce_scatter = (num_experts > 1)) + ) self.hidden_dropout = args.hidden_dropout self.bias_dropout_fusion = args.bias_dropout_fusion @@ -507,9 +507,6 @@ def forward(self, hidden_states, attention_mask, # re-enable torch grad to enable fused optimization. - if self.num_experts > 1: - residual = mpu.get_residual_for_reduce_scatter(residual) - with torch.enable_grad(): layernorm_input = bias_dropout_add_func( attention_output, @@ -549,7 +546,10 @@ def forward(self, hidden_states, attention_mask, if self.num_experts == 1: mlp_output, mlp_bias = self.mlp(layernorm_output) else: + # drop sequences + #layernorm_output = mpu.drop_tokens(layernorm_output) mlp_output, moe_loss, _ = self.mlp(layernorm_output) + #mlp_output = mpu.all_gather_from_tensor_model_parallel_region(mlp_output) # Second residual connection. if self.apply_residual_connection_post_layernorm: @@ -557,9 +557,6 @@ def forward(self, hidden_states, attention_mask, else: residual = layernorm_input - # if self.num_experts > 1: - # residual = mpu.get_residual_for_reduce_scatter(residual) - # re-enable torch grad to enable fused optimization. with torch.enable_grad(): #if self.num_experts <= 1: @@ -571,8 +568,7 @@ def forward(self, hidden_states, attention_mask, #else: # output = mlp_output + residual - if self.num_experts > 1: - output = mpu.all_gather_from_tensor_model_parallel_region(output) + if get_key_value: output = [output, presents] diff --git a/megatron/mpu/__init__.py b/megatron/mpu/__init__.py index a60327fc2e4..bc3aa01a5c2 100644 --- a/megatron/mpu/__init__.py +++ b/megatron/mpu/__init__.py @@ -57,6 +57,8 @@ from .mappings import scatter_to_tensor_model_parallel_region from .mappings import reduce_scatter_from_tensor_model_parallel_region from .mappings import all_gather_from_tensor_model_parallel_region +from .mappings import drop_tokens + from .random import checkpoint from .random import get_cuda_rng_tracker @@ -68,4 +70,3 @@ from .utils import divide from .utils import split_tensor_along_last_dim -from .layers import get_residual_for_reduce_scatter diff --git a/megatron/mpu/layers.py b/megatron/mpu/layers.py index b1c14fa056b..bcc9d1767c2 100644 --- a/megatron/mpu/layers.py +++ b/megatron/mpu/layers.py @@ -131,16 +131,7 @@ def _initialize_affine_weight_cpu(weight, output_size, input_size, return master_weight return None -def get_residual_for_reduce_scatter(residual): - if get_tensor_model_parallel_world_size() == 1: - return residual - - total_chunks = get_tensor_model_parallel_world_size() - this_chunk = get_tensor_model_parallel_rank() - assert residual.shape[0] % total_chunks == 0 - chunk_size = residual.shape[0]// total_chunks - - return residual[this_chunk*chunk_size:(this_chunk+1)*chunk_size] + class VocabParallelEmbedding(torch.nn.Module): """Embedding parallelized in the vocabulary dimension. @@ -250,6 +241,7 @@ def __init__(self, input_size, output_size, bias=True, gather_output=True, self.gather_output = gather_output # Divide the weight matrix along the last dimension. world_size = MoE_mp_size if MOE else get_tensor_model_parallel_world_size() + self.MOE = MOE self.output_size_per_partition = divide(output_size, world_size) self.skip_bias_add = skip_bias_add @@ -293,12 +285,15 @@ def __init__(self, input_size, output_size, bias=True, gather_output=True, def forward(self, input_): # Set up backprop all-reduce. - input_parallel = copy_to_tensor_model_parallel_region(input_) + if not self.MOE: + input_parallel = copy_to_tensor_model_parallel_region(input_) + else: + input_parallel = input_ # Matrix multiply. bias = self.bias if not self.skip_bias_add else None output_parallel = F.linear(input_parallel, self.weight, bias) - if self.gather_output: + if self.gather_output and not self.MoE: # All-gather across the partitions. output = gather_from_tensor_model_parallel_region(output_parallel) else: @@ -342,7 +337,7 @@ def __init__(self, input_size, output_size, bias=True, init_method=init.xavier_normal_, stride=1, keep_master_weight_for_test=False, skip_bias_add=False, MOE=False, MoE_mp_size=1, - reduce_scatter=False): + ): super(RowParallelLinear, self).__init__() # Keep input parameters @@ -351,7 +346,8 @@ def __init__(self, input_size, output_size, bias=True, self.input_is_parallel = input_is_parallel # Divide the weight matrix along the last dimension. world_size = MoE_mp_size if MOE else get_tensor_model_parallel_world_size() - + self.MOE = MOE + self.input_size_per_partition = divide(input_size, world_size) self.skip_bias_add = skip_bias_add @@ -360,10 +356,6 @@ def __init__(self, input_size, output_size, bias=True, # we allocate the transpose. # Initialize weight. - if reduce_scatter: - assert not MOE, "reduce scatter is exclusively for self-attention outer projection" - self.reduce_scatter = reduce_scatter - args = get_args() if args.use_cpu_initialization: @@ -400,16 +392,16 @@ def forward(self, input_): if self.input_is_parallel: input_parallel = input_ else: + assert not self.MoE input_parallel = scatter_to_tensor_model_parallel_region(input_) # Matrix multiply. output_parallel = F.linear(input_parallel, self.weight) - if self.reduce_scatter: # do nothing, reduce scatter will be called after layer norm - output_ = reduce_scatter_from_tensor_model_parallel_region(output_parallel) - else: + if not self.MOE: # All-reduce across all the partitions. output_ = reduce_from_tensor_model_parallel_region(output_parallel) - + else: + output_ = output_parallel if not self.skip_bias_add: output = output_ + self.bias if self.bias is not None else output_ output_bias = None diff --git a/megatron/mpu/mappings.py b/megatron/mpu/mappings.py index f3ff977f8ab..efded83724e 100644 --- a/megatron/mpu/mappings.py +++ b/megatron/mpu/mappings.py @@ -31,7 +31,7 @@ def _reduce(input_): return input_ -def _reduce_scatter(input_): +def _reduce_scatter(input_, dim): """Reduce scatter the input tensor across model parallel group.""" # Bypass the function if we are using only 1 GPU. @@ -43,25 +43,25 @@ def _reduce_scatter(input_): total_chunks = get_tensor_model_parallel_world_size() this_chunk = get_tensor_model_parallel_rank() - assert input_.shape[0] % total_chunks == 0 + assert input_.shape[dim] % total_chunks == 0 - chunk_size = input_.shape[0]// total_chunks + chunk_size = input_.shape[dim]// total_chunks - input_list = [input_[i*chunk_size:(i+1)*chunk_size] for i in range(total_chunks)] + input_list = [torch.narrow(input_, dim, i*chunk_size, chunk_size).contiguous() for i in range(total_chunks)] - output = torch.zeros_like(input_list[this_chunk]) + output = torch.zeros_like(input_list[this_chunk], memory_format=torch.contiguous_format) torch.distributed.reduce_scatter(output, input_list, group=get_tensor_model_parallel_group()) return output -def _gather_first_dim(input_): +def _gather_first_dim(input_, dim=0): """Gather tensors and concatinate along the first dimension.""" world_size = get_tensor_model_parallel_world_size() # Bypass the function if we are using only 1 GPU. if world_size==1: return input_ - + input_ = input_.contiguous() # Size and dimension. rank = get_tensor_model_parallel_rank() @@ -70,7 +70,7 @@ def _gather_first_dim(input_): torch.distributed.all_gather(tensor_list, input_, group=get_tensor_model_parallel_group()) # Note: torch.cat already creates a contiguous tensor. - output = torch.cat(tensor_list, dim=0).contiguous() + output = torch.cat(tensor_list, dim=dim).contiguous() return output @@ -115,6 +115,16 @@ def _gather(input_): return output +def _drop_tokens(input_, dim=0): + if get_tensor_model_parallel_world_size() == 1: + return input_ + total_chunks = get_tensor_model_parallel_world_size() + this_chunk = get_tensor_model_parallel_rank() + assert input_.shape[dim] % total_chunks == 0 + chunk_size = input_.shape[dim]// total_chunks + + return torch.narrow(input_, dim, this_chunk*chunk_size, chunk_size) + class _CopyToModelParallelRegion(torch.autograd.Function): """Pass the input to the model parallel region.""" @@ -197,16 +207,34 @@ class _AllGatherFromModelParallelRegion(torch.autograd.Function): """Reduce scatter output of self attention for MoE""" @staticmethod - def symbolic(graph, input_): - return _gather_first_dim(input_) + def symbolic(graph, input_, dim): + return _gather_first_dim(input_, dim) @staticmethod - def forward(ctx, input_): - return _gather_first_dim(input_) + def forward(ctx, input_, dim): + ctx.dim = dim + return _gather_first_dim(input_, dim) @staticmethod def backward(ctx, grad_output): - return _reduce_scatter(grad_output) + return _drop_tokens(grad_output, ctx.dim), None + +class _DropTokens(torch.autograd.Function): + "Drop tokens (this is a hacky approach until we can do reduce scatter)" + + @staticmethod + def symbolic(graph, input_, dim): + return _drop_tokens(input_, dim) + + @staticmethod + def forward(ctx, input_, dim): + ctx.dim = dim + return _drop_tokens(input_, dim) + + @staticmethod + def backward(ctx, input_): + return _gather_first_dim(input_, ctx.dim), None + # ----------------- # Helper functions. @@ -229,5 +257,10 @@ def gather_from_tensor_model_parallel_region(input_): def reduce_scatter_from_tensor_model_parallel_region(input_): return _ReduceScatterFromModelParallelRegion.apply(input_) -def all_gather_from_tensor_model_parallel_region(input_): - return _AllGatherFromModelParallelRegion.apply(input_) \ No newline at end of file +def all_gather_from_tensor_model_parallel_region(input_, dim=0): + return _AllGatherFromModelParallelRegion.apply(input_, dim) + +def drop_tokens(input_, dim=0): + return _DropTokens.apply(input_, dim) + + \ No newline at end of file From 44a1203d87f3429ad770064de9766ba723b6ae7a Mon Sep 17 00:00:00 2001 From: Siddharth Singh Date: Wed, 6 Jul 2022 23:39:15 +0500 Subject: [PATCH 03/29] dropping tokens after gating --- megatron/model/transformer.py | 3 --- megatron/mpu/mappings.py | 2 +- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/megatron/model/transformer.py b/megatron/model/transformer.py index 1bef1629968..0f2f290752b 100644 --- a/megatron/model/transformer.py +++ b/megatron/model/transformer.py @@ -546,10 +546,7 @@ def forward(self, hidden_states, attention_mask, if self.num_experts == 1: mlp_output, mlp_bias = self.mlp(layernorm_output) else: - # drop sequences - #layernorm_output = mpu.drop_tokens(layernorm_output) mlp_output, moe_loss, _ = self.mlp(layernorm_output) - #mlp_output = mpu.all_gather_from_tensor_model_parallel_region(mlp_output) # Second residual connection. if self.apply_residual_connection_post_layernorm: diff --git a/megatron/mpu/mappings.py b/megatron/mpu/mappings.py index efded83724e..1da03a50992 100644 --- a/megatron/mpu/mappings.py +++ b/megatron/mpu/mappings.py @@ -217,7 +217,7 @@ def forward(ctx, input_, dim): @staticmethod def backward(ctx, grad_output): - return _drop_tokens(grad_output, ctx.dim), None + return drop_tokens(grad_output, ctx.dim), None class _DropTokens(torch.autograd.Function): "Drop tokens (this is a hacky approach until we can do reduce scatter)" From 92051b3ea22b13c2fe82b473ca8091c24dc0ee73 Mon Sep 17 00:00:00 2001 From: Siddharth Singh Date: Wed, 6 Jul 2022 23:48:47 +0500 Subject: [PATCH 04/29] remove commented code --- megatron/mpu/initialize.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/megatron/mpu/initialize.py b/megatron/mpu/initialize.py index b2af4e8c32d..c24d1179ada 100644 --- a/megatron/mpu/initialize.py +++ b/megatron/mpu/initialize.py @@ -231,14 +231,6 @@ def get_tensor_model_parallel_world_size(): return _MPU_TENSOR_MODEL_PARALLEL_WORLD_SIZE return torch.distributed.get_world_size(group=get_tensor_model_parallel_group()) -# def get_expert_model_parallel_world_size(): -# return 1 - -# def get_expert_data_parallel_world_size(): -# return dist.get_world_size() - -# def get_expert_data_parallel_rank(): -# return dist.get_rank() def get_model_parallel_world_size(): assert get_pipeline_model_parallel_world_size() == 1, "legacy get_model_parallel_world_size is only supported if PP is disabled" From 441173dfe9fdda3ddad945498e25a42db5cacd00 Mon Sep 17 00:00:00 2001 From: Siddharth Singh Date: Thu, 7 Jul 2022 00:04:02 +0500 Subject: [PATCH 05/29] remove spurious changes --- megatron/model/transformer.py | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/megatron/model/transformer.py b/megatron/model/transformer.py index 0f2f290752b..06a10b8f941 100644 --- a/megatron/model/transformer.py +++ b/megatron/model/transformer.py @@ -59,7 +59,7 @@ class ParallelMLP(MegatronModule): applied. """ - def __init__(self, init_method, output_layer_init_method, MOE=False, MoE_mp_size=1, ): + def __init__(self, init_method, output_layer_init_method, MOE=False, MoE_mp_size=1): super(ParallelMLP, self).__init__() args = get_args() @@ -117,8 +117,7 @@ class ParallelAttention(MegatronModule): def __init__(self, init_method, output_layer_init_method, layer_number, attention_type=AttnType.self_attn, - attn_mask_type=AttnMaskType.padding, - ): + attn_mask_type=AttnMaskType.padding): super(ParallelAttention, self).__init__() args = get_args() self.fp16 = args.fp16 @@ -189,8 +188,7 @@ def __init__(self, init_method, args.hidden_size, input_is_parallel=True, init_method=output_layer_init_method, - skip_bias_add=True, - ) + skip_bias_add=True) if deepspeed.checkpointing.is_configured(): global get_cuda_rng_tracker, checkpoint @@ -424,8 +422,7 @@ def __init__(self, init_method, output_layer_init_method, output_layer_init_method, layer_number, attention_type=AttnType.self_attn, - attn_mask_type=self_attn_mask_type, - ) + attn_mask_type=self_attn_mask_type) self.hidden_dropout = args.hidden_dropout self.bias_dropout_fusion = args.bias_dropout_fusion @@ -506,7 +503,6 @@ def forward(self, hidden_states, attention_mask, bias_dropout_add_func = get_bias_dropout_add(self.training) # re-enable torch grad to enable fused optimization. - with torch.enable_grad(): layernorm_input = bias_dropout_add_func( attention_output, @@ -565,8 +561,6 @@ def forward(self, hidden_states, attention_mask, #else: # output = mlp_output + residual - - if get_key_value: output = [output, presents] From 7ed06116e8757d33ec29cad08cb5477e44a78eee Mon Sep 17 00:00:00 2001 From: Siddharth Singh Date: Thu, 7 Jul 2022 00:05:04 +0500 Subject: [PATCH 06/29] remove commented code --- megatron/mpu/__init__.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/megatron/mpu/__init__.py b/megatron/mpu/__init__.py index bc3aa01a5c2..5e2106f10ff 100644 --- a/megatron/mpu/__init__.py +++ b/megatron/mpu/__init__.py @@ -42,7 +42,6 @@ from .initialize import initialize_model_parallel from .initialize import model_parallel_is_initialized from .initialize import get_model_parallel_world_size, get_model_parallel_rank -#from .initialize import get_expert_model_parallel_world_size, get_expert_data_parallel_world_size, get_expert_data_parallel_rank from .layers import ColumnParallelLinear from .layers import RowParallelLinear @@ -55,7 +54,6 @@ from .mappings import gather_from_tensor_model_parallel_region from .mappings import reduce_from_tensor_model_parallel_region from .mappings import scatter_to_tensor_model_parallel_region -from .mappings import reduce_scatter_from_tensor_model_parallel_region from .mappings import all_gather_from_tensor_model_parallel_region from .mappings import drop_tokens From 2f29eb5adf4b868146ee0babdf8ed9b7a0b9e78b Mon Sep 17 00:00:00 2001 From: Siddharth Singh Date: Thu, 7 Jul 2022 00:06:32 +0500 Subject: [PATCH 07/29] remove spurious code --- megatron/mpu/layers.py | 1 - 1 file changed, 1 deletion(-) diff --git a/megatron/mpu/layers.py b/megatron/mpu/layers.py index bcc9d1767c2..b9926061608 100644 --- a/megatron/mpu/layers.py +++ b/megatron/mpu/layers.py @@ -31,7 +31,6 @@ from .mappings import gather_from_tensor_model_parallel_region from .mappings import reduce_from_tensor_model_parallel_region from .mappings import scatter_to_tensor_model_parallel_region -from .mappings import reduce_scatter_from_tensor_model_parallel_region from .random import get_cuda_rng_tracker from .utils import divide from .utils import split_tensor_along_last_dim From 1aa8ce0f6b61cc0ccdb11dca5c18ea408e4ddec4 Mon Sep 17 00:00:00 2001 From: Siddharth Singh Date: Tue, 12 Jul 2022 03:42:44 +0500 Subject: [PATCH 08/29] remove blank line --- megatron/mpu/__init__.py | 1 - 1 file changed, 1 deletion(-) diff --git a/megatron/mpu/__init__.py b/megatron/mpu/__init__.py index 5e2106f10ff..879aa27238e 100644 --- a/megatron/mpu/__init__.py +++ b/megatron/mpu/__init__.py @@ -57,7 +57,6 @@ from .mappings import all_gather_from_tensor_model_parallel_region from .mappings import drop_tokens - from .random import checkpoint from .random import get_cuda_rng_tracker from .random import init_checkpointed_activations_memory_buffer From e3b01054900f5ea15f1c85df7805b546bbfc652e Mon Sep 17 00:00:00 2001 From: Siddharth Singh Date: Wed, 13 Jul 2022 01:14:34 +0500 Subject: [PATCH 09/29] change to schedules --- megatron/schedules.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/megatron/schedules.py b/megatron/schedules.py index 4b1d6dcd5c2..38e15e2591f 100644 --- a/megatron/schedules.py +++ b/megatron/schedules.py @@ -27,6 +27,8 @@ from megatron.model import DistributedDataParallel as LocalDDP from megatron.model import Float16Module +from deepspeed.runtime.utils import see_memory_usage + def get_forward_backward_func(): args = get_args() if mpu.get_pipeline_model_parallel_world_size() > 1: From 3f0cbe1703fb06b0843f2c98e516de8356aa06dc Mon Sep 17 00:00:00 2001 From: Siddharth Singh Date: Wed, 13 Jul 2022 01:24:22 +0500 Subject: [PATCH 10/29] modified checks in mpu/layers --- megatron/mpu/layers.py | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/megatron/mpu/layers.py b/megatron/mpu/layers.py index b9926061608..c42f10f19ac 100644 --- a/megatron/mpu/layers.py +++ b/megatron/mpu/layers.py @@ -284,15 +284,16 @@ def __init__(self, input_size, output_size, bias=True, gather_output=True, def forward(self, input_): # Set up backprop all-reduce. - if not self.MOE: - input_parallel = copy_to_tensor_model_parallel_region(input_) - else: + if (self.MOE and self.world_size==1): # non-expert only tensor parallelism input_parallel = input_ + else: + input_parallel = copy_to_tensor_model_parallel_region(input_) + # Matrix multiply. bias = self.bias if not self.skip_bias_add else None output_parallel = F.linear(input_parallel, self.weight, bias) - if self.gather_output and not self.MoE: + if self.gather_output and not (self.MoE and self.world_size==1): # All-gather across the partitions. output = gather_from_tensor_model_parallel_region(output_parallel) else: @@ -388,19 +389,19 @@ def __init__(self, input_size, output_size, bias=True, def forward(self, input_): # Set up backprop all-reduce. - if self.input_is_parallel: + if self.input_is_parallel or (self.MOE and self.world_size == 1): input_parallel = input_ else: - assert not self.MoE input_parallel = scatter_to_tensor_model_parallel_region(input_) # Matrix multiply. output_parallel = F.linear(input_parallel, self.weight) - if not self.MOE: - # All-reduce across all the partitions. - output_ = reduce_from_tensor_model_parallel_region(output_parallel) - else: + # All-reduce across all the partitions. + if (self.MOE and self.world_size==1): # non-expert only tensor-parallelism output_ = output_parallel + else: + output_ = reduce_from_tensor_model_parallel_region(output_parallel) + if not self.skip_bias_add: output = output_ + self.bias if self.bias is not None else output_ output_bias = None From e79106a5fea66e93804890400ab78267e42a05cc Mon Sep 17 00:00:00 2001 From: Siddharth Singh Date: Wed, 13 Jul 2022 01:27:47 +0500 Subject: [PATCH 11/29] better named flag --- megatron/mpu/layers.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/megatron/mpu/layers.py b/megatron/mpu/layers.py index c42f10f19ac..69f7e266668 100644 --- a/megatron/mpu/layers.py +++ b/megatron/mpu/layers.py @@ -241,6 +241,7 @@ def __init__(self, input_size, output_size, bias=True, gather_output=True, # Divide the weight matrix along the last dimension. world_size = MoE_mp_size if MOE else get_tensor_model_parallel_world_size() self.MOE = MOE + self.is_expert_without_slicing = self.MOE and self.world_size==1 self.output_size_per_partition = divide(output_size, world_size) self.skip_bias_add = skip_bias_add @@ -284,7 +285,7 @@ def __init__(self, input_size, output_size, bias=True, gather_output=True, def forward(self, input_): # Set up backprop all-reduce. - if (self.MOE and self.world_size==1): # non-expert only tensor parallelism + if self.is_expert_without_slicing: # non-expert only tensor parallelism input_parallel = input_ else: input_parallel = copy_to_tensor_model_parallel_region(input_) @@ -293,7 +294,7 @@ def forward(self, input_): bias = self.bias if not self.skip_bias_add else None output_parallel = F.linear(input_parallel, self.weight, bias) - if self.gather_output and not (self.MoE and self.world_size==1): + if self.gather_output and not self.is_expert_without_slicing: # All-gather across the partitions. output = gather_from_tensor_model_parallel_region(output_parallel) else: @@ -347,6 +348,7 @@ def __init__(self, input_size, output_size, bias=True, # Divide the weight matrix along the last dimension. world_size = MoE_mp_size if MOE else get_tensor_model_parallel_world_size() self.MOE = MOE + self.is_expert_without_slicing = self.MOE and self.world_size==1 self.input_size_per_partition = divide(input_size, world_size) self.skip_bias_add = skip_bias_add @@ -389,7 +391,7 @@ def __init__(self, input_size, output_size, bias=True, def forward(self, input_): # Set up backprop all-reduce. - if self.input_is_parallel or (self.MOE and self.world_size == 1): + if self.input_is_parallel or self.is_expert_without_slicing: input_parallel = input_ else: input_parallel = scatter_to_tensor_model_parallel_region(input_) @@ -397,7 +399,7 @@ def forward(self, input_): output_parallel = F.linear(input_parallel, self.weight) # All-reduce across all the partitions. - if (self.MOE and self.world_size==1): # non-expert only tensor-parallelism + if self.is_expert_without_slicing: # non-expert only tensor-parallelism output_ = output_parallel else: output_ = reduce_from_tensor_model_parallel_region(output_parallel) From 30e22341f421e015aeaa14f3dc4ab8ded960e278 Mon Sep 17 00:00:00 2001 From: Siddharth Singh Date: Wed, 13 Jul 2022 02:35:06 +0500 Subject: [PATCH 12/29] migrate code to deepspeed --- megatron/model/transformer.py | 17 ++++++++++++----- megatron/mpu/layers.py | 4 ++-- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/megatron/model/transformer.py b/megatron/model/transformer.py index 06a10b8f941..2adc644d32c 100644 --- a/megatron/model/transformer.py +++ b/megatron/model/transformer.py @@ -59,7 +59,7 @@ class ParallelMLP(MegatronModule): applied. """ - def __init__(self, init_method, output_layer_init_method, MOE=False, MoE_mp_size=1): + def __init__(self, init_method, output_layer_init_method, MOE=False, MoE_mp_size=1, ): super(ParallelMLP, self).__init__() args = get_args() @@ -117,7 +117,8 @@ class ParallelAttention(MegatronModule): def __init__(self, init_method, output_layer_init_method, layer_number, attention_type=AttnType.self_attn, - attn_mask_type=AttnMaskType.padding): + attn_mask_type=AttnMaskType.padding, + ): super(ParallelAttention, self).__init__() args = get_args() self.fp16 = args.fp16 @@ -188,7 +189,8 @@ def __init__(self, init_method, args.hidden_size, input_is_parallel=True, init_method=output_layer_init_method, - skip_bias_add=True) + skip_bias_add=True, + ) if deepspeed.checkpointing.is_configured(): global get_cuda_rng_tracker, checkpoint @@ -422,7 +424,8 @@ def __init__(self, init_method, output_layer_init_method, output_layer_init_method, layer_number, attention_type=AttnType.self_attn, - attn_mask_type=self_attn_mask_type) + attn_mask_type=self_attn_mask_type, + ) self.hidden_dropout = args.hidden_dropout self.bias_dropout_fusion = args.bias_dropout_fusion @@ -465,7 +468,8 @@ def __init__(self, init_method, output_layer_init_method, capacity_factor=args.moe_train_capacity_factor, eval_capacity_factor=args.moe_eval_capacity_factor, min_capacity=args.moe_min_capacity, - drop_tokens=args.moe_token_dropping, use_tutel=args.use_tutel) + drop_tokens=args.moe_token_dropping, use_tutel=args.use_tutel, + MoE_mp_size=moe_mp_size) def forward(self, hidden_states, attention_mask, encoder_output=None, enc_dec_attn_mask=None, @@ -503,6 +507,7 @@ def forward(self, hidden_states, attention_mask, bias_dropout_add_func = get_bias_dropout_add(self.training) # re-enable torch grad to enable fused optimization. + with torch.enable_grad(): layernorm_input = bias_dropout_add_func( attention_output, @@ -561,6 +566,8 @@ def forward(self, hidden_states, attention_mask, #else: # output = mlp_output + residual + + if get_key_value: output = [output, presents] diff --git a/megatron/mpu/layers.py b/megatron/mpu/layers.py index 69f7e266668..c73d55f0b90 100644 --- a/megatron/mpu/layers.py +++ b/megatron/mpu/layers.py @@ -241,7 +241,7 @@ def __init__(self, input_size, output_size, bias=True, gather_output=True, # Divide the weight matrix along the last dimension. world_size = MoE_mp_size if MOE else get_tensor_model_parallel_world_size() self.MOE = MOE - self.is_expert_without_slicing = self.MOE and self.world_size==1 + self.is_expert_without_slicing = self.MOE and world_size==1 self.output_size_per_partition = divide(output_size, world_size) self.skip_bias_add = skip_bias_add @@ -348,7 +348,7 @@ def __init__(self, input_size, output_size, bias=True, # Divide the weight matrix along the last dimension. world_size = MoE_mp_size if MOE else get_tensor_model_parallel_world_size() self.MOE = MOE - self.is_expert_without_slicing = self.MOE and self.world_size==1 + self.is_expert_without_slicing = self.MOE and world_size==1 self.input_size_per_partition = divide(input_size, world_size) self.skip_bias_add = skip_bias_add From 71088c23c5aa2526b7e7a25008b3338e546c7f1e Mon Sep 17 00:00:00 2001 From: Siddharth Singh Date: Wed, 13 Jul 2022 22:17:30 +0500 Subject: [PATCH 13/29] remove unnecessary changes to code formatting --- megatron/model/transformer.py | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/megatron/model/transformer.py b/megatron/model/transformer.py index 2adc644d32c..6fef0c6dbba 100644 --- a/megatron/model/transformer.py +++ b/megatron/model/transformer.py @@ -117,8 +117,7 @@ class ParallelAttention(MegatronModule): def __init__(self, init_method, output_layer_init_method, layer_number, attention_type=AttnType.self_attn, - attn_mask_type=AttnMaskType.padding, - ): + attn_mask_type=AttnMaskType.padding): super(ParallelAttention, self).__init__() args = get_args() self.fp16 = args.fp16 @@ -189,8 +188,7 @@ def __init__(self, init_method, args.hidden_size, input_is_parallel=True, init_method=output_layer_init_method, - skip_bias_add=True, - ) + skip_bias_add=True) if deepspeed.checkpointing.is_configured(): global get_cuda_rng_tracker, checkpoint @@ -424,8 +422,7 @@ def __init__(self, init_method, output_layer_init_method, output_layer_init_method, layer_number, attention_type=AttnType.self_attn, - attn_mask_type=self_attn_mask_type, - ) + attn_mask_type=self_attn_mask_type) self.hidden_dropout = args.hidden_dropout self.bias_dropout_fusion = args.bias_dropout_fusion @@ -507,7 +504,6 @@ def forward(self, hidden_states, attention_mask, bias_dropout_add_func = get_bias_dropout_add(self.training) # re-enable torch grad to enable fused optimization. - with torch.enable_grad(): layernorm_input = bias_dropout_add_func( attention_output, @@ -566,8 +562,6 @@ def forward(self, hidden_states, attention_mask, #else: # output = mlp_output + residual - - if get_key_value: output = [output, presents] From 6364a28977ad3eba9eb8c28cc0037729c3fbb433 Mon Sep 17 00:00:00 2001 From: Siddharth Singh Date: Wed, 13 Jul 2022 22:18:30 +0500 Subject: [PATCH 14/29] remove unnecessary changes to code formatting --- megatron/model/transformer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/megatron/model/transformer.py b/megatron/model/transformer.py index 6fef0c6dbba..963870d6037 100644 --- a/megatron/model/transformer.py +++ b/megatron/model/transformer.py @@ -59,7 +59,7 @@ class ParallelMLP(MegatronModule): applied. """ - def __init__(self, init_method, output_layer_init_method, MOE=False, MoE_mp_size=1, ): + def __init__(self, init_method, output_layer_init_method, MOE=False, MoE_mp_size=1): super(ParallelMLP, self).__init__() args = get_args() From 3a0eb55265fffe588c4e148f21eb87f4899906c1 Mon Sep 17 00:00:00 2001 From: Siddharth Singh Date: Wed, 13 Jul 2022 22:19:19 +0500 Subject: [PATCH 15/29] shift code to deepspeed --- megatron/mpu/__init__.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/megatron/mpu/__init__.py b/megatron/mpu/__init__.py index 879aa27238e..32bb5fc3dd6 100644 --- a/megatron/mpu/__init__.py +++ b/megatron/mpu/__init__.py @@ -54,8 +54,6 @@ from .mappings import gather_from_tensor_model_parallel_region from .mappings import reduce_from_tensor_model_parallel_region from .mappings import scatter_to_tensor_model_parallel_region -from .mappings import all_gather_from_tensor_model_parallel_region -from .mappings import drop_tokens from .random import checkpoint from .random import get_cuda_rng_tracker From ef3f77c4fa05a2494387e5382f218d19cb32d2e4 Mon Sep 17 00:00:00 2001 From: Siddharth Singh Date: Wed, 13 Jul 2022 22:21:57 +0500 Subject: [PATCH 16/29] remove blank lines --- megatron/mpu/layers.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/megatron/mpu/layers.py b/megatron/mpu/layers.py index c73d55f0b90..d8dc1786c9e 100644 --- a/megatron/mpu/layers.py +++ b/megatron/mpu/layers.py @@ -130,7 +130,6 @@ def _initialize_affine_weight_cpu(weight, output_size, input_size, return master_weight return None - class VocabParallelEmbedding(torch.nn.Module): """Embedding parallelized in the vocabulary dimension. @@ -337,8 +336,7 @@ def __init__(self, input_size, output_size, bias=True, input_is_parallel=False, init_method=init.xavier_normal_, stride=1, keep_master_weight_for_test=False, - skip_bias_add=False, MOE=False, MoE_mp_size=1, - ): + skip_bias_add=False, MOE=False, MoE_mp_size=1): super(RowParallelLinear, self).__init__() # Keep input parameters @@ -357,9 +355,7 @@ def __init__(self, input_size, output_size, bias=True, # Note: torch.nn.functional.linear performs XA^T + b and as a result # we allocate the transpose. # Initialize weight. - args = get_args() - if args.use_cpu_initialization: self.weight = Parameter(torch.empty(self.output_size, self.input_size_per_partition, From 96b1cb65371a120dec683899e3d714e17ab508c3 Mon Sep 17 00:00:00 2001 From: Siddharth Singh Date: Wed, 13 Jul 2022 22:22:55 +0500 Subject: [PATCH 17/29] remove blank lines --- megatron/mpu/layers.py | 1 - 1 file changed, 1 deletion(-) diff --git a/megatron/mpu/layers.py b/megatron/mpu/layers.py index d8dc1786c9e..7d7304cb444 100644 --- a/megatron/mpu/layers.py +++ b/megatron/mpu/layers.py @@ -393,7 +393,6 @@ def forward(self, input_): input_parallel = scatter_to_tensor_model_parallel_region(input_) # Matrix multiply. output_parallel = F.linear(input_parallel, self.weight) - # All-reduce across all the partitions. if self.is_expert_without_slicing: # non-expert only tensor-parallelism output_ = output_parallel From 961bf8db5359b4640ec21b02c4fca282d95d5bf6 Mon Sep 17 00:00:00 2001 From: Siddharth Singh Date: Wed, 13 Jul 2022 22:23:50 +0500 Subject: [PATCH 18/29] remove blank lines --- megatron/mpu/layers.py | 1 + 1 file changed, 1 insertion(+) diff --git a/megatron/mpu/layers.py b/megatron/mpu/layers.py index 7d7304cb444..0a1642e3df0 100644 --- a/megatron/mpu/layers.py +++ b/megatron/mpu/layers.py @@ -385,6 +385,7 @@ def __init__(self, input_size, output_size, bias=True, self.register_parameter('bias', None) + def forward(self, input_): # Set up backprop all-reduce. if self.input_is_parallel or self.is_expert_without_slicing: From 40891888d4cd10ed87179490d2b43d986af85b63 Mon Sep 17 00:00:00 2001 From: Siddharth Singh Date: Wed, 13 Jul 2022 22:26:18 +0500 Subject: [PATCH 19/29] restore mappings.py --- megatron/mpu/mappings.py | 110 +-------------------------------------- 1 file changed, 1 insertion(+), 109 deletions(-) diff --git a/megatron/mpu/mappings.py b/megatron/mpu/mappings.py index 1da03a50992..81a3af12680 100644 --- a/megatron/mpu/mappings.py +++ b/megatron/mpu/mappings.py @@ -31,48 +31,6 @@ def _reduce(input_): return input_ -def _reduce_scatter(input_, dim): - """Reduce scatter the input tensor across model parallel group.""" - - # Bypass the function if we are using only 1 GPU. - if get_tensor_model_parallel_world_size()==1: - return input_ - - # All-reduce. - input_ = input_.contiguous() - total_chunks = get_tensor_model_parallel_world_size() - this_chunk = get_tensor_model_parallel_rank() - - assert input_.shape[dim] % total_chunks == 0 - - chunk_size = input_.shape[dim]// total_chunks - - input_list = [torch.narrow(input_, dim, i*chunk_size, chunk_size).contiguous() for i in range(total_chunks)] - - output = torch.zeros_like(input_list[this_chunk], memory_format=torch.contiguous_format) - torch.distributed.reduce_scatter(output, input_list, group=get_tensor_model_parallel_group()) - - return output - -def _gather_first_dim(input_, dim=0): - """Gather tensors and concatinate along the first dimension.""" - - world_size = get_tensor_model_parallel_world_size() - # Bypass the function if we are using only 1 GPU. - if world_size==1: - return input_ - input_ = input_.contiguous() - # Size and dimension. - rank = get_tensor_model_parallel_rank() - - tensor_list = [torch.empty_like(input_) for _ in range(world_size)] - tensor_list[rank] = input_ - torch.distributed.all_gather(tensor_list, input_, group=get_tensor_model_parallel_group()) - - # Note: torch.cat already creates a contiguous tensor. - output = torch.cat(tensor_list, dim=dim).contiguous() - - return output def _split(input_): """Split the tensor along its last dimension and keep the @@ -115,16 +73,6 @@ def _gather(input_): return output -def _drop_tokens(input_, dim=0): - if get_tensor_model_parallel_world_size() == 1: - return input_ - total_chunks = get_tensor_model_parallel_world_size() - this_chunk = get_tensor_model_parallel_rank() - assert input_.shape[dim] % total_chunks == 0 - chunk_size = input_.shape[dim]// total_chunks - - return torch.narrow(input_, dim, this_chunk*chunk_size, chunk_size) - class _CopyToModelParallelRegion(torch.autograd.Function): """Pass the input to the model parallel region.""" @@ -188,53 +136,6 @@ def forward(ctx, input_): def backward(ctx, grad_output): return _split(grad_output) -class _ReduceScatterFromModelParallelRegion(torch.autograd.Function): - """Reduce scatter output of self attention for MoE""" - - @staticmethod - def symbolic(graph, input_): - return _reduce_scatter(input_) - - @staticmethod - def forward(ctx, input_): - return _reduce_scatter(input_) - - @staticmethod - def backward(ctx, grad_output): - return _gather_first_dim(grad_output) - -class _AllGatherFromModelParallelRegion(torch.autograd.Function): - """Reduce scatter output of self attention for MoE""" - - @staticmethod - def symbolic(graph, input_, dim): - return _gather_first_dim(input_, dim) - - @staticmethod - def forward(ctx, input_, dim): - ctx.dim = dim - return _gather_first_dim(input_, dim) - - @staticmethod - def backward(ctx, grad_output): - return drop_tokens(grad_output, ctx.dim), None - -class _DropTokens(torch.autograd.Function): - "Drop tokens (this is a hacky approach until we can do reduce scatter)" - - @staticmethod - def symbolic(graph, input_, dim): - return _drop_tokens(input_, dim) - - @staticmethod - def forward(ctx, input_, dim): - ctx.dim = dim - return _drop_tokens(input_, dim) - - @staticmethod - def backward(ctx, input_): - return _gather_first_dim(input_, ctx.dim), None - # ----------------- # Helper functions. @@ -254,13 +155,4 @@ def scatter_to_tensor_model_parallel_region(input_): def gather_from_tensor_model_parallel_region(input_): return _GatherFromModelParallelRegion.apply(input_) -def reduce_scatter_from_tensor_model_parallel_region(input_): - return _ReduceScatterFromModelParallelRegion.apply(input_) - -def all_gather_from_tensor_model_parallel_region(input_, dim=0): - return _AllGatherFromModelParallelRegion.apply(input_, dim) - -def drop_tokens(input_, dim=0): - return _DropTokens.apply(input_, dim) - - \ No newline at end of file + From e1a345c36fce3cd670bfd8194e4b408727a3537c Mon Sep 17 00:00:00 2001 From: Siddharth Singh Date: Wed, 13 Jul 2022 22:27:08 +0500 Subject: [PATCH 20/29] restore mappings.py --- megatron/mpu/mappings.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/megatron/mpu/mappings.py b/megatron/mpu/mappings.py index 81a3af12680..821d9acfecd 100644 --- a/megatron/mpu/mappings.py +++ b/megatron/mpu/mappings.py @@ -152,7 +152,6 @@ def reduce_from_tensor_model_parallel_region(input_): def scatter_to_tensor_model_parallel_region(input_): return _ScatterToModelParallelRegion.apply(input_) + def gather_from_tensor_model_parallel_region(input_): return _GatherFromModelParallelRegion.apply(input_) - - From 18c0d844766d2a1b0a19e1a5b66af1861f6e9f97 Mon Sep 17 00:00:00 2001 From: Siddharth Singh Date: Wed, 13 Jul 2022 22:27:57 +0500 Subject: [PATCH 21/29] remove unnecessary code --- megatron/schedules.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/megatron/schedules.py b/megatron/schedules.py index 38e15e2591f..4b1d6dcd5c2 100644 --- a/megatron/schedules.py +++ b/megatron/schedules.py @@ -27,8 +27,6 @@ from megatron.model import DistributedDataParallel as LocalDDP from megatron.model import Float16Module -from deepspeed.runtime.utils import see_memory_usage - def get_forward_backward_func(): args = get_args() if mpu.get_pipeline_model_parallel_world_size() > 1: From 1619965d33b25a1909b2112db3b247fe680ffe56 Mon Sep 17 00:00:00 2001 From: Siddharth Singh Date: Wed, 20 Jul 2022 01:49:50 +0500 Subject: [PATCH 22/29] restructure code and introduce tensor parallelism for experts --- megatron/arguments.py | 5 ++++- megatron/model/transformer.py | 24 ++++++++++-------------- megatron/mpu/layers.py | 24 ++++++++++++++++-------- 3 files changed, 30 insertions(+), 23 deletions(-) diff --git a/megatron/arguments.py b/megatron/arguments.py index 562bc43a803..efb74db9a7d 100644 --- a/megatron/arguments.py +++ b/megatron/arguments.py @@ -628,6 +628,9 @@ def _add_distributed_args(parser): group.add_argument('--tensor-model-parallel-size', type=int, default=1, help='Degree of tensor model parallelism.') + group.add_argument('--enable-expert-tensor-parallelism', action='store_true', + default=False, + help="use tensor parallelism for expert layers in MoE") group.add_argument('--pipeline-model-parallel-size', type=int, default=1, help='Degree of pipeline model parallelism.') group.add_argument('--moe-expert-parallel-size', type=int, default=1, @@ -902,4 +905,4 @@ def _add_distillation_args(parser): group.add_argument('--load-teacher', type=str, default=None, help='Directory containing a teacher model checkpoint.') - return parser \ No newline at end of file + return parser diff --git a/megatron/model/transformer.py b/megatron/model/transformer.py index 963870d6037..9c72d201316 100644 --- a/megatron/model/transformer.py +++ b/megatron/model/transformer.py @@ -59,7 +59,7 @@ class ParallelMLP(MegatronModule): applied. """ - def __init__(self, init_method, output_layer_init_method, MOE=False, MoE_mp_size=1): + def __init__(self, init_method, output_layer_init_method, moe=False, enable_expert_tensor_parallelism=False): super(ParallelMLP, self).__init__() args = get_args() @@ -70,8 +70,9 @@ def __init__(self, init_method, output_layer_init_method, MOE=False, MoE_mp_size gather_output=False, init_method=init_method, skip_bias_add=True, - MOE=MOE, - MoE_mp_size=MoE_mp_size) + moe=moe, + enable_expert_tensor_parallelism=enable_expert_tensor_parallelism + ) self.bias_gelu_fusion = args.bias_gelu_fusion self.activation_func = F.gelu @@ -87,9 +88,8 @@ def __init__(self, init_method, output_layer_init_method, MOE=False, MoE_mp_size input_is_parallel=True, init_method=output_layer_init_method, skip_bias_add=True, - MOE=MOE, - MoE_mp_size=MoE_mp_size) - + moe=moe, + enable_expert_tensor_parallelism=enable_expert_tensor_parallelism) def forward(self, hidden_states): @@ -448,16 +448,12 @@ def __init__(self, init_method, output_layer_init_method, self.mlp = ParallelMLP(init_method, output_layer_init_method) else: - if not args.ds_inference or self.num_experts > dist.get_world_size(): - moe_mp_size = 1 - else: - moe_mp_size = dist.get_world_size() // self.num_experts - + enable_expert_tensor_parallelism = args.enable_expert_tensor_parallelism self.mlp = MoE(args.hidden_size, ParallelMLP(init_method, output_layer_init_method=output_layer_init_method, - MOE=True, - MoE_mp_size=moe_mp_size), + moe=True, + enable_expert_tensor_parallelism=enable_expert_tensor_parallelism), num_experts=self.num_experts, ep_size=args.moe_expert_parallel_size, k=args.topk, @@ -466,7 +462,7 @@ def __init__(self, init_method, output_layer_init_method, eval_capacity_factor=args.moe_eval_capacity_factor, min_capacity=args.moe_min_capacity, drop_tokens=args.moe_token_dropping, use_tutel=args.use_tutel, - MoE_mp_size=moe_mp_size) + enable_expert_tensor_parallelism=enable_expert_tensor_parallelism) def forward(self, hidden_states, attention_mask, encoder_output=None, enc_dec_attn_mask=None, diff --git a/megatron/mpu/layers.py b/megatron/mpu/layers.py index 0a1642e3df0..a0b6cdebc95 100644 --- a/megatron/mpu/layers.py +++ b/megatron/mpu/layers.py @@ -230,7 +230,7 @@ class ColumnParallelLinear(torch.nn.Module): def __init__(self, input_size, output_size, bias=True, gather_output=True, init_method=init.xavier_normal_, stride=1, keep_master_weight_for_test=False, - skip_bias_add=False, MOE=False, MoE_mp_size=1): + skip_bias_add=False, moe=False, enable_expert_tensor_parallelism=False): super(ColumnParallelLinear, self).__init__() # Keep input parameters @@ -238,9 +238,13 @@ def __init__(self, input_size, output_size, bias=True, gather_output=True, self.output_size = output_size self.gather_output = gather_output # Divide the weight matrix along the last dimension. - world_size = MoE_mp_size if MOE else get_tensor_model_parallel_world_size() - self.MOE = MOE - self.is_expert_without_slicing = self.MOE and world_size==1 + #world_size = MoE_mp_size if MOE else get_tensor_model_parallel_world_size() + if moe and (not enable_expert_tensor_parallelism): + world_size = 1 + else: + world_size = get_tensor_model_parallel_world_size() + + self.is_expert_without_slicing = moe and world_size==1 self.output_size_per_partition = divide(output_size, world_size) self.skip_bias_add = skip_bias_add @@ -336,7 +340,7 @@ def __init__(self, input_size, output_size, bias=True, input_is_parallel=False, init_method=init.xavier_normal_, stride=1, keep_master_weight_for_test=False, - skip_bias_add=False, MOE=False, MoE_mp_size=1): + skip_bias_add=False, moe=False, enable_expert_tensor_parallelism=False): super(RowParallelLinear, self).__init__() # Keep input parameters @@ -344,9 +348,13 @@ def __init__(self, input_size, output_size, bias=True, self.output_size = output_size self.input_is_parallel = input_is_parallel # Divide the weight matrix along the last dimension. - world_size = MoE_mp_size if MOE else get_tensor_model_parallel_world_size() - self.MOE = MOE - self.is_expert_without_slicing = self.MOE and world_size==1 + + if moe and (not enable_expert_tensor_parallelism): + world_size = 1 + else: + world_size = get_tensor_model_parallel_world_size() + + self.is_expert_without_slicing = moe and world_size==1 self.input_size_per_partition = divide(input_size, world_size) self.skip_bias_add = skip_bias_add From 066632b27c8648e41271f1c5488fb2e413c1e44f Mon Sep 17 00:00:00 2001 From: Siddharth Singh Date: Thu, 21 Jul 2022 22:11:31 +0500 Subject: [PATCH 23/29] correct ep_size --- megatron/model/transformer.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/megatron/model/transformer.py b/megatron/model/transformer.py index 9c72d201316..521b2c74126 100644 --- a/megatron/model/transformer.py +++ b/megatron/model/transformer.py @@ -449,6 +449,7 @@ def __init__(self, init_method, output_layer_init_method, output_layer_init_method) else: enable_expert_tensor_parallelism = args.enable_expert_tensor_parallelism + moe_mp_world_size = mpu.get_tensor_model_parallel_world_size() if enable_expert_tensor_parallelism else 1 self.mlp = MoE(args.hidden_size, ParallelMLP(init_method, output_layer_init_method=output_layer_init_method, @@ -462,7 +463,7 @@ def __init__(self, init_method, output_layer_init_method, eval_capacity_factor=args.moe_eval_capacity_factor, min_capacity=args.moe_min_capacity, drop_tokens=args.moe_token_dropping, use_tutel=args.use_tutel, - enable_expert_tensor_parallelism=enable_expert_tensor_parallelism) + moe_mp_world_size=mpu.get_tensor_model_parallel_world_size()) def forward(self, hidden_states, attention_mask, encoder_output=None, enc_dec_attn_mask=None, From c5e0f403da651ccd8197b19f5ac5ce909bc6ce2f Mon Sep 17 00:00:00 2001 From: Siddharth Singh Date: Sat, 23 Jul 2022 01:34:04 +0500 Subject: [PATCH 24/29] set ep size correctly --- megatron/model/transformer.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/megatron/model/transformer.py b/megatron/model/transformer.py index 521b2c74126..5168ae31848 100644 --- a/megatron/model/transformer.py +++ b/megatron/model/transformer.py @@ -449,7 +449,14 @@ def __init__(self, init_method, output_layer_init_method, output_layer_init_method) else: enable_expert_tensor_parallelism = args.enable_expert_tensor_parallelism - moe_mp_world_size = mpu.get_tensor_model_parallel_world_size() if enable_expert_tensor_parallelism else 1 + ## set correct ep_size here because MoE.__init__ does not have access to mpu + if enable_expert_tensor_parallelism: + ## the max allowed value of ep_size is the degree of data parallelism + ep_size = min(ep_size, mpu.get_data_parallel_world_size()) + else: + # since expert tensor parallelism is disabled expert parallelism can + # stretch into data parallel dimension as well + ep_size = min(ep_size, mpu.get_data_parallel_world_size() * mpu.get_tensor_model_parallel_world_size()) self.mlp = MoE(args.hidden_size, ParallelMLP(init_method, output_layer_init_method=output_layer_init_method, @@ -463,7 +470,7 @@ def __init__(self, init_method, output_layer_init_method, eval_capacity_factor=args.moe_eval_capacity_factor, min_capacity=args.moe_min_capacity, drop_tokens=args.moe_token_dropping, use_tutel=args.use_tutel, - moe_mp_world_size=mpu.get_tensor_model_parallel_world_size()) + enable_expert_tensor_parallelism=enable_expert_tensor_parallelism) def forward(self, hidden_states, attention_mask, encoder_output=None, enc_dec_attn_mask=None, From 8398bb05b62249909117f069683da267a87cf960 Mon Sep 17 00:00:00 2001 From: Siddharth Singh Date: Sat, 23 Jul 2022 02:23:51 +0500 Subject: [PATCH 25/29] correctly set ep_size --- megatron/model/transformer.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/megatron/model/transformer.py b/megatron/model/transformer.py index 5168ae31848..dae3b7f41e9 100644 --- a/megatron/model/transformer.py +++ b/megatron/model/transformer.py @@ -452,18 +452,18 @@ def __init__(self, init_method, output_layer_init_method, ## set correct ep_size here because MoE.__init__ does not have access to mpu if enable_expert_tensor_parallelism: ## the max allowed value of ep_size is the degree of data parallelism - ep_size = min(ep_size, mpu.get_data_parallel_world_size()) + ep_size = min(args.moe_expert_parallel_size, mpu.get_data_parallel_world_size()) else: # since expert tensor parallelism is disabled expert parallelism can # stretch into data parallel dimension as well - ep_size = min(ep_size, mpu.get_data_parallel_world_size() * mpu.get_tensor_model_parallel_world_size()) + ep_size = min(args.moe_expert_parallel_size, mpu.get_data_parallel_world_size() * mpu.get_tensor_model_parallel_world_size()) self.mlp = MoE(args.hidden_size, ParallelMLP(init_method, output_layer_init_method=output_layer_init_method, moe=True, enable_expert_tensor_parallelism=enable_expert_tensor_parallelism), num_experts=self.num_experts, - ep_size=args.moe_expert_parallel_size, + ep_size=ep_size, k=args.topk, use_residual=(args.mlp_type == 'residual'), capacity_factor=args.moe_train_capacity_factor, From 3aa05d33c6a1742c3abdd1c27d2bd867c1054bf8 Mon Sep 17 00:00:00 2001 From: Siddharth Singh Date: Tue, 26 Jul 2022 03:45:31 +0500 Subject: [PATCH 26/29] remove client side code that sets ep_size --- megatron/model/transformer.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/megatron/model/transformer.py b/megatron/model/transformer.py index dae3b7f41e9..9ba1569669e 100644 --- a/megatron/model/transformer.py +++ b/megatron/model/transformer.py @@ -449,14 +449,6 @@ def __init__(self, init_method, output_layer_init_method, output_layer_init_method) else: enable_expert_tensor_parallelism = args.enable_expert_tensor_parallelism - ## set correct ep_size here because MoE.__init__ does not have access to mpu - if enable_expert_tensor_parallelism: - ## the max allowed value of ep_size is the degree of data parallelism - ep_size = min(args.moe_expert_parallel_size, mpu.get_data_parallel_world_size()) - else: - # since expert tensor parallelism is disabled expert parallelism can - # stretch into data parallel dimension as well - ep_size = min(args.moe_expert_parallel_size, mpu.get_data_parallel_world_size() * mpu.get_tensor_model_parallel_world_size()) self.mlp = MoE(args.hidden_size, ParallelMLP(init_method, output_layer_init_method=output_layer_init_method, From e20fc2e041cc4a84cc255a191c018fa52535e800 Mon Sep 17 00:00:00 2001 From: Siddharth Singh Date: Tue, 26 Jul 2022 23:43:19 +0500 Subject: [PATCH 27/29] correct ep_size --- megatron/model/transformer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/megatron/model/transformer.py b/megatron/model/transformer.py index 9ba1569669e..72641445d17 100644 --- a/megatron/model/transformer.py +++ b/megatron/model/transformer.py @@ -455,7 +455,7 @@ def __init__(self, init_method, output_layer_init_method, moe=True, enable_expert_tensor_parallelism=enable_expert_tensor_parallelism), num_experts=self.num_experts, - ep_size=ep_size, + ep_size=args.moe_expert_parallel_size, k=args.topk, use_residual=(args.mlp_type == 'residual'), capacity_factor=args.moe_train_capacity_factor, From 92e8839bc38cab8705bc16146b5b4df992f303ca Mon Sep 17 00:00:00 2001 From: Siddharth Singh Date: Wed, 27 Jul 2022 01:25:46 +0500 Subject: [PATCH 28/29] small fix --- megatron/mpu/layers.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/megatron/mpu/layers.py b/megatron/mpu/layers.py index a0b6cdebc95..e274ece88b4 100644 --- a/megatron/mpu/layers.py +++ b/megatron/mpu/layers.py @@ -238,13 +238,13 @@ def __init__(self, input_size, output_size, bias=True, gather_output=True, self.output_size = output_size self.gather_output = gather_output # Divide the weight matrix along the last dimension. - #world_size = MoE_mp_size if MOE else get_tensor_model_parallel_world_size() if moe and (not enable_expert_tensor_parallelism): world_size = 1 + self.is_expert_without_slicing = moe and world_size==1 else: world_size = get_tensor_model_parallel_world_size() + self.is_expert_without_slicing = False - self.is_expert_without_slicing = moe and world_size==1 self.output_size_per_partition = divide(output_size, world_size) self.skip_bias_add = skip_bias_add From ad593a0d1b571b1635439e9bdf9e7e1fb21019f8 Mon Sep 17 00:00:00 2001 From: Siddharth Singh Date: Wed, 27 Jul 2022 01:35:56 +0500 Subject: [PATCH 29/29] small fix --- megatron/mpu/layers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/megatron/mpu/layers.py b/megatron/mpu/layers.py index e274ece88b4..0d81d562238 100644 --- a/megatron/mpu/layers.py +++ b/megatron/mpu/layers.py @@ -240,7 +240,7 @@ def __init__(self, input_size, output_size, bias=True, gather_output=True, # Divide the weight matrix along the last dimension. if moe and (not enable_expert_tensor_parallelism): world_size = 1 - self.is_expert_without_slicing = moe and world_size==1 + self.is_expert_without_slicing = True else: world_size = get_tensor_model_parallel_world_size() self.is_expert_without_slicing = False