From d2e58cb3fd39138efb052498e951d75b0ed425b5 Mon Sep 17 00:00:00 2001
From: Greg Clark
Date: Wed, 19 Jul 2023 16:49:01 -0400
Subject: [PATCH] refactor pipelines. Add pipeline factory

Signed-off-by: Greg Clark
---
 gpt_infer.yaml                                 |  25 +-
 mvp_gpt_infer.py                               |   6 +-
 .../modules/transformer/text_generation.py    |  40 +-
 .../language_modeling/megatron_gpt_model.py   | 114 +--
 .../common/text_generation_strategy.py        |  38 +-
 .../modules/common/text_generation_utils.py   | 169 ++--
 .../common/transformer/text_generation.py     |  35 +-
 .../nlp/pipelines/text_generation_pipeline.py | 731 +++++-------
 nemo/core/classes/inference_pipeline.py       | 181 +++++
 nemo/core/classes/modelPT.py                  |  13 +-
 10 files changed, 627 insertions(+), 725 deletions(-)
 create mode 100644 nemo/core/classes/inference_pipeline.py

diff --git a/gpt_infer.yaml b/gpt_infer.yaml
index 98bd9cc3cdd4..81fc87e8bff6 100644
--- a/gpt_infer.yaml
+++ b/gpt_infer.yaml
@@ -10,7 +10,6 @@ inference:
   min_tokens_to_generate: 1 # The minimum length of the sequence to be generated.
   compute_logprob: False # a flag used to compute logprob of all the input text, a very special case of running inference, default False
 
-
 trainer:
   devices: 1
   num_nodes: 1
@@ -18,26 +17,4 @@ trainer:
   logger: False # logger provided by exp_manager
   precision: 16 # 16, 32, or bf16
 
