Skip to content

Commit 5473911

Browse files
committed
Python: Refactor to use common decimal and datetime util.
1 parent 5ba34da commit 5473911

File tree

7 files changed

+187
-100
lines changed

7 files changed

+187
-100
lines changed

python/src/iceberg/conversions.py

Lines changed: 2 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
TimeType,
5151
UUIDType,
5252
)
53+
from iceberg.utils.decimal import decimal_to_bytes, unscaled_to_decimal
5354

5455

5556
def handle_none(func):
@@ -69,33 +70,6 @@ def wrapper(primitive_type, value_str):
6970
return wrapper
7071

7172

72-
def decimal_to_unscaled(value: Decimal) -> int:
73-
"""Get an unscaled value given a Decimal value
74-
75-
Args:
76-
value (Decimal): A Decimal instance
77-
78-
Returns:
79-
int: The unscaled value
80-
"""
81-
sign, digits, _ = value.as_tuple()
82-
return int(Decimal((sign, digits, 0)).to_integral_value())
83-
84-
85-
def unscaled_to_decimal(unscaled: int, scale: int) -> Decimal:
86-
"""Get a scaled Decimal value given an unscaled value and a scale
87-
88-
Args:
89-
unscaled (int): An unscaled value
90-
scale (int): A scale to set for the returned Decimal instance
91-
92-
Returns:
93-
Decimal: A scaled Decimal instance
94-
"""
95-
sign, digits, _ = Decimal(unscaled).as_tuple()
96-
return Decimal((sign, digits, -scale))
97-
98-
9973
@singledispatch
10074
def partition_to_py(primitive_type, value_str: str):
10175
"""A generic function which converts a partition string to a python built-in
@@ -251,9 +225,7 @@ def _(primitive_type, value: Decimal) -> bytes:
251225
f"Cannot serialize value, precision of value is greater than precision of type {primitive_type}: {len(digits)}"
252226
)
253227

254-
unscaled_value = decimal_to_unscaled(value=Decimal((sign, digits, 0)))
255-
min_num_bytes = ((unscaled_value).bit_length() + 7) // 8
256-
return unscaled_value.to_bytes(min_num_bytes, "big", signed=True)
228+
return decimal_to_bytes(value)
257229

258230

259231
@singledispatch

python/src/iceberg/expression/literals.py

Lines changed: 19 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,22 @@
1919
# specific language governing permissions and limitations
2020
# under the License.
2121

22-
import re
2322
import struct
2423
import sys
2524
from abc import ABC, abstractmethod
26-
from datetime import date, datetime, time
2725
from decimal import ROUND_HALF_UP, Decimal
2826
from functools import singledispatch
2927
from typing import Generic, Optional, TypeVar, Union
3028
from uuid import UUID
3129

30+
from iceberg.utils.datetime import (
31+
date_to_days,
32+
micros_to_days,
33+
time_to_micros,
34+
timestamp_to_micros,
35+
timestamptz_to_micros,
36+
)
37+
3238
if sys.version_info >= (3, 8):
3339
from functools import singledispatchmethod # pragma: no cover
3440
else:
@@ -52,11 +58,6 @@
5258
UUIDType,
5359
)
5460

55-
EPOCH_DATE = date.fromisoformat("1970-01-01")
56-
EPOCH_TIMESTAMP = datetime.fromisoformat("1970-01-01T00:00:00.000000")
57-
ISO_TIMESTAMP = re.compile(r"\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\d(.\d{1,6})?")
58-
EPOCH_TIMESTAMPTZ = datetime.fromisoformat("1970-01-01T00:00:00.000000+00:00")
59-
ISO_TIMESTAMPTZ = re.compile(r"\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\d(.\d{1,6})?[-+]\d\d:\d\d")
6061
T = TypeVar("T")
6162

6263

@@ -360,7 +361,7 @@ def _(self, type_var: TimestampType) -> Literal[int]:
360361

361362
@to.register(DateType)
362363
def _(self, type_var: DateType) -> Literal[int]:
363-
return DateLiteral((datetime.fromtimestamp(self.value / 1_000_000) - EPOCH_TIMESTAMP).days)
364+
return DateLiteral(micros_to_days(self.value))
364365

365366

366367
class DecimalLiteral(Literal[Decimal]):
@@ -392,32 +393,25 @@ def _(self, type_var: StringType) -> Literal[str]:
392393

393394
@to.register(DateType)
394395
def _(self, type_var: DateType) -> Literal[int]:
395-
return DateLiteral((date.fromisoformat(self.value) - EPOCH_DATE).days)
396+
return DateLiteral(date_to_days(self.value))
396397

397398
@to.register(TimeType)
398399
def _(self, type_var: TimeType) -> Literal[int]:
399-
t = time.fromisoformat(self.value)
400-
return TimeLiteral((((t.hour * 60 + t.minute) * 60) + t.second) * 1_000_000 + t.microsecond)
400+
return TimeLiteral(time_to_micros(self.value))
401401

402402
@to.register(TimestampType)
403403
def _(self, type_var: TimestampType) -> Optional[Literal[int]]:
404-
if ISO_TIMESTAMP.fullmatch(self.value):
405-
try:
406-
delta = datetime.fromisoformat(self.value) - EPOCH_TIMESTAMP
407-
return TimestampLiteral((delta.days * 86400 + delta.seconds) * 1_000_000 + delta.microseconds)
408-
except TypeError:
409-
return None
410-
return None
404+
try:
405+
return TimestampLiteral(timestamp_to_micros(self.value))
406+
except TypeError:
407+
return None
411408

412409
@to.register(TimestamptzType)
413410
def _(self, type_var: TimestamptzType) -> Optional[Literal[int]]:
414-
if ISO_TIMESTAMPTZ.fullmatch(self.value):
415-
try:
416-
delta = datetime.fromisoformat(self.value) - EPOCH_TIMESTAMPTZ
417-
return TimestampLiteral((delta.days * 86400 + delta.seconds) * 1_000_000 + delta.microseconds)
418-
except TypeError:
419-
return None
420-
return None
411+
try:
412+
return TimestampLiteral(timestamptz_to_micros(self.value))
413+
except TypeError:
414+
return None
421415

422416
@to.register(UUIDType)
423417
def _(self, type_var: UUIDType) -> Literal[UUID]:

python/src/iceberg/transforms.py

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717

18-
import math
1918
import struct
19+
from abc import ABC
2020
from decimal import Decimal
21-
from typing import Optional
21+
from typing import Generic, Optional, TypeVar
2222
from uuid import UUID
2323

2424
import mmh3 # type: ignore
@@ -37,9 +37,13 @@
3737
TimeType,
3838
UUIDType,
3939
)
40+
from iceberg.utils.decimal import decimal_to_bytes
4041

42+
S = TypeVar("S")
43+
T = TypeVar("T")
4144

42-
class Transform:
45+
46+
class Transform(ABC, Generic[S, T]):
4347
"""Transform base class for concrete transforms.
4448
4549
A base class to transform values and project predicates on partition values.
@@ -60,18 +64,19 @@ def __repr__(self):
6064
def __str__(self):
6165
return self._transform_string
6266

63-
def __call__(self, value):
67+
def __call__(self, value: S) -> Optional[T]:
6468
return self.apply(value)
6569

66-
def apply(self, value):
67-
raise NotImplementedError()
70+
def apply(self, value: S) -> Optional[T]:
71+
...
6872

6973
def can_transform(self, source: IcebergType) -> bool:
7074
return False
7175

7276
def result_type(self, source: IcebergType) -> IcebergType:
73-
raise NotImplementedError()
77+
...
7478

79+
@property
7580
def preserves_order(self) -> bool:
7681
return False
7782

@@ -83,11 +88,12 @@ def to_human_string(self, value) -> str:
8388
return "null"
8489
return str(value)
8590

91+
@property
8692
def dedup_name(self) -> str:
8793
return self._transform_string
8894

8995

90-
class BaseBucketTransform(Transform):
96+
class BaseBucketTransform(Transform[S, int]):
9197
"""Base Transform class to transform a value into a bucket partition value
9298
9399
Transforms are parameterized by a number of buckets. Bucket partition transforms use a 32-bit
@@ -110,18 +116,15 @@ def __init__(self, source_type: IcebergType, num_buckets: int):
110116
def num_buckets(self) -> int:
111117
return self._num_buckets
112118

113-
def hash(self, value) -> int:
119+
def hash(self, value: S) -> Optional[int]:
114120
raise NotImplementedError()
115121

116-
def apply(self, value) -> Optional[int]:
122+
def apply(self, value: S) -> Optional[int]:
117123
if value is None:
118124
return None
119125

120126
return (self.hash(value) & IntegerType.max) % self._num_buckets
121127

122-
def can_transform(self, source: IcebergType) -> bool:
123-
raise NotImplementedError()
124-
125128
def result_type(self, source: IcebergType) -> IcebergType:
126129
return IntegerType()
127130

@@ -156,11 +159,7 @@ def can_transform(self, source: IcebergType) -> bool:
156159
return isinstance(source, DecimalType)
157160

158161
def hash(self, value: Decimal) -> int:
159-
value_tuple = value.as_tuple()
160-
unscaled_value = int(("-" if value_tuple.sign else "") + "".join([str(d) for d in value_tuple.digits]))
161-
number_of_bytes = int(math.ceil(unscaled_value.bit_length() / 8))
162-
value_in_bytes = unscaled_value.to_bytes(length=number_of_bytes, byteorder="big")
163-
return mmh3.hash(value_in_bytes)
162+
return mmh3.hash(decimal_to_bytes(value))
164163

165164

166165
class BucketStringTransform(BaseBucketTransform):
python/src/iceberg/utils/datetime.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
"""Helper methods for working with date/time representations
18+
"""
19+
import re
20+
from datetime import date, datetime, time
21+
22+
# Reference points that date and timestamp values are measured from
EPOCH_DATE = date.fromisoformat("1970-01-01")
EPOCH_TIMESTAMP = datetime.fromisoformat("1970-01-01T00:00:00.000000")
EPOCH_TIMESTAMPTZ = datetime.fromisoformat("1970-01-01T00:00:00.000000+00:00")
# Patterns for timestamps without and with a zone offset. The fractional-seconds
# separator is escaped so that only a literal "." is accepted there; an unescaped
# dot would let any character through.
ISO_TIMESTAMP = re.compile(r"\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\d(\.\d{1,6})?")
ISO_TIMESTAMPTZ = re.compile(r"\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\d(\.\d{1,6})?[-+]\d\d:\d\d")
27+
28+
29+
def micros_to_days(timestamp: int) -> int:
    """Converts a timestamp in microseconds to a date in days from 1970-01-01

    Args:
        timestamp (int): Microseconds from 1970-01-01T00:00:00.000000

    Returns:
        int: Whole days from 1970-01-01, rounded toward negative infinity so that
            timestamps before the epoch fall on the preceding day
    """
    # Pure integer arithmetic: `datetime.fromtimestamp` would shift the value into the
    # machine's local time zone and pass it through a float, making the result depend
    # on where the code runs and on float precision.
    return timestamp // (86400 * 1_000_000)
32+
33+
34+
def date_to_days(date_str: str) -> int:
    """Converts an ISO-8601 formatted date to days from 1970-01-01

    Args:
        date_str (str): A date string in the form accepted by `date.fromisoformat`

    Returns:
        int: The number of days between `EPOCH_DATE` and the parsed date
    """
    parsed = date.fromisoformat(date_str)
    elapsed = parsed - EPOCH_DATE
    return elapsed.days
37+
38+
39+
def time_to_micros(time_str: str) -> int:
    """Converts an ISO-8601 formatted time to microseconds from midnight

    Args:
        time_str (str): A time string in the form accepted by `time.fromisoformat`

    Returns:
        int: Microseconds elapsed since 00:00:00 for the parsed time
    """
    parsed = time.fromisoformat(time_str)
    whole_seconds = parsed.hour * 3600 + parsed.minute * 60 + parsed.second
    return whole_seconds * 1_000_000 + parsed.microsecond
43+
44+
45+
def datetime_to_micros(dt: datetime) -> int:
    """Converts a datetime to microseconds from 1970-01-01T00:00:00.000000

    Args:
        dt (datetime): The datetime to convert; when its `tzinfo` is truthy it is
            measured against `EPOCH_TIMESTAMPTZ`, otherwise against `EPOCH_TIMESTAMP`

    Returns:
        int: Microseconds between the chosen epoch and `dt`
    """
    # Pick the epoch with matching zone-awareness so the subtraction is well defined
    epoch = EPOCH_TIMESTAMPTZ if dt.tzinfo else EPOCH_TIMESTAMP
    elapsed = dt - epoch
    whole_seconds = elapsed.days * 86400 + elapsed.seconds
    return whole_seconds * 1_000_000 + elapsed.microseconds
52+
53+
54+
def timestamp_to_micros(timestamp_str: str) -> int:
    """Converts an ISO-8601 formatted timestamp without zone to microseconds from 1970-01-01T00:00:00.000000

    Args:
        timestamp_str (str): A timestamp string that must fully match `ISO_TIMESTAMP`

    Returns:
        int: The result of `datetime_to_micros` for the parsed timestamp

    Raises:
        ValueError: If `timestamp_str` does not fully match `ISO_TIMESTAMP`
    """
    # Reject anything that is not a zone-less timestamp before attempting to parse it
    if not ISO_TIMESTAMP.fullmatch(timestamp_str):
        raise ValueError(f"Invalid timestamp without zone: {timestamp_str} (must be ISO-8601)")
    parsed = datetime.fromisoformat(timestamp_str)
    return datetime_to_micros(parsed)
59+
60+
61+
def timestamptz_to_micros(timestamptz_str: str) -> int:
    """Converts an ISO-8601 formatted timestamp with zone to microseconds from 1970-01-01T00:00:00.000000+00:00

    Args:
        timestamptz_str (str): A timestamp string that must fully match `ISO_TIMESTAMPTZ`

    Returns:
        int: The result of `datetime_to_micros` for the parsed timestamp

    Raises:
        ValueError: If `timestamptz_str` does not fully match `ISO_TIMESTAMPTZ`
    """
    # Reject anything that lacks the zone offset before attempting to parse it
    if not ISO_TIMESTAMPTZ.fullmatch(timestamptz_str):
        raise ValueError(f"Invalid timestamp with zone: {timestamptz_str} (must be ISO-8601)")
    parsed = datetime.fromisoformat(timestamptz_str)
    return datetime_to_micros(parsed)
python/src/iceberg/utils/decimal.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
"""Helper methods for working with Python Decimals
19+
"""
20+
from decimal import Decimal
21+
from typing import Union
22+
23+
24+
def decimal_to_unscaled(value: Decimal) -> int:
    """Get an unscaled value given a Decimal value

    The sign and digits of the Decimal are kept and its exponent is discarded.

    Args:
        value (Decimal): A Decimal instance

    Returns:
        int: The unscaled value
    """
    # Rebuild the Decimal with a zero exponent so only sign and digits remain
    without_scale = Decimal(value.as_tuple()._replace(exponent=0))
    return int(without_scale.to_integral_value())
35+
36+
37+
def unscaled_to_decimal(unscaled: int, scale: int) -> Decimal:
    """Get a scaled Decimal value given an unscaled value and a scale

    Args:
        unscaled (int): An unscaled value
        scale (int): A scale to set for the returned Decimal instance

    Returns:
        Decimal: A scaled Decimal instance
    """
    # Keep the sign and digits of the integer and set the exponent to the negated scale
    rescaled = Decimal(unscaled).as_tuple()._replace(exponent=-scale)
    return Decimal(rescaled)
49+
50+
51+
def min_size(value: Union[int, Decimal]) -> int:
    """Returns the minimum number of bytes needed to serialize a decimal or unscaled value

    The size is for a signed two's-complement encoding, so it always leaves room for
    the sign bit: 0 and 127 need one byte, 128 needs two, -128 needs one and -129
    needs two.

    Args:
        value (int | Decimal): a Decimal value or unscaled int value

    Returns:
        int: the minimum number of bytes needed to serialize the value

    Raises:
        ValueError: If the value is neither an int nor a Decimal
    """
    if isinstance(value, int):
        unscaled = value
    elif isinstance(value, Decimal):
        unscaled = decimal_to_unscaled(value)
    else:
        raise ValueError(f"Unsupported value: {value}")

    # For negative numbers the magnitude that must be represented is that of the
    # bitwise complement (e.g. -128 fits in 8 bits because ~-128 == 127 fits in 7).
    magnitude_bits = unscaled.bit_length() if unscaled >= 0 else (~unscaled).bit_length()
    # One extra bit for the sign, then round up to whole bytes
    return magnitude_bits // 8 + 1
66+
67+
68+
def decimal_to_bytes(value: Decimal) -> bytes:
    """Serializes the unscaled value of a Decimal as big-endian signed bytes

    Args:
        value (Decimal): A Decimal instance

    Returns:
        bytes: The unscaled value encoded big-endian and signed, with the length given by `min_size`
    """
    unscaled = decimal_to_unscaled(value)
    width = min_size(unscaled)
    return unscaled.to_bytes(width, byteorder="big", signed=True)

0 commit comments

Comments
 (0)