Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 81 additions & 0 deletions python/src/iceberg/transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
# specific language governing permissions and limitations
# under the License.

import base64
import struct
from abc import ABC, abstractmethod
from decimal import Decimal
from functools import singledispatchmethod
from typing import Generic, Optional, TypeVar
from uuid import UUID

Expand All @@ -37,6 +39,7 @@
TimeType,
UUIDType,
)
from iceberg.utils import datetime
from iceberg.utils.decimal import decimal_to_bytes

S = TypeVar("S")
Expand Down Expand Up @@ -224,6 +227,80 @@ def hash(self, value: UUID) -> int:
)


def _base64encode(buffer: bytes) -> str:
"""Converts bytes to base64 string"""
return base64.b64encode(buffer).decode("ISO-8859-1")


class IdentityTransform(Transform[S, S]):
"""Transforms a value into itself.

Example:
>>> transform = IdentityTransform(StringType())
>>> transform.apply('hello-world')
'hello-world'
"""

def __init__(self, source_type: IcebergType):
super().__init__(
"identity",
f"transforms.identity(source_type={repr(source_type)})",
)
self._type = source_type

def apply(self, value: Optional[S]) -> Optional[S]:
return value

def can_transform(self, source: IcebergType) -> bool:
return source.is_primitive

def result_type(self, source: IcebergType) -> IcebergType:
return source

@property
def preserves_order(self) -> bool:
return True

def satisfies_order_of(self, other: Transform) -> bool:
"""ordering by value is the same as long as the other preserves order"""
return other.preserves_order

def to_human_string(self, value: Optional[S]) -> str:
return self._human_string(value)

@singledispatchmethod
def _human_string(self, value: Optional[S]) -> str:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: There's no need for these to be in the class, and it's actually faster if they are simple methods.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean moving them out of the class?

Wondering if we can save _human_string but only keeps _int_to_human_string.
like

    def to_human_string(self, value: Optional[S]) -> str:
        if value is None:
            return "null"
        elif isinstance(value, bytes):
            return _base64encode(value)
        elif isinstance(value, int):
            return self._int_to_human_string(self._type, value)
        else:
            return str(value)

return str(value) if value is not None else "null"

@_human_string.register(bytes)
def _(self, value: bytes) -> str:
return _base64encode(value)

@_human_string.register(int)
def _(self, value: int) -> str:
return self._int_to_human_string(self._type, value)

@singledispatchmethod
def _int_to_human_string(self, value_type: IcebergType, value: int) -> str:
return str(value)

@_int_to_human_string.register(DateType)
def _(self, value_type: IcebergType, value: int) -> str:
return datetime.to_human_day(value)

@_int_to_human_string.register(TimeType)
def _(self, value_type: IcebergType, value: int) -> str:
return datetime.to_human_time(value)

@_int_to_human_string.register(TimestampType)
def _(self, value_type: IcebergType, value: int) -> str:
return datetime.to_human_timestamp(value)

@_int_to_human_string.register(TimestamptzType)
def _(self, value_type: IcebergType, value: int) -> str:
return datetime.to_human_timestamptz(value)


