diff --git a/python/pyiceberg/manifest.py b/python/pyiceberg/manifest.py
index fbc013f50c20..443082dec854 100644
--- a/python/pyiceberg/manifest.py
+++ b/python/pyiceberg/manifest.py
@@ -14,31 +14,47 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
+# pylint: disable=redefined-outer-name,arguments-renamed,fixme
+from __future__ import annotations
+
+import math
+from abc import ABC, abstractmethod
 from enum import Enum
+from types import TracebackType
 from typing import (
     Any,
     Dict,
     Iterator,
     List,
     Optional,
+    Type,
 )

-from pyiceberg.avro.file import AvroFile
-from pyiceberg.io import FileIO, InputFile
+from pyiceberg import conversions
+from pyiceberg.avro.file import AvroFile, AvroOutputFile
+from pyiceberg.conversions import to_bytes
+from pyiceberg.exceptions import ValidationError
+from pyiceberg.io import FileIO, InputFile, OutputFile
+from pyiceberg.partitioning import PartitionSpec
 from pyiceberg.schema import Schema
 from pyiceberg.typedef import Record
 from pyiceberg.types import (
     BinaryType,
     BooleanType,
+    IcebergType,
     IntegerType,
     ListType,
     LongType,
     MapType,
     NestedField,
+    PrimitiveType,
     StringType,
     StructType,
 )

+# TODO: Double-check what its purpose is in Java
+UNASSIGNED_SEQ = -1
+

 class DataFileContent(int, Enum):
     DATA = 0
@@ -78,89 +94,106 @@ def __repr__(self) -> str:
         """Returns the string representation of the FileFormat class."""
         return f"FileFormat.{self.name}"

-
-DATA_FILE_TYPE = StructType(
-    NestedField(
-        field_id=134,
-        name="content",
-        field_type=IntegerType(),
-        required=False,
-        doc="Contents of the file: 0=data, 1=position deletes, 2=equality deletes",
-        initial_default=DataFileContent.DATA,
-    ),
-    NestedField(field_id=100, name="file_path", field_type=StringType(), required=True, doc="Location URI with FS scheme"),
-    NestedField(
-        field_id=101, name="file_format", field_type=StringType(), required=True, doc="File format name: avro, orc, or parquet"
-    ),
-    NestedField(
-        field_id=102,
-        name="partition",
-        field_type=StructType(),
-        required=True,
-        doc="Partition data tuple, schema based on the partition spec",
-    ),
-    NestedField(field_id=103, name="record_count", field_type=LongType(), required=True, doc="Number of records in the file"),
-    NestedField(field_id=104, name="file_size_in_bytes", field_type=LongType(), required=True, doc="Total file size in bytes"),
-    NestedField(
-        field_id=108,
-        name="column_sizes",
-        field_type=MapType(key_id=117, key_type=IntegerType(), value_id=118, value_type=LongType()),
-        required=False,
-        doc="Map of column id to total size on disk",
-    ),
-    NestedField(
-        field_id=109,
-        name="value_counts",
-        field_type=MapType(key_id=119, key_type=IntegerType(), value_id=120, value_type=LongType()),
-        required=False,
-        doc="Map of column id to total count, including null and NaN",
-    ),
-    NestedField(
-        field_id=110,
-        name="null_value_counts",
-        field_type=MapType(key_id=121, key_type=IntegerType(), value_id=122, value_type=LongType()),
-        required=False,
-        doc="Map of column id to null value count",
-    ),
-    NestedField(
-        field_id=137,
-        name="nan_value_counts",
-        field_type=MapType(key_id=138, key_type=IntegerType(), value_id=139, value_type=LongType()),
-        required=False,
-        doc="Map of column id to number of NaN values in the column",
-    ),
-    NestedField(
-        field_id=125,
-        name="lower_bounds",
-        field_type=MapType(key_id=126, key_type=IntegerType(), value_id=127, value_type=BinaryType()),
-        required=False,
-        doc="Map of column id to lower bound",
-    ),
-    NestedField(
-        field_id=128,
-        name="upper_bounds",
-        field_type=MapType(key_id=129, key_type=IntegerType(), value_id=130, value_type=BinaryType()),
-        required=False,
-        doc="Map of column id to upper bound",
-    ),
-    NestedField(field_id=131, name="key_metadata", field_type=BinaryType(), required=False, doc="Encryption key metadata blob"),
-    NestedField(
-        field_id=132,
-        name="split_offsets",
-        field_type=ListType(element_id=133, element_type=LongType(), element_required=True),
-        required=False,
-        doc="Splittable offsets",
-    ),
-    NestedField(
-        field_id=135,
-        name="equality_ids",
-        field_type=ListType(element_id=136, element_type=LongType(), element_required=True),
-        required=False,
-        doc="Equality comparison field IDs",
-    ),
-    NestedField(field_id=140, name="sort_order_id", field_type=IntegerType(), required=False, doc="Sort order ID"),
-    NestedField(field_id=141, name="spec_id", field_type=IntegerType(), required=False, doc="Partition spec ID"),
-)
+    def add_extension(self, filename: str) -> str:
+        if filename.endswith(f".{self.name.lower()}"):
+            return filename
+        return f"{filename}.{self.name.lower()}"
+
+
+def data_file_type(partition_type: StructType) -> StructType:
+    return StructType(
+        NestedField(
+            field_id=134,
+            name="content",
+            field_type=IntegerType(),
+            required=False,
+            doc="Contents of the file: 0=data, 1=position deletes, 2=equality deletes",
+            initial_default=DataFileContent.DATA,
+        ),
+        NestedField(field_id=100, name="file_path", field_type=StringType(), required=True, doc="Location URI with FS scheme"),
+        NestedField(
+            field_id=101,
+            name="file_format",
+            field_type=StringType(),
+            required=True,
+            doc="File format name: avro, orc, or parquet",
+        ),
+        NestedField(
+            field_id=102,
+            name="partition",
+            field_type=partition_type,
+            required=True,
+            doc="Partition data tuple, schema based on the partition spec",
+        ),
+        NestedField(field_id=103, name="record_count", field_type=LongType(), required=True, doc="Number of records in the file"),
+        NestedField(
+            field_id=104, name="file_size_in_bytes", field_type=LongType(), required=True, doc="Total file size in bytes"
+        ),
+        NestedField(
+            field_id=108,
+            name="column_sizes",
+            field_type=MapType(key_id=117, key_type=IntegerType(), value_id=118, value_type=LongType()),
+            required=False,
+            doc="Map of column id to total size on disk",
+        ),
+        NestedField(
+            field_id=109,
+            name="value_counts",
+            field_type=MapType(key_id=119, key_type=IntegerType(), value_id=120, value_type=LongType()),
+            required=False,
+            doc="Map of column id to total count, including null and NaN",
+        ),
+        NestedField(
+            field_id=110,
+            name="null_value_counts",
+            field_type=MapType(key_id=121, key_type=IntegerType(), value_id=122, value_type=LongType()),
+            required=False,
+            doc="Map of column id to null value count",
+        ),
+        NestedField(
+            field_id=137,
+            name="nan_value_counts",
+            field_type=MapType(key_id=138, key_type=IntegerType(), value_id=139, value_type=LongType()),
+            required=False,
+            doc="Map of column id to number of NaN values in the column",
+        ),
+        NestedField(
+            field_id=125,
+            name="lower_bounds",
+            field_type=MapType(key_id=126, key_type=IntegerType(), value_id=127, value_type=BinaryType()),
+            required=False,
+            doc="Map of column id to lower bound",
+        ),
+        NestedField(
+            field_id=128,
+            name="upper_bounds",
+            field_type=MapType(key_id=129, key_type=IntegerType(), value_id=130, value_type=BinaryType()),
+            required=False,
+            doc="Map of column id to upper bound",
+        ),
+        NestedField(
+            field_id=131, name="key_metadata", field_type=BinaryType(), required=False, doc="Encryption key metadata blob"
+        ),
+        NestedField(
+            field_id=132,
+            name="split_offsets",
+            field_type=ListType(element_id=133, element_type=LongType(), element_required=True),
+            required=False,
+            doc="Splittable offsets",
+        ),
+        NestedField(
+            field_id=135,
+            name="equality_ids",
+            field_type=ListType(element_id=136, element_type=LongType(), element_required=True),
+            required=False,
+            doc="Equality comparison field IDs",
+        ),
+        NestedField(field_id=140, name="sort_order_id", field_type=IntegerType(), required=False, doc="Sort order ID"),
+        NestedField(field_id=141, name="spec_id", field_type=IntegerType(), required=False, doc="Partition spec ID"),
+    )
+
+
+DATA_FILE_TYPE = data_file_type(StructType())


 class DataFile(Record):
@@ -204,13 +237,17 @@ def __eq__(self, other: Any) -> bool:
         return self.file_path == other.file_path if isinstance(other, DataFile) else False


-MANIFEST_ENTRY_SCHEMA = Schema(
-    NestedField(0, "status", IntegerType(), required=True),
-    NestedField(1, "snapshot_id", LongType(), required=False),
-    NestedField(3, "data_sequence_number", LongType(), required=False),
-    NestedField(4, "file_sequence_number", LongType(), required=False),
-    NestedField(2, "data_file", DATA_FILE_TYPE, required=True),
-)
+def manifest_entry_schema(data_file: StructType) -> Schema:
+    return Schema(
+        NestedField(0, "status", IntegerType(), required=True),
+        NestedField(1, "snapshot_id", LongType(), required=False),
+        NestedField(3, "data_sequence_number", LongType(), required=False),
+        NestedField(4, "file_sequence_number", LongType(), required=False),
+        NestedField(2, "data_file", data_file, required=True),
+    )
+
+
+MANIFEST_ENTRY_SCHEMA = manifest_entry_schema(DATA_FILE_TYPE)


 class ManifestEntry(Record):
@@ -242,6 +279,65 @@ def __init__(self, *data: Any, **named_data: Any) -> None:
         super().__init__(*data, **{"struct": PARTITION_FIELD_SUMMARY_TYPE, **named_data})


+class PartitionFieldStats:
+    _type: PrimitiveType
+    _contains_null: bool
+    _contains_nan: bool
+    _min: Optional[Any]
+    _max: Optional[Any]
+
+    def __init__(self, iceberg_type: IcebergType) -> None:
+        assert isinstance(iceberg_type, PrimitiveType), f"Expected a primitive type for the partition field, got {iceberg_type}"
+        self._type = iceberg_type
+        self._contains_null = False
+        self._contains_nan = False
+        self._min = None
+        self._max = None
+
+    def to_summary(self) -> PartitionFieldSummary:
+        return PartitionFieldSummary(
+            contains_null=self._contains_null,
+            contains_nan=self._contains_nan,
+            lower_bound=to_bytes(self._type, self._min) if self._min is not None else None,
+            upper_bound=to_bytes(self._type, self._max) if self._max is not None else None,
+        )
+
+    def update(self, value: Any) -> PartitionFieldStats:
+        if value is None:
+            self._contains_null = True
+        elif isinstance(value, float) and math.isnan(value):
+            # guard with isinstance: math.isnan raises TypeError for non-numeric partition values
+            self._contains_nan = True
+        else:
+            if self._min is None:
+                self._min = value
+                self._max = value
+            # TODO: may need to implement a custom comparator for incompatible types
+            elif value < self._min:
+                self._min = value
+            elif value > self._max:
+                self._max = value
+        return self
+
+
+class PartitionSummary:
+    _fields: List[PartitionFieldStats]
+    _types: List[IcebergType]
+
+    def __init__(self, spec: PartitionSpec, schema: Schema):
+        self._types = [field.field_type for field in spec.partition_type(schema).fields]
+        self._fields = [PartitionFieldStats(field_type) for field_type in self._types]
+
+    def summaries(self) -> List[PartitionFieldSummary]:
+        return [field.to_summary() for field in self._fields]
+
+    def update(self, partition_keys: Record) -> PartitionSummary:
+        for i, field_type in enumerate(self._types):
+            assert isinstance(field_type, PrimitiveType), f"Expected a primitive type for the partition field, got {field_type}"
+            partition_key = partition_keys[i]
+            self._fields[i].update(conversions.partition_to_py(field_type, partition_key))
+        return self
+
+
 MANIFEST_FILE_SCHEMA: Schema = Schema(
     NestedField(500, "manifest_path", StringType(), required=True, doc="Location URI with FS scheme"),
     NestedField(501, "manifest_length", LongType(), required=True),
@@ -363,3 +459,232 @@ def _inherit_sequence_number(entry: ManifestEntry, manifest: ManifestFile) -> Ma
         entry.file_sequence_number = manifest.sequence_number

     return entry
+
+
+class ManifestWriter(ABC):
+    closed: bool
+    _spec: PartitionSpec
+    _output_file: OutputFile
+    _writer: AvroOutputFile[ManifestEntry]
+    _snapshot_id: int
+    _meta: Dict[str, str]
+    _added_files: int
+    _added_rows: int
+    _existing_files: int
+    _existing_rows: int
+    _deleted_files: int
+    _deleted_rows: int
+    _min_data_sequence_number: Optional[int]
+    _partition_summary: PartitionSummary
+
+    def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int, meta: Dict[str, str]):
+        self.closed = False
+        self._spec = spec
+        self._output_file = output_file
+        self._snapshot_id = snapshot_id
+        self._meta = meta
+
+        self._added_files = 0
+        self._added_rows = 0
+        self._existing_files = 0
+        self._existing_rows = 0
+        self._deleted_files = 0
+        self._deleted_rows = 0
+        self._min_data_sequence_number = None
+        self._partition_summary = PartitionSummary(spec, schema)
+        self._manifest_entry_schema = manifest_entry_schema(data_file_type(spec.partition_type(schema)))
+
+    def __enter__(self) -> ManifestWriter:
+        """Opens the writer."""
+        self._writer = AvroOutputFile[ManifestEntry](self._output_file, self._manifest_entry_schema, "manifest_entry", self._meta)
+        self._writer.__enter__()
+        return self
+
+    def __exit__(
+        self,
+        exc_type: Optional[Type[BaseException]],
+        exc_value: Optional[BaseException],
+        traceback: Optional[TracebackType],
+    ) -> None:
+        """Closes the writer."""
+        self.closed = True
+        self._writer.__exit__(exc_type, exc_value, traceback)
+
+    @abstractmethod
+    def content(self) -> ManifestContent:
+        ...
+
+    def to_manifest_file(self) -> ManifestFile:
+        """Returns the manifest file."""
+        # once the manifest file is generated, no more entries can be added
+        self.closed = True
+        min_sequence_number = self._min_data_sequence_number or UNASSIGNED_SEQ
+        return ManifestFile(
+            manifest_path=self._output_file.location,
+            manifest_length=len(self._writer.output_file),
+            partition_spec_id=self._spec.spec_id,
+            content=self.content(),
+            sequence_number=UNASSIGNED_SEQ,
+            min_sequence_number=min_sequence_number,
+            added_snapshot_id=self._snapshot_id,
+            added_files_count=self._added_files,
+            existing_files_count=self._existing_files,
+            deleted_files_count=self._deleted_files,
+            added_rows_count=self._added_rows,
+            existing_rows_count=self._existing_rows,
+            deleted_rows_count=self._deleted_rows,
+            partitions=self._partition_summary.summaries(),
+            key_metadatas=None,
+        )
+
+    def add_entry(self, entry: ManifestEntry) -> ManifestWriter:
+        if self.closed:
+            raise RuntimeError("Cannot add entry to closed manifest writer")
+        if entry.status == ManifestEntryStatus.ADDED:
+            self._added_files += 1
+            self._added_rows += entry.data_file.record_count
+        elif entry.status == ManifestEntryStatus.EXISTING:
+            self._existing_files += 1
+            self._existing_rows += entry.data_file.record_count
+        elif entry.status == ManifestEntryStatus.DELETED:
+            self._deleted_files += 1
+            self._deleted_rows += entry.data_file.record_count
+
+        self._partition_summary.update(entry.data_file.partition)
+
+        if (
+            (entry.status == ManifestEntryStatus.ADDED or entry.status == ManifestEntryStatus.EXISTING)
+            and entry.data_sequence_number is not None
+            and (self._min_data_sequence_number is None or entry.data_sequence_number < self._min_data_sequence_number)
+        ):
+            self._min_data_sequence_number = entry.data_sequence_number
+
+        self._writer.write_block([entry])
+        return self
+
+
+class ManifestWriterV1(ManifestWriter):
+    def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int):
+        super().__init__(
+            spec,
+            schema,
+            output_file,
+            snapshot_id,
+            {
+                "schema": schema.json(),
+                "partition-spec": spec.json(),
+                "partition-spec-id": str(spec.spec_id),
+                "format-version": "1",
+            },
+        )
+
+    def content(self) -> ManifestContent:
+        return ManifestContent.DATA
+
+
+class ManifestWriterV2(ManifestWriter):
+    def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int):
+        super().__init__(
+            spec,
+            schema,
+            output_file,
+            snapshot_id,
+            {
+                "schema": schema.json(),
+                "partition-spec": spec.json(),
+                "partition-spec-id": str(spec.spec_id),
+                "format-version": "2",
+                "content": "data",
+            },
+        )
+
+    def content(self) -> ManifestContent:
+        return ManifestContent.DATA
+
+
+def write_manifest(
+    format_version: int, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int
+) -> ManifestWriter:
+    if format_version == 1:
+        return ManifestWriterV1(spec, schema, output_file, snapshot_id)
+    elif format_version == 2:
+        return ManifestWriterV2(spec, schema, output_file, snapshot_id)
+    else:
+        # TODO: replace it with UnsupportedOperationException
+        raise ValueError(f"Cannot write manifest for table version: {format_version}")
+
+
+class ManifestListWriter(ABC):
+    _output_file: OutputFile
+    _meta: Dict[str, str]
+    _manifest_files: List[ManifestFile]
+    _writer: AvroOutputFile[ManifestFile]
+
+    def __init__(self, output_file: OutputFile, meta: Dict[str, str]):
+        self._output_file = output_file
+        self._meta = meta
+        self._manifest_files = []
+
+    def __enter__(self) -> ManifestListWriter:
+        """Opens the writer for writing."""
+        self._writer = AvroOutputFile[ManifestFile](self._output_file, MANIFEST_FILE_SCHEMA, "manifest_file", self._meta)
+        self._writer.__enter__()
+        return self
+
+    def __exit__(
+        self,
+        exc_type: Optional[Type[BaseException]],
+        exc_value: Optional[BaseException],
+        traceback: Optional[TracebackType],
+    ) -> None:
+        """Closes the writer."""
+        self._writer.__exit__(exc_type, exc_value, traceback)
+        return
+
+    @abstractmethod
+    def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile:
+        ...
+
+    def add_manifests(self, manifest_files: List[ManifestFile]) -> ManifestListWriter:
+        self._writer.write_block([self.prepare_manifest(manifest_file) for manifest_file in manifest_files])
+        return self
+
+
+class ManifestListWriterV1(ManifestListWriter):
+    def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id: int):
+        super().__init__(
+            output_file, {"snapshot-id": str(snapshot_id), "parent-snapshot-id": str(parent_snapshot_id), "format-version": "1"}
+        )
+
+    def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile:
+        if manifest_file.content != ManifestContent.DATA:
+            raise ValidationError("Cannot store delete manifests in a v1 table")
+        return manifest_file
+
+
+class ManifestListWriterV2(ManifestListWriter):
+    def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id: int, sequence_number: int):
+        super().__init__(
+            output_file,
+            {
+                "snapshot-id": str(snapshot_id),
+                "parent-snapshot-id": str(parent_snapshot_id),
+                "sequence-number": str(sequence_number),
+                "format-version": "2",
+            },
+        )
+
+    def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile:
+        return manifest_file
+
+
+def write_manifest_list(
+    format_version: int, output_file: OutputFile, snapshot_id: int, parent_snapshot_id: int, sequence_number: int
+) -> ManifestListWriter:
+    if format_version == 1:
+        return ManifestListWriterV1(output_file, snapshot_id, parent_snapshot_id)
+    elif format_version == 2:
+        return ManifestListWriterV2(output_file, snapshot_id, parent_snapshot_id, sequence_number)
+    else:
+        # TODO: replace it with UnsupportedOperationException
+        raise ValueError(f"Cannot write manifest list for table version: {format_version}")
diff --git a/python/tests/utils/test_manifest.py b/python/tests/utils/test_manifest.py
index 76a4a8a2b4d9..f4063835aa40 100644
--- a/python/tests/utils/test_manifest.py
+++ b/python/tests/utils/test_manifest.py
@@ -14,6 +14,12 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
+# pylint: disable=redefined-outer-name,arguments-renamed,fixme
+from tempfile import TemporaryDirectory
+from typing import Dict
+
+import fastavro
+import pytest

 from pyiceberg.io import load_file_io
 from pyiceberg.io.pyarrow import PyArrowFileIO
@@ -26,9 +32,24 @@
     ManifestFile,
     PartitionFieldSummary,
     read_manifest_list,
+    write_manifest,
+    write_manifest_list,
 )
+from pyiceberg.partitioning import PartitionField, PartitionSpec
+from pyiceberg.schema import Schema
 from pyiceberg.table import Snapshot
 from pyiceberg.table.snapshots import Operation, Summary
+from pyiceberg.transforms import IdentityTransform
+from pyiceberg.types import IntegerType, NestedField
+
+
+def _verify_metadata_with_fastavro(avro_file: str, expected_metadata: Dict[str, str]) -> None:
+    with open(avro_file, "rb") as f:
+        reader = fastavro.reader(f)
+        metadata = reader.metadata
+        for k, v in expected_metadata.items():
+            assert k in metadata
+            assert metadata[k] == v


 def test_read_manifest_entry(generated_manifest_entry_file: str) -> None:
@@ -278,3 +299,241 @@ def test_read_manifest_v2(generated_manifest_file_file_v2: str) -> None:
     assert entry.file_sequence_number == 3
     assert entry.snapshot_id == 8744736658442914487
     assert entry.status == ManifestEntryStatus.ADDED
+
+
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_write_manifest(generated_manifest_file_file_v1: str, generated_manifest_file_file_v2: str, format_version: int) -> None:
+    io = load_file_io()
+    snapshot = Snapshot(
+        snapshot_id=25,
+        parent_snapshot_id=19,
+        timestamp_ms=1602638573590,
+        manifest_list=generated_manifest_file_file_v1 if format_version == 1 else generated_manifest_file_file_v2,
+        summary=Summary(Operation.APPEND),
+        schema_id=3,
+    )
+    demo_manifest_file = snapshot.manifests(io)[0]
+    manifest_entries = demo_manifest_file.fetch_manifest_entry(io)
+    test_schema = Schema(
+        NestedField(1, "VendorID", IntegerType(), False), NestedField(2, "tpep_pickup_datetime", IntegerType(), False)
+    )
+    test_spec = PartitionSpec(
+        PartitionField(source_id=1, field_id=1, transform=IdentityTransform(), name="VendorID"),
+        PartitionField(source_id=2, field_id=2, transform=IdentityTransform(), name="tpep_pickup_datetime"),
+        spec_id=demo_manifest_file.partition_spec_id,
+    )
+    with TemporaryDirectory() as tmpdir:
+        tmp_avro_file = tmpdir + "/test_write_manifest.avro"
+        output = io.new_output(tmp_avro_file)
+        with write_manifest(
+            format_version=format_version,
+            spec=test_spec,
+            schema=test_schema,
+            output_file=output,
+            snapshot_id=8744736658442914487,
+        ) as writer:
+            for entry in manifest_entries:
+                writer.add_entry(entry)
+        new_manifest = writer.to_manifest_file()
+        with pytest.raises(RuntimeError):
+            writer.add_entry(manifest_entries[0])
+
+        expected_metadata = {
+            "schema": test_schema.json(),
+            "partition-spec": test_spec.json(),
+            "partition-spec-id": str(test_spec.spec_id),
+            "format-version": str(format_version),
+        }
+        if format_version == 2:
+            expected_metadata["content"] = "data"
+        _verify_metadata_with_fastavro(
+            tmp_avro_file,
+            expected_metadata,
+        )
+        new_manifest_entries = new_manifest.fetch_manifest_entry(io)
+
+        manifest_entry = new_manifest_entries[0]
+
+        assert manifest_entry.status == ManifestEntryStatus.ADDED
+        assert manifest_entry.snapshot_id == 8744736658442914487
+        assert manifest_entry.data_sequence_number == (0 if format_version == 1 else 3)
+        assert isinstance(manifest_entry.data_file, DataFile)
+
+        data_file = manifest_entry.data_file
+
+        assert data_file.content is DataFileContent.DATA
+        assert (
+            data_file.file_path
+            == "/home/iceberg/warehouse/nyc/taxis_partitioned/data/VendorID=null/00000-633-d8a4223e-dc97-45a1-86e1-adaba6e8abd7-00001.parquet"
+        )
+        assert data_file.file_format == FileFormat.PARQUET
+        assert repr(data_file.partition) == "Record[VendorID=1, tpep_pickup_datetime=1925]"
+        assert data_file.record_count == 19513
+        assert data_file.file_size_in_bytes == 388872
+        assert data_file.column_sizes == {
+            1: 53,
+            2: 98153,
+            3: 98693,
+            4: 53,
+            5: 53,
+            6: 53,
+            7: 17425,
+            8: 18528,
+            9: 53,
+            10: 44788,
+            11: 35571,
+            12: 53,
+            13: 1243,
+            14: 2355,
+            15: 12750,
+            16: 4029,
+            17: 110,
+            18: 47194,
+            19: 2948,
+        }
+        assert data_file.value_counts == {
+            1: 19513,
+            2: 19513,
+            3: 19513,
+            4: 19513,
+            5: 19513,
+            6: 19513,
+            7: 19513,
+            8: 19513,
+            9: 19513,
+            10: 19513,
+            11: 19513,
+            12: 19513,
+            13: 19513,
+            14: 19513,
+            15: 19513,
+            16: 19513,
+            17: 19513,
+            18: 19513,
+            19: 19513,
+        }
+        assert data_file.null_value_counts == {
+            1: 19513,
+            2: 0,
+            3: 0,
+            4: 19513,
+            5: 19513,
+            6: 19513,
+            7: 0,
+            8: 0,
+            9: 19513,
+            10: 0,
+            11: 0,
+            12: 19513,
+            13: 0,
+            14: 0,
+            15: 0,
+            16: 0,
+            17: 0,
+            18: 0,
+            19: 0,
+        }
+        assert data_file.nan_value_counts == {16: 0, 17: 0, 18: 0, 19: 0, 10: 0, 11: 0, 12: 0, 13: 0, 14: 0, 15: 0}
+        assert data_file.lower_bounds == {
+            2: b"2020-04-01 00:00",
+            3: b"2020-04-01 00:12",
+            7: b"\x03\x00\x00\x00",
+            8: b"\x01\x00\x00\x00",
+            10: b"\xf6(\\\x8f\xc2\x05S\xc0",
+            11: b"\x00\x00\x00\x00\x00\x00\x00\x00",
+            13: b"\x00\x00\x00\x00\x00\x00\x00\x00",
+            14: b"\x00\x00\x00\x00\x00\x00\xe0\xbf",
+            15: b")\\\x8f\xc2\xf5(\x08\xc0",
+            16: b"\x00\x00\x00\x00\x00\x00\x00\x00",
+            17: b"\x00\x00\x00\x00\x00\x00\x00\x00",
+            18: b"\xf6(\\\x8f\xc2\xc5S\xc0",
+            19: b"\x00\x00\x00\x00\x00\x00\x04\xc0",
+        }
+        assert data_file.upper_bounds == {
+            2: b"2020-04-30 23:5:",
+            3: b"2020-05-01 00:41",
+            7: b"\t\x01\x00\x00",
+            8: b"\t\x01\x00\x00",
+            10: b"\xcd\xcc\xcc\xcc\xcc,_@",
+            11: b"\x1f\x85\xebQ\\\xe2\xfe@",
+            13: b"\x00\x00\x00\x00\x00\x00\x12@",
+            14: b"\x00\x00\x00\x00\x00\x00\xe0?",
+            15: b"q=\n\xd7\xa3\xf01@",
+            16: b"\x00\x00\x00\x00\x00`B@",
+            17: b"333333\xd3?",
+            18: b"\x00\x00\x00\x00\x00\x18b@",
+            19: b"\x00\x00\x00\x00\x00\x00\x04@",
+        }
+        assert data_file.key_metadata is None
+        assert data_file.split_offsets == [4]
+        assert data_file.equality_ids is None
+        assert data_file.sort_order_id == 0
+
+
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_write_manifest_list(
+    generated_manifest_file_file_v1: str, generated_manifest_file_file_v2: str, format_version: int
+) -> None:
+    io = load_file_io()
+
+    snapshot = Snapshot(
+        snapshot_id=25,
+        parent_snapshot_id=19,
+        timestamp_ms=1602638573590,
+        manifest_list=generated_manifest_file_file_v1 if format_version == 1 else generated_manifest_file_file_v2,
+        summary=Summary(Operation.APPEND),
+        schema_id=3,
+    )
+
+    demo_manifest_list = snapshot.manifests(io)
+    with TemporaryDirectory() as tmp_dir:
+        path = tmp_dir + "/manifest-list.avro"
+        output = io.new_output(path)
+        with write_manifest_list(
+            format_version=format_version, output_file=output, snapshot_id=25, parent_snapshot_id=19, sequence_number=0
+        ) as writer:
+            writer.add_manifests(demo_manifest_list)
+        new_manifest_list = list(read_manifest_list(io.new_input(path)))
+
+        expected_metadata = {"snapshot-id": "25", "parent-snapshot-id": "19", "format-version": str(format_version)}
+        if format_version == 2:
+            expected_metadata["sequence-number"] = "0"
+        _verify_metadata_with_fastavro(path, expected_metadata)
+
+        manifest_file = new_manifest_list[0]
+
+        assert manifest_file.manifest_length == 7989
+        assert manifest_file.partition_spec_id == 0
+        assert manifest_file.content == (ManifestContent.DATA if format_version == 1 else ManifestContent.DELETES)
+        assert manifest_file.sequence_number == (0 if format_version == 1 else 3)
+        assert manifest_file.min_sequence_number == (0 if format_version == 1 else 3)
+        assert manifest_file.added_snapshot_id == 9182715666859759686
+        assert manifest_file.added_files_count == 3
+        assert manifest_file.existing_files_count == 0
+        assert manifest_file.deleted_files_count == 0
+        assert manifest_file.added_rows_count == 237993
+        assert manifest_file.existing_rows_count == 0
+        assert manifest_file.deleted_rows_count == 0
+        assert manifest_file.key_metadata is None
+
+        assert isinstance(manifest_file.partitions, list)
+
+        partition = manifest_file.partitions[0]
+
+        assert isinstance(partition, PartitionFieldSummary)
+
+        assert partition.contains_null is True
+        assert partition.contains_nan is False
+        assert partition.lower_bound == b"\x01\x00\x00\x00"
+        assert partition.upper_bound == b"\x02\x00\x00\x00"
+
+        entries = manifest_file.fetch_manifest_entry(io)
+
+        assert isinstance(entries, list)
+
+        entry = entries[0]
+
+        assert entry.data_sequence_number == (0 if format_version == 1 else 3)
+        assert entry.file_sequence_number == (0 if format_version == 1 else 3)
+        assert entry.snapshot_id == 8744736658442914487
+        assert entry.status == ManifestEntryStatus.ADDED
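
For reference, the new APIs compose end to end roughly as the tests above exercise them: open a writer as a context manager, add entries, and call to_manifest_file() once the file is closed. A minimal sketch, distilled from the tests; the output paths, the one-field schema/spec, and `existing_manifest` are placeholders rather than anything defined in this change:

    from pyiceberg.io import load_file_io
    from pyiceberg.manifest import write_manifest, write_manifest_list
    from pyiceberg.partitioning import PartitionField, PartitionSpec
    from pyiceberg.schema import Schema
    from pyiceberg.transforms import IdentityTransform
    from pyiceberg.types import IntegerType, NestedField

    io = load_file_io()
    schema = Schema(NestedField(1, "VendorID", IntegerType(), False))
    spec = PartitionSpec(PartitionField(source_id=1, field_id=1, transform=IdentityTransform(), name="VendorID"))

    # `existing_manifest` is assumed here: a ManifestFile from some snapshot,
    # e.g. snapshot.manifests(io)[0] as in the tests above.
    entries = existing_manifest.fetch_manifest_entry(io)

    # Rewrite the entries into a new v2 manifest; file/row counts, partition
    # summaries, and the min data sequence number accumulate per add_entry().
    with write_manifest(
        format_version=2,
        spec=spec,
        schema=schema,
        output_file=io.new_output("/tmp/manifest.avro"),  # placeholder path
        snapshot_id=8744736658442914487,
    ) as writer:
        for entry in entries:
            writer.add_entry(entry)
    manifest_file = writer.to_manifest_file()  # writer is closed; add_entry() now raises

    # Point a v2 manifest list at the freshly written manifest.
    with write_manifest_list(
        format_version=2,
        output_file=io.new_output("/tmp/manifest-list.avro"),  # placeholder path
        snapshot_id=25,
        parent_snapshot_id=19,
        sequence_number=0,
    ) as list_writer:
        list_writer.add_manifests([manifest_file])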