From 8c967ab0ac4da6f8202fe6773b2550c54f01c15b Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Mon, 1 May 2023 20:32:34 +0200 Subject: [PATCH 1/8] Python: Avro write support --- python/pyiceberg/avro/__init__.py | 4 + python/pyiceberg/avro/decoder.py | 8 +- python/pyiceberg/avro/encoder.py | 183 +++++++++++++++++ python/pyiceberg/avro/resolver.py | 94 +++++++++ python/pyiceberg/avro/writer.py | 198 +++++++++++++++++++ python/pyiceberg/io/pyarrow.py | 2 +- python/pyiceberg/schema.py | 9 - python/pyiceberg/utils/schema_conversion.py | 140 ++++++++++++- python/tests/utils/test_schema_conversion.py | 18 +- 9 files changed, 634 insertions(+), 22 deletions(-) create mode 100644 python/pyiceberg/avro/encoder.py create mode 100644 python/pyiceberg/avro/writer.py diff --git a/python/pyiceberg/avro/__init__.py b/python/pyiceberg/avro/__init__.py index 13a83393a912..d7d8b55ef913 100644 --- a/python/pyiceberg/avro/__init__.py +++ b/python/pyiceberg/avro/__init__.py @@ -14,3 +14,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +import struct + +STRUCT_FLOAT = struct.Struct("h") # big-endian signed short -STRUCT_SIGNED_INT = struct.Struct(">i") # big-endian signed int -STRUCT_SIGNED_LONG = struct.Struct(">q") # big-endian signed long - class BinaryDecoder: """Read leaf values.""" diff --git a/python/pyiceberg/avro/encoder.py b/python/pyiceberg/avro/encoder.py new file mode 100644 index 000000000000..c77f9dc24fd5 --- /dev/null +++ b/python/pyiceberg/avro/encoder.py @@ -0,0 +1,183 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import decimal +import struct +from datetime import date, datetime, time + +from pyiceberg.avro import STRUCT_DOUBLE, STRUCT_FLOAT +from pyiceberg.io import OutputStream +from pyiceberg.utils.datetime import date_to_days, datetime_to_micros, time_to_micros + + +class BinaryEncoder: + """Write leaf values.""" + + _output_stream: OutputStream + + def __init__(self, output_stream: OutputStream) -> None: + self._output_stream = output_stream + + def write(self, b: bytes) -> None: + self._output_stream.write(b) + + def write_boolean(self, boolean: bool) -> None: + """A boolean is written as a single byte whose value is either 0 (false) or 1 (true). + + Args: + boolean: The boolean to write + """ + self.write(bytearray([bool(boolean)])) + + def write_int(self, integer: int) -> None: + """Integer and long values are written using variable-length zig-zag coding.""" + datum = (integer << 1) ^ (integer >> 63) + while (datum & ~0x7F) != 0: + self.write(bytearray([(datum & 0x7F) | 0x80])) + datum >>= 7 + self.write(bytearray([datum])) + + def write_float(self, f: float) -> None: + """ + A float is written as 4 bytes. + """ + self.write(STRUCT_FLOAT.pack(f)) + + def write_double(self, f: float) -> None: + """ + A double is written as 8 bytes. + """ + self.write(STRUCT_DOUBLE.pack(f)) + + def write_decimal_bytes(self, datum: decimal.Decimal) -> None: + """ + Decimal in bytes are encoded as long. Since size of packed value in bytes for + signed long is 8, 8 bytes are written. 
+ """ + sign, digits, _ = datum.as_tuple() + + unscaled_datum = 0 + for digit in digits: + unscaled_datum = (unscaled_datum * 10) + digit + + bits_req = unscaled_datum.bit_length() + 1 + if sign: + unscaled_datum = (1 << bits_req) - unscaled_datum + + bytes_req = bits_req // 8 + padding_bits = ~((1 << bits_req) - 1) if sign else 0 + packed_bits = padding_bits | unscaled_datum + + bytes_req += 1 if (bytes_req << 3) < bits_req else 0 + self.write_int(bytes_req) + for index in range(bytes_req - 1, -1, -1): + bits_to_write = packed_bits >> (8 * index) + self.write(bytearray([bits_to_write & 0xFF])) + + def write_decimal_fixed(self, datum: decimal.Decimal, size: int) -> None: + """ + Decimal in fixed are encoded as size of fixed bytes. + """ + sign, digits, _ = datum.as_tuple() + + unscaled_datum = 0 + for digit in digits: + unscaled_datum = (unscaled_datum * 10) + digit + + bits_req = unscaled_datum.bit_length() + 1 + size_in_bits = size * 8 + offset_bits = size_in_bits - bits_req + + mask = 2**size_in_bits - 1 + bit = 1 + for _ in range(bits_req): + mask ^= bit + bit <<= 1 + + if bits_req < 8: + bytes_req = 1 + else: + bytes_req = bits_req // 8 + if bits_req % 8 != 0: + bytes_req += 1 + if sign: + unscaled_datum = (1 << bits_req) - unscaled_datum + unscaled_datum = mask | unscaled_datum + for index in range(size - 1, -1, -1): + bits_to_write = unscaled_datum >> (8 * index) + self.write(bytearray([bits_to_write & 0xFF])) + else: + for _ in range(offset_bits // 8): + self.write(b"\x00") + for index in range(bytes_req - 1, -1, -1): + bits_to_write = unscaled_datum >> (8 * index) + self.write(bytearray([bits_to_write & 0xFF])) + + def write_bytes(self, b: bytes) -> None: + """ + Bytes are encoded as a long followed by that many bytes of data. 
+ """ + self.write_int(len(b)) + self.write(struct.pack(f"{len(b)}s", b)) + + def write_bytes_fixed(self, b: bytes) -> None: + """ + Writes fixed number of bytes + """ + self.write(struct.pack(f"{len(b)}s", b)) + + def write_utf8(self, s: str) -> None: + """ + A string is encoded as a long followed by + that many bytes of UTF-8 encoded character data. + """ + self.write_bytes(s.encode("utf-8")) + + def write_date_int(self, d: date) -> None: + """ + Encode python date object as int. + It stores the number of days from + the unix epoch, 1 January 1970 (ISO calendar). + """ + self.write_int(date_to_days(d)) + + def write_time_millis_int(self, dt: time) -> None: + """ + Encode python time object as int. + It stores the number of milliseconds from midnight, 00:00:00.000 + """ + self.write_int(int(time_to_micros(dt) / 1000)) + + def write_time_micros_long(self, dt: time) -> None: + """ + Encode python time object as long. + It stores the number of microseconds from midnight, 00:00:00.000000 + """ + self.write_int(time_to_micros(dt)) + + def write_timestamp_millis_long(self, dt: datetime) -> None: + """ + Encode python datetime object as long. + It stores the number of milliseconds from midnight of unix epoch, 1 January 1970. + """ + self.write_int(int(datetime_to_micros(dt) / 1000)) + + def write_timestamp_micros_long(self, dt: datetime) -> None: + """ + Encode python datetime object as long. + It stores the number of microseconds from midnight of unix epoch, 1 January 1970. 
+ """ + self.write_int(datetime_to_micros(dt)) diff --git a/python/pyiceberg/avro/resolver.py b/python/pyiceberg/avro/resolver.py index 407ce4648df1..1840a04e4acc 100644 --- a/python/pyiceberg/avro/resolver.py +++ b/python/pyiceberg/avro/resolver.py @@ -48,12 +48,33 @@ TimestamptzReader, UUIDReader, ) +from pyiceberg.avro.writer import ( + BinaryWriter, + BooleanWriter, + DateWriter, + DecimalWriter, + DoubleWriter, + FixedWriter, + FloatWriter, + IntegerWriter, + ListWriter, + MapWriter, + StringWriter, + StructWriter, + TimestamptzWriter, + TimestampWriter, + TimeWriter, + UUIDWriter, + Writer, +) from pyiceberg.exceptions import ResolveError from pyiceberg.schema import ( PartnerAccessor, PrimitiveWithPartnerVisitor, Schema, + SchemaVisitorPerPrimitiveType, promote, + visit, visit_with_partner, ) from pyiceberg.typedef import EMPTY_DICT, Record, StructProtocol @@ -97,6 +118,79 @@ def construct_reader( return resolve(file_schema, file_schema, read_types) +def construct_writer(file_schema: Union[Schema, IcebergType]) -> Writer: + """Constructs a writer from a file schema + + Args: + file_schema (Schema | IcebergType): The schema of the Avro file + + Raises: + NotImplementedError: If attempting to resolve an unrecognized object type + """ + return visit(file_schema, ConstructWriter()) + + +class ConstructWriter(SchemaVisitorPerPrimitiveType[Writer]): + """Constructs a writer tree from an Iceberg schema""" + + def schema(self, schema: Schema, struct_result: Writer) -> Writer: + return struct_result + + def struct(self, struct: StructType, field_results: List[Writer]) -> Writer: + return StructWriter(tuple(field_results)) + + def field(self, field: NestedField, field_result: Writer) -> Writer: + return field_result + + def list(self, list_type: ListType, element_result: Writer) -> Writer: + return ListWriter(element_result) + + def map(self, map_type: MapType, key_result: Writer, value_result: Writer) -> Writer: + return MapWriter(key_result, value_result) + + def 
visit_fixed(self, fixed_type: FixedType) -> Writer: + return FixedWriter(len(fixed_type)) + + def visit_decimal(self, decimal_type: DecimalType) -> Writer: + return DecimalWriter(decimal_type.precision, decimal_type.scale) + + def visit_boolean(self, boolean_type: BooleanType) -> Writer: + return BooleanWriter() + + def visit_integer(self, integer_type: IntegerType) -> Writer: + return IntegerWriter() + + def visit_long(self, long_type: LongType) -> Writer: + return IntegerWriter() + + def visit_float(self, float_type: FloatType) -> Writer: + return FloatWriter() + + def visit_double(self, double_type: DoubleType) -> Writer: + return DoubleWriter() + + def visit_date(self, date_type: DateType) -> Writer: + return DateWriter() + + def visit_time(self, time_type: TimeType) -> Writer: + return TimeWriter() + + def visit_timestamp(self, timestamp_type: TimestampType) -> Writer: + return TimestampWriter() + + def visit_timestamptz(self, timestamptz_type: TimestamptzType) -> Writer: + return TimestamptzWriter() + + def visit_string(self, string_type: StringType) -> Writer: + return StringWriter() + + def visit_uuid(self, uuid_type: UUIDType) -> Writer: + return UUIDWriter() + + def visit_binary(self, binary_type: BinaryType) -> Writer: + return BinaryWriter() + + def resolve( file_schema: Union[Schema, IcebergType], read_schema: Union[Schema, IcebergType], diff --git a/python/pyiceberg/avro/writer.py b/python/pyiceberg/avro/writer.py new file mode 100644 index 000000000000..e352bc06919e --- /dev/null +++ b/python/pyiceberg/avro/writer.py @@ -0,0 +1,198 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Classes for building the Writer tree + +Constructing a writer tree from the schema makes it easy +to decouple the writer implementation from the schema. + +The writer tree can be changed in such a way that the +write schema is different, while respecting the write schema +""" +from __future__ import annotations + +from abc import abstractmethod +from dataclasses import dataclass +from dataclasses import field as dataclassfield +from datetime import datetime, time +from typing import ( + Any, + Dict, + List, + Tuple, +) +from uuid import UUID + +from pyiceberg.avro.encoder import BinaryEncoder +from pyiceberg.types import StructType +from pyiceberg.utils.singleton import Singleton + + +class Writer(Singleton): + @abstractmethod + def write(self, encoder: BinaryEncoder, val: Any) -> Any: + ... 
+ + def __repr__(self) -> str: + return f"{self.__class__.__name__}()" + + +class NoneWriter(Writer): + def write(self, _: BinaryEncoder, __: Any) -> None: + return None + + +class BooleanWriter(Writer): + def write(self, encoder: BinaryEncoder, val: bool) -> None: + encoder.write_boolean(val) + + +class IntegerWriter(Writer): + """Longs and ints are encoded the same way, and there is no long in Python""" + + def write(self, encoder: BinaryEncoder, val: int) -> None: + encoder.write_int(val) + + +class FloatWriter(Writer): + def write(self, encoder: BinaryEncoder, val: float) -> None: + encoder.write_float(val) + + +class DoubleWriter(Writer): + def write(self, encoder: BinaryEncoder, val: float) -> None: + encoder.write_double(val) + + +class DateWriter(Writer): + def write(self, encoder: BinaryEncoder, val: Any) -> None: + encoder.write_date_int(val) + + +class TimeWriter(Writer): + def write(self, encoder: BinaryEncoder, val: time) -> None: + encoder.write_time_micros_long(val) + + +class TimestampWriter(Writer): + def write(self, encoder: BinaryEncoder, val: datetime) -> None: + encoder.write_timestamp_micros_long(val) + + +class TimestamptzWriter(Writer): + def write(self, encoder: BinaryEncoder, val: datetime) -> None: + encoder.write_timestamp_micros_long(val) + + +class StringWriter(Writer): + def write(self, encoder: BinaryEncoder, val: Any) -> None: + encoder.write_utf8(val) + + +class UUIDWriter(Writer): + def write(self, encoder: BinaryEncoder, val: UUID) -> None: + uuid_bytes = val.bytes + + if len(uuid_bytes) != 16: + raise ValueError(f"Expected UUID to be 16 bytes, got: {len(uuid_bytes)}") + + encoder.write_bytes_fixed(uuid_bytes) + + +@dataclass(frozen=True) +class FixedWriter(Writer): + _len: int = dataclassfield() + + def write(self, encoder: BinaryEncoder, val: bytes) -> None: + encoder.write(val) + + def __len__(self) -> int: + return self._len + + def __repr__(self) -> str: + return f"FixedWriter({self._len})" + + +class BinaryWriter: + 
"""Variable byte length writer""" + + def write(self, encoder: BinaryEncoder, val: Any) -> None: + encoder.write_bytes(val) + + +@dataclass(frozen=True) +class DecimalWriter(Writer): + precision: int = dataclassfield() + scale: int = dataclassfield() + + def write(self, encoder: BinaryEncoder, val: Any) -> None: + return encoder.write_decimal_bytes(val) + + def __repr__(self) -> str: + return f"DecimalWriter({self.precision}, {self.scale})" + + +@dataclass(frozen=True) +class OptionWriter(Writer): + option: Writer = dataclassfield() + + def write(self, encoder: BinaryEncoder, val: Any) -> None: + if val is not None: + encoder.write_int(1) + self.option.write(encoder, val) + else: + encoder.write_int(0) + + +@dataclass(frozen=True) +class StructWriter(Writer): + field_writers: Tuple[Writer, ...] = dataclassfield() + + def write(self, encoder: BinaryEncoder, val: StructType) -> None: + for writer, value in zip(self.field_writers, val.fields): + writer.write(encoder, value) + + def __eq__(self, other: Any) -> bool: + return self.field_writers == other.field_writers if isinstance(other, StructWriter) else False + + def __repr__(self) -> str: + return f"StructWriter({','.join(repr(field) for field in self.field_writers)})" + + def __hash__(self) -> int: + return hash(self.field_writers) + + +@dataclass(frozen=True) +class ListWriter(Writer): + element_writer: Writer + + def write(self, encoder: BinaryEncoder, val: List[Any]) -> None: + encoder.write_int(len(val)) + for v in val: + self.element_writer.write(encoder, v) + + +@dataclass(frozen=True) +class MapWriter(Writer): + key_writer: Writer + value_writer: Writer + + def write(self, encoder: BinaryEncoder, val: Dict[Any, Any]) -> None: + encoder.write_int(len(val)) + for k, v in val: + self.key_writer.write(encoder, k) + self.value_writer.write(encoder, v) diff --git a/python/pyiceberg/io/pyarrow.py b/python/pyiceberg/io/pyarrow.py index 7454c6f744bd..87bf4fb6bc90 100644 --- a/python/pyiceberg/io/pyarrow.py +++ 
b/python/pyiceberg/io/pyarrow.py @@ -412,7 +412,7 @@ def visit_time(self, _: TimeType) -> pa.DataType: def visit_timestamp(self, _: TimestampType) -> pa.DataType: return pa.timestamp(unit="us") - def visit_timestampz(self, _: TimestamptzType) -> pa.DataType: + def visit_timestamptz(self, _: TimestamptzType) -> pa.DataType: return pa.timestamp(unit="us", tz="UTC") def visit_string(self, _: StringType) -> pa.DataType: diff --git a/python/pyiceberg/schema.py b/python/pyiceberg/schema.py index 1fdd7d049321..47d05eed9b0b 100644 --- a/python/pyiceberg/schema.py +++ b/python/pyiceberg/schema.py @@ -1361,12 +1361,3 @@ def _(file_type: DecimalType, read_type: IcebergType) -> IcebergType: raise ResolveError(f"Cannot reduce precision from {file_type} to {read_type}") else: raise ResolveError(f"Cannot promote an decimal to {read_type}") - - -@promote.register(FixedType) -def _(file_type: FixedType, read_type: IcebergType) -> IcebergType: - if isinstance(read_type, UUIDType) and len(file_type) == 16: - # Since pyarrow reads parquet UUID as fixed 16-byte binary, the promotion is needed to ensure read compatibility - return read_type - else: - raise ResolveError(f"Cannot promote {file_type} to {read_type}") diff --git a/python/pyiceberg/utils/schema_conversion.py b/python/pyiceberg/utils/schema_conversion.py index 395dc3ee80e2..239bd2ca9fa5 100644 --- a/python/pyiceberg/utils/schema_conversion.py +++ b/python/pyiceberg/utils/schema_conversion.py @@ -22,11 +22,12 @@ Any, Dict, List, + Optional, Tuple, Union, ) -from pyiceberg.schema import Schema +from pyiceberg.schema import Schema, SchemaVisitorPerPrimitiveType, visit from pyiceberg.types import ( BinaryType, BooleanType, @@ -45,13 +46,14 @@ StringType, StructType, TimestampType, + TimestamptzType, TimeType, UUIDType, ) logger = logging.getLogger(__name__) -PRIMITIVE_FIELD_TYPE_MAPPING: dict[str, PrimitiveType] = { +PRIMITIVE_FIELD_TYPE_MAPPING: Dict[str, PrimitiveType] = { "boolean": BooleanType(), "bytes": BinaryType(), 
"double": DoubleType(), @@ -62,7 +64,7 @@ "enum": StringType(), } -LOGICAL_FIELD_TYPE_MAPPING: dict[tuple[str, str], PrimitiveType] = { +LOGICAL_FIELD_TYPE_MAPPING: Dict[Tuple[str, str], PrimitiveType] = { ("date", "int"): DateType(), ("time-millis", "int"): TimeType(), ("timestamp-millis", "long"): TimestampType(), @@ -71,6 +73,8 @@ ("uuid", "fixed"): UUIDType(), } +AvroType = Union[str, Any] + class AvroSchemaConversion: def avro_to_iceberg(self, avro_schema: dict[str, Any]) -> Schema: @@ -118,6 +122,10 @@ def avro_to_iceberg(self, avro_schema: dict[str, Any]) -> Schema: """ return Schema(*[self._convert_field(field) for field in avro_schema["fields"]], schema_id=1) + def iceberg_to_avro(self, schema: Schema, schema_name: Optional[str] = None) -> AvroType: + """Converts an Iceberg schema into an Avro dictionary that can be serialized to JSON.""" + return visit(schema, ConvertSchemaToAvro(schema_name)) + def _resolve_union( self, type_union: Union[Dict[str, str], List[Union[str, Dict[str, str]]], str] ) -> Tuple[Union[str, Dict[str, Any]], bool]: @@ -470,3 +478,129 @@ def _convert_fixed_type(self, avro_type: dict[str, Any]) -> FixedType: An Iceberg equivalent fixed type. """ return FixedType(length=avro_type["size"]) + + +class ConvertSchemaToAvro(SchemaVisitorPerPrimitiveType[AvroType]): + """Converts an Iceberg schema to an Avro schema.""" + + schema_name: Optional[str] + last_list_field_id: int + last_map_key_field_id: int + last_map_value_field_id: int + + def __init__(self, schema_name: Optional[str]) -> None: + """Converts an Iceberg schema to an Avro schema. + + Args: + schema_name: The name of the root record. 
+ """ + self.schema_name = schema_name + + def schema(self, schema: Schema, struct_result: AvroType) -> AvroType: + if isinstance(struct_result, dict) and self.schema_name is not None: + struct_result["name"] = self.schema_name + return struct_result + + def before_list_element(self, element: NestedField) -> None: + self.last_list_field_id = element.field_id + + def before_map_key(self, key: NestedField) -> None: + self.last_map_key_field_id = key.field_id + + def before_map_value(self, value: NestedField) -> None: + self.last_map_value_field_id = value.field_id + + def struct(self, struct: StructType, field_results: List[AvroType]) -> AvroType: + return {"type": "record", "fields": field_results} + + def field(self, field: NestedField, field_result: AvroType) -> AvroType: + # Sets the schema name + if isinstance(field_result, dict) and field_result.get("type") == "record": + field_result["name"] = f"r{field.field_id}" + + result = { + "name": field.name, + "field-id": field.field_id, + "type": field_result if field.required else ["null", field_result], + } + + if field.optional: + result["default"] = None + + if field.doc is not None: + result["doc"] = field.doc + + return result + + def list(self, list_type: ListType, element_result: AvroType) -> AvroType: + # Sets the schema name in case of a record + if isinstance(element_result, dict) and element_result.get("type") == "record": + element_result["name"] = f"r{self.last_list_field_id}" + return {"type": "array", "element-id": self.last_list_field_id, "items": element_result} + + def map(self, map_type: MapType, key_result: AvroType, value_result: AvroType) -> AvroType: + if isinstance(key_result, StringType): + # Avro Maps does not support other keys than a String, + return { + "type": "map", + "values": value_result, + } + else: + # Creates a logical map that's a list of schema's + # binary compatible + return { + "type": "array", + "items": { + "type": "record", + "name": 
f"k{self.last_map_key_field_id}_v{self.last_map_value_field_id}", + "fields": [ + {"name": "key", "type": key_result, "field-id": self.last_map_key_field_id}, + {"name": "value", "type": value_result, "field-id": self.last_map_value_field_id}, + ], + }, + "logicalType": "map", + } + + def visit_fixed(self, fixed_type: FixedType) -> AvroType: + return {"type": "fixed", "size": len(fixed_type)} + + def visit_decimal(self, decimal_type: DecimalType) -> AvroType: + return {"type": "bytes", "logicalType": "decimal", "precision": decimal_type.precision, "scale": decimal_type.scale} + + def visit_boolean(self, boolean_type: BooleanType) -> AvroType: + return "boolean" + + def visit_integer(self, integer_type: IntegerType) -> AvroType: + return "int" + + def visit_long(self, long_type: LongType) -> AvroType: + return "long" + + def visit_float(self, float_type: FloatType) -> AvroType: + return "float" + + def visit_double(self, double_type: DoubleType) -> AvroType: + return "double" + + def visit_date(self, date_type: DateType) -> AvroType: + return {"type": "int", "logicalType": "date"} + + def visit_time(self, time_type: TimeType) -> AvroType: + return {"type": "long", "logicalType": "time-micros"} + + def visit_timestamp(self, timestamp_type: TimestampType) -> AvroType: + # Iceberg only supports micro's + return {"type": "long", "logicalType": "timestamp-micros"} + + def visit_timestamptz(self, timestamptz_type: TimestamptzType) -> AvroType: + # Iceberg only supports micro's + return {"type": "long", "logicalType": "timestamp-micros"} + + def visit_string(self, string_type: StringType) -> AvroType: + return "string" + + def visit_uuid(self, uuid_type: UUIDType) -> AvroType: + return {"type": "string", "logicalType": "uuid"} + + def visit_binary(self, binary_type: BinaryType) -> AvroType: + return "bytes" diff --git a/python/tests/utils/test_schema_conversion.py b/python/tests/utils/test_schema_conversion.py index 6a8c5a28c75a..c9360da643d1 100644 --- 
a/python/tests/utils/test_schema_conversion.py +++ b/python/tests/utils/test_schema_conversion.py @@ -37,8 +37,8 @@ from pyiceberg.utils.schema_conversion import AvroSchemaConversion -def test_iceberg_to_avro(avro_schema_manifest_file_v1: Dict[str, Any]) -> None: - iceberg_schema = AvroSchemaConversion().avro_to_iceberg(avro_schema_manifest_file_v1) +def test_avro_to_iceberg(avro_schema_manifest_file: Dict[str, Any]) -> None: + iceberg_schema = AvroSchemaConversion().avro_to_iceberg(avro_schema_manifest_file) expected_iceberg_schema = Schema( NestedField( field_id=500, name="manifest_path", field_type=StringType(), required=True, doc="Location URI with FS scheme" @@ -354,3 +354,17 @@ def test_logical_map_with_invalid_fields() -> None: AvroSchemaConversion()._convert_logical_map_type(avro_type) assert "Invalid key-value pair schema:" in str(exc_info.value) + + +def test_iceberg_to_avro_manifest_list(avro_schema_manifest_file: Dict[str, Any]) -> None: + """Round trip the manifest list""" + iceberg_schema = AvroSchemaConversion().avro_to_iceberg(avro_schema_manifest_file) + avro_result = AvroSchemaConversion().iceberg_to_avro(iceberg_schema, schema_name="manifest_file") + assert avro_schema_manifest_file == avro_result + + +def test_iceberg_to_avro_manifest(avro_schema_manifest_entry: Dict[str, Any]) -> None: + """Round trip the manifest itself""" + iceberg_schema = AvroSchemaConversion().avro_to_iceberg(avro_schema_manifest_entry) + avro_result = AvroSchemaConversion().iceberg_to_avro(iceberg_schema, schema_name="manifest_entry") + assert avro_schema_manifest_entry == avro_result From b0ab83ae1a17b8c12992ca2e407940ebc1ce4c8f Mon Sep 17 00:00:00 2001 From: Max de Bayser Date: Sun, 4 Jun 2023 18:23:13 -0300 Subject: [PATCH 2/8] Add class to write Avro files and add PoC update_table api call --- python/pyiceberg/avro/file.py | 82 ++++++++++++++++++++- python/pyiceberg/avro/resolver.py | 6 +- python/pyiceberg/avro/writer.py | 8 +- python/pyiceberg/catalog/rest.py | 1 + 
python/pyiceberg/manifest.py | 2 +- python/pyiceberg/typedef.py | 4 + python/pyiceberg/utils/schema_conversion.py | 18 ++--- 7 files changed, 105 insertions(+), 16 deletions(-) diff --git a/python/pyiceberg/avro/file.py b/python/pyiceberg/avro/file.py index 31d6a45c4c56..5ffb9c2f19ab 100644 --- a/python/pyiceberg/avro/file.py +++ b/python/pyiceberg/avro/file.py @@ -18,7 +18,7 @@ """Avro reader for reading Avro files.""" from __future__ import annotations -import json +import json, os, io from dataclasses import dataclass from enum import Enum from types import TracebackType @@ -26,6 +26,7 @@ Callable, Dict, Generic, + List, Optional, Type, TypeVar, @@ -33,9 +34,11 @@ from pyiceberg.avro.codecs import KNOWN_CODECS, Codec from pyiceberg.avro.decoder import BinaryDecoder +from pyiceberg.avro.encoder import BinaryEncoder from pyiceberg.avro.reader import Reader -from pyiceberg.avro.resolver import construct_reader, resolve -from pyiceberg.io import InputFile, InputStream +from pyiceberg.avro.writer import Writer +from pyiceberg.avro.resolver import construct_reader, resolve, construct_writer +from pyiceberg.io import InputFile, InputStream, OutputStream, OutputFile from pyiceberg.io.memory import MemoryInputStream from pyiceberg.schema import Schema from pyiceberg.typedef import EMPTY_DICT, Record, StructProtocol @@ -199,3 +202,76 @@ def __next__(self) -> D: def _read_header(self) -> AvroFileHeader: return construct_reader(META_SCHEMA, {-1: AvroFileHeader}).read(self.decoder) + + +class AvroOutputFile(Generic[D]): + output_file: OutputFile + output_stream: OutputStream + schema: Schema + schema_name: str + encoder: BinaryEncoder + sync_bytes: bytes + writer: Writer + + def __init__( + self, + output_file: OutputFile, + schema: Schema, + schema_name: str + ) -> None: + self.output_file = output_file + self.schema = schema + self.schema_name = schema_name + self.sync_bytes = os.urandom(SYNC_SIZE) + self.writer = construct_writer(self.schema) + + def __enter__(self) -> 
AvroFile[D]: + """ + Opens the file and writes the header + + Returns: + The file object to write records to + """ + self.output_stream = self.output_file.create(overwrite = True) + self.encoder = BinaryEncoder(self.output_stream) + + self._write_header() + self.writer = construct_writer(self.schema) + + return self + + def __exit__( + self, exctype: Optional[Type[BaseException]], excinst: Optional[BaseException], exctb: Optional[TracebackType] + ) -> None: + self.output_stream.close() + + def _write_header(self): + json_schema = json.dumps(AvroSchemaConversion().iceberg_to_avro( + self.schema, + schema_name=self.schema_name + )) + header = AvroFileHeader( + magic = MAGIC, + meta = { + _SCHEMA_KEY: json_schema, + _CODEC_KEY: "null" + }, + sync = self.sync_bytes + ) + construct_writer(META_SCHEMA).write(self.encoder, header) + + def write_block(self, objects: List[D]) -> None: + + in_memory = io.BytesIO() + block_content_encoder = BinaryEncoder(output_stream = in_memory) + for obj in objects: + self.writer.write(block_content_encoder, obj) + block_content = in_memory.getvalue() + + self.encoder.write_int(len(objects)) + self.encoder.write_int(len(block_content)) + self.encoder.write(block_content) + self.encoder.write_bytes_fixed(self.sync_bytes) + + + diff --git a/python/pyiceberg/avro/resolver.py b/python/pyiceberg/avro/resolver.py index 1840a04e4acc..b0ad80e313d7 100644 --- a/python/pyiceberg/avro/resolver.py +++ b/python/pyiceberg/avro/resolver.py @@ -59,6 +59,7 @@ IntegerWriter, ListWriter, MapWriter, + OptionWriter, StringWriter, StructWriter, TimestamptzWriter, @@ -140,7 +141,10 @@ def struct(self, struct: StructType, field_results: List[Writer]) -> Writer: return StructWriter(tuple(field_results)) def field(self, field: NestedField, field_result: Writer) -> Writer: - return field_result + if field.required: + return field_result + else: + return OptionWriter(field_result) def list(self, list_type: ListType, element_result: Writer) -> Writer: return 
ListWriter(element_result) diff --git a/python/pyiceberg/avro/writer.py b/python/pyiceberg/avro/writer.py index e352bc06919e..5ba0f4fc5d7f 100644 --- a/python/pyiceberg/avro/writer.py +++ b/python/pyiceberg/avro/writer.py @@ -163,7 +163,7 @@ class StructWriter(Writer): field_writers: Tuple[Writer, ...] = dataclassfield() def write(self, encoder: BinaryEncoder, val: StructType) -> None: - for writer, value in zip(self.field_writers, val.fields): + for writer, value in zip(self.field_writers, val.fields()): writer.write(encoder, value) def __eq__(self, other: Any) -> bool: @@ -184,6 +184,8 @@ def write(self, encoder: BinaryEncoder, val: List[Any]) -> None: encoder.write_int(len(val)) for v in val: self.element_writer.write(encoder, v) + if len(val) > 0: + encoder.write_int(0) @dataclass(frozen=True) @@ -193,6 +195,8 @@ class MapWriter(Writer): def write(self, encoder: BinaryEncoder, val: Dict[Any, Any]) -> None: encoder.write_int(len(val)) - for k, v in val: + for k, v in val.items(): self.key_writer.write(encoder, k) self.value_writer.write(encoder, v) + if len(val) > 0: + encoder.write_int(0) diff --git a/python/pyiceberg/catalog/rest.py b/python/pyiceberg/catalog/rest.py index 4c36390171a3..e16e4ede4d1a 100644 --- a/python/pyiceberg/catalog/rest.py +++ b/python/pyiceberg/catalog/rest.py @@ -15,6 +15,7 @@ # specific language governing permissions and limitations # under the License. 
from json import JSONDecodeError +from enum import Enum from typing import ( Any, Dict, diff --git a/python/pyiceberg/manifest.py b/python/pyiceberg/manifest.py index 9a2b3eb53303..43e1cc86ac78 100644 --- a/python/pyiceberg/manifest.py +++ b/python/pyiceberg/manifest.py @@ -172,7 +172,7 @@ class DataFile(Record): nan_value_counts: Dict[int, int] lower_bounds: Dict[int, bytes] upper_bounds: Dict[int, bytes] - key_metadata: Optional[bytes] + key_metadata: Optional[bytes] split_offsets: Optional[List[int]] equality_ids: Optional[List[int]] sort_order_id: Optional[int] diff --git a/python/pyiceberg/typedef.py b/python/pyiceberg/typedef.py index df133416e81d..251eed3ff91b 100644 --- a/python/pyiceberg/typedef.py +++ b/python/pyiceberg/typedef.py @@ -24,6 +24,7 @@ Any, Callable, Dict, + List, Optional, Protocol, Set, @@ -155,3 +156,6 @@ def __eq__(self, other: Any) -> bool: def __repr__(self) -> str: return f"{self.__class__.__name__}[{', '.join(f'{key}={repr(value)}' for key, value in self.__dict__.items() if not key.startswith('_'))}]" + + def fields(self) -> List[str]: + return [self.__getattribute__(v) if hasattr(self, v) else None for v in self._position_to_field_name.values()] diff --git a/python/pyiceberg/utils/schema_conversion.py b/python/pyiceberg/utils/schema_conversion.py index 239bd2ca9fa5..a7d4e5584657 100644 --- a/python/pyiceberg/utils/schema_conversion.py +++ b/python/pyiceberg/utils/schema_conversion.py @@ -77,7 +77,7 @@ class AvroSchemaConversion: - def avro_to_iceberg(self, avro_schema: dict[str, Any]) -> Schema: + def avro_to_iceberg(self, avro_schema: Dict[str, Any]) -> Schema: """Converts an Apache Avro into an Apache Iceberg schema equivalent. 
This expects to have field id's to be encoded in the Avro schema: @@ -215,7 +215,7 @@ def _convert_schema(self, avro_type: Union[str, Dict[str, Any]]) -> IcebergType: else: raise TypeError(f"Unknown type: {avro_type}") - def _convert_field(self, field: dict[str, Any]) -> NestedField: + def _convert_field(self, field: Dict[str, Any]) -> NestedField: """Converts an Avro field into an Iceberg equivalent field. Args: @@ -237,7 +237,7 @@ def _convert_field(self, field: dict[str, Any]) -> NestedField: doc=field.get("doc"), ) - def _convert_record_type(self, record_type: dict[str, Any]) -> StructType: + def _convert_record_type(self, record_type: Dict[str, Any]) -> StructType: """ Converts the fields from a record into an Iceberg struct. @@ -291,7 +291,7 @@ def _convert_record_type(self, record_type: dict[str, Any]) -> StructType: return StructType(*[self._convert_field(field) for field in record_type["fields"]]) - def _convert_array_type(self, array_type: dict[str, Any]) -> ListType: + def _convert_array_type(self, array_type: Dict[str, Any]) -> ListType: if "element-id" not in array_type: raise ValueError(f"Cannot convert array-type, missing element-id: {array_type}") @@ -303,7 +303,7 @@ def _convert_array_type(self, array_type: dict[str, Any]) -> ListType: element_required=element_required, ) - def _convert_map_type(self, map_type: dict[str, Any]) -> MapType: + def _convert_map_type(self, map_type: Dict[str, Any]) -> MapType: """Converts an avro map type into an Iceberg MapType. Args: @@ -340,7 +340,7 @@ def _convert_map_type(self, map_type: dict[str, Any]) -> MapType: value_required=value_required, ) - def _convert_logical_type(self, avro_logical_type: dict[str, Any]) -> IcebergType: + def _convert_logical_type(self, avro_logical_type: Dict[str, Any]) -> IcebergType: """Convert a schema with a logical type annotation into an IcebergType. 
For the decimal and map we need to fetch more keys from the dict, and for @@ -376,7 +376,7 @@ def _convert_logical_type(self, avro_logical_type: dict[str, Any]) -> IcebergTyp else: raise ValueError(f"Unknown logical/physical type combination: {avro_logical_type}") - def _convert_logical_decimal_type(self, avro_type: dict[str, Any]) -> DecimalType: + def _convert_logical_decimal_type(self, avro_type: Dict[str, Any]) -> DecimalType: """Converts an avro type to an Iceberg DecimalType. Args: @@ -403,7 +403,7 @@ def _convert_logical_decimal_type(self, avro_type: dict[str, Any]) -> DecimalTyp """ return DecimalType(precision=avro_type["precision"], scale=avro_type["scale"]) - def _convert_logical_map_type(self, avro_type: dict[str, Any]) -> MapType: + def _convert_logical_map_type(self, avro_type: Dict[str, Any]) -> MapType: """Converts an avro map type to an Iceberg MapType. In the case where a map hasn't a key as a type you can use a logical map to still encode this in Avro. @@ -455,7 +455,7 @@ def _convert_logical_map_type(self, avro_type: dict[str, Any]) -> MapType: value_required=value.required, ) - def _convert_fixed_type(self, avro_type: dict[str, Any]) -> FixedType: + def _convert_fixed_type(self, avro_type: Dict[str, Any]) -> FixedType: """ Converts Avro Type to the equivalent Iceberg fixed type. 
From cdaa19db8c5cc095c615f72954f49f5fe8e1c8ca Mon Sep 17 00:00:00 2001 From: Max de Bayser Date: Wed, 21 Jun 2023 15:20:16 -0300 Subject: [PATCH 3/8] Add tests for the avro readers and writers This commit also fixes some small bugs uncovered by the new tests --- python/pyiceberg/avro/encoder.py | 11 +- python/pyiceberg/avro/resolver.py | 2 +- python/pyiceberg/schema.py | 8 +- python/tests/avro/test_encoder.py | 243 +++++++++++++++++++ python/tests/avro/test_file.py | 133 +++++++++- python/tests/avro/test_writer.py | 220 +++++++++++++++++ python/tests/utils/test_schema_conversion.py | 10 +- 7 files changed, 613 insertions(+), 14 deletions(-) create mode 100644 python/tests/avro/test_encoder.py create mode 100644 python/tests/avro/test_writer.py diff --git a/python/pyiceberg/avro/encoder.py b/python/pyiceberg/avro/encoder.py index c77f9dc24fd5..622de52e6844 100644 --- a/python/pyiceberg/avro/encoder.py +++ b/python/pyiceberg/avro/encoder.py @@ -20,8 +20,13 @@ from pyiceberg.avro import STRUCT_DOUBLE, STRUCT_FLOAT from pyiceberg.io import OutputStream -from pyiceberg.utils.datetime import date_to_days, datetime_to_micros, time_to_micros +from pyiceberg.utils.datetime import date_to_days, datetime_to_micros +def _time_object_to_micros(t: time): + return int(t.hour * 60 * 60 * 1e6 +\ + t.minute * 60 * 1e6 +\ + t.second * 1e6 +\ + t.microsecond) class BinaryEncoder: """Write leaf values.""" @@ -159,14 +164,14 @@ def write_time_millis_int(self, dt: time) -> None: Encode python time object as int. It stores the number of milliseconds from midnight, 00:00:00.000 """ - self.write_int(int(time_to_micros(dt) / 1000)) + self.write_int(int(_time_object_to_micros(dt) / 1000)) def write_time_micros_long(self, dt: time) -> None: """ Encode python time object as long. 
It stores the number of microseconds from midnight, 00:00:00.000000 """ - self.write_int(time_to_micros(dt)) + self.write_int(_time_object_to_micros(dt)) def write_timestamp_millis_long(self, dt: datetime) -> None: """ diff --git a/python/pyiceberg/avro/resolver.py b/python/pyiceberg/avro/resolver.py index b0ad80e313d7..90f8d897937b 100644 --- a/python/pyiceberg/avro/resolver.py +++ b/python/pyiceberg/avro/resolver.py @@ -347,7 +347,7 @@ def visit_time(self, time_type: TimeType, partner: Optional[IcebergType]) -> Rea def visit_timestamp(self, timestamp_type: TimestampType, partner: Optional[IcebergType]) -> Reader: return TimestampReader() - def visit_timestampz(self, timestamptz_type: TimestamptzType, partner: Optional[IcebergType]) -> Reader: + def visit_timestamptz(self, timestamptz_type: TimestamptzType, partner: Optional[IcebergType]) -> Reader: return TimestamptzReader() def visit_string(self, string_type: StringType, partner: Optional[IcebergType]) -> Reader: diff --git a/python/pyiceberg/schema.py b/python/pyiceberg/schema.py index 47d05eed9b0b..6d915e14563f 100644 --- a/python/pyiceberg/schema.py +++ b/python/pyiceberg/schema.py @@ -428,7 +428,7 @@ def primitive(self, primitive: PrimitiveType, primitive_partner: Optional[P]) -> elif isinstance(primitive, TimestampType): return self.visit_timestamp(primitive, primitive_partner) elif isinstance(primitive, TimestamptzType): - return self.visit_timestampz(primitive, primitive_partner) + return self.visit_timestamptz(primitive, primitive_partner) elif isinstance(primitive, StringType): return self.visit_string(primitive, primitive_partner) elif isinstance(primitive, UUIDType): @@ -477,7 +477,7 @@ def visit_timestamp(self, timestamp_type: TimestampType, partner: Optional[P]) - """Visit a TimestampType.""" @abstractmethod - def visit_timestampz(self, timestamptz_type: TimestamptzType, partner: Optional[P]) -> T: + def visit_timestamptz(self, timestamptz_type: TimestamptzType, partner: Optional[P]) -> T: """Visit 
a TimestamptzType.""" @abstractmethod @@ -606,7 +606,7 @@ def primitive(self, primitive: PrimitiveType) -> T: elif isinstance(primitive, TimestampType): return self.visit_timestamp(primitive) elif isinstance(primitive, TimestamptzType): - return self.visit_timestampz(primitive) + return self.visit_timestamptz(primitive) elif isinstance(primitive, StringType): return self.visit_string(primitive) elif isinstance(primitive, UUIDType): @@ -657,7 +657,7 @@ def visit_timestamp(self, timestamp_type: TimestampType) -> T: """Visit a TimestampType.""" @abstractmethod - def visit_timestampz(self, timestamptz_type: TimestamptzType) -> T: + def visit_timestamptz(self, timestamptz_type: TimestamptzType) -> T: """Visit a TimestamptzType.""" @abstractmethod diff --git a/python/tests/avro/test_encoder.py b/python/tests/avro/test_encoder.py new file mode 100644 index 000000000000..5bcd777a2cb5 --- /dev/null +++ b/python/tests/avro/test_encoder.py @@ -0,0 +1,243 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +import io +import struct +from decimal import Decimal +import datetime + +from pyiceberg.avro.encoder import BinaryEncoder, _time_object_to_micros + +def zigzag_encode(datum): + result = [] + datum = (datum << 1) ^ (datum >> 63) + while (datum & ~0x7F) != 0: + result.append(struct.pack("B", (datum & 0x7F) | 0x80)) + datum >>= 7 + result.append(struct.pack("B", datum)) + return b''.join(result) + +def test_write() -> None: + + output = io.BytesIO() + encoder = BinaryEncoder(output) + + input = b'\x12\x34\x56' + + encoder.write(input) + + + assert output.getbuffer() == input + +def test_write_boolean() -> None: + + output = io.BytesIO() + encoder = BinaryEncoder(output) + + input = b'\x12\x34\x56' + + encoder.write_boolean(True) + encoder.write_boolean(False) + + assert output.getbuffer() == struct.pack('??', True, False) + + +def print_hex(bytes): + print(''.join('{:02x}'.format(x) for x in bytes)) + +def test_write_int() -> None: + + output = io.BytesIO() + encoder = BinaryEncoder(output) + + _1byte_input = 2 + _2byte_input = 7466 + _3byte_input = 523490 + _4byte_input = 86561570 + _5byte_input = 2510416930 + _6byte_input = 734929016866 + _7byte_input = 135081528772642 + _8byte_input = 35124861473277986 + + encoder.write_int(_1byte_input) + encoder.write_int(_2byte_input) + encoder.write_int(_3byte_input) + encoder.write_int(_4byte_input) + encoder.write_int(_5byte_input) + encoder.write_int(_6byte_input) + encoder.write_int(_7byte_input) + encoder.write_int(_8byte_input) + + buffer = output.getbuffer() + + assert buffer[ 0: 1] == zigzag_encode(_1byte_input) + assert buffer[ 1: 3] == zigzag_encode(_2byte_input) + assert buffer[ 3: 6] == zigzag_encode(_3byte_input) + assert buffer[ 6:10] == zigzag_encode(_4byte_input) + assert buffer[10:15] == zigzag_encode(_5byte_input) + assert buffer[15:21] == zigzag_encode(_6byte_input) + assert buffer[21:28] == zigzag_encode(_7byte_input) + assert buffer[28:36] == 
zigzag_encode(_8byte_input) + +def test_write_float() -> None: + + output = io.BytesIO() + encoder = BinaryEncoder(output) + + input = 3.14159265359 + + encoder.write_float(input) + + assert output.getbuffer() == struct.pack(' None: + output = io.BytesIO() + encoder = BinaryEncoder(output) + + input = 3.14159265359 + + encoder.write_double(input) + + assert output.getbuffer() == struct.pack(' None: + output = io.BytesIO() + encoder = BinaryEncoder(output) + + input = Decimal('3.14159265359') + + encoder.write_decimal_bytes(input) + + assert output.getbuffer() == b'\x0a\x49\x25\x59\xf6\x4f' + + +def test_write_decimal_fixed() -> None: + output = io.BytesIO() + encoder = BinaryEncoder(output) + + input = Decimal('3.14159265359') + + encoder.write_decimal_fixed(input, 8) + + print_hex(output.getbuffer()) + + assert output.getbuffer() == b'\x00\x00\x00\x49\x25\x59\xf6\x4f' + + +def test_write_bytes() -> None: + + output = io.BytesIO() + encoder = BinaryEncoder(output) + + input = b'\x12\x34\x56' + + encoder.write_bytes(input) + + assert output.getbuffer() == b''.join([zigzag_encode(len(input)), input]) + + +def test_write_bytes_fixed() -> None: + output = io.BytesIO() + encoder = BinaryEncoder(output) + + input = b'\x12\x34\x56' + + encoder.write_bytes_fixed(input) + + assert output.getbuffer() == input + + +def test_write_utf8() -> None: + output = io.BytesIO() + encoder = BinaryEncoder(output) + + input = "That, my liege, is how we know the Earth to be banana-shaped." 
+ bin_input = input.encode() + + encoder.write_utf8(input) + + assert output.getbuffer() == b''.join([zigzag_encode(len(bin_input)), bin_input]) + + +def test_write_date_int() -> None: + + output = io.BytesIO() + encoder = BinaryEncoder(output) + + input = datetime.date(1970, 1, 2) + + days = (input - datetime.date(1970, 1, 1)).days + + encoder.write_date_int(input) + + assert output.getbuffer() == zigzag_encode(days) + + +def test_write_time_millis_int() -> None: + + output = io.BytesIO() + encoder = BinaryEncoder(output) + + input = datetime.time(1, 2, 3, 456000) + reference = int((1*60*60*1e6 + 2*60*1e6 + 3* 1e6 + 456000)/1000) + + encoder.write_time_millis_int(input) + + assert output.getbuffer() == zigzag_encode(reference) + + +def test_write_time_micros_long() -> None: + output = io.BytesIO() + encoder = BinaryEncoder(output) + + input = datetime.time(1, 2, 3, 456000) + reference = int(1*60*60*1e6 + 2*60*1e6 + 3* 1e6 + 456000) + + encoder.write_time_micros_long(input) + + assert output.getbuffer() == zigzag_encode(reference) + + +def test_write_timestamp_millis_long() -> None: + + output = io.BytesIO() + encoder = BinaryEncoder(output) + + input = datetime.datetime(2023, 1, 1, 1, 2, 3) + + millis = int((input - datetime.datetime(1970, 1, 1, 0,0,0)).total_seconds()*1e3) + + encoder.write_timestamp_millis_long(input) + + assert output.getbuffer() == zigzag_encode(millis) + + +def test_write_timestamp_micros_long() -> None: + + output = io.BytesIO() + encoder = BinaryEncoder(output) + + input = datetime.datetime(2023, 1, 1, 1, 2, 3) + + micros = int((input - datetime.datetime(1970, 1, 1, 0,0,0)).total_seconds()*1e6) + + encoder.write_timestamp_micros_long(input) + + assert output.getbuffer() == zigzag_encode(micros) diff --git a/python/tests/avro/test_file.py b/python/tests/avro/test_file.py index cdb973a39541..bbfa51112776 100644 --- a/python/tests/avro/test_file.py +++ b/python/tests/avro/test_file.py @@ -18,7 +18,22 @@ from pyiceberg.avro.codecs import 
DeflateCodec from pyiceberg.avro.file import META_SCHEMA, AvroFileHeader - +from pyiceberg.manifest import ( + DataFile, + DataFileContent, + FileFormat, + ManifestEntry, + ManifestEntryStatus, + ManifestFile, + MANIFEST_ENTRY_SCHEMA, +) +from pyiceberg.typedef import Record +import pyiceberg.avro.file as avro +from pyiceberg.io.pyarrow import PyArrowFileIO +from tempfile import TemporaryDirectory +from enum import Enum +from fastavro import writer, reader, parse_schema +from pyiceberg.utils.schema_conversion import AvroSchemaConversion def get_deflate_compressor() -> None: header = AvroFileHeader(struct=META_SCHEMA) @@ -58,3 +73,119 @@ def test_missing_schema() -> None: header.get_schema() assert "No schema found in Avro file headers" in str(exc_info.value) + + + +# helper function to serialize our objects to dicts to enable +# direct comparison with the dicts returned by fastavro +def todict(obj): + if isinstance(obj, dict): + data = [] + for (k, v) in obj.items(): + data.append({"key":k, "value": v}) + return data + elif isinstance(obj, Enum): + return obj.value + elif hasattr(obj, "__iter__") and not isinstance(obj, str) and not isinstance(obj, bytes): + return [todict(v) for v in obj] + elif hasattr(obj, "__dict__"): + data = dict([(key, todict(value)) + for key, value in obj.__dict__.items() + if not callable(value) and not key.startswith('_')]) + return data + else: + return obj + +def test_write_manifest_entry_with_iceberg_read_with_fastavro() -> None: + + data_file = DataFile( + content = DataFileContent.DATA, + file_path = "s3://some-path/some-file.parquet", + file_format = FileFormat.PARQUET, + partition = Record(), + record_count = 131327, + file_size_in_bytes = 220669226, + column_sizes = { 1: 220661854 }, + value_counts = { 1: 131327 }, + null_value_counts = { 1: 0 }, + nan_value_counts = {}, + lower_bounds = { 1: "aaaaaaaaaaaaaaaa".encode() }, + upper_bounds = { 1: "zzzzzzzzzzzzzzzz".encode() }, + key_metadata = b"\xde\xad\xbe\xef", + split_offsets = 
[ 4, 133697593 ], + equality_ids = [], + sort_order_id = 4, + spec_id = 3 + ) + entry = ManifestEntry( + status = ManifestEntryStatus.ADDED, + snapshot_id = 8638475580105682862, + data_sequence_number = 0, + file_sequence_number = 0, + data_file = data_file + ) + + with TemporaryDirectory() as tmpdir: + tmp_avro_file = tmpdir + "/manifest_entry.avro" + + with avro.AvroOutputFile(PyArrowFileIO().new_output(tmp_avro_file), MANIFEST_ENTRY_SCHEMA, "manifest_entry") as out: + out.write_block([entry]) + + + schema = AvroSchemaConversion().iceberg_to_avro(MANIFEST_ENTRY_SCHEMA, schema_name="manifest_entry") + + with open(tmp_avro_file, 'rb') as fo: + r = reader(fo=fo, reader_schema=schema) + it = iter(r) + + fa_entry = next(it) + + assert todict(entry) == fa_entry + + +def test_write_manifest_entry_with_fastavro_read_with_iceberg() -> None: + + data_file = DataFile( + content = DataFileContent.DATA, + file_path = "s3://some-path/some-file.parquet", + file_format = FileFormat.PARQUET, + partition = Record(), + record_count = 131327, + file_size_in_bytes = 220669226, + column_sizes = { 1: 220661854 }, + value_counts = { 1: 131327 }, + null_value_counts = { 1: 0 }, + nan_value_counts = {}, + lower_bounds = { 1: "aaaaaaaaaaaaaaaa".encode() }, + upper_bounds = { 1: "zzzzzzzzzzzzzzzz".encode() }, + key_metadata = b"\xde\xad\xbe\xef", + split_offsets = [ 4, 133697593 ], + equality_ids = [], + sort_order_id = 4, + spec_id = 3 + ) + entry = ManifestEntry( + status = ManifestEntryStatus.ADDED, + snapshot_id = 8638475580105682862, + data_sequence_number = 0, + file_sequence_number = 0, + data_file = data_file + ) + + with TemporaryDirectory() as tmpdir: + tmp_avro_file = tmpdir + "/manifest_entry.avro" + + schema = AvroSchemaConversion().iceberg_to_avro(MANIFEST_ENTRY_SCHEMA, schema_name="manifest_entry") + + with open(tmp_avro_file, 'wb') as out: + writer(out, schema, [todict(entry)]) + + with avro.AvroFile[ManifestEntry]( + PyArrowFileIO().new_input(tmp_avro_file), + 
MANIFEST_ENTRY_SCHEMA, + {-1: ManifestEntry, 2: DataFile}, + ) as reader: + it = iter(reader) + avro_entry = next(it) + + assert entry == avro_entry \ No newline at end of file diff --git a/python/tests/avro/test_writer.py b/python/tests/avro/test_writer.py new file mode 100644 index 000000000000..6bc9d6e72d77 --- /dev/null +++ b/python/tests/avro/test_writer.py @@ -0,0 +1,220 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# pylint:disable=protected-access + +import pytest + +from pyiceberg.avro.writer import ( + BinaryWriter, + BooleanWriter, + DateWriter, + DecimalWriter, + DoubleWriter, + FixedWriter, + FloatWriter, + IntegerWriter, + StringWriter, + StructWriter, + TimeWriter, + TimestampWriter, + TimestamptzWriter, + UUIDWriter, +) +from pyiceberg.avro.resolver import construct_writer +from pyiceberg.avro.encoder import BinaryEncoder +from pyiceberg.schema import Schema +from pyiceberg.typedef import Record +from pyiceberg.types import ( + BinaryType, + BooleanType, + DateType, + DecimalType, + DoubleType, + FixedType, + FloatType, + IntegerType, + ListType, + LongType, + MapType, + NestedField, + PrimitiveType, + StringType, + StructType, + TimestampType, + TimestamptzType, + TimeType, + UUIDType, +) +import io +import struct +from typing import Dict, List + +def zigzag_encode(datum): + result = [] + datum = (datum << 1) ^ (datum >> 63) + while (datum & ~0x7F) != 0: + result.append(struct.pack("B", (datum & 0x7F) | 0x80)) + datum >>= 7 + result.append(struct.pack("B", datum)) + return b''.join(result) + + +def test_fixed_writer() -> None: + assert construct_writer(FixedType(22)) == FixedWriter(22) + + +def test_decimal_writer() -> None: + assert construct_writer(DecimalType(19, 25)) == DecimalWriter(19, 25) + + +def test_boolean_writer() -> None: + assert construct_writer(BooleanType()) == BooleanWriter() + + +def test_integer_writer() -> None: + assert construct_writer(IntegerType()) == IntegerWriter() + + +def test_long_writer() -> None: + assert construct_writer(LongType()) == IntegerWriter() + + +def test_float_writer() -> None: + assert construct_writer(FloatType()) == FloatWriter() + + +def test_double_writer() -> None: + assert construct_writer(DoubleType()) == DoubleWriter() + + +def test_date_writer() -> None: + assert construct_writer(DateType()) == DateWriter() + + +def test_time_writer() -> None: + assert construct_writer(TimeType()) == TimeWriter() + + +def 
test_timestamp_writer() -> None: + assert construct_writer(TimestampType()) == TimestampWriter() + + +def test_timestamptz_writer() -> None: + assert construct_writer(TimestamptzType()) == TimestamptzWriter() + + +def test_string_writer() -> None: + assert construct_writer(StringType()) == StringWriter() + + +def test_binary_writer() -> None: + assert construct_writer(BinaryType()) == BinaryWriter() + + +def test_unknown_type() -> None: + class UnknownType(PrimitiveType): + __root__ = "UnknownType" + + with pytest.raises(ValueError) as exc_info: + construct_writer(UnknownType()) + + assert "Unknown type:" in str(exc_info.value) + + +def test_uuid_writer() -> None: + assert construct_writer(UUIDType()) == UUIDWriter() + + +def test_write_simple_struct() -> None: + + output = io.BytesIO() + encoder = BinaryEncoder(output) + + schema = StructType( + NestedField(1, "id", IntegerType(), required=True), + NestedField(2, "property", StringType(), required=True) + ) + + class MyStruct(Record): + id: int + property: str + + struct = MyStruct(id=12, property="awesome") + + enc_str = "awesome".encode() + + construct_writer(schema).write(encoder, struct) + + assert output.getbuffer() == b''.join([b'\x18', zigzag_encode(len(enc_str)), enc_str]) + + +def test_write_struct_with_dict() -> None: + + output = io.BytesIO() + encoder = BinaryEncoder(output) + + schema = StructType( + NestedField(1, "id", IntegerType(), required=True), + NestedField(2, "properties", MapType(3, IntegerType(), 4, IntegerType()), required=True) + ) + + class MyStruct(Record): + id: int + properties: Dict[int,int] + + struct = MyStruct(id=12, properties={1:2, 3:4}) + + construct_writer(schema).write(encoder, struct) + + assert output.getbuffer() == b''.join([ + b'\x18', + zigzag_encode(len(struct.properties)), + zigzag_encode(1), + zigzag_encode(2), + zigzag_encode(3), + zigzag_encode(4), + b'\x00' + ]) + + +def test_write_struct_with_list() -> None: + + output = io.BytesIO() + encoder = 
BinaryEncoder(output) + + schema = StructType( + NestedField(1, "id", IntegerType(), required=True), + NestedField(2, "properties", ListType(3, IntegerType()), required=True) + ) + + class MyStruct(Record): + id: int + properties: List[int] + + struct = MyStruct(id=12, properties=[1,2,3,4]) + + construct_writer(schema).write(encoder, struct) + + assert output.getbuffer() == b''.join([ + b'\x18', + zigzag_encode(len(struct.properties)), + zigzag_encode(1), + zigzag_encode(2), + zigzag_encode(3), + zigzag_encode(4), + b'\x00' + ]) diff --git a/python/tests/utils/test_schema_conversion.py b/python/tests/utils/test_schema_conversion.py index c9360da643d1..2c42c445e432 100644 --- a/python/tests/utils/test_schema_conversion.py +++ b/python/tests/utils/test_schema_conversion.py @@ -37,8 +37,8 @@ from pyiceberg.utils.schema_conversion import AvroSchemaConversion -def test_avro_to_iceberg(avro_schema_manifest_file: Dict[str, Any]) -> None: - iceberg_schema = AvroSchemaConversion().avro_to_iceberg(avro_schema_manifest_file) +def test_avro_to_iceberg(avro_schema_manifest_file_v1: Dict[str, Any]) -> None: + iceberg_schema = AvroSchemaConversion().avro_to_iceberg(avro_schema_manifest_file_v1) expected_iceberg_schema = Schema( NestedField( field_id=500, name="manifest_path", field_type=StringType(), required=True, doc="Location URI with FS scheme" @@ -356,11 +356,11 @@ def test_logical_map_with_invalid_fields() -> None: assert "Invalid key-value pair schema:" in str(exc_info.value) -def test_iceberg_to_avro_manifest_list(avro_schema_manifest_file: Dict[str, Any]) -> None: +def test_iceberg_to_avro_manifest_list(avro_schema_manifest_file_v1: Dict[str, Any]) -> None: """Round trip the manifest list""" - iceberg_schema = AvroSchemaConversion().avro_to_iceberg(avro_schema_manifest_file) + iceberg_schema = AvroSchemaConversion().avro_to_iceberg(avro_schema_manifest_file_v1) avro_result = AvroSchemaConversion().iceberg_to_avro(iceberg_schema, schema_name="manifest_file") - assert 
avro_schema_manifest_file == avro_result + assert avro_schema_manifest_file_v1 == avro_result def test_iceberg_to_avro_manifest(avro_schema_manifest_entry: Dict[str, Any]) -> None: From 33e67a3c7983c3d13f687dadf74668123ecc3977 Mon Sep 17 00:00:00 2001 From: Max de Bayser Date: Wed, 21 Jun 2023 16:27:13 -0300 Subject: [PATCH 4/8] Appease pylint and pydocstyle --- python/pyiceberg/avro/encoder.py | 40 ++++++--------- python/pyiceberg/avro/file.py | 4 +- python/pyiceberg/avro/resolver.py | 10 ++-- python/pyiceberg/avro/writer.py | 8 +-- python/tests/avro/test_encoder.py | 81 +++++++++++++++---------------- python/tests/avro/test_file.py | 7 ++- python/tests/avro/test_writer.py | 18 +++---- 7 files changed, 76 insertions(+), 92 deletions(-) diff --git a/python/pyiceberg/avro/encoder.py b/python/pyiceberg/avro/encoder.py index 622de52e6844..f8640365efdf 100644 --- a/python/pyiceberg/avro/encoder.py +++ b/python/pyiceberg/avro/encoder.py @@ -43,7 +43,7 @@ def write_boolean(self, boolean: bool) -> None: """A boolean is written as a single byte whose value is either 0 (false) or 1 (true). Args: - boolean: The boolean to write + boolean: The boolean to write. """ self.write(bytearray([bool(boolean)])) @@ -56,21 +56,18 @@ def write_int(self, integer: int) -> None: self.write(bytearray([datum])) def write_float(self, f: float) -> None: - """ - A float is written as 4 bytes. - """ + """A float is written as 4 bytes.""" self.write(STRUCT_FLOAT.pack(f)) def write_double(self, f: float) -> None: - """ - A double is written as 8 bytes. - """ + """A double is written as 8 bytes.""" self.write(STRUCT_DOUBLE.pack(f)) def write_decimal_bytes(self, datum: decimal.Decimal) -> None: """ - Decimal in bytes are encoded as long. Since size of packed value in bytes for - signed long is 8, 8 bytes are written. + Decimal in bytes are encoded as long. + + Since size of packed value in bytes for signed long is 8, 8 bytes are written. 
""" sign, digits, _ = datum.as_tuple() @@ -93,9 +90,7 @@ def write_decimal_bytes(self, datum: decimal.Decimal) -> None: self.write(bytearray([bits_to_write & 0xFF])) def write_decimal_fixed(self, datum: decimal.Decimal, size: int) -> None: - """ - Decimal in fixed are encoded as size of fixed bytes. - """ + """Decimal in fixed are encoded as size of fixed bytes.""" sign, digits, _ = datum.as_tuple() unscaled_datum = 0 @@ -132,36 +127,30 @@ def write_decimal_fixed(self, datum: decimal.Decimal, size: int) -> None: self.write(bytearray([bits_to_write & 0xFF])) def write_bytes(self, b: bytes) -> None: - """ - Bytes are encoded as a long followed by that many bytes of data. - """ + """Bytes are encoded as a long followed by that many bytes of data.""" self.write_int(len(b)) self.write(struct.pack(f"{len(b)}s", b)) def write_bytes_fixed(self, b: bytes) -> None: - """ - Writes fixed number of bytes - """ + """Writes fixed number of bytes.""" self.write(struct.pack(f"{len(b)}s", b)) def write_utf8(self, s: str) -> None: - """ - A string is encoded as a long followed by - that many bytes of UTF-8 encoded character data. - """ + """A string is encoded as a long followed by that many bytes of UTF-8 encoded character data.""" self.write_bytes(s.encode("utf-8")) def write_date_int(self, d: date) -> None: """ Encode python date object as int. - It stores the number of days from - the unix epoch, 1 January 1970 (ISO calendar). + + It stores the number of days from the unix epoch, 1 January 1970 (ISO calendar). """ self.write_int(date_to_days(d)) def write_time_millis_int(self, dt: time) -> None: """ Encode python time object as int. + It stores the number of milliseconds from midnight, 00:00:00.000 """ self.write_int(int(_time_object_to_micros(dt) / 1000)) @@ -169,6 +158,7 @@ def write_time_millis_int(self, dt: time) -> None: def write_time_micros_long(self, dt: time) -> None: """ Encode python time object as long. 
+ It stores the number of microseconds from midnight, 00:00:00.000000 """ self.write_int(_time_object_to_micros(dt)) @@ -176,6 +166,7 @@ def write_time_micros_long(self, dt: time) -> None: def write_timestamp_millis_long(self, dt: datetime) -> None: """ Encode python datetime object as long. + It stores the number of milliseconds from midnight of unix epoch, 1 January 1970. """ self.write_int(int(datetime_to_micros(dt) / 1000)) @@ -183,6 +174,7 @@ def write_timestamp_millis_long(self, dt: datetime) -> None: def write_timestamp_micros_long(self, dt: datetime) -> None: """ Encode python datetime object as long. + It stores the number of microseconds from midnight of unix epoch, 1 January 1970. """ self.write_int(datetime_to_micros(dt)) diff --git a/python/pyiceberg/avro/file.py b/python/pyiceberg/avro/file.py index 5ffb9c2f19ab..fe865809fe61 100644 --- a/python/pyiceberg/avro/file.py +++ b/python/pyiceberg/avro/file.py @@ -148,7 +148,7 @@ def __enter__(self) -> AvroFile[D]: """Generates a reader tree for the payload within an avro file. Returns: - A generator returning the AvroStructs + A generator returning the AvroStructs. """ self.input_stream = self.input_file.open(seekable=False) self.decoder = BinaryDecoder(self.input_stream) @@ -227,7 +227,7 @@ def __init__( def __enter__(self) -> AvroFile[D]: """ - Opens the file and writes the header + Opens the file and writes the header. Returns: The file object to write records to diff --git a/python/pyiceberg/avro/resolver.py b/python/pyiceberg/avro/resolver.py index 90f8d897937b..578e5e0436a9 100644 --- a/python/pyiceberg/avro/resolver.py +++ b/python/pyiceberg/avro/resolver.py @@ -120,19 +120,19 @@ def construct_reader( def construct_writer(file_schema: Union[Schema, IcebergType]) -> Writer: - """Constructs a writer from a file schema + """Constructs a writer from a file schema. Args: - file_schema (Schema | IcebergType): The schema of the Avro file + file_schema (Schema | IcebergType): The schema of the Avro file. 
Raises: - NotImplementedError: If attempting to resolve an unrecognized object type + NotImplementedError: If attempting to resolve an unrecognized object type. """ return visit(file_schema, ConstructWriter()) class ConstructWriter(SchemaVisitorPerPrimitiveType[Writer]): - """Constructs a writer tree from an Iceberg schema""" + """Constructs a writer tree from an Iceberg schema.""" def schema(self, schema: Schema, struct_result: Writer) -> Writer: return struct_result @@ -204,7 +204,7 @@ def resolve( """Resolves the file and read schema to produce a reader. Args: - file_schema (Schema | IcebergType): The schema of the Avro file + file_schema (Schema | IcebergType): The schema of the Avro file. read_schema (Schema | IcebergType): The requested read schema which is equal, subset or superset of the file schema. read_types (Dict[int, Callable[..., StructProtocol]]): A dict of types to use for struct data. read_enums (Dict[int, Callable[..., Enum]]): A dict of fields that have to be converted to an enum. diff --git a/python/pyiceberg/avro/writer.py b/python/pyiceberg/avro/writer.py index 5ba0f4fc5d7f..49505fd194c8 100644 --- a/python/pyiceberg/avro/writer.py +++ b/python/pyiceberg/avro/writer.py @@ -15,13 +15,13 @@ # specific language governing permissions and limitations # under the License. """ -Classes for building the Reader tree +Classes for building the Reader tree. Constructing a reader tree from the schema makes it easy to decouple the reader implementation from the schema. The reader tree can be changed in such a way that the -read schema is different, while respecting the read schema +read schema is different, while respecting the read schema. 
""" from __future__ import annotations @@ -62,7 +62,7 @@ def write(self, encoder: BinaryEncoder, val: bool) -> None: class IntegerWriter(Writer): - """Longs and ints are encoded the same way, and there is no long in Python""" + """Longs and ints are encoded the same way, and there is no long in Python.""" def write(self, encoder: BinaryEncoder, val: int) -> None: encoder.write_int(val) @@ -128,7 +128,7 @@ def __repr__(self) -> str: class BinaryWriter(Writer): - """Variable byte length writer""" + """Variable byte length writer.""" def write(self, encoder: BinaryEncoder, val: Any) -> None: encoder.write_bytes(val) diff --git a/python/tests/avro/test_encoder.py b/python/tests/avro/test_encoder.py index 5bcd777a2cb5..64c853a506e3 100644 --- a/python/tests/avro/test_encoder.py +++ b/python/tests/avro/test_encoder.py @@ -21,7 +21,7 @@ from decimal import Decimal import datetime -from pyiceberg.avro.encoder import BinaryEncoder, _time_object_to_micros +from pyiceberg.avro.encoder import BinaryEncoder def zigzag_encode(datum): result = [] @@ -32,34 +32,30 @@ def zigzag_encode(datum): result.append(struct.pack("B", datum)) return b''.join(result) + def test_write() -> None: output = io.BytesIO() encoder = BinaryEncoder(output) - input = b'\x12\x34\x56' + _input = b'\x12\x34\x56' - encoder.write(input) + encoder.write(_input) + assert output.getbuffer() == _input - assert output.getbuffer() == input def test_write_boolean() -> None: output = io.BytesIO() encoder = BinaryEncoder(output) - input = b'\x12\x34\x56' - encoder.write_boolean(True) encoder.write_boolean(False) assert output.getbuffer() == struct.pack('??', True, False) -def print_hex(bytes): - print(''.join('{:02x}'.format(x) for x in bytes)) - def test_write_int() -> None: output = io.BytesIO() @@ -94,36 +90,37 @@ def test_write_int() -> None: assert buffer[21:28] == zigzag_encode(_7byte_input) assert buffer[28:36] == zigzag_encode(_8byte_input) + def test_write_float() -> None: output = io.BytesIO() encoder = 
BinaryEncoder(output) - input = 3.14159265359 + _input = 3.14159265359 - encoder.write_float(input) + encoder.write_float(_input) - assert output.getbuffer() == struct.pack(' None: output = io.BytesIO() encoder = BinaryEncoder(output) - input = 3.14159265359 + _input = 3.14159265359 - encoder.write_double(input) + encoder.write_double(_input) - assert output.getbuffer() == struct.pack(' None: output = io.BytesIO() encoder = BinaryEncoder(output) - input = Decimal('3.14159265359') + _input = Decimal('3.14159265359') - encoder.write_decimal_bytes(input) + encoder.write_decimal_bytes(_input) assert output.getbuffer() == b'\x0a\x49\x25\x59\xf6\x4f' @@ -132,11 +129,9 @@ def test_write_decimal_fixed() -> None: output = io.BytesIO() encoder = BinaryEncoder(output) - input = Decimal('3.14159265359') - - encoder.write_decimal_fixed(input, 8) + _input = Decimal('3.14159265359') - print_hex(output.getbuffer()) + encoder.write_decimal_fixed(_input, 8) assert output.getbuffer() == b'\x00\x00\x00\x49\x25\x59\xf6\x4f' @@ -146,32 +141,32 @@ def test_write_bytes() -> None: output = io.BytesIO() encoder = BinaryEncoder(output) - input = b'\x12\x34\x56' + _input = b'\x12\x34\x56' - encoder.write_bytes(input) + encoder.write_bytes(_input) - assert output.getbuffer() == b''.join([zigzag_encode(len(input)), input]) + assert output.getbuffer() == b''.join([zigzag_encode(len(_input)), _input]) def test_write_bytes_fixed() -> None: output = io.BytesIO() encoder = BinaryEncoder(output) - input = b'\x12\x34\x56' + _input = b'\x12\x34\x56' - encoder.write_bytes_fixed(input) + encoder.write_bytes_fixed(_input) - assert output.getbuffer() == input + assert output.getbuffer() == _input def test_write_utf8() -> None: output = io.BytesIO() encoder = BinaryEncoder(output) - input = "That, my liege, is how we know the Earth to be banana-shaped." - bin_input = input.encode() + _input = "That, my liege, is how we know the Earth to be banana-shaped." 
+ bin_input = _input.encode() - encoder.write_utf8(input) + encoder.write_utf8(_input) assert output.getbuffer() == b''.join([zigzag_encode(len(bin_input)), bin_input]) @@ -181,11 +176,11 @@ def test_write_date_int() -> None: output = io.BytesIO() encoder = BinaryEncoder(output) - input = datetime.date(1970, 1, 2) + _input = datetime.date(1970, 1, 2) - days = (input - datetime.date(1970, 1, 1)).days + days = (_input - datetime.date(1970, 1, 1)).days - encoder.write_date_int(input) + encoder.write_date_int(_input) assert output.getbuffer() == zigzag_encode(days) @@ -195,10 +190,10 @@ def test_write_time_millis_int() -> None: output = io.BytesIO() encoder = BinaryEncoder(output) - input = datetime.time(1, 2, 3, 456000) + _input = datetime.time(1, 2, 3, 456000) reference = int((1*60*60*1e6 + 2*60*1e6 + 3* 1e6 + 456000)/1000) - encoder.write_time_millis_int(input) + encoder.write_time_millis_int(_input) assert output.getbuffer() == zigzag_encode(reference) @@ -207,10 +202,10 @@ def test_write_time_micros_long() -> None: output = io.BytesIO() encoder = BinaryEncoder(output) - input = datetime.time(1, 2, 3, 456000) + _input = datetime.time(1, 2, 3, 456000) reference = int(1*60*60*1e6 + 2*60*1e6 + 3* 1e6 + 456000) - encoder.write_time_micros_long(input) + encoder.write_time_micros_long(_input) assert output.getbuffer() == zigzag_encode(reference) @@ -220,11 +215,11 @@ def test_write_timestamp_millis_long() -> None: output = io.BytesIO() encoder = BinaryEncoder(output) - input = datetime.datetime(2023, 1, 1, 1, 2, 3) + _input = datetime.datetime(2023, 1, 1, 1, 2, 3) - millis = int((input - datetime.datetime(1970, 1, 1, 0,0,0)).total_seconds()*1e3) + millis = int((_input - datetime.datetime(1970, 1, 1, 0,0,0)).total_seconds()*1e3) - encoder.write_timestamp_millis_long(input) + encoder.write_timestamp_millis_long(_input) assert output.getbuffer() == zigzag_encode(millis) @@ -234,10 +229,10 @@ def test_write_timestamp_micros_long() -> None: output = io.BytesIO() encoder = 
BinaryEncoder(output) - input = datetime.datetime(2023, 1, 1, 1, 2, 3) + _input = datetime.datetime(2023, 1, 1, 1, 2, 3) - micros = int((input - datetime.datetime(1970, 1, 1, 0,0,0)).total_seconds()*1e6) + micros = int((_input - datetime.datetime(1970, 1, 1, 0,0,0)).total_seconds()*1e6) - encoder.write_timestamp_micros_long(input) + encoder.write_timestamp_micros_long(_input) assert output.getbuffer() == zigzag_encode(micros) diff --git a/python/tests/avro/test_file.py b/python/tests/avro/test_file.py index bbfa51112776..081e5fd54e21 100644 --- a/python/tests/avro/test_file.py +++ b/python/tests/avro/test_file.py @@ -24,7 +24,6 @@ FileFormat, ManifestEntry, ManifestEntryStatus, - ManifestFile, MANIFEST_ENTRY_SCHEMA, ) from pyiceberg.typedef import Record @@ -32,7 +31,7 @@ from pyiceberg.io.pyarrow import PyArrowFileIO from tempfile import TemporaryDirectory from enum import Enum -from fastavro import writer, reader, parse_schema +from fastavro import writer, reader from pyiceberg.utils.schema_conversion import AvroSchemaConversion def get_deflate_compressor() -> None: @@ -184,8 +183,8 @@ def test_write_manifest_entry_with_fastavro_read_with_iceberg() -> None: PyArrowFileIO().new_input(tmp_avro_file), MANIFEST_ENTRY_SCHEMA, {-1: ManifestEntry, 2: DataFile}, - ) as reader: - it = iter(reader) + ) as avro_reader: + it = iter(avro_reader) avro_entry = next(it) assert entry == avro_entry \ No newline at end of file diff --git a/python/tests/avro/test_writer.py b/python/tests/avro/test_writer.py index 6bc9d6e72d77..e5207f946213 100644 --- a/python/tests/avro/test_writer.py +++ b/python/tests/avro/test_writer.py @@ -28,7 +28,6 @@ FloatWriter, IntegerWriter, StringWriter, - StructWriter, TimeWriter, TimestampWriter, TimestamptzWriter, @@ -36,7 +35,6 @@ ) from pyiceberg.avro.resolver import construct_writer from pyiceberg.avro.encoder import BinaryEncoder -from pyiceberg.schema import Schema from pyiceberg.typedef import Record from pyiceberg.types import ( BinaryType, @@ 
-153,11 +151,11 @@ class MyStruct(Record): id: int property: str - struct = MyStruct(id=12, property="awesome") + my_struct = MyStruct(id=12, property="awesome") enc_str = "awesome".encode() - construct_writer(schema).write(encoder, struct) + construct_writer(schema).write(encoder, my_struct) assert output.getbuffer() == b''.join([b'\x18', zigzag_encode(len(enc_str)), enc_str]) @@ -176,13 +174,13 @@ class MyStruct(Record): id: int properties: Dict[int,int] - struct = MyStruct(id=12, properties={1:2, 3:4}) + my_struct = MyStruct(id=12, properties={1:2, 3:4}) - construct_writer(schema).write(encoder, struct) + construct_writer(schema).write(encoder, my_struct) assert output.getbuffer() == b''.join([ b'\x18', - zigzag_encode(len(struct.properties)), + zigzag_encode(len(my_struct.properties)), zigzag_encode(1), zigzag_encode(2), zigzag_encode(3), @@ -205,13 +203,13 @@ class MyStruct(Record): id: int properties: List[int] - struct = MyStruct(id=12, properties=[1,2,3,4]) + my_struct = MyStruct(id=12, properties=[1,2,3,4]) - construct_writer(schema).write(encoder, struct) + construct_writer(schema).write(encoder, my_struct) assert output.getbuffer() == b''.join([ b'\x18', - zigzag_encode(len(struct.properties)), + zigzag_encode(len(my_struct.properties)), zigzag_encode(1), zigzag_encode(2), zigzag_encode(3), From b797fe9ff4fde328af0a05c8feaef17ad8caf84c Mon Sep 17 00:00:00 2001 From: Max de Bayser Date: Wed, 21 Jun 2023 16:36:36 -0300 Subject: [PATCH 5/8] Appease pre-commit hooks --- python/pyiceberg/avro/encoder.py | 13 ++- python/pyiceberg/avro/file.py | 52 +++++------- python/pyiceberg/avro/writer.py | 2 +- python/pyiceberg/catalog/rest.py | 1 - python/pyiceberg/manifest.py | 2 +- python/pyiceberg/typedef.py | 4 +- python/tests/avro/test_encoder.py | 74 ++++++++--------- python/tests/avro/test_file.py | 133 +++++++++++++++--------------- python/tests/avro/test_writer.py | 80 +++++++++--------- 9 files changed, 170 insertions(+), 191 deletions(-) diff --git 
a/python/pyiceberg/avro/encoder.py b/python/pyiceberg/avro/encoder.py index f8640365efdf..a8c7156b3e68 100644 --- a/python/pyiceberg/avro/encoder.py +++ b/python/pyiceberg/avro/encoder.py @@ -22,11 +22,10 @@ from pyiceberg.io import OutputStream from pyiceberg.utils.datetime import date_to_days, datetime_to_micros -def _time_object_to_micros(t: time): - return int(t.hour * 60 * 60 * 1e6 +\ - t.minute * 60 * 1e6 +\ - t.second * 1e6 +\ - t.microsecond) + +def _time_object_to_micros(t: time) -> int: + return int(t.hour * 60 * 60 * 1e6 + t.minute * 60 * 1e6 + t.second * 1e6 + t.microsecond) + class BinaryEncoder: """Write leaf values.""" @@ -66,7 +65,7 @@ def write_double(self, f: float) -> None: def write_decimal_bytes(self, datum: decimal.Decimal) -> None: """ Decimal in bytes are encoded as long. - + Since size of packed value in bytes for signed long is 8, 8 bytes are written. """ sign, digits, _ = datum.as_tuple() @@ -174,7 +173,7 @@ def write_timestamp_millis_long(self, dt: datetime) -> None: def write_timestamp_micros_long(self, dt: datetime) -> None: """ Encode python datetime object as long. - + It stores the number of microseconds from midnight of unix epoch, 1 January 1970. 
""" self.write_int(datetime_to_micros(dt)) diff --git a/python/pyiceberg/avro/file.py b/python/pyiceberg/avro/file.py index fe865809fe61..8bad8f69100a 100644 --- a/python/pyiceberg/avro/file.py +++ b/python/pyiceberg/avro/file.py @@ -18,7 +18,9 @@ """Avro reader for reading Avro files.""" from __future__ import annotations -import json, os, io +import io +import json +import os from dataclasses import dataclass from enum import Enum from types import TracebackType @@ -36,9 +38,14 @@ from pyiceberg.avro.decoder import BinaryDecoder from pyiceberg.avro.encoder import BinaryEncoder from pyiceberg.avro.reader import Reader +from pyiceberg.avro.resolver import construct_reader, construct_writer, resolve from pyiceberg.avro.writer import Writer -from pyiceberg.avro.resolver import construct_reader, resolve, construct_writer -from pyiceberg.io import InputFile, InputStream, OutputStream, OutputFile +from pyiceberg.io import ( + InputFile, + InputStream, + OutputFile, + OutputStream, +) from pyiceberg.io.memory import MemoryInputStream from pyiceberg.schema import Schema from pyiceberg.typedef import EMPTY_DICT, Record, StructProtocol @@ -213,26 +220,21 @@ class AvroOutputFile(Generic[D]): sync_bytes: bytes writer: Writer - def __init__( - self, - output_file: OutputFile, - schema: Schema, - schema_name: str - ) -> None: + def __init__(self, output_file: OutputFile, schema: Schema, schema_name: str) -> None: self.output_file = output_file - self.schema = schema + self.schema = schema self.schema_name = schema_name - self.sync_bytes = os.urandom(SYNC_SIZE) - self.writer = construct_writer(self.schema) + self.sync_bytes = os.urandom(SYNC_SIZE) + self.writer = construct_writer(self.schema) - def __enter__(self) -> AvroFile[D]: + def __enter__(self) -> AvroOutputFile[D]: """ Opens the file and writes the header. 
Returns: The file object to write records to """ - self.output_stream = self.output_file.create(overwrite = True) + self.output_stream = self.output_file.create(overwrite=True) self.encoder = BinaryEncoder(self.output_stream) self._write_header() @@ -245,25 +247,14 @@ def __exit__( ) -> None: self.output_stream.close() - def _write_header(self): - json_schema = json.dumps(AvroSchemaConversion().iceberg_to_avro( - self.schema, - schema_name=self.schema_name - )) - header = AvroFileHeader( - magic = MAGIC, - meta = { - _SCHEMA_KEY: json_schema, - _CODEC_KEY: "null" - }, - sync = self.sync_bytes - ) + def _write_header(self) -> None: + json_schema = json.dumps(AvroSchemaConversion().iceberg_to_avro(self.schema, schema_name=self.schema_name)) + header = AvroFileHeader(magic=MAGIC, meta={_SCHEMA_KEY: json_schema, _CODEC_KEY: "null"}, sync=self.sync_bytes) construct_writer(META_SCHEMA).write(self.encoder, header) def write_block(self, objects: List[D]) -> None: - in_memory = io.BytesIO() - block_content_encoder = BinaryEncoder(output_stream = in_memory) + block_content_encoder = BinaryEncoder(output_stream=in_memory) for obj in objects: self.writer.write(block_content_encoder, obj) block_content = in_memory.getvalue() @@ -272,6 +263,3 @@ def write_block(self, objects: List[D]) -> None: self.encoder.write_int(len(block_content)) self.encoder.write(block_content) self.encoder.write_bytes_fixed(self.sync_bytes) - - - diff --git a/python/pyiceberg/avro/writer.py b/python/pyiceberg/avro/writer.py index 49505fd194c8..d6ddd50d0224 100644 --- a/python/pyiceberg/avro/writer.py +++ b/python/pyiceberg/avro/writer.py @@ -163,7 +163,7 @@ class StructWriter(Writer): field_writers: Tuple[Writer, ...] 
= dataclassfield() def write(self, encoder: BinaryEncoder, val: StructType) -> None: - for writer, value in zip(self.field_writers, val.fields()): + for writer, value in zip(self.field_writers, val.record_fields()): writer.write(encoder, value) def __eq__(self, other: Any) -> bool: diff --git a/python/pyiceberg/catalog/rest.py b/python/pyiceberg/catalog/rest.py index e16e4ede4d1a..4c36390171a3 100644 --- a/python/pyiceberg/catalog/rest.py +++ b/python/pyiceberg/catalog/rest.py @@ -15,7 +15,6 @@ # specific language governing permissions and limitations # under the License. from json import JSONDecodeError -from enum import Enum from typing import ( Any, Dict, diff --git a/python/pyiceberg/manifest.py b/python/pyiceberg/manifest.py index 2d6b9a4496d5..50d16c0820b4 100644 --- a/python/pyiceberg/manifest.py +++ b/python/pyiceberg/manifest.py @@ -172,7 +172,7 @@ class DataFile(Record): nan_value_counts: Dict[int, int] lower_bounds: Dict[int, bytes] upper_bounds: Dict[int, bytes] - key_metadata: Optional[bytes] + key_metadata: Optional[bytes] split_offsets: Optional[List[int]] equality_ids: Optional[List[int]] sort_order_id: Optional[int] diff --git a/python/pyiceberg/typedef.py b/python/pyiceberg/typedef.py index 251eed3ff91b..0cb01d6ed253 100644 --- a/python/pyiceberg/typedef.py +++ b/python/pyiceberg/typedef.py @@ -156,6 +156,6 @@ def __eq__(self, other: Any) -> bool: def __repr__(self) -> str: return f"{self.__class__.__name__}[{', '.join(f'{key}={repr(value)}' for key, value in self.__dict__.items() if not key.startswith('_'))}]" - - def fields(self) -> List[str]: + + def record_fields(self) -> List[str]: return [self.__getattribute__(v) if hasattr(self, v) else None for v in self._position_to_field_name.values()] diff --git a/python/tests/avro/test_encoder.py b/python/tests/avro/test_encoder.py index 64c853a506e3..aa5505cef363 100644 --- a/python/tests/avro/test_encoder.py +++ b/python/tests/avro/test_encoder.py @@ -16,48 +16,46 @@ # under the License. 
from __future__ import annotations +import datetime import io import struct from decimal import Decimal -import datetime from pyiceberg.avro.encoder import BinaryEncoder -def zigzag_encode(datum): + +def zigzag_encode(datum: int) -> bytes: result = [] datum = (datum << 1) ^ (datum >> 63) while (datum & ~0x7F) != 0: result.append(struct.pack("B", (datum & 0x7F) | 0x80)) datum >>= 7 result.append(struct.pack("B", datum)) - return b''.join(result) + return b"".join(result) def test_write() -> None: - output = io.BytesIO() encoder = BinaryEncoder(output) - _input = b'\x12\x34\x56' + _input = b"\x12\x34\x56" encoder.write(_input) - + assert output.getbuffer() == _input def test_write_boolean() -> None: - output = io.BytesIO() encoder = BinaryEncoder(output) encoder.write_boolean(True) encoder.write_boolean(False) - assert output.getbuffer() == struct.pack('??', True, False) + assert output.getbuffer() == struct.pack("??", True, False) def test_write_int() -> None: - output = io.BytesIO() encoder = BinaryEncoder(output) @@ -81,10 +79,10 @@ def test_write_int() -> None: buffer = output.getbuffer() - assert buffer[ 0: 1] == zigzag_encode(_1byte_input) - assert buffer[ 1: 3] == zigzag_encode(_2byte_input) - assert buffer[ 3: 6] == zigzag_encode(_3byte_input) - assert buffer[ 6:10] == zigzag_encode(_4byte_input) + assert buffer[0:1] == zigzag_encode(_1byte_input) + assert buffer[1:3] == zigzag_encode(_2byte_input) + assert buffer[3:6] == zigzag_encode(_3byte_input) + assert buffer[6:10] == zigzag_encode(_4byte_input) assert buffer[10:15] == zigzag_encode(_5byte_input) assert buffer[15:21] == zigzag_encode(_6byte_input) assert buffer[21:28] == zigzag_encode(_7byte_input) @@ -92,7 +90,6 @@ def test_write_int() -> None: def test_write_float() -> None: - output = io.BytesIO() encoder = BinaryEncoder(output) @@ -100,7 +97,7 @@ def test_write_float() -> None: encoder.write_float(_input) - assert output.getbuffer() == struct.pack(' None: @@ -111,51 +108,50 @@ def 
test_write_double() -> None: encoder.write_double(_input) - assert output.getbuffer() == struct.pack(' None: output = io.BytesIO() encoder = BinaryEncoder(output) - _input = Decimal('3.14159265359') + _input = Decimal("3.14159265359") encoder.write_decimal_bytes(_input) - assert output.getbuffer() == b'\x0a\x49\x25\x59\xf6\x4f' + assert output.getbuffer() == b"\x0a\x49\x25\x59\xf6\x4f" def test_write_decimal_fixed() -> None: output = io.BytesIO() encoder = BinaryEncoder(output) - _input = Decimal('3.14159265359') + _input = Decimal("3.14159265359") encoder.write_decimal_fixed(_input, 8) - assert output.getbuffer() == b'\x00\x00\x00\x49\x25\x59\xf6\x4f' + assert output.getbuffer() == b"\x00\x00\x00\x49\x25\x59\xf6\x4f" def test_write_bytes() -> None: - output = io.BytesIO() encoder = BinaryEncoder(output) - _input = b'\x12\x34\x56' + _input = b"\x12\x34\x56" encoder.write_bytes(_input) - - assert output.getbuffer() == b''.join([zigzag_encode(len(_input)), _input]) + + assert output.getbuffer() == b"".join([zigzag_encode(len(_input)), _input]) def test_write_bytes_fixed() -> None: output = io.BytesIO() encoder = BinaryEncoder(output) - _input = b'\x12\x34\x56' + _input = b"\x12\x34\x56" encoder.write_bytes_fixed(_input) - + assert output.getbuffer() == _input @@ -167,12 +163,11 @@ def test_write_utf8() -> None: bin_input = _input.encode() encoder.write_utf8(_input) - - assert output.getbuffer() == b''.join([zigzag_encode(len(bin_input)), bin_input]) + + assert output.getbuffer() == b"".join([zigzag_encode(len(bin_input)), bin_input]) def test_write_date_int() -> None: - output = io.BytesIO() encoder = BinaryEncoder(output) @@ -181,20 +176,19 @@ def test_write_date_int() -> None: days = (_input - datetime.date(1970, 1, 1)).days encoder.write_date_int(_input) - + assert output.getbuffer() == zigzag_encode(days) def test_write_time_millis_int() -> None: - output = io.BytesIO() encoder = BinaryEncoder(output) _input = datetime.time(1, 2, 3, 456000) - reference = 
int((1*60*60*1e6 + 2*60*1e6 + 3* 1e6 + 456000)/1000) + reference = int((1 * 60 * 60 * 1e6 + 2 * 60 * 1e6 + 3 * 1e6 + 456000) / 1000) encoder.write_time_millis_int(_input) - + assert output.getbuffer() == zigzag_encode(reference) @@ -203,36 +197,34 @@ def test_write_time_micros_long() -> None: encoder = BinaryEncoder(output) _input = datetime.time(1, 2, 3, 456000) - reference = int(1*60*60*1e6 + 2*60*1e6 + 3* 1e6 + 456000) + reference = int(1 * 60 * 60 * 1e6 + 2 * 60 * 1e6 + 3 * 1e6 + 456000) encoder.write_time_micros_long(_input) - + assert output.getbuffer() == zigzag_encode(reference) def test_write_timestamp_millis_long() -> None: - output = io.BytesIO() encoder = BinaryEncoder(output) _input = datetime.datetime(2023, 1, 1, 1, 2, 3) - millis = int((_input - datetime.datetime(1970, 1, 1, 0,0,0)).total_seconds()*1e3) + millis = int((_input - datetime.datetime(1970, 1, 1, 0, 0, 0)).total_seconds() * 1e3) encoder.write_timestamp_millis_long(_input) - + assert output.getbuffer() == zigzag_encode(millis) def test_write_timestamp_micros_long() -> None: - output = io.BytesIO() encoder = BinaryEncoder(output) _input = datetime.datetime(2023, 1, 1, 1, 2, 3) - micros = int((_input - datetime.datetime(1970, 1, 1, 0,0,0)).total_seconds()*1e6) + micros = int((_input - datetime.datetime(1970, 1, 1, 0, 0, 0)).total_seconds() * 1e6) encoder.write_timestamp_micros_long(_input) - + assert output.getbuffer() == zigzag_encode(micros) diff --git a/python/tests/avro/test_file.py b/python/tests/avro/test_file.py index 081e5fd54e21..d7d141c21b53 100644 --- a/python/tests/avro/test_file.py +++ b/python/tests/avro/test_file.py @@ -14,26 +14,29 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. 
+from enum import Enum +from tempfile import TemporaryDirectory +from typing import Any + import pytest +from fastavro import reader, writer +import pyiceberg.avro.file as avro from pyiceberg.avro.codecs import DeflateCodec from pyiceberg.avro.file import META_SCHEMA, AvroFileHeader +from pyiceberg.io.pyarrow import PyArrowFileIO from pyiceberg.manifest import ( + MANIFEST_ENTRY_SCHEMA, DataFile, DataFileContent, FileFormat, ManifestEntry, ManifestEntryStatus, - MANIFEST_ENTRY_SCHEMA, ) from pyiceberg.typedef import Record -import pyiceberg.avro.file as avro -from pyiceberg.io.pyarrow import PyArrowFileIO -from tempfile import TemporaryDirectory -from enum import Enum -from fastavro import writer, reader from pyiceberg.utils.schema_conversion import AvroSchemaConversion + def get_deflate_compressor() -> None: header = AvroFileHeader(struct=META_SCHEMA) header[0] = bytes(0) @@ -74,101 +77,97 @@ def test_missing_schema() -> None: assert "No schema found in Avro file headers" in str(exc_info.value) - # helper function to serialize our objects to dicts to enable # direct comparison with the dicts returned by fastavro -def todict(obj): +def todict(obj: Any) -> Any: if isinstance(obj, dict): data = [] - for (k, v) in obj.items(): - data.append({"key":k, "value": v}) + for k, v in obj.items(): + data.append({"key": k, "value": v}) return data elif isinstance(obj, Enum): return obj.value elif hasattr(obj, "__iter__") and not isinstance(obj, str) and not isinstance(obj, bytes): return [todict(v) for v in obj] elif hasattr(obj, "__dict__"): - data = dict([(key, todict(value)) - for key, value in obj.__dict__.items() - if not callable(value) and not key.startswith('_')]) - return data + return {key: todict(value) for key, value in obj.__dict__.items() if not callable(value) and not key.startswith("_")} else: return obj -def test_write_manifest_entry_with_iceberg_read_with_fastavro() -> None: +def test_write_manifest_entry_with_iceberg_read_with_fastavro() -> None: data_file = 
DataFile( - content = DataFileContent.DATA, - file_path = "s3://some-path/some-file.parquet", - file_format = FileFormat.PARQUET, - partition = Record(), - record_count = 131327, - file_size_in_bytes = 220669226, - column_sizes = { 1: 220661854 }, - value_counts = { 1: 131327 }, - null_value_counts = { 1: 0 }, - nan_value_counts = {}, - lower_bounds = { 1: "aaaaaaaaaaaaaaaa".encode() }, - upper_bounds = { 1: "zzzzzzzzzzzzzzzz".encode() }, - key_metadata = b"\xde\xad\xbe\xef", - split_offsets = [ 4, 133697593 ], - equality_ids = [], - sort_order_id = 4, - spec_id = 3 + content=DataFileContent.DATA, + file_path="s3://some-path/some-file.parquet", + file_format=FileFormat.PARQUET, + partition=Record(), + record_count=131327, + file_size_in_bytes=220669226, + column_sizes={1: 220661854}, + value_counts={1: 131327}, + null_value_counts={1: 0}, + nan_value_counts={}, + lower_bounds={1: b"aaaaaaaaaaaaaaaa"}, + upper_bounds={1: b"zzzzzzzzzzzzzzzz"}, + key_metadata=b"\xde\xad\xbe\xef", + split_offsets=[4, 133697593], + equality_ids=[], + sort_order_id=4, + spec_id=3, ) entry = ManifestEntry( - status = ManifestEntryStatus.ADDED, - snapshot_id = 8638475580105682862, - data_sequence_number = 0, - file_sequence_number = 0, - data_file = data_file + status=ManifestEntryStatus.ADDED, + snapshot_id=8638475580105682862, + data_sequence_number=0, + file_sequence_number=0, + data_file=data_file, ) with TemporaryDirectory() as tmpdir: tmp_avro_file = tmpdir + "/manifest_entry.avro" - with avro.AvroOutputFile(PyArrowFileIO().new_output(tmp_avro_file), MANIFEST_ENTRY_SCHEMA, "manifest_entry") as out: + with avro.AvroOutputFile[ManifestEntry]( + PyArrowFileIO().new_output(tmp_avro_file), MANIFEST_ENTRY_SCHEMA, "manifest_entry" + ) as out: out.write_block([entry]) - schema = AvroSchemaConversion().iceberg_to_avro(MANIFEST_ENTRY_SCHEMA, schema_name="manifest_entry") - with open(tmp_avro_file, 'rb') as fo: + with open(tmp_avro_file, "rb") as fo: r = reader(fo=fo, reader_schema=schema) it = 
iter(r) - + fa_entry = next(it) - + assert todict(entry) == fa_entry def test_write_manifest_entry_with_fastavro_read_with_iceberg() -> None: - data_file = DataFile( - content = DataFileContent.DATA, - file_path = "s3://some-path/some-file.parquet", - file_format = FileFormat.PARQUET, - partition = Record(), - record_count = 131327, - file_size_in_bytes = 220669226, - column_sizes = { 1: 220661854 }, - value_counts = { 1: 131327 }, - null_value_counts = { 1: 0 }, - nan_value_counts = {}, - lower_bounds = { 1: "aaaaaaaaaaaaaaaa".encode() }, - upper_bounds = { 1: "zzzzzzzzzzzzzzzz".encode() }, - key_metadata = b"\xde\xad\xbe\xef", - split_offsets = [ 4, 133697593 ], - equality_ids = [], - sort_order_id = 4, - spec_id = 3 + content=DataFileContent.DATA, + file_path="s3://some-path/some-file.parquet", + file_format=FileFormat.PARQUET, + partition=Record(), + record_count=131327, + file_size_in_bytes=220669226, + column_sizes={1: 220661854}, + value_counts={1: 131327}, + null_value_counts={1: 0}, + nan_value_counts={}, + lower_bounds={1: b"aaaaaaaaaaaaaaaa"}, + upper_bounds={1: b"zzzzzzzzzzzzzzzz"}, + key_metadata=b"\xde\xad\xbe\xef", + split_offsets=[4, 133697593], + equality_ids=[], + sort_order_id=4, + spec_id=3, ) entry = ManifestEntry( - status = ManifestEntryStatus.ADDED, - snapshot_id = 8638475580105682862, - data_sequence_number = 0, - file_sequence_number = 0, - data_file = data_file + status=ManifestEntryStatus.ADDED, + snapshot_id=8638475580105682862, + data_sequence_number=0, + file_sequence_number=0, + data_file=data_file, ) with TemporaryDirectory() as tmpdir: @@ -176,7 +175,7 @@ def test_write_manifest_entry_with_fastavro_read_with_iceberg() -> None: schema = AvroSchemaConversion().iceberg_to_avro(MANIFEST_ENTRY_SCHEMA, schema_name="manifest_entry") - with open(tmp_avro_file, 'wb') as out: + with open(tmp_avro_file, "wb") as out: writer(out, schema, [todict(entry)]) with avro.AvroFile[ManifestEntry]( @@ -187,4 +186,4 @@ def 
test_write_manifest_entry_with_fastavro_read_with_iceberg() -> None: it = iter(avro_reader) avro_entry = next(it) - assert entry == avro_entry \ No newline at end of file + assert entry == avro_entry diff --git a/python/tests/avro/test_writer.py b/python/tests/avro/test_writer.py index e5207f946213..c517a0cd1c4d 100644 --- a/python/tests/avro/test_writer.py +++ b/python/tests/avro/test_writer.py @@ -16,8 +16,14 @@ # under the License. # pylint:disable=protected-access +import io +import struct +from typing import Dict, List + import pytest +from pyiceberg.avro.encoder import BinaryEncoder +from pyiceberg.avro.resolver import construct_writer from pyiceberg.avro.writer import ( BinaryWriter, BooleanWriter, @@ -28,13 +34,11 @@ FloatWriter, IntegerWriter, StringWriter, - TimeWriter, - TimestampWriter, TimestamptzWriter, + TimestampWriter, + TimeWriter, UUIDWriter, ) -from pyiceberg.avro.resolver import construct_writer -from pyiceberg.avro.encoder import BinaryEncoder from pyiceberg.typedef import Record from pyiceberg.types import ( BinaryType, @@ -57,18 +61,16 @@ TimeType, UUIDType, ) -import io -import struct -from typing import Dict, List -def zigzag_encode(datum): + +def zigzag_encode(datum: int) -> bytes: result = [] datum = (datum << 1) ^ (datum >> 63) while (datum & ~0x7F) != 0: result.append(struct.pack("B", (datum & 0x7F) | 0x80)) datum >>= 7 result.append(struct.pack("B", datum)) - return b''.join(result) + return b"".join(result) def test_fixed_writer() -> None: @@ -138,13 +140,11 @@ def test_uuid_writer() -> None: def test_write_simple_struct() -> None: - output = io.BytesIO() encoder = BinaryEncoder(output) schema = StructType( - NestedField(1, "id", IntegerType(), required=True), - NestedField(2, "property", StringType(), required=True) + NestedField(1, "id", IntegerType(), required=True), NestedField(2, "property", StringType(), required=True) ) class MyStruct(Record): @@ -153,66 +153,68 @@ class MyStruct(Record): my_struct = MyStruct(id=12, 
property="awesome") - enc_str = "awesome".encode() + enc_str = b"awesome" construct_writer(schema).write(encoder, my_struct) - assert output.getbuffer() == b''.join([b'\x18', zigzag_encode(len(enc_str)), enc_str]) + assert output.getbuffer() == b"".join([b"\x18", zigzag_encode(len(enc_str)), enc_str]) def test_write_struct_with_dict() -> None: - output = io.BytesIO() encoder = BinaryEncoder(output) schema = StructType( NestedField(1, "id", IntegerType(), required=True), - NestedField(2, "properties", MapType(3, IntegerType(), 4, IntegerType()), required=True) + NestedField(2, "properties", MapType(3, IntegerType(), 4, IntegerType()), required=True), ) class MyStruct(Record): id: int - properties: Dict[int,int] + properties: Dict[int, int] - my_struct = MyStruct(id=12, properties={1:2, 3:4}) + my_struct = MyStruct(id=12, properties={1: 2, 3: 4}) construct_writer(schema).write(encoder, my_struct) - assert output.getbuffer() == b''.join([ - b'\x18', - zigzag_encode(len(my_struct.properties)), - zigzag_encode(1), - zigzag_encode(2), - zigzag_encode(3), - zigzag_encode(4), - b'\x00' - ]) + assert output.getbuffer() == b"".join( + [ + b"\x18", + zigzag_encode(len(my_struct.properties)), + zigzag_encode(1), + zigzag_encode(2), + zigzag_encode(3), + zigzag_encode(4), + b"\x00", + ] + ) def test_write_struct_with_list() -> None: - output = io.BytesIO() encoder = BinaryEncoder(output) schema = StructType( NestedField(1, "id", IntegerType(), required=True), - NestedField(2, "properties", ListType(3, IntegerType()), required=True) + NestedField(2, "properties", ListType(3, IntegerType()), required=True), ) class MyStruct(Record): id: int properties: List[int] - my_struct = MyStruct(id=12, properties=[1,2,3,4]) + my_struct = MyStruct(id=12, properties=[1, 2, 3, 4]) construct_writer(schema).write(encoder, my_struct) - assert output.getbuffer() == b''.join([ - b'\x18', - zigzag_encode(len(my_struct.properties)), - zigzag_encode(1), - zigzag_encode(2), - zigzag_encode(3), - 
zigzag_encode(4), - b'\x00' - ]) + assert output.getbuffer() == b"".join( + [ + b"\x18", + zigzag_encode(len(my_struct.properties)), + zigzag_encode(1), + zigzag_encode(2), + zigzag_encode(3), + zigzag_encode(4), + b"\x00", + ] + ) From 3a5efc5cd0560bbb3dd277ced0353ecdeea7835b Mon Sep 17 00:00:00 2001 From: Max de Bayser Date: Tue, 27 Jun 2023 14:07:25 -0300 Subject: [PATCH 6/8] Address PR review comments --- python/pyiceberg/avro/encoder.py | 11 ++----- python/pyiceberg/avro/resolver.py | 5 +-- python/pyiceberg/avro/writer.py | 2 +- python/pyiceberg/utils/datetime.py | 4 +++ python/tests/avro/test_encoder.py | 53 +++++++++--------------------- python/tests/avro/test_file.py | 4 +-- 6 files changed, 25 insertions(+), 54 deletions(-) diff --git a/python/pyiceberg/avro/encoder.py b/python/pyiceberg/avro/encoder.py index a8c7156b3e68..517b96537ae4 100644 --- a/python/pyiceberg/avro/encoder.py +++ b/python/pyiceberg/avro/encoder.py @@ -20,12 +20,7 @@ from pyiceberg.avro import STRUCT_DOUBLE, STRUCT_FLOAT from pyiceberg.io import OutputStream -from pyiceberg.utils.datetime import date_to_days, datetime_to_micros - - -def _time_object_to_micros(t: time) -> int: - return int(t.hour * 60 * 60 * 1e6 + t.minute * 60 * 1e6 + t.second * 1e6 + t.microsecond) - +from pyiceberg.utils.datetime import date_to_days, datetime_to_micros, time_object_to_micros class BinaryEncoder: """Write leaf values.""" @@ -152,7 +147,7 @@ def write_time_millis_int(self, dt: time) -> None: It stores the number of milliseconds from midnight, 00:00:00.000 """ - self.write_int(int(_time_object_to_micros(dt) / 1000)) + self.write_int(int(time_object_to_micros(dt) / 1000)) def write_time_micros_long(self, dt: time) -> None: """ @@ -160,7 +155,7 @@ def write_time_micros_long(self, dt: time) -> None: It stores the number of microseconds from midnight, 00:00:00.000000 """ - self.write_int(_time_object_to_micros(dt)) + self.write_int(time_object_to_micros(dt)) def write_timestamp_millis_long(self, dt: 
datetime) -> None: """ diff --git a/python/pyiceberg/avro/resolver.py b/python/pyiceberg/avro/resolver.py index 578e5e0436a9..51505301ea01 100644 --- a/python/pyiceberg/avro/resolver.py +++ b/python/pyiceberg/avro/resolver.py @@ -141,10 +141,7 @@ def struct(self, struct: StructType, field_results: List[Writer]) -> Writer: return StructWriter(tuple(field_results)) def field(self, field: NestedField, field_result: Writer) -> Writer: - if field.required: - return field_result - else: - return OptionWriter(field_result) + return field_result if field.required else OptionWriter(field_result) def list(self, list_type: ListType, element_result: Writer) -> Writer: return ListWriter(element_result) diff --git a/python/pyiceberg/avro/writer.py b/python/pyiceberg/avro/writer.py index d6ddd50d0224..a533aff7c171 100644 --- a/python/pyiceberg/avro/writer.py +++ b/python/pyiceberg/avro/writer.py @@ -53,7 +53,7 @@ def __repr__(self) -> str: class NoneWriter(Writer): def write(self, _: BinaryEncoder, __: Any) -> None: - return None + pass class BooleanWriter(Writer): diff --git a/python/pyiceberg/utils/datetime.py b/python/pyiceberg/utils/datetime.py index 918a22ddbfb8..08d6f5ace36b 100644 --- a/python/pyiceberg/utils/datetime.py +++ b/python/pyiceberg/utils/datetime.py @@ -66,6 +66,10 @@ def time_to_micros(time_str: str) -> int: t = time.fromisoformat(time_str) return (((t.hour * 60 + t.minute) * 60) + t.second) * 1_000_000 + t.microsecond +def time_object_to_micros(t: time) -> int: + """Converts an datetime.time object to microseconds from midnight.""" + return int(t.hour * 60 * 60 * 1e6 + t.minute * 60 * 1e6 + t.second * 1e6 + t.microsecond) + def datetime_to_micros(dt: datetime) -> int: """Converts a datetime to microseconds from 1970-01-01T00:00:00.000000.""" diff --git a/python/tests/avro/test_encoder.py b/python/tests/avro/test_encoder.py index aa5505cef363..e2798d5fe551 100644 --- a/python/tests/avro/test_encoder.py +++ b/python/tests/avro/test_encoder.py @@ -24,16 +24,6 @@ 
from pyiceberg.avro.encoder import BinaryEncoder -def zigzag_encode(datum: int) -> bytes: - result = [] - datum = (datum << 1) ^ (datum >> 63) - while (datum & ~0x7F) != 0: - result.append(struct.pack("B", (datum & 0x7F) | 0x80)) - datum >>= 7 - result.append(struct.pack("B", datum)) - return b"".join(result) - - def test_write() -> None: output = io.BytesIO() encoder = BinaryEncoder(output) @@ -79,14 +69,14 @@ def test_write_int() -> None: buffer = output.getbuffer() - assert buffer[0:1] == zigzag_encode(_1byte_input) - assert buffer[1:3] == zigzag_encode(_2byte_input) - assert buffer[3:6] == zigzag_encode(_3byte_input) - assert buffer[6:10] == zigzag_encode(_4byte_input) - assert buffer[10:15] == zigzag_encode(_5byte_input) - assert buffer[15:21] == zigzag_encode(_6byte_input) - assert buffer[21:28] == zigzag_encode(_7byte_input) - assert buffer[28:36] == zigzag_encode(_8byte_input) + assert buffer[0:1] == b'\x04' + assert buffer[1:3] == b'\xd4\x74' + assert buffer[3:6] == b'\xc4\xf3\x3f' + assert buffer[6:10] == b'\xc4\xcc\xc6\x52' + assert buffer[10:15] == b'\xc4\xb0\x8f\xda\x12' + assert buffer[15:21] == b'\xc4\xe0\xf6\xd2\xe3\x2a' + assert buffer[21:28] == b'\xc4\xa0\xce\xe8\xe3\xb6\x3d' + assert buffer[28:36] == b'\xc4\xa0\xb2\xae\x83\xf8\xe4\x7c' def test_write_float() -> None: @@ -141,7 +131,7 @@ def test_write_bytes() -> None: encoder.write_bytes(_input) - assert output.getbuffer() == b"".join([zigzag_encode(len(_input)), _input]) + assert output.getbuffer() == b"".join([b'\x06', _input]) def test_write_bytes_fixed() -> None: @@ -161,10 +151,9 @@ def test_write_utf8() -> None: _input = "That, my liege, is how we know the Earth to be banana-shaped." 
bin_input = _input.encode() - encoder.write_utf8(_input) - assert output.getbuffer() == b"".join([zigzag_encode(len(bin_input)), bin_input]) + assert output.getbuffer() == b"".join([b'\x7a', bin_input]) def test_write_date_int() -> None: @@ -172,12 +161,9 @@ def test_write_date_int() -> None: encoder = BinaryEncoder(output) _input = datetime.date(1970, 1, 2) - - days = (_input - datetime.date(1970, 1, 1)).days - encoder.write_date_int(_input) - assert output.getbuffer() == zigzag_encode(days) + assert output.getbuffer() == b'\x02' def test_write_time_millis_int() -> None: @@ -185,11 +171,9 @@ def test_write_time_millis_int() -> None: encoder = BinaryEncoder(output) _input = datetime.time(1, 2, 3, 456000) - reference = int((1 * 60 * 60 * 1e6 + 2 * 60 * 1e6 + 3 * 1e6 + 456000) / 1000) - encoder.write_time_millis_int(_input) - assert output.getbuffer() == zigzag_encode(reference) + assert output.getbuffer() == b'\x80\xc3\xc6\x03' def test_write_time_micros_long() -> None: @@ -197,11 +181,10 @@ def test_write_time_micros_long() -> None: encoder = BinaryEncoder(output) _input = datetime.time(1, 2, 3, 456000) - reference = int(1 * 60 * 60 * 1e6 + 2 * 60 * 1e6 + 3 * 1e6 + 456000) encoder.write_time_micros_long(_input) - assert output.getbuffer() == zigzag_encode(reference) + assert output.getbuffer() == b'\x80\xb8\xfb\xde\x1b' def test_write_timestamp_millis_long() -> None: @@ -209,12 +192,9 @@ def test_write_timestamp_millis_long() -> None: encoder = BinaryEncoder(output) _input = datetime.datetime(2023, 1, 1, 1, 2, 3) - - millis = int((_input - datetime.datetime(1970, 1, 1, 0, 0, 0)).total_seconds() * 1e3) - encoder.write_timestamp_millis_long(_input) - assert output.getbuffer() == zigzag_encode(millis) + assert output.getbuffer() == b'\xf0\xdb\xcc\xad\xad\x61' def test_write_timestamp_micros_long() -> None: @@ -222,9 +202,6 @@ def test_write_timestamp_micros_long() -> None: encoder = BinaryEncoder(output) _input = datetime.datetime(2023, 1, 1, 1, 2, 3) - - micros = 
int((_input - datetime.datetime(1970, 1, 1, 0, 0, 0)).total_seconds() * 1e6) - encoder.write_timestamp_micros_long(_input) - assert output.getbuffer() == zigzag_encode(micros) + assert output.getbuffer() == b'\x80\xe3\xad\x9f\xac\xca\xf8\x05' diff --git a/python/tests/avro/test_file.py b/python/tests/avro/test_file.py index d7d141c21b53..092b5aae4ce7 100644 --- a/python/tests/avro/test_file.py +++ b/python/tests/avro/test_file.py @@ -131,10 +131,8 @@ def test_write_manifest_entry_with_iceberg_read_with_fastavro() -> None: ) as out: out.write_block([entry]) - schema = AvroSchemaConversion().iceberg_to_avro(MANIFEST_ENTRY_SCHEMA, schema_name="manifest_entry") - with open(tmp_avro_file, "rb") as fo: - r = reader(fo=fo, reader_schema=schema) + r = reader(fo=fo) it = iter(r) fa_entry = next(it) From bcd84061b6a6b5e51e45bcfc211922df0cd34081 Mon Sep 17 00:00:00 2001 From: Max de Bayser Date: Tue, 27 Jun 2023 14:17:50 -0300 Subject: [PATCH 7/8] Appease pre-commit hooks --- python/pyiceberg/avro/encoder.py | 1 + python/pyiceberg/avro/file.py | 1 + python/pyiceberg/avro/writer.py | 7 +++++++ python/pyiceberg/utils/datetime.py | 1 + python/tests/avro/test_encoder.py | 30 +++++++++++++++--------------- 5 files changed, 25 insertions(+), 15 deletions(-) diff --git a/python/pyiceberg/avro/encoder.py b/python/pyiceberg/avro/encoder.py index 517b96537ae4..cf6d60123357 100644 --- a/python/pyiceberg/avro/encoder.py +++ b/python/pyiceberg/avro/encoder.py @@ -22,6 +22,7 @@ from pyiceberg.io import OutputStream from pyiceberg.utils.datetime import date_to_days, datetime_to_micros, time_object_to_micros + class BinaryEncoder: """Write leaf values.""" diff --git a/python/pyiceberg/avro/file.py b/python/pyiceberg/avro/file.py index c9975720ae4d..aad76b4fc4c9 100644 --- a/python/pyiceberg/avro/file.py +++ b/python/pyiceberg/avro/file.py @@ -250,6 +250,7 @@ def __enter__(self) -> AvroOutputFile[D]: def __exit__( self, exctype: Optional[Type[BaseException]], excinst: Optional[BaseException], 
exctb: Optional[TracebackType] ) -> None: + """Performs cleanup when exiting the scope of a 'with' statement.""" self.output_stream.close() def _write_header(self) -> None: diff --git a/python/pyiceberg/avro/writer.py b/python/pyiceberg/avro/writer.py index a533aff7c171..10a589715dcc 100644 --- a/python/pyiceberg/avro/writer.py +++ b/python/pyiceberg/avro/writer.py @@ -48,6 +48,7 @@ def write(self, encoder: BinaryEncoder, val: Any) -> Any: ... def __repr__(self) -> str: + """Returns string representation of this object.""" return f"{self.__class__.__name__}()" @@ -121,9 +122,11 @@ def write(self, encoder: BinaryEncoder, val: bytes) -> None: encoder.write(val) def __len__(self) -> int: + """Returns the length of this object.""" return self._len def __repr__(self) -> str: + """Returns string representation of this object.""" return f"FixedReader({self._len})" @@ -143,6 +146,7 @@ def write(self, encoder: BinaryEncoder, val: Any) -> None: return encoder.write_decimal_bytes(val) def __repr__(self) -> str: + """Returns string representation of this object.""" return f"DecimalReader({self.precision}, {self.scale})" @@ -167,12 +171,15 @@ def write(self, encoder: BinaryEncoder, val: StructType) -> None: writer.write(encoder, value) def __eq__(self, other: Any) -> bool: + """Implements the equality operator for this object.""" return self.field_writers == other.field_writers if isinstance(other, StructWriter) else False def __repr__(self) -> str: + """Returns string representation of this object.""" return f"StructReader({','.join(repr(field) for field in self.field_writers)})" def __hash__(self) -> int: + """Returns the hash of the writer as hash of this object.""" return hash(self.field_writers) diff --git a/python/pyiceberg/utils/datetime.py b/python/pyiceberg/utils/datetime.py index 08d6f5ace36b..fd9f6c677609 100644 --- a/python/pyiceberg/utils/datetime.py +++ b/python/pyiceberg/utils/datetime.py @@ -66,6 +66,7 @@ def time_to_micros(time_str: str) -> int: t = 
time.fromisoformat(time_str) return (((t.hour * 60 + t.minute) * 60) + t.second) * 1_000_000 + t.microsecond + def time_object_to_micros(t: time) -> int: """Converts an datetime.time object to microseconds from midnight.""" return int(t.hour * 60 * 60 * 1e6 + t.minute * 60 * 1e6 + t.second * 1e6 + t.microsecond) diff --git a/python/tests/avro/test_encoder.py b/python/tests/avro/test_encoder.py index e2798d5fe551..4646e65e6e61 100644 --- a/python/tests/avro/test_encoder.py +++ b/python/tests/avro/test_encoder.py @@ -69,14 +69,14 @@ def test_write_int() -> None: buffer = output.getbuffer() - assert buffer[0:1] == b'\x04' - assert buffer[1:3] == b'\xd4\x74' - assert buffer[3:6] == b'\xc4\xf3\x3f' - assert buffer[6:10] == b'\xc4\xcc\xc6\x52' - assert buffer[10:15] == b'\xc4\xb0\x8f\xda\x12' - assert buffer[15:21] == b'\xc4\xe0\xf6\xd2\xe3\x2a' - assert buffer[21:28] == b'\xc4\xa0\xce\xe8\xe3\xb6\x3d' - assert buffer[28:36] == b'\xc4\xa0\xb2\xae\x83\xf8\xe4\x7c' + assert buffer[0:1] == b"\x04" + assert buffer[1:3] == b"\xd4\x74" + assert buffer[3:6] == b"\xc4\xf3\x3f" + assert buffer[6:10] == b"\xc4\xcc\xc6\x52" + assert buffer[10:15] == b"\xc4\xb0\x8f\xda\x12" + assert buffer[15:21] == b"\xc4\xe0\xf6\xd2\xe3\x2a" + assert buffer[21:28] == b"\xc4\xa0\xce\xe8\xe3\xb6\x3d" + assert buffer[28:36] == b"\xc4\xa0\xb2\xae\x83\xf8\xe4\x7c" def test_write_float() -> None: @@ -131,7 +131,7 @@ def test_write_bytes() -> None: encoder.write_bytes(_input) - assert output.getbuffer() == b"".join([b'\x06', _input]) + assert output.getbuffer() == b"".join([b"\x06", _input]) def test_write_bytes_fixed() -> None: @@ -153,7 +153,7 @@ def test_write_utf8() -> None: bin_input = _input.encode() encoder.write_utf8(_input) - assert output.getbuffer() == b"".join([b'\x7a', bin_input]) + assert output.getbuffer() == b"".join([b"\x7a", bin_input]) def test_write_date_int() -> None: @@ -163,7 +163,7 @@ def test_write_date_int() -> None: _input = datetime.date(1970, 1, 2) 
encoder.write_date_int(_input) - assert output.getbuffer() == b'\x02' + assert output.getbuffer() == b"\x02" def test_write_time_millis_int() -> None: @@ -173,7 +173,7 @@ def test_write_time_millis_int() -> None: _input = datetime.time(1, 2, 3, 456000) encoder.write_time_millis_int(_input) - assert output.getbuffer() == b'\x80\xc3\xc6\x03' + assert output.getbuffer() == b"\x80\xc3\xc6\x03" def test_write_time_micros_long() -> None: @@ -184,7 +184,7 @@ def test_write_time_micros_long() -> None: encoder.write_time_micros_long(_input) - assert output.getbuffer() == b'\x80\xb8\xfb\xde\x1b' + assert output.getbuffer() == b"\x80\xb8\xfb\xde\x1b" def test_write_timestamp_millis_long() -> None: @@ -194,7 +194,7 @@ def test_write_timestamp_millis_long() -> None: _input = datetime.datetime(2023, 1, 1, 1, 2, 3) encoder.write_timestamp_millis_long(_input) - assert output.getbuffer() == b'\xf0\xdb\xcc\xad\xad\x61' + assert output.getbuffer() == b"\xf0\xdb\xcc\xad\xad\x61" def test_write_timestamp_micros_long() -> None: @@ -204,4 +204,4 @@ def test_write_timestamp_micros_long() -> None: _input = datetime.datetime(2023, 1, 1, 1, 2, 3) encoder.write_timestamp_micros_long(_input) - assert output.getbuffer() == b'\x80\xe3\xad\x9f\xac\xca\xf8\x05' + assert output.getbuffer() == b"\x80\xe3\xad\x9f\xac\xca\xf8\x05" From fd3a7d00383343ff9e0f6da23b40d90fc5ed8c7d Mon Sep 17 00:00:00 2001 From: Max de Bayser Date: Tue, 4 Jul 2023 10:51:52 -0300 Subject: [PATCH 8/8] Add additional metadata to avro output file --- python/pyiceberg/avro/file.py | 6 ++++-- python/tests/avro/test_file.py | 9 ++++++++- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/python/pyiceberg/avro/file.py b/python/pyiceberg/avro/file.py index aad76b4fc4c9..10f7ef7d7de2 100644 --- a/python/pyiceberg/avro/file.py +++ b/python/pyiceberg/avro/file.py @@ -225,12 +225,13 @@ class AvroOutputFile(Generic[D]): sync_bytes: bytes writer: Writer - def __init__(self, output_file: OutputFile, schema: Schema, schema_name: 
str) -> None: + def __init__(self, output_file: OutputFile, schema: Schema, schema_name: str, metadata: Dict[str, str] = EMPTY_DICT) -> None: self.output_file = output_file self.schema = schema self.schema_name = schema_name self.sync_bytes = os.urandom(SYNC_SIZE) self.writer = construct_writer(self.schema) + self.metadata = metadata def __enter__(self) -> AvroOutputFile[D]: """ @@ -255,7 +256,8 @@ def __exit__( def _write_header(self) -> None: json_schema = json.dumps(AvroSchemaConversion().iceberg_to_avro(self.schema, schema_name=self.schema_name)) - header = AvroFileHeader(magic=MAGIC, meta={_SCHEMA_KEY: json_schema, _CODEC_KEY: "null"}, sync=self.sync_bytes) + meta = {**self.metadata, _SCHEMA_KEY: json_schema, _CODEC_KEY: "null"} + header = AvroFileHeader(magic=MAGIC, meta=meta, sync=self.sync_bytes) construct_writer(META_SCHEMA).write(self.encoder, header) def write_block(self, objects: List[D]) -> None: diff --git a/python/tests/avro/test_file.py b/python/tests/avro/test_file.py index 092b5aae4ce7..53d0216ab07c 100644 --- a/python/tests/avro/test_file.py +++ b/python/tests/avro/test_file.py @@ -123,16 +123,23 @@ def test_write_manifest_entry_with_iceberg_read_with_fastavro() -> None: data_file=data_file, ) + additional_metadata = {"foo": "bar"} + with TemporaryDirectory() as tmpdir: tmp_avro_file = tmpdir + "/manifest_entry.avro" with avro.AvroOutputFile[ManifestEntry]( - PyArrowFileIO().new_output(tmp_avro_file), MANIFEST_ENTRY_SCHEMA, "manifest_entry" + PyArrowFileIO().new_output(tmp_avro_file), MANIFEST_ENTRY_SCHEMA, "manifest_entry", additional_metadata ) as out: out.write_block([entry]) with open(tmp_avro_file, "rb") as fo: r = reader(fo=fo) + + for k, v in additional_metadata.items(): + assert k in r.metadata + assert v == r.metadata[k] + it = iter(r) fa_entry = next(it)