-tensor_model_parallel_size: -1
-pipeline_model_parallel_size: -1
-pipeline_model_parallel_split_rank: -1 # used for encoder and decoder model (0 for others)
-gpt_model_file: /models/gpt2b
-checkpoint_dir: null # checkpoint file dir. This is used to load the PTL checkpoint generated during the GPT training
-checkpoint_name: null # PTL checkpoint file name, only used for PTL checkpoint loading
-hparams_file: null # model configuration file, only used for PTL checkpoint loading
-prompts: # prompts for GPT inference
-  - "Q: How are you?"
-  - "Q: How big is the universe?"
-server: False # whether launch the API server
-port: 5555 # the port number for the inference server
-web_server: False # whether launch the web inference server
-share: False # whether create a public URL
-username: test # user name for web client
-password: test2 # password for web client
-web_port: 9889 # the port number of the web server
-chat: False # use the chat interface
-chatbot_config:
-  value: False # whether to inject the value attributes
-  user: User
-  assistant: Assistant
-  system: "A chat between a curious human and an artificial intelligence assistant. 
The assistant gives helpful, detailed, and polite answers to the human's questions.\n\n" +model_path: /models/gpt2b \ No newline at end of file diff --git a/mvp_gpt_infer.py b/mvp_gpt_infer.py index 0aa905cc90ca..d565f4427607 100644 --- a/mvp_gpt_infer.py +++ b/mvp_gpt_infer.py @@ -1,10 +1,12 @@ from nemo.core.classes.modelPT import ModelPT + def main(): - model = ModelPT.auto_load("/models/gpt2b", trainer_args={"devices": 1, "num_nodes": 1, "accelerator": "gpu", "logger": False, "precision": 16}) + model = ModelPT.load_for_inference("gpt_infer.yaml") - output = model.generate_text(["Deep learning is"], end_strings=["."]) + output = model.generate_text(["Deep learning is"], end_strings=["."], tokens_to_generate=50) print(output) + if __name__ == "__main__": main() diff --git a/nemo/collections/asr/modules/transformer/text_generation.py b/nemo/collections/asr/modules/transformer/text_generation.py index 76bb62b33a0e..f853403b6dea 100644 --- a/nemo/collections/asr/modules/transformer/text_generation.py +++ b/nemo/collections/asr/modules/transformer/text_generation.py @@ -102,24 +102,21 @@ def generate( raise NotImplementedError("please implement this method") def generate_text( - self, - prompts: List[str], - max_length: int = 10, - min_length: int = 1, - use_greedy: bool = False, - temperature: float = 0.5, - top_k: int = 1, - top_p: float = 1.0, - repetition_penalty: float = 1.0, - add_BOS: bool = False, - all_probs: bool = False, - compute_logprob: bool = True, - end_strings: List[str] = [], + self, + prompts: List[str], + max_length: int = 10, + min_length: int = 1, + use_greedy: bool = False, + temperature: float = 0.5, + top_k: int = 1, + top_p: float = 1.0, + repetition_penalty: float = 1.0, + add_BOS: bool = False, + all_probs: bool = False, + compute_logprob: bool = True, + end_strings: List[str] = [], ) -> OutputType: - length_param: LengthParam = { - "max_length": max_length, - "min_length": min_length - } + length_param: LengthParam = {"max_length": max_length, "min_length": min_length} sample_param: SamplingParam = { "use_greedy": use_greedy, @@ -129,12 +126,9 @@ def generate_text( "repetition_penalty": repetition_penalty, "add_BOS": add_BOS, "all_probs": all_probs, - "compute_logprob": compute_logprob + "compute_logprob": compute_logprob, } return self.generate( - prompts, - length_params=length_param, - sampling_params=sample_param, - end_strings=end_strings, - ) \ No newline at end of file + prompts, length_params=length_param, sampling_params=sample_param, end_strings=end_strings, + ) diff --git a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py index 93d0ed541848..e0257067debe 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py @@ -19,15 +19,13 @@ from functools import partial from typing import Any, Dict, Iterator, List, Optional, Union -import numpy as np import torch -from omegaconf.dictconfig import DictConfig from omegaconf import DictConfig, OmegaConf, open_dict +from omegaconf.dictconfig import DictConfig from pytorch_lightning.accelerators import CPUAccelerator -from pytorch_lightning.trainer.trainer import Trainer from pytorch_lightning.plugins.environments import LightningEnvironment -from nemo.collections.nlp.parts.nlp_overrides import NLPDDPStrategy, NLPSaveRestoreConnector, SaveRestoreConnector -from nemo.core.classes import ModelPT +from pytorch_lightning.trainer.trainer 
import Trainer +from typing_extensions import override from nemo.collections.nlp.data.language_modeling.megatron.data_samplers import ( MegatronPretrainingRandomSampler, @@ -57,9 +55,18 @@ SamplingParam, TextGeneration, ) +from nemo.collections.nlp.parts.nlp_overrides import NLPDDPStrategy, NLPSaveRestoreConnector from nemo.collections.nlp.parts.utils_funcs import get_last_rank +from nemo.collections.nlp.pipelines.text_generation_pipeline import ( + TextGenerationPipeline, + TextGenerationStage, + TextGenerattionPostProcStage, + TextGenerattionPreProcStage, + load_tokenizer, +) from nemo.core.classes import Exportable from nemo.core.classes.common import PretrainedModelInfo +from nemo.core.classes.inference_pipeline import InferencePipeline, InferencePipelineFactory, PipelineStageType from nemo.core.neural_types import ChannelType, NeuralType from nemo.utils import logging @@ -189,7 +196,34 @@ def output_names(self) -> List[str]: return ['logits'] -class MegatronGPTModel(MegatronBaseModel, TextGeneration): +class MegatronGPTTextGenerationInferencePipeline(TextGenerationPipeline): + def load_nemo_pipeline(self, parts: Optional[List[Union[str, PipelineStageType]]] = None): + cfg = self.inference_config + if parts is None: + return + tokenizer = None + if "preprocessor" in parts or "postprocessor" in parts or PipelineStageType.NEMO_PROC in parts: + tokenizer_cfg = self.model_config.tokenizer + with open_dict(tokenizer_cfg): + # TODO pass model_path field name + if tokenizer_cfg.model is not None and tokenizer_cfg.model.startswith("nemo:"): + tokenizer_cfg.model = os.path.join(cfg.model_path, tokenizer_cfg.model.split(":", 1)[1]) + if tokenizer_cfg.vocab_file is not None and tokenizer_cfg.vocab_file.startswith("nemo:"): + tokenizer_cfg.vocab_file = os.path.join(cfg.model_path, tokenizer_cfg.vocab_file.split(":", 1)[1]) + if tokenizer_cfg.merge_file is not None and tokenizer_cfg.merge_file.startswith("nemo:"): + tokenizer_cfg.merge_file = os.path.join(cfg.model_path, tokenizer_cfg.merge_file.split(":", 1)[1]) + + tokenizer = load_tokenizer(tokenizer_cfg) + if "preprocessor" in parts or PipelineStageType.NEMO_PROC in parts: + self.set_stage_exec("preprocessor", TextGenerattionPreProcStage(tokenizer, MegatronGPTModel)) + if "postprocessor" in parts or PipelineStageType.NEMO_PROC in parts: + self.set_stage_exec("postprocessor", TextGenerattionPostProcStage(tokenizer)) + if "text_generation" in parts or PipelineStageType.MULTI_STEP_NNET in parts: + model = MegatronGPTModel.load_for_inference(self.inference_config) + self.set_stage_exec("text_generation", TextGenerationStage(model)) + + +class MegatronGPTModel(MegatronBaseModel, TextGeneration, InferencePipelineFactory): """ Megatron GPT pretraining """ @@ -1140,7 +1174,9 @@ def dummy(): if length_params is None: length_params = get_default_length_params() - return megatron_gpt_generate(self.cuda(), inputs, self.tokenizer, length_params, sampling_params, end_strings=end_strings) + return megatron_gpt_generate( + self.cuda(), inputs, self.tokenizer, length_params, sampling_params, end_strings=end_strings + ) def predict_step(self, batch: Any, batch_idx: int, dataloader_idx: Optional[int] = None) -> Any: inference_config = self.get_inference_config() @@ -1335,50 +1371,14 @@ def _restore_sequence_parallelism_args(self): for mod in module.modules(): if hasattr(mod, "sequence_parallel"): mod.sequence_parallel = self.last_sequence_parallel - - @classmethod - def auto_load( - cls, - restore_path: str, - trainer_args: Dict = {}, - ): - #cfg = 
ModelPT.restore_from(restore_path=restore_path, return_config=True) - trainer = Trainer(plugins=[LightningEnvironment()], strategy=NLPDDPStrategy(), **trainer_args) - save_restore_connector = NLPSaveRestoreConnector() - if os.path.isdir(restore_path): - save_restore_connector.model_extracted_dir = restore_path - pretrained_cfg = MegatronGPTModel.restore_from( - restore_path=restore_path, - trainer=trainer, - return_config=True, - save_restore_connector=save_restore_connector, - ) - OmegaConf.set_struct(pretrained_cfg, True) - with open_dict(pretrained_cfg): - pretrained_cfg.sequence_parallel = False - pretrained_cfg.activations_checkpoint_granularity = None - pretrained_cfg.activations_checkpoint_method = None - pretrained_cfg.precision = trainer.precision - if trainer.precision == "16": - pretrained_cfg.megatron_amp_O2 = False - model = MegatronGPTModel.restore_from( - restore_path=restore_path, - trainer=trainer, - override_config_path=pretrained_cfg, - save_restore_connector=save_restore_connector, - map_location=f'cuda:{trainer.local_rank}', - ) - - return model - @classmethod def load_for_inference(cls, config: Union[str, DictConfig]): if isinstance(config, str): cfg = OmegaConf.load(config) else: cfg = config - + trainer = Trainer(plugins=[LightningEnvironment()], strategy=NLPDDPStrategy(), **cfg.trainer) model_path = cfg.model_path save_restore_connector = NLPSaveRestoreConnector() @@ -1406,4 +1406,28 @@ def load_for_inference(cls, config: Union[str, DictConfig]): save_restore_connector=save_restore_connector, map_location=f'cuda:{trainer.local_rank}', # map_location is needed for converted models ) - return model \ No newline at end of file + + if parallel_state.is_unitialized(): + + def dummy(): + return + + if trainer.strategy.launcher is not None: + trainer.strategy.launcher.launch(dummy, trainer=trainer) + trainer.strategy.setup_environment() + + if model.cfg.get('transformer_engine', False): + model.setup_transformer_engine_tp_groups() + return model + + @override + @classmethod + def inference_pipeline( + cls, + task_name: Optional[str] = None, + inference_config: Optional[DictConfig] = None, + model_config: Optional[DictConfig] = None, + ) -> InferencePipeline: + if task_name != "text_completion": + raise NotImplementedError(f"No pipeline for task {task_name}") + return MegatronGPTTextGenerationInferencePipeline(inference_config, model_config) diff --git a/nemo/collections/nlp/modules/common/text_generation_strategy.py b/nemo/collections/nlp/modules/common/text_generation_strategy.py index 41d6d1901941..fd01ca036a36 100644 --- a/nemo/collections/nlp/modules/common/text_generation_strategy.py +++ b/nemo/collections/nlp/modules/common/text_generation_strategy.py @@ -77,7 +77,7 @@ def tokenize_batch(self, sentences, max_len, add_BOS): Tuple[torch.Tensor], the tokenized and padded torch tensor and the token context length tensor. 
""" return self._tokenize_batch(self.model.tokenizer, sentences, max_len, add_BOS) - + @classmethod def _tokenize_batch(cls, tokenizer, sentences, max_len, add_BOS): """ @@ -341,6 +341,42 @@ def post_process(self, tokens: torch.Tensor, new_tokens: torch.Tensor, context_l tokens[:, :context_length][(tokens[:, :context_length] >= pseudo_token_ids_start)] = tokenizer.unk_id +def model_static_inference_strategy_dispatcher(model): + from nemo.collections.nlp.models.language_modeling.megatron_gpt_model import MegatronGPTModel + from nemo.collections.nlp.models.language_modeling.megatron_gpt_prompt_learning_model import ( + MegatronGPTPromptLearningModel, + ) + from nemo.collections.nlp.models.language_modeling.megatron_retrieval_model import MegatronRetrievalModel + from nemo.collections.nlp.modules.common.retro_inference_strategies import ( + RetroFileQAModelTextGenerationStrategy, + RetroModelTextGenerationStrategy, + RetroQAModelTextGenerationStrategy, + ) + + model_cls = model + if not isinstance(model_cls, type): + model_cls = type(model) + + if issubclass(model_cls, MegatronGPTPromptLearningModel): + return PromptLearningModelTextGenerationStrategy + elif issubclass(model_cls, MegatronGPTModel): + return GPTModelTextGenerationStrategy + elif issubclass(model_cls, MegatronRetrievalModel): + strategy_name = args['strategy'] + if strategy_name == 'RetroModelTextGenerationStrategy': + return RetroModelTextGenerationStrategy + elif strategy_name == 'RetroQAModelTextGenerationStrategy': + return RetroQAModelTextGenerationStrategy + elif strategy_name == 'RetroFileQAModelTextGenerationStrategy': + return RetroFileQAModelTextGenerationStrategy + else: + raise ValueError(f'{strategy_name} is not supported for inference') + else: + raise ValueError(f'{model} is not supported for inference') + + # Should call GPTModel or Megatron Retrieval Model's forward method + + def model_inference_strategy_dispatcher(model, **args): from nemo.collections.nlp.models.language_modeling.megatron_gpt_model import MegatronGPTModel from nemo.collections.nlp.models.language_modeling.megatron_gpt_prompt_learning_model import ( diff --git a/nemo/collections/nlp/modules/common/text_generation_utils.py b/nemo/collections/nlp/modules/common/text_generation_utils.py index 14b55c6a7be8..66f6c737015b 100644 --- a/nemo/collections/nlp/modules/common/text_generation_utils.py +++ b/nemo/collections/nlp/modules/common/text_generation_utils.py @@ -481,10 +481,63 @@ def generate( end_strings=['<|endoftext|>'], **strategy_args, ) -> OutputType: + if 'strategy' in strategy_args: + inference_strategy = strategy_args['strategy'] + else: + inference_strategy = model_inference_strategy_dispatcher(model, **strategy_args) + tokenizer = model.tokenizer + context_tokens_tensor = None + context_length_tensor = None + if torch.distributed.get_rank() == get_model_parallel_src_rank(): + if isinstance(inputs, tuple): + context_tokens_tensor, context_length_tensor = inputs + else: + context_tokens_tensor, context_length_tensor = inference_strategy.tokenize_batch( + inputs, tokens_to_generate, add_BOS + ) + output = generate_output_ids( + model, + inference_strategy, + (context_tokens_tensor, context_length_tensor), + tokens_to_generate, + all_probs, + temperature, + top_k, + top_p, + greedy, + compute_attention_mask, + compute_logprob, + repetition_penalty, + min_tokens_to_generate, + end_strings, + ) + if output is not None: + decode_tokens, output_logits, full_logits = output + return postprocess_output_ids( + decode_tokens, output_logits, 
full_logits, tokenizer, inference_strategy.post_generation_process + ) + + +def generate_output_ids( + model, + inference_strategy, + inputs, + tokens_to_generate=0, + all_probs=False, + temperature=1.0, + top_k=0, + top_p=0.0, + greedy=False, + compute_attention_mask=True, + compute_logprob=False, + repetition_penalty=1.0, + min_tokens_to_generate=0, + end_strings=['<|endoftext|>'], +): """ Args: model (NLPModel): text generative model - inputs (Union[tuple, List[str]]): if it is a tuple, it is assumed to be (context_tokens_tensor, context_length_tensor). Otherwise it it a list of prompt text strings + inputs (context_tokens_tensor, context_length_tensor) tokens_to_generate (int): The maximum length of the tokens to be generated. all_probs (bool): Return the log prob for all the tokens temperature (float): sampling temperature @@ -505,18 +558,8 @@ def generate( token_ids: List[Tensor], output sentence token ids offsets: List[List[int]] # list of tokens start positions in text """ - if 'strategy' in strategy_args: - inference_strategy = strategy_args['strategy'] - else: - inference_strategy = model_inference_strategy_dispatcher(model, **strategy_args) - tokenizer = model.tokenizer if torch.distributed.get_rank() == get_model_parallel_src_rank(): - if isinstance(inputs, tuple): - context_tokens_tensor, context_length_tensor = inputs - else: - context_tokens_tensor, context_length_tensor = inference_strategy.tokenize_batch( - inputs, tokens_to_generate, add_BOS - ) + context_tokens_tensor, context_length_tensor = inputs send_generate_info( context_tokens_tensor, @@ -565,6 +608,12 @@ def generate( min_tokens_to_generate=min_tokens_to_generate, end_strings=end_strings, ) + return output + + +def postprocess_output_ids( + decode_tokens, output_logits, full_logits, tokenizer, post_generation_process=None +) -> OutputType: special_tokens = set() if hasattr(tokenizer, 'pad_token') and tokenizer.pad_token is not None: special_tokens.add(tokenizer.pad_token) @@ -580,54 +629,54 @@ def generate( special_tokens.add(tokenizer.sep_token) if hasattr(tokenizer, 'mask_token') and tokenizer.mask_token is not None: special_tokens.add(tokenizer.mask_token) - if output is not None: - decode_tokens, output_logits, full_logits = output - resp_sentences = [] - resp_sentences_seg = [] - - decode_tokens = decode_tokens.cpu().numpy().tolist() - for decode_token in decode_tokens: - sentence = tokenizer.ids_to_text(decode_token) - resp_sentences.append(sentence) - if not isinstance(tokenizer, TabularTokenizer): - words = [] - for token in decode_token: - if not isinstance(token, Iterable): - token = [token] - word = tokenizer.ids_to_tokens(token) - if isinstance(word, Iterable): - word = word[0] - if hasattr(tokenizer.tokenizer, 'byte_decoder'): - word = bytearray([tokenizer.tokenizer.byte_decoder[c] for c in word]).decode( - 'utf-8', errors='replace' - ) - words.append(word) - resp_sentences_seg.append(words) - else: - words = tokenizer.text_to_tokens(sentence) - resp_sentences_seg.append(words) - - # offsets calculation - all_offsets = [] - for item in resp_sentences_seg: - offsets = [0] - for index, token in enumerate(item): - if index != len(item) - 1: - if token in special_tokens: - offsets.append(offsets[-1]) - else: - offsets.append(len(token) + offsets[-1]) - all_offsets.append(offsets) - - output = {} - output['sentences'] = resp_sentences - output['tokens'] = resp_sentences_seg - output['logprob'] = output_logits - output['full_logprob'] = full_logits - output['token_ids'] = decode_tokens - output['offsets'] 
= all_offsets - output = inference_strategy.post_generation_process(output) - return output + # decode_tokens, output_logits, full_logits = output + resp_sentences = [] + resp_sentences_seg = [] + + decode_tokens = decode_tokens.cpu().numpy().tolist() + for decode_token in decode_tokens: + sentence = tokenizer.ids_to_text(decode_token) + resp_sentences.append(sentence) + if not isinstance(tokenizer, TabularTokenizer): + words = [] + for token in decode_token: + if not isinstance(token, Iterable): + token = [token] + word = tokenizer.ids_to_tokens(token) + if isinstance(word, Iterable): + word = word[0] + if hasattr(tokenizer.tokenizer, 'byte_decoder'): + word = bytearray([tokenizer.tokenizer.byte_decoder[c] for c in word]).decode( + 'utf-8', errors='replace' + ) + words.append(word) + resp_sentences_seg.append(words) + else: + words = tokenizer.text_to_tokens(sentence) + resp_sentences_seg.append(words) + + # offsets calculation + all_offsets = [] + for item in resp_sentences_seg: + offsets = [0] + for index, token in enumerate(item): + if index != len(item) - 1: + if token in special_tokens: + offsets.append(offsets[-1]) + else: + offsets.append(len(token) + offsets[-1]) + all_offsets.append(offsets) + + output = {} + output['sentences'] = resp_sentences + output['tokens'] = resp_sentences_seg + output['logprob'] = output_logits + output['full_logprob'] = full_logits + output['token_ids'] = decode_tokens + output['offsets'] = all_offsets + if post_generation_process is not None: + output = post_generation_process(output) + return output def switch(val1, val2, boolean): diff --git a/nemo/collections/nlp/modules/common/transformer/text_generation.py b/nemo/collections/nlp/modules/common/transformer/text_generation.py index 9c37a150087f..564e0083fcae 100644 --- a/nemo/collections/nlp/modules/common/transformer/text_generation.py +++ b/nemo/collections/nlp/modules/common/transformer/text_generation.py @@ -102,18 +102,18 @@ def generate( raise NotImplementedError("please implement this method") def generate_text( - self, - prompts: List[str], - tokens_to_generate: int = 10, - min_tokens_to_generate: int = 1, - top_k: int = 1, - top_p: float = 1.0, - temperature: float = 0.5, - repetition_penalty: float = 1.0, - add_BOS: bool = False, # should come form model I think - all_probs: bool = False, - compute_logprob: bool = False, - end_strings: List[str] = [], + self, + prompts: Union[List[str], Tuple[Tensor, Tensor], List[dict]], + tokens_to_generate: int = 10, + min_tokens_to_generate: int = 1, + top_k: int = 1, + top_p: float = 1.0, + temperature: float = 0.5, + repetition_penalty: float = 1.0, + add_BOS: bool = False, # should come form model I think + all_probs: bool = False, + compute_logprob: bool = False, + end_strings: List[str] = [], ) -> OutputType: sampling_params: SamplingParam = { "use_greedy": False, @@ -129,13 +129,4 @@ def generate_text( "min_length": min_tokens_to_generate, "max_length": tokens_to_generate, } - return self.generate( - prompts, - length_params, - sampling_params=sampling_params, - end_strings=end_strings, - ) - - -class TextGenerationPipeline(TextGeneration): - pass \ No newline at end of file + return self.generate(prompts, length_params, sampling_params=sampling_params, end_strings=end_strings,) diff --git a/nemo/collections/nlp/pipelines/text_generation_pipeline.py b/nemo/collections/nlp/pipelines/text_generation_pipeline.py index 22d34a330498..25d376fc59bf 100644 --- a/nemo/collections/nlp/pipelines/text_generation_pipeline.py +++ 
b/nemo/collections/nlp/pipelines/text_generation_pipeline.py @@ -1,30 +1,27 @@ from collections import OrderedDict -import sys -import os -from typing import Any, Callable, Dict, Iterable, Optional, List, Tuple, Generator, TypedDict, Union -from typing_extensions import override +from typing import Dict, Iterable, List, Optional, Tuple, Union -import frozenlist -from nemo.collections.common.tokenizers.tabular_tokenizer import TabularTokenizer -from nemo.collections.nlp.modules.common.text_generation_strategy import GPTModelTextGenerationStrategy, model_inference_strategy_dispatcher -from nemo.core.classes.common import Typing, typecheck -from nemo.core.classes.modelPT import ModelPT -from nemo.core.classes.module import NeuralModule -from omegaconf import DictConfig, OmegaConf, open_dict -from pytorch_lightning import LightningModule, Trainer -from nemo.collections.nlp.models.language_modeling.megatron_gpt_model import MegatronGPTModel -from nemo.collections.nlp.parts.nlp_overrides import NLPDDPStrategy, NLPSaveRestoreConnector, SaveRestoreConnector -from nemo.collections.nlp.modules.common.transformer.text_generation import LengthParam, SamplingParam -from nemo.collections.nlp.modules.common.tokenizer_utils import get_nmt_tokenizer -from nemo.collections.nlp.modules.common.text_generation_utils import get_model_parallel_src_rank, receive_generate_info, send_generate_info, synced_generate -import sentencepiece -from pytorch_lightning.plugins.environments import LightningEnvironment - -from nemo.core.neural_types import NeuralType, ChannelType, LengthsType, IntType, FloatType, LogprobsType import torch -import torch.nn.functional as F +from omegaconf import DictConfig, OmegaConf +from pytorch_lightning import Trainer -from nemo.core.neural_types.elements import StringType, BoolType +from nemo.collections.common.tokenizers.tabular_tokenizer import TabularTokenizer +from nemo.collections.nlp.modules.common.text_generation_strategy import ( + model_inference_strategy_dispatcher, + model_static_inference_strategy_dispatcher, +) +from nemo.collections.nlp.modules.common.text_generation_utils import generate_output_ids +from nemo.collections.nlp.modules.common.tokenizer_utils import get_nmt_tokenizer +from nemo.core.classes.common import typecheck +from nemo.core.classes.inference_pipeline import ( + InferencePipeline, + PipelineStage, + PipelineStageType, + load_inference_pipeline, +) +from nemo.core.classes.modelPT import ModelPT +from nemo.core.neural_types import ChannelType, FloatType, IntType, LengthsType, LogprobsType, NeuralType +from nemo.core.neural_types.elements import BoolType, StringType try: from megatron.core import parallel_state @@ -36,20 +33,12 @@ HAVE_MEGATRON_CORE = False -class InferenceModule(NeuralModule): - def infer_input_types(self) -> Dict[str, NeuralType]: - pass - - def infer_output_types(self) -> Dict[str, NeuralType]: - pass - - def load_tokenizer(cfg: DictConfig): if hasattr(cfg, "sentencepiece_legacy"): legacy = cfg.sentencepiece_legacy else: legacy = True if cfg.library == 'sentencepiece' else False - + tokenizer = get_nmt_tokenizer( library=cfg.library, model_name=cfg.type, @@ -68,36 +57,74 @@ def load_tokenizer(cfg: DictConfig): return tokenizer -class Gpt3TextGenerattionPreProc(InferenceModule): - def __init__(self, tokenizer): - super().__init__() +class TextGenerationStage: + def __init__(self, model): + self.model = model + self.strategy = model_inference_strategy_dispatcher(self.model) + + def __call__( + self, + input_ids: torch.Tensor, + input_id_lens: 
torch.Tensor, + tokens_to_generate: int = 2000, + all_probs: bool = False, + temperature: float = 0.5, + greedy: bool = False, + top_k: int = 1, + top_p: float = 1.0, + repetition_penalty: float = 1.0, + min_tokens_to_generate: int = 1, + compute_logprob: bool = False, + end_strings: List[str] = ['<|endoftext|>'], + ): + if greedy: + top_k = 1 + return generate_output_ids( + self.model, + self.strategy, + inputs=(input_ids, input_id_lens), + tokens_to_generate=tokens_to_generate, + all_probs=all_probs, + temperature=temperature, + greedy=False, + top_k=top_k, + top_p=top_p, + repetition_penalty=repetition_penalty, + min_tokens_to_generate=min_tokens_to_generate, + compute_logprob=compute_logprob, + end_strings=end_strings, + ) + + +class TextGenerattionPreProcStage: + def __init__(self, tokenizer, model_cls): self.tokenizer = tokenizer - - def forward(self, prompts: List[str], tokens_to_generate: int = 50, add_BOS: bool = False) -> Tuple[torch.Tensor, torch.Tensor]: - return GPTModelTextGenerationStrategy._tokenize_batch(self.tokenizer, prompts, tokens_to_generate, add_BOS) + self._model_cls = model_cls + self._tokenize_batch = model_static_inference_strategy_dispatcher(self._model_cls)._tokenize_batch + self.__cur_batch = None + self.__batch_start = { + "prompts": [], + "tokens_to_generate": 0, + "add_BOS": None, + } -class TextGenerattionOutputType(TypedDict): - text: List[str] # output text - tokens: List[List[str]] # output sentences borken into tokens - logprob: List[List[float]] # log prob of generated tokens - full_logprob: List[List[float]] # log prob of all the tokens in the vocab - token_ids: List[List[int]] # output sentence token ids - offsets: List[List[int]] # list of tokens start positions in text + def __call__( + self, prompts: List[str], tokens_to_generate: int = 50, add_BOS: bool = False + ) -> Tuple[torch.Tensor, torch.Tensor]: + return self._tokenize_batch(self.tokenizer, prompts, tokens_to_generate, add_BOS) -class Gpt3TextGenerattionPostProc(InferenceModule): +class TextGenerattionPostProcStage: def __init__(self, tokenizer, infer_strat_post_proc_fn=None): - super().__init__() self.tokenizer = tokenizer self.infer_strat_post_proc_fn = infer_strat_post_proc_fn if self.infer_strat_post_proc_fn is None: - self.infer_strat_post_proc_fn = lambda x : x + self.infer_strat_post_proc_fn = lambda x: x - def forward( - self, - output_ids: torch.Tensor, - ) -> TextGenerattionOutputType: + def __call__( + self, output_ids: torch.Tensor, + ): tokenizer = self.tokenizer special_tokens = set() if hasattr(tokenizer, 'pad_token') and tokenizer.pad_token is not None: @@ -150,399 +177,128 @@ def forward( else: offsets.append(len(token) + offsets[-1]) all_offsets.append(offsets) - + output = {} output['text'] = resp_sentences output['tokens'] = resp_sentences_seg - #output['logprob'] = output_logits - #output['full_logprob'] = full_logits + # output['logprob'] = output_logits + # output['full_logprob'] = full_logits output['token_ids'] = output_ids output['offsets'] = all_offsets output = self.infer_strat_post_proc_fn(output) return output['text'] - -def load_gpt3_model(cfg: DictConfig): - trainer = Trainer(plugins=[LightningEnvironment()], strategy=NLPDDPStrategy(), **cfg.trainer) - if ( - cfg.tensor_model_parallel_size < 0 - or cfg.pipeline_model_parallel_size < 0 - or cfg.get('pipeline_model_parallel_split_rank', -1) < 0 - ): - if os.path.isdir(cfg.gpt_model_file): - model_config = OmegaConf.load(os.path.join(cfg.gpt_model_file, 'model_config.yaml')) - else: - model_config = 
MegatronGPTModel.restore_from( - restore_path=cfg.gpt_model_file, trainer=trainer, return_config=True, +class TextGenerationPipeline(InferencePipeline): + def __init__(self, inference_config: Optional[DictConfig] = None, model_config: Optional[DictConfig] = None): + self._inference_config: DictConfig = inference_config + self._model_config: DictConfig = model_config + self._stages = [] + self._stages.append( + PipelineStage( + "preprocessor", + input_types=OrderedDict( + { + "prompts": [NeuralType(None, StringType())], + "tokens_to_generate": NeuralType(None, IntType(), optional=True), + "add_BOS": NeuralType(None, IntType(), optional=True), + } + ), + output_types=OrderedDict( + { + "input_ids": NeuralType(('B', 'T'), ChannelType()), + "input_id_lens": NeuralType(('B',), LengthsType()), + } + ), + stage_type=PipelineStageType.NEMO_PROC, ) - - with open_dict(cfg): - cfg.tensor_model_parallel_size = model_config.get('tensor_model_parallel_size', 1) - cfg.pipeline_model_parallel_size = model_config.get('pipeline_model_parallel_size', 1) - cfg.pipeline_model_parallel_split_rank = model_config.get('pipeline_model_parallel_split_rank', 0) - - assert ( - cfg.trainer.devices * cfg.trainer.num_nodes - == cfg.tensor_model_parallel_size * cfg.pipeline_model_parallel_size - ), "devices * num_nodes should equal tensor_model_parallel_size * pipeline_model_parallel_size" - - print('devices', cfg.trainer.devices) - print('nodes', cfg.trainer.num_nodes) - print('tp', cfg.tensor_model_parallel_size) - print('pp', cfg.pipeline_model_parallel_size) - - if cfg.gpt_model_file: - save_restore_connector = NLPSaveRestoreConnector() - if os.path.isdir(cfg.gpt_model_file): - save_restore_connector.model_extracted_dir = cfg.gpt_model_file - - pretrained_cfg = MegatronGPTModel.restore_from( - restore_path=cfg.gpt_model_file, - trainer=trainer, - return_config=True, - save_restore_connector=save_restore_connector, - ) - OmegaConf.set_struct(pretrained_cfg, True) - with open_dict(pretrained_cfg): - pretrained_cfg.sequence_parallel = False - pretrained_cfg.activations_checkpoint_granularity = None - pretrained_cfg.activations_checkpoint_method = None - pretrained_cfg.precision = trainer.precision - if trainer.precision == "16": - pretrained_cfg.megatron_amp_O2 = False - model = MegatronGPTModel.restore_from( - restore_path=cfg.gpt_model_file, - trainer=trainer, - override_config_path=pretrained_cfg, - save_restore_connector=save_restore_connector, - map_location=f'cuda:{trainer.local_rank}', ) - else: - raise ValueError("need at a nemo file") - - model.freeze() - - # Have to turn off activations_checkpoint_method for inference - try: - model.model.language_model.encoder.activations_checkpoint_method = None - except AttributeError: - pass - return model.cuda() - - -def generate( - model, - inputs=None, - tokens_to_generate=0, - all_probs=False, - temperature=1.0, - add_BOS=False, - top_k=0, - top_p=0.0, - greedy=False, - compute_attention_mask=True, - compute_logprob=False, - repetition_penalty=1.0, - min_tokens_to_generate=0, - end_strings=['<|endoftext|>'], - **strategy_args, -): - """ - Args: - model (NLPModel): text generative model - inputs (Union[tuple, List[str]]): if it is a tuple, it is assumed to be (context_tokens_tensor, context_length_tensor). Otherwise it it a list of prompt text strings - tokens_to_generate (int): The maximum length of the tokens to be generated. 
- all_probs (bool): Return the log prob for all the tokens - temperature (float): sampling temperature - add_BOS (bool): add the bos token at the begining of the prompt - top_k (int): The number of highest probability vocabulary tokens to keep for top-k-filtering. - top_p (float): If set to float < 1, only the most probable tokens with probabilities that add up to top_p or higher are kept for generation. - greedy (bool): Whether or not to use sampling ; use greedy decoding otherwise - repetition_penalty (float): The parameter for repetition penalty. 1.0 means no penalty - min_tokens_to_generate (int): The minimum length of the tokens to be generated - strategy_args, the extra arguments are treated as inference strategy arguments - end_strings, a list of strings to stop generation when they are encountered in the output. - Returns: - OutputType: It generates the output in a dictionary type. It has the following keys: - logprob: List[Tensor], log prob of generated tokens - full_logprob: List[Tensor], log prob of all the tokens in the vocab - token_ids: List[Tensor], output sentence token ids - """ - if 'strategy' in strategy_args: - inference_strategy = strategy_args['strategy'] - else: - inference_strategy = model_inference_strategy_dispatcher(model, **strategy_args) - tokenizer = model.tokenizer - if torch.distributed.get_rank() == get_model_parallel_src_rank(): - if isinstance(inputs, tuple): - context_tokens_tensor, context_length_tensor = inputs - else: - context_tokens_tensor, context_length_tensor = inference_strategy.tokenize_batch( - inputs, tokens_to_generate, add_BOS + self._stages.append( + PipelineStage( + "text_generation", + input_types=OrderedDict( + { + "input_ids": NeuralType(('B', 'T'), ChannelType()), + "input_id_lens": NeuralType(('B',), LengthsType()), + "tokens_to_generate": NeuralType(None, IntType(), optional=True), + "all_probs": NeuralType(None, BoolType(), optional=True), + "temperature": NeuralType(None, FloatType(), optional=True), + "greedy": NeuralType(None, BoolType(), optional=True), + "top_k": NeuralType(None, IntType(), optional=True), + "top_p": NeuralType(None, FloatType(), optional=True), + "repetition_penalty": NeuralType(None, FloatType(), optional=True), + "min_tokens_to_generate": NeuralType(None, FloatType(), optional=True), + "compute_logprob": NeuralType(None, BoolType(), optional=True), + "end_strings": [NeuralType(None, StringType())], + } + ), + output_types=OrderedDict( + { + "output_ids": NeuralType(('B', 'T'), ChannelType()), + "logprob": NeuralType(('B', 'T'), LogprobsType(), optional=True), + "full_logprob": NeuralType(('B', 'T', 'D'), LogprobsType(), optional=True), + } + ), + stage_type=PipelineStageType.MULTI_STEP_NNET, ) - - send_generate_info( - context_tokens_tensor, - context_length_tensor, - tokens_to_generate, - all_probs, - compute_logprob, - temperature, - top_k, - top_p, - greedy, - repetition_penalty, - min_tokens_to_generate, - end_strings, ) - else: - ( - context_length_tensor, - context_tokens_tensor, - tokens_to_generate, - all_probs, - compute_logprob, - temperature, - top_k, - top_p, - greedy, - repetition_penalty, - min_tokens_to_generate, - end_strings, - ) = receive_generate_info() - - output = synced_generate( - model, - inference_strategy, - context_tokens_tensor, - context_length_tensor, - tokens_to_generate, - all_probs, - temperature, - compute_attention_mask=compute_attention_mask, - compute_logprob=compute_logprob, - top_k=top_k, - top_p=top_p, - greedy=greedy, - repetition_penalty=repetition_penalty, - 
min_tokens_to_generate=min_tokens_to_generate, - end_strings=end_strings, - ) - output_ids, output_logits, full_logits = output - return (output_ids, output_logits, full_logits) - - -# TODO NeMo doesn't support mixed parameter batches (e.g. diff top_k values for each element in batch) -def generate_text( - model: MegatronGPTModel, - trainer: Trainer, - sentences: Union[Tuple[torch.Tensor, torch.Tensor], List[str]], - tokens_to_generate: int = 2000, - all_probs: bool = False, - temperature: float = 0.5, - add_BOS: bool = False, - greedy: bool = False, - top_k: int = 1, - top_p: float = 1.0, - repetition_penalty: float = 1.0, - min_tokens_to_generate: int = 1, - compute_logprob: bool = False, - end_strings: List[str] = [''], -) -> Tuple: - - if parallel_state.is_unitialized(): - - def dummy(): - return - - if trainer.strategy.launcher is not None: - trainer.strategy.launcher.launch(dummy, trainer=trainer) - trainer.strategy.setup_environment() - - if model.cfg.get('transformer_engine', False): - model.setup_transformer_engine_tp_groups() - - if isinstance(sentences, (list, tuple)): - if isinstance(sentences[0], (str, torch.Tensor)): - output = generate( - model.cuda(), - inputs=sentences, - tokens_to_generate=tokens_to_generate, - all_probs=all_probs, - temperature=temperature, - add_BOS=add_BOS, - top_k=top_k, - top_p=top_p, - greedy=greedy, - repetition_penalty=repetition_penalty, - min_tokens_to_generate=min_tokens_to_generate, - compute_logprob=compute_logprob, - end_strings=end_strings, + self._stages.append( + PipelineStage( + "postprocessor", + input_types=OrderedDict({"output_ids": NeuralType(('B', 'T'), ChannelType()),}), + output_types=OrderedDict({"text": [NeuralType(None, StringType())],}), + stage_type=PipelineStageType.NEMO_PROC, ) - return output - elif isinstance(sentences[0], dict): - raise NotImplementedError("json object not implemented") - else: - raise NotImplementedError("unknown type is not implemented") - else: - raise NotImplementedError("unknown type is not implemented") - - -class Gpt3TextGeneration(InferenceModule): - def __init__(self, model): - super().__init__() - self.model = model - - def forward( - self, - input_ids: torch.Tensor, - input_id_lens: torch.Tensor, - tokens_to_generate: int = 2000, - all_probs: bool = False, - temperature: float = 0.5, - add_BOS: bool = False, - greedy: bool = False, - top_k: int = 1, - top_p: float = 1.0, - repetition_penalty: float = 1.0, - min_tokens_to_generate: int = 1, - compute_logprob: bool = False, - end_strings: List[str] = ['<|endoftext|>'], - ): - if greedy: - top_k = 1 - return generate_text( - self.model, - self.model.trainer, - sentences=(input_ids, input_id_lens), - tokens_to_generate=tokens_to_generate, - all_probs=all_probs, - temperature=temperature, - add_BOS=add_BOS, - greedy=False, - top_k=top_k, - top_p=top_p, - repetition_penalty=repetition_penalty, - min_tokens_to_generate=min_tokens_to_generate, - compute_logprob=compute_logprob, - end_strings=end_strings, ) - -class NemoPipelineStage(Typing): - def __init__(self, input_types: Dict[str, NeuralType] = None, output_types: Dict[str, NeuralType] = None, name: str = ""): - self._input_types = input_types - self._output_types = output_types - self._name = name - self._exec_fn = None + @property + def task_name(self) -> str: + return "text_completion" @property - def name(self): - return self._name - + def stages(self) -> List[PipelineStage]: + return self._stages + @property - def input_types(self) -> Optional[Dict[str, NeuralType]]: - return 
self._input_types - + def inference_config(self) -> DictConfig: + """ + Returns the inference config used to load the model / pipeline + Same config used in load_for_inference + """ + return self._inference_config + @property - def output_types(self) -> Optional[Dict[str, NeuralType]]: - return self._output_types - - def set_exec(self, fn): - self._exec_fn = fn - - #@typecheck - def forward(self, **kwargs): - if self._exec_fn is None: - raise NotImplementedError("set_exec must be called before the forward method is called") - return self._exec_fn(**kwargs) - - def __call__(self, **kwargs): - return self.forward(**kwargs) - - -class NemoPipeline(Typing): - def __init__(self): - self._stages: List[NemoPipeline] = [] - - def add_stage(self, input_types: Dict[str, NeuralType] = None, output_types: Dict[str, NeuralType] = None, name: str = "", index=None): - idx = index - if idx is None: - idx = len(self._stages) - self._stages.insert( - idx, - NemoPipelineStage(input_types=input_types, output_types=output_types, name=name) - ) + def model_config(self) -> DictConfig: + return self._model_config - def register_exec(self, fn: Union[NeuralModule, Callable], stage_name=None, stage_index=None): - if stage_index is not None: - self._stages[stage_index].set_exec(fn) - elif stage_name is not None: - matches = [] - for stage in self._stages: - if stage.name == stage_name: - matches.append(stage) - if len(matches) == 1: - stage = matches[0] - if stage._exec_fn is not None: - raise Exception("exec for stage already set") - matches[0].set_exec(fn) - elif len(matches) > 1: - raise Exception(f"name {stage_name} is ambiguous") - else: - raise Exception("stage not found") - else: - raise NotImplementedError("TODO implement register exec based on stage input / output types") + # def register_default_execs(self): + # tokenizer_cfg = self._model_config.tokenizer + # with open_dict(tokenizer_cfg): + # if tokenizer_cfg.model is not None and tokenizer_cfg.model.startswith("nemo:"): + # tokenizer_cfg.model = os.path.join(cfg.model_path, tokenizer_cfg.model.split(":", 1)[1]) + # if tokenizer_cfg.vocab_file is not None and tokenizer_cfg.vocab_file.startswith("nemo:"): + # tokenizer_cfg.vocab_file = os.path.join(cfg.model_path, tokenizer_cfg.vocab_file.split(":", 1)[1]) + # if tokenizer_cfg.merge_file is not None and tokenizer_cfg.merge_file.startswith("nemo:"): + # tokenizer_cfg.merge_file = os.path.join(cfg.model_path, tokenizer_cfg.merge_file.split(":", 1)[1]) - def register_default_execs(self): - pass - - @property - def stages(self): - # TODO use frozenlist? 
- return self._stages + # tokenizer = load_tokenizer(tokenizer_cfg) - @property - def input_types(self) -> Optional[Dict[str, NeuralType]]: - if len(self._stages) > 0: - return self._stages[0].input_types - return None + # model = load_gpt3_model(cfg) + + # preprocessor = Gpt3TextGenerattionPreProc(tokenizer) + # text_generator = Gpt3TextGeneration(model) + # postprocessor = Gpt3TextGenerattionPostProc(tokenizer) + + # self.register_exec(preprocessor, stage_name="preprocessor") + # self.register_exec(postprocessor, stage_name="postprocessor") + # self.register_exec(text_generator, stage_name="text_generation") @property - def output_types(self) -> Optional[Dict[str, NeuralType]]: - if len(self._stages) > 0: - return self._stages[-1].output_types - return None - - def forward(self, **kwargs): - raise NotImplementedError() - - def __call__(self, **kwargs): - return self.forward(*kwargs) - - -class TextGenerationPipeline(NemoPipeline): - def __init__(self, inference_config: DictConfig=None, load_defaults=True): - super().__init__() - self.inference_config = inference_config - self.add_stage( - input_types=OrderedDict({ + def input_types(self) -> Optional[Dict[str, NeuralType]]: + return OrderedDict( + { "prompts": [NeuralType(None, StringType())], "tokens_to_generate": NeuralType(None, IntType(), optional=True), - "add_BOS": NeuralType(None, IntType(), optional=True) - }), - output_types=OrderedDict({ - "input_ids": NeuralType(('B', 'T'), ChannelType()), - "input_id_lens": NeuralType(('B',), LengthsType()), - }), - name="preprocessor" - ) - self.add_stage( - input_types=OrderedDict({ - "input_ids": NeuralType(('B', 'T'), ChannelType()), - "input_id_lens": NeuralType(('B',), LengthsType()), - "tokens_to_generate": NeuralType(None, IntType(), optional=True), "all_probs": NeuralType(None, BoolType(), optional=True), "temperature": NeuralType(None, FloatType(), optional=True), "add_BOS": NeuralType(None, FloatType(), optional=True), @@ -553,83 +309,23 @@ def __init__(self, inference_config: DictConfig=None, load_defaults=True): "min_tokens_to_generate": NeuralType(None, FloatType(), optional=True), "compute_logprob": NeuralType(None, BoolType(), optional=True), "end_strings": [NeuralType(None, StringType())], - }), - output_types=OrderedDict({ - "output_ids": NeuralType(('B', 'T'), ChannelType()), - "logprob": NeuralType(('B', 'T'), LogprobsType(), optional=True), - "full_logprob": NeuralType(('B', 'T', 'D'), LogprobsType(), optional=True), - }), - name="text_generation" + } ) - self.add_stage( - input_types=OrderedDict({ - "output_ids": NeuralType(('B', 'T'), ChannelType()), - }), - output_types=OrderedDict({ - "text": [NeuralType(None, StringType())], - }), - name="postprocessor" - ) - - if load_defaults: - self.register_default_execs() - - def register_default_execs(self): - cfg = self.inference_config - if os.path.isdir(cfg.gpt_model_file): - model_config = OmegaConf.load(os.path.join(cfg.gpt_model_file, "model_config.yaml")) - else: - raise Exception("Should be dir") - - tokenizer_cfg = model_config.tokenizer - with open_dict(tokenizer_cfg): - if tokenizer_cfg.model is not None and tokenizer_cfg.model.startswith("nemo:"): - tokenizer_cfg.model = os.path.join(cfg.gpt_model_file, tokenizer_cfg.model.split(":", 1)[1]) - if tokenizer_cfg.vocab_file is not None and tokenizer_cfg.vocab_file.startswith("nemo:"): - tokenizer_cfg.vocab_file = os.path.join(cfg.gpt_model_file, tokenizer_cfg.vocab_file.split(":", 1)[1]) - if tokenizer_cfg.merge_file is not None and 
tokenizer_cfg.merge_file.startswith("nemo:"): - tokenizer_cfg.merge_file = os.path.join(cfg.gpt_model_file, tokenizer_cfg.merge_file.split(":", 1)[1]) - - tokenizer = load_tokenizer(tokenizer_cfg) - - model = load_gpt3_model(cfg) - - preprocessor = Gpt3TextGenerattionPreProc(tokenizer) - text_generator = Gpt3TextGeneration(model) - postprocessor = Gpt3TextGenerattionPostProc(tokenizer) - - self.register_exec(preprocessor, stage_name="preprocessor") - self.register_exec(postprocessor, stage_name="postprocessor") - self.register_exec(text_generator, stage_name="text_generation") - @property - def input_types(self) -> Optional[Dict[str, NeuralType]]: - return OrderedDict({ - "prompts": [NeuralType(None, StringType())], - "tokens_to_generate": NeuralType(None, IntType(), optional=True), - "all_probs": NeuralType(None, BoolType(), optional=True), - "temperature": NeuralType(None, FloatType(), optional=True), - "add_BOS": NeuralType(None, FloatType(), optional=True), - "greedy": NeuralType(None, BoolType(), optional=True), - "top_k": NeuralType(None, IntType(), optional=True), - "top_p": NeuralType(None, FloatType(), optional=True), - "repetition_penalty": NeuralType(None, FloatType(), optional=True), - "min_tokens_to_generate": NeuralType(None, FloatType(), optional=True), - "compute_logprob": NeuralType(None, BoolType(), optional=True), - "end_strings": [NeuralType(None, StringType())], - }) - @property def output_types(self) -> Optional[Dict[str, NeuralType]]: - return OrderedDict({ - "text": [NeuralType(None, StringType())], - "output_ids": NeuralType(('B', 'T'), ChannelType()), - "logprob": NeuralType(('B', 'T'), LogprobsType(), optional=True), - "full_logprob": NeuralType(('B', 'T', 'D'), LogprobsType(), optional=True), - }) + return OrderedDict( + { + "text": [NeuralType(None, StringType())], + "output_ids": NeuralType(('B', 'T'), ChannelType()), + "logprob": NeuralType(('B', 'T'), LogprobsType(), optional=True), + "full_logprob": NeuralType(('B', 'T', 'D'), LogprobsType(), optional=True), + } + ) @typecheck() - def forward(self, + def execute( + self, prompts=None, tokens_to_generate=50, all_probs=False, @@ -641,7 +337,7 @@ def forward(self, repetition_penalty=1.0, min_tokens_to_generate=1, compute_logprob=True, - end_strings=["<|endoftext|>"] + end_strings=["<|endoftext|>"], ): preprocessor = self.stages[0] text_generator = self.stages[1] @@ -666,61 +362,18 @@ def forward(self, return text, output_ids, output_logits, full_logits -def old_main(): - gpt_model_file = "/models/gpt2b" - infer_cfg_file = "gpt_infer.yaml" - - if os.path.isdir(gpt_model_file): - model_config = OmegaConf.load(os.path.join(gpt_model_file, 'model_config.yaml')) - else: - raise Exception("Should be dir") - - tokenizer_cfg = model_config.tokenizer - with open_dict(tokenizer_cfg): - if tokenizer_cfg.model is not None and tokenizer_cfg.model.startswith("nemo:"): - tokenizer_cfg.model = os.path.join(gpt_model_file, tokenizer_cfg.model.split(":", 1)[1]) - if tokenizer_cfg.vocab_file is not None and tokenizer_cfg.vocab_file.startswith("nemo:"): - tokenizer_cfg.vocab_file = os.path.join(gpt_model_file, tokenizer_cfg.vocab_file.split(":", 1)[1]) - if tokenizer_cfg.merge_file is not None and tokenizer_cfg.merge_file.startswith("nemo:"): - tokenizer_cfg.merge_file = os.path.join(gpt_model_file, tokenizer_cfg.merge_file.split(":", 1)[1]) - - tokenizer = load_tokenizer(tokenizer_cfg) - - cfg = OmegaConf.load(infer_cfg_file) - model = load_gpt3_model(cfg) - - preprocessor = Gpt3TextGenerattionPreProc(tokenizer) - text_generator 
= Gpt3TextGeneration(model) - postprocessor = Gpt3TextGenerattionPostProc(tokenizer) - input_ids, lens = preprocessor(["Deep learning is", "I wish i could"]) - print(input_ids) - print(lens) - print("======================") - - output_ids, output_logits, full_logits = text_generator(input_ids.cuda(), lens.cuda(), tokens_to_generate=50, compute_logprob=True, all_probs=True) - print(output_ids) - print(output_logits) - print("===============") - - text = postprocessor(input_ids) - print(text) - - def main(): infer_cfg_file = "gpt_infer.yaml" - cfg = OmegaConf.load(infer_cfg_file) + text_generation_pipe = load_inference_pipeline(infer_cfg_file, task_name="text_completion") + text_generation_pipe.load_nemo_pipeline(text_generation_pipe.stage_names) - text_generation_pipe = TextGenerationPipeline(inference_config=cfg, load_defaults=True) prompts = ["Deep learning is", "Is python a good programming language?"] - text, output_ids, logits, all_logits = text_generation_pipe.forward( - prompts=prompts, - tokens_to_generate=cfg.inference.tokens_to_generate, - top_k=cfg.inference.top_k, - greedy=False, - end_strings=["."] + text, output_ids, logits, all_logits = text_generation_pipe.execute( + prompts=prompts, tokens_to_generate=50, top_k=1, greedy=False, end_strings=["."] ) print(text) + if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/nemo/core/classes/inference_pipeline.py b/nemo/core/classes/inference_pipeline.py new file mode 100644 index 000000000000..284a217e8793 --- /dev/null +++ b/nemo/core/classes/inference_pipeline.py @@ -0,0 +1,181 @@ +import importlib +from enum import Enum +from functools import partial +from typing import Callable, Dict, List, Optional, Union + +import torch +from omegaconf import DictConfig, OmegaConf +from pytorch_lightning import Trainer + +from nemo.core.classes.common import Typing, typecheck +from nemo.core.classes.modelPT import ModelPT +from nemo.core.neural_types.neural_type import NeuralType + + +class PipelineStageType(Enum): + NEMO_PROC = 1 + SINGLE_STEP_NNET = 2 + MULTI_STEP_NNET = 3 + + +class PipelineStage(Typing): + def __init__( + self, + name: str, + input_types: Dict[str, NeuralType], + output_types: Dict[str, NeuralType], + stage_type: PipelineStageType = PipelineStageType.NEMO_PROC, + ): + self._name = name + self._input_types = input_types + self._output_types = output_types + self._stage_type = (stage_type,) + self._exec_fn = None + + @property + def input_types(self) -> Optional[Dict[str, NeuralType]]: + """Define these to enable input neural type checks""" + return self._input_types + + @property + def output_types(self) -> Optional[Dict[str, NeuralType]]: + """Define these to enable output neural type checks""" + return self._output_types + + @property + def name(self): + """Name of the stage which will be used to bind an executor using set_execute""" + return self._name + + @property + def type(self): + # TODO maybe?? + return self._stage_type + + @property + def supported_triton_backends(self): + # TODO not sure about this + # Some of this could be determined be export api, but not all + return [ + "python_backend", + "fastertransformer", + "onnxruntime", + # ... 
+ ] + + def set_execute(self, fn: Callable): + # It would be nice if this could fail if `fn` is not compatible, but as written types are determined dyanmically + # TODO + # self._exec_fn = partial(typecheck(), fn, self) + self._exec_fn = fn + + def execute(self, *args, **kwargs): + if self._exec_fn is None: + raise NotImplementedError() + return self._exec_fn(*args, **kwargs) + + def __call__(self, *args, **kwargs): + return self.execute(*args, **kwargs) + + +class InferencePipeline(Typing): + # TODO I don't think this is needed in the interface at least + # keep things very simple for now + # def __init__( + # self, + # map_location: Optional[torch.device] = None, + # trainer: Optional[Trainer] = None, + # ): + # self._map_location = map_location + # self._trainer = trainer + + @property + def task_name(self) -> str: + raise NotImplementedError() + + @property + def stages(self) -> List[PipelineStage]: + raise NotImplementedError() + + @property + def stage_names(self): + return [stage.name for stage in self.stages] + + @property + def inference_config(self) -> DictConfig: + """ + Returns the inference config used to load the model / pipeline + Same config used in load_for_inference + """ + raise NotImplementedError() + + @property + def model_config(self) -> DictConfig: + raise NotImplementedError() + + # @property + # def trainer(self): + # return self._trainer + + # @property + # def map_location(self): + # return self._map_location + + def load_nemo_pipeline(self, parts: Optional[List[Union[str, PipelineStageType]]] = None): + raise NotImplementedError() + + def set_stage_exec(self, stage_name: str, fn: Callable): + for stage in self.stages: + if stage.name == stage_name: + stage.set_execute(fn) + + def execute(self, *args, **kwargs): + if len(self.stages) == 0: + raise NotImplementedError() + output = self.stages[0].execute(*args, **kwargs) + if len(self.stages) < 2: + return output + for stage in self.stages[1:]: + output = stage.execute(*output) + return output + + def __call__(self, *args, **kwargs): + self.execute(*args, **kwargs) + + +class InferencePipelineFactory: + @classmethod + def inference_pipeline( + cls, + task_name: Optional[str] = None, + inference_config: Optional[DictConfig] = None, + model_config: Optional[DictConfig] = None, + ) -> InferencePipeline: + raise NotImplementedError() + + +def load_inference_pipeline(config: Union[str, DictConfig], model_path_field_name: str = "model_path", task_name=None): + if isinstance(config, str): + cfg: DictConfig = OmegaConf.load(config) + else: + cfg = config + if not hasattr(cfg, model_path_field_name): + raise ValueError(f"inference config must have model path field {model_path_field_name}") + model_path = getattr(cfg, model_path_field_name) + model_cfg: DictConfig = ModelPT.restore_from(model_path, return_config=True) + + target_class = None + for tgt_name in ("target", "_target_", "__target__"): + if hasattr(model_cfg, tgt_name): + target_class = getattr(model_cfg, tgt_name) + if target_class is None: + raise ValueError("Unable to find target class for model") + + module_name, class_name = target_class.rsplit(".", 1) + module = importlib.import_module(module_name) + if not hasattr(module, class_name): + raise ImportError(f"No class {class_name} in module {module_name}") + cls = getattr(module, class_name) + if not issubclass(cls, InferencePipelineFactory): + raise ValueError(f"model target class {target_class} is not a InterfacePipelineFactory") + return cls.inference_pipeline(task_name=task_name, inference_config=cfg, 
model_config=model_cfg,) diff --git a/nemo/core/classes/modelPT.py b/nemo/core/classes/modelPT.py index 7812209ad0b4..1d9a010a865e 100644 --- a/nemo/core/classes/modelPT.py +++ b/nemo/core/classes/modelPT.py @@ -14,10 +14,10 @@ from __future__ import annotations import copy +import importlib import inspect import os import uuid -import importlib from abc import abstractmethod from os import path from pathlib import Path @@ -383,8 +383,7 @@ def maybe_make_save_dir(path: 'pathlib.Path'): @classmethod def load_for_inference( - cls, - cfg: Union[str, DictConfig], + cls, cfg: Union[str, DictConfig], ): if isinstance(cfg, str): config = OmegaConf.load(cfg) @@ -395,12 +394,10 @@ def load_for_inference( raise NotImplementedError("") else: return model_cls.load_for_inference(cfg) - + @classmethod def auto_load( - cls, - restore_path: str, - trainer_args: Dict = {}, + cls, restore_path: str, trainer_args: Dict = {}, ): model_cls = cls._get_class_from_config(cls._read_model_config(restore_path)) if cls == model_cls: @@ -408,7 +405,6 @@ def auto_load( else: return model_cls.auto_load(restore_path, trainer_args=trainer_args) - @classmethod def _read_model_config(cls, restore_path: str) -> DictConfig: cfg = cls.restore_from(restore_path=restore_path, return_config=True) @@ -429,7 +425,6 @@ def _get_class_from_config(cls, cfg: DictConfig): raise Exception(f"Failed to load class {classpath}") return getattr(module, class_name) - @classmethod def restore_from( cls,