From c8eed22442fbb14f0ae31e3ccaede6f3b97ba1b8 Mon Sep 17 00:00:00 2001 From: Jun He Date: Tue, 14 Jun 2022 00:05:15 -0700 Subject: [PATCH 1/3] add truncate transform --- python/src/iceberg/transforms.py | 86 ++++++++++++++++++++++++++++- python/src/iceberg/utils/decimal.py | 13 +++++ python/tests/test_transforms.py | 55 ++++++++++++++++++ 3 files changed, 153 insertions(+), 1 deletion(-) diff --git a/python/src/iceberg/transforms.py b/python/src/iceberg/transforms.py index a2fa8f67fd37..5d3cd20ed37d 100644 --- a/python/src/iceberg/transforms.py +++ b/python/src/iceberg/transforms.py @@ -40,8 +40,8 @@ UUIDType, ) from iceberg.utils import datetime -from iceberg.utils.decimal import decimal_to_bytes from src.iceberg.utils.singleton import Singleton +from iceberg.utils.decimal import decimal_to_bytes, truncate_decimal S = TypeVar("S") T = TypeVar("T") @@ -302,6 +302,86 @@ def _(self, _: IcebergType, value: int) -> str: return datetime.to_human_timestamptz(value) +class TruncateTransform(Transform[S, S]): + """A transform for truncating a value to a specified width. + Args: + source_type (Type): An Iceberg Type of IntegerType, LongType, StringType, BinaryType or DecimalType + width (int): The truncate width + Raises: + ValueError: If a type is provided that is incompatible with a Truncate transform + """ + + def __init__(self, source_type: IcebergType, width: int): + super().__init__( + f"truncate[{width}]", + f"transforms.truncate(source_type={repr(source_type)}, width={width})", + ) + self._type = source_type + self._width = width + + @property + def width(self): + return self._width + + def apply(self, value: Optional[S]) -> Optional[S]: + return self._truncate_value(value) if value is not None else None + + @singledispatchmethod + def _truncate_value(self, value: S) -> S: + raise ValueError(f"Cannot truncate value: {value}") + + @_truncate_value.register(int) + def _(self, value): + """Truncate a given int value into a given width if feasible.""" + if type(self._type) in {IntegerType, LongType}: + return value - value % self._width + else: + raise ValueError(f"Cannot truncate type: {self._type} for value: {value}") + + @_truncate_value.register(str) + def _(self, value): + """Truncate a given string to a given width.""" + return value[0 : min(self._width, len(value))] + + @_truncate_value.register(bytes) + def _(self, value): + """Truncate a given binary bytes into a given width.""" + if isinstance(self._type, BinaryType): + return value[0 : min(self._width, len(value))] + else: + raise ValueError(f"Cannot truncate type: {self._type}") + + @_truncate_value.register(Decimal) + def _(self, value): + """Truncate a given decimal value into a given width.""" + return truncate_decimal(value, self._width) + + def can_transform(self, source: IcebergType) -> bool: + return self._type == source + + def result_type(self, source: IcebergType) -> IcebergType: + return source + + def preserves_order(self) -> bool: + return True + + def satisfies_order_of(self, other: Transform) -> bool: + if self == other: + return True + elif isinstance(self._type, StringType) and isinstance(other, TruncateTransform) and isinstance(other._type, StringType): + return self._width >= other._width + + return False + + def to_human_string(self, value: Optional[S]) -> str: + if value is None: + return "null" + elif isinstance(value, bytes): + return _base64encode(value) + else: + return str(value) + + class UnknownTransform(Transform): """A transform that represents when an unknown transform is provided Args: @@ -369,5 +449,9 @@ def identity(source_type: IcebergType) -> IdentityTransform: return IdentityTransform(source_type) +def truncate(source_type: IcebergType, width: int) -> TruncateTransform: + return TruncateTransform(source_type, width) + + def always_null() -> Transform: return VoidTransform() diff --git a/python/src/iceberg/utils/decimal.py b/python/src/iceberg/utils/decimal.py index 1d4c2bddefd0..42f4a9869066 100644 --- a/python/src/iceberg/utils/decimal.py +++ b/python/src/iceberg/utils/decimal.py @@ -75,3 +75,16 @@ def decimal_to_bytes(value: Decimal) -> bytes: """ unscaled_value = decimal_to_unscaled(value) return unscaled_value.to_bytes(bytes_required(unscaled_value), byteorder="big", signed=True) + + +def truncate_decimal(value: Decimal, width: int) -> Decimal: + """Get a truncated Decimal value given a decimal value and a width + Args: + value (Decimal): a decimal value + width (int): A width for the returned Decimal instance + Returns: + Decimal: A truncated Decimal instance + """ + unscaled_value = decimal_to_unscaled(value) + applied_value = unscaled_value - (((unscaled_value % width) + width) % width) + return Decimal(f"{applied_value}e{value.as_tuple().exponent}") diff --git a/python/tests/test_transforms.py b/python/tests/test_transforms.py index d7b8e968a9fd..8606e4aaeaf6 100644 --- a/python/tests/test_transforms.py +++ b/python/tests/test_transforms.py @@ -177,6 +177,61 @@ def test_identity_method(type_var): assert identity_transform.apply("test") == "test" +@pytest.mark.parametrize("type_var", [IntegerType(), LongType()]) +@pytest.mark.parametrize( + "input_var,expected", + [(1, 0), (5, 0), (9, 0), (10, 10), (11, 10), (-1, -10), (-10, -10), (-12, -20)], +) +def test_truncate_integer(type_var, input_var, expected): + trunc = transforms.truncate(type_var, 10) + assert trunc.apply(input_var) == expected + + +@pytest.mark.parametrize( + "input_var,expected", + [ + (Decimal("12.34"), Decimal("12.30")), + (Decimal("12.30"), Decimal("12.30")), + (Decimal("12.29"), Decimal("12.20")), + (Decimal("0.05"), Decimal("0.00")), + (Decimal("-0.05"), Decimal("-0.10")), + ], +) +def test_truncate_decimal(input_var, expected): + trunc = transforms.truncate(DecimalType(9, 2), 10) + assert trunc.apply(input_var) == expected + + +@pytest.mark.parametrize("input_var,expected", [("abcdefg", "abcde"), ("abc", "abc")]) +def test_truncate_string(input_var, expected): + trunc = transforms.truncate(StringType(), 5) + assert trunc.apply(input_var) == expected + + +@pytest.mark.parametrize( + "type_var,value,expected_human_str,expected", + [ + (BinaryType(), b"\x00\x01\x02\x03", "AAECAw==", b"\x00"), + (DecimalType(8, 5), Decimal("14.21"), "14.21", Decimal("14.21")), + (IntegerType(), 123, "123", 123), + (LongType(), 123, "123", 123), + (StringType(), "foo", "foo", "f"), + ], +) +def test_truncate_method(type_var, value, expected_human_str, expected): + truncate_transform = transforms.truncate(type_var, 1) + assert str(truncate_transform) == str(eval(repr(truncate_transform))) + assert truncate_transform.can_transform(type_var) + assert truncate_transform.result_type(type_var) == type_var + assert truncate_transform.to_human_string(value) == expected_human_str + assert truncate_transform.apply(value) == expected + assert truncate_transform.to_human_string(None) == "null" + assert truncate_transform.width == 1 + assert truncate_transform.apply(None) is None + assert truncate_transform.preserves_order() + assert truncate_transform.satisfies_order_of(truncate_transform) + + def test_unknown_transform(): unknown_transform = transforms.UnknownTransform(FixedType(8), "unknown") assert str(unknown_transform) == str(eval(repr(unknown_transform))) From 0e53c2280fa4eb41462ede9d6522367b30076bcf Mon Sep 17 00:00:00 2001 From: Jun He Date: Sun, 26 Jun 2022 21:04:02 -0700 Subject: [PATCH 2/3] address the comments --- python/src/iceberg/transforms.py | 11 ++++++++--- python/src/iceberg/utils/decimal.py | 2 +- python/tests/test_transforms.py | 2 +- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/python/src/iceberg/transforms.py b/python/src/iceberg/transforms.py index 5d3cd20ed37d..2d54eedb7c22 100644 --- a/python/src/iceberg/transforms.py +++ b/python/src/iceberg/transforms.py @@ -320,9 +320,13 @@ def __init__(self, source_type: IcebergType, width: int): self._width = width @property - def width(self): + def width(self) -> int: return self._width + @property + def type(self) -> IcebergType: + return self._type + def apply(self, value: Optional[S]) -> Optional[S]: return self._truncate_value(value) if value is not None else None @@ -362,14 +366,15 @@ def can_transform(self, source: IcebergType) -> bool: def result_type(self, source: IcebergType) -> IcebergType: return source + @property def preserves_order(self) -> bool: return True def satisfies_order_of(self, other: Transform) -> bool: if self == other: return True - elif isinstance(self._type, StringType) and isinstance(other, TruncateTransform) and isinstance(other._type, StringType): - return self._width >= other._width + elif isinstance(self._type, StringType) and isinstance(other, TruncateTransform) and isinstance(other.type, StringType): + return self._width >= other.width return False diff --git a/python/src/iceberg/utils/decimal.py b/python/src/iceberg/utils/decimal.py index 42f4a9869066..40bc087390c3 100644 --- a/python/src/iceberg/utils/decimal.py +++ b/python/src/iceberg/utils/decimal.py @@ -87,4 +87,4 @@ def truncate_decimal(value: Decimal, width: int) -> Decimal: """ unscaled_value = decimal_to_unscaled(value) applied_value = unscaled_value - (((unscaled_value % width) + width) % width) - return Decimal(f"{applied_value}e{value.as_tuple().exponent}") + return unscaled_to_decimal(applied_value, -value.as_tuple().exponent) diff --git a/python/tests/test_transforms.py b/python/tests/test_transforms.py index 8606e4aaeaf6..ad6b76cf4d3c 100644 --- a/python/tests/test_transforms.py +++ b/python/tests/test_transforms.py @@ -228,7 +228,7 @@ def test_truncate_method(type_var, value, expected_human_str, expected): assert truncate_transform.to_human_string(None) == "null" assert truncate_transform.width == 1 assert truncate_transform.apply(None) is None - assert truncate_transform.preserves_order() + assert truncate_transform.preserves_order assert truncate_transform.satisfies_order_of(truncate_transform) From 99192496c391a83bb189c34869ae7dfa9a304744 Mon Sep 17 00:00:00 2001 From: Jun He Date: Wed, 29 Jun 2022 23:57:54 -0700 Subject: [PATCH 3/3] address the comments --- python/src/iceberg/transforms.py | 147 +++++++++++++++++-------------- python/tests/test_transforms.py | 2 + 2 files changed, 82 insertions(+), 67 deletions(-) diff --git a/python/src/iceberg/transforms.py b/python/src/iceberg/transforms.py index 2d54eedb7c22..aafda5df8058 100644 --- a/python/src/iceberg/transforms.py +++ b/python/src/iceberg/transforms.py @@ -19,8 +19,13 @@ import struct from abc import ABC, abstractmethod from decimal import Decimal -from functools import singledispatchmethod -from typing import Generic, Optional, TypeVar +from functools import singledispatch +from typing import ( + Any, + Generic, + Optional, + TypeVar, +) from uuid import UUID import mmh3 # type: ignore @@ -40,8 +45,8 @@ UUIDType, ) from iceberg.utils import datetime -from src.iceberg.utils.singleton import Singleton from iceberg.utils.decimal import decimal_to_bytes, truncate_decimal +from src.iceberg.utils.singleton import Singleton S = TypeVar("S") T = TypeVar("T") @@ -267,39 +272,7 @@ def satisfies_order_of(self, other: Transform) -> bool: return other.preserves_order def to_human_string(self, value: Optional[S]) -> str: - return self._human_string(value) - - @singledispatchmethod - def _human_string(self, value: Optional[S]) -> str: - return str(value) if value is not None else "null" - - @_human_string.register(bytes) - def _(self, value: bytes) -> str: - return _base64encode(value) - - @_human_string.register(int) - def _(self, value: int) -> str: - return self._int_to_human_string(self._type, value) - - @singledispatchmethod - def _int_to_human_string(self, _: IcebergType, value: int) -> str: - return str(value) - - @_int_to_human_string.register(DateType) - def _(self, _: IcebergType, value: int) -> str: - return datetime.to_human_day(value) - - @_int_to_human_string.register(TimeType) - def _(self, _: IcebergType, value: int) -> str: - return datetime.to_human_time(value) - - @_int_to_human_string.register(TimestampType) - def _(self, _: IcebergType, value: int) -> str: - return datetime.to_human_timestamp(value) - - @_int_to_human_string.register(TimestamptzType) - def _(self, _: IcebergType, value: int) -> str: - return datetime.to_human_timestamptz(value) + return _human_string(value, self._type) if value is not None else "null" class TruncateTransform(Transform[S, S]): @@ -312,6 +285,7 @@ class TruncateTransform(Transform[S, S]): """ def __init__(self, source_type: IcebergType, width: int): + assert width > 0, f"width ({width}) should be greater than 0" super().__init__( f"truncate[{width}]", f"transforms.truncate(source_type={repr(source_type)}, width={width})", @@ -328,37 +302,7 @@ def type(self) -> IcebergType: return self._type def apply(self, value: Optional[S]) -> Optional[S]: - return self._truncate_value(value) if value is not None else None - - @singledispatchmethod - def _truncate_value(self, value: S) -> S: - raise ValueError(f"Cannot truncate value: {value}") - - @_truncate_value.register(int) - def _(self, value): - """Truncate a given int value into a given width if feasible.""" - if type(self._type) in {IntegerType, LongType}: - return value - value % self._width - else: - raise ValueError(f"Cannot truncate type: {self._type} for value: {value}") - - @_truncate_value.register(str) - def _(self, value): - """Truncate a given string to a given width.""" - return value[0 : min(self._width, len(value))] - - @_truncate_value.register(bytes) - def _(self, value): - """Truncate a given binary bytes into a given width.""" - if isinstance(self._type, BinaryType): - return value[0 : min(self._width, len(value))] - else: - raise ValueError(f"Cannot truncate type: {self._type}") - - @_truncate_value.register(Decimal) - def _(self, value): - """Truncate a given decimal value into a given width.""" - return truncate_decimal(value, self._width) + return _truncate_value(value, self._width) if value is not None else None def can_transform(self, source: IcebergType) -> bool: return self._type == source @@ -387,6 +331,75 @@ def to_human_string(self, value: Optional[S]) -> str: return str(value) +@singledispatch +def _human_string(value: Any, _type: IcebergType) -> str: + return str(value) + + +@_human_string.register(bytes) +def _(value: bytes, _type: IcebergType) -> str: + return _base64encode(value) + + +@_human_string.register(int) +def _(value: int, _type: IcebergType) -> str: + return _int_to_human_string(_type, value) + + +@singledispatch +def _int_to_human_string(_type: IcebergType, value: int) -> str: + return str(value) + + +@_int_to_human_string.register(DateType) +def _(_type: IcebergType, value: int) -> str: + return datetime.to_human_day(value) + + +@_int_to_human_string.register(TimeType) +def _(_type: IcebergType, value: int) -> str: + return datetime.to_human_time(value) + + +@_int_to_human_string.register(TimestampType) +def _(_type: IcebergType, value: int) -> str: + return datetime.to_human_timestamp(value) + + +@_int_to_human_string.register(TimestamptzType) +def _(_type: IcebergType, value: int) -> str: + return datetime.to_human_timestamptz(value) + + +@singledispatch +def _truncate_value(value: Any, _width: int) -> S: + raise ValueError(f"Cannot truncate value: {value}") + + +@_truncate_value.register(int) +def _(value: int, _width: int) -> int: + """Truncate a given int value into a given width if feasible.""" + return value - value % _width + + +@_truncate_value.register(str) +def _(value: str, _width: int) -> str: + """Truncate a given string to a given width.""" + return value[0 : min(_width, len(value))] + + +@_truncate_value.register(bytes) +def _(value: bytes, _width: int) -> bytes: + """Truncate a given binary bytes into a given width.""" + return value[0 : min(_width, len(value))] + + +@_truncate_value.register(Decimal) +def _(value: Decimal, _width: int) -> Decimal: + """Truncate a given decimal value into a given width.""" + return truncate_decimal(value, _width) + + class UnknownTransform(Transform): """A transform that represents when an unknown transform is provided Args: diff --git a/python/tests/test_transforms.py b/python/tests/test_transforms.py index ad6b76cf4d3c..dc3ce4ec2737 100644 --- a/python/tests/test_transforms.py +++ b/python/tests/test_transforms.py @@ -212,10 +212,12 @@ def test_truncate_string(input_var, expected): "type_var,value,expected_human_str,expected", [ (BinaryType(), b"\x00\x01\x02\x03", "AAECAw==", b"\x00"), + (BinaryType(), bytes("\u2603de", "utf-8"), "4piDZGU=", b"\xe2"), (DecimalType(8, 5), Decimal("14.21"), "14.21", Decimal("14.21")), (IntegerType(), 123, "123", 123), (LongType(), 123, "123", 123), (StringType(), "foo", "foo", "f"), + (StringType(), "\u2603de", "\u2603de", "\u2603"), ], ) def test_truncate_method(type_var, value, expected_human_str, expected):