Skip to content
40 changes: 30 additions & 10 deletions python/src/iceberg/avro/decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import decimal
import struct
from datetime import date, datetime, time
from io import SEEK_CUR

from iceberg.io.base import InputStream
from iceberg.utils.datetime import (
Expand Down Expand Up @@ -56,6 +57,9 @@ def read(self, n: int) -> bytes:
raise ValueError(f"Read {len(read_bytes)} bytes, expected {n} bytes")
return read_bytes

def skip(self, n: int) -> None:
self._input_stream.seek(n, SEEK_CUR)

def read_boolean(self) -> bool:
"""
a boolean is written as a single byte
Expand All @@ -64,11 +68,7 @@ def read_boolean(self) -> bool:
return ord(self.read(1)) == 1

def read_int(self) -> int:
"""int values are written using variable-length, zigzag coding."""
return self.read_long()

def read_long(self) -> int:
"""long values are written using variable-length, zigzag coding."""
"""int/long values are written using variable-length, zigzag coding."""
b = ord(self.read(1))
n = b & 0x7F
shift = 7
Expand Down Expand Up @@ -100,7 +100,7 @@ def read_decimal_from_bytes(self, precision: int, scale: int) -> decimal.Decimal
Decimal bytes are decoded as signed short, int or long depending on the
size of bytes.
"""
size = self.read_long()
size = self.read_int()
return self.read_decimal_from_fixed(precision, scale, size)

def read_decimal_from_fixed(self, _: int, scale: int, size: int) -> decimal.Decimal:
Expand All @@ -116,7 +116,7 @@ def read_bytes(self) -> bytes:
"""
Bytes are encoded as a long followed by that many bytes of data.
"""
return self.read(self.read_long())
return self.read(self.read_int())

def read_utf8(self) -> str:
"""
Expand Down Expand Up @@ -146,14 +146,14 @@ def read_time_micros(self) -> time:
long is decoded as python time object which represents
the number of microseconds after midnight, 00:00:00.000000.
"""
return micros_to_time(self.read_long())
return micros_to_time(self.read_int())

def read_timestamp_micros(self) -> datetime:
"""
long is decoded as python datetime object which represents
the number of microseconds from the unix epoch, 1 January 1970.
"""
return micros_to_timestamp(self.read_long())
return micros_to_timestamp(self.read_int())

def read_timestamptz_micros(self):
"""
Expand All @@ -162,4 +162,24 @@ def read_timestamptz_micros(self):

Adjusted to UTC
"""
return micros_to_timestamptz(self.read_long())
return micros_to_timestamptz(self.read_int())

def skip_boolean(self) -> None:
self.skip(1)

def skip_int(self) -> None:
b = ord(self.read(1))
while (b & 0x80) != 0:
b = ord(self.read(1))

def skip_float(self) -> None:
self.skip(4)

def skip_double(self) -> None:
self.skip(8)

def skip_bytes(self) -> None:
self.skip(self.read_int())

def skip_utf8(self) -> None:
self.skip_bytes()
15 changes: 11 additions & 4 deletions python/src/iceberg/avro/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from iceberg.avro.codecs import KNOWN_CODECS, Codec
from iceberg.avro.decoder import BinaryDecoder
from iceberg.avro.reader import AvroStruct, ConstructReader, StructReader
from iceberg.avro.resolver import resolve
from iceberg.io.base import InputFile, InputStream
from iceberg.io.memory import MemoryInputStream
from iceberg.schema import Schema, visit
Expand Down Expand Up @@ -107,6 +108,7 @@ def __next__(self) -> AvroStruct:

class AvroFile:
input_file: InputFile
read_schema: Schema | None
input_stream: InputStream
header: AvroFileHeader
schema: Schema
Expand All @@ -116,8 +118,9 @@ class AvroFile:
decoder: BinaryDecoder
block: Block | None = None

def __init__(self, input_file: InputFile) -> None:
def __init__(self, input_file: InputFile, read_schema: Schema | None = None) -> None:
self.input_file = input_file
self.read_schema = read_schema

def __enter__(self):
"""
Expand All @@ -132,7 +135,11 @@ def __enter__(self):
self.header = self._read_header()
self.schema = self.header.get_schema()
self.file_length = len(self.input_file)
self.reader = visit(self.schema, ConstructReader())
if not self.read_schema:
self.reader = visit(self.schema, ConstructReader())
else:
self.reader = resolve(self.schema, self.read_schema)

return self

def __exit__(self, exc_type, exc_val, exc_tb):
Expand All @@ -149,9 +156,9 @@ def _read_block(self) -> int:
raise ValueError(f"Expected sync bytes {self.header.sync!r}, but got {sync_marker!r}")
if self.is_EOF():
raise StopIteration
block_records = self.decoder.read_long()
block_records = self.decoder.read_int()

block_bytes_len = self.decoder.read_long()
block_bytes_len = self.decoder.read_int()
block_bytes = self.decoder.read(block_bytes_len)
if codec := self.header.compression_codec():
block_bytes = codec.decompress(block_bytes)
Expand Down
Loading