Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 166 additions & 1 deletion python/src/iceberg/transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
# specific language governing permissions and limitations
# under the License.

import base64
import struct
from abc import ABC, abstractmethod
from decimal import Decimal
from functools import singledispatchmethod
from typing import Generic, Optional, TypeVar
from uuid import UUID

Expand All @@ -37,7 +39,8 @@
TimeType,
UUIDType,
)
from iceberg.utils.decimal import decimal_to_bytes
from iceberg.utils import datetime
from iceberg.utils.decimal import decimal_to_bytes, truncate_decimal

S = TypeVar("S")
T = TypeVar("T")
Expand Down Expand Up @@ -224,6 +227,160 @@ def hash(self, value: UUID) -> int:
)


def _base64encode(buffer: bytes) -> str:
"""Converts bytes to base64 string"""
return base64.b64encode(buffer).decode("ISO-8859-1")


class IdentityTransform(Transform[S, S]):
"""Transforms a value into itself.

Example:
>>> transform = IdentityTransform(StringType())
>>> transform.apply('hello-world')
'hello-world'
"""

def __init__(self, source_type: IcebergType):
super().__init__(
"identity",
f"transforms.identity(source_type={repr(source_type)})",
)
self._type = source_type

def apply(self, value: Optional[S]) -> Optional[S]:
return value

def can_transform(self, source: IcebergType) -> bool:
return source.is_primitive

def result_type(self, source: IcebergType) -> IcebergType:
return source

@property
def preserves_order(self) -> bool:
return True

def satisfies_order_of(self, other: Transform) -> bool:
"""ordering by value is the same as long as the other preserves order"""
return other.preserves_order

def to_human_string(self, value: Optional[S]) -> str:
return self._human_string(value)

@singledispatchmethod
def _human_string(self, value: Optional[S]) -> str:
return str(value) if value is not None else "null"

@_human_string.register(bytes)
def _(self, value: bytes) -> str:
return _base64encode(value)

@_human_string.register(int)
def _(self, value: int) -> str:
return self._int_to_human_string(self._type, value)

@singledispatchmethod
def _int_to_human_string(self, value_type: IcebergType, value: int) -> str:
return str(value)

@_int_to_human_string.register(DateType)
def _(self, value_type: IcebergType, value: int) -> str:
return datetime.to_human_day(value)

@_int_to_human_string.register(TimeType)
def _(self, value_type: IcebergType, value: int) -> str:
return datetime.to_human_time(value)

@_int_to_human_string.register(TimestampType)
def _(self, value_type: IcebergType, value: int) -> str:
return datetime.to_human_timestamp(value)

@_int_to_human_string.register(TimestamptzType)
def _(self, value_type: IcebergType, value: int) -> str:
return datetime.to_human_timestamptz(value)


class TruncateTransform(Transform[S, S]):
"""A transform for truncating a value to a specified width.
Args:
source_type (Type): An Iceberg Type of IntegerType, LongType, StringType, BinaryType or DecimalType
width (int): The truncate width
Raises:
ValueError: If a type is provided that is incompatible with a Truncate transform
"""

def __init__(self, source_type: IcebergType, width: int):
super().__init__(
f"truncate[{width}]",
f"transforms.truncate(source_type={repr(source_type)}, width={width})",
)
self._type = source_type
self._width = width

@property
def width(self):
return self._width

def apply(self, value: Optional[S]) -> Optional[S]:
return self._truncate_value(value) if value is not None else None

@singledispatchmethod
def _truncate_value(self, value: S) -> S:
raise ValueError(f"Cannot truncate value: {value}")

@_truncate_value.register(int)
def _(self, value):
"""Truncate a given int value into a given width if feasible."""
if type(self._type) in {IntegerType, LongType}:
return value - value % self._width
else:
raise ValueError(f"Cannot truncate type: {self._type} for value: {value}")

@_truncate_value.register(str)
def _(self, value):
"""Truncate a given string to a given width."""
return value[0 : min(self._width, len(value))]

@_truncate_value.register(bytes)
def _(self, value):
"""Truncate a given binary bytes into a given width."""
if isinstance(self._type, BinaryType):
return value[0 : min(self._width, len(value))]
else:
raise ValueError(f"Cannot truncate type: {self._type}")

