Skip to content
4 changes: 4 additions & 0 deletions python/pyiceberg/avro/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import struct

STRUCT_FLOAT = struct.Struct("<f") # little-endian float
STRUCT_DOUBLE = struct.Struct("<d") # little-endian double
8 changes: 1 addition & 7 deletions python/pyiceberg/avro/decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,16 @@
# specific language governing permissions and limitations
# under the License.
import decimal
import struct
from datetime import datetime, time
from io import SEEK_CUR
from typing import List
from uuid import UUID

from pyiceberg.avro import STRUCT_DOUBLE, STRUCT_FLOAT
from pyiceberg.io import InputStream
from pyiceberg.utils.datetime import micros_to_time, micros_to_timestamp, micros_to_timestamptz
from pyiceberg.utils.decimal import unscaled_to_decimal

STRUCT_FLOAT = struct.Struct("<f") # little-endian float
STRUCT_DOUBLE = struct.Struct("<d") # little-endian double
STRUCT_SIGNED_SHORT = struct.Struct(">h") # big-endian signed short
STRUCT_SIGNED_INT = struct.Struct(">i") # big-endian signed int
STRUCT_SIGNED_LONG = struct.Struct(">q") # big-endian signed long


class BinaryDecoder:
"""Read leaf values."""
Expand Down
175 changes: 175 additions & 0 deletions python/pyiceberg/avro/encoder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import decimal
import struct
from datetime import date, datetime, time

from pyiceberg.avro import STRUCT_DOUBLE, STRUCT_FLOAT
from pyiceberg.io import OutputStream
from pyiceberg.utils.datetime import date_to_days, datetime_to_micros, time_object_to_micros


class BinaryEncoder:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The decoder has a UUID method that is missing here. Might be good to add it so that we have a round-trip test.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great one, I'll add it to the fastavro roundtrip test as well.

"""Write leaf values."""

_output_stream: OutputStream

def __init__(self, output_stream: OutputStream) -> None:
self._output_stream = output_stream

def write(self, b: bytes) -> None:
self._output_stream.write(b)

def write_boolean(self, boolean: bool) -> None:
"""A boolean is written as a single byte whose value is either 0 (false) or 1 (true).

Args:
boolean: The boolean to write.
"""
self.write(bytearray([bool(boolean)]))

def write_int(self, integer: int) -> None:
"""Integer and long values are written using variable-length zig-zag coding."""
datum = (integer << 1) ^ (integer >> 63)
while (datum & ~0x7F) != 0:
self.write(bytearray([(datum & 0x7F) | 0x80]))
datum >>= 7
self.write(bytearray([datum]))

def write_float(self, f: float) -> None:
"""A float is written as 4 bytes."""
self.write(STRUCT_FLOAT.pack(f))

def write_double(self, f: float) -> None:
"""A double is written as 8 bytes."""
self.write(STRUCT_DOUBLE.pack(f))

def write_decimal_bytes(self, datum: decimal.Decimal) -> None:
"""
Decimal in bytes are encoded as long.

Since size of packed value in bytes for signed long is 8, 8 bytes are written.
"""
sign, digits, _ = datum.as_tuple()

unscaled_datum = 0
for digit in digits:
unscaled_datum = (unscaled_datum * 10) + digit

bits_req = unscaled_datum.bit_length() + 1
if sign:
unscaled_datum = (1 << bits_req) - unscaled_datum

bytes_req = bits_req // 8
padding_bits = ~((1 << bits_req) - 1) if sign else 0
packed_bits = padding_bits | unscaled_datum

bytes_req += 1 if (bytes_req << 3) < bits_req else 0
self.write_int(bytes_req)
for index in range(bytes_req - 1, -1, -1):
bits_to_write = packed_bits >> (8 * index)
self.write(bytearray([bits_to_write & 0xFF]))

def write_decimal_fixed(self, datum: decimal.Decimal, size: int) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Fokko, if I remember correctly, we replaced these implementations with more native Python in the read path. We can probably do the same thing here for faster encoding and simpler code.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, here it is:

        unscaled_datum = int.from_bytes(data, byteorder="big", signed=True)
        return unscaled_to_decimal(unscaled_datum, scale)

Maybe there's an encoder equivalent to int.from_bytes that we can use to simplify this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, that's much nicer indeed.

"""Decimal in fixed are encoded as size of fixed bytes."""
sign, digits, _ = datum.as_tuple()

unscaled_datum = 0
for digit in digits:
unscaled_datum = (unscaled_datum * 10) + digit

bits_req = unscaled_datum.bit_length() + 1
size_in_bits = size * 8
offset_bits = size_in_bits - bits_req

mask = 2**size_in_bits - 1
bit = 1
for _ in range(bits_req):
mask ^= bit
bit <<= 1

