Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions python/dev/provision.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,3 +185,45 @@
all_types_dataframe.writeTo("default.test_all_types").tableProperty("format-version", "2").partitionedBy(
"intCol"
).createOrReplace()

for table_name, partition in [
("test_partitioned_by_identity", "ts"),
("test_partitioned_by_years", "years(dt)"),
("test_partitioned_by_months", "months(dt)"),
("test_partitioned_by_days", "days(ts)"),
("test_partitioned_by_hours", "hours(ts)"),
("test_partitioned_by_truncate", "truncate(1, letter)"),
("test_partitioned_by_bucket", "bucket(16, number)"),
]:
spark.sql(
f"""
CREATE OR REPLACE TABLE default.{table_name} (
dt date,
ts timestamp,
number integer,
letter string
)
USING iceberg;
"""
)

spark.sql(f"ALTER TABLE default.{table_name} ADD PARTITION FIELD {partition}")

spark.sql(
f"""
INSERT INTO default.{table_name}
VALUES
(CAST('2022-03-01' AS date), CAST('2022-03-01 01:22:00' AS timestamp), 1, 'a'),
(CAST('2022-03-02' AS date), CAST('2022-03-02 02:22:00' AS timestamp), 2, 'b'),
(CAST('2022-03-03' AS date), CAST('2022-03-03 03:22:00' AS timestamp), 3, 'c'),
(CAST('2022-03-04' AS date), CAST('2022-03-04 04:22:00' AS timestamp), 4, 'd'),
(CAST('2023-03-05' AS date), CAST('2023-03-05 05:22:00' AS timestamp), 5, 'e'),
(CAST('2023-03-06' AS date), CAST('2023-03-06 06:22:00' AS timestamp), 6, 'f'),
(CAST('2023-03-07' AS date), CAST('2023-03-07 07:22:00' AS timestamp), 7, 'g'),
(CAST('2023-03-08' AS date), CAST('2023-03-08 08:22:00' AS timestamp), 8, 'h'),
(CAST('2023-03-09' AS date), CAST('2023-03-09 09:22:00' AS timestamp), 9, 'i'),
(CAST('2023-03-10' AS date), CAST('2023-03-10 10:22:00' AS timestamp), 10, 'j'),
(CAST('2023-03-11' AS date), CAST('2023-03-11 11:22:00' AS timestamp), 11, 'k'),
(CAST('2023-03-12' AS date), CAST('2023-03-12 12:22:00' AS timestamp), 12, 'l');
"""
)
69 changes: 3 additions & 66 deletions python/pyiceberg/avro/decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,16 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import decimal
from abc import ABC, abstractmethod
from datetime import datetime, time
from io import SEEK_CUR
from typing import Dict, List
from uuid import UUID

from pyiceberg.avro import STRUCT_DOUBLE, STRUCT_FLOAT
from pyiceberg.io import InputStream
from pyiceberg.utils.datetime import micros_to_time, micros_to_timestamp, micros_to_timestamptz
from pyiceberg.utils.decimal import unscaled_to_decimal


class BinaryDecoder(ABC):
"""Read leaf values."""
"""Decodes bytes into Python physical primitives."""

@abstractmethod
def __init__(self, input_stream: InputStream) -> None:
Expand Down Expand Up @@ -106,77 +101,19 @@ def read_double(self) -> float:
"""
return float(STRUCT_DOUBLE.unpack(self.read(8))[0])

def read_decimal_from_bytes(self, precision: int, scale: int) -> decimal.Decimal:
"""Reads a value from the stream as a decimal.

Decimal bytes are decoded as signed short, int or long depending on the
size of bytes.
"""
size = self.read_int()
return self.read_decimal_from_fixed(precision, scale, size)

def read_decimal_from_fixed(self, _: int, scale: int, size: int) -> decimal.Decimal:
"""Reads a value from the stream as a decimal.

Decimal is encoded as fixed. Fixed instances are encoded using the
number of bytes declared in the schema.
"""
data = self.read(size)
unscaled_datum = int.from_bytes(data, byteorder="big", signed=True)
return unscaled_to_decimal(unscaled_datum, scale)

def read_bytes(self) -> bytes:
"""Bytes are encoded as a long followed by that many bytes of data."""
num_bytes = self.read_int()
return self.read(num_bytes) if num_bytes > 0 else b""

def read_utf8(self) -> str:
"""Reads a utf-8 encoded string from the stream.
"""Reads an utf-8 encoded string from the stream.

A string is encoded as a long followed by
that many bytes of UTF-8 encoded character data.
"""
return self.read_bytes().decode("utf-8")

def read_uuid_from_fixed(self) -> UUID:
"""Reads a UUID as a fixed[16]."""
return UUID(bytes=self.read(16))

def read_time_millis(self) -> time:
"""Reads a milliseconds granularity time from the stream.

Int is decoded as python time object which represents
the number of milliseconds after midnight, 00:00:00.000.
"""
millis = self.read_int()
return micros_to_time(millis * 1000)

def read_time_micros(self) -> time:
"""Reads a microseconds granularity time from the stream.

Long is decoded as python time object which represents
the number of microseconds after midnight, 00:00:00.000000.
"""
return micros_to_time(self.read_int())

def read_timestamp_micros(self) -> datetime:
"""Reads a microsecond granularity timestamp from the stream.

Long is decoded as python datetime object which represents
the number of microseconds from the unix epoch, 1 January 1970.
"""
return micros_to_timestamp(self.read_int())

def read_timestamptz_micros(self) -> datetime:
"""Reads a microsecond granularity timestamptz from the stream.

Long is decoded as python datetime object which represents
the number of microseconds from the unix epoch, 1 January 1970.

Adjusted to UTC.
"""
return micros_to_timestamptz(self.read_int())

def skip_boolean(self) -> None:
self.skip(1)

Expand All @@ -199,7 +136,7 @@ def skip_utf8(self) -> None:


class StreamingBinaryDecoder(BinaryDecoder):
"""Read leaf values."""
"""Decodes bytes into Python physical primitives."""

__slots__ = "_input_stream"
_input_stream: InputStream
Expand Down
120 changes: 10 additions & 110 deletions python/pyiceberg/avro/encoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,14 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import decimal
import struct
from datetime import date, datetime, time
from uuid import UUID

from pyiceberg.avro import STRUCT_DOUBLE, STRUCT_FLOAT
from pyiceberg.io import OutputStream
from pyiceberg.utils.datetime import date_to_days, datetime_to_micros, time_object_to_micros


class BinaryEncoder:
"""Write leaf values."""
"""Encodes Python physical types into bytes."""

_output_stream: OutputStream

Expand Down Expand Up @@ -58,118 +55,21 @@ def write_double(self, f: float) -> None:
"""A double is written as 8 bytes."""
self.write(STRUCT_DOUBLE.pack(f))

def write_decimal_bytes(self, datum: decimal.Decimal) -> None:
"""
Decimal in bytes are encoded as long.

Since size of packed value in bytes for signed long is 8, 8 bytes are written.
"""
sign, digits, _ = datum.as_tuple()

unscaled_datum = 0
for digit in digits:
unscaled_datum = (unscaled_datum * 10) + digit

bits_req = unscaled_datum.bit_length() + 1
if sign:
unscaled_datum = (1 << bits_req) - unscaled_datum

bytes_req = bits_req // 8
padding_bits = ~((1 << bits_req) - 1) if sign else 0
packed_bits = padding_bits | unscaled_datum

bytes_req += 1 if (bytes_req << 3) < bits_req else 0
self.write_int(bytes_req)
for index in range(bytes_req - 1, -1, -1):
bits_to_write = packed_bits >> (8 * index)
self.write(bytearray([bits_to_write & 0xFF]))

def write_decimal_fixed(self, datum: decimal.Decimal, size: int) -> None:
"""Decimal in fixed are encoded as size of fixed bytes."""
sign, digits, _ = datum.as_tuple()

unscaled_datum = 0
for digit in digits:
unscaled_datum = (unscaled_datum * 10) + digit

bits_req = unscaled_datum.bit_length() + 1
size_in_bits = size * 8
offset_bits = size_in_bits - bits_req

mask = 2**size_in_bits - 1
bit = 1
for _ in range(bits_req):
mask ^= bit
bit <<= 1

if bits_req < 8:
bytes_req = 1
else:
bytes_req = bits_req // 8
if bits_req % 8 != 0:
bytes_req += 1
if sign:
unscaled_datum = (1 << bits_req) - unscaled_datum
unscaled_datum = mask | unscaled_datum
for index in range(size - 1, -1, -1):
bits_to_write = unscaled_datum >> (8 * index)
self.write(bytearray([bits_to_write & 0xFF]))
else:
for _ in range(offset_bits // 8):
self.write(b"\x00")
for index in range(bytes_req - 1, -1, -1):
bits_to_write = unscaled_datum >> (8 * index)
self.write(bytearray([bits_to_write & 0xFF]))

def write_bytes(self, b: bytes) -> None:
"""Bytes are encoded as a long followed by that many bytes of data."""
self.write_int(len(b))
self.write(struct.pack(f"{len(b)}s", b))

def write_bytes_fixed(self, b: bytes) -> None:
"""Writes fixed number of bytes."""
self.write(struct.pack(f"{len(b)}s", b))
self.write(b)

def write_utf8(self, s: str) -> None:
"""A string is encoded as a long followed by that many bytes of UTF-8 encoded character data."""
self.write_bytes(s.encode("utf-8"))

def write_date_int(self, d: date) -> None:
"""
Encode python date object as int.

It stores the number of days from the unix epoch, 1 January 1970 (ISO calendar).
"""
self.write_int(date_to_days(d))

def write_time_millis_int(self, dt: time) -> None:
"""
Encode python time object as int.

It stores the number of milliseconds from midnight, 00:00:00.000
"""
self.write_int(int(time_object_to_micros(dt) / 1000))

def write_time_micros_long(self, dt: time) -> None:
"""
Encode python time object as long.

It stores the number of microseconds from midnight, 00:00:00.000000
"""
self.write_int(time_object_to_micros(dt))

def write_timestamp_millis_long(self, dt: datetime) -> None:
"""
Encode python datetime object as long.

It stores the number of milliseconds from midnight of unix epoch, 1 January 1970.
"""
self.write_int(int(datetime_to_micros(dt) / 1000))

def write_timestamp_micros_long(self, dt: datetime) -> None:
"""
Encode python datetime object as long.
def write_uuid(self, uuid: UUID) -> None:
"""Write UUID as a fixed[16].

It stores the number of microseconds from midnight of unix epoch, 1 January 1970.
The uuid logical type represents a random generated universally unique identifier (UUID).
An uuid logical type annotates an Avro string. The string has to conform with RFC-4122.
"""
self.write_int(datetime_to_micros(dt))
if len(uuid.bytes) != 16:
raise ValueError(f"Expected UUID to have 16 bytes, got: len({uuid.bytes!r})")
return self.write(uuid.bytes)
2 changes: 1 addition & 1 deletion python/pyiceberg/avro/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,4 +275,4 @@ def write_block(self, objects: List[D]) -> None:
self.encoder.write_int(len(objects))
self.encoder.write_int(len(block_content))
self.encoder.write(block_content)
self.encoder.write_bytes_fixed(self.sync_bytes)
self.encoder.write(self.sync_bytes)
Loading