[Python] Remove get_artifacts in MLTransform since artifacts are stored in artifact location #29016

Merged — 11 commits, Oct 25, 2023
1 change: 1 addition & 0 deletions CHANGES.md
@@ -80,6 +80,7 @@

* Fixed "Desired bundle size 0 bytes must be greater than 0" in Java SDK's BigtableIO.BigtableSource when you have more cores than bytes to read (Java) [#28793](https://github.com/apache/beam/issues/28793).
* `watch_file_pattern` arg of the [RunInference](https://github.com/apache/beam/blob/104c10b3ee536a9a3ea52b4dbf62d86b669da5d9/sdks/python/apache_beam/ml/inference/base.py#L997) arg had no effect prior to 2.52.0. To use the behavior of arg `watch_file_pattern` prior to 2.52.0, follow the documentation at https://beam.apache.org/documentation/ml/side-input-updates/ and use `WatchFilePattern` PTransform as a SideInput. ([#28948](https://github.com/apache/beam/pulls/28948))
* `MLTransform` no longer outputs artifacts such as min, max, and quantiles. Instead, a feature to output these artifacts in a human-readable format will be added to `MLTransform` - [#29017](https://github.com/apache/beam/issues/29017). For now, to use artifacts such as min and max that were produced by an earlier `MLTransform`, use `read_artifact_location` of `MLTransform`, which reads artifacts produced earlier by a different `MLTransform` ([#29016](https://github.com/apache/beam/pull/29016/))

## Security Fixes
* Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).
@@ -31,7 +31,7 @@
import tensorflow_transform as tft # pylint: disable=unused-import
from apache_beam.examples.snippets.transforms.elementwise.mltransform import mltransform_scale_to_0_1
from apache_beam.examples.snippets.transforms.elementwise.mltransform import mltransform_compute_and_apply_vocabulary
from apache_beam.examples.snippets.transforms.elementwise.mltransform import mltransform_compute_and_apply_vocabulary_with_non_columnar_data
from apache_beam.examples.snippets.transforms.elementwise.mltransform import mltransform_compute_and_apply_vocabulary_with_scalar
except ImportError:
raise unittest.SkipTest('tensorflow_transform is not installed.')

@@ -46,8 +46,8 @@ def check_mltransform_compute_and_apply_vocab():

def check_mltransform_scale_to_0_1():
expected = '''[START mltransform_scale_to_0_1]
Row(x=array([0. , 0.5714286, 0.2857143], dtype=float32), x_max=array([8.], dtype=float32), x_min=array([1.], dtype=float32))
Row(x=array([0.42857143, 0.14285715, 1. ], dtype=float32), x_max=array([8.], dtype=float32), x_min=array([1.], dtype=float32))
Row(x=array([0. , 0.5714286, 0.2857143], dtype=float32))
Row(x=array([0.42857143, 0.14285715, 1. ], dtype=float32))
[END mltransform_scale_to_0_1] '''.splitlines()[1:-1]
return expected

@@ -80,7 +80,7 @@ def test_mltransform_scale_to_0_1(self, mock_stdout):
self.assertEqual(predicted, expected)

def test_mltransform_compute_and_apply_vocab_scalar(self, mock_stdout):
mltransform_compute_and_apply_vocabulary_with_non_columnar_data()
mltransform_compute_and_apply_vocabulary_with_scalar()
predicted = mock_stdout.getvalue().splitlines()
expected = check_mltransform_compute_and_apply_vocabulary_with_scalar()
self.assertEqual(predicted, expected)
57 changes: 6 additions & 51 deletions sdks/python/apache_beam/ml/transforms/tft.py
@@ -45,9 +45,7 @@
import tensorflow as tf
import tensorflow_transform as tft
from apache_beam.ml.transforms.base import BaseOperation
from tensorflow_transform import analyzers
from tensorflow_transform import common_types
from tensorflow_transform import tf_utils

__all__ = [
'ComputeAndApplyVocabulary',
@@ -77,6 +75,8 @@ def wrapper(fn):
return wrapper


# TODO: https://github.com/apache/beam/pull/29016
# Add support for outputting artifacts to a text file in human readable form.
class TFTOperation(BaseOperation[common_types.TensorType,
common_types.TensorType]):
def __init__(self, columns: List[str]) -> None:
@@ -240,15 +240,6 @@ def apply_transform(
}
return output_dict

def get_artifacts(self, data: common_types.TensorType,
col_name: str) -> Dict[str, common_types.TensorType]:
mean_var = tft.analyzers._mean_and_var(data)
shape = [tf.shape(data)[0], 1]
return {
col_name + '_mean': tf.broadcast_to(mean_var[0], shape),
col_name + '_var': tf.broadcast_to(mean_var[1], shape),
}


@register_input_dtype(float)
class ScaleTo01(TFTOperation):
@@ -280,14 +271,6 @@ def __init__(
self.elementwise = elementwise
self.name = name

def get_artifacts(self, data: common_types.TensorType,
col_name: str) -> Dict[str, common_types.TensorType]:
shape = [tf.shape(data)[0], 1]
return {
col_name + '_min': tf.broadcast_to(tft.min(data), shape),
col_name + '_max': tf.broadcast_to(tft.max(data), shape)
}

def apply_transform(
self, data: common_types.TensorType,
output_column_name: str) -> Dict[str, common_types.TensorType]:
@@ -368,34 +351,6 @@ def __init__(
self.elementwise = elementwise
self.name = name

def get_artifacts(self, data: common_types.TensorType,
col_name: str) -> Dict[str, common_types.TensorType]:
num_buckets = self.num_buckets
epsilon = self.epsilon
elementwise = self.elementwise

if num_buckets < 1:
raise ValueError('Invalid num_buckets %d' % num_buckets)

if isinstance(data, (tf.SparseTensor, tf.RaggedTensor)) and elementwise:
raise ValueError(
'bucketize requires `x` to be dense if `elementwise=True`')

x_values = tf_utils.get_values(data)

if epsilon is None:
# See explanation in args documentation for epsilon.
epsilon = min(1.0 / num_buckets, 0.01)

quantiles = analyzers.quantiles(
x_values, num_buckets, epsilon, reduce_instance_dims=not elementwise)
shape = [
tf.shape(data)[0], num_buckets - 1 if num_buckets > 1 else num_buckets
]
# These quantiles are used as the bucket boundaries in the later stages.
# Should we change the prefix _quantiles to _bucket_boundaries?
return {col_name + '_quantiles': tf.broadcast_to(quantiles, shape)}

def apply_transform(
self, data: common_types.TensorType,
output_column_name: str) -> Dict[str, common_types.TensorType]:
@@ -614,10 +569,6 @@ def __init__(
raise ValueError(
'ngrams_separator must be specified when ngram_range is not (1, 1)')

def get_artifacts(self, data: tf.SparseTensor,
col_name: str) -> Dict[str, tf.Tensor]:
return self.compute_word_count_fn(data, col_name)

def apply_transform(self, data: tf.SparseTensor, output_col_name: str):
if self.split_string_by_delimiter:
data = self._split_string_with_delimiter(
@@ -626,6 +577,10 @@ def apply_transform(self, data: tf.SparseTensor, output_col_name: str):
data, self.ngram_range, self.ngrams_separator, self.name)
return {output_col_name: output}

def get_artifacts(self, data: tf.SparseTensor,
col_name: str) -> Dict[str, tf.Tensor]:
return self.compute_word_count_fn(data, col_name)


def count_unqiue_words(data: tf.SparseTensor,
output_col_name: str) -> Dict[str, tf.Tensor]:
81 changes: 20 additions & 61 deletions sdks/python/apache_beam/ml/transforms/tft_test.py
@@ -41,29 +41,6 @@
z_score_expected = {'x_mean': 3.5, 'x_var': 2.9166666666666665}


def assert_z_score_artifacts(element):
element = element.as_dict()
assert 'x_mean' in element
assert 'x_var' in element
assert element['x_mean'] == z_score_expected['x_mean']
assert element['x_var'] == z_score_expected['x_var']


def assert_ScaleTo01_artifacts(element):
element = element.as_dict()
assert 'x_min' in element
assert 'x_max' in element
assert element['x_min'] == 1
assert element['x_max'] == 6


def assert_bucketize_artifacts(element):
element = element.as_dict()
assert 'x_quantiles' in element
assert np.array_equal(
element['x_quantiles'], np.array([3, 5], dtype=np.float32))


class ScaleZScoreTest(unittest.TestCase):
def setUp(self) -> None:
self.artifact_location = tempfile.mkdtemp()
@@ -100,7 +77,18 @@ def test_z_score(self):
| "MLTransform" >> base.MLTransform(
write_artifact_location=self.artifact_location).with_transform(
tft.ScaleToZScore(columns=['x'])))
_ = (result | beam.Map(assert_z_score_artifacts))
expected_data = [
np.array([-1.46385], dtype=np.float32),
np.array([-0.87831], dtype=np.float32),
np.array([-0.29277], dtype=np.float32),
np.array([0.29277], dtype=np.float32),
np.array([0.87831], dtype=np.float32),
np.array([1.46385], dtype=np.float32),
]

actual_data = (result | beam.Map(lambda x: x.x))
assert_that(
actual_data, equal_to(expected_data, equals_fn=np.array_equal))

def test_z_score_list_data(self):
list_data = [{'x': [1, 2, 3]}, {'x': [4, 5, 6]}]
@@ -111,7 +99,14 @@ def test_z_score_list_data(self):
| "listMLTransform" >> base.MLTransform(
write_artifact_location=self.artifact_location).with_transform(
tft.ScaleToZScore(columns=['x'])))
_ = (list_result | beam.Map(assert_z_score_artifacts))

expected_data = [
np.array([-1.46385, -0.87831, -0.29277], dtype=np.float32),
np.array([0.29277, 0.87831, 1.46385], dtype=np.float32)
]
actual_data = (list_result | beam.Map(lambda x: x.x))
assert_that(
actual_data, equal_to(expected_data, equals_fn=np.array_equal))


class ScaleTo01Test(unittest.TestCase):
@@ -130,7 +125,6 @@ def test_ScaleTo01_list(self):
| "MLTransform" >> base.MLTransform(
write_artifact_location=self.artifact_location).with_transform(
tft.ScaleTo01(columns=['x'])))
_ = (list_result | beam.Map(assert_ScaleTo01_artifacts))
Contributor:

Rather than removing all of these checks, can we check that the written artifact is correct?

Contributor Author:

I added #29017 to do this, but for now we would have to use some TF tools to do it; it is not clear how, and it is not straightforward.

To read the artifacts, we use read_artifact_location, and there are tests covering this path.

expected_output = [
np.array([0, 0.2, 0.4], dtype=np.float32),
@@ -150,7 +144,6 @@ def test_ScaleTo01(self):
write_artifact_location=self.artifact_location).with_transform(
tft.ScaleTo01(columns=['x'])))

_ = (result | beam.Map(assert_ScaleTo01_artifacts))
expected_output = (
np.array([0], dtype=np.float32),
np.array([0.2], dtype=np.float32),
@@ -179,7 +172,6 @@ def test_bucketize(self):
| "MLTransform" >> base.MLTransform(
write_artifact_location=self.artifact_location).with_transform(
tft.Bucketize(columns=['x'], num_buckets=3)))
_ = (result | beam.Map(assert_bucketize_artifacts))

transformed_data = (result | beam.Map(lambda x: x.x))
expected_data = [
@@ -202,8 +194,6 @@ def test_bucketize_list(self):
| "MLTransform" >> base.MLTransform(
write_artifact_location=self.artifact_location).with_transform(
tft.Bucketize(columns=['x'], num_buckets=3)))
_ = (list_result | beam.Map(assert_bucketize_artifacts))

transformed_data = (
list_result
| "TransformedColumnX" >> beam.Map(lambda ele: ele.x))
@@ -214,36 +204,6 @@
assert_that(
transformed_data, equal_to(expected_data, equals_fn=np.array_equal))

@parameterized.expand([
(range(1, 10), [4, 7]),
(range(9, 0, -1), [4, 7]),
(range(19, 0, -1), [10]),
(range(1, 100), [25, 50, 75]),
# similar to the above but with odd number of elements
(range(1, 100, 2), [25, 51, 75]),
(range(99, 0, -1), range(10, 100, 10))
])
def test_bucketize_boundaries(self, test_input, expected_boundaries):
# boundaries are outputted as artifacts for the Bucketize transform.
data = [{'x': [i]} for i in test_input]
num_buckets = len(expected_boundaries) + 1
with beam.Pipeline() as p:
result = (
p
| "Create" >> beam.Create(data)
| "MLTransform" >> base.MLTransform(
write_artifact_location=self.artifact_location).with_transform(
tft.Bucketize(columns=['x'], num_buckets=num_buckets)))
actual_boundaries = (
result
| beam.Map(lambda x: x.as_dict())
| beam.Map(lambda x: x['x_quantiles']))

def assert_boundaries(actual_boundaries):
assert np.array_equal(actual_boundaries, expected_boundaries)

_ = (actual_boundaries | beam.Map(assert_boundaries))


class ApplyBucketsTest(unittest.TestCase):
def setUp(self) -> None:
@@ -751,7 +711,6 @@ def map_element_to_count(elements, counts):
transforms=[
tft.BagOfWords(columns=['x'], compute_word_count=True)
]))

# the unique elements and counts are artifacts and will be
# stored in the result and same for all the elements in the
# PCollection.
9 changes: 5 additions & 4 deletions sdks/python/tox.ini
@@ -162,7 +162,7 @@
holdup==1.8.0
extras =
gcp
allowlist_externals =
allowlist_externals =
bash
echo
sleep
@@ -194,7 +194,7 @@
extras =
azure
passenv = REQUESTS_CA_BUNDLE
allowlist_externals =
allowlist_externals =
wget
az
bash
@@ -311,11 +311,12 @@ commands =
# Run all DataFrame API unit tests
bash {toxinidir}/scripts/run_pytest.sh {envname} 'apache_beam/dataframe'

[testenv:py{38,39}-tft-113]
[testenv:py{38,39}-tft-{113,114}]
deps =
113: tensorflow_transform>=1.13.0,<1.14.0
114: tensorflow_transform>=1.14.0,<1.15.0
commands =
bash {toxinidir}/scripts/run_pytest.sh {envname} 'apache_beam/ml/transforms'
bash {toxinidir}/scripts/run_pytest.sh {envname} 'apache_beam/ml/transforms apache_beam/examples/snippets/transforms/elementwise/mltransform_test.py'

[testenv:py{38,39,310,311}-pytorch-{19,110,111,112,113}]
deps =
@@ -105,7 +105,7 @@ artifacts.
When you use the `write_artifact_location` parameter, the `MLTransform` class runs the
specified transformations on the dataset and then creates artifacts from these
transformations. The artifacts are stored in the location that you specify in
the `write_artifact_location` parameter or in the `MLTransform` output.
the `write_artifact_location` parameter.

Write mode is useful when you want to store the results of your transformations
for future use. For example, if you apply the same transformations on a
@@ -120,8 +120,7 @@ The following examples demonstrate how write mode works.
The `ComputeAndApplyVocabulary`
transform outputs the indices of the vocabulary to the vocabulary file.
- The `ScaleToZScore` transform calculates the mean and variance over the entire dataset
and then normalizes the entire dataset using the mean and variance. The
mean and variance are outputted by the `MLTransform` operation.
and then normalizes the entire dataset using the mean and variance.
When you use the `write_artifact_location` parameter, these
values are stored as a `tensorflow` graph in the location specified by
the `write_artifact_location` parameter value. You can reuse the values in read mode
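As a concrete illustration of the write mode described above — a hedged sketch, not taken from the Beam documentation, with a hypothetical artifact path — the ScaleToZScore artifacts are saved to the artifact location rather than returned with the output rows:

import tempfile

import apache_beam as beam
from apache_beam.ml.transforms import base
from apache_beam.ml.transforms import tft

artifact_location = tempfile.mkdtemp()  # any empty, writable location

# The mean and variance computed over the dataset are no longer emitted as
# extra fields in the output rows; they are stored as a tensorflow graph under
# artifact_location and can be reused later by passing read_artifact_location
# to a new MLTransform.
with beam.Pipeline() as p:
  _ = (
      p
      | beam.Create([{'x': [1, 2, 3]}, {'x': [4, 5, 6]}])
      | base.MLTransform(
          write_artifact_location=artifact_location).with_transform(
              tft.ScaleToZScore(columns=['x']))
      | beam.Map(print))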