From 44ca7b9e005b78e0ef3e506053fd921cd3139ee2 Mon Sep 17 00:00:00 2001 From: Jun He Date: Sun, 27 Mar 2022 18:59:36 -0700 Subject: [PATCH 1/2] Support bucket transform. --- python/setup.cfg | 2 + python/src/iceberg/transforms.py | 234 +++++++++++++++++++++++++++++++ python/tests/test_transforms.py | 133 ++++++++++++++++++ 3 files changed, 369 insertions(+) create mode 100644 python/src/iceberg/transforms.py create mode 100644 python/tests/test_transforms.py diff --git a/python/setup.cfg b/python/setup.cfg index a707fe123d77..07500c299d0d 100644 --- a/python/setup.cfg +++ b/python/setup.cfg @@ -42,6 +42,8 @@ package_dir = = src packages = find: python_requires = >=3.7 +install_requires = + mmh3 [options.extras_require] arrow = pyarrow diff --git a/python/src/iceberg/transforms.py b/python/src/iceberg/transforms.py new file mode 100644 index 000000000000..076e41d908dd --- /dev/null +++ b/python/src/iceberg/transforms.py @@ -0,0 +1,234 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import math +import struct +from decimal import Decimal +from typing import Optional +from uuid import UUID + +import mmh3 # type: ignore + +from iceberg.types import ( + BinaryType, + DateType, + DecimalType, + FixedType, + IcebergType, + IntegerType, + LongType, + StringType, + TimestampType, + TimestamptzType, + TimeType, + UUIDType, +) + + +class Transform: + """Transform base class for concrete transforms. + + A base class to transform values and project predicates on partition values. + This class is not used directly. Instead, use one of module method to create the child classes. + + Args: + transform_string (str): name of the transform type + repr_string (str): string representation of a transform instance + """ + + def __init__(self, transform_string: str, repr_string: str): + self._transform_string = transform_string + self._repr_string = repr_string + + def __repr__(self): + return self._repr_string + + def __str__(self): + return self._transform_string + + def __call__(self, value): + return self.apply(value) + + def apply(self, value): + raise NotImplementedError() + + def can_transform(self, source: IcebergType) -> bool: + return False + + def result_type(self, source: IcebergType) -> IcebergType: + raise NotImplementedError() + + def preserves_order(self) -> bool: + return False + + def satisfies_order_of(self, other) -> bool: + return self == other + + def to_human_string(self, value) -> str: + if value is None: + return "null" + return str(value) + + def dedup_name(self) -> str: + return self._transform_string + + +class BaseBucketTransform(Transform): + """Base Transform class to transform a value into a bucket partition value + + Transforms are parameterized by a number of buckets. Bucket partition transforms use a 32-bit + hash of the source value to produce a positive value by mod the bucket number. + + Args: + source_type (Type): An Iceberg Type of IntegerType, LongType, DecimalType, DateType, TimeType, + TimestampType, TimestamptzType, StringType, BinaryType, FixedType, UUIDType. + num_buckets (int): The number of buckets. + """ + + def __init__(self, source_type: IcebergType, num_buckets: int): + super().__init__( + f"bucket[{num_buckets}]", + f"transforms.bucket(source_type={repr(source_type)}, num_buckets={num_buckets})", + ) + self._num_buckets = num_buckets + + @property + def num_buckets(self) -> int: + return self._num_buckets + + def hash(self, value) -> int: + raise NotImplementedError() + + def apply(self, value) -> Optional[int]: + if value is None: + return None + + return (self.hash(value) & IntegerType.max) % self._num_buckets + + def can_transform(self, source: IcebergType) -> bool: + raise NotImplementedError() + + def result_type(self, source: IcebergType) -> IcebergType: + return IntegerType() + + +class BucketNumberTransform(BaseBucketTransform): + """Transforms a value of IntegerType, LongType, DateType, TimeType, TimestampType, or TimestamptzType + into a bucket partition value + + Example: + >>> transform = BucketNumberTransform(100) + >>> transform.apply(81068000000) + 59 + """ + + def can_transform(self, source: IcebergType) -> bool: + return type(source) in {IntegerType, DateType, LongType, TimeType, TimestampType, TimestamptzType} + + def hash(self, value) -> int: + return mmh3.hash(struct.pack(">> transform = BucketDecimalTransform(100) + >>> transform.apply(Decimal("14.20")) + 59 + """ + + def can_transform(self, source: IcebergType) -> bool: + return isinstance(source, DecimalType) + + def hash(self, value: Decimal) -> int: + value_tuple = value.as_tuple() + unscaled_value = int(("-" if value_tuple.sign else "") + "".join([str(d) for d in value_tuple.digits])) + number_of_bytes = int(math.ceil(unscaled_value.bit_length() / 8)) + value_in_bytes = unscaled_value.to_bytes(length=number_of_bytes, byteorder="big") + return mmh3.hash(value_in_bytes) + + +class BucketStringTransform(BaseBucketTransform): + """Transforms a value of StringType into a bucket partition value. + + Example: + >>> transform = BucketStringTransform(100) + >>> transform.apply("iceberg") + 89 + """ + + def can_transform(self, source: IcebergType) -> bool: + return isinstance(source, StringType) + + def hash(self, value: str) -> int: + return mmh3.hash(value) + + +class BucketBytesTransform(BaseBucketTransform): + """Transforms a value of BinaryType into a bucket partition value. + + Example: + >>> transform = BucketBytesTransform(100) + >>> transform.apply(b"\x00\x01\x02\x03") + 41 + """ + + def can_transform(self, source: IcebergType) -> bool: + return type(source) in {FixedType, BinaryType} + + def hash(self, value: bytes) -> int: + return mmh3.hash(value) + + +class BucketUUIDTransform(BaseBucketTransform): + """Transforms a value of UUIDType into a bucket partition value. + + Example: + >>> transform = BucketUUIDTransform(100) + >>> transform.apply(UUID("f79c3e09-677c-4bbd-a479-3f349cb785e7")) + 40 + """ + + def can_transform(self, source: IcebergType) -> bool: + return isinstance(source, UUIDType) + + def hash(self, value: UUID) -> int: + return mmh3.hash( + struct.pack( + ">QQ", + (value.int >> 64) & 0xFFFFFFFFFFFFFFFF, + value.int & 0xFFFFFFFFFFFFFFFF, + ) + ) + + +def bucket(source_type: IcebergType, num_buckets: int) -> BaseBucketTransform: + if type(source_type) in {IntegerType, LongType, DateType, TimeType, TimestampType, TimestamptzType}: + return BucketNumberTransform(source_type, num_buckets) + elif isinstance(source_type, DecimalType): + return BucketDecimalTransform(source_type, num_buckets) + elif isinstance(source_type, StringType): + return BucketStringTransform(source_type, num_buckets) + elif isinstance(source_type, BinaryType): + return BucketBytesTransform(source_type, num_buckets) + elif isinstance(source_type, FixedType): + return BucketBytesTransform(source_type, num_buckets) + elif isinstance(source_type, UUIDType): + return BucketUUIDTransform(source_type, num_buckets) + else: + raise ValueError(f"Cannot bucket by type: {source_type}") diff --git a/python/tests/test_transforms.py b/python/tests/test_transforms.py new file mode 100644 index 000000000000..96b68b8fb470 --- /dev/null +++ b/python/tests/test_transforms.py @@ -0,0 +1,133 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from datetime import datetime +from decimal import Decimal, getcontext +from uuid import UUID + +import pytest + +from iceberg import transforms +from iceberg.types import ( + BinaryType, + DateType, + DecimalType, + FixedType, + IntegerType, + LongType, + StringType, + TimestampType, + TimestamptzType, + TimeType, + UUIDType, +) + + +@pytest.mark.parametrize( + "test_input,test_type,expected", + [ + (1, IntegerType(), 1392991556), + (34, IntegerType(), 2017239379), + (34, LongType(), 2017239379), + (17486, DateType(), -653330422), + (81068000000, TimeType(), -662762989), + ( + int(datetime.fromisoformat("2017-11-16T22:31:08+00:00").timestamp() * 1000000), + TimestampType(), + -2047944441, + ), + ( + int(datetime.fromisoformat("2017-11-16T14:31:08-08:00").timestamp() * 1000000), + TimestamptzType(), + -2047944441, + ), + (b"\x00\x01\x02\x03", BinaryType(), -188683207), + (b"\x00\x01\x02\x03", FixedType(4), -188683207), + ("iceberg", StringType(), 1210000089), + (UUID("f79c3e09-677c-4bbd-a479-3f349cb785e7"), UUIDType(), 1488055340), + ], +) +def test_bucket_hash_values(test_input, test_type, expected): + assert transforms.bucket(test_type, 8).hash(test_input) == expected + + +@pytest.mark.parametrize( + "test_input,test_type,scale_factor,expected_hash,expected", + [ + (Decimal("14.20"), DecimalType(9, 2), Decimal(10) ** -2, -500754589, 59), + ( + Decimal("137302769811943318102518958871258.37580"), + DecimalType(38, 5), + Decimal(10) ** -5, + -32334285, + 63, + ), + ], +) +def test_decimal_bucket(test_input, test_type, scale_factor, expected_hash, expected): + getcontext().prec = 38 + assert transforms.bucket(test_type, 100).hash(test_input.quantize(scale_factor)) == expected_hash + assert transforms.bucket(test_type, 100).apply(test_input.quantize(scale_factor)) == expected + + +@pytest.mark.parametrize( + "bucket,value,expected", + [ + (transforms.bucket(IntegerType(), 100), 34, 79), + (transforms.bucket(LongType(), 100), 34, 79), + (transforms.bucket(DateType(), 100), 17486, 26), + (transforms.bucket(TimeType(), 100), 81068000000, 59), + (transforms.bucket(TimestampType(), 100), 1510871468000000, 7), + (transforms.bucket(DecimalType(9, 2), 100), Decimal("14.20"), 59), + (transforms.bucket(StringType(), 100), "iceberg", 89), + ( + transforms.bucket(UUIDType(), 100), + UUID("f79c3e09-677c-4bbd-a479-3f349cb785e7"), + 40, + ), + (transforms.bucket(FixedType(3), 128), b"foo", 32), + (transforms.bucket(BinaryType(), 128), b"\x00\x01\x02\x03", 57), + ], +) +def test_buckets(bucket, value, expected): + assert bucket.apply(value) == expected + + +@pytest.mark.parametrize( + "type_var", + [ + BinaryType(), + DateType(), + DecimalType(8, 5), + FixedType(8), + IntegerType(), + LongType(), + StringType(), + TimestampType(), + TimestamptzType(), + TimeType(), + UUIDType(), + ], +) +def test_bucket_method(type_var): + bucket_transform = transforms.bucket(type_var, 8) + assert str(bucket_transform) == str(eval(repr(bucket_transform))) + assert bucket_transform.can_transform(type_var) + assert bucket_transform.result_type(type_var) == IntegerType() + assert bucket_transform.num_buckets == 8 + assert bucket_transform.apply(None) is None + assert bucket_transform.to_human_string("test") == "test" From 731828aa93bc88e3e1597b43e56fc649a285bcba Mon Sep 17 00:00:00 2001 From: Jun He Date: Sun, 27 Mar 2022 21:58:29 -0700 Subject: [PATCH 2/2] fix the docs --- python/src/iceberg/transforms.py | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/python/src/iceberg/transforms.py b/python/src/iceberg/transforms.py index 076e41d908dd..d24668cc60fc 100644 --- a/python/src/iceberg/transforms.py +++ b/python/src/iceberg/transforms.py @@ -131,7 +131,7 @@ class BucketNumberTransform(BaseBucketTransform): into a bucket partition value Example: - >>> transform = BucketNumberTransform(100) + >>> transform = BucketNumberTransform(LongType(), 100) >>> transform.apply(81068000000) 59 """ @@ -147,7 +147,7 @@ class BucketDecimalTransform(BaseBucketTransform): """Transforms a value of DecimalType into a bucket partition value. Example: - >>> transform = BucketDecimalTransform(100) + >>> transform = BucketDecimalTransform(DecimalType(9, 2), 100) >>> transform.apply(Decimal("14.20")) 59 """ @@ -172,6 +172,9 @@ class BucketStringTransform(BaseBucketTransform): 89 """ + def __init__(self, num_buckets: int): + super().__init__(StringType(), num_buckets) + def can_transform(self, source: IcebergType) -> bool: return isinstance(source, StringType) @@ -180,11 +183,11 @@ def hash(self, value: str) -> int: class BucketBytesTransform(BaseBucketTransform): - """Transforms a value of BinaryType into a bucket partition value. + """Transforms a value of FixedType or BinaryType into a bucket partition value. Example: - >>> transform = BucketBytesTransform(100) - >>> transform.apply(b"\x00\x01\x02\x03") + >>> transform = BucketBytesTransform(BinaryType(), 100) + >>> transform.apply(b"\\x00\\x01\\x02\\x03") 41 """ @@ -204,6 +207,9 @@ class BucketUUIDTransform(BaseBucketTransform): 40 """ + def __init__(self, num_buckets: int): + super().__init__(UUIDType(), num_buckets) + def can_transform(self, source: IcebergType) -> bool: return isinstance(source, UUIDType) @@ -223,12 +229,12 @@ def bucket(source_type: IcebergType, num_buckets: int) -> BaseBucketTransform: elif isinstance(source_type, DecimalType): return BucketDecimalTransform(source_type, num_buckets) elif isinstance(source_type, StringType): - return BucketStringTransform(source_type, num_buckets) + return BucketStringTransform(num_buckets) elif isinstance(source_type, BinaryType): return BucketBytesTransform(source_type, num_buckets) elif isinstance(source_type, FixedType): return BucketBytesTransform(source_type, num_buckets) elif isinstance(source_type, UUIDType): - return BucketUUIDTransform(source_type, num_buckets) + return BucketUUIDTransform(num_buckets) else: raise ValueError(f"Cannot bucket by type: {source_type}")