Convert Megatron LM ckpt to NeMo with PP support (NVIDIA#6159)
* fix convert script

Signed-off-by: Yi Dong <[email protected]>

* fix filename error

Signed-off-by: Yi Dong <[email protected]>

* use none batch size

Signed-off-by: Yi Dong <[email protected]>

* revert

Signed-off-by: Yi Dong <[email protected]>

* working

Signed-off-by: Yi Dong <[email protected]>

* fix the conversion bug

Signed-off-by: Yi Dong <[email protected]>

* use older version

Signed-off-by: Yi Dong <[email protected]>

* fix rope

Signed-off-by: Yi Dong <[email protected]>

* overwrite the precision

Signed-off-by: Yi Dong <[email protected]>

* fix port num

Signed-off-by: Yi Dong <[email protected]>

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* fix the event loop

Signed-off-by: Yi Dong <[email protected]>

---------

Signed-off-by: Yi Dong <[email protected]>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Signed-off-by: hsiehjackson <[email protected]>
2 people authored and hsiehjackson committed Jun 2, 2023
1 parent ea9eb8a commit 690c4ce
Showing 6 changed files with 93 additions and 67 deletions.
@@ -33,4 +33,5 @@ port: 5555 # the port number for the inference server
web_server: False # whether launch the web inference server
share: False # whether create a public URL
username: test # user name for web client
password: test2 # password for web client
password: test2 # password for web client
web_port: 9889 # the port number of the web server
9 changes: 8 additions & 1 deletion examples/nlp/language_modeling/megatron_gpt_eval.py
@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import os
import threading

@@ -177,6 +178,7 @@ def main(cfg) -> None:
pretrained_cfg.sequence_parallel = False
pretrained_cfg.activations_checkpoint_granularity = None
pretrained_cfg.activations_checkpoint_method = None
pretrained_cfg.precision = trainer.precision
model = MegatronGPTModel.restore_from(
restore_path=cfg.gpt_model_file,
trainer=trainer,
@@ -256,7 +258,12 @@ def main(cfg) -> None:
if cfg.server:
if parallel_state.is_pipeline_first_stage() and parallel_state.get_tensor_model_parallel_rank() == 0:
if cfg.web_server:
thread = threading.Thread(target=get_demo, daemon=True, args=(cfg.share, cfg.username, cfg.password))
loop = asyncio.new_event_loop()
thread = threading.Thread(
target=get_demo,
daemon=True,
args=(cfg.share, cfg.username, cfg.password, cfg.port, cfg.web_port, loop),
)
thread.start()
server = MegatronServer(model.cuda())
server.run("0.0.0.0", port=cfg.port)
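A minimal sketch of the event-loop handoff introduced in the hunk above (the worker body and names are illustrative assumptions, not the NeMo code): a fresh asyncio loop is created in the main thread and bound inside the worker thread before the web UI starts, so libraries that call asyncio.get_event_loop() from that non-main thread find one instead of raising.

# Minimal sketch of the event-loop handoff used above; the worker body and
# names are illustrative assumptions, not the actual NeMo code.
import asyncio
import threading

def worker(loop):
    # Bind the loop to this (non-main) thread so async-dependent libraries
    # such as gradio can find an event loop when they ask for one.
    asyncio.set_event_loop(loop)
    # ... launch the web UI here; the real code calls get_demo(...) ...

loop = asyncio.new_event_loop()
thread = threading.Thread(target=worker, daemon=True, args=(loop,))
thread.start()
thread.join()  # only so this standalone sketch exits cleanly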
82 changes: 46 additions & 36 deletions examples/nlp/language_modeling/megatron_lm_ckpt_to_nemo.py
@@ -42,7 +42,6 @@
from typing import Any, Optional

import torch
from apex.transformer import parallel_state
from pytorch_lightning.core.saving import _load_state as ptl_load_state
from pytorch_lightning.core.saving import load_hparams_from_tags_csv, load_hparams_from_yaml
from pytorch_lightning.trainer.trainer import Trainer
@@ -51,10 +50,11 @@

from nemo.collections.nlp.models.language_modeling.megatron_bert_model import MegatronBertModel
from nemo.collections.nlp.models.language_modeling.megatron_gpt_model import MegatronGPTModel
from nemo.collections.nlp.modules.common.megatron.megatron_init import initialize_model_parallel_for_nemo
from nemo.collections.nlp.parts.nlp_overrides import NLPSaveRestoreConnector
from nemo.utils import AppState, logging
from nemo.utils.distributed import initialize_distributed
from nemo.utils.model_utils import inject_model_parallel_rank
from nemo.utils.model_utils import inject_model_parallel_rank, uninject_model_parallel_rank

# this enums code is copied from Megatron_LM
enum_code = '''
@@ -353,46 +353,59 @@ def load_from_checkpoint(
return checkpoint, consumed, steps, check_point_version


def convert(local_rank, rank, world_size, args):
def megatron_lm_inject_model_parallel_rank(filepath):
"""
Injects tensor/pipeline model parallel ranks into the filepath.
Does nothing if not using model parallelism.
"""
# first make sure filepath does not have rank
filepath = uninject_model_parallel_rank(filepath)

app_state = AppState()
app_state.data_parallel_rank = 0
tensor_model_parallel_size = args.tensor_model_parallel_size
num_nodes = world_size // args.gpus_per_node
pipeline_model_parallel_size = world_size // args.tensor_model_parallel_size
assert args.pipeline_model_parallel_size == pipeline_model_parallel_size
if app_state.model_parallel_size is not None and app_state.model_parallel_size > 1:
# filepath needs to be updated to include mp_rank
dirname = os.path.dirname(filepath)
basename = os.path.basename(filepath)
if app_state.pipeline_model_parallel_size is None or app_state.pipeline_model_parallel_size == 1:
filepath = f'{dirname}/mp_rank_{app_state.tensor_model_parallel_rank:02d}/{basename}'
else:
filepath = f'{dirname}/mp_rank_{app_state.tensor_model_parallel_rank:02d}_{app_state.pipeline_model_parallel_rank:03d}/{basename}'
return filepath
else:
return filepath

trainer = Trainer(devices=args.gpus_per_node, accelerator='gpu', num_nodes=num_nodes)

app_state.pipeline_model_parallel_size = args.pipeline_model_parallel_size
app_state.tensor_model_parallel_size = args.tensor_model_parallel_size
app_state.model_parallel_size = app_state.tensor_model_parallel_size * app_state.pipeline_model_parallel_size
def convert(local_rank, rank, world_size, args):

parallel_state.initialize_model_parallel(
tensor_model_parallel_size_=app_state.tensor_model_parallel_size,
pipeline_model_parallel_size_=app_state.pipeline_model_parallel_size,
app_state = AppState()
initialize_model_parallel_for_nemo(
world_size=world_size,
global_rank=rank,
local_rank=local_rank,
tensor_model_parallel_size=args.tensor_model_parallel_size,
pipeline_model_parallel_size=args.pipeline_model_parallel_size,
virtual_pipeline_model_parallel_size=None,
pipeline_model_parallel_split_rank=0,
micro_batch_size=None,
global_batch_size=None,
seed=1234,
apex_transformer_log_level=30,
)
# hard set the data parallel rank to 0, otherwise it defaults to None
app_state.data_parallel_rank = 0

app_state.pipeline_model_parallel_rank = parallel_state.get_pipeline_model_parallel_rank()
app_state.tensor_model_parallel_rank = parallel_state.get_tensor_model_parallel_rank()

pipeline_rank = rank // tensor_model_parallel_size
tensor_rank = app_state.tensor_model_parallel_rank
assert pipeline_rank == app_state.pipeline_model_parallel_rank
# tensor_model_parallel_size = args.tensor_model_parallel_size
num_nodes = world_size // args.gpus_per_node
assert world_size % args.gpus_per_node == 0, "world_size must be divisible by gpus_per_node"

if tensor_model_parallel_size is not None and tensor_model_parallel_size > 1 and pipeline_model_parallel_size == 1:
# inject model parallel rank
checkpoint_path = os.path.join(args.checkpoint_folder, f'mp_rank_{tensor_rank:02d}', args.checkpoint_name)
elif tensor_model_parallel_size is not None and pipeline_model_parallel_size > 1:
checkpoint_path = os.path.join(
args.checkpoint_folder, f'mp_rank_{tensor_rank:02d}_{pipeline_rank:03d}', args.checkpoint_name
)
else:
checkpoint_path = os.path.join(args.checkpoint_folder, args.checkpoint_name)
trainer = Trainer(devices=args.gpus_per_node, accelerator='gpu', num_nodes=num_nodes)
checkpoint_path = megatron_lm_inject_model_parallel_rank(
os.path.join(args.checkpoint_folder, args.checkpoint_name)
)
logging.info(f"loading checkpoint {checkpoint_path}")

if args.model_type == 'gpt':
## this dictionary is used to rename the model parameters
# this dictionary is used to rename the model parameters
name_translate = {}
name_translate['transformer'] = 'encoder'
name_translate['.attention.'] = '.self_attention.'
@@ -407,7 +420,7 @@ def convert(local_rank, rank, world_size, args):
strict=False,
)
elif args.model_type == 'bert':
## this dictionary is used to rename the model parameters
# this dictionary is used to rename the model parameters
name_translate = {}
name_translate['transformer'] = 'encoder'
name_translate['.attention.'] = '.self_attention.'
@@ -456,10 +469,6 @@

# verify tensor parallel rank id and pipeline parallel rank id matches
assert app_state.data_parallel_size == 1
assert app_state.tensor_model_parallel_size == tensor_model_parallel_size
assert app_state.tensor_model_parallel_rank == tensor_rank
assert app_state.pipeline_model_parallel_size == pipeline_model_parallel_size
assert app_state.pipeline_model_parallel_rank == pipeline_rank
model._save_restore_connector = NLPSaveRestoreConnector()
model.save_to(args.nemo_file_path)
logging.info(f'NeMo model saved to: {args.nemo_file_path}')
@@ -481,3 +490,4 @@ def convert(local_rank, rank, world_size, args):

torch.distributed.barrier()
convert(local_rank, rank, world_size, args)
torch.distributed.barrier()
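As a rough illustration of the path logic added by megatron_lm_inject_model_parallel_rank in the diff above, the standalone helper below reproduces the same directory formatting (the helper name, arguments, and example path are hypothetical, for illustration only).

# Standalone sketch of the rank-injection logic shown above; the helper and
# the example path are hypothetical.
import os

def inject_ranks(filepath, tp_rank, pp_rank, pp_size):
    dirname, basename = os.path.dirname(filepath), os.path.basename(filepath)
    if pp_size is None or pp_size == 1:
        # tensor parallelism only: one sub-directory per TP rank
        return f'{dirname}/mp_rank_{tp_rank:02d}/{basename}'
    # tensor + pipeline parallelism: TP and PP ranks both appear in the name
    return f'{dirname}/mp_rank_{tp_rank:02d}_{pp_rank:03d}/{basename}'

print(inject_ranks('/ckpts/model_optim_rng.pt', tp_rank=1, pp_rank=2, pp_size=4))
# -> /ckpts/mp_rank_01_002/model_optim_rng.pt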
2 changes: 2 additions & 0 deletions nemo/collections/nlp/modules/common/megatron/attention.py
@@ -431,6 +431,8 @@ def forward(
# In inference, we compute one token at a time.
# Select the correct positional embedding.
q_pos_emb = q_pos_emb[end - 1 : end]
else:
q_pos_emb = q_pos_emb[:end, :, :, :]
k_pos_emb = k_pos_emb[:end, :, :, :]
rotary_pos_emb = (q_pos_emb, k_pos_emb)

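The attention change above can be summarized by the sketch below (function name, the one-token flag, and tensor shapes are assumptions): the key rotary embeddings were already truncated to the current inference length end, and the fix applies the same truncation to the query embeddings whenever more than one token is processed at once.

# Sketch of the rotary-embedding slicing fixed above; names and the
# one-token flag are assumptions for illustration.
def slice_rotary(q_pos_emb, k_pos_emb, end, one_token_at_a_time):
    if one_token_at_a_time:
        q_pos_emb = q_pos_emb[end - 1:end]   # only the newest position
    else:
        q_pos_emb = q_pos_emb[:end]          # all positions seen so far
    k_pos_emb = k_pos_emb[:end]              # keys always cover the full prefix
    return q_pos_emb, k_pos_emb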
18 changes: 9 additions & 9 deletions nemo/collections/nlp/modules/common/megatron/language_model.py
@@ -644,14 +644,14 @@ def __init__(
self.pooler = Pooler(self.hidden_size, self.init_method, sequence_parallel=sequence_parallel)
self._pooler_key = 'pooler'

if not self.share_embeddings_and_output_weights:
self.output_layer = tensor_parallel.ColumnParallelLinear(
self.hidden_size,
self.vocab_size,
bias=False, # Setting bias to False always to keep it consistent with embedding tying that also does not have a bias.
init_method=self.init_method,
)
self._output_layer_key = 'output_layer'
if not self.share_embeddings_and_output_weights:
self.output_layer = tensor_parallel.ColumnParallelLinear(
self.hidden_size,
self.vocab_size,
bias=False, # Setting bias to False always to keep it consistent with embedding tying that also does not have a bias.
init_method=self.init_method,
)
self._output_layer_key = 'output_layer'

def set_input_tensor(self, input_tensor):
""" See megatron.model.transformer.set_input_tensor()"""
@@ -695,7 +695,7 @@ def forward(
rotary_pos_emb = None
encoder_self_attention_relative_position_bias = None
if self.position_embedding_type == 'rope':
if not set_inference_key_value_memory and inference_max_sequence_len is not None:
if inference_max_sequence_len is not None:
rotary_pos_emb = self.rotary_pos_emb(inference_max_sequence_len)
elif self.encoder.input_tensor is not None:
rotary_pos_emb = self.rotary_pos_emb(self.encoder.input_tensor.size(0))
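A condensed sketch of the rope-condition change above (the variable names come from the diff, the helper itself is illustrative and ignores the encoder input-tensor branch): the rotary table is now sized from inference_max_sequence_len whenever it is provided, rather than only after the key-value memory has been primed.

# Illustrative helper mirroring the condition change above; not NeMo code.
def rotary_length(inference_max_sequence_len, current_len):
    if inference_max_sequence_len is not None:
        # cover every position the model may reach while decoding
        return inference_max_sequence_len
    return current_len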
46 changes: 26 additions & 20 deletions nemo/collections/nlp/modules/common/megatron_web_server.py
@@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio

import gradio as gr

from nemo.collections.nlp.modules.common.megatron.retrieval_services.util import (
@@ -23,24 +25,28 @@
__all__ = ['RetroDemoWebApp', 'get_demo']


def get_generation(prompt, greedy, add_BOS, token_to_gen, min_tokens, temp, top_p, top_k, repetition, port=5555):
data = {
"sentences": [prompt],
"tokens_to_generate": int(token_to_gen),
"temperature": temp,
"add_BOS": add_BOS,
"top_k": top_k,
"top_p": top_p,
"greedy": greedy,
"all_probs": False,
"repetition_penalty": repetition,
"min_tokens_to_generate": int(min_tokens),
}
sentences = text_generation(data, port=port)['sentences']
return sentences[0]


def get_demo(share, username, password):
def create_gen_function(port=5555):
def get_generation(prompt, greedy, add_BOS, token_to_gen, min_tokens, temp, top_p, top_k, repetition):
data = {
"sentences": [prompt],
"tokens_to_generate": int(token_to_gen),
"temperature": temp,
"add_BOS": add_BOS,
"top_k": top_k,
"top_p": top_p,
"greedy": greedy,
"all_probs": False,
"repetition_penalty": repetition,
"min_tokens_to_generate": int(min_tokens),
}
sentences = text_generation(data, port=port)['sentences']
return sentences[0]

return get_generation


def get_demo(share, username, password, server_port=5555, web_port=9889, loop=None):
asyncio.set_event_loop(loop)
with gr.Blocks() as demo:
with gr.Row():
with gr.Column(scale=2, width=200):
Expand All @@ -63,7 +69,7 @@ def get_demo(share, username, password):
output_box = gr.Textbox(value="", label="Output")
btn = gr.Button(value="Submit")
btn.click(
get_generation,
create_gen_function(server_port),
inputs=[
input_prompt,
greedy_flag,
@@ -77,7 +83,7 @@
],
outputs=[output_box],
)
demo.launch(share=share, server_port=13570, server_name='0.0.0.0', auth=(username, password))
demo.launch(share=share, server_port=web_port, server_name='0.0.0.0', auth=(username, password))


class RetroDemoWebApp:
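The refactor above replaces a module-level generation function with a factory so that the UI callback is bound to a configurable server port instead of a hardcoded one; a reduced sketch of that closure pattern follows (the body is illustrative and does not perform the real HTTP request).

# Reduced sketch of the create_gen_function closure pattern above; the body
# is illustrative and omits the actual request to the inference server.
def create_gen_function(port=5555):
    def generate(prompt):
        # the real code POSTs the prompt to the text-generation server on `port`
        return f"(would query the server on port {port} with: {prompt!r})"
    return generate

generate = create_gen_function(5556)
print(generate("hello"))  # the callback now targets port 5556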
