Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 2 additions & 30 deletions python/src/iceberg/conversions.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
TimeType,
UUIDType,
)
from iceberg.utils.decimal import decimal_to_bytes, unscaled_to_decimal


def handle_none(func):
Expand All @@ -69,33 +70,6 @@ def wrapper(primitive_type, value_str):
return wrapper


def decimal_to_unscaled(value: Decimal) -> int:
    """Return the unscaled integer behind a Decimal

    The sign and digits of the value are kept and its exponent is replaced
    with zero, e.g. Decimal("123.45") gives 12345.

    Args:
        value (Decimal): A Decimal instance

    Returns:
        int: The unscaled value
    """
    sign, digits = value.as_tuple()[:2]
    # Rebuild the number with exponent 0 so only sign and digits remain.
    without_exponent = Decimal((sign, digits, 0))
    return int(without_exponent.to_integral_value())


def unscaled_to_decimal(unscaled: int, scale: int) -> Decimal:
    """Build a Decimal from an unscaled integer and a scale

    The sign and digits come from the unscaled value and the exponent of the
    result is set to ``-scale``, e.g. (12345, 2) gives Decimal("123.45").

    Args:
        unscaled (int): An unscaled value
        scale (int): A scale to set for the returned Decimal instance

    Returns:
        Decimal: A scaled Decimal instance
    """
    sign, digits = Decimal(unscaled).as_tuple()[:2]
    exponent = -scale
    return Decimal((sign, digits, exponent))


@singledispatch
def partition_to_py(primitive_type, value_str: str):
"""A generic function which converts a partition string to a python built-in
Expand Down Expand Up @@ -251,9 +225,7 @@ def _(primitive_type, value: Decimal) -> bytes:
f"Cannot serialize value, precision of value is greater than precision of type {primitive_type}: {len(digits)}"
)

unscaled_value = decimal_to_unscaled(value=Decimal((sign, digits, 0)))
min_num_bytes = ((unscaled_value).bit_length() + 7) // 8
return unscaled_value.to_bytes(min_num_bytes, "big", signed=True)
return decimal_to_bytes(value)


@singledispatch
Expand Down
54 changes: 27 additions & 27 deletions python/src/iceberg/expression/literals.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,22 @@
# specific language governing permissions and limitations
# under the License.

import re
import struct
import sys
from abc import ABC, abstractmethod
from datetime import date, datetime, time
from decimal import ROUND_HALF_UP, Decimal
from functools import singledispatch
from typing import Generic, Optional, TypeVar, Union
from uuid import UUID

from iceberg.utils.datetime import (
date_to_days,
micros_to_days,
time_to_micros,
timestamp_to_micros,
timestamptz_to_micros,
)

if sys.version_info >= (3, 8):
from functools import singledispatchmethod # pragma: no cover
else:
Expand All @@ -52,11 +58,6 @@
UUIDType,
)

EPOCH_DATE = date.fromisoformat("1970-01-01")
EPOCH_TIMESTAMP = datetime.fromisoformat("1970-01-01T00:00:00.000000")
ISO_TIMESTAMP = re.compile(r"\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\d(.\d{1,6})?")
EPOCH_TIMESTAMPTZ = datetime.fromisoformat("1970-01-01T00:00:00.000000+00:00")
ISO_TIMESTAMPTZ = re.compile(r"\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\d(.\d{1,6})?[-+]\d\d:\d\d")
T = TypeVar("T")


Expand Down Expand Up @@ -360,7 +361,7 @@ def _(self, type_var: TimestampType) -> Literal[int]:

@to.register(DateType)
def _(self, type_var: DateType) -> Literal[int]:
return DateLiteral((datetime.fromtimestamp(self.value / 1_000_000) - EPOCH_TIMESTAMP).days)
return DateLiteral(micros_to_days(self.value))


class DecimalLiteral(Literal[Decimal]):
Expand Down Expand Up @@ -391,33 +392,32 @@ def _(self, type_var: StringType) -> Literal[str]:
return self

@to.register(DateType)
def _(self, type_var: DateType) -> Literal[int]:
return DateLiteral((date.fromisoformat(self.value) - EPOCH_DATE).days)
def _(self, type_var: DateType) -> Optional[Literal[int]]:
try:
return DateLiteral(date_to_days(self.value))
except (TypeError, ValueError):
return None

@to.register(TimeType)
def _(self, type_var: TimeType) -> Literal[int]:
t = time.fromisoformat(self.value)
return TimeLiteral((((t.hour * 60 + t.minute) * 60) + t.second) * 1_000_000 + t.microsecond)
def _(self, type_var: TimeType) -> Optional[Literal[int]]:
try:
return TimeLiteral(time_to_micros(self.value))
except (TypeError, ValueError):
return None

@to.register(TimestampType)
def _(self, type_var: TimestampType) -> Optional[Literal[int]]:
if ISO_TIMESTAMP.fullmatch(self.value):
try:
delta = datetime.fromisoformat(self.value) - EPOCH_TIMESTAMP
return TimestampLiteral((delta.days * 86400 + delta.seconds) * 1_000_000 + delta.microseconds)
except TypeError:
return None
return None
try:
return TimestampLiteral(timestamp_to_micros(self.value))
except (TypeError, ValueError):
return None

@to.register(TimestamptzType)
def _(self, type_var: TimestamptzType) -> Optional[Literal[int]]:
if ISO_TIMESTAMPTZ.fullmatch(self.value):
try:
delta = datetime.fromisoformat(self.value) - EPOCH_TIMESTAMPTZ
return TimestampLiteral((delta.days * 86400 + delta.seconds) * 1_000_000 + delta.microseconds)
except TypeError:
return None
return None
try:
return TimestampLiteral(timestamptz_to_micros(self.value))
except (TypeError, ValueError):
return None

@to.register(UUIDType)
def _(self, type_var: UUIDType) -> Literal[UUID]:
Expand Down
35 changes: 17 additions & 18 deletions python/src/iceberg/transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
# specific language governing permissions and limitations
# under the License.

import math
import struct
from abc import ABC
from decimal import Decimal
from typing import Optional
from typing import Generic, Optional, TypeVar
from uuid import UUID

import mmh3 # type: ignore
Expand All @@ -37,9 +37,13 @@
TimeType,
UUIDType,
)
from iceberg.utils.decimal import decimal_to_bytes

S = TypeVar("S")
T = TypeVar("T")

class Transform:

class Transform(ABC, Generic[S, T]):
"""Transform base class for concrete transforms.

A base class to transform values and project predicates on partition values.
Expand All @@ -60,18 +64,19 @@ def __repr__(self):
def __str__(self):
return self._transform_string

def __call__(self, value):
def __call__(self, value: S) -> Optional[T]:
return self.apply(value)

def apply(self, value):
raise NotImplementedError()
def apply(self, value: S) -> Optional[T]:
...

def can_transform(self, source: IcebergType) -> bool:
return False

def result_type(self, source: IcebergType) -> IcebergType:
raise NotImplementedError()
...

@property
def preserves_order(self) -> bool:
return False

Expand All @@ -83,11 +88,12 @@ def to_human_string(self, value) -> str:
return "null"
return str(value)

@property
def dedup_name(self) -> str:
return self._transform_string


class BaseBucketTransform(Transform):
class BaseBucketTransform(Transform[S, int]):
"""Base Transform class to transform a value into a bucket partition value

Transforms are parameterized by a number of buckets. Bucket partition transforms use a 32-bit
Expand All @@ -110,18 +116,15 @@ def __init__(self, source_type: IcebergType, num_buckets: int):
def num_buckets(self) -> int:
return self._num_buckets

def hash(self, value) -> int:
def hash(self, value: S) -> int:
raise NotImplementedError()

def apply(self, value) -> Optional[int]:
def apply(self, value: S) -> Optional[int]:
if value is None:
return None

return (self.hash(value) & IntegerType.max) % self._num_buckets

def can_transform(self, source: IcebergType) -> bool:
raise NotImplementedError()

def result_type(self, source: IcebergType) -> IcebergType:
return IntegerType()

Expand Down Expand Up @@ -156,11 +159,7 @@ def can_transform(self, source: IcebergType) -> bool:
return isinstance(source, DecimalType)

def hash(self, value: Decimal) -> int:
value_tuple = value.as_tuple()
unscaled_value = int(("-" if value_tuple.sign else "") + "".join([str(d) for d in value_tuple.digits]))
number_of_bytes = int(math.ceil(unscaled_value.bit_length() / 8))
value_in_bytes = unscaled_value.to_bytes(length=number_of_bytes, byteorder="big")
return mmh3.hash(value_in_bytes)
return mmh3.hash(decimal_to_bytes(value))


class BucketStringTransform(BaseBucketTransform):
Expand Down
65 changes: 65 additions & 0 deletions python/src/iceberg/utils/datetime.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Helper methods for working with date/time representations
"""
import re
from datetime import date, datetime, time

# Epoch reference points that date/timestamp values are measured from.
EPOCH_DATE = date.fromisoformat("1970-01-01")
EPOCH_TIMESTAMP = datetime.fromisoformat("1970-01-01T00:00:00.000000")
# Full-string patterns for timestamps without and with a UTC offset. The
# fractional-second separator is an escaped literal "." so that an arbitrary
# character (e.g. "x" or ",") is not accepted in its place.
ISO_TIMESTAMP = re.compile(r"\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\d(\.\d{1,6})?")
EPOCH_TIMESTAMPTZ = datetime.fromisoformat("1970-01-01T00:00:00.000000+00:00")
ISO_TIMESTAMPTZ = re.compile(r"\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\d(\.\d{1,6})?[-+]\d\d:\d\d")


def micros_to_days(timestamp: int) -> int:
    """Converts a timestamp in microseconds to a date in days

    The conversion is pure integer arithmetic, so the result does not depend
    on the local timezone of the machine and is exact for any int. Timestamps
    before the epoch floor toward the earlier day (-1 microsecond is day -1).

    Args:
        timestamp (int): Microseconds from 1970-01-01T00:00:00.000000

    Returns:
        int: Whole days from 1970-01-01
    """
    micros_per_day = 86_400 * 1_000_000
    # Floor division rounds toward negative infinity, which is the calendar
    # day a pre-epoch instant falls on.
    return timestamp // micros_per_day


def date_to_days(date_str: str) -> int:
    """Converts an ISO-8601 formatted date to the number of days since 1970-01-01"""
    parsed = date.fromisoformat(date_str)
    since_epoch = parsed - EPOCH_DATE
    return since_epoch.days


def time_to_micros(time_str: str) -> int:
    """Converts an ISO-8601 formatted time to the number of microseconds since midnight"""
    parsed = time.fromisoformat(time_str)
    # Fold hours and minutes into whole seconds first, then scale to microseconds.
    whole_seconds = parsed.hour * 3600 + parsed.minute * 60 + parsed.second
    return whole_seconds * 1_000_000 + parsed.microsecond


def datetime_to_micros(dt: datetime) -> int:
    """Converts a datetime to the number of microseconds since 1970-01-01T00:00:00.000000

    A datetime carrying tzinfo is measured against the zone-aware epoch; one
    without tzinfo is measured against the naive epoch.
    """
    epoch = EPOCH_TIMESTAMPTZ if dt.tzinfo else EPOCH_TIMESTAMP
    elapsed = dt - epoch
    whole_seconds = elapsed.days * 86400 + elapsed.seconds
    return whole_seconds * 1_000_000 + elapsed.microseconds


def timestamp_to_micros(timestamp_str: str) -> int:
    """Converts an ISO-8601 formatted timestamp without zone to microseconds from 1970-01-01T00:00:00.000000

    Raises:
        ValueError: If the string does not fully match the ISO_TIMESTAMP pattern
    """
    # Guard first: reject anything that is not a zone-less ISO timestamp.
    if not ISO_TIMESTAMP.fullmatch(timestamp_str):
        raise ValueError(f"Invalid timestamp without zone: {timestamp_str} (must be ISO-8601)")
    parsed = datetime.fromisoformat(timestamp_str)
    return datetime_to_micros(parsed)


def timestamptz_to_micros(timestamptz_str: str) -> int:
    """Converts an ISO-8601 formatted timestamp with zone to microseconds from 1970-01-01T00:00:00.000000+00:00

    Raises:
        ValueError: If the string does not fully match the ISO_TIMESTAMPTZ pattern
    """
    # Guard first: reject anything that is not an ISO timestamp with a UTC offset.
    if not ISO_TIMESTAMPTZ.fullmatch(timestamptz_str):
        raise ValueError(f"Invalid timestamp with zone: {timestamptz_str} (must be ISO-8601)")
    parsed = datetime.fromisoformat(timestamptz_str)
    return datetime_to_micros(parsed)
77 changes: 77 additions & 0 deletions python/src/iceberg/utils/decimal.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""Helper methods for working with Python Decimals
"""
from decimal import Decimal
from typing import Union


def decimal_to_unscaled(value: Decimal) -> int:
    """Return the unscaled integer behind a Decimal

    The sign and digits of the value are kept and its exponent is replaced
    with zero, e.g. Decimal("123.45") gives 12345.

    Args:
        value (Decimal): A Decimal instance

    Returns:
        int: The unscaled value
    """
    parts = value.as_tuple()
    # Rebuild the number with exponent 0 so only sign and digits remain.
    exponent_free = Decimal((parts.sign, parts.digits, 0))
    return int(exponent_free.to_integral_value())


def unscaled_to_decimal(unscaled: int, scale: int) -> Decimal:
    """Build a Decimal from an unscaled integer and a scale

    The sign and digits come from the unscaled value and the exponent of the
    result is set to ``-scale``, e.g. (12345, 2) gives Decimal("123.45").

    Args:
        unscaled (int): An unscaled value
        scale (int): A scale to set for the returned Decimal instance

    Returns:
        Decimal: A scaled Decimal instance
    """
    parts = Decimal(unscaled).as_tuple()
    scaled_tuple = (parts.sign, parts.digits, -scale)
    return Decimal(scaled_tuple)


def bytes_required(value: Union[int, Decimal]) -> int:
    """Returns the minimum number of bytes needed to serialize a decimal or unscaled value

    The width is for a signed two's-complement encoding, so it always leaves
    room for the sign bit: 0 and 127 need one byte, 128 needs two, -128 needs
    one and -129 needs two. The returned width is always accepted by
    ``int.to_bytes(width, byteorder="big", signed=True)``.

    Args:
        value (int | Decimal): a Decimal value or unscaled int value

    Returns:
        int: the minimum number of bytes needed to serialize the value

    Raises:
        ValueError: If the value is neither an int nor a Decimal
    """
    if isinstance(value, Decimal):
        unscaled = decimal_to_unscaled(value)
    elif isinstance(value, int):
        unscaled = value
    else:
        raise ValueError(f"Unsupported value: {value}")

    # int.bit_length() ignores the sign. For a negative n the bits that matter
    # in two's complement are those of ~n (i.e. -n - 1), so -128 still fits in
    # a single byte.
    magnitude_bits = unscaled.bit_length() if unscaled >= 0 else (~unscaled).bit_length()
    # One extra bit is needed for the sign; "// 8 + 1" rounds that up to bytes.
    return magnitude_bits // 8 + 1


def decimal_to_bytes(value: Decimal) -> bytes:
    """Serialize the unscaled value of a decimal to bytes

    The unscaled integer is written big-endian as a signed value, using the
    width reported by bytes_required.

    Args:
        value (Decimal): a decimal value
    Returns:
        bytes: the unscaled value of the Decimal as bytes
    """
    unscaled = decimal_to_unscaled(value)
    width = bytes_required(unscaled)
    return unscaled.to_bytes(width, byteorder="big", signed=True)
Loading