Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions python/pyiceberg/avro/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import struct

STRUCT_FLOAT = struct.Struct("<f") # little-endian float
STRUCT_DOUBLE = struct.Struct("<d") # little-endian double
8 changes: 1 addition & 7 deletions python/pyiceberg/avro/decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,16 @@
# specific language governing permissions and limitations
# under the License.
import decimal
import struct
from datetime import datetime, time
from io import SEEK_CUR
from typing import List
from uuid import UUID

from pyiceberg.avro import STRUCT_DOUBLE, STRUCT_FLOAT
from pyiceberg.io import InputStream
from pyiceberg.utils.datetime import micros_to_time, micros_to_timestamp, micros_to_timestamptz
from pyiceberg.utils.decimal import unscaled_to_decimal

STRUCT_FLOAT = struct.Struct("<f") # little-endian float
STRUCT_DOUBLE = struct.Struct("<d") # little-endian double
STRUCT_SIGNED_SHORT = struct.Struct(">h") # big-endian signed short
STRUCT_SIGNED_INT = struct.Struct(">i") # big-endian signed int
STRUCT_SIGNED_LONG = struct.Struct(">q") # big-endian signed long


class BinaryDecoder:
"""Read leaf values."""
Expand Down
183 changes: 183 additions & 0 deletions python/pyiceberg/avro/encoder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import decimal
import struct
from datetime import date, datetime, time

from pyiceberg.avro import STRUCT_DOUBLE, STRUCT_FLOAT
from pyiceberg.io import OutputStream
from pyiceberg.utils.datetime import date_to_days, datetime_to_micros, time_to_micros


class BinaryEncoder:
"""Write leaf values."""

_output_stream: OutputStream

def __init__(self, output_stream: OutputStream) -> None:
self._output_stream = output_stream

def write(self, b: bytes) -> None:
self._output_stream.write(b)

def write_boolean(self, boolean: bool) -> None:
"""A boolean is written as a single byte whose value is either 0 (false) or 1 (true).

Args:
boolean: The boolean to write
"""
self.write(bytearray([bool(boolean)]))

def write_int(self, integer: int) -> None:
"""Integer and long values are written using variable-length zig-zag coding."""
datum = (integer << 1) ^ (integer >> 63)
while (datum & ~0x7F) != 0:
self.write(bytearray([(datum & 0x7F) | 0x80]))
datum >>= 7
self.write(bytearray([datum]))

def write_float(self, f: float) -> None:
"""
A float is written as 4 bytes.
"""
self.write(STRUCT_FLOAT.pack(f))

def write_double(self, f: float) -> None:
"""
A double is written as 8 bytes.
"""
self.write(STRUCT_DOUBLE.pack(f))

def write_decimal_bytes(self, datum: decimal.Decimal) -> None:
"""
Decimal in bytes are encoded as long. Since size of packed value in bytes for
signed long is 8, 8 bytes are written.
"""
sign, digits, _ = datum.as_tuple()

unscaled_datum = 0
for digit in digits:
unscaled_datum = (unscaled_datum * 10) + digit

bits_req = unscaled_datum.bit_length() + 1
if sign:
unscaled_datum = (1 << bits_req) - unscaled_datum

bytes_req = bits_req // 8
padding_bits = ~((1 << bits_req) - 1) if sign else 0
packed_bits = padding_bits | unscaled_datum

bytes_req += 1 if (bytes_req << 3) < bits_req else 0
self.write_int(bytes_req)
for index in range(bytes_req - 1, -1, -1):
bits_to_write = packed_bits >> (8 * index)
self.write(bytearray([bits_to_write & 0xFF]))

def write_decimal_fixed(self, datum: decimal.Decimal, size: int) -> None:
"""
Decimal in fixed are encoded as size of fixed bytes.
"""
sign, digits, _ = datum.as_tuple()

unscaled_datum = 0
for digit in digits:
unscaled_datum = (unscaled_datum * 10) + digit

bits_req = unscaled_datum.bit_length() + 1
size_in_bits = size * 8
offset_bits = size_in_bits - bits_req

mask = 2**size_in_bits - 1
bit = 1
for _ in range(bits_req):
mask ^= bit
bit <<= 1

if bits_req < 8:
bytes_req = 1
else:
bytes_req = bits_req // 8
if bits_req % 8 != 0:
bytes_req += 1
if sign:
unscaled_datum = (1 << bits_req) - unscaled_datum
unscaled_datum = mask | unscaled_datum
for index in range(size - 1, -1, -1):
bits_to_write = unscaled_datum >> (8 * index)
self.write(bytearray([bits_to_write & 0xFF]))
else:
for _ in range(offset_bits // 8):
self.write(b"\x00")
for index in range(bytes_req - 1, -1, -1):
bits_to_write = unscaled_datum >> (8 * index)
self.write(bytearray([bits_to_write & 0xFF]))

def write_bytes(self, b: bytes) -> None:
"""
Bytes are encoded as a long followed by that many bytes of data.
"""
self.write_int(len(b))
self.write(struct.pack(f"{len(b)}s", b))

def write_bytes_fixed(self, b: bytes) -> None:
"""
Writes fixed number of bytes
"""
self.write(struct.pack(f"{len(b)}s", b))

def write_utf8(self, s: str) -> None:
"""
A string is encoded as a long followed by
that many bytes of UTF-8 encoded character data.
"""
self.write_bytes(s.encode("utf-8"))

def write_date_int(self, d: date) -> None:
"""
Encode python date object as int.
It stores the number of days from
the unix epoch, 1 January 1970 (ISO calendar).
"""
self.write_int(date_to_days(d))

def write_time_millis_int(self, dt: time) -> None:
"""
Encode python time object as int.
It stores the number of milliseconds from midnight, 00:00:00.000
"""
self.write_int(int(time_to_micros(dt) / 1000))

def write_time_micros_long(self, dt: time) -> None:
"""
Encode python time object as long.
It stores the number of microseconds from midnight, 00:00:00.000000
"""
self.write_int(time_to_micros(dt))

def write_timestamp_millis_long(self, dt: datetime) -> None:
"""
Encode python datetime object as long.
It stores the number of milliseconds from midnight of unix epoch, 1 January 1970.
"""
self.write_int(int(datetime_to_micros(dt) / 1000))

def write_timestamp_micros_long(self, dt: datetime) -> None:
"""
Encode python datetime object as long.
It stores the number of microseconds from midnight of unix epoch, 1 January 1970.
"""
self.write_int(datetime_to_micros(dt))
94 changes: 94 additions & 0 deletions python/pyiceberg/avro/resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,33 @@
TimestamptzReader,
UUIDReader,
)
from pyiceberg.avro.writer import (
BinaryWriter,
BooleanWriter,
DateWriter,
DecimalWriter,
DoubleWriter,
FixedWriter,
FloatWriter,
IntegerWriter,
ListWriter,
MapWriter,
StringWriter,
StructWriter,
TimestamptzWriter,
TimestampWriter,
TimeWriter,
UUIDWriter,
Writer,
)
from pyiceberg.exceptions import ResolveError
from pyiceberg.schema import (
PartnerAccessor,
PrimitiveWithPartnerVisitor,
Schema,
SchemaVisitorPerPrimitiveType,
promote,
visit,
visit_with_partner,
)
from pyiceberg.typedef import EMPTY_DICT, Record, StructProtocol
Expand Down Expand Up @@ -92,6 +113,79 @@ def construct_reader(
return resolve(file_schema, file_schema, read_types)


def construct_writer(file_schema: Union[Schema, IcebergType]) -> Writer:
"""Constructs a writer from a file schema

Args:
file_schema (Schema | IcebergType): The schema of the Avro file

Raises:
NotImplementedError: If attempting to resolve an unrecognized object type
"""
return visit(file_schema, ConstructWriter())


class ConstructWriter(SchemaVisitorPerPrimitiveType[Writer]):
"""Constructs a writer tree from an Iceberg schema"""

def schema(self, schema: Schema, struct_result: Writer) -> Writer:
return struct_result

def struct(self, struct: StructType, field_results: List[Writer]) -> Writer:
return StructWriter(tuple(field_results))

def field(self, field: NestedField, field_result: Writer) -> Writer:
return field_result

def list(self, list_type: ListType, element_result: Writer) -> Writer:
return ListWriter(element_result)

def map(self, map_type: MapType, key_result: Writer, value_result: Writer) -> Writer:
return MapWriter(key_result, value_result)

def visit_fixed(self, fixed_type: FixedType) -> Writer:
return FixedWriter(len(fixed_type))

def visit_decimal(self, decimal_type: DecimalType) -> Writer:
return DecimalWriter(decimal_type.precision, decimal_type.scale)

def visit_boolean(self, boolean_type: BooleanType) -> Writer:
return BooleanWriter()

def visit_integer(self, integer_type: IntegerType) -> Writer:
return IntegerWriter()

def visit_long(self, long_type: LongType) -> Writer:
return IntegerWriter()

def visit_float(self, float_type: FloatType) -> Writer:
return FloatWriter()

def visit_double(self, double_type: DoubleType) -> Writer:
return DoubleWriter()

def visit_date(self, date_type: DateType) -> Writer:
return DateWriter()

def visit_time(self, time_type: TimeType) -> Writer:
return TimeWriter()

def visit_timestamp(self, timestamp_type: TimestampType) -> Writer:
return TimestampWriter()

def visit_timestamptz(self, timestamptz_type: TimestamptzType) -> Writer:
return TimestamptzWriter()

def visit_string(self, string_type: StringType) -> Writer:
return StringWriter()

def visit_uuid(self, uuid_type: UUIDType) -> Writer:
return UUIDWriter()

def visit_binary(self, binary_type: BinaryType) -> Writer:
return BinaryWriter()


def resolve(
file_schema: Union[Schema, IcebergType],
read_schema: Union[Schema, IcebergType],
Expand Down
Loading