diff --git a/sdks/python/apache_beam/examples/inference/README.md b/sdks/python/apache_beam/examples/inference/README.md
index 326ec4b4a096..1653f3a9699f 100644
--- a/sdks/python/apache_beam/examples/inference/README.md
+++ b/sdks/python/apache_beam/examples/inference/README.md
@@ -42,6 +42,7 @@ The RunInference API supports the Tensorflow framework. To use Tensorflow locall
 pip install tensorflow==2.12.0
 ```
 
+
 ### PyTorch dependencies
 
 The following installation requirements are for the files used in these examples.
@@ -65,6 +66,21 @@ For installation of the `torch` dependency on a distributed runner such as Dataf
 [PyPI dependency instructions](https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/#pypi-dependencies).
 
+### Transformers dependencies
+
+The following installation requirement is for the Hugging Face model handler examples.
+
+The RunInference API supports loading models from the Hugging Face Hub. To use it, first install `transformers`.
+```
+pip install transformers==4.30.0
+```
+Additional dependencies for PyTorch and TensorFlow may need to be installed separately:
+```
+pip install tensorflow==2.12.0
+pip install torch==1.10.0
+```
+
+
 ### TensorRT dependencies
 
 The RunInference API supports TensorRT SDK for high-performance deep learning inference with NVIDIA GPUs.
@@ -687,3 +703,60 @@ MilkQualityAggregation(bad_quality_measurements=6, medium_quality_measurements=4
 MilkQualityAggregation(bad_quality_measurements=3, medium_quality_measurements=3, high_quality_measurements=3)
 MilkQualityAggregation(bad_quality_measurements=1, medium_quality_measurements=2, high_quality_measurements=1)
 ```
+
+---
+## Language modeling with Hugging Face Hub
+
+[`huggingface_language_modeling.py`](./huggingface_language_modeling.py) contains an implementation for a RunInference pipeline that performs masked language modeling (that is, decoding a masked token in a sentence) using the `AutoModelForMaskedLM` architecture from Hugging Face.
+
+The pipeline reads sentences, performs basic preprocessing to convert the last word into a `<mask>` token, passes the masked sentence to the Hugging Face implementation of RunInference, and then writes the predictions to a text file.
+
+### Dataset and model for language modeling
+
+To use this transform, you need a dataset and model for language modeling.
+
+1. Choose a checkpoint to load from the Hugging Face Hub, for example [MaskedLanguageModel](https://huggingface.co/stevhliu/my_awesome_eli5_mlm_model).
+2. (Optional) Create a file named `SENTENCES.txt` that contains sentences to feed into the model. The content of the file should be similar to the following example:
+```
+The capital of France is Paris .
+He looked up and saw the sun and stars .
+...
+```
+
+### Running `huggingface_language_modeling.py`
+
+To run the language modeling pipeline locally, use the following command:
+```sh
+python -m apache_beam.examples.inference.huggingface_language_modeling \
+  --input SENTENCES \
+  --output OUTPUT \
+  --model_name REPOSITORY_ID
+```
+The `input` argument is optional. If none is provided, the pipeline runs with some
+example sentences.
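+
+As described above, the pipeline replaces the last word of each sentence with the
+model's mask token before inference. A minimal sketch of that preprocessing step
+(mirroring `add_mask_to_last_word` in the example; the `<mask>` literal assumes a
+RoBERTa-style checkpoint, while BERT-style models use `[MASK]` instead):
+```
+def add_mask_to_last_word(text: str):
+  words = text.split()
+  # Keep the trailing punctuation token and mask the word right before it.
+  return text, ' '.join(words[:-2] + ['<mask>', words[-1]])
+
+print(add_mask_to_last_word('The capital of France is Paris .')[1])
+# The capital of France is <mask> .
+```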
+
+For example, if you've followed the naming conventions recommended above:
+```sh
+python -m apache_beam.examples.inference.huggingface_language_modeling \
+  --input SENTENCES.txt \
+  --output predictions.csv \
+  --model_name "stevhliu/my_awesome_eli5_mlm_model"
+```
+Or, using the default example sentences:
+```sh
+python -m apache_beam.examples.inference.huggingface_language_modeling \
+  --output predictions.csv \
+  --model_name "stevhliu/my_awesome_eli5_mlm_model"
+```
+
+This writes the output to `predictions.csv` with contents like:
+```
+The capital of France is Paris .;paris
+He looked up and saw the sun and stars .;moon
+...
+```
+Each line has data separated by a semicolon ";".
+The first item is the input sentence. The model masks the last word and tries to predict it;
+the second item is the word that the model predicts for the mask.
+
+---
\ No newline at end of file
diff --git a/sdks/python/apache_beam/examples/inference/huggingface_language_modeling.py b/sdks/python/apache_beam/examples/inference/huggingface_language_modeling.py
new file mode 100644
index 000000000000..f6cb3de72b70
--- /dev/null
+++ b/sdks/python/apache_beam/examples/inference/huggingface_language_modeling.py
@@ -0,0 +1,177 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A pipeline that uses RunInference to perform language modeling with
+a masked language model from Hugging Face.
+
+This pipeline takes sentences from a custom text file, converts the last word
+of the sentence into a <mask> token, and then uses the AutoModelForMaskedLM
+from Hugging Face to predict the best word for the masked token given all the
+words already in the sentence. The pipeline then writes the prediction to an
+output file, which users can then compare against the original sentence.
+""" + +import argparse +import logging +from typing import Dict +from typing import Iterable +from typing import Iterator +from typing import Tuple + +import apache_beam as beam +import torch +from apache_beam.ml.inference.base import KeyedModelHandler +from apache_beam.ml.inference.base import PredictionResult +from apache_beam.ml.inference.base import RunInference +from apache_beam.ml.inference.huggingface_inference import HuggingFaceModelHandlerKeyedTensor +from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.options.pipeline_options import SetupOptions +from apache_beam.runners.runner import PipelineResult +from transformers import AutoModelForMaskedLM +from transformers import AutoTokenizer + + +def add_mask_to_last_word(text: str) -> Tuple[str, str]: + text_list = text.split() + return text, ' '.join(text_list[:-2] + ['', text_list[-1]]) + + +def tokenize_sentence( + text_and_mask: Tuple[str, str], + tokenizer: AutoTokenizer) -> Tuple[str, Dict[str, torch.Tensor]]: + text, masked_text = text_and_mask + tokenized_sentence = tokenizer.encode_plus(masked_text, return_tensors="pt") + + # Workaround to manually remove batch dim until we have the feature to + # add optional batching flag. + # TODO(https://github.com/apache/beam/issues/21863): Remove once optional + # batching flag added + return text, { + k: torch.squeeze(v) + for k, v in dict(tokenized_sentence).items() + } + + +def filter_empty_lines(text: str) -> Iterator[str]: + if len(text.strip()) > 0: + yield text + + +class PostProcessor(beam.DoFn): + """Processes the PredictionResult to get the predicted word. + + The logits are the output of the Model. We can get the word with the highest + probability of being a candidate replacement word by taking the argmax. + """ + def __init__(self, tokenizer: AutoTokenizer): + super().__init__() + self.tokenizer = tokenizer + + def process(self, element: Tuple[str, PredictionResult]) -> Iterable[str]: + text, prediction_result = element + inputs = prediction_result.example + logits = prediction_result.inference['logits'] + mask_token_index = torch.where( + inputs["input_ids"] == self.tokenizer.mask_token_id)[0] + predicted_token_id = logits[mask_token_index].argmax(axis=-1) + decoded_word = self.tokenizer.decode(predicted_token_id) + yield text + ';' + decoded_word + + +def parse_known_args(argv): + """Parses args for the workflow.""" + parser = argparse.ArgumentParser() + parser.add_argument( + '--input', + dest='input', + help='Path to the text file containing sentences.') + parser.add_argument( + '--output', + dest='output', + required=True, + help='Path of file in which to save the output predictions.') + parser.add_argument( + '--model_name', + dest='model_name', + required=True, + help='bert uncased model. This can be base model or large model') + parser.add_argument( + '--model_class', + dest='model_class', + default=AutoModelForMaskedLM, + help="Name of the model from Hugging Face") + return parser.parse_known_args(argv) + + +def run( + argv=None, save_main_session=True, test_pipeline=None) -> PipelineResult: + """ + Args: + argv: Command line arguments defined for this example. + save_main_session: Used for internal testing. + test_pipeline: Used for internal testing. 
+ """ + known_args, pipeline_args = parse_known_args(argv) + pipeline_options = PipelineOptions(pipeline_args) + pipeline_options.view_as(SetupOptions).save_main_session = save_main_session + + pipeline = test_pipeline + if not test_pipeline: + pipeline = beam.Pipeline(options=pipeline_options) + + tokenizer = AutoTokenizer.from_pretrained(known_args.model_name) + + model_handler = HuggingFaceModelHandlerKeyedTensor( + model_uri=known_args.model_name, + model_class=known_args.model_class, + framework='pt', + max_batch_size=1) + if not known_args.input: + text = ( + pipeline | 'CreateSentences' >> beam.Create([ + 'The capital of France is Paris .', + 'It is raining cats and dogs .', + 'Today is Monday and tomorrow is Tuesday .', + 'There are 5 coconuts on this palm tree .', + 'The strongest person in the world is not famous .', + 'The secret ingredient to his wonderful life was gratitude .', + 'The biggest animal in the world is the whale .', + ])) + else: + text = ( + pipeline | 'ReadSentences' >> beam.io.ReadFromText(known_args.input)) + text_and_tokenized_text_tuple = ( + text + | 'FilterEmptyLines' >> beam.ParDo(filter_empty_lines) + | 'AddMask' >> beam.Map(add_mask_to_last_word) + | + 'TokenizeSentence' >> beam.Map(lambda x: tokenize_sentence(x, tokenizer))) + output = ( + text_and_tokenized_text_tuple + | 'RunInference' >> RunInference(KeyedModelHandler(model_handler)) + | 'ProcessOutput' >> beam.ParDo(PostProcessor(tokenizer=tokenizer))) + _ = output | "WriteOutput" >> beam.io.WriteToText( + known_args.output, shard_name_template='', append_trailing_newlines=True) + + result = pipeline.run() + result.wait_until_finish() + return result + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + run() diff --git a/sdks/python/apache_beam/ml/inference/huggingface_inference.py b/sdks/python/apache_beam/ml/inference/huggingface_inference.py new file mode 100644 index 000000000000..35c3a1686c70 --- /dev/null +++ b/sdks/python/apache_beam/ml/inference/huggingface_inference.py @@ -0,0 +1,488 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +# pytype: skip-file + +import logging +import sys +from collections import defaultdict +from typing import Any +from typing import Callable +from typing import Dict +from typing import Iterable +from typing import Optional +from typing import Sequence +from typing import Union + +import tensorflow as tf +import torch +from apache_beam.ml.inference import utils +from apache_beam.ml.inference.base import ModelHandler +from apache_beam.ml.inference.base import PredictionResult +from apache_beam.ml.inference.pytorch_inference import _convert_to_device +from transformers import AutoModel +from transformers import TFAutoModel + +_LOGGER = logging.getLogger(__name__) + +__all__ = [ + "HuggingFaceModelHandlerTensor", + "HuggingFaceModelHandlerKeyedTensor", +] + +TensorInferenceFn = Callable[[ + Sequence[Union[torch.Tensor, tf.Tensor]], + Union[AutoModel, TFAutoModel], + str, + Optional[Dict[str, Any]], + Optional[str], +], + Iterable[PredictionResult], + ] + +KeyedTensorInferenceFn = Callable[[ + Sequence[Dict[str, Union[torch.Tensor, tf.Tensor]]], + Union[AutoModel, TFAutoModel], + str, + Optional[Dict[str, Any]], + Optional[str], +], + Iterable[PredictionResult], + ] + + +def _validate_constructor_args(model_uri, model_class): + message = ( + "Please provide both model class and model uri to load the model." + "Got params as model_uri={model_uri} and " + "model_class={model_class}.") + if not model_uri and not model_class: + raise RuntimeError( + message.format(model_uri=model_uri, model_class=model_class)) + elif not model_uri: + raise RuntimeError( + message.format(model_uri=model_uri, model_class=model_class)) + elif not model_class: + raise RuntimeError( + message.format(model_uri=model_uri, model_class=model_class)) + + +def no_gpu_available_warning(): + _LOGGER.warning( + "HuggingFaceModelHandler specified a 'GPU' device, " + "but GPUs are not available. 
Switching to CPU.") + + +def is_gpu_available_torch(): + if torch.cuda.is_available(): + return True + else: + no_gpu_available_warning() + return False + + +def get_device_torch(device): + if device == "GPU" and is_gpu_available_torch(): + return torch.device("cuda") + return torch.device("cpu") + + +def is_gpu_available_tensorflow(device): + gpu_devices = tf.config.list_physical_devices(device) + if len(gpu_devices) == 0: + no_gpu_available_warning() + return False + return True + + +def _run_inference_torch_keyed_tensor( + batch: Sequence[Dict[str, torch.Tensor]], + model: AutoModel, + device, + inference_args: Dict[str, Any], + model_id: Optional[str] = None) -> Iterable[PredictionResult]: + device = get_device_torch(device) + key_to_tensor_list = defaultdict(list) + # torch.no_grad() mitigates GPU memory issues + # https://github.com/apache/beam/issues/22811 + with torch.no_grad(): + for example in batch: + for key, tensor in example.items(): + key_to_tensor_list[key].append(tensor) + key_to_batched_tensors = {} + for key in key_to_tensor_list: + batched_tensors = torch.stack(key_to_tensor_list[key]) + batched_tensors = _convert_to_device(batched_tensors, device) + key_to_batched_tensors[key] = batched_tensors + predictions = model(**key_to_batched_tensors, **inference_args) + return utils._convert_to_result(batch, predictions, model_id) + + +def _run_inference_tensorflow_keyed_tensor( + batch: Sequence[Dict[str, tf.Tensor]], + model: TFAutoModel, + device, + inference_args: Dict[str, Any], + model_id: Optional[str] = None) -> Iterable[PredictionResult]: + if device == "GPU": + is_gpu_available_tensorflow(device) + key_to_tensor_list = defaultdict(list) + for example in batch: + for key, tensor in example.items(): + key_to_tensor_list[key].append(tensor) + key_to_batched_tensors = {} + for key in key_to_tensor_list: + batched_tensors = tf.stack(key_to_tensor_list[key], axis=0) + key_to_batched_tensors[key] = batched_tensors + predictions = model(**key_to_batched_tensors, **inference_args) + return utils._convert_to_result(batch, predictions, model_id) + + +class HuggingFaceModelHandlerKeyedTensor(ModelHandler[Dict[str, + Union[tf.Tensor, + torch.Tensor]], + PredictionResult, + Union[AutoModel, + TFAutoModel]]): + def __init__( + self, + model_uri: str, + model_class: Union[AutoModel, TFAutoModel], + framework: str, + device: str = "CPU", + *, + inference_fn: Optional[Callable[..., Iterable[PredictionResult]]] = None, + load_model_args: Optional[Dict[str, Any]] = None, + inference_args: Optional[Dict[str, Any]] = None, + min_batch_size: Optional[int] = None, + max_batch_size: Optional[int] = None, + large_model: bool = False, + **kwargs): + """ + Implementation of the ModelHandler interface for HuggingFace with + Keyed Tensors for PyTorch/Tensorflow backend. + + Example Usage model:: + pcoll | RunInference(HuggingFaceModelHandlerKeyedTensor( + model_uri="bert-base-uncased", model_class=AutoModelForMaskedLM, + framework='pt')) + + Args: + model_uri (str): path to the pretrained model on the hugging face + models hub. + model_class: model class to load the repository from model_uri. + framework (str): Framework to use for the model. 'tf' for TensorFlow and + 'pt' for PyTorch. + device: For torch tensors, specify device on which you wish to + run the model. Defaults to CPU. + inference_fn: the inference function to use during RunInference. + Default is _run_inference_torch_keyed_tensor or + _run_inference_tensorflow_keyed_tensor depending on the input type. 
+ load_model_args (Dict[str, Any]): (Optional) Keyword arguments to provide + load options while loading models from Hugging Face Hub. + Defaults to None. + inference_args (Dict[str, Any]): (Optional) Non-batchable arguments + required as inputs to the model's inference function. Unlike Tensors + in `batch`, these parameters will not be dynamically batched. + Defaults to None. + min_batch_size: the minimum batch size to use when batching inputs. + max_batch_size: the maximum batch size to use when batching inputs. + large_model: set to true if your model is large enough to run into + memory pressure if you load multiple copies. Given a model that + consumes N memory and a machine with W cores and M memory, you should + set this to True if N*W > M. + kwargs: 'env_vars' can be used to set environment variables + before loading the model. + + **Supported Versions:** HuggingFaceModelHandler supports + transformers>=4.18.0. + """ + self._model_uri = model_uri + self._model_class = model_class + self._device = device + self._inference_fn = inference_fn + self._model_config_args = load_model_args if load_model_args else {} + self._inference_args = inference_args if inference_args else {} + self._batching_kwargs = {} + self._env_vars = kwargs.get("env_vars", {}) + if min_batch_size is not None: + self._batching_kwargs["min_batch_size"] = min_batch_size + if max_batch_size is not None: + self._batching_kwargs["max_batch_size"] = max_batch_size + self._large_model = large_model + self._framework = framework + + _validate_constructor_args( + model_uri=self._model_uri, model_class=self._model_class) + + def load_model(self): + """Loads and initializes the model for processing.""" + model = self._model_class.from_pretrained( + self._model_uri, **self._model_config_args) + if self._framework == 'pt': + if self._device == "GPU" and is_gpu_available_torch: + model.to(torch.device("cuda")) + return model + + def run_inference( + self, + batch: Sequence[Dict[str, Union[tf.Tensor, torch.Tensor]]], + model: Union[AutoModel, TFAutoModel], + inference_args: Optional[Dict[str, Any]] = None + ) -> Iterable[PredictionResult]: + """ + Runs inferences on a batch of Keyed Tensors and returns an Iterable of + Tensors Predictions. + + This method stacks the list of Tensors in a vectorized format to optimize + the inference call. + + Args: + batch: A sequence of Keyed Tensors. These Tensors should be batchable, + as this method will call `tf.stack()`/`torch.stack()` and pass in + batched Tensors with dimensions (batch_size, n_features, etc.) into + the model's predict() function. + model: A Tensorflow/PyTorch model. + inference_args: Non-batchable arguments required as inputs to the + model's inference function. Unlike Tensors in `batch`, + these parameters will not be dynamically batched. + Returns: + An Iterable of type PredictionResult. 
+ """ + inference_args = {} if not inference_args else inference_args + + if self._inference_fn: + return self._inference_fn( + batch, model, self._device, inference_args, self._model_uri) + + if self._framework == "tf": + return _run_inference_tensorflow_keyed_tensor( + batch, model, self._device, inference_args, self._model_uri) + else: + return _run_inference_torch_keyed_tensor( + batch, model, self._device, inference_args, self._model_uri) + + def update_model_path(self, model_path: Optional[str] = None): + self._model_uri = model_path if model_path else self._model_uri + + def get_num_bytes( + self, batch: Sequence[Union[tf.Tensor, torch.Tensor]]) -> int: + """ + Returns: + The number of bytes of data for the Tensors batch. + """ + if self._framework == "tf": + return sum(sys.getsizeof(element) for element in batch) + else: + return sum( + (el.element_size() for tensor in batch for el in tensor.values())) + + def batch_elements_kwargs(self): + return self._batching_kwargs + + def share_model_across_processes(self) -> bool: + return self._large_model + + def get_metrics_namespace(self) -> str: + """ + Returns: + A namespace for metrics collected by the RunInference transform. + """ + return "BeamML_HuggingFaceModelHandler_KeyedTensor" + + +def _default_inference_fn_torch( + batch: Sequence[Union[tf.Tensor, torch.Tensor]], + model: Union[AutoModel, TFAutoModel], + device, + inference_args: Dict[str, Any], + model_id: Optional[str] = None) -> Iterable[PredictionResult]: + device = get_device_torch(device) + # torch.no_grad() mitigates GPU memory issues + # https://github.com/apache/beam/issues/22811 + with torch.no_grad(): + batched_tensors = torch.stack(batch) + batched_tensors = _convert_to_device(batched_tensors, device) + predictions = model(batched_tensors, **inference_args) + return utils._convert_to_result(batch, predictions, model_id) + + +def _default_inference_fn_tensorflow( + batch: Sequence[Union[tf.Tensor, torch.Tensor]], + model: Union[AutoModel, TFAutoModel], + device, + inference_args: Dict[str, Any], + model_id: Optional[str] = None) -> Iterable[PredictionResult]: + if device == "GPU": + is_gpu_available_tensorflow(device) + batched_tensors = tf.stack(batch, axis=0) + predictions = model(batched_tensors, **inference_args) + return utils._convert_to_result(batch, predictions, model_id) + + +class HuggingFaceModelHandlerTensor(ModelHandler[Union[tf.Tensor, torch.Tensor], + PredictionResult, + Union[AutoModel, + TFAutoModel]]): + def __init__( + self, + model_uri: str, + model_class: Union[AutoModel, TFAutoModel], + device: str = "CPU", + *, + inference_fn: Optional[Callable[..., Iterable[PredictionResult]]] = None, + load_model_args: Optional[Dict[str, Any]] = None, + inference_args: Optional[Dict[str, Any]] = None, + min_batch_size: Optional[int] = None, + max_batch_size: Optional[int] = None, + large_model: bool = False, + **kwargs): + """ + Implementation of the ModelHandler interface for HuggingFace with + Tensors for PyTorch/Tensorflow backend. + + Depending on the type of tensors, the model framework is determined + automatically. + + Example Usage model: + pcoll | RunInference(HuggingFaceModelHandlerTensor( + model_uri="bert-base-uncased", model_class=AutoModelForMaskedLM)) + + Args: + model_uri (str): path to the pretrained model on the hugging face + models hub. + model_class: model class to load the repository from model_uri. + device: For torch tensors, specify device on which you wish to + run the model. Defaults to CPU. 
+ inference_fn: the inference function to use during RunInference. + Default is _run_inference_torch_keyed_tensor or + _run_inference_tensorflow_keyed_tensor depending on the input type. + load_model_args (Dict[str, Any]): (Optional) keyword arguments to provide + load options while loading models from Hugging Face Hub. + Defaults to None. + inference_args (Dict[str, Any]): (Optional) Non-batchable arguments + required as inputs to the model's inference function. Unlike Tensors + in `batch`, these parameters will not be dynamically batched. + Defaults to None. + min_batch_size: the minimum batch size to use when batching inputs. + max_batch_size: the maximum batch size to use when batching inputs. + large_model: set to true if your model is large enough to run into + memory pressure if you load multiple copies. Given a model that + consumes N memory and a machine with W cores and M memory, you should + set this to True if N*W > M. + kwargs: 'env_vars' can be used to set environment variables + before loading the model. + + **Supported Versions:** HuggingFaceModelHandler supports + transformers>=4.18.0. + """ + self._model_uri = model_uri + self._model_class = model_class + self._device = device + self._inference_fn = inference_fn + self._model_config_args = load_model_args if load_model_args else {} + self._inference_args = inference_args if inference_args else {} + self._batching_kwargs = {} + self._env_vars = kwargs.get("env_vars", {}) + if min_batch_size is not None: + self._batching_kwargs["min_batch_size"] = min_batch_size + if max_batch_size is not None: + self._batching_kwargs["max_batch_size"] = max_batch_size + self._large_model = large_model + self._framework = "" + + _validate_constructor_args( + model_uri=self._model_uri, model_class=self._model_class) + + def load_model(self): + """Loads and initializes the model for processing.""" + model = self._model_class.from_pretrained( + self._model_uri, **self._model_config_args) + return model + + def run_inference( + self, + batch: Sequence[Union[tf.Tensor, torch.Tensor]], + model: Union[AutoModel, TFAutoModel], + inference_args: Optional[Dict[str, Any]] = None + ) -> Iterable[PredictionResult]: + """ + Runs inferences on a batch of Tensors and returns an Iterable of + Tensors Predictions. + + This method stacks the list of Tensors in a vectorized format to optimize + the inference call. + + Args: + batch: A sequence of Tensors. These Tensors should be batchable, as + this method will call `tf.stack()`/`torch.stack()` and pass in + batched Tensors with dimensions (batch_size, n_features, etc.) + into the model's predict() function. + model: A Tensorflow/PyTorch model. + inference_args (Dict[str, Any]): Non-batchable arguments required as + inputs to the model's inference function. Unlike Tensors in `batch`, + these parameters will not be dynamically batched. + + Returns: + An Iterable of type PredictionResult. 
+ """ + inference_args = {} if not inference_args else inference_args + if not self._framework: + if isinstance(batch[0], tf.Tensor): + self._framework = "tf" + else: + self._framework = "pt" + + if (self._framework == 'pt' and self._device == "GPU" and + is_gpu_available_torch()): + model.to(torch.device("cuda")) + + if self._inference_fn: + return self._inference_fn( + batch, model, inference_args, inference_args, self._model_uri) + + if self._framework == "tf": + return _default_inference_fn_tensorflow( + batch, model, self._device, inference_args, self._model_uri) + else: + return _default_inference_fn_torch( + batch, model, self._device, inference_args, self._model_uri) + + def get_num_bytes( + self, batch: Sequence[Union[tf.Tensor, torch.Tensor]]) -> int: + """ + Returns: + The number of bytes of data for the Tensors batch. + """ + if self._framework == "tf": + return sum(sys.getsizeof(element) for element in batch) + else: + return sum( + (el.element_size() for tensor in batch for el in tensor.values())) + + def batch_elements_kwargs(self): + return self._batching_kwargs + + def share_model_across_processes(self) -> bool: + return self._large_model + + def get_metrics_namespace(self) -> str: + """ + Returns: + A namespace for metrics collected by the RunInference transform. + """ + return "BeamML_HuggingFaceModelHandler_Tensor" diff --git a/sdks/python/apache_beam/ml/inference/huggingface_inference_it_test.py b/sdks/python/apache_beam/ml/inference/huggingface_inference_it_test.py new file mode 100644 index 000000000000..ed442a4b801a --- /dev/null +++ b/sdks/python/apache_beam/ml/inference/huggingface_inference_it_test.py @@ -0,0 +1,80 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""End-to-End test for Hugging Face Inference""" + +import logging +import unittest +import uuid + +import pytest + +from apache_beam.io.filesystems import FileSystems +from apache_beam.testing.test_pipeline import TestPipeline + +try: + from apache_beam.examples.inference import huggingface_language_modeling + from apache_beam.ml.inference import pytorch_inference_it_test +except ImportError: + raise unittest.SkipTest( + "transformers dependencies are not installed. 
" + "Check if transformers, torch, and tensorflow " + "is installed.") + + +@pytest.mark.uses_transformers +@pytest.mark.it_postcommit +class HuggingFaceInference(unittest.TestCase): + @pytest.mark.timeout(1800) + def test_hf_language_modeling(self): + test_pipeline = TestPipeline(is_integration_test=True) + # Path to text file containing some sentences + file_of_sentences = 'gs://apache-beam-ml/datasets/custom/hf_sentences.txt' + output_file_dir = 'gs://apache-beam-ml/testing/predictions' + output_file = '/'.join([output_file_dir, str(uuid.uuid4()), 'result.txt']) + + model_name = 'stevhliu/my_awesome_eli5_mlm_model' + + extra_opts = { + 'input': file_of_sentences, + 'output': output_file, + 'model_name': model_name, + } + huggingface_language_modeling.run( + test_pipeline.get_full_options_as_args(**extra_opts), + save_main_session=False) + + self.assertEqual(FileSystems().exists(output_file), True) + predictions = pytorch_inference_it_test.process_outputs( + filepath=output_file) + actuals_file = 'gs://apache-beam-ml/testing/expected_outputs/test_hf_run_inference_for_masked_lm_actuals.txt' # pylint: disable=line-too-long + actuals = pytorch_inference_it_test.process_outputs(filepath=actuals_file) + + predictions_dict = {} + for prediction in predictions: + text, predicted_text = prediction.split(';') + predictions_dict[text] = predicted_text.strip().lower() + + for actual in actuals: + text, actual_predicted_text = actual.split(';') + predicted_predicted_text = predictions_dict[text] + self.assertEqual(actual_predicted_text, predicted_predicted_text) + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.DEBUG) + unittest.main() diff --git a/sdks/python/apache_beam/ml/inference/huggingface_inference_test.py b/sdks/python/apache_beam/ml/inference/huggingface_inference_test.py new file mode 100644 index 000000000000..763d5ee8d36f --- /dev/null +++ b/sdks/python/apache_beam/ml/inference/huggingface_inference_test.py @@ -0,0 +1,136 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +# pytype: skip-file + +import shutil +import tempfile +import unittest +from typing import Any +from typing import Dict +from typing import Iterable +from typing import Optional +from typing import Sequence +from typing import Union + +import pytest + +from apache_beam.ml.inference import utils +from apache_beam.ml.inference.base import PredictionResult +from apache_beam.ml.inference.tensorflow_inference_test import FakeTFTensorModel +from apache_beam.ml.inference.tensorflow_inference_test import _compare_tensor_prediction_result + +# pylint: disable=ungrouped-imports +try: + import tensorflow as tf + import torch + from transformers import AutoModel + from transformers import TFAutoModel + from apache_beam.ml.inference.huggingface_inference import HuggingFaceModelHandlerTensor +except ImportError: + raise unittest.SkipTest('Transformers dependencies are not installed.') + + +def fake_inference_fn_tensor( + batch: Sequence[Union[tf.Tensor, torch.Tensor]], + model: Union[AutoModel, TFAutoModel], + device, + inference_args: Dict[str, Any], + model_id: Optional[str] = None) -> Iterable[PredictionResult]: + predictions = model.predict(batch, **inference_args) + return utils._convert_to_result(batch, predictions, model_id) + + +class FakeTorchModel: + def predict(self, input: torch.Tensor): + return input + + +@pytest.mark.uses_transformers +class HuggingFaceInferenceTest(unittest.TestCase): + def setUp(self) -> None: + self.tmpdir = tempfile.mkdtemp() + + def tearDown(self) -> None: + shutil.rmtree(self.tmpdir) + + def test_predict_tensor(self): + fake_model = FakeTFTensorModel() + inference_runner = HuggingFaceModelHandlerTensor( + model_uri='unused', + model_class=TFAutoModel, + inference_fn=fake_inference_fn_tensor) + batched_examples = [tf.constant([1]), tf.constant([10]), tf.constant([100])] + expected_predictions = [ + PredictionResult(ex, pred) for ex, + pred in zip( + batched_examples, + [tf.math.multiply(n, 10) for n in batched_examples]) + ] + + inferences = inference_runner.run_inference(batched_examples, fake_model) + for actual, expected in zip(inferences, expected_predictions): + self.assertTrue(_compare_tensor_prediction_result(actual, expected)) + + def test_predict_tensor_with_inference_args(self): + fake_model = FakeTFTensorModel() + inference_runner = HuggingFaceModelHandlerTensor( + model_uri='unused', + model_class=TFAutoModel, + inference_fn=fake_inference_fn_tensor, + inference_args={"add": True}) + batched_examples = [tf.constant([1]), tf.constant([10]), tf.constant([100])] + expected_predictions = [ + PredictionResult(ex, pred) for ex, + pred in zip( + batched_examples, [ + tf.math.add(tf.math.multiply(n, 10), 10) + for n in batched_examples + ]) + ] + + inferences = inference_runner.run_inference( + batched_examples, fake_model, inference_args={"add": True}) + + for actual, expected in zip(inferences, expected_predictions): + self.assertTrue(_compare_tensor_prediction_result(actual, expected)) + + def test_framework_detection_torch(self): + fake_model = FakeTorchModel() + inference_runner = HuggingFaceModelHandlerTensor( + model_uri='unused', + model_class=TFAutoModel, + inference_fn=fake_inference_fn_tensor) + batched_examples = [torch.tensor(1), torch.tensor(10), torch.tensor(100)] + inference_runner.run_inference(batched_examples, fake_model) + self.assertEqual(inference_runner._framework, "torch") + + def test_framework_detection_tensorflow(self): + fake_model = FakeTFTensorModel() + inference_runner = HuggingFaceModelHandlerTensor( + model_uri='unused', + 
model_class=TFAutoModel, + inference_fn=fake_inference_fn_tensor, + inference_args={"add": True}) + batched_examples = [tf.constant([1]), tf.constant([10]), tf.constant([100])] + inference_runner.run_inference( + batched_examples, fake_model, inference_args={"add": True}) + self.assertEqual(inference_runner._framework, "tf") + + +if __name__ == '__main__': + unittest.main() diff --git a/sdks/python/apache_beam/ml/inference/huggingface_tests_requirements.txt b/sdks/python/apache_beam/ml/inference/huggingface_tests_requirements.txt new file mode 100644 index 000000000000..09c1fa8ca90c --- /dev/null +++ b/sdks/python/apache_beam/ml/inference/huggingface_tests_requirements.txt @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +torch>=1.7.1 +transformers>=4.18.0 +tensorflow>=2.12.0 \ No newline at end of file diff --git a/sdks/python/pytest.ini b/sdks/python/pytest.ini index 6e93c5f96e7f..7c564235c581 100644 --- a/sdks/python/pytest.ini +++ b/sdks/python/pytest.ini @@ -47,15 +47,16 @@ markers = # as enabling save_main_session. no_xdist: run without pytest-xdist plugin # We run these tests with multiple major pyarrow versions (BEAM-11211) - uses_pyarrow: tests that utilize pyarrow in some way + uses_pyarrow: tests that utilize pyarrow in some way. # ML tests - uses_pytorch: tests that utilize pytorch in some way - uses_sklearn: tests that utilize scikit-learn in some way - uses_tensorflow: tests that utilize tensorflow in some way + uses_pytorch: tests that utilize pytorch in some way. + uses_sklearn: tests that utilize scikit-learn in some way. + uses_tensorflow: tests that utilize tensorflow in some way. uses_tft: tests that utilizes tensorflow transforms in some way. - uses_xgboost: tests that utilize xgboost in some way + uses_xgboost: tests that utilize xgboost in some way. uses_onnx: tests that utilizes onnx in some way. - uses_tf: tests that utilize tensorflow + uses_tf: tests that utilize tensorflow. + uses_transformers: tests that utilize transformers in some way. # Default timeout intended for unit tests. # If certain tests need a different value, please see the docs on how to diff --git a/sdks/python/test-suites/direct/common.gradle b/sdks/python/test-suites/direct/common.gradle index 27e91b4733dd..aebdb4cfa009 100644 --- a/sdks/python/test-suites/direct/common.gradle +++ b/sdks/python/test-suites/direct/common.gradle @@ -337,13 +337,41 @@ task xgboostInferenceTest { } +// Transformers RunInference IT tests +task transformersInferenceTest { + dependsOn 'installGcpTest' + dependsOn ':sdks:python:sdist' + def requirementsFile = "${rootDir}/sdks/python/apache_beam/ml/inference/huggingface_tests_requirements.txt" + doFirst { + exec { + executable 'sh' + args '-c', ". 
${envdir}/bin/activate && pip install -r $requirementsFile" + } + } + doLast { + def testOpts = basicTestOpts + def argMap = [ + "test_opts": testOpts, + "suite": "postCommitIT-direct-py${pythonVersionSuffix}", + "collect": "uses_transformers and it_postcommit" , + "runner": "TestDirectRunner" + ] + def cmdArgs = mapToArgString(argMap) + exec { + executable 'sh' + args '-c', ". ${envdir}/bin/activate && ${runScriptsDir}/run_integration_test.sh $cmdArgs" + } + } +} + // Add all the RunInference framework IT tests to this gradle task that runs on Direct Runner Post commit suite. project.tasks.register("inferencePostCommitIT") { dependsOn = [ 'torchInferenceTest', 'sklearnInferenceTest', 'tensorflowInferenceTest', - 'xgboostInferenceTest' + 'xgboostInferenceTest', + 'transformersInferenceTest' // (TODO) https://github.com/apache/beam/issues/25799 // uncomment tfx bsl tests once tfx supports protobuf 4.x // 'tfxInferenceTest', diff --git a/sdks/python/test-suites/tox/py38/build.gradle b/sdks/python/test-suites/tox/py38/build.gradle index 7243a0188eda..208a1d9d39ca 100644 --- a/sdks/python/test-suites/tox/py38/build.gradle +++ b/sdks/python/test-suites/tox/py38/build.gradle @@ -130,6 +130,19 @@ toxTask "testPy38tensorflow-212", "py38-tensorflow-212", "${posargs}" test.dependsOn "testPy38tensorflow-212" preCommitPyCoverage.dependsOn "testPy38tensorflow-212" +// Create a test task for each minor version of transformers +toxTask "testPy38transformers-428", "py38-transformers-428", "${posargs}" +test.dependsOn "testPy38transformers-428" +preCommitPyCoverage.dependsOn "testPy38transformers-428" + +toxTask "testPy38transformers-429", "py38-transformers-429", "${posargs}" +test.dependsOn "testPy38transformers-429" +preCommitPyCoverage.dependsOn "testPy38transformers-429" + +toxTask "testPy38transformers-430", "py38-transformers-430", "${posargs}" +test.dependsOn "testPy38transformers-430" +preCommitPyCoverage.dependsOn "testPy38transformers-430" + toxTask "whitespacelint", "whitespacelint", "${posargs}" task archiveFilesToLint(type: Zip) { diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini index 3f1b32a20d22..a5c1db416e5d 100644 --- a/sdks/python/tox.ini +++ b/sdks/python/tox.ini @@ -163,6 +163,7 @@ deps = torch xgboost datatable==1.0.0 + transformers commands = time {toxinidir}/scripts/generate_pydoc.sh @@ -406,3 +407,21 @@ commands = # Run all XGBoost unit tests # Allow exit code 5 (no tests run) so that we can run this command safely on arbitrary subdirectories. /bin/sh -c 'pytest -o junit_suite_name={envname} --junitxml=pytest_{envname}.xml -n 6 -m uses_xgboost {posargs}; ret=$?; [ $ret = 5 ] && exit 0 || exit $ret' + +[testenv:py{38,39,310,311}-transformers-{428,429,430}] +deps = + -r build-requirements.txt + 428: transformers>=4.28.0,<4.29.0 + 429: transformers>=4.29.0,<4.30.0 + 430: transformers>=4.30.0,<4.31.0 + torch>=1.9.0,<1.14.0 + tensorflow==2.12.0 +extras = test,gcp +commands = + # Log transformers and its dependencies version for debugging + /bin/sh -c "pip freeze | grep -E transformers" + /bin/sh -c "pip freeze | grep -E torch" + /bin/sh -c "pip freeze | grep -E tensorflow" + # Run all Transformers unit tests + # Allow exit code 5 (no tests run) so that we can run this command safely on arbitrary subdirectories. + /bin/sh -c 'pytest -o junit_suite_name={envname} --junitxml=pytest_{envname}.xml -n 6 -m uses_transformers {posargs}; ret=$?; [ $ret = 5 ] && exit 0 || exit $ret' \ No newline at end of file
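
A note on exercising the new test wiring locally: the unit tests added here are
selected by the `uses_transformers` pytest marker, and the integration test runs
under the new `transformersInferenceTest` Gradle task wired into
`inferencePostCommitIT`. A rough sketch of local invocations, run from
`sdks/python` in a Beam checkout (the tox environment and marker names are the
ones defined above; everything else is illustrative):
```sh
# Run the unit tests against transformers 4.30.x in the new tox environment.
tox -e py38-transformers-430

# Or run only the marked tests in a virtualenv that already has torch,
# tensorflow, and transformers installed (see huggingface_tests_requirements.txt).
pytest -m uses_transformers apache_beam/ml/inference/huggingface_inference_test.py
```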