@_truncate_value.register(Decimal)
def _(self, value):
"""Truncate a given decimal value into a given width."""
return truncate_decimal(value, self._width)

def can_transform(self, source: IcebergType) -> bool:
return self._type == source

def result_type(self, source: IcebergType) -> IcebergType:
return source

def preserves_order(self) -> bool:
return True

def satisfies_order_of(self, other: Transform) -> bool:
if self == other:
return True
elif isinstance(self._type, StringType) and isinstance(other, TruncateTransform) and isinstance(other._type, StringType):
return self._width >= other._width

return False

def to_human_string(self, value: Optional[S]) -> str:
if value is None:
return "null"
elif isinstance(value, bytes):
return _base64encode(value)
else:
return str(value)


class UnknownTransform(Transform):
"""A transform that represents when an unknown transform is provided
Args:
Expand Down Expand Up @@ -294,5 +451,13 @@ def bucket(source_type: IcebergType, num_buckets: int) -> BaseBucketTransform:
raise ValueError(f"Cannot bucket by type: {source_type}")


def identity(source_type: IcebergType) -> IdentityTransform:
return IdentityTransform(source_type)


def truncate(source_type: IcebergType, width: int) -> TruncateTransform:
return TruncateTransform(source_type, width)


def always_null() -> Transform:
return VoidTransform()
28 changes: 27 additions & 1 deletion python/src/iceberg/utils/datetime.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@
"""Helper methods for working with date/time representations
"""
import re
from datetime import date, datetime, time
from datetime import (
date,
datetime,
time,
timedelta,
)

EPOCH_DATE = date.fromisoformat("1970-01-01")
EPOCH_TIMESTAMP = datetime.fromisoformat("1970-01-01T00:00:00.000000")
Expand Down Expand Up @@ -63,3 +68,24 @@ def timestamptz_to_micros(timestamptz_str: str) -> int:
if ISO_TIMESTAMPTZ.fullmatch(timestamptz_str):
return datetime_to_micros(datetime.fromisoformat(timestamptz_str))
raise ValueError(f"Invalid timestamp with zone: {timestamptz_str} (must be ISO-8601)")


def to_human_day(day_ordinal: int) -> str:
"""Converts a DateType value to human string"""
return (EPOCH_DATE + timedelta(days=day_ordinal)).isoformat()


def to_human_time(micros_from_midnight: int) -> str:
"""Converts a TimeType value to human string"""
to_day = EPOCH_TIMESTAMP + timedelta(microseconds=micros_from_midnight)
return to_day.time().isoformat()


def to_human_timestamptz(timestamp_micros: int) -> str:
"""Converts a TimestamptzType value to human string"""
return (EPOCH_TIMESTAMPTZ + timedelta(microseconds=timestamp_micros)).isoformat()


def to_human_timestamp(timestamp_micros: int) -> str:
"""Converts a TimestampType value to human string"""
return (EPOCH_TIMESTAMP + timedelta(microseconds=timestamp_micros)).isoformat()
15 changes: 15 additions & 0 deletions python/src/iceberg/utils/decimal.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,18 @@ def decimal_to_bytes(value: Decimal) -> bytes:
"""
unscaled_value = decimal_to_unscaled(value)
return unscaled_value.to_bytes(bytes_required(unscaled_value), byteorder="big", signed=True)


def truncate_decimal(value: Decimal, width: int) -> Decimal:
"""Get a truncated Decimal value given a decimal value and a width

Args:
value (Decimal): a decimal value
width (int): A width for the returned Decimal instance

Returns:
Decimal: A truncated Decimal instance
"""
unscaled_value = decimal_to_unscaled(value)
applied_value = unscaled_value - (((unscaled_value % width) + width) % width)
return Decimal(f"{applied_value}e{value.as_tuple().exponent}")
103 changes: 103 additions & 0 deletions python/tests/test_transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
BooleanType,
DateType,
DecimalType,
DoubleType,
FixedType,
FloatType,
IntegerType,
LongType,
StringType,
Expand Down Expand Up @@ -129,6 +131,107 @@ def test_string_with_surrogate_pair():
assert bucket_transform.hash(string_with_surrogate_pair) == mmh3.hash(as_bytes)


@pytest.mark.parametrize(
"type_var,value,expected",
[
(LongType(), None, "null"),
(DateType(), 17501, "2017-12-01"),
(TimeType(), 36775038194, "10:12:55.038194"),
(TimestamptzType(), 1512151975038194, "2017-12-01T18:12:55.038194+00:00"),
(TimestampType(), 1512151975038194, "2017-12-01T18:12:55.038194"),
(LongType(), -1234567890000, "-1234567890000"),
(StringType(), "a/b/c=d", "a/b/c=d"),
(DecimalType(9, 2), Decimal("-1.50"), "-1.50"),
(FixedType(100), b"foo", "Zm9v"),
],
)
def test_identity_human_string(type_var, value, expected):
identity = transforms.identity(type_var)
assert identity.to_human_string(value) == expected


@pytest.mark.parametrize(
"type_var",
[
BinaryType(),
BooleanType(),
DateType(),
DecimalType(8, 2),
DoubleType(),
FixedType(16),
FloatType(),
IntegerType(),
LongType(),
StringType(),
TimestampType(),
TimestamptzType(),
TimeType(),
UUIDType(),
],
)
def test_identity_method(type_var):
identity_transform = transforms.identity(type_var)
assert str(identity_transform) == str(eval(repr(identity_transform)))
assert identity_transform.can_transform(type_var)
assert identity_transform.result_type(type_var) == type_var
assert identity_transform.apply("test") == "test"


@pytest.mark.parametrize("type_var", [IntegerType(), LongType()])
@pytest.mark.parametrize(
"input_var,expected",
[(1, 0), (5, 0), (9, 0), (10, 10), (11, 10), (-1, -10), (-10, -10), (-12, -20)],
)
def test_truncate_integer(type_var, input_var, expected):
trunc = transforms.truncate(type_var, 10)
assert trunc.apply(input_var) == expected


@pytest.mark.parametrize(
"input_var,expected",
[
(Decimal("12.34"), Decimal("12.30")),
(Decimal("12.30"), Decimal("12.30")),
(Decimal("12.29"), Decimal("12.20")),
(Decimal("0.05"), Decimal("0.00")),
(Decimal("-0.05"), Decimal("-0.10")),
],
)
def test_truncate_decimal(input_var, expected):
trunc = transforms.truncate(DecimalType(9, 2), 10)
assert trunc.apply(input_var) == expected


@pytest.mark.parametrize("input_var,expected", [("abcdefg", "abcde"), ("abc", "abc")])
def test_truncate_string(input_var, expected):
trunc = transforms.truncate(StringType(), 5)
assert trunc.apply(input_var) == expected


@pytest.mark.parametrize(
"type_var,value,expected_human_str,expected",
[
(BinaryType(), b"\x00\x01\x02\x03", "AAECAw==", b"\x00"),
(DecimalType(8, 5), Decimal("14.21"), "14.21", Decimal("14.21")),
(IntegerType(), 123, "123", 123),
(LongType(), 123, "123", 123),
(StringType(), "foo", "foo", "f"),
],
)
def test_truncate_method(type_var, value, expected_human_str, expected):
truncate_transform = transforms.truncate(type_var, 1)
assert str(truncate_transform) == str(eval(repr(truncate_transform)))
assert truncate_transform.can_transform(type_var)
assert truncate_transform.result_type(type_var) == type_var
assert truncate_transform.to_human_string(value) == expected_human_str
assert truncate_transform.apply(value) == expected
assert truncate_transform.to_human_string(None) == "null"
assert truncate_transform.width == 1
assert truncate_transform.apply(None) is None
assert truncate_transform.preserves_order()
assert truncate_transform.satisfies_order_of(truncate_transform)


def test_unknown_transform():
unknown_transform = transforms.UnknownTransform(FixedType(8), "unknown")
assert str(unknown_transform) == str(eval(repr(unknown_transform)))
Expand Down
Loading