-
Notifications
You must be signed in to change notification settings - Fork 4.3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* Initial work on MLTransform and ProcessHandler * Support for containers: List, Dict[str, np.ndarray] pass types Support Pyarrow schema Artifact WIP * Add min, max, artifacts for scale_0_to_1 * Add more transform functions and artifacts WIP on inferring types Remove pyarrow implementation Add MLTransformOutput Refactor files * Add generic type annotations * Add unit tests Fix artifacts code Add more tests fix lint erors Change namespaces from ml_transform to transforms Add doc strings Add tests and refactor * Add support for saving intermediate results for a transform Sort imports Add metrics namespaces Refactor * Add schema to the output PCollection * Remove MLTransformOutput and return Row instead with schema * Convert primitive type to list using a DoFn. Remove FixedLenFeatureSpec Make VarLenFeatureSpec as default Refactoring * Add append_transform to the ProcessHandler Some more refactoring * Remove param self.has_artifacts, add artifact_location to handler..and address PR comments Add skip conditions for tests Add test suite for tft tests * Move tensorflow import into the try except catch Try except in __init__.py Remove imports from __init__ Add docstrings, refactor * Add type annotations for the data transforms * Add tft test in tox.ini Mock tensorflow_transform in pydocs fix tft pypi name Skip a test Add step name Update supported versions of TFT * Add step name for TFTProcessHandler * Remove unsupported tft versions * Fix mypy * Refactor TFTProcessHandlerDict to TFTProcessHandlerSchema * Update doc for data processing transforms * Fix checking the typing container types * Refactor code * Fail TFTProcessHandler on a non-global window PColl * Remove underscore * Remove high level functions * Add TFIDF * Fix tests with new changes[WIP] * Fix tests * Refactor class name to CamelCase and remove kwrags * use is_default instead of isinstance * Remove falling back to staging location for artifact location * Add TFIDF tests * Remove __str__ * Refactor skip statement * Add utils for fetching artifacts on compute and apply vocab * Make ProcessHandler internal class * Only run analyze stage when transform_fn(artifacts) is not computed before. * Fail if pipeline has non default window during artifact producing stage * Add support for Dict, recordbatch and introduce artifact_mode * Hide process_handler from user. Make TFTProcessHandler as default * Refactor few tests * Comment a test * Save raw_data_meta_data so that it can be used during consume stage * Refactor code * Add test on artifacts * Fix imports * Add tensorflow_metadata to pydocs * Fix test * Add TFIDF to import * Add basic example * Remove redundant logging statements * Add test for multiple columns on MLTransform * Add todo about what to do when new process handler is introduced * Add abstractmethod decorator * Edit Error message * Update docs, error messages * Remove record batch input/output arg * Modify generic types * Fix import sort * Fix mypy errors - best effort * Fix tests * Add TFTOperation doc * Rename tft_transform to tft * Fix hadler_test * Fix base_test * Fix pydocs
- Loading branch information
1 parent
a587379
commit b529efe
Showing
12 changed files
with
2,212 additions
and
1 deletion.
There are no files selected for viewing
118 changes: 118 additions & 0 deletions
118
sdks/python/apache_beam/examples/ml_transform/ml_transform_basic.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,118 @@ | ||
# | ||
# Licensed to the Apache Software Foundation (ASF) under one or more | ||
# contributor license agreements. See the NOTICE file distributed with | ||
# this work for additional information regarding copyright ownership. | ||
# The ASF licenses this file to You under the Apache License, Version 2.0 | ||
# (the "License"); you may not use this file except in compliance with | ||
# the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
# | ||
|
||
""" | ||
This example demonstrates how to use MLTransform. | ||
MLTransform is a PTransform that applies multiple data transformations on the | ||
incoming data. | ||
This example computes the vocabulary on the incoming data. Then, it computes | ||
the TF-IDF of the incoming data using the vocabulary computed in the previous | ||
step. | ||
1. ComputeAndApplyVocabulary computes the vocabulary on the incoming data and | ||
overrides the incoming data with the vocabulary indices. | ||
2. TFIDF computes the TF-IDF of the incoming data using the vocabulary and | ||
provides vocab_index and tf-idf weights. vocab_index is suffixed with | ||
'_vocab_index' and tf-idf weights are suffixed with '_tfidf' to the | ||
original column name(which is the output of ComputeAndApplyVocabulary). | ||
MLTransform produces artifacts, for example: ComputeAndApplyVocabulary produces | ||
a text file that contains vocabulary which is saved in `artifact_location`. | ||
ComputeAndApplyVocabulary outputs vocab indices associated with the saved vocab | ||
file. This mode of MLTransform is called artifact `produce` mode. | ||
This will be useful when the data is preprocessed before ML model training. | ||
The second mode of MLTransform is artifact `consume` mode. In this mode, the | ||
transformations are applied on the incoming data using the artifacts produced | ||
by the previous run of MLTransform. This mode will be useful when the data is | ||
preprocessed before ML model inference. | ||
""" | ||
|
||
import argparse | ||
import logging | ||
import tempfile | ||
|
||
import apache_beam as beam | ||
from apache_beam.ml.transforms.base import ArtifactMode | ||
from apache_beam.ml.transforms.base import MLTransform | ||
from apache_beam.ml.transforms.tft import TFIDF | ||
from apache_beam.ml.transforms.tft import ComputeAndApplyVocabulary | ||
from apache_beam.ml.transforms.utils import ArtifactsFetcher | ||
|
||
|
||
def parse_args(): | ||
parser = argparse.ArgumentParser() | ||
parser.add_argument('--artifact_location', type=str, default='') | ||
return parser.parse_known_args() | ||
|
||
|
||
def run(args): | ||
data = [ | ||
dict(x=["Let's", "go", "to", "the", "park"]), | ||
dict(x=["I", "enjoy", "going", "to", "the", "park"]), | ||
dict(x=["I", "enjoy", "reading", "books"]), | ||
dict(x=["Beam", "can", "be", "fun"]), | ||
dict(x=["The", "weather", "is", "really", "nice", "today"]), | ||
dict(x=["I", "love", "to", "go", "to", "the", "park"]), | ||
dict(x=["I", "love", "to", "read", "books"]), | ||
dict(x=["I", "love", "to", "program"]), | ||
] | ||
|
||
with beam.Pipeline() as p: | ||
input_data = p | beam.Create(data) | ||
|
||
# arfifacts produce mode. | ||
input_data |= ( | ||
'MLTransform' >> MLTransform( | ||
artifact_location=args.artifact_location, | ||
artifact_mode=ArtifactMode.PRODUCE, | ||
).with_transform(ComputeAndApplyVocabulary( | ||
columns=['x'])).with_transform(TFIDF(columns=['x']))) | ||
|
||
# _ = input_data | beam.Map(logging.info) | ||
|
||
with beam.Pipeline() as p: | ||
input_data = [ | ||
dict(x=['I', 'love', 'books']), dict(x=['I', 'love', 'Apache', 'Beam']) | ||
] | ||
input_data = p | beam.Create(input_data) | ||
|
||
# artifacts consume mode. | ||
input_data |= ( | ||
MLTransform( | ||
artifact_location=args.artifact_location, | ||
artifact_mode=ArtifactMode.CONSUME, | ||
# you don't need to specify transforms as they are already saved in | ||
# in the artifacts. | ||
)) | ||
|
||
_ = input_data | beam.Map(logging.info) | ||
|
||
# To fetch the artifacts after the pipeline is run | ||
artifacts_fetcher = ArtifactsFetcher(artifact_location=args.artifact_location) | ||
vocab_list = artifacts_fetcher.get_vocab_list() | ||
assert vocab_list[22] == 'Beam' | ||
|
||
|
||
if __name__ == '__main__': | ||
logging.getLogger().setLevel(logging.INFO) | ||
args, pipeline_args = parse_args() | ||
# for this example, create a temp artifact location if not provided. | ||
if args.artifact_location == '': | ||
args.artifact_location = tempfile.mkdtemp() | ||
run(args) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
# | ||
# Licensed to the Apache Software Foundation (ASF) under one or more | ||
# contributor license agreements. See the NOTICE file distributed with | ||
# this work for additional information regarding copyright ownership. | ||
# The ASF licenses this file to You under the Apache License, Version 2.0 | ||
# (the "License"); you may not use this file except in compliance with | ||
# the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
# |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,165 @@ | ||
# | ||
# Licensed to the Apache Software Foundation (ASF) under one or more | ||
# contributor license agreements. See the NOTICE file distributed with | ||
# this work for additional information regarding copyright ownership. | ||
# The ASF licenses this file to You under the Apache License, Version 2.0 | ||
# (the "License"); you may not use this file except in compliance with | ||
# the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
# pytype: skip-file | ||
|
||
import abc | ||
from typing import Generic | ||
from typing import List | ||
from typing import Optional | ||
from typing import Sequence | ||
from typing import TypeVar | ||
|
||
import apache_beam as beam | ||
|
||
__all__ = ['MLTransform', 'ProcessHandler', 'BaseOperation'] | ||
|
||
TransformedDatasetT = TypeVar('TransformedDatasetT') | ||
TransformedMetadataT = TypeVar('TransformedMetadataT') | ||
|
||
# Input/Output types to the MLTransform. | ||
ExampleT = TypeVar('ExampleT') | ||
MLTransformOutputT = TypeVar('MLTransformOutputT') | ||
|
||
# Input to the apply() method of BaseOperation. | ||
OperationInputT = TypeVar('OperationInputT') | ||
# Output of the apply() method of BaseOperation. | ||
OperationOutputT = TypeVar('OperationOutputT') | ||
|
||
|
||
class ArtifactMode(object): | ||
PRODUCE = 'produce' | ||
CONSUME = 'consume' | ||
|
||
|
||
class BaseOperation(Generic[OperationInputT, OperationOutputT], abc.ABC): | ||
def __init__(self, columns: List[str]) -> None: | ||
""" | ||
Base Opertation class data processing transformations. | ||
Args: | ||
columns: List of column names to apply the transformation. | ||
""" | ||
self.columns = columns | ||
|
||
@abc.abstractmethod | ||
def apply( | ||
self, data: OperationInputT, output_column_name: str) -> OperationOutputT: | ||
""" | ||
Define any processing logic in the apply() method. | ||
processing logics are applied on inputs and returns a transformed | ||
output. | ||
Args: | ||
inputs: input data. | ||
""" | ||
|
||
|
||
class ProcessHandler(Generic[ExampleT, MLTransformOutputT], abc.ABC): | ||
""" | ||
Only for internal use. No backwards compatibility guarantees. | ||
""" | ||
@abc.abstractmethod | ||
def process_data( | ||
self, pcoll: beam.PCollection[ExampleT] | ||
) -> beam.PCollection[MLTransformOutputT]: | ||
""" | ||
Logic to process the data. This will be the entrypoint in | ||
beam.MLTransform to process incoming data. | ||
""" | ||
|
||
@abc.abstractmethod | ||
def append_transform(self, transform: BaseOperation): | ||
""" | ||
Append transforms to the ProcessHandler. | ||
""" | ||
|
||
|
||
class MLTransform(beam.PTransform[beam.PCollection[ExampleT], | ||
beam.PCollection[MLTransformOutputT]], | ||
Generic[ExampleT, MLTransformOutputT]): | ||
def __init__( | ||
self, | ||
*, | ||
artifact_location: str, | ||
artifact_mode: str = ArtifactMode.PRODUCE, | ||
transforms: Optional[Sequence[BaseOperation]] = None): | ||
""" | ||
Args: | ||
artifact_location: A storage location for artifacts resulting from | ||
MLTransform. These artifacts include transformations applied to | ||
the dataset and generated values like min, max from ScaleTo01, | ||
and mean, var from ScaleToZScore. Artifacts are produced and stored | ||
in this location when the `artifact_mode` is set to 'produce'. | ||
Conversely, when `artifact_mode` is set to 'consume', artifacts are | ||
retrieved from this location. Note that when consuming artifacts, | ||
it is not necessary to pass the transforms since they are inherently | ||
stored within the artifacts themselves. The value assigned to | ||
`artifact_location` should be a valid storage path where the artifacts | ||
can be written to or read from. | ||
transforms: A list of transforms to apply to the data. All the transforms | ||
are applied in the order they are specified. The input of the | ||
i-th transform is the output of the (i-1)-th transform. Multi-input | ||
transforms are not supported yet. | ||
artifact_mode: Whether to produce or consume artifacts. If set to | ||
'consume', the handler will assume that the artifacts are already | ||
computed and stored in the artifact_location. Pass the same artifact | ||
location that was passed during produce phase to ensure that the | ||
right artifacts are read. If set to 'produce', the handler | ||
will compute the artifacts and store them in the artifact_location. | ||
The artifacts will be read from this location during the consume phase. | ||
There is no need to pass the transforms in this case since they are | ||
already embedded in the stored artifacts. | ||
""" | ||
# avoid circular import | ||
# pylint: disable=wrong-import-order, wrong-import-position | ||
from apache_beam.ml.transforms.handlers import TFTProcessHandler | ||
# TODO: When new ProcessHandlers(eg: JaxProcessHandler) are introduced, | ||
# create a mapping between transforms and ProcessHandler since | ||
# ProcessHandler is not exposed to the user. | ||
process_handler: ProcessHandler = TFTProcessHandler( | ||
artifact_location=artifact_location, | ||
artifact_mode=artifact_mode, | ||
transforms=transforms) # type: ignore[arg-type] | ||
|
||
self._process_handler = process_handler | ||
|
||
def expand( | ||
self, pcoll: beam.PCollection[ExampleT] | ||
) -> beam.PCollection[MLTransformOutputT]: | ||
""" | ||
This is the entrypoint for the MLTransform. This method will | ||
invoke the process_data() method of the ProcessHandler instance | ||
to process the incoming data. | ||
process_data takes in a PCollection and applies the PTransforms | ||
necessary to process the data and returns a PCollection of | ||
transformed data. | ||
Args: | ||
pcoll: A PCollection of ExampleT type. | ||
Returns: | ||
A PCollection of MLTransformOutputT type. | ||
""" | ||
return self._process_handler.process_data(pcoll) | ||
|
||
def with_transform(self, transform: BaseOperation): | ||
""" | ||
Add a transform to the MLTransform pipeline. | ||
Args: | ||
transform: A BaseOperation instance. | ||
Returns: | ||
A MLTransform instance. | ||
""" | ||
self._process_handler.append_transform(transform) | ||
return self |
Oops, something went wrong.