diff --git a/python/src/iceberg/conversions.py b/python/src/iceberg/conversions.py index 6bf91e803cdb..0af73567e79d 100644 --- a/python/src/iceberg/conversions.py +++ b/python/src/iceberg/conversions.py @@ -50,6 +50,7 @@ TimeType, UUIDType, ) +from iceberg.utils.decimal import decimal_to_bytes, unscaled_to_decimal def handle_none(func): @@ -69,33 +70,6 @@ def wrapper(primitive_type, value_str): return wrapper -def decimal_to_unscaled(value: Decimal) -> int: - """Get an unscaled value given a Decimal value - - Args: - value (Decimal): A Decimal instance - - Returns: - int: The unscaled value - """ - sign, digits, _ = value.as_tuple() - return int(Decimal((sign, digits, 0)).to_integral_value()) - - -def unscaled_to_decimal(unscaled: int, scale: int) -> Decimal: - """Get a scaled Decimal value given an unscaled value and a scale - - Args: - unscaled (int): An unscaled value - scale (int): A scale to set for the returned Decimal instance - - Returns: - Decimal: A scaled Decimal instance - """ - sign, digits, _ = Decimal(unscaled).as_tuple() - return Decimal((sign, digits, -scale)) - - @singledispatch def partition_to_py(primitive_type, value_str: str): """A generic function which converts a partition string to a python built-in @@ -251,9 +225,7 @@ def _(primitive_type, value: Decimal) -> bytes: f"Cannot serialize value, precision of value is greater than precision of type {primitive_type}: {len(digits)}" ) - unscaled_value = decimal_to_unscaled(value=Decimal((sign, digits, 0))) - min_num_bytes = ((unscaled_value).bit_length() + 7) // 8 - return unscaled_value.to_bytes(min_num_bytes, "big", signed=True) + return decimal_to_bytes(value) @singledispatch diff --git a/python/src/iceberg/expression/literals.py b/python/src/iceberg/expression/literals.py index 14708049d01a..58dc66ef1b22 100644 --- a/python/src/iceberg/expression/literals.py +++ b/python/src/iceberg/expression/literals.py @@ -19,16 +19,22 @@ # specific language governing permissions and limitations # under the 
License. -import re import struct import sys from abc import ABC, abstractmethod -from datetime import date, datetime, time from decimal import ROUND_HALF_UP, Decimal from functools import singledispatch from typing import Generic, Optional, TypeVar, Union from uuid import UUID +from iceberg.utils.datetime import ( + date_to_days, + micros_to_days, + time_to_micros, + timestamp_to_micros, + timestamptz_to_micros, +) + if sys.version_info >= (3, 8): from functools import singledispatchmethod # pragma: no cover else: @@ -52,11 +58,6 @@ UUIDType, ) -EPOCH_DATE = date.fromisoformat("1970-01-01") -EPOCH_TIMESTAMP = datetime.fromisoformat("1970-01-01T00:00:00.000000") -ISO_TIMESTAMP = re.compile(r"\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\d(.\d{1,6})?") -EPOCH_TIMESTAMPTZ = datetime.fromisoformat("1970-01-01T00:00:00.000000+00:00") -ISO_TIMESTAMPTZ = re.compile(r"\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\d(.\d{1,6})?[-+]\d\d:\d\d") T = TypeVar("T") @@ -360,7 +361,7 @@ def _(self, type_var: TimestampType) -> Literal[int]: @to.register(DateType) def _(self, type_var: DateType) -> Literal[int]: - return DateLiteral((datetime.fromtimestamp(self.value / 1_000_000) - EPOCH_TIMESTAMP).days) + return DateLiteral(micros_to_days(self.value)) class DecimalLiteral(Literal[Decimal]): @@ -391,33 +392,32 @@ def _(self, type_var: StringType) -> Literal[str]: return self @to.register(DateType) - def _(self, type_var: DateType) -> Literal[int]: - return DateLiteral((date.fromisoformat(self.value) - EPOCH_DATE).days) + def _(self, type_var: DateType) -> Optional[Literal[int]]: + try: + return DateLiteral(date_to_days(self.value)) + except (TypeError, ValueError): + return None @to.register(TimeType) - def _(self, type_var: TimeType) -> Literal[int]: - t = time.fromisoformat(self.value) - return TimeLiteral((((t.hour * 60 + t.minute) * 60) + t.second) * 1_000_000 + t.microsecond) + def _(self, type_var: TimeType) -> Optional[Literal[int]]: + try: + return TimeLiteral(time_to_micros(self.value)) + except 
(TypeError, ValueError): + return None @to.register(TimestampType) def _(self, type_var: TimestampType) -> Optional[Literal[int]]: - if ISO_TIMESTAMP.fullmatch(self.value): - try: - delta = datetime.fromisoformat(self.value) - EPOCH_TIMESTAMP - return TimestampLiteral((delta.days * 86400 + delta.seconds) * 1_000_000 + delta.microseconds) - except TypeError: - return None - return None + try: + return TimestampLiteral(timestamp_to_micros(self.value)) + except (TypeError, ValueError): + return None @to.register(TimestamptzType) def _(self, type_var: TimestamptzType) -> Optional[Literal[int]]: - if ISO_TIMESTAMPTZ.fullmatch(self.value): - try: - delta = datetime.fromisoformat(self.value) - EPOCH_TIMESTAMPTZ - return TimestampLiteral((delta.days * 86400 + delta.seconds) * 1_000_000 + delta.microseconds) - except TypeError: - return None - return None + try: + return TimestampLiteral(timestamptz_to_micros(self.value)) + except (TypeError, ValueError): + return None @to.register(UUIDType) def _(self, type_var: UUIDType) -> Literal[UUID]: diff --git a/python/src/iceberg/transforms.py b/python/src/iceberg/transforms.py index d24668cc60fc..cfa0b9ed7393 100644 --- a/python/src/iceberg/transforms.py +++ b/python/src/iceberg/transforms.py @@ -15,10 +15,10 @@ # specific language governing permissions and limitations # under the License. -import math import struct +from abc import ABC from decimal import Decimal -from typing import Optional +from typing import Generic, Optional, TypeVar from uuid import UUID import mmh3 # type: ignore @@ -37,9 +37,13 @@ TimeType, UUIDType, ) +from iceberg.utils.decimal import decimal_to_bytes +S = TypeVar("S") +T = TypeVar("T") -class Transform: + +class Transform(ABC, Generic[S, T]): """Transform base class for concrete transforms. A base class to transform values and project predicates on partition values. 
@@ -60,18 +64,19 @@ def __repr__(self): def __str__(self): return self._transform_string - def __call__(self, value): + def __call__(self, value: S) -> Optional[T]: return self.apply(value) - def apply(self, value): - raise NotImplementedError() + def apply(self, value: S) -> Optional[T]: + ... def can_transform(self, source: IcebergType) -> bool: return False def result_type(self, source: IcebergType) -> IcebergType: - raise NotImplementedError() + ... + @property def preserves_order(self) -> bool: return False @@ -83,11 +88,12 @@ def to_human_string(self, value) -> str: return "null" return str(value) + @property def dedup_name(self) -> str: return self._transform_string -class BaseBucketTransform(Transform): +class BaseBucketTransform(Transform[S, int]): """Base Transform class to transform a value into a bucket partition value Transforms are parameterized by a number of buckets. Bucket partition transforms use a 32-bit @@ -110,18 +116,15 @@ def __init__(self, source_type: IcebergType, num_buckets: int): def num_buckets(self) -> int: return self._num_buckets - def hash(self, value) -> int: + def hash(self, value: S) -> int: raise NotImplementedError() - def apply(self, value) -> Optional[int]: + def apply(self, value: S) -> Optional[int]: if value is None: return None return (self.hash(value) & IntegerType.max) % self._num_buckets - def can_transform(self, source: IcebergType) -> bool: - raise NotImplementedError() - def result_type(self, source: IcebergType) -> IcebergType: return IntegerType() @@ -156,11 +159,7 @@ def can_transform(self, source: IcebergType) -> bool: return isinstance(source, DecimalType) def hash(self, value: Decimal) -> int: - value_tuple = value.as_tuple() - unscaled_value = int(("-" if value_tuple.sign else "") + "".join([str(d) for d in value_tuple.digits])) - number_of_bytes = int(math.ceil(unscaled_value.bit_length() / 8)) - value_in_bytes = unscaled_value.to_bytes(length=number_of_bytes, byteorder="big") - return 
mmh3.hash(value_in_bytes) + return mmh3.hash(decimal_to_bytes(value)) class BucketStringTransform(BaseBucketTransform): diff --git a/python/src/iceberg/utils/datetime.py b/python/src/iceberg/utils/datetime.py new file mode 100644 index 000000000000..c8c12393b629 --- /dev/null +++ b/python/src/iceberg/utils/datetime.py @@ -0,0 +1,65 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+"""Helper methods for working with date/time representations +""" +import re +from datetime import date, datetime, time, timedelta + +EPOCH_DATE = date.fromisoformat("1970-01-01") +EPOCH_TIMESTAMP = datetime.fromisoformat("1970-01-01T00:00:00.000000") +ISO_TIMESTAMP = re.compile(r"\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\d(\.\d{1,6})?") +EPOCH_TIMESTAMPTZ = datetime.fromisoformat("1970-01-01T00:00:00.000000+00:00") +ISO_TIMESTAMPTZ = re.compile(r"\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\d(\.\d{1,6})?[-+]\d\d:\d\d") + + +def micros_to_days(timestamp: int) -> int: + """Converts a timestamp in microseconds from 1970-01-01T00:00:00.000000 to days from 1970-01-01""" + return timedelta(microseconds=timestamp).days  # integer arithmetic: no float rounding, no local-timezone shift + + +def date_to_days(date_str: str) -> int: + """Converts an ISO-8601 formatted date to days from 1970-01-01""" + return (date.fromisoformat(date_str) - EPOCH_DATE).days + + +def time_to_micros(time_str: str) -> int: + """Converts an ISO-8601 formatted time to microseconds from midnight""" + t = time.fromisoformat(time_str) + return (((t.hour * 60 + t.minute) * 60) + t.second) * 1_000_000 + t.microsecond + + +def datetime_to_micros(dt: datetime) -> int: + """Converts a datetime to microseconds from 1970-01-01T00:00:00.000000""" + if dt.tzinfo: + delta = dt - EPOCH_TIMESTAMPTZ + else: + delta = dt - EPOCH_TIMESTAMP + return (delta.days * 86400 + delta.seconds) * 1_000_000 + delta.microseconds + + +def timestamp_to_micros(timestamp_str: str) -> int: + """Converts an ISO-8601 formatted timestamp without zone to microseconds from 1970-01-01T00:00:00.000000""" + if ISO_TIMESTAMP.fullmatch(timestamp_str): + return datetime_to_micros(datetime.fromisoformat(timestamp_str)) + raise ValueError(f"Invalid timestamp without zone: {timestamp_str} (must be ISO-8601)") + + +def timestamptz_to_micros(timestamptz_str: str) -> int: + """Converts an ISO-8601 formatted timestamp with zone to microseconds from 1970-01-01T00:00:00.000000+00:00""" + if ISO_TIMESTAMPTZ.fullmatch(timestamptz_str): + return 
datetime_to_micros(datetime.fromisoformat(timestamptz_str)) + raise ValueError(f"Invalid timestamp with zone: {timestamptz_str} (must be ISO-8601)") diff --git a/python/src/iceberg/utils/decimal.py b/python/src/iceberg/utils/decimal.py new file mode 100644 index 000000000000..1d4c2bddefd0 --- /dev/null +++ b/python/src/iceberg/utils/decimal.py @@ -0,0 +1,77 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +"""Helper methods for working with Python Decimals +""" +from decimal import Decimal +from typing import Union + + +def decimal_to_unscaled(value: Decimal) -> int: + """Get an unscaled value given a Decimal value + + Args: + value (Decimal): A Decimal instance + + Returns: + int: The unscaled value + """ + sign, digits, _ = value.as_tuple() + return int(Decimal((sign, digits, 0)).to_integral_value()) + + +def unscaled_to_decimal(unscaled: int, scale: int) -> Decimal: + """Get a scaled Decimal value given an unscaled value and a scale + + Args: + unscaled (int): An unscaled value + scale (int): A scale to set for the returned Decimal instance + + Returns: + Decimal: A scaled Decimal instance + """ + sign, digits, _ = Decimal(unscaled).as_tuple() + return Decimal((sign, digits, -scale)) + + +def bytes_required(value: Union[int, Decimal]) -> int: + """Returns the minimum number of bytes needed to serialize a decimal or unscaled value as a signed (two's complement) integer + + Args: + value (int | Decimal): a Decimal value or unscaled int value + + Returns: + int: the minimum number of bytes needed to serialize the value, always at least 1 + """ + if isinstance(value, int): + return ((value if value >= 0 else ~value).bit_length() + 8) // 8  # magnitude bits plus one sign bit, rounded up to whole bytes + elif isinstance(value, Decimal): + return bytes_required(decimal_to_unscaled(value)) + + raise ValueError(f"Unsupported value: {value}") + + +def decimal_to_bytes(value: Decimal) -> bytes: + """Returns a byte representation of a decimal + + Args: + value (Decimal): a decimal value + Returns: + bytes: the unscaled value of the Decimal as big-endian two's complement bytes + """ + unscaled_value = decimal_to_unscaled(value) + return unscaled_value.to_bytes(bytes_required(unscaled_value), byteorder="big", signed=True) diff --git a/python/tests/test_conversions.py b/python/tests/test_conversions.py index c8005237974d..b107023473f7 100644 --- a/python/tests/test_conversions.py +++ b/python/tests/test_conversions.py @@ -77,6 +77,7 @@ import pytest +import iceberg.utils.decimal as decimal_util from iceberg import conversions from 
iceberg.types import ( BinaryType, @@ -112,7 +113,7 @@ ) def test_decimal_to_unscaled(value, expected_result): """Test converting a decimal to an unscaled value""" - assert conversions.decimal_to_unscaled(value=value) == expected_result + assert decimal_util.decimal_to_unscaled(value=value) == expected_result @pytest.mark.parametrize( @@ -130,7 +131,7 @@ def test_decimal_to_unscaled(value, expected_result): ) def test_unscaled_to_decimal(unscaled, scale, expected_result): """Test converting an unscaled value to a decimal with a specified scale""" - assert conversions.unscaled_to_decimal(unscaled=unscaled, scale=scale) == expected_result + assert decimal_util.unscaled_to_decimal(unscaled=unscaled, scale=scale) == expected_result @pytest.mark.parametrize( diff --git a/python/tests/test_transforms.py b/python/tests/test_transforms.py index 96b68b8fb470..12e786f954c4 100644 --- a/python/tests/test_transforms.py +++ b/python/tests/test_transforms.py @@ -15,10 +15,10 @@ # specific language governing permissions and limitations # under the License. 
-from datetime import datetime -from decimal import Decimal, getcontext +from decimal import Decimal from uuid import UUID +import mmh3 as mmh3 import pytest from iceberg import transforms @@ -35,6 +35,12 @@ TimeType, UUIDType, ) +from iceberg.utils.datetime import ( + date_to_days, + time_to_micros, + timestamp_to_micros, + timestamptz_to_micros, +) @pytest.mark.parametrize( @@ -43,15 +49,15 @@ (1, IntegerType(), 1392991556), (34, IntegerType(), 2017239379), (34, LongType(), 2017239379), - (17486, DateType(), -653330422), - (81068000000, TimeType(), -662762989), + (date_to_days("2017-11-16"), DateType(), -653330422), + (time_to_micros("22:31:08"), TimeType(), -662762989), ( - int(datetime.fromisoformat("2017-11-16T22:31:08+00:00").timestamp() * 1000000), + timestamp_to_micros("2017-11-16T22:31:08"), TimestampType(), -2047944441, ), ( - int(datetime.fromisoformat("2017-11-16T14:31:08-08:00").timestamp() * 1000000), + timestamptz_to_micros("2017-11-16T14:31:08-08:00"), TimestamptzType(), -2047944441, ), @@ -65,25 +71,6 @@ def test_bucket_hash_values(test_input, test_type, expected): assert transforms.bucket(test_type, 8).hash(test_input) == expected -@pytest.mark.parametrize( - "test_input,test_type,scale_factor,expected_hash,expected", - [ - (Decimal("14.20"), DecimalType(9, 2), Decimal(10) ** -2, -500754589, 59), - ( - Decimal("137302769811943318102518958871258.37580"), - DecimalType(38, 5), - Decimal(10) ** -5, - -32334285, - 63, - ), - ], -) -def test_decimal_bucket(test_input, test_type, scale_factor, expected_hash, expected): - getcontext().prec = 38 - assert transforms.bucket(test_type, 100).hash(test_input.quantize(scale_factor)) == expected_hash - assert transforms.bucket(test_type, 100).apply(test_input.quantize(scale_factor)) == expected - - @pytest.mark.parametrize( "bucket,value,expected", [ @@ -131,3 +118,10 @@ def test_bucket_method(type_var): assert bucket_transform.num_buckets == 8 assert bucket_transform.apply(None) is None assert 
bucket_transform.to_human_string("test") == "test" + + +def test_string_with_surrogate_pair(): + string_with_surrogate_pair = "string with a surrogate pair: 💰" + as_bytes = bytes(string_with_surrogate_pair, "UTF-8") + bucket_transform = transforms.bucket(StringType(), 100) + assert bucket_transform.hash(string_with_surrogate_pair) == mmh3.hash(as_bytes)