diff --git a/python/src/iceberg/transforms.py b/python/src/iceberg/transforms.py index 39ec5d538e1d..db8f4fd777b7 100644 --- a/python/src/iceberg/transforms.py +++ b/python/src/iceberg/transforms.py @@ -15,9 +15,11 @@ # specific language governing permissions and limitations # under the License. +import base64 import struct from abc import ABC, abstractmethod from decimal import Decimal +from functools import singledispatchmethod from typing import Generic, Optional, TypeVar from uuid import UUID @@ -37,7 +39,8 @@ TimeType, UUIDType, ) -from iceberg.utils.decimal import decimal_to_bytes +from iceberg.utils import datetime +from iceberg.utils.decimal import decimal_to_bytes, truncate_decimal S = TypeVar("S") T = TypeVar("T") @@ -224,6 +227,160 @@ def hash(self, value: UUID) -> int: ) +def _base64encode(buffer: bytes) -> str: + """Converts bytes to base64 string""" + return base64.b64encode(buffer).decode("ISO-8859-1") + + +class IdentityTransform(Transform[S, S]): + """Transforms a value into itself. + + Example: + >>> transform = IdentityTransform(StringType()) + >>> transform.apply('hello-world') + 'hello-world' + """ + + def __init__(self, source_type: IcebergType): + super().__init__( + "identity", + f"transforms.identity(source_type={repr(source_type)})", + ) + self._type = source_type + + def apply(self, value: Optional[S]) -> Optional[S]: + return value + + def can_transform(self, source: IcebergType) -> bool: + return source.is_primitive + + def result_type(self, source: IcebergType) -> IcebergType: + return source + + @property + def preserves_order(self) -> bool: + return True + + def satisfies_order_of(self, other: Transform) -> bool: + """ordering by value is the same as long as the other preserves order""" + return other.preserves_order + + def to_human_string(self, value: Optional[S]) -> str: + return self._human_string(value) + + @singledispatchmethod + def _human_string(self, value: Optional[S]) -> str: + return str(value) if value is not None else "null" + + @_human_string.register(bytes) + def _(self, value: bytes) -> str: + return _base64encode(value) + + @_human_string.register(int) + def _(self, value: int) -> str: + return self._int_to_human_string(self._type, value) + + @singledispatchmethod + def _int_to_human_string(self, value_type: IcebergType, value: int) -> str: + return str(value) + + @_int_to_human_string.register(DateType) + def _(self, value_type: IcebergType, value: int) -> str: + return datetime.to_human_day(value) + + @_int_to_human_string.register(TimeType) + def _(self, value_type: IcebergType, value: int) -> str: + return datetime.to_human_time(value) + + @_int_to_human_string.register(TimestampType) + def _(self, value_type: IcebergType, value: int) -> str: + return datetime.to_human_timestamp(value) + + @_int_to_human_string.register(TimestamptzType) + def _(self, value_type: IcebergType, value: int) -> str: + return datetime.to_human_timestamptz(value) + + +class TruncateTransform(Transform[S, S]): + """A transform for truncating a value to a specified width. + Args: + source_type (Type): An Iceberg Type of IntegerType, LongType, StringType, BinaryType or DecimalType + width (int): The truncate width + Raises: + ValueError: If a type is provided that is incompatible with a Truncate transform + """ + + def __init__(self, source_type: IcebergType, width: int): + super().__init__( + f"truncate[{width}]", + f"transforms.truncate(source_type={repr(source_type)}, width={width})", + ) + self._type = source_type + self._width = width + + @property + def width(self): + return self._width + + def apply(self, value: Optional[S]) -> Optional[S]: + return self._truncate_value(value) if value is not None else None + + @singledispatchmethod + def _truncate_value(self, value: S) -> S: + raise ValueError(f"Cannot truncate value: {value}") + + @_truncate_value.register(int) + def _(self, value): + """Truncate a given int value into a given width if feasible.""" + if type(self._type) in {IntegerType, LongType}: + return value - value % self._width + else: + raise ValueError(f"Cannot truncate type: {self._type} for value: {value}") + + @_truncate_value.register(str) + def _(self, value): + """Truncate a given string to a given width.""" + return value[0 : min(self._width, len(value))] + + @_truncate_value.register(bytes) + def _(self, value): + """Truncate a given binary bytes into a given width.""" + if isinstance(self._type, BinaryType): + return value[0 : min(self._width, len(value))] + else: + raise ValueError(f"Cannot truncate type: {self._type}") + + @_truncate_value.register(Decimal) + def _(self, value): + """Truncate a given decimal value into a given width.""" + return truncate_decimal(value, self._width) + + def can_transform(self, source: IcebergType) -> bool: + return self._type == source + + def result_type(self, source: IcebergType) -> IcebergType: + return source + + def preserves_order(self) -> bool: + return True + + def satisfies_order_of(self, other: Transform) -> bool: + if self == other: + return True + elif isinstance(self._type, StringType) and isinstance(other, TruncateTransform) and isinstance(other._type, StringType): + return self._width >= other._width + + return False + + def to_human_string(self, value: Optional[S]) -> str: + if value is None: + return "null" + elif isinstance(value, bytes): + return _base64encode(value) + else: + return str(value) + + class UnknownTransform(Transform): """A transform that represents when an unknown transform is provided Args: @@ -294,5 +451,13 @@ def bucket(source_type: IcebergType, num_buckets: int) -> BaseBucketTransform: raise ValueError(f"Cannot bucket by type: {source_type}") +def identity(source_type: IcebergType) -> IdentityTransform: + return IdentityTransform(source_type) + + +def truncate(source_type: IcebergType, width: int) -> TruncateTransform: + return TruncateTransform(source_type, width) + + def always_null() -> Transform: return VoidTransform() diff --git a/python/src/iceberg/utils/datetime.py b/python/src/iceberg/utils/datetime.py index c8c12393b629..69b6580f695a 100644 --- a/python/src/iceberg/utils/datetime.py +++ b/python/src/iceberg/utils/datetime.py @@ -17,7 +17,12 @@ """Helper methods for working with date/time representations """ import re -from datetime import date, datetime, time +from datetime import ( + date, + datetime, + time, + timedelta, +) EPOCH_DATE = date.fromisoformat("1970-01-01") EPOCH_TIMESTAMP = datetime.fromisoformat("1970-01-01T00:00:00.000000") @@ -63,3 +68,24 @@ def timestamptz_to_micros(timestamptz_str: str) -> int: if ISO_TIMESTAMPTZ.fullmatch(timestamptz_str): return datetime_to_micros(datetime.fromisoformat(timestamptz_str)) raise ValueError(f"Invalid timestamp with zone: {timestamptz_str} (must be ISO-8601)") + + +def to_human_day(day_ordinal: int) -> str: + """Converts a DateType value to human string""" + return (EPOCH_DATE + timedelta(days=day_ordinal)).isoformat() + + +def to_human_time(micros_from_midnight: int) -> str: + """Converts a TimeType value to human string""" + to_day = EPOCH_TIMESTAMP + timedelta(microseconds=micros_from_midnight) + return to_day.time().isoformat() + + +def to_human_timestamptz(timestamp_micros: int) -> str: + """Converts a TimestamptzType value to human string""" + return (EPOCH_TIMESTAMPTZ + timedelta(microseconds=timestamp_micros)).isoformat() + + +def to_human_timestamp(timestamp_micros: int) -> str: + """Converts a TimestampType value to human string""" + return (EPOCH_TIMESTAMP + timedelta(microseconds=timestamp_micros)).isoformat() diff --git a/python/src/iceberg/utils/decimal.py b/python/src/iceberg/utils/decimal.py index 1d4c2bddefd0..40d3dd48ef3a 100644 --- a/python/src/iceberg/utils/decimal.py +++ b/python/src/iceberg/utils/decimal.py @@ -75,3 +75,18 @@ def decimal_to_bytes(value: Decimal) -> bytes: """ unscaled_value = decimal_to_unscaled(value) return unscaled_value.to_bytes(bytes_required(unscaled_value), byteorder="big", signed=True) + + +def truncate_decimal(value: Decimal, width: int) -> Decimal: + """Get a truncated Decimal value given a decimal value and a width + + Args: + value (Decimal): a decimal value + width (int): A width for the returned Decimal instance + + Returns: + Decimal: A truncated Decimal instance + """ + unscaled_value = decimal_to_unscaled(value) + applied_value = unscaled_value - (((unscaled_value % width) + width) % width) + return Decimal(f"{applied_value}e{value.as_tuple().exponent}") diff --git a/python/tests/test_transforms.py b/python/tests/test_transforms.py index 7855345f9de4..8606e4aaeaf6 100644 --- a/python/tests/test_transforms.py +++ b/python/tests/test_transforms.py @@ -28,7 +28,9 @@ BooleanType, DateType, DecimalType, + DoubleType, FixedType, + FloatType, IntegerType, LongType, StringType, @@ -129,6 +131,107 @@ def test_string_with_surrogate_pair(): assert bucket_transform.hash(string_with_surrogate_pair) == mmh3.hash(as_bytes) +@pytest.mark.parametrize( + "type_var,value,expected", + [ + (LongType(), None, "null"), + (DateType(), 17501, "2017-12-01"), + (TimeType(), 36775038194, "10:12:55.038194"), + (TimestamptzType(), 1512151975038194, "2017-12-01T18:12:55.038194+00:00"), + (TimestampType(), 1512151975038194, "2017-12-01T18:12:55.038194"), + (LongType(), -1234567890000, "-1234567890000"), + (StringType(), "a/b/c=d", "a/b/c=d"), + (DecimalType(9, 2), Decimal("-1.50"), "-1.50"), + (FixedType(100), b"foo", "Zm9v"), + ], +) +def test_identity_human_string(type_var, value, expected): + identity = transforms.identity(type_var) + assert identity.to_human_string(value) == expected + + +@pytest.mark.parametrize( + "type_var", + [ + BinaryType(), + BooleanType(), + DateType(), + DecimalType(8, 2), + DoubleType(), + FixedType(16), + FloatType(), + IntegerType(), + LongType(), + StringType(), + TimestampType(), + TimestamptzType(), + TimeType(), + UUIDType(), + ], +) +def test_identity_method(type_var): + identity_transform = transforms.identity(type_var) + assert str(identity_transform) == str(eval(repr(identity_transform))) + assert identity_transform.can_transform(type_var) + assert identity_transform.result_type(type_var) == type_var + assert identity_transform.apply("test") == "test" + + +@pytest.mark.parametrize("type_var", [IntegerType(), LongType()]) +@pytest.mark.parametrize( + "input_var,expected", + [(1, 0), (5, 0), (9, 0), (10, 10), (11, 10), (-1, -10), (-10, -10), (-12, -20)], +) +def test_truncate_integer(type_var, input_var, expected): + trunc = transforms.truncate(type_var, 10) + assert trunc.apply(input_var) == expected + + +@pytest.mark.parametrize( + "input_var,expected", + [ + (Decimal("12.34"), Decimal("12.30")), + (Decimal("12.30"), Decimal("12.30")), + (Decimal("12.29"), Decimal("12.20")), + (Decimal("0.05"), Decimal("0.00")), + (Decimal("-0.05"), Decimal("-0.10")), + ], +) +def test_truncate_decimal(input_var, expected): + trunc = transforms.truncate(DecimalType(9, 2), 10) + assert trunc.apply(input_var) == expected + + +@pytest.mark.parametrize("input_var,expected", [("abcdefg", "abcde"), ("abc", "abc")]) +def test_truncate_string(input_var, expected): + trunc = transforms.truncate(StringType(), 5) + assert trunc.apply(input_var) == expected + + +@pytest.mark.parametrize( + "type_var,value,expected_human_str,expected", + [ + (BinaryType(), b"\x00\x01\x02\x03", "AAECAw==", b"\x00"), + (DecimalType(8, 5), Decimal("14.21"), "14.21", Decimal("14.21")), + (IntegerType(), 123, "123", 123), + (LongType(), 123, "123", 123), + (StringType(), "foo", "foo", "f"), + ], +) +def test_truncate_method(type_var, value, expected_human_str, expected): + truncate_transform = transforms.truncate(type_var, 1) + assert str(truncate_transform) == str(eval(repr(truncate_transform))) + assert truncate_transform.can_transform(type_var) + assert truncate_transform.result_type(type_var) == type_var + assert truncate_transform.to_human_string(value) == expected_human_str + assert truncate_transform.apply(value) == expected + assert truncate_transform.to_human_string(None) == "null" + assert truncate_transform.width == 1 + assert truncate_transform.apply(None) is None + assert truncate_transform.preserves_order() + assert truncate_transform.satisfies_order_of(truncate_transform) + + def test_unknown_transform(): unknown_transform = transforms.UnknownTransform(FixedType(8), "unknown") assert str(unknown_transform) == str(eval(repr(unknown_transform))) diff --git a/python/tox.ini b/python/tox.ini new file mode 100644 index 000000000000..488957edf331 --- /dev/null +++ b/python/tox.ini @@ -0,0 +1,111 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[tox] +envlist = py38,py39,py310,linters +skip_missing_interpreters = true + +[testenv] +usedevelop = true +extras = arrow +deps = + pip>=21.1 + coverage + mock + pytest + pytest-checkdocs + pyarrow +setenv = + COVERAGE_FILE = test-reports/{envname}/.coverage + PYTEST_ADDOPTS = --junitxml=test-reports/{envname}/junit.xml -vv +commands = + coverage run --source src --parallel-mode -m pytest {posargs} + coverage combine + coverage report -m --fail-under=90 + coverage html -d test-reports/{envname}/coverage-html + coverage xml -o test-reports/{envname}/coverage.xml + +[testenv:linters] +usedevelop = true +deps = + {[testenv:format-check]deps} + {[testenv:type-check]deps} +commands = + {[testenv:format-check]commands} + {[testenv:type-check]commands} + +[testenv:format-check] +deps = + black + isort + autoflake + pylint + pyenchant +commands = + autoflake -r --check --ignore-init-module-imports --remove-all-unused-imports src tests + isort --profile black --check-only src tests + black --line-length 130 --check --diff src tests + pylint src tests + +[testenv:format] +deps = + {[testenv:format-check]deps} +commands = + autoflake -r --in-place --ignore-init-module-imports --remove-all-unused-imports src tests + isort --profile black src tests + black --line-length 130 src tests + +[testenv:type-check] +deps = + mypy +commands = + mypy --install-types --non-interactive --config tox.ini src + +[testenv:docs] +basepython = python3 +deps = + -r docs/source/requirements.txt +commands = + sphinx-build -E -W -c docs/source/ -b html docs/source/ docs/build/html + sphinx-build -E -W -c docs/source/ -b man docs/source/ docs/build/man + +[testenv:serve-docs] +basepython = python3 +skip_install = true +changedir = docs/build/html +deps = +commands = + python -m http.server {posargs} + +[pytest] +norecursedirs=.* +addopts = --doctest-modules + +[gh-actions] +python = + 3.8: py38, linters + 3.9: py39 + 3.10: py310 + +[mypy] + +no_implicit_optional=True +warn_redundant_casts=True +warn_unreachable=True + +[mypy-pyarrow.*] +ignore_missing_imports = True