Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
227 changes: 224 additions & 3 deletions python/pyiceberg/transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import base64
import struct
from abc import ABC, abstractmethod
from enum import IntEnum
from functools import singledispatch
from typing import (
Any,
Expand Down Expand Up @@ -58,6 +59,10 @@
VOID = "void"
BUCKET = "bucket"
TRUNCATE = "truncate"
YEAR = "year"
MONTH = "month"
DAY = "day"
HOUR = "hour"

BUCKET_PARSER = ParseNumberFromBrackets(BUCKET)
TRUNCATE_PARSER = ParseNumberFromBrackets(TRUNCATE)
Expand Down Expand Up @@ -92,6 +97,14 @@ def validate(cls, v: Any):
return BucketTransform(num_buckets=BUCKET_PARSER.match(v))
elif v.startswith(TRUNCATE):
return TruncateTransform(width=TRUNCATE_PARSER.match(v))
elif v == YEAR:
return YearTransform()
elif v == MONTH:
return MonthTransform()
elif v == DAY:
return DayTransform()
elif v == HOUR:
return HourTransform()
else:
return UnknownTransform(transform=v)
return v
Expand Down Expand Up @@ -141,7 +154,6 @@ class BucketTransform(Transform[S, int]):
num_buckets (int): The number of buckets.
"""

_source_type: IcebergType = PrivateAttr()
_num_buckets: PositiveInt = PrivateAttr()

def __init__(self, num_buckets: int, **data: Any):
Expand Down Expand Up @@ -215,6 +227,217 @@ def __repr__(self) -> str:
return f"BucketTransform(num_buckets={self._num_buckets})"


class TimeResolution(IntEnum):
YEAR = 6
MONTH = 5
WEEK = 4
DAY = 3
HOUR = 2
MINUTE = 1
SECOND = 0


class TimeTransform(Transform[S, int], Singleton):
@property
@abstractmethod
def granularity(self) -> TimeResolution:
...

def satisfies_order_of(self, other: Transform) -> bool:
return self.granularity <= other.granularity if hasattr(other, "granularity") else False

def result_type(self, source: IcebergType) -> IcebergType:
return IntegerType()

@property
def dedup_name(self) -> str:
return "time"

@property
def preserves_order(self) -> bool:
return True


class YearTransform(TimeTransform):
"""Transforms a datetime value into a year value.

Example:
>>> transform = YearTransform()
>>> transform.transform(TimestampType())(1512151975038194)
47
"""

__root__: Literal["year"] = Field(default="year")

def transform(self, source: IcebergType) -> Callable[[Optional[S]], Optional[int]]:
source_type = type(source)
if source_type == DateType:

def year_func(v):
return datetime.days_to_years(v)

elif source_type in {TimestampType, TimestamptzType}:

def year_func(v):
return datetime.micros_to_years(v)

else:
raise ValueError(f"Cannot apply year transform for type: {source}")

return lambda v: year_func(v) if v is not None else None

def can_transform(self, source: IcebergType) -> bool:
return type(source) in {
DateType,
TimestampType,
TimestamptzType,
}

@property
def granularity(self) -> TimeResolution:
return TimeResolution.YEAR

def to_human_string(self, _: IcebergType, value: Optional[S]) -> str:
return datetime.to_human_year(value) if isinstance(value, int) else "null"

def __repr__(self) -> str:
return "YearTransform()"


class MonthTransform(TimeTransform):
"""Transforms a datetime value into a month value.

Example:
>>> transform = MonthTransform()
>>> transform.transform(DateType())(17501)
575
"""

__root__: Literal["month"] = Field(default="month")

def transform(self, source: IcebergType) -> Callable[[Optional[S]], Optional[int]]:
source_type = type(source)
if source_type == DateType:

def month_func(v):
return datetime.days_to_months(v)

elif source_type in {TimestampType, TimestamptzType}:

def month_func(v):
return datetime.micros_to_months(v)

else:
raise ValueError(f"Cannot apply month transform for type: {source}")

return lambda v: month_func(v) if v else None

def can_transform(self, source: IcebergType) -> bool:
return type(source) in {
DateType,
TimestampType,
TimestamptzType,
}

@property
def granularity(self) -> TimeResolution:
return TimeResolution.MONTH

def to_human_string(self, _: IcebergType, value: Optional[S]) -> str:
return datetime.to_human_month(value) if isinstance(value, int) else "null"

def __repr__(self) -> str:
return "MonthTransform()"


class DayTransform(TimeTransform):
"""Transforms a datetime value into a day value.

Example:
>>> transform = MonthTransform()
>>> transform.transform(DateType())(17501)
17501
"""

__root__: Literal["day"] = Field(default="day")

def transform(self, source: IcebergType) -> Callable[[Optional[S]], Optional[int]]:
source_type = type(source)
if source_type == DateType:

def day_func(v):
return v

elif source_type in {TimestampType, TimestamptzType}:

def day_func(v):
return datetime.micros_to_days(v)

else:
raise ValueError(f"Cannot apply day transform for type: {source}")

return lambda v: day_func(v) if v else None

def can_transform(self, source: IcebergType) -> bool:
return type(source) in {
DateType,
TimestampType,
TimestamptzType,
}

def result_type(self, source: IcebergType) -> IcebergType:
return DateType()

@property
def granularity(self) -> TimeResolution:
return TimeResolution.DAY

def to_human_string(self, _: IcebergType, value: Optional[S]) -> str:
return datetime.to_human_day(value) if isinstance(value, int) else "null"

def __repr__(self) -> str:
return "DayTransform()"


class HourTransform(TimeTransform):
"""Transforms a datetime value into a hour value.

Example:
>>> transform = HourTransform()
>>> transform.transform(TimestampType())(1512151975038194)
420042
"""

__root__: Literal["hour"] = Field(default="hour")

def transform(self, source: IcebergType) -> Callable[[Optional[S]], Optional[int]]:
if type(source) in {TimestampType, TimestamptzType}:

def hour_func(v):
return datetime.micros_to_hours(v)

else:
raise ValueError(f"Cannot apply hour transform for type: {source}")

return lambda v: hour_func(v) if v else None

def can_transform(self, source: IcebergType) -> bool:
return type(source) in {
TimestampType,
TimestamptzType,
}

@property
def granularity(self) -> TimeResolution:
return TimeResolution.HOUR

def to_human_string(self, _: IcebergType, value: Optional[S]) -> str:
return datetime.to_human_hour(value) if isinstance(value, int) else "null"

def __repr__(self) -> str:
return "HourTransform()"


def _base64encode(buffer: bytes) -> str:
"""Converts bytes to base64 string"""
return base64.b64encode(buffer).decode("ISO-8859-1")
Expand All @@ -230,7 +453,6 @@ class IdentityTransform(Transform[S, S]):
"""

__root__: Literal["identity"] = Field(default="identity")
_source_type: IcebergType = PrivateAttr()

def transform(self, source: IcebergType) -> Callable[[Optional[S]], Optional[S]]:
return lambda v: v
Expand Down Expand Up @@ -389,7 +611,6 @@ class UnknownTransform(Transform):
"""

__root__: Literal["unknown"] = Field(default="unknown")
_source_type: IcebergType = PrivateAttr()
_transform: str = PrivateAttr()

def __init__(self, transform: str, **data: Any):
Expand Down
42 changes: 42 additions & 0 deletions python/pyiceberg/utils/datetime.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,26 @@ def micros_to_timestamptz(micros: int):
return EPOCH_TIMESTAMPTZ + dt


def to_human_year(year_ordinal: int) -> str:
"""Converts a DateType value to human string"""
return f"{EPOCH_TIMESTAMP.year + year_ordinal:0=4d}"


def to_human_month(month_ordinal: int) -> str:
"""Converts a DateType value to human string"""
return f"{EPOCH_TIMESTAMP.year + month_ordinal // 12:0=4d}-{1 + month_ordinal % 12:0=2d}"


def to_human_day(day_ordinal: int) -> str:
"""Converts a DateType value to human string"""
return (EPOCH_DATE + timedelta(days=day_ordinal)).isoformat()


def to_human_hour(hour_ordinal: int) -> str:
"""Converts a DateType value to human string"""
return (EPOCH_TIMESTAMP + timedelta(hours=hour_ordinal)).isoformat("-", "hours")


def to_human_time(micros_from_midnight: int) -> str:
"""Converts a TimeType value to human string"""
return micros_to_time(micros_from_midnight).isoformat()
Expand All @@ -116,3 +131,30 @@ def to_human_timestamptz(timestamp_micros: int) -> str:
def to_human_timestamp(timestamp_micros: int) -> str:
"""Converts a TimestampType value to human string"""
return (EPOCH_TIMESTAMP + timedelta(microseconds=timestamp_micros)).isoformat()


def micros_to_hours(timestamp: int) -> int:
"""Converts a timestamp in microseconds to a date in hours"""
return int((datetime.utcfromtimestamp(timestamp // 1_000_000) - EPOCH_TIMESTAMP).total_seconds() / 3600)


def days_to_months(days: int) -> int:
"""Creates a date from the number of days from 1970-01-01"""
d = days_to_date(days)
return (d.year - EPOCH_DATE.year) * 12 + (d.month - EPOCH_DATE.month)


def micros_to_months(timestamp: int) -> int:
dt = micros_to_timestamp(timestamp)
return (dt.year - EPOCH_TIMESTAMP.year) * 12 + (dt.month - EPOCH_TIMESTAMP.month) - (1 if dt.day < EPOCH_TIMESTAMP.day else 0)


def days_to_years(days: int) -> int:
return days_to_date(days).year - EPOCH_DATE.year


def micros_to_years(timestamp: int) -> int:
dt = micros_to_timestamp(timestamp)
return (dt.year - EPOCH_TIMESTAMP.year) - (
1 if dt.month < EPOCH_TIMESTAMP.month or (dt.month == EPOCH_TIMESTAMP.month and dt.day < EPOCH_TIMESTAMP.day) else 0
)
Loading