class UnknownTransform(Transform):
"""A transform that represents when an unknown transform is provided
Args:
Expand Down Expand Up @@ -294,5 +371,9 @@ def bucket(source_type: IcebergType, num_buckets: int) -> BaseBucketTransform:
raise ValueError(f"Cannot bucket by type: {source_type}")


def identity(source_type: IcebergType) -> IdentityTransform:
return IdentityTransform(source_type)


def always_null() -> Transform:
return VoidTransform()
34 changes: 33 additions & 1 deletion python/src/iceberg/utils/datetime.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@
"""Helper methods for working with date/time representations
"""
import re
from datetime import date, datetime, time
from datetime import (
date,
datetime,
time,
timedelta,
)

EPOCH_DATE = date.fromisoformat("1970-01-01")
EPOCH_TIMESTAMP = datetime.fromisoformat("1970-01-01T00:00:00.000000")
Expand All @@ -42,6 +47,13 @@ def time_to_micros(time_str: str) -> int:
return (((t.hour * 60 + t.minute) * 60) + t.second) * 1_000_000 + t.microsecond


def time_from_micros(micros: int) -> time:
seconds = micros // 1_000_000
minutes = seconds // 60
hours = minutes // 60
return time(hour=hours, minute=minutes % 60, second=seconds % 60, microsecond=micros % 1_000_000)


def datetime_to_micros(dt: datetime) -> int:
"""Converts a datetime to microseconds from 1970-01-01T00:00:00.000000"""
if dt.tzinfo:
Expand All @@ -63,3 +75,23 @@ def timestamptz_to_micros(timestamptz_str: str) -> int:
if ISO_TIMESTAMPTZ.fullmatch(timestamptz_str):
return datetime_to_micros(datetime.fromisoformat(timestamptz_str))
raise ValueError(f"Invalid timestamp with zone: {timestamptz_str} (must be ISO-8601)")


def to_human_day(day_ordinal: int) -> str:
"""Converts a DateType value to human string"""
return (EPOCH_DATE + timedelta(days=day_ordinal)).isoformat()


def to_human_time(micros_from_midnight: int) -> str:
"""Converts a TimeType value to human string"""
return time_from_micros(micros_from_midnight).isoformat()


def to_human_timestamptz(timestamp_micros: int) -> str:
"""Converts a TimestamptzType value to human string"""
return (EPOCH_TIMESTAMPTZ + timedelta(microseconds=timestamp_micros)).isoformat()


def to_human_timestamp(timestamp_micros: int) -> str:
"""Converts a TimestampType value to human string"""
return (EPOCH_TIMESTAMP + timedelta(microseconds=timestamp_micros)).isoformat()
48 changes: 48 additions & 0 deletions python/tests/test_transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
BooleanType,
DateType,
DecimalType,
DoubleType,
FixedType,
FloatType,
IntegerType,
LongType,
StringType,
Expand Down Expand Up @@ -129,6 +131,52 @@ def test_string_with_surrogate_pair():
assert bucket_transform.hash(string_with_surrogate_pair) == mmh3.hash(as_bytes)


@pytest.mark.parametrize(
"type_var,value,expected",
[
(LongType(), None, "null"),
(DateType(), 17501, "2017-12-01"),
(TimeType(), 36775038194, "10:12:55.038194"),
(TimestamptzType(), 1512151975038194, "2017-12-01T18:12:55.038194+00:00"),
(TimestampType(), 1512151975038194, "2017-12-01T18:12:55.038194"),
(LongType(), -1234567890000, "-1234567890000"),
(StringType(), "a/b/c=d", "a/b/c=d"),
(DecimalType(9, 2), Decimal("-1.50"), "-1.50"),
(FixedType(100), b"foo", "Zm9v"),
],
)
def test_identity_human_string(type_var, value, expected):
identity = transforms.identity(type_var)
assert identity.to_human_string(value) == expected


@pytest.mark.parametrize(
"type_var",
[
BinaryType(),
BooleanType(),
DateType(),
DecimalType(8, 2),
DoubleType(),
FixedType(16),
FloatType(),
IntegerType(),
LongType(),
StringType(),
TimestampType(),
TimestamptzType(),
TimeType(),
UUIDType(),
],
)
def test_identity_method(type_var):
identity_transform = transforms.identity(type_var)
assert str(identity_transform) == str(eval(repr(identity_transform)))
assert identity_transform.can_transform(type_var)
assert identity_transform.result_type(type_var) == type_var
assert identity_transform.apply("test") == "test"


def test_unknown_transform():
unknown_transform = transforms.UnknownTransform(FixedType(8), "unknown")
assert str(unknown_transform) == str(eval(repr(unknown_transform)))
Expand Down