if bits_req < 8:
bytes_req = 1
else:
bytes_req = bits_req // 8
if bits_req % 8 != 0:
bytes_req += 1
if sign:
unscaled_datum = (1 << bits_req) - unscaled_datum
unscaled_datum = mask | unscaled_datum
for index in range(size - 1, -1, -1):
bits_to_write = unscaled_datum >> (8 * index)
self.write(bytearray([bits_to_write & 0xFF]))
else:
for _ in range(offset_bits // 8):
self.write(b"\x00")
for index in range(bytes_req - 1, -1, -1):
bits_to_write = unscaled_datum >> (8 * index)
self.write(bytearray([bits_to_write & 0xFF]))

def write_bytes(self, b: bytes) -> None:
"""Bytes are encoded as a long followed by that many bytes of data."""
self.write_int(len(b))
self.write(struct.pack(f"{len(b)}s", b))

def write_bytes_fixed(self, b: bytes) -> None:
"""Writes fixed number of bytes."""
self.write(struct.pack(f"{len(b)}s", b))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self.write already accepts bytes. Why does this need to use struct.pack?


def write_utf8(self, s: str) -> None:
"""A string is encoded as a long followed by that many bytes of UTF-8 encoded character data."""
self.write_bytes(s.encode("utf-8"))

def write_date_int(self, d: date) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not simply write_date?

"""
Encode python date object as int.

It stores the number of days from the unix epoch, 1 January 1970 (ISO calendar).
"""
self.write_int(date_to_days(d))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this method needed? I thought our internal representation was already int and not datetime.date.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can get rid of it, and make it part of the write tree


def write_time_millis_int(self, dt: time) -> None:
"""
Encode python time object as int.

It stores the number of milliseconds from midnight, 00:00:00.000
"""
self.write_int(int(time_object_to_micros(dt) / 1000))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is needed. Iceberg doesn't allow writing millisecond precision timestamps.


def write_time_micros_long(self, dt: time) -> None:
"""
Encode python time object as long.

It stores the number of microseconds from midnight, 00:00:00.000000
"""
self.write_int(time_object_to_micros(dt))

def write_timestamp_millis_long(self, dt: datetime) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, no need for the millis.

"""
Encode python datetime object as long.

It stores the number of milliseconds from midnight of unix epoch, 1 January 1970.
"""
self.write_int(int(datetime_to_micros(dt) / 1000))

def write_timestamp_micros_long(self, dt: datetime) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no check here for datetime's zone. I think we need to validate that there is no zone. We may also need a write_timestamptz method. And can we rename this to write_timestamp?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've removed these methods. The encoder should only accept the physical types. I've added the check to the write tree.

"""
Encode python datetime object as long.

It stores the number of microseconds from midnight of unix epoch, 1 January 1970.
"""
self.write_int(datetime_to_micros(dt))
73 changes: 70 additions & 3 deletions python/pyiceberg/avro/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,34 @@
"""Avro reader for reading Avro files."""
from __future__ import annotations

import io
import json
import os
from dataclasses import dataclass
from enum import Enum
from types import TracebackType
from typing import (
Callable,
Dict,
Generic,
List,
Optional,
Type,
TypeVar,
)

from pyiceberg.avro.codecs import KNOWN_CODECS, Codec
from pyiceberg.avro.decoder import BinaryDecoder
from pyiceberg.avro.encoder import BinaryEncoder
from pyiceberg.avro.reader import Reader
from pyiceberg.avro.resolver import construct_reader, resolve
from pyiceberg.io import InputFile, InputStream
from pyiceberg.avro.resolver import construct_reader, construct_writer, resolve
from pyiceberg.avro.writer import Writer
from pyiceberg.io import (
InputFile,
InputStream,
OutputFile,
OutputStream,
)
from pyiceberg.io.memory import MemoryInputStream
from pyiceberg.schema import Schema
from pyiceberg.typedef import EMPTY_DICT, Record, StructProtocol
Expand Down Expand Up @@ -147,7 +157,7 @@ def __enter__(self) -> AvroFile[D]:
"""Generates a reader tree for the payload within an avro file.

Returns:
A generator returning the AvroStructs
A generator returning the AvroStructs.
"""
self.input_stream = self.input_file.open(seekable=False)
self.decoder = BinaryDecoder(self.input_stream)
Expand Down Expand Up @@ -204,3 +214,60 @@ def __next__(self) -> D:

def _read_header(self) -> AvroFileHeader:
return construct_reader(META_SCHEMA, {-1: AvroFileHeader}).read(self.decoder)


class AvroOutputFile(Generic[D]):
output_file: OutputFile
output_stream: OutputStream
schema: Schema
schema_name: str
encoder: BinaryEncoder
sync_bytes: bytes
writer: Writer

def __init__(self, output_file: OutputFile, schema: Schema, schema_name: str, metadata: Dict[str, str] = EMPTY_DICT) -> None:
self.output_file = output_file
self.schema = schema
self.schema_name = schema_name
self.sync_bytes = os.urandom(SYNC_SIZE)
self.writer = construct_writer(self.schema)
self.metadata = metadata

def __enter__(self) -> AvroOutputFile[D]:
"""
Opens the file and writes the header.

Returns:
The file object to write records to
"""
self.output_stream = self.output_file.create(overwrite=True)
self.encoder = BinaryEncoder(self.output_stream)

self._write_header()
self.writer = construct_writer(self.schema)

return self

def __exit__(
self, exctype: Optional[Type[BaseException]], excinst: Optional[BaseException], exctb: Optional[TracebackType]
) -> None:
"""Performs cleanup when exiting the scope of a 'with' statement."""
self.output_stream.close()

def _write_header(self) -> None:
json_schema = json.dumps(AvroSchemaConversion().iceberg_to_avro(self.schema, schema_name=self.schema_name))
meta = {**self.metadata, _SCHEMA_KEY: json_schema, _CODEC_KEY: "null"}
header = AvroFileHeader(magic=MAGIC, meta=meta, sync=self.sync_bytes)
construct_writer(META_SCHEMA).write(self.encoder, header)

def write_block(self, objects: List[D]) -> None:
in_memory = io.BytesIO()
block_content_encoder = BinaryEncoder(output_stream=in_memory)
for obj in objects:
self.writer.write(block_content_encoder, obj)
block_content = in_memory.getvalue()

self.encoder.write_int(len(objects))
self.encoder.write_int(len(block_content))
self.encoder.write(block_content)
self.encoder.write_bytes_fixed(self.sync_bytes)
Loading