Text-Generation Pipeline Example#526
Conversation
regisss
left a comment
There was a problem hiding this comment.
Thanks for adding this!
I left several comments.
We should also move the two test files to the tests folder. And we could create a text-generation subfolder there with these two files, test_text_generation_example.py and test_encoder_decoder_text_summarization.py.
| def get_repo_root(model_name_or_path, local_rank=-1, token=None): | ||
| """ | ||
| Downloads the specified model checkpoint and returns the repository where it was downloaded. | ||
| """ | ||
| if Path(model_name_or_path).is_dir(): | ||
| # If it is a local model, no need to download anything | ||
| return model_name_or_path | ||
| else: | ||
| # Checks if online or not | ||
| if is_offline_mode(): | ||
| if local_rank == 0: | ||
| print("Offline mode: forcing local_files_only=True") | ||
|
|
||
| # Only download PyTorch weights by default | ||
| allow_patterns = ["*.bin"] | ||
|
|
||
| # Download only on first process | ||
| if local_rank in [-1, 0]: | ||
| cache_dir = snapshot_download( | ||
| model_name_or_path, | ||
| local_files_only=is_offline_mode(), | ||
| cache_dir=os.getenv("TRANSFORMERS_CACHE", None), | ||
| allow_patterns=allow_patterns, | ||
| max_workers=16, | ||
| token=token, | ||
| ) | ||
| if local_rank == -1: | ||
| # If there is only one process, then the method is finished | ||
| return cache_dir | ||
|
|
||
| # Make all processes wait so that other processes can get the checkpoint directly from cache | ||
| torch.distributed.barrier() | ||
|
|
||
| return snapshot_download( | ||
| model_name_or_path, | ||
| local_files_only=is_offline_mode(), | ||
| cache_dir=os.getenv("TRANSFORMERS_CACHE", None), | ||
| allow_patterns=allow_patterns, | ||
| token=token, | ||
| ) | ||
|
|
||
|
|
||
| def get_checkpoint_files(model_name_or_path, local_rank): | ||
| """ | ||
| Gets the list of files for the specified model checkpoint. | ||
| """ | ||
| cached_repo_dir = get_repo_root(model_name_or_path, local_rank) | ||
|
|
||
| # Extensions: .bin | .pt | ||
| # Creates a list of paths from all downloaded files in cache dir | ||
| file_list = [str(entry) for entry in Path(cached_repo_dir).rglob("*.[bp][it][n]") if entry.is_file()] | ||
| return file_list | ||
|
|
||
|
|
||
| def write_checkpoints_json(model_name_or_path, local_rank, checkpoints_json): | ||
| """ | ||
| Dumps metadata into a JSON file for DeepSpeed-inference. | ||
| """ | ||
| checkpoint_files = get_checkpoint_files(model_name_or_path, local_rank) | ||
| if local_rank == 0: | ||
| data = {"type": "ds_model", "checkpoints": checkpoint_files, "version": 1.0} | ||
| with open(checkpoints_json, "w") as fp: | ||
| json.dump(data, fp) | ||
|
|
||
|
|
||
| def model_on_meta(config): | ||
| """ | ||
| Checks if load the model to meta. | ||
| """ | ||
| return config.model_type in ["bloom", "llama"] | ||
|
|
||
|
|
||
| def get_optimized_model_name(config): | ||
| from optimum.habana.transformers.generation import MODELS_OPTIMIZED_WITH_STATIC_SHAPES | ||
|
|
||
| for model_type in MODELS_OPTIMIZED_WITH_STATIC_SHAPES: | ||
| if model_type == config.model_type: | ||
| return model_type | ||
|
|
||
| return None | ||
|
|
||
|
|
||
| def model_is_optimized(config): | ||
| """ | ||
| Checks if the given config belongs to a model in optimum/habana/transformers/models, which has a | ||
| new input token_idx. | ||
| """ | ||
| return get_optimized_model_name(config) is not None | ||
|
|
||
|
|
||
| def get_ds_injection_policy(config): | ||
| """ | ||
| Defines injection policies for model parallelism via DeepSpeed. | ||
| """ | ||
| model_type = get_optimized_model_name(config) | ||
| policy = {} | ||
| if model_type: | ||
| if model_type == "bloom": | ||
| from transformers.models.bloom.modeling_bloom import BloomBlock | ||
|
|
||
| policy = {BloomBlock: ("self_attention.dense", "mlp.dense_4h_to_h")} | ||
|
|
||
| if model_type == "opt": | ||
| from transformers.models.opt.modeling_opt import OPTDecoderLayer | ||
|
|
||
| policy = {OPTDecoderLayer: ("self_attn.out_proj", ".fc2")} | ||
|
|
||
| if model_type == "gpt2": | ||
| from transformers.models.gpt2.modeling_gpt2 import GPT2MLP | ||
|
|
||
| policy = {GPT2MLP: ("attn.c_proj", "mlp.c_proj")} | ||
|
|
||
| if model_type == "gptj": | ||
| from transformers.models.gptj.modeling_gptj import GPTJBlock | ||
|
|
||
| policy = {GPTJBlock: ("attn.out_proj", "mlp.fc_out")} | ||
|
|
||
| if model_type == "gpt_neox": | ||
| from transformers.models.gpt_neox.modeling_gpt_neox import GPTNeoXLayer | ||
|
|
||
| policy = {GPTNeoXLayer: ("attention.dense", "mlp.dense_4h_to_h")} | ||
|
|
||
| if model_type == "llama": | ||
| from transformers.models.llama.modeling_llama import LlamaDecoderLayer | ||
|
|
||
| policy = {LlamaDecoderLayer: ("self_attn.o_proj", "mlp.down_proj")} | ||
|
|
||
| return policy |
There was a problem hiding this comment.
All these methods can be imported from here: https://github.com/bgoldberg-habana/optimum-habana/blob/main1/optimum/habana/checkpoint_utils.py
There was a problem hiding this comment.
This change has been made.
| self.generation_config.force_words_ids = None | ||
|
|
||
| # Define stopping criteria based on eos token id | ||
| self.stopping_criteria = StoppingCriteriaList([CustomStoppingCriteria(self.generation_config.eos_token_id)]) |
There was a problem hiding this comment.
I'm not sure we need this custom stopping criteria; why not set ignore_eos to False in the generation config?
There was a problem hiding this comment.
Yes, the stopping criteria has been removed.
| return self.stop_token_id in input_ids[0] | ||
|
|
||
|
|
||
| class GaudiTextGenerationPipeline: |
There was a problem hiding this comment.
Shouldn't it inherit from Transformers' text-generation pipeline?
https://github.com/huggingface/transformers/blob/main/src/transformers/pipelines/text_generation.py
There was a problem hiding this comment.
This change has been made.
|
|
||
| self.tokenizer = AutoTokenizer.from_pretrained(model_name_or_path) | ||
|
|
||
| model_dtype = torch.bfloat16 |
There was a problem hiding this comment.
We should be able to specify the dtype as an argument
There was a problem hiding this comment.
Could you please provide an example of this argument? Wondering if we could use a boolean for bf16 or a string for different dtypes...
There was a problem hiding this comment.
There was a problem hiding this comment.
Thanks for sharing this link. I added a use_bf16 argument to the pipeline constructor.
| else: | ||
| get_repo_root(model_name_or_path, local_rank=self.local_rank) | ||
| # placement on hpu if meta tensors are not supported | ||
| with deepspeed.OnDevice(dtype=model_dtype, device="hpu"): |
There was a problem hiding this comment.
Note that with device="hpu" models that cannot be loaded on Meta may trigger an out-of-memory error if they are spread across several devices.
There was a problem hiding this comment.
Thanks for pointing this out. I have changed it to device="cpu".
| return cache_dir | ||
|
|
||
| # Make all processes wait so that other processes can get the checkpoint directly from cache | ||
| torch.distributed.barrier() |
There was a problem hiding this comment.
This must fail when not using DeepSpeed, no? Since torch.distributed is not initialized in that case.
There was a problem hiding this comment.
Not applicable anymore as methods are imported from checkpoint_utils.py
|
|
||
| # Used for padding input to fixed length | ||
| self.tokenizer.padding_side = "left" | ||
| self.max_padding_length = kwargs.get("max_padding_length", self.model.config.max_position_embeddings) |
There was a problem hiding this comment.
| self.max_padding_length = kwargs.get("max_padding_length", self.model.config.max_position_embeddings) | |
| self.max_padding_length = kwargs.get("max_padding_length", self.tokenizer.model_max_length) |
There was a problem hiding this comment.
Tried your suggestion but the tokenizer threw an error.
| Function to compile computation graphs and synchronize hpus. | ||
| """ | ||
| for _ in range(3): | ||
| self("Here is my prompt") |
There was a problem hiding this comment.
We will still get compilations if the prompt doesn't have exactly the same size as this toy one, no?
There was a problem hiding this comment.
Yes, that's correct. The input prompt is left-padded so the computation graph does not change.
| @classmethod | ||
| def setUpClass(self): | ||
| """Overrides setUpClass from unittest to create artifacts for testing""" | ||
| self.base_command = ["python", "../../gaudi_spawn.py", "--use_deepspeed", "--world_size"] |
There was a problem hiding this comment.
"../.." will depend on where the test is launched from. I would prefer to have an absolute path here. You can for example use pathlib as follows:
Path(__file__).parent.parent.resolve()
There was a problem hiding this comment.
Will work on this next.
|
Thanks for reviewing my code. I have replied to all your comments. |
regisss
left a comment
There was a problem hiding this comment.
I left a couple of comments. The main one is about using the methods defined in https://github.com/huggingface/optimum-habana/blob/main/examples/text-generation/utils.py to avoid duplicated code with the text-generation example.
I think the main method of the example is a good base to start with:
| An end-to-end text-generation pipeline that can used to initialize LangChain classes. It supports both single-hpu and multi-hpu inference. | ||
| """ | ||
|
|
||
| def __init__(self, model_name_or_path=None, use_bf16=True, **kwargs): |
There was a problem hiding this comment.
| def __init__(self, model_name_or_path=None, use_bf16=True, **kwargs): | |
| def __init__(self, model_name_or_path=None, bf16=True, **kwargs): |
Let's call this arg bf16 to stay consistent with other parts of the codebase
| self.use_deepspeed = "deepspeed" in os.environ["_"] | ||
|
|
||
| if self.use_deepspeed: | ||
| world_size, _, self.local_rank = initialize_distributed_hpu() | ||
|
|
||
| import deepspeed | ||
|
|
||
| # Initialize Deepspeed processes | ||
| deepspeed.init_distributed(dist_backend="hccl") | ||
|
|
||
| self.task = "text-generation" | ||
| self.device = "hpu" | ||
|
|
||
| # Tweak generation so that it runs faster on Gaudi | ||
| adapt_transformers_to_gaudi() | ||
| set_seed(27) | ||
|
|
||
| self.tokenizer = AutoTokenizer.from_pretrained(model_name_or_path) | ||
|
|
||
| if self.use_deepspeed or use_bf16: | ||
| model_dtype = torch.bfloat16 | ||
| else: | ||
| model_dtype = torch.float | ||
|
|
||
| if self.use_deepspeed: | ||
| config = AutoConfig.from_pretrained(model_name_or_path) | ||
| is_optimized = model_is_optimized(config) | ||
| load_to_meta = model_on_meta(config) | ||
|
|
||
| if load_to_meta: | ||
| # Construct model with fake meta tensors, later will be replaced on devices during ds-inference ckpt load | ||
| with deepspeed.OnDevice(dtype=model_dtype, device="meta"): | ||
| model = AutoModelForCausalLM.from_config(config, torch_dtype=model_dtype) | ||
| else: | ||
| get_repo_root(model_name_or_path, local_rank=self.local_rank) | ||
| # placement on cpu if meta tensors are not supported | ||
| with deepspeed.OnDevice(dtype=model_dtype, device="cpu"): | ||
| model = AutoModelForCausalLM.from_pretrained(model_name_or_path, torch_dtype=model_dtype) | ||
| model = model.eval() | ||
|
|
||
| # Initialize the model | ||
| ds_inference_kwargs = {"dtype": model_dtype} | ||
| ds_inference_kwargs["tensor_parallel"] = {"tp_size": world_size} | ||
| ds_inference_kwargs["enable_cuda_graph"] = True | ||
|
|
||
| if load_to_meta: | ||
| # model loaded to meta is managed differently | ||
| checkpoints_json = "checkpoints.json" | ||
| write_checkpoints_json(model_name_or_path, self.local_rank, checkpoints_json) | ||
|
|
||
| # Make sure all devices/nodes have access to the model checkpoints | ||
| torch.distributed.barrier() | ||
|
|
||
| ds_inference_kwargs["injection_policy"] = get_ds_injection_policy(config) | ||
| if load_to_meta: | ||
| ds_inference_kwargs["checkpoint"] = checkpoints_json | ||
|
|
||
| model = deepspeed.init_inference(model, **ds_inference_kwargs) | ||
| model = model.module | ||
| else: | ||
| get_repo_root(model_name_or_path) | ||
| model = AutoModelForCausalLM.from_pretrained(model_name_or_path, torch_dtype=model_dtype) | ||
| model = model.eval().to(self.device) | ||
| is_optimized = model_is_optimized(model.config) | ||
| model = wrap_in_hpu_graph(model) | ||
|
|
||
| self.model = model | ||
|
|
||
| # Used for padding input to fixed length | ||
| self.tokenizer.padding_side = "left" | ||
| self.max_padding_length = kwargs.get("max_padding_length", self.model.config.max_position_embeddings) | ||
|
|
||
| # Define config params for llama models | ||
| if self.model.config.model_type == "llama": | ||
| self.model.generation_config.pad_token_id = 0 | ||
| self.model.generation_config.bos_token_id = 1 | ||
| self.model.generation_config.eos_token_id = 2 | ||
| self.tokenizer.bos_token_id = self.model.generation_config.bos_token_id | ||
| self.tokenizer.eos_token_id = self.model.generation_config.eos_token_id | ||
| self.tokenizer.pad_token_id = self.model.generation_config.pad_token_id | ||
| self.tokenizer.pad_token = self.tokenizer.decode(self.tokenizer.pad_token_id) | ||
| self.tokenizer.eos_token = self.tokenizer.decode(self.tokenizer.eos_token_id) | ||
| self.tokenizer.bos_token = self.tokenizer.decode(self.tokenizer.bos_token_id) | ||
|
|
||
| # Applicable to models that do not have pad tokens | ||
| if self.tokenizer.pad_token is None: | ||
| self.tokenizer.pad_token = self.tokenizer.eos_token | ||
| self.model.generation_config.pad_token_id = self.model.generation_config.eos_token_id | ||
|
|
||
| # Edit generation configuration based on input arguments | ||
| self.generation_config = copy.deepcopy(self.model.generation_config) | ||
| self.generation_config.max_new_tokens = kwargs.get("max_new_tokens", 100) | ||
| self.generation_config.use_cache = kwargs.get("use_kv_cache", True) | ||
| self.generation_config.static_shapes = is_optimized | ||
| self.generation_config.do_sample = kwargs.get("do_sample", False) | ||
| self.generation_config.num_beams = kwargs.get("num_beams", 1) | ||
| self.generation_config.temperature = kwargs.get("temperature", 1.0) | ||
| self.generation_config.top_p = kwargs.get("top_p", 1.0) | ||
| self.generation_config.repetition_penalty = kwargs.get("repetition_penalty", 1.0) | ||
| self.generation_config.num_return_sequences = kwargs.get("num_return_sequences", 1) | ||
| self.generation_config.bad_words_ids = None | ||
| self.generation_config.force_words_ids = None | ||
| self.generation_config.ignore_eos = False | ||
|
|
||
| if self.use_deepspeed: | ||
| torch.distributed.barrier() |
There was a problem hiding this comment.
Since run_generation.py was recently refactored, let's try to use here the methods defined in https://github.com/huggingface/optimum-habana/blob/main/examples/text-generation/utils.py.
That way we avoid duplicated code.
I refactored the pipeline scripts to incorporate code from https://github.com/huggingface/optimum-habana/blob/main/examples/text-generation/utils.py wherever possible. Please let me know if you have any questions, comments or suggestions. |
libinta
left a comment
There was a problem hiding this comment.
can you add a README.md with examples?
Yes, I added a README with instructions and example commands. |
|
The docs for this PR live here. All of your documentation changes will be reflected on that endpoint. The docs are available until 30 days after the last update. |
8726c1e to
282f28e
Compare
regisss
left a comment
There was a problem hiding this comment.
LGTM!
I just spotted one typo and I'll merge after it is corrected 🙂
What does this PR do?
Adds a custom text-generation pipeline to
examples/text-generation. This pipeline supports single-card as well as multi-card runs (DeepSpeed). Please let me know if additional scripts/documentation are required.
Before submitting