From 8cc957896f9de3438e406714d03d7bc3011fae70 Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Fri, 14 Jul 2023 12:52:17 +0200 Subject: [PATCH 01/13] Python: Add more tests for the Avro writer --- python/pyiceberg/avro/decoder.py | 2 +- python/pyiceberg/avro/encoder.py | 111 ++++---------------- python/pyiceberg/avro/reader.py | 11 +- python/pyiceberg/avro/writer.py | 23 ++-- python/pyiceberg/expressions/literals.py | 4 +- python/pyiceberg/utils/datetime.py | 9 +- python/pyiceberg/utils/schema_conversion.py | 15 ++- python/tests/avro/test_decoder.py | 2 +- python/tests/avro/test_encoder.py | 45 ++------ python/tests/avro/test_file.py | 93 ++++++++++++++++ python/tests/test_transforms.py | 4 +- 11 files changed, 165 insertions(+), 154 deletions(-) diff --git a/python/pyiceberg/avro/decoder.py b/python/pyiceberg/avro/decoder.py index 366cf7b26e1c..494339bf74bf 100644 --- a/python/pyiceberg/avro/decoder.py +++ b/python/pyiceberg/avro/decoder.py @@ -132,7 +132,7 @@ def read_utf8(self) -> str: """ return self.read_bytes().decode("utf-8") - def read_uuid_from_fixed(self) -> UUID: + def read_uuid(self) -> UUID: """Reads a UUID as a fixed[16].""" return UUID(bytes=self.read(16)) diff --git a/python/pyiceberg/avro/encoder.py b/python/pyiceberg/avro/encoder.py index cf6d60123357..080b50361b7c 100644 --- a/python/pyiceberg/avro/encoder.py +++ b/python/pyiceberg/avro/encoder.py @@ -15,12 +15,13 @@ # specific language governing permissions and limitations # under the License. 
import decimal -import struct -from datetime import date, datetime, time +from datetime import time +from uuid import UUID from pyiceberg.avro import STRUCT_DOUBLE, STRUCT_FLOAT from pyiceberg.io import OutputStream -from pyiceberg.utils.datetime import date_to_days, datetime_to_micros, time_object_to_micros +from pyiceberg.utils.datetime import time_to_micros +from pyiceberg.utils.decimal import decimal_to_unscaled class BinaryEncoder: @@ -64,112 +65,44 @@ def write_decimal_bytes(self, datum: decimal.Decimal) -> None: Since size of packed value in bytes for signed long is 8, 8 bytes are written. """ - sign, digits, _ = datum.as_tuple() - - unscaled_datum = 0 - for digit in digits: - unscaled_datum = (unscaled_datum * 10) + digit - - bits_req = unscaled_datum.bit_length() + 1 - if sign: - unscaled_datum = (1 << bits_req) - unscaled_datum - - bytes_req = bits_req // 8 - padding_bits = ~((1 << bits_req) - 1) if sign else 0 - packed_bits = padding_bits | unscaled_datum - - bytes_req += 1 if (bytes_req << 3) < bits_req else 0 - self.write_int(bytes_req) - for index in range(bytes_req - 1, -1, -1): - bits_to_write = packed_bits >> (8 * index) - self.write(bytearray([bits_to_write & 0xFF])) + unscaled_datum = decimal_to_unscaled(datum) + size = (unscaled_datum.bit_length() + 7) // 8 + bytes_datum = unscaled_datum.to_bytes(length=size, byteorder="big", signed=True) + self.write_bytes(bytes_datum) def write_decimal_fixed(self, datum: decimal.Decimal, size: int) -> None: """Decimal in fixed are encoded as size of fixed bytes.""" - sign, digits, _ = datum.as_tuple() - - unscaled_datum = 0 - for digit in digits: - unscaled_datum = (unscaled_datum * 10) + digit - - bits_req = unscaled_datum.bit_length() + 1 - size_in_bits = size * 8 - offset_bits = size_in_bits - bits_req - - mask = 2**size_in_bits - 1 - bit = 1 - for _ in range(bits_req): - mask ^= bit - bit <<= 1 - - if bits_req < 8: - bytes_req = 1 - else: - bytes_req = bits_req // 8 - if bits_req % 8 != 0: - bytes_req += 1 
- if sign: - unscaled_datum = (1 << bits_req) - unscaled_datum - unscaled_datum = mask | unscaled_datum - for index in range(size - 1, -1, -1): - bits_to_write = unscaled_datum >> (8 * index) - self.write(bytearray([bits_to_write & 0xFF])) - else: - for _ in range(offset_bits // 8): - self.write(b"\x00") - for index in range(bytes_req - 1, -1, -1): - bits_to_write = unscaled_datum >> (8 * index) - self.write(bytearray([bits_to_write & 0xFF])) + unscaled_datum = decimal_to_unscaled(datum) + bytes_datum = unscaled_datum.to_bytes(length=size, byteorder="big", signed=True) + self.write(bytes_datum) def write_bytes(self, b: bytes) -> None: """Bytes are encoded as a long followed by that many bytes of data.""" self.write_int(len(b)) - self.write(struct.pack(f"{len(b)}s", b)) + self.write(b) def write_bytes_fixed(self, b: bytes) -> None: """Writes fixed number of bytes.""" - self.write(struct.pack(f"{len(b)}s", b)) + self.write(b) def write_utf8(self, s: str) -> None: """A string is encoded as a long followed by that many bytes of UTF-8 encoded character data.""" self.write_bytes(s.encode("utf-8")) - def write_date_int(self, d: date) -> None: - """ - Encode python date object as int. - - It stores the number of days from the unix epoch, 1 January 1970 (ISO calendar). - """ - self.write_int(date_to_days(d)) - - def write_time_millis_int(self, dt: time) -> None: - """ - Encode python time object as int. + def write_uuid(self, uuid: UUID) -> None: + """Write UUID as a string. - It stores the number of milliseconds from midnight, 00:00:00.000 + The uuid logical type represents a random generated universally unique identifier (UUID). + An uuid logical type annotates an Avro string. The string has to conform with RFC-4122. 
""" - self.write_int(int(time_object_to_micros(dt) / 1000)) + if len(uuid.bytes) != 16: + raise ValueError(f"Expected UUID to have 16 bytes, got: len({uuid.bytes!r})") + return self.write(uuid.bytes) - def write_time_micros_long(self, dt: time) -> None: + def write_time_micros(self, dt: time) -> None: """ Encode python time object as long. It stores the number of microseconds from midnight, 00:00:00.000000 """ - self.write_int(time_object_to_micros(dt)) - - def write_timestamp_millis_long(self, dt: datetime) -> None: - """ - Encode python datetime object as long. - - It stores the number of milliseconds from midnight of unix epoch, 1 January 1970. - """ - self.write_int(int(datetime_to_micros(dt) / 1000)) - - def write_timestamp_micros_long(self, dt: datetime) -> None: - """ - Encode python datetime object as long. - - It stores the number of microseconds from midnight of unix epoch, 1 January 1970. - """ - self.write_int(datetime_to_micros(dt)) + self.write_int(time_to_micros(dt)) diff --git a/python/pyiceberg/avro/reader.py b/python/pyiceberg/avro/reader.py index a1978c6ca3e7..74df8a61d202 100644 --- a/python/pyiceberg/avro/reader.py +++ b/python/pyiceberg/avro/reader.py @@ -28,7 +28,7 @@ from abc import abstractmethod from dataclasses import dataclass from dataclasses import field as dataclassfield -from datetime import datetime, time +from datetime import date, datetime, time from decimal import Decimal from typing import ( Any, @@ -43,6 +43,7 @@ from pyiceberg.avro.decoder import BinaryDecoder from pyiceberg.typedef import StructProtocol from pyiceberg.types import StructType +from pyiceberg.utils.datetime import days_to_date from pyiceberg.utils.singleton import Singleton @@ -152,8 +153,8 @@ def skip(self, decoder: BinaryDecoder) -> None: class DateReader(Reader): - def read(self, decoder: BinaryDecoder) -> int: - return decoder.read_int() + def read(self, decoder: BinaryDecoder) -> date: + return days_to_date(decoder.read_int()) def skip(self, decoder: 
BinaryDecoder) -> None: decoder.skip_int() @@ -193,10 +194,10 @@ def skip(self, decoder: BinaryDecoder) -> None: class UUIDReader(Reader): def read(self, decoder: BinaryDecoder) -> UUID: - return decoder.read_uuid_from_fixed() + return decoder.read_uuid() def skip(self, decoder: BinaryDecoder) -> None: - decoder.skip(16) + decoder.skip_utf8() @dataclass(frozen=True) diff --git a/python/pyiceberg/avro/writer.py b/python/pyiceberg/avro/writer.py index 10a589715dcc..dbf36f841601 100644 --- a/python/pyiceberg/avro/writer.py +++ b/python/pyiceberg/avro/writer.py @@ -39,6 +39,7 @@ from pyiceberg.avro.encoder import BinaryEncoder from pyiceberg.types import StructType +from pyiceberg.utils.datetime import date_to_days, datetime_to_micros from pyiceberg.utils.singleton import Singleton @@ -81,22 +82,25 @@ def write(self, encoder: BinaryEncoder, val: float) -> None: class DateWriter(Writer): def write(self, encoder: BinaryEncoder, val: Any) -> None: - encoder.write_date_int(val) + encoder.write_int(date_to_days(val)) class TimeWriter(Writer): def write(self, encoder: BinaryEncoder, val: time) -> None: - encoder.write_time_micros_long(val) + encoder.write_time_micros(val) class TimestampWriter(Writer): def write(self, encoder: BinaryEncoder, val: datetime) -> None: - encoder.write_timestamp_micros_long(val) + if val.tzinfo is not None: + raise ValueError(f"Timestamp should not have a timezone, but has: {val.tzinfo}") + + encoder.write_int(datetime_to_micros(val)) class TimestamptzWriter(Writer): def write(self, encoder: BinaryEncoder, val: datetime) -> None: - encoder.write_timestamp_micros_long(val) + encoder.write_int(datetime_to_micros(val)) class StringWriter(Writer): @@ -106,12 +110,9 @@ def write(self, encoder: BinaryEncoder, val: Any) -> None: class UUIDWriter(Writer): def write(self, encoder: BinaryEncoder, val: UUID) -> None: - uuid_bytes = val.bytes - - if len(uuid_bytes) != 16: - raise ValueError(f"Expected UUID to be 16 bytes, got: {len(uuid_bytes)}") - - 
encoder.write_bytes_fixed(uuid_bytes) + if len(val.bytes) != 16: + raise ValueError(f"Expected UUID to be 16 bytes, got: {len(val.bytes)}") + encoder.write(val.bytes) @dataclass(frozen=True) @@ -119,6 +120,8 @@ class FixedWriter(Writer): _len: int = dataclassfield() def write(self, encoder: BinaryEncoder, val: bytes) -> None: + if len(val) != self._len: + raise ValueError(f"Expected {self._len} bytes, got {len(val)}") encoder.write(val) def __len__(self) -> int: diff --git a/python/pyiceberg/expressions/literals.py b/python/pyiceberg/expressions/literals.py index 1e47e87608a9..f89d0c8331b6 100644 --- a/python/pyiceberg/expressions/literals.py +++ b/python/pyiceberg/expressions/literals.py @@ -50,7 +50,7 @@ from pyiceberg.utils.datetime import ( date_str_to_days, micros_to_days, - time_to_micros, + time_str_to_micros, timestamp_to_micros, timestamptz_to_micros, ) @@ -558,7 +558,7 @@ def _(self, type_var: DateType) -> Literal[int]: @to.register(TimeType) def _(self, type_var: TimeType) -> Literal[int]: try: - return TimeLiteral(time_to_micros(self.value)) + return TimeLiteral(time_str_to_micros(self.value)) except (TypeError, ValueError) as e: raise ValueError(f"Could not convert {self.value} into a {type_var}") from e diff --git a/python/pyiceberg/utils/datetime.py b/python/pyiceberg/utils/datetime.py index 71a62ca45607..1db33a436e17 100644 --- a/python/pyiceberg/utils/datetime.py +++ b/python/pyiceberg/utils/datetime.py @@ -61,14 +61,13 @@ def days_to_date(days: int) -> date: return EPOCH_DATE + timedelta(days) -def time_to_micros(time_str: str) -> int: +def time_str_to_micros(time_str: str) -> int: """Converts an ISO-8601 formatted time to microseconds from midnight.""" - t = time.fromisoformat(time_str) - return (((t.hour * 60 + t.minute) * 60) + t.second) * 1_000_000 + t.microsecond + return time_to_micros(time.fromisoformat(time_str)) -def time_object_to_micros(t: time) -> int: - """Converts an datetime.time object to microseconds from midnight.""" +def 
time_to_micros(t: time) -> int: + """Converts a datetime.time object to microseconds from midnight.""" return int(t.hour * 60 * 60 * 1e6 + t.minute * 60 * 1e6 + t.second * 1e6 + t.microsecond) diff --git a/python/pyiceberg/utils/schema_conversion.py b/python/pyiceberg/utils/schema_conversion.py index 4f46668866da..faa593baa3f4 100644 --- a/python/pyiceberg/utils/schema_conversion.py +++ b/python/pyiceberg/utils/schema_conversion.py @@ -64,11 +64,11 @@ LOGICAL_FIELD_TYPE_MAPPING: Dict[Tuple[str, str], PrimitiveType] = { ("date", "int"): DateType(), - ("time-millis", "int"): TimeType(), + ("time-micros", "long"): TimeType(), ("timestamp-millis", "long"): TimestampType(), ("time-micros", "int"): TimeType(), - ("timestamp-micros", "long"): TimestampType(), ("uuid", "fixed"): UUIDType(), + ("uuid", "string"): UUIDType(), } AvroType = Union[str, Any] @@ -369,6 +369,11 @@ def _convert_logical_type(self, avro_logical_type: Dict[str, Any]) -> IcebergTyp return self._convert_logical_decimal_type(avro_logical_type) elif logical_type == "map": return self._convert_logical_map_type(avro_logical_type) + elif logical_type == "timestamp-micros": + if avro_logical_type.get("adjust-to-utc", False) is True: + return TimestamptzType() + else: + return TimestampType() elif (logical_type, physical_type) in LOGICAL_FIELD_TYPE_MAPPING: return LOGICAL_FIELD_TYPE_MAPPING[(logical_type, physical_type)] else: @@ -542,6 +547,8 @@ def map(self, map_type: MapType, key_result: AvroType, value_result: AvroType) - return { "type": "map", "values": value_result, + "key-id": self.last_map_key_field_id, + "value-id": self.last_map_value_field_id, } else: # Creates a logical map that's a list of schema's @@ -592,13 +599,13 @@ def visit_timestamp(self, timestamp_type: TimestampType) -> AvroType: def visit_timestamptz(self, timestamptz_type: TimestamptzType) -> AvroType: # Iceberg only supports micro's - return {"type": "long", "logicalType": "timestamp-micros"} + return {"type": "long", "logicalType": 
"timestamp-micros", "adjust-to-utc": True} def visit_string(self, string_type: StringType) -> AvroType: return "string" def visit_uuid(self, uuid_type: UUIDType) -> AvroType: - return {"type": "string", "logicalType": "uuid"} + return {"type": "fixed", "size": "16", "logicalType": "uuid"} def visit_binary(self, binary_type: BinaryType) -> AvroType: return "bytes" diff --git a/python/tests/avro/test_decoder.py b/python/tests/avro/test_decoder.py index 35405c8badcd..3d8330e43343 100644 --- a/python/tests/avro/test_decoder.py +++ b/python/tests/avro/test_decoder.py @@ -166,7 +166,7 @@ def test_skip_double() -> None: def test_read_uuid_from_fixed() -> None: mis = MemoryInputStream(b"\x12\x34\x56\x78" * 4) decoder = BinaryDecoder(mis) - assert decoder.read_uuid_from_fixed() == UUID("{12345678-1234-5678-1234-567812345678}") + assert decoder.read_uuid() == UUID("{12345678-1234-5678-1234-567812345678}") def test_read_time_millis() -> None: diff --git a/python/tests/avro/test_encoder.py b/python/tests/avro/test_encoder.py index 4646e65e6e61..3ce0bcfb7907 100644 --- a/python/tests/avro/test_encoder.py +++ b/python/tests/avro/test_encoder.py @@ -19,6 +19,7 @@ import datetime import io import struct +import uuid from decimal import Decimal from pyiceberg.avro.encoder import BinaryEncoder @@ -120,6 +121,8 @@ def test_write_decimal_fixed() -> None: encoder.write_decimal_fixed(_input, 8) + buf = output.getbuffer() + assert len(buf) == 8 assert output.getbuffer() == b"\x00\x00\x00\x49\x25\x59\xf6\x4f" @@ -156,52 +159,24 @@ def test_write_utf8() -> None: assert output.getbuffer() == b"".join([b"\x7a", bin_input]) -def test_write_date_int() -> None: - output = io.BytesIO() - encoder = BinaryEncoder(output) - - _input = datetime.date(1970, 1, 2) - encoder.write_date_int(_input) - - assert output.getbuffer() == b"\x02" - - -def test_write_time_millis_int() -> None: - output = io.BytesIO() - encoder = BinaryEncoder(output) - - _input = datetime.time(1, 2, 3, 456000) - 
encoder.write_time_millis_int(_input) - - assert output.getbuffer() == b"\x80\xc3\xc6\x03" - - def test_write_time_micros_long() -> None: output = io.BytesIO() encoder = BinaryEncoder(output) _input = datetime.time(1, 2, 3, 456000) - encoder.write_time_micros_long(_input) + encoder.write_time_micros(_input) assert output.getbuffer() == b"\x80\xb8\xfb\xde\x1b" -def test_write_timestamp_millis_long() -> None: - output = io.BytesIO() - encoder = BinaryEncoder(output) - - _input = datetime.datetime(2023, 1, 1, 1, 2, 3) - encoder.write_timestamp_millis_long(_input) - - assert output.getbuffer() == b"\xf0\xdb\xcc\xad\xad\x61" - - -def test_write_timestamp_micros_long() -> None: +def test_write_uuid() -> None: output = io.BytesIO() encoder = BinaryEncoder(output) - _input = datetime.datetime(2023, 1, 1, 1, 2, 3) - encoder.write_timestamp_micros_long(_input) + _input = uuid.UUID("{12345678-1234-5678-1234-567812345678}") + encoder.write_uuid(_input) - assert output.getbuffer() == b"\x80\xe3\xad\x9f\xac\xca\xf8\x05" + buf = output.getbuffer() + assert len(buf) == 16 + assert buf.tobytes() == b"\x124Vx\x124Vx\x124Vx\x124Vx" diff --git a/python/tests/avro/test_file.py b/python/tests/avro/test_file.py index 53d0216ab07c..e651a1582039 100644 --- a/python/tests/avro/test_file.py +++ b/python/tests/avro/test_file.py @@ -14,11 +14,14 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. 
+from datetime import date, datetime, time from enum import Enum from tempfile import TemporaryDirectory from typing import Any +from uuid import UUID import pytest +from _decimal import Decimal from fastavro import reader, writer import pyiceberg.avro.file as avro @@ -33,7 +36,24 @@ ManifestEntry, ManifestEntryStatus, ) +from pyiceberg.schema import Schema from pyiceberg.typedef import Record +from pyiceberg.types import ( + BooleanType, + DateType, + DecimalType, + DoubleType, + FixedType, + FloatType, + IntegerType, + LongType, + NestedField, + StringType, + TimestampType, + TimestamptzType, + TimeType, + UUIDType, +) from pyiceberg.utils.schema_conversion import AvroSchemaConversion @@ -192,3 +212,76 @@ def test_write_manifest_entry_with_fastavro_read_with_iceberg() -> None: avro_entry = next(it) assert entry == avro_entry + + +@pytest.mark.parametrize("is_required", [True, False]) +def test_all_primitive_types(is_required: bool) -> None: + all_primitives_schema = Schema( + NestedField(field_id=1, name="field_fixed", field_type=FixedType(16), required=is_required), + NestedField(field_id=2, name="field_decimal", field_type=DecimalType(6, 2), required=is_required), + NestedField(field_id=3, name="field_bool", field_type=BooleanType(), required=is_required), + NestedField(field_id=4, name="field_int", field_type=IntegerType(), required=True), + NestedField(field_id=5, name="field_long", field_type=LongType(), required=is_required), + NestedField(field_id=6, name="field_float", field_type=FloatType(), required=is_required), + NestedField(field_id=7, name="field_double", field_type=DoubleType(), required=is_required), + NestedField(field_id=8, name="field_date", field_type=DateType(), required=is_required), + NestedField(field_id=9, name="field_time", field_type=TimeType(), required=is_required), + NestedField(field_id=10, name="field_timestamp", field_type=TimestampType(), required=is_required), + NestedField(field_id=11, name="field_timestamptz", 
field_type=TimestamptzType(), required=is_required), + NestedField(field_id=12, name="field_string", field_type=StringType(), required=is_required), + NestedField(field_id=13, name="field_uuid", field_type=UUIDType(), required=is_required), + schema_id=1, + ) + + class AllPrimitivesRecord(Record): + field_fixed: bytes + field_decimal: Decimal + field_bool: bool + field_int: int + field_long: int + field_float: float + field_double: float + field_date: date + field_time: time + field_timestamp: datetime + field_timestamptz: datetime + field_string: str + field_uuid: UUID + + def __init__(self, *data: Any, **named_data: Any) -> None: + super().__init__(*data, **{"struct": all_primitives_schema.as_struct(), **named_data}) + + record = AllPrimitivesRecord( + b"\x124Vx\x124Vx\x124Vx\x124Vx", + Decimal("123.45"), + True, + 123, + 429496729622, + 123.22000122070312, + 429496729622.314, + date(2022, 3, 1), + time(19, 25, 22), + datetime.fromisoformat("2023-03-01T00:19:25.000000"), + datetime.fromisoformat("2023-03-01T00:19:25.000000+00:00"), + "this is a sentence", + UUID("12345678-1234-5678-1234-567812345678"), + ) + + with TemporaryDirectory() as tmpdir: + tmp_avro_file = tmpdir + "/all_primitives.avro" + # write to disk + with avro.AvroOutputFile[AllPrimitivesRecord]( + PyArrowFileIO().new_output(tmp_avro_file), all_primitives_schema, "all_primitives_schema" + ) as out: + out.write_block([record]) + + # read from disk + with avro.AvroFile[AllPrimitivesRecord]( + PyArrowFileIO().new_input(tmp_avro_file), + all_primitives_schema, + {-1: AllPrimitivesRecord}, + ) as avro_reader: + it = iter(avro_reader) + avro_entry = next(it) + + assert record == avro_entry diff --git a/python/tests/test_transforms.py b/python/tests/test_transforms.py index 201dbbbd6d93..5c4980d8ac42 100644 --- a/python/tests/test_transforms.py +++ b/python/tests/test_transforms.py @@ -88,7 +88,7 @@ from pyiceberg.utils.datetime import ( date_str_to_days, date_to_days, - time_to_micros, + 
time_str_to_micros, timestamp_to_micros, timestamptz_to_micros, ) @@ -102,7 +102,7 @@ (34, LongType(), 2017239379), (date_to_days(date(2017, 11, 16)), DateType(), -653330422), (date_str_to_days("2017-11-16"), DateType(), -653330422), - (time_to_micros("22:31:08"), TimeType(), -662762989), + (time_str_to_micros("22:31:08"), TimeType(), -662762989), ( timestamp_to_micros("2017-11-16T22:31:08"), TimestampType(), From 2c563484aca3e5752a0a913fe0260c52b6ba52c4 Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Fri, 14 Jul 2023 14:00:54 +0200 Subject: [PATCH 02/13] Fix the tests --- python/pyiceberg/utils/datetime.py | 2 +- python/tests/utils/test_manifest.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyiceberg/utils/datetime.py b/python/pyiceberg/utils/datetime.py index 1db33a436e17..c0ed0afe7e9e 100644 --- a/python/pyiceberg/utils/datetime.py +++ b/python/pyiceberg/utils/datetime.py @@ -96,7 +96,7 @@ def datetime_to_millis(dt: datetime) -> int: delta = dt - EPOCH_TIMESTAMPTZ else: delta = dt - EPOCH_TIMESTAMP - return (delta.days * 86400 + delta.seconds) * 1_000 + delta.microseconds // 1_000 + return int((delta.total_seconds() * 1_000) + (delta.microseconds // 1_000)) def timestamptz_to_micros(timestamptz_str: str) -> int: diff --git a/python/tests/utils/test_manifest.py b/python/tests/utils/test_manifest.py index 76a4a8a2b4d9..40e8e1d0c93b 100644 --- a/python/tests/utils/test_manifest.py +++ b/python/tests/utils/test_manifest.py @@ -51,7 +51,7 @@ def test_read_manifest_entry(generated_manifest_entry_file: str) -> None: == "/home/iceberg/warehouse/nyc/taxis_partitioned/data/VendorID=null/00000-633-d8a4223e-dc97-45a1-86e1-adaba6e8abd7-00001.parquet" ) assert data_file.file_format == FileFormat.PARQUET - assert repr(data_file.partition) == "Record[VendorID=1, tpep_pickup_datetime=1925]" + assert repr(data_file.partition) == "Record[VendorID=1, tpep_pickup_datetime=datetime.date(1975, 4, 10)]" assert data_file.record_count == 19513 
assert data_file.file_size_in_bytes == 388872 assert data_file.column_sizes == { From 8a8727ca71403575c1a4e64738aec2d536fa97e5 Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Fri, 14 Jul 2023 18:45:52 +0200 Subject: [PATCH 03/13] WIP --- python/pyiceberg/avro/encoder.py | 4 ---- python/pyiceberg/avro/file.py | 2 +- python/tests/avro/test_encoder.py | 11 ----------- 3 files changed, 1 insertion(+), 16 deletions(-) diff --git a/python/pyiceberg/avro/encoder.py b/python/pyiceberg/avro/encoder.py index 080b50361b7c..80db4dca31aa 100644 --- a/python/pyiceberg/avro/encoder.py +++ b/python/pyiceberg/avro/encoder.py @@ -81,10 +81,6 @@ def write_bytes(self, b: bytes) -> None: self.write_int(len(b)) self.write(b) - def write_bytes_fixed(self, b: bytes) -> None: - """Writes fixed number of bytes.""" - self.write(b) - def write_utf8(self, s: str) -> None: """A string is encoded as a long followed by that many bytes of UTF-8 encoded character data.""" self.write_bytes(s.encode("utf-8")) diff --git a/python/pyiceberg/avro/file.py b/python/pyiceberg/avro/file.py index 10f7ef7d7de2..932b671008bf 100644 --- a/python/pyiceberg/avro/file.py +++ b/python/pyiceberg/avro/file.py @@ -270,4 +270,4 @@ def write_block(self, objects: List[D]) -> None: self.encoder.write_int(len(objects)) self.encoder.write_int(len(block_content)) self.encoder.write(block_content) - self.encoder.write_bytes_fixed(self.sync_bytes) + self.encoder.write(self.sync_bytes) diff --git a/python/tests/avro/test_encoder.py b/python/tests/avro/test_encoder.py index 3ce0bcfb7907..f5a85e993957 100644 --- a/python/tests/avro/test_encoder.py +++ b/python/tests/avro/test_encoder.py @@ -137,17 +137,6 @@ def test_write_bytes() -> None: assert output.getbuffer() == b"".join([b"\x06", _input]) -def test_write_bytes_fixed() -> None: - output = io.BytesIO() - encoder = BinaryEncoder(output) - - _input = b"\x12\x34\x56" - - encoder.write_bytes_fixed(_input) - - assert output.getbuffer() == _input - - def test_write_utf8() -> 
None: output = io.BytesIO() encoder = BinaryEncoder(output) From 19aa56d975cfb171f8d93bc907bd2f276fed215c Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Mon, 31 Jul 2023 16:14:32 +0200 Subject: [PATCH 04/13] Update --- python/pyiceberg/avro/decoder.py | 5 ----- python/pyiceberg/avro/encoder.py | 2 +- python/pyiceberg/avro/reader.py | 4 ++-- python/pyiceberg/utils/datetime.py | 4 ++-- python/tests/avro/test_decoder.py | 8 -------- 5 files changed, 5 insertions(+), 18 deletions(-) diff --git a/python/pyiceberg/avro/decoder.py b/python/pyiceberg/avro/decoder.py index 100b9ccdf21b..95116cf61901 100644 --- a/python/pyiceberg/avro/decoder.py +++ b/python/pyiceberg/avro/decoder.py @@ -19,7 +19,6 @@ from datetime import datetime, time from io import SEEK_CUR from typing import Dict, List -from uuid import UUID from pyiceberg.avro import STRUCT_DOUBLE, STRUCT_FLOAT from pyiceberg.io import InputStream @@ -138,10 +137,6 @@ def read_utf8(self) -> str: """ return self.read_bytes().decode("utf-8") - def read_uuid(self) -> UUID: - """Reads a UUID as a fixed[16].""" - return UUID(bytes=self.read(16)) - def read_time_millis(self) -> time: """Reads a milliseconds granularity time from the stream. diff --git a/python/pyiceberg/avro/encoder.py b/python/pyiceberg/avro/encoder.py index 80db4dca31aa..a582f425c117 100644 --- a/python/pyiceberg/avro/encoder.py +++ b/python/pyiceberg/avro/encoder.py @@ -86,7 +86,7 @@ def write_utf8(self, s: str) -> None: self.write_bytes(s.encode("utf-8")) def write_uuid(self, uuid: UUID) -> None: - """Write UUID as a string. + """Write UUID as a fixed[16]. The uuid logical type represents a random generated universally unique identifier (UUID). An uuid logical type annotates an Avro string. The string has to conform with RFC-4122. 
diff --git a/python/pyiceberg/avro/reader.py b/python/pyiceberg/avro/reader.py index 06759f240802..9885b6eee897 100644 --- a/python/pyiceberg/avro/reader.py +++ b/python/pyiceberg/avro/reader.py @@ -195,10 +195,10 @@ def skip(self, decoder: BinaryDecoder) -> None: class UUIDReader(Reader): def read(self, decoder: BinaryDecoder) -> UUID: - return decoder.read_uuid() + return UUID(bytes=decoder.read(16)) def skip(self, decoder: BinaryDecoder) -> None: - decoder.skip_utf8() + decoder.skip(16) @dataclass(frozen=True) diff --git a/python/pyiceberg/utils/datetime.py b/python/pyiceberg/utils/datetime.py index c0ed0afe7e9e..f4a2515dd35c 100644 --- a/python/pyiceberg/utils/datetime.py +++ b/python/pyiceberg/utils/datetime.py @@ -68,7 +68,7 @@ def time_str_to_micros(time_str: str) -> int: def time_to_micros(t: time) -> int: """Converts a datetime.time object to microseconds from midnight.""" - return int(t.hour * 60 * 60 * 1e6 + t.minute * 60 * 1e6 + t.second * 1e6 + t.microsecond) + return int((((t.hour * 60 * 60) + (t.minute * 60) + t.second) * 1e6) + t.microsecond) def datetime_to_micros(dt: datetime) -> int: @@ -96,7 +96,7 @@ def datetime_to_millis(dt: datetime) -> int: delta = dt - EPOCH_TIMESTAMPTZ else: delta = dt - EPOCH_TIMESTAMP - return int((delta.total_seconds() * 1_000) + (delta.microseconds // 1_000)) + return int(delta.total_seconds() * 1_000) def timestamptz_to_micros(timestamptz_str: str) -> int: diff --git a/python/tests/avro/test_decoder.py b/python/tests/avro/test_decoder.py index 6ecaa4264f6b..acbc4c067d14 100644 --- a/python/tests/avro/test_decoder.py +++ b/python/tests/avro/test_decoder.py @@ -22,7 +22,6 @@ from io import SEEK_SET from types import TracebackType from typing import Optional, Type -from uuid import UUID import pytest @@ -180,13 +179,6 @@ def test_skip_double(decoder_class: Type[BinaryDecoder]) -> None: assert decoder.tell() == 8 -@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS) -def test_read_uuid_from_fixed(decoder_class: 
Type[BinaryDecoder]) -> None: - mis = io.BytesIO(b"\x12\x34\x56\x78" * 4) - decoder = decoder_class(mis) - assert decoder.read_uuid() == UUID("{12345678-1234-5678-1234-567812345678}") - - @pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS) def test_read_time_millis(decoder_class: Type[BinaryDecoder]) -> None: mis = io.BytesIO(b"\xBC\x7D") From 5b43ea9ff959350ba0f4f25c81b1db22d0babc8b Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Tue, 1 Aug 2023 11:04:11 +0200 Subject: [PATCH 05/13] Update python/pyiceberg/utils/datetime.py --- python/pyiceberg/utils/datetime.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyiceberg/utils/datetime.py b/python/pyiceberg/utils/datetime.py index f4a2515dd35c..660569a5ec62 100644 --- a/python/pyiceberg/utils/datetime.py +++ b/python/pyiceberg/utils/datetime.py @@ -68,7 +68,7 @@ def time_str_to_micros(time_str: str) -> int: def time_to_micros(t: time) -> int: """Converts a datetime.time object to microseconds from midnight.""" - return int((((t.hour * 60 * 60) + (t.minute * 60) + t.second) * 1e6) + t.microsecond) + return (((t.hour * 60 + t.minute) * 60) + t.second) * 1_000_000 + t.microsecond def datetime_to_micros(dt: datetime) -> int: From 6ba076170b44ab2b3865c3230356f70757e0b99b Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Tue, 1 Aug 2023 11:04:22 +0200 Subject: [PATCH 06/13] Update python/pyiceberg/utils/datetime.py --- python/pyiceberg/utils/datetime.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyiceberg/utils/datetime.py b/python/pyiceberg/utils/datetime.py index 660569a5ec62..f43d0fa84b07 100644 --- a/python/pyiceberg/utils/datetime.py +++ b/python/pyiceberg/utils/datetime.py @@ -96,7 +96,7 @@ def datetime_to_millis(dt: datetime) -> int: delta = dt - EPOCH_TIMESTAMPTZ else: delta = dt - EPOCH_TIMESTAMP - return int(delta.total_seconds() * 1_000) + return (delta.days * 86400 + delta.seconds) * 1_000 + delta.microseconds // 1_000 def 
timestamptz_to_micros(timestamptz_str: str) -> int: From 1052661669c545847271a540f444f40d42a91574 Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Tue, 1 Aug 2023 11:06:31 +0200 Subject: [PATCH 07/13] Update python/tests/avro/test_encoder.py --- python/tests/avro/test_encoder.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/tests/avro/test_encoder.py b/python/tests/avro/test_encoder.py index f5a85e993957..fdddedb40129 100644 --- a/python/tests/avro/test_encoder.py +++ b/python/tests/avro/test_encoder.py @@ -163,7 +163,7 @@ def test_write_uuid() -> None: output = io.BytesIO() encoder = BinaryEncoder(output) - _input = uuid.UUID("{12345678-1234-5678-1234-567812345678}") + _input = uuid.UUID("12345678-1234-5678-1234-567812345678") encoder.write_uuid(_input) buf = output.getbuffer() From 3fba6bd6c339d87d38ff2bf27c1c6c871ea660e5 Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Tue, 1 Aug 2023 11:10:51 +0200 Subject: [PATCH 08/13] WIP --- python/dev/provision.py | 44 ++++++++++++++ python/pyiceberg/avro/decoder.py | 64 +-------------------- python/pyiceberg/avro/encoder.py | 12 +--- python/pyiceberg/avro/reader.py | 58 +++++++++++++++---- python/pyiceberg/avro/writer.py | 27 ++++----- python/pyiceberg/utils/schema_conversion.py | 2 +- python/tests/avro/test_decoder.py | 57 ------------------ python/tests/avro/test_encoder.py | 12 ---- python/tests/avro/test_file.py | 11 ++-- python/tests/test_integration.py | 16 ++++++ python/tests/utils/test_manifest.py | 2 +- 11 files changed, 131 insertions(+), 174 deletions(-) diff --git a/python/dev/provision.py b/python/dev/provision.py index 73ec34fdc109..2f133b103ceb 100644 --- a/python/dev/provision.py +++ b/python/dev/provision.py @@ -185,3 +185,47 @@ all_types_dataframe.writeTo("default.test_all_types").tableProperty("format-version", "2").partitionedBy( "intCol" ).createOrReplace() + +for table_name, partition in [ + ("test_partitioned_by_identity", "ts"), + ("test_partitioned_by_years", 
"years(dt)"), + ("test_partitioned_by_months", "months(dt)"), + ("test_partitioned_by_days", "days(ts)"), + ("test_partitioned_by_hours", "hours(ts)"), + ("test_partitioned_by_truncate", "truncate(1, letter)"), + ("test_partitioned_by_bucket", "bucket(16, number)"), +]: + spark.sql( + f""" + CREATE OR REPLACE TABLE default.{table_name} ( + dt date, + ts timestamp, + number integer, + letter string + ) + USING iceberg; + """ + ) + + # Partitioning is not really needed, but there is a bug: + # https://github.com/apache/iceberg/pull/7685 + spark.sql(f"ALTER TABLE default.{table_name} ADD PARTITION FIELD {partition} AS dt_years") + + spark.sql( + f""" + INSERT INTO default.{table_name} + VALUES + (CAST('2022-03-01' AS date), CAST('2022-03-01 01:22:00' AS timestamp), 1, 'a'), + (CAST('2022-03-02' AS date), CAST('2022-03-02 02:22:00' AS timestamp), 2, 'b'), + (CAST('2022-03-03' AS date), CAST('2022-03-03 03:22:00' AS timestamp), 3, 'c'), + (CAST('2022-03-04' AS date), CAST('2022-03-04 04:22:00' AS timestamp), 4, 'd'), + (CAST('2023-03-05' AS date), CAST('2023-03-05 05:22:00' AS timestamp), 5, 'e'), + (CAST('2023-03-06' AS date), CAST('2023-03-06 06:22:00' AS timestamp), 6, 'f'), + (CAST('2023-03-07' AS date), CAST('2023-03-07 07:22:00' AS timestamp), 7, 'g'), + (CAST('2023-03-08' AS date), CAST('2023-03-08 08:22:00' AS timestamp), 8, 'h'), + (CAST('2023-03-09' AS date), CAST('2023-03-09 09:22:00' AS timestamp), 9, 'i'), + (CAST('2023-03-10' AS date), CAST('2023-03-10 10:22:00' AS timestamp), 10, 'j'), + (CAST('2023-03-11' AS date), CAST('2023-03-11 11:22:00' AS timestamp), 11, 'k'), + (CAST('2023-03-12' AS date), CAST('2023-03-12 12:22:00' AS timestamp), 12, 'l'); + """ + ) diff --git a/python/pyiceberg/avro/decoder.py b/python/pyiceberg/avro/decoder.py index 95116cf61901..0b39340b0e93 100644 --- a/python/pyiceberg/avro/decoder.py +++ b/python/pyiceberg/avro/decoder.py @@ -14,20 +14,16 @@ # KIND, either express or implied. 
See the License for the # specific language governing permissions and limitations # under the License. -import decimal from abc import ABC, abstractmethod -from datetime import datetime, time from io import SEEK_CUR from typing import Dict, List from pyiceberg.avro import STRUCT_DOUBLE, STRUCT_FLOAT from pyiceberg.io import InputStream -from pyiceberg.utils.datetime import micros_to_time, micros_to_timestamp, micros_to_timestamptz -from pyiceberg.utils.decimal import unscaled_to_decimal class BinaryDecoder(ABC): - """Read leaf values.""" + """Decodes bytes into Python physical primitives.""" @abstractmethod def __init__(self, input_stream: InputStream) -> None: @@ -105,73 +101,19 @@ def read_double(self) -> float: """ return float(STRUCT_DOUBLE.unpack(self.read(8))[0]) - def read_decimal_from_bytes(self, precision: int, scale: int) -> decimal.Decimal: - """Reads a value from the stream as a decimal. - - Decimal bytes are decoded as signed short, int or long depending on the - size of bytes. - """ - size = self.read_int() - return self.read_decimal_from_fixed(precision, scale, size) - - def read_decimal_from_fixed(self, _: int, scale: int, size: int) -> decimal.Decimal: - """Reads a value from the stream as a decimal. - - Decimal is encoded as fixed. Fixed instances are encoded using the - number of bytes declared in the schema. - """ - data = self.read(size) - unscaled_datum = int.from_bytes(data, byteorder="big", signed=True) - return unscaled_to_decimal(unscaled_datum, scale) - def read_bytes(self) -> bytes: """Bytes are encoded as a long followed by that many bytes of data.""" num_bytes = self.read_int() return self.read(num_bytes) if num_bytes > 0 else b"" def read_utf8(self) -> str: - """Reads a utf-8 encoded string from the stream. + """Reads an utf-8 encoded string from the stream. A string is encoded as a long followed by that many bytes of UTF-8 encoded character data. 
""" return self.read_bytes().decode("utf-8") - def read_time_millis(self) -> time: - """Reads a milliseconds granularity time from the stream. - - Int is decoded as python time object which represents - the number of milliseconds after midnight, 00:00:00.000. - """ - millis = self.read_int() - return micros_to_time(millis * 1000) - - def read_time_micros(self) -> time: - """Reads a microseconds granularity time from the stream. - - Long is decoded as python time object which represents - the number of microseconds after midnight, 00:00:00.000000. - """ - return micros_to_time(self.read_int()) - - def read_timestamp_micros(self) -> datetime: - """Reads a microsecond granularity timestamp from the stream. - - Long is decoded as python datetime object which represents - the number of microseconds from the unix epoch, 1 January 1970. - """ - return micros_to_timestamp(self.read_int()) - - def read_timestamptz_micros(self) -> datetime: - """Reads a microsecond granularity timestamptz from the stream. - - Long is decoded as python datetime object which represents - the number of microseconds from the unix epoch, 1 January 1970. - - Adjusted to UTC. - """ - return micros_to_timestamptz(self.read_int()) - def skip_boolean(self) -> None: self.skip(1) @@ -194,7 +136,7 @@ def skip_utf8(self) -> None: class StreamingBinaryDecoder(BinaryDecoder): - """Read leaf values.""" + """Decodes bytes into Python physical primitives.""" __slots__ = "_input_stream" _input_stream: InputStream diff --git a/python/pyiceberg/avro/encoder.py b/python/pyiceberg/avro/encoder.py index a582f425c117..578aed6be9d9 100644 --- a/python/pyiceberg/avro/encoder.py +++ b/python/pyiceberg/avro/encoder.py @@ -15,17 +15,15 @@ # specific language governing permissions and limitations # under the License. 
import decimal -from datetime import time from uuid import UUID from pyiceberg.avro import STRUCT_DOUBLE, STRUCT_FLOAT from pyiceberg.io import OutputStream -from pyiceberg.utils.datetime import time_to_micros from pyiceberg.utils.decimal import decimal_to_unscaled class BinaryEncoder: - """Write leaf values.""" + """Encodes Python physical types into bytes.""" _output_stream: OutputStream @@ -94,11 +92,3 @@ def write_uuid(self, uuid: UUID) -> None: if len(uuid.bytes) != 16: raise ValueError(f"Expected UUID to have 16 bytes, got: len({uuid.bytes!r})") return self.write(uuid.bytes) - - def write_time_micros(self, dt: time) -> None: - """ - Encode python time object as long. - - It stores the number of microseconds from midnight, 00:00:00.000000 - """ - self.write_int(time_to_micros(dt)) diff --git a/python/pyiceberg/avro/reader.py b/python/pyiceberg/avro/reader.py index 9885b6eee897..0c5ba2014738 100644 --- a/python/pyiceberg/avro/reader.py +++ b/python/pyiceberg/avro/reader.py @@ -43,7 +43,7 @@ from pyiceberg.avro.decoder import BinaryDecoder from pyiceberg.typedef import StructProtocol from pyiceberg.types import StructType -from pyiceberg.utils.datetime import days_to_date +from pyiceberg.utils.decimal import unscaled_to_decimal from pyiceberg.utils.singleton import Singleton @@ -154,32 +154,56 @@ def skip(self, decoder: BinaryDecoder) -> None: class DateReader(Reader): - def read(self, decoder: BinaryDecoder) -> date: - return days_to_date(decoder.read_int()) + """Reads a day granularity date from the stream. + + The number of days from 1 January 1970. + """ + def read(self, decoder: BinaryDecoder) -> int: + return decoder.read_int() def skip(self, decoder: BinaryDecoder) -> None: decoder.skip_int() class TimeReader(Reader): - def read(self, decoder: BinaryDecoder) -> time: - return decoder.read_time_micros() + """Reads a microsecond granularity timestamp from the stream. 
+
+    Long is decoded as an integer which represents
+    the number of microseconds after midnight, 00:00:00.000000.
+    """
+
+    def read(self, decoder: BinaryDecoder) -> int:
+        return decoder.read_int()
 
     def skip(self, decoder: BinaryDecoder) -> None:
         decoder.skip_int()
 
 
 class TimestampReader(Reader):
-    def read(self, decoder: BinaryDecoder) -> datetime:
-        return decoder.read_timestamp_micros()
+    """Reads a microsecond granularity timestamp from the stream.
+
+    Long is decoded as python integer which represents
+    the number of microseconds from the unix epoch, 1 January 1970.
+    """
+
+    def read(self, decoder: BinaryDecoder) -> int:
+        return decoder.read_int()
 
     def skip(self, decoder: BinaryDecoder) -> None:
         decoder.skip_int()
 
 
 class TimestamptzReader(Reader):
-    def read(self, decoder: BinaryDecoder) -> datetime:
-        return decoder.read_timestamptz_micros()
+    """Reads a microsecond granularity timestamptz from the stream.
+
+    Long is decoded as python integer which represents
+    the number of microseconds from the unix epoch, 1 January 1970.
+
+    Adjusted to UTC.
+    """
+
+    def read(self, decoder: BinaryDecoder) -> int:
+        return decoder.read_int()
 
     def skip(self, decoder: BinaryDecoder) -> None:
         decoder.skip_int()
@@ -221,6 +245,12 @@ def __repr__(self) -> str:
 
 
 class BinaryReader(Reader):
+    """Read a binary value.
+
+    First reads an integer, to get the length of the binary value,
+    then reads the binary field itself.
+    """
+
     def read(self, decoder: BinaryDecoder) -> bytes:
         return decoder.read_bytes()
 
@@ -230,11 +260,19 @@ def skip(self, decoder: BinaryDecoder) -> None:
 
 
 @dataclass(frozen=True)
 class DecimalReader(Reader):
+    """Reads a value as a decimal.
+
+    Decimal bytes are decoded as signed short, int or long depending on the
+    size of bytes
+ """ + precision: int = dataclassfield() scale: int = dataclassfield() def read(self, decoder: BinaryDecoder) -> Decimal: - return decoder.read_decimal_from_bytes(self.precision, self.scale) + data = decoder.read(decoder.read_int()) + unscaled_datum = int.from_bytes(data, byteorder="big", signed=True) + return unscaled_to_decimal(unscaled_datum, self.scale) def skip(self, decoder: BinaryDecoder) -> None: decoder.skip_bytes() diff --git a/python/pyiceberg/avro/writer.py b/python/pyiceberg/avro/writer.py index fc0d4a9fb307..60a56d3cc097 100644 --- a/python/pyiceberg/avro/writer.py +++ b/python/pyiceberg/avro/writer.py @@ -25,18 +25,18 @@ from abc import abstractmethod from dataclasses import dataclass from dataclasses import field as dataclassfield -from datetime import datetime, time +from datetime import datetime, time, date from typing import ( Any, Dict, List, - Tuple, + Tuple, Union, ) from uuid import UUID from pyiceberg.avro.encoder import BinaryEncoder from pyiceberg.types import StructType -from pyiceberg.utils.datetime import date_to_days, datetime_to_micros +from pyiceberg.utils.datetime import date_to_days, datetime_to_micros, time_to_micros from pyiceberg.utils.singleton import Singleton @@ -78,26 +78,23 @@ def write(self, encoder: BinaryEncoder, val: float) -> None: class DateWriter(Writer): - def write(self, encoder: BinaryEncoder, val: Any) -> None: - encoder.write_int(date_to_days(val)) + def write(self, encoder: BinaryEncoder, val: int) -> None: + encoder.write_int(val) class TimeWriter(Writer): - def write(self, encoder: BinaryEncoder, val: time) -> None: - encoder.write_time_micros(val) + def write(self, encoder: BinaryEncoder, val: int) -> None: + encoder.write_int(val) class TimestampWriter(Writer): - def write(self, encoder: BinaryEncoder, val: datetime) -> None: - if val.tzinfo is not None: - raise ValueError(f"Timestamp should not have a timezone, but has: {val.tzinfo}") - - encoder.write_int(datetime_to_micros(val)) + def write(self, 
encoder: BinaryEncoder, val: int) -> None: + encoder.write_int(val) class TimestamptzWriter(Writer): - def write(self, encoder: BinaryEncoder, val: datetime) -> None: - encoder.write_int(datetime_to_micros(val)) + def write(self, encoder: BinaryEncoder, val: int) -> None: + encoder.write_int(val) class StringWriter(Writer): @@ -107,8 +104,6 @@ def write(self, encoder: BinaryEncoder, val: Any) -> None: class UUIDWriter(Writer): def write(self, encoder: BinaryEncoder, val: UUID) -> None: - if len(val.bytes) != 16: - raise ValueError(f"Expected UUID to be 16 bytes, got: {len(val.bytes)}") encoder.write(val.bytes) diff --git a/python/pyiceberg/utils/schema_conversion.py b/python/pyiceberg/utils/schema_conversion.py index faa593baa3f4..ff947d1f533e 100644 --- a/python/pyiceberg/utils/schema_conversion.py +++ b/python/pyiceberg/utils/schema_conversion.py @@ -595,7 +595,7 @@ def visit_time(self, time_type: TimeType) -> AvroType: def visit_timestamp(self, timestamp_type: TimestampType) -> AvroType: # Iceberg only supports micro's - return {"type": "long", "logicalType": "timestamp-micros"} + return {"type": "long", "logicalType": "timestamp-micros", "adjust-to-utc": False} def visit_timestamptz(self, timestamptz_type: TimestamptzType) -> AvroType: # Iceberg only supports micro's diff --git a/python/tests/avro/test_decoder.py b/python/tests/avro/test_decoder.py index acbc4c067d14..c51f1090fc37 100644 --- a/python/tests/avro/test_decoder.py +++ b/python/tests/avro/test_decoder.py @@ -17,8 +17,6 @@ from __future__ import annotations import io -from datetime import datetime, timezone -from decimal import Decimal from io import SEEK_SET from types import TracebackType from typing import Optional, Type @@ -33,15 +31,6 @@ AVAILABLE_DECODERS = [StreamingBinaryDecoder, InMemoryBinaryDecoder] -@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS) -def test_read_decimal_from_fixed(decoder_class: Type[BinaryDecoder]) -> None: - mis = 
io.BytesIO(b"\x00\x00\x00\x05\x6A\x48\x1C\xFB\x2C\x7C\x50\x00") - decoder = decoder_class(mis) - actual = decoder.read_decimal_from_fixed(28, 15, 12) - expected = Decimal("99892.123400000000000") - assert actual == expected - - @pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS) def test_read_boolean_true(decoder_class: Type[BinaryDecoder]) -> None: mis = io.BytesIO(b"\x01") @@ -81,24 +70,6 @@ def test_skip_int(decoder_class: Type[BinaryDecoder]) -> None: assert decoder.tell() == 1 -@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS) -def test_read_decimal(decoder_class: Type[BinaryDecoder]) -> None: - mis = io.BytesIO(b"\x18\x00\x00\x00\x05\x6A\x48\x1C\xFB\x2C\x7C\x50\x00") - decoder = decoder_class(mis) - actual = decoder.read_decimal_from_bytes(28, 15) - expected = Decimal("99892.123400000000000") - assert actual == expected - - -@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS) -def test_decimal_from_fixed_big(decoder_class: Type[BinaryDecoder]) -> None: - mis = io.BytesIO(b"\x0E\xC2\x02\xE9\x06\x16\x33\x49\x77\x67\xA8\x00") - decoder = decoder_class(mis) - actual = decoder.read_decimal_from_fixed(28, 15, 12) - expected = Decimal("4567335489766.998340000000000") - assert actual == expected - - @pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS) def test_read_negative_bytes(decoder_class: Type[BinaryDecoder]) -> None: mis = io.BytesIO(b"") @@ -179,34 +150,6 @@ def test_skip_double(decoder_class: Type[BinaryDecoder]) -> None: assert decoder.tell() == 8 -@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS) -def test_read_time_millis(decoder_class: Type[BinaryDecoder]) -> None: - mis = io.BytesIO(b"\xBC\x7D") - decoder = decoder_class(mis) - assert decoder.read_time_millis().microsecond == 30000 - - -@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS) -def test_read_time_micros(decoder_class: Type[BinaryDecoder]) -> None: - mis = io.BytesIO(b"\xBC\x7D") - decoder = decoder_class(mis) - assert 
decoder.read_time_micros().microsecond == 8030 - - -@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS) -def test_read_timestamp_micros(decoder_class: Type[BinaryDecoder]) -> None: - mis = io.BytesIO(b"\xBC\x7D") - decoder = decoder_class(mis) - assert decoder.read_timestamp_micros() == datetime(1970, 1, 1, 0, 0, 0, 8030) - - -@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS) -def test_read_timestamptz_micros(decoder_class: Type[BinaryDecoder]) -> None: - mis = io.BytesIO(b"\xBC\x7D") - decoder = decoder_class(mis) - assert decoder.read_timestamptz_micros() == datetime(1970, 1, 1, 0, 0, 0, 8030, tzinfo=timezone.utc) - - @pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS) def test_read_bytes(decoder_class: Type[BinaryDecoder]) -> None: mis = io.BytesIO(b"\x08\x01\x02\x03\x04") diff --git a/python/tests/avro/test_encoder.py b/python/tests/avro/test_encoder.py index f5a85e993957..e2ed941a7c6e 100644 --- a/python/tests/avro/test_encoder.py +++ b/python/tests/avro/test_encoder.py @@ -16,7 +16,6 @@ # under the License. 
from __future__ import annotations -import datetime import io import struct import uuid @@ -148,17 +147,6 @@ def test_write_utf8() -> None: assert output.getbuffer() == b"".join([b"\x7a", bin_input]) -def test_write_time_micros_long() -> None: - output = io.BytesIO() - encoder = BinaryEncoder(output) - - _input = datetime.time(1, 2, 3, 456000) - - encoder.write_time_micros(_input) - - assert output.getbuffer() == b"\x80\xb8\xfb\xde\x1b" - - def test_write_uuid() -> None: output = io.BytesIO() encoder = BinaryEncoder(output) diff --git a/python/tests/avro/test_file.py b/python/tests/avro/test_file.py index 944a6f69f721..27387704920a 100644 --- a/python/tests/avro/test_file.py +++ b/python/tests/avro/test_file.py @@ -260,10 +260,10 @@ def __init__(self, *data: Any, **named_data: Any) -> None: 429496729622, 123.22000122070312, 429496729622.314, - date(2022, 3, 1), - time(19, 25, 22), - datetime.fromisoformat("2023-03-01T00:19:25.000000"), - datetime.fromisoformat("2023-03-01T00:19:25.000000+00:00"), + 19052, + 69922000000, + 1677629965000000, + 1677629965000000, "this is a sentence", UUID("12345678-1234-5678-1234-567812345678"), ) @@ -285,4 +285,5 @@ def __init__(self, *data: Any, **named_data: Any) -> None: it = iter(avro_reader) avro_entry = next(it) - assert record == avro_entry + for idx, field in enumerate(all_primitives_schema.as_struct()): + assert record[idx] == avro_entry[idx], f"Invalid {field}" diff --git a/python/tests/test_integration.py b/python/tests/test_integration.py index 36099208190b..4fdc8d901bf3 100644 --- a/python/tests/test_integration.py +++ b/python/tests/test_integration.py @@ -299,3 +299,19 @@ def test_pyarrow_deletes_double(test_positional_mor_double_deletes: Table) -> No # Testing the slicing of indices arrow_table = test_positional_mor_double_deletes.scan(limit=8).to_arrow() assert arrow_table["number"].to_pylist() == [1, 2, 3, 4, 5, 7, 8, 10] + + +@pytest.mark.integration +def test_partitioned_tables(catalog: Catalog) -> None: + for 
table_name, predicate in [ + ("test_partitioned_by_identity", "ts >= '2023-03-05T00:00:00+00:00'"), + ('test_partitioned_by_years', "dt >= '2023-03-05'"), + ('test_partitioned_by_months', "dt >= '2023-03-05'"), + ("test_partitioned_by_days", "ts >= '2023-03-05T00:00:00+00:00'"), + ("test_partitioned_by_hours", "ts >= '2023-03-05T00:00:00+00:00'"), + ("test_partitioned_by_truncate", "letter >= 'e'"), + ("test_partitioned_by_bucket", "number >= '5'"), + ]: + table = catalog.load_table(f"default.{table_name}") + arrow_table = table.scan(selected_fields=("number",), row_filter=predicate).to_arrow() + assert set(arrow_table["number"].to_pylist()) == {5, 6, 7, 8, 9, 10, 11, 12}, f"Table {table_name}, predicate {predicate}" diff --git a/python/tests/utils/test_manifest.py b/python/tests/utils/test_manifest.py index 40e8e1d0c93b..76a4a8a2b4d9 100644 --- a/python/tests/utils/test_manifest.py +++ b/python/tests/utils/test_manifest.py @@ -51,7 +51,7 @@ def test_read_manifest_entry(generated_manifest_entry_file: str) -> None: == "/home/iceberg/warehouse/nyc/taxis_partitioned/data/VendorID=null/00000-633-d8a4223e-dc97-45a1-86e1-adaba6e8abd7-00001.parquet" ) assert data_file.file_format == FileFormat.PARQUET - assert repr(data_file.partition) == "Record[VendorID=1, tpep_pickup_datetime=datetime.date(1975, 4, 10)]" + assert repr(data_file.partition) == "Record[VendorID=1, tpep_pickup_datetime=1925]" assert data_file.record_count == 19513 assert data_file.file_size_in_bytes == 388872 assert data_file.column_sizes == { From da66c462140a0b2293ab8e05c2aeefb2cba72574 Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Tue, 1 Aug 2023 11:15:20 +0200 Subject: [PATCH 09/13] Cleanup --- python/pyiceberg/avro/reader.py | 2 +- python/pyiceberg/avro/writer.py | 4 +--- python/tests/test_integration.py | 4 ++-- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/python/pyiceberg/avro/reader.py b/python/pyiceberg/avro/reader.py index 0c5ba2014738..bbc9f81fefc4 100644 --- 
a/python/pyiceberg/avro/reader.py +++ b/python/pyiceberg/avro/reader.py @@ -28,7 +28,6 @@ from abc import abstractmethod from dataclasses import dataclass from dataclasses import field as dataclassfield -from datetime import date, datetime, time from decimal import Decimal from typing import ( Any, @@ -158,6 +157,7 @@ class DateReader(Reader): The number of days from 1 January 1970. """ + def read(self, decoder: BinaryDecoder) -> int: return decoder.read_int() diff --git a/python/pyiceberg/avro/writer.py b/python/pyiceberg/avro/writer.py index 60a56d3cc097..8c1e85bffb3d 100644 --- a/python/pyiceberg/avro/writer.py +++ b/python/pyiceberg/avro/writer.py @@ -25,18 +25,16 @@ from abc import abstractmethod from dataclasses import dataclass from dataclasses import field as dataclassfield -from datetime import datetime, time, date from typing import ( Any, Dict, List, - Tuple, Union, + Tuple, ) from uuid import UUID from pyiceberg.avro.encoder import BinaryEncoder from pyiceberg.types import StructType -from pyiceberg.utils.datetime import date_to_days, datetime_to_micros, time_to_micros from pyiceberg.utils.singleton import Singleton diff --git a/python/tests/test_integration.py b/python/tests/test_integration.py index 4fdc8d901bf3..37ba5b9048be 100644 --- a/python/tests/test_integration.py +++ b/python/tests/test_integration.py @@ -305,8 +305,8 @@ def test_pyarrow_deletes_double(test_positional_mor_double_deletes: Table) -> No def test_partitioned_tables(catalog: Catalog) -> None: for table_name, predicate in [ ("test_partitioned_by_identity", "ts >= '2023-03-05T00:00:00+00:00'"), - ('test_partitioned_by_years', "dt >= '2023-03-05'"), - ('test_partitioned_by_months', "dt >= '2023-03-05'"), + ("test_partitioned_by_years", "dt >= '2023-03-05'"), + ("test_partitioned_by_months", "dt >= '2023-03-05'"), ("test_partitioned_by_days", "ts >= '2023-03-05T00:00:00+00:00'"), ("test_partitioned_by_hours", "ts >= '2023-03-05T00:00:00+00:00'"), ("test_partitioned_by_truncate", 
"letter >= 'e'"), From 0a041697363f4c6d4accb0f9c2e4bb80aaeb872f Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Tue, 1 Aug 2023 11:24:42 +0200 Subject: [PATCH 10/13] Cleanup --- python/pyiceberg/utils/schema_conversion.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/python/pyiceberg/utils/schema_conversion.py b/python/pyiceberg/utils/schema_conversion.py index ff947d1f533e..0746024e5101 100644 --- a/python/pyiceberg/utils/schema_conversion.py +++ b/python/pyiceberg/utils/schema_conversion.py @@ -65,10 +65,8 @@ LOGICAL_FIELD_TYPE_MAPPING: Dict[Tuple[str, str], PrimitiveType] = { ("date", "int"): DateType(), ("time-micros", "long"): TimeType(), - ("timestamp-millis", "long"): TimestampType(), - ("time-micros", "int"): TimeType(), + ("timestamp-micros", "long"): TimestampType(), ("uuid", "fixed"): UUIDType(), - ("uuid", "string"): UUIDType(), } AvroType = Union[str, Any] From 48472940e874d88df42b293889389a54def831d4 Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Thu, 3 Aug 2023 20:46:02 +0200 Subject: [PATCH 11/13] Cleanup --- python/dev/provision.py | 4 +-- python/pyiceberg/avro/encoder.py | 19 ------------ python/pyiceberg/avro/reader.py | 7 ++--- python/pyiceberg/avro/writer.py | 3 +- python/pyiceberg/utils/decimal.py | 47 ++++++++++++++++++++++++++++-- python/tests/avro/test_encoder.py | 25 ---------------- python/tests/avro/test_writer.py | 17 +++++++++++ python/tests/utils/test_decimal.py | 40 +++++++++++++++++++++++++ 8 files changed, 107 insertions(+), 55 deletions(-) create mode 100644 python/tests/utils/test_decimal.py diff --git a/python/dev/provision.py b/python/dev/provision.py index 2f133b103ceb..f62687b746af 100644 --- a/python/dev/provision.py +++ b/python/dev/provision.py @@ -207,9 +207,7 @@ """ ) - # Partitioning is not really needed, but there is a bug: - # https://github.com/apache/iceberg/pull/7685 - spark.sql(f"ALTER TABLE default.{table_name} ADD PARTITION FIELD {partition} AS dt_years") + spark.sql(f"ALTER 
TABLE default.{table_name} ADD PARTITION FIELD {partition}") spark.sql( f""" diff --git a/python/pyiceberg/avro/encoder.py b/python/pyiceberg/avro/encoder.py index 578aed6be9d9..a25e4bb66df6 100644 --- a/python/pyiceberg/avro/encoder.py +++ b/python/pyiceberg/avro/encoder.py @@ -14,12 +14,10 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -import decimal from uuid import UUID from pyiceberg.avro import STRUCT_DOUBLE, STRUCT_FLOAT from pyiceberg.io import OutputStream -from pyiceberg.utils.decimal import decimal_to_unscaled class BinaryEncoder: @@ -57,23 +55,6 @@ def write_double(self, f: float) -> None: """A double is written as 8 bytes.""" self.write(STRUCT_DOUBLE.pack(f)) - def write_decimal_bytes(self, datum: decimal.Decimal) -> None: - """ - Decimal in bytes are encoded as long. - - Since size of packed value in bytes for signed long is 8, 8 bytes are written. - """ - unscaled_datum = decimal_to_unscaled(datum) - size = (unscaled_datum.bit_length() + 7) // 8 - bytes_datum = unscaled_datum.to_bytes(length=size, byteorder="big", signed=True) - self.write_bytes(bytes_datum) - - def write_decimal_fixed(self, datum: decimal.Decimal, size: int) -> None: - """Decimal in fixed are encoded as size of fixed bytes.""" - unscaled_datum = decimal_to_unscaled(datum) - bytes_datum = unscaled_datum.to_bytes(length=size, byteorder="big", signed=True) - self.write(bytes_datum) - def write_bytes(self, b: bytes) -> None: """Bytes are encoded as a long followed by that many bytes of data.""" self.write_int(len(b)) diff --git a/python/pyiceberg/avro/reader.py b/python/pyiceberg/avro/reader.py index bbc9f81fefc4..54609cd17b0f 100644 --- a/python/pyiceberg/avro/reader.py +++ b/python/pyiceberg/avro/reader.py @@ -42,7 +42,7 @@ from pyiceberg.avro.decoder import BinaryDecoder from pyiceberg.typedef import StructProtocol from pyiceberg.types import StructType -from pyiceberg.utils.decimal import 
unscaled_to_decimal +from pyiceberg.utils.decimal import bytes_to_decimal, decimal_required_bytes from pyiceberg.utils.singleton import Singleton @@ -270,9 +270,8 @@ class DecimalReader(Reader): scale: int = dataclassfield() def read(self, decoder: BinaryDecoder) -> Decimal: - data = decoder.read(decoder.read_int()) - unscaled_datum = int.from_bytes(data, byteorder="big", signed=True) - return unscaled_to_decimal(unscaled_datum, self.scale) + data = decoder.read(decimal_required_bytes(self.precision)) + return bytes_to_decimal(data, self.scale) def skip(self, decoder: BinaryDecoder) -> None: decoder.skip_bytes() diff --git a/python/pyiceberg/avro/writer.py b/python/pyiceberg/avro/writer.py index 8c1e85bffb3d..2a1dcd580487 100644 --- a/python/pyiceberg/avro/writer.py +++ b/python/pyiceberg/avro/writer.py @@ -35,6 +35,7 @@ from pyiceberg.avro.encoder import BinaryEncoder from pyiceberg.types import StructType +from pyiceberg.utils.decimal import decimal_required_bytes, decimal_to_bytes from pyiceberg.utils.singleton import Singleton @@ -136,7 +137,7 @@ class DecimalWriter(Writer): scale: int = dataclassfield() def write(self, encoder: BinaryEncoder, val: Any) -> None: - return encoder.write_decimal_bytes(val) + return encoder.write(decimal_to_bytes(val, byte_length=decimal_required_bytes(self.precision))) def __repr__(self) -> str: """Returns string representation of this object.""" diff --git a/python/pyiceberg/utils/decimal.py b/python/pyiceberg/utils/decimal.py index 32f6db6aa949..6828e4125ed5 100644 --- a/python/pyiceberg/utils/decimal.py +++ b/python/pyiceberg/utils/decimal.py @@ -16,8 +16,10 @@ # under the License. 
"""Helper methods for working with Python Decimals.""" +import math from decimal import Decimal -from typing import Union +from functools import lru_cache +from typing import Optional, Union def decimal_to_unscaled(value: Decimal) -> int: @@ -64,16 +66,33 @@ def bytes_required(value: Union[int, Decimal]) -> int: raise ValueError(f"Unsupported value: {value}") -def decimal_to_bytes(value: Decimal) -> bytes: +def decimal_to_bytes(value: Decimal, byte_length: Optional[int] = None) -> bytes: """Returns a byte representation of a decimal. Args: value (Decimal): a decimal value. + byte_length (int): The number of bytes. Returns: bytes: the unscaled value of the Decimal as bytes. """ unscaled_value = decimal_to_unscaled(value) - return unscaled_value.to_bytes(bytes_required(unscaled_value), byteorder="big", signed=True) + if byte_length is None: + byte_length = bytes_required(unscaled_value) + return unscaled_value.to_bytes(byte_length, byteorder="big", signed=True) + + +def bytes_to_decimal(value: bytes, scale: int) -> Decimal: + """Returns a decimal from the bytes. + + Args: + value (bytes): tbe bytes to be converted into a decimal. + scale (int): the scale of the decimal. + + Returns: + Decimal: the scaled decimal. + """ + unscaled_datum = int.from_bytes(value, byteorder="big", signed=True) + return unscaled_to_decimal(unscaled_datum, scale) def truncate_decimal(value: Decimal, width: int) -> Decimal: @@ -88,3 +107,25 @@ def truncate_decimal(value: Decimal, width: int) -> Decimal: unscaled_value = decimal_to_unscaled(value) applied_value = unscaled_value - (((unscaled_value % width) + width) % width) return unscaled_to_decimal(applied_value, abs(int(value.as_tuple().exponent))) + + +@lru_cache +def decimal_required_bytes(precision: int) -> int: + """Compute the number of bytes required to store a precision. + + Args: + precision: The number of digits to store. + + Returns: + The number of bytes required to store a decimal with a certain precision. 
+ """ + if precision <= 0 or precision > 40: + raise ValueError(f"Unsupported precision, outside of (0, 40]: {precision}") + + for num_bytes, max_precision in enumerate( + [math.floor(math.log10(math.fabs(math.pow(2, 8 * pos - 1) - 1))) for pos in range(24)] + ): + if precision <= max_precision: + return num_bytes + + raise ValueError(f"Could not determine the required bytes for preciesion {precision}") diff --git a/python/tests/avro/test_encoder.py b/python/tests/avro/test_encoder.py index ea3305346ff4..5866719434e7 100644 --- a/python/tests/avro/test_encoder.py +++ b/python/tests/avro/test_encoder.py @@ -19,7 +19,6 @@ import io import struct import uuid -from decimal import Decimal from pyiceberg.avro.encoder import BinaryEncoder @@ -101,30 +100,6 @@ def test_write_double() -> None: assert output.getbuffer() == struct.pack(" None: - output = io.BytesIO() - encoder = BinaryEncoder(output) - - _input = Decimal("3.14159265359") - - encoder.write_decimal_bytes(_input) - - assert output.getbuffer() == b"\x0a\x49\x25\x59\xf6\x4f" - - -def test_write_decimal_fixed() -> None: - output = io.BytesIO() - encoder = BinaryEncoder(output) - - _input = Decimal("3.14159265359") - - encoder.write_decimal_fixed(_input, 8) - - buf = output.getbuffer() - assert len(buf) == 8 - assert output.getbuffer() == b"\x00\x00\x00\x49\x25\x59\xf6\x4f" - - def test_write_bytes() -> None: output = io.BytesIO() encoder = BinaryEncoder(output) diff --git a/python/tests/avro/test_writer.py b/python/tests/avro/test_writer.py index c517a0cd1c4d..2cdcd4482a02 100644 --- a/python/tests/avro/test_writer.py +++ b/python/tests/avro/test_writer.py @@ -21,6 +21,7 @@ from typing import Dict, List import pytest +from _decimal import Decimal from pyiceberg.avro.encoder import BinaryEncoder from pyiceberg.avro.resolver import construct_writer @@ -218,3 +219,19 @@ class MyStruct(Record): b"\x00", ] ) + + +def test_write_decimal() -> None: + output = io.BytesIO() + encoder = BinaryEncoder(output) + + schema = 
StructType( + NestedField(1, "decimal", DecimalType(10, 2), required=True), + ) + + class MyStruct(Record): + decimal: Decimal + + construct_writer(schema).write(encoder, MyStruct(Decimal("1000.12"))) + + assert output.getvalue() == b"\x00\x00\x01\x86\xac" diff --git a/python/tests/utils/test_decimal.py b/python/tests/utils/test_decimal.py new file mode 100644 index 000000000000..52ba665c7477 --- /dev/null +++ b/python/tests/utils/test_decimal.py @@ -0,0 +1,40 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+import pytest + +from pyiceberg.utils.decimal import decimal_required_bytes + + +def test_decimal_required_bytes() -> None: + assert decimal_required_bytes(precision=1) == 1 + assert decimal_required_bytes(precision=2) == 1 + assert decimal_required_bytes(precision=3) == 2 + assert decimal_required_bytes(precision=4) == 2 + assert decimal_required_bytes(precision=5) == 3 + assert decimal_required_bytes(precision=7) == 4 + assert decimal_required_bytes(precision=8) == 4 + assert decimal_required_bytes(precision=10) == 5 + assert decimal_required_bytes(precision=32) == 14 + assert decimal_required_bytes(precision=40) == 17 + + with pytest.raises(ValueError) as exc_info: + decimal_required_bytes(precision=41) + assert "(0, 40]" in str(exc_info.value) + + with pytest.raises(ValueError) as exc_info: + decimal_required_bytes(precision=-1) + assert "(0, 40]" in str(exc_info.value) From 2829d4b41221033ae88b00fd85ee86246fd54530 Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Mon, 7 Aug 2023 17:34:04 +0200 Subject: [PATCH 12/13] Cleanup --- python/pyiceberg/avro/reader.py | 11 ++++++++--- python/pyiceberg/utils/decimal.py | 16 ++++++---------- python/tests/utils/test_decimal.py | 4 ++-- 3 files changed, 16 insertions(+), 15 deletions(-) diff --git a/python/pyiceberg/avro/reader.py b/python/pyiceberg/avro/reader.py index 54609cd17b0f..8a8f319a90c6 100644 --- a/python/pyiceberg/avro/reader.py +++ b/python/pyiceberg/avro/reader.py @@ -258,7 +258,7 @@ def skip(self, decoder: BinaryDecoder) -> None: decoder.skip_bytes() -@dataclass(frozen=True) +@dataclass(frozen=True, init=False) class DecimalReader(Reader): """Reads a value as a decimal. 
@@ -268,10 +268,15 @@ class DecimalReader(Reader): precision: int = dataclassfield() scale: int = dataclassfield() + _length: int + + def __init__(self, precision: int, scale: int): + object.__setattr__(self, "precision", precision) + object.__setattr__(self, "scale", scale) + object.__setattr__(self, "_length", decimal_required_bytes(precision)) def read(self, decoder: BinaryDecoder) -> Decimal: - data = decoder.read(decimal_required_bytes(self.precision)) - return bytes_to_decimal(data, self.scale) + return bytes_to_decimal(decoder.read(self._length), self.scale) def skip(self, decoder: BinaryDecoder) -> None: decoder.skip_bytes() diff --git a/python/pyiceberg/utils/decimal.py b/python/pyiceberg/utils/decimal.py index 6828e4125ed5..06b2b945cd8d 100644 --- a/python/pyiceberg/utils/decimal.py +++ b/python/pyiceberg/utils/decimal.py @@ -18,7 +18,6 @@ """Helper methods for working with Python Decimals.""" import math from decimal import Decimal -from functools import lru_cache from typing import Optional, Union @@ -109,7 +108,10 @@ def truncate_decimal(value: Decimal, width: int) -> Decimal: return unscaled_to_decimal(applied_value, abs(int(value.as_tuple().exponent))) -@lru_cache +MAX_PRECISION = tuple(math.floor(math.log10(math.fabs(math.pow(2, 8 * pos - 1) - 1))) for pos in range(24)) +REQUIRED_LENGTH = tuple(next(pos for pos in range(24) if p <= MAX_PRECISION[pos]) for p in range(40)) + + def decimal_required_bytes(precision: int) -> int: """Compute the number of bytes required to store a precision. @@ -119,13 +121,7 @@ def decimal_required_bytes(precision: int) -> int: Returns: The number of bytes required to store a decimal with a certain precision. 
""" - if precision <= 0 or precision > 40: + if precision <= 0 or precision >= 40: raise ValueError(f"Unsupported precision, outside of (0, 40]: {precision}") - for num_bytes, max_precision in enumerate( - [math.floor(math.log10(math.fabs(math.pow(2, 8 * pos - 1) - 1))) for pos in range(24)] - ): - if precision <= max_precision: - return num_bytes - - raise ValueError(f"Could not determine the required bytes for preciesion {precision}") + return REQUIRED_LENGTH[precision] diff --git a/python/tests/utils/test_decimal.py b/python/tests/utils/test_decimal.py index 52ba665c7477..683eab93fdc0 100644 --- a/python/tests/utils/test_decimal.py +++ b/python/tests/utils/test_decimal.py @@ -29,10 +29,10 @@ def test_decimal_required_bytes() -> None: assert decimal_required_bytes(precision=8) == 4 assert decimal_required_bytes(precision=10) == 5 assert decimal_required_bytes(precision=32) == 14 - assert decimal_required_bytes(precision=40) == 17 + assert decimal_required_bytes(precision=38) == 16 with pytest.raises(ValueError) as exc_info: - decimal_required_bytes(precision=41) + decimal_required_bytes(precision=40) assert "(0, 40]" in str(exc_info.value) with pytest.raises(ValueError) as exc_info: From f20c70f55c16b1d0de4d0e615eea8469856b1bb3 Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Mon, 7 Aug 2023 21:21:00 +0200 Subject: [PATCH 13/13] Fix the tests --- python/pyiceberg/conversions.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyiceberg/conversions.py b/python/pyiceberg/conversions.py index ebf6cfa66643..8f155fce3d95 100644 --- a/python/pyiceberg/conversions.py +++ b/python/pyiceberg/conversions.py @@ -57,7 +57,7 @@ TimeType, UUIDType, ) -from pyiceberg.utils.datetime import date_to_days, datetime_to_micros, time_object_to_micros +from pyiceberg.utils.datetime import date_to_days, datetime_to_micros, time_to_micros from pyiceberg.utils.decimal import decimal_to_bytes, unscaled_to_decimal _BOOL_STRUCT = Struct(" bytes: 
@to_bytes.register(TimeType) def _(_: TimeType, value: Union[time, int]) -> bytes: if isinstance(value, time): - value = time_object_to_micros(value) + value = time_to_micros(value) return _LONG_STRUCT.pack(value)