From b324403fe9e0a302a803a02f3b56d43f3edf20a9 Mon Sep 17 00:00:00 2001
From: samredai <43911210+samredai@users.noreply.github.com>
Date: Mon, 23 May 2022 12:54:20 -0700
Subject: [PATCH 1/2] Add TableMetadata class

---
 python/src/iceberg/table/metadata.py | 227 ++++++++++++++++++++++
 python/tests/conftest.py             | 128 ++++++++++++-
 python/tests/io/test_io_base.py      | 202 ++++++++------------
 python/tests/table/test_metadata.py  | 273 +++++++++++++++++++++++++++
 4 files changed, 707 insertions(+), 123 deletions(-)
 create mode 100644 python/src/iceberg/table/metadata.py
 create mode 100644 python/tests/table/test_metadata.py

diff --git a/python/src/iceberg/table/metadata.py b/python/src/iceberg/table/metadata.py
new file mode 100644
index 000000000000..caed369b477f
--- /dev/null
+++ b/python/src/iceberg/table/metadata.py
@@ -0,0 +1,227 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import codecs
+import json
+from dataclasses import dataclass
+
+from iceberg.io.base import InputFile, OutputFile
+
+
+@dataclass(frozen=True)
+class TableMetadata:
+    """Metadata for an Iceberg table as specified in the Apache Iceberg
+    spec (https://iceberg.apache.org/spec/#iceberg-table-spec)"""
+
+    format_version: int
+    """An integer version number for the format. Currently, this can be 1 or 2
+    based on the spec. Implementations must throw an exception if a table’s
+    version is higher than the supported version."""
+
+    table_uuid: str
+    """A UUID that identifies the table, generated when the table is created.
+    Implementations must throw an exception if a table’s UUID does not match
+    the expected UUID after refreshing metadata."""
+
+    location: str
+    """The table’s base location. This is used by writers to determine where
+    to store data files, manifest files, and table metadata files."""
+
+    last_sequence_number: int
+    """The table’s highest assigned sequence number, a monotonically
+    increasing long that tracks the order of snapshots in a table."""
+
+    last_updated_ms: int
+    """Timestamp in milliseconds from the unix epoch when the table
+    was last updated. Each table metadata file should update this
+    field just before writing."""
+
+    last_column_id: int
+    """An integer; the highest assigned column ID for the table.
+    This is used to ensure columns are always assigned an unused ID
+    when evolving schemas."""
+
+    schema: dict
+    """The table’s current schema. (Deprecated: use schemas and
+    current-schema-id instead)"""
+
+    schemas: list
+    """A list of schemas, stored as objects with schema-id."""
+
+    current_schema_id: int
+    """ID of the table’s current schema."""
+
+    partition_spec: list
+    """The table’s current partition spec, stored as only fields.
+    Note that this is used by writers to partition data, but is
+    not used when reading because reads use the specs stored in
+    manifest files. (Deprecated: use partition-specs and default-spec-id
+    instead)"""
+
+    partition_specs: list
+    """A list of partition specs, stored as full partition spec objects."""
+
+    default_spec_id: int
+    """ID of the “current” spec that writers should use by default."""
+
+    last_partition_id: int
+    """An integer; the highest assigned partition field ID across all
+    partition specs for the table. This is used to ensure partition fields
+    are always assigned an unused ID when evolving specs."""
+
+    properties: dict
+    """A string to string map of table properties. This is used to
+    control settings that affect reading and writing and is not intended
+    to be used for arbitrary metadata. For example, commit.retry.num-retries
+    is used to control the number of commit retries."""
+
+    current_snapshot_id: int
+    """ID of the current table snapshot."""
+
+    snapshots: list
+    """A list of valid snapshots. Valid snapshots are snapshots for which
+    all data files exist in the file system. A data file must not be
+    deleted from the file system until the last snapshot in which it was
+    listed is garbage collected."""
+
+    snapshot_log: list
+    """A list (optional) of timestamp and snapshot ID pairs that encodes
+    changes to the current snapshot for the table. Each time the
+    current-snapshot-id is changed, a new entry should be added with the
+    last-updated-ms and the new current-snapshot-id. When snapshots are
+    expired from the list of valid snapshots, all entries before a snapshot
+    that has expired should be removed."""
+
+    metadata_log: list
+    """A list (optional) of timestamp and metadata file location pairs that
+    encodes changes to the previous metadata files for the table. Each time
+    a new metadata file is created, a new entry of the previous metadata
+    file location should be added to the list. Tables can be configured to
+    remove oldest metadata log entries and keep a fixed-size log of the most
+    recent entries after a commit."""
+
+    sort_orders: list
+    """A list of sort orders, stored as full sort order objects."""
+
+    default_sort_order_id: int
+    """Default sort order id of the table. Note that this could be used by
+    writers, but is not used when reading because reads use the specs stored
+    in manifest files."""
+
+    @classmethod
+    def from_byte_stream(cls, byte_stream, encoding: str = "utf-8") -> "TableMetadata":
+        """Instantiate a TableMetadata object from a byte stream
+
+        Args:
+            byte_stream: A file-like byte stream object
+            encoding (default "utf-8"): The encoding to use when decoding the byte stream
+        """
+        reader = codecs.getreader(encoding)
+        metadata = json.load(reader(byte_stream))
+        return cls.from_dict(metadata)
+
+    @classmethod
+    def from_input_file(cls, input_file: InputFile, encoding: str = "utf-8") -> "TableMetadata":
+        """Create a TableMetadata instance from an input file
+
+        Args:
+            input_file (InputFile): A custom implementation of the iceberg.io.file.InputFile abstract
+            base class
+            encoding (str): Encoding to use when loading bytestream
+
+        Returns:
+            TableMetadata: A table metadata instance
+
+        """
+        return cls.from_byte_stream(byte_stream=input_file.open(), encoding=encoding)
+
+    def to_output_file(self, output_file: OutputFile, overwrite: bool = False) -> None:
+        """Write a TableMetadata instance to an output file
+
+        Args:
+            output_file (OutputFile): A custom implementation of the iceberg.io.file.OutputFile abstract
+            base class
+            overwrite (bool): Whether to overwrite the file if it already exists. Defaults to `False`.
+        """
+        f = output_file.create(overwrite=overwrite)
+        f.write(json.dumps(self.to_dict()).encode("utf-8"))
+
+    @classmethod
+    def from_dict(cls, d: dict) -> "TableMetadata":
+        """Instantiates a TableMetadata object using a dictionary
+
+        Args:
+            d: A dictionary object that conforms to table metadata specification
+        Returns:
+            TableMetadata: A table metadata instance
+        """
+        return cls(  # type: ignore
+            format_version=d.get("format-version"),  # type: ignore
+            table_uuid=d.get("table-uuid"),  # type: ignore
+            location=d.get("location"),  # type: ignore
+            last_sequence_number=d.get("last-sequence-number"),  # type: ignore
+            last_updated_ms=d.get("last-updated-ms"),  # type: ignore
+            last_column_id=d.get("last-column-id"),  # type: ignore
+            schema=d.get("schema") or {},  # type: ignore
+            schemas=d.get("schemas") or [],  # type: ignore
+            current_schema_id=d.get("current-schema-id"),  # type: ignore
+            partition_spec=d.get("partition-spec") or [],  # type: ignore
+            partition_specs=d.get("partition-specs") or [],  # type: ignore
+            default_spec_id=d.get("default-spec-id"),  # type: ignore
+            last_partition_id=d.get("last-partition-id"),  # type: ignore
+            properties=d.get("properties") or {},  # type: ignore
+            current_snapshot_id=d.get("current-snapshot-id"),  # type: ignore
+            snapshots=d.get("snapshots") or [],  # type: ignore
+            snapshot_log=d.get("snapshot-log") or [],  # type: ignore
+            metadata_log=d.get("metadata-log") or [],  # type: ignore
+            sort_orders=d.get("sort-orders") or [],  # type: ignore
+            default_sort_order_id=d.get("default-sort-order-id"),  # type: ignore
+        )  # type: ignore
+
+    def to_dict(self) -> dict:
+        """Generate a dictionary representation of a TableMetadata instance
+
+        Returns:
+            dict: A dictionary representation of a TableMetadata instance
+        """
+        d = {
+            "format-version": self.format_version,
+            "table-uuid": self.table_uuid,
+            "location": self.location,
+            "last-updated-ms": self.last_updated_ms,
+            "last-column-id": self.last_column_id,
+            "schemas": self.schemas,
+            "current-schema-id": self.current_schema_id,
+            "partition-specs": self.partition_specs,
+            "default-spec-id": self.default_spec_id,
+            "last-partition-id": self.last_partition_id,
+            "properties": self.properties,
+            "current-snapshot-id": self.current_snapshot_id,
+            "snapshots": self.snapshots,
+            "snapshot-log": self.snapshot_log,
+            "metadata-log": self.metadata_log,
+            "sort-orders": self.sort_orders,
+            "default-sort-order-id": self.default_sort_order_id,
+        }
+
+        if self.format_version == 1:
+            d["schema"] = self.schema
+            d["partition-spec"] = self.partition_spec
+        if self.format_version == 2:
+            d["last-sequence-number"] = self.last_sequence_number
+
+        return d
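The class above is self-contained, so a round trip through `from_dict` and `to_dict` is easy to sketch. The dictionary below is a made-up, minimal v2 example for illustration only (its values are placeholders, not taken from the patch); any dictionary conforming to the table metadata spec would work:

    from iceberg.table.metadata import TableMetadata

    # Hypothetical minimal v2 metadata; every value here is a placeholder
    metadata_dict = {
        "format-version": 2,
        "table-uuid": "d20125c8-7284-442c-9aea-15fee620737c",
        "location": "s3://bucket/warehouse/db/table",
        "last-sequence-number": 0,
        "last-updated-ms": 1602638573590,
        "last-column-id": 1,
        "schemas": [{"type": "struct", "schema-id": 0, "fields": []}],
        "current-schema-id": 0,
        "partition-specs": [{"spec-id": 0, "fields": []}],
        "default-spec-id": 0,
        "last-partition-id": 999,
        "sort-orders": [{"order-id": 0, "fields": []}],
        "default-sort-order-id": 0,
    }

    table_metadata = TableMetadata.from_dict(metadata_dict)
    # Keys absent from the input (snapshots, snapshot-log, ...) fall back to empty defaults
    assert table_metadata.to_dict()["table-uuid"] == metadata_dict["table-uuid"]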
"properties": self.properties, + "current-snapshot-id": self.current_snapshot_id, + "snapshots": self.snapshots, + "snapshot-log": self.snapshot_log, + "metadata-log": self.metadata_log, + "sort-orders": self.sort_orders, + "default-sort-order-id": self.default_sort_order_id, + } + + if self.format_version == 1: + d["schema"] = self.schema + d["partition-spec"] = self.partition_spec + if self.format_version == 2: + d["last-sequence-number"] = self.last_sequence_number + + return d diff --git a/python/tests/conftest.py b/python/tests/conftest.py index 5f9a13a47bc6..26b94d666e66 100644 --- a/python/tests/conftest.py +++ b/python/tests/conftest.py @@ -14,12 +14,24 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +"""This contains global pytest configurations. -from typing import Any +Fixtures contained in this file will be automatically used if provided as an argument +to any pytest function. + +In the case where the fixture must be used in a pytest.mark.parametrize decorator, the string representation can be used +and the built-in pytest fixture request should be used as an additional argument in the function. The fixture can then be +retrieved using `request.getfixturevalue(fixture_name)`. +""" + +import os +from typing import Any, Union +from urllib.parse import ParseResult, urlparse import pytest from iceberg import schema +from iceberg.io.base import FileIO, InputFile, InputStream, OutputFile, OutputStream from iceberg.types import ( BooleanType, FloatType, @@ -45,6 +57,105 @@ def set(self, pos: int, value) -> None: self.content[pos] = value +class LocalInputFile(InputFile): + """An InputFile implementation for local files (for test use only)""" + + def __init__(self, location: str): + + parsed_location = urlparse(location) # Create a ParseResult from the uri + if parsed_location.scheme and parsed_location.scheme != "file": # Validate that a uri is provided with a scheme of `file` + raise ValueError("LocalInputFile location must have a scheme of `file`") + elif parsed_location.netloc: + raise ValueError(f"Network location is not allowed for LocalInputFile: {parsed_location.netloc}") + + super().__init__(location=location) + self._parsed_location = parsed_location + + @property + def parsed_location(self) -> ParseResult: + """The parsed location + + Returns: + ParseResult: The parsed results which has attributes `scheme`, `netloc`, `path`, + `params`, `query`, and `fragments`. 
+ """ + return self._parsed_location + + def __len__(self): + return os.path.getsize(self.parsed_location.path) + + def exists(self): + return os.path.exists(self.parsed_location.path) + + def open(self) -> InputStream: + input_file = open(self.parsed_location.path, "rb") + if not isinstance(input_file, InputStream): + raise TypeError("Object returned from LocalInputFile.open() does not match the OutputStream protocol.") + return input_file + + +class LocalOutputFile(OutputFile): + """An OutputFile implementation for local files (for test use only)""" + + def __init__(self, location: str): + + parsed_location = urlparse(location) # Create a ParseResult from the uri + if parsed_location.scheme and parsed_location.scheme != "file": # Validate that a uri is provided with a scheme of `file` + raise ValueError("LocalOutputFile location must have a scheme of `file`") + elif parsed_location.netloc: + raise ValueError(f"Network location is not allowed for LocalOutputFile: {parsed_location.netloc}") + + super().__init__(location=location) + self._parsed_location = parsed_location + + @property + def parsed_location(self) -> ParseResult: + """The parsed location + + Returns: + ParseResult: The parsed results which has attributes `scheme`, `netloc`, `path`, + `params`, `query`, and `fragments`. + """ + return self._parsed_location + + def __len__(self): + return os.path.getsize(self.parsed_location.path) + + def exists(self): + return os.path.exists(self.parsed_location.path) + + def to_input_file(self): + return LocalInputFile(location=self.location) + + def create(self, overwrite: bool = False) -> OutputStream: + output_file = open(self.parsed_location.path, "wb" if overwrite else "xb") + if not isinstance(output_file, OutputStream): + raise TypeError("Object returned from LocalOutputFile.create(...) 
+
+
+class LocalFileIO(FileIO):
+    """A FileIO implementation for local files (for test use only)"""
+
+    def new_input(self, location: str):
+        return LocalInputFile(location=location)
+
+    def new_output(self, location: str):
+        return LocalOutputFile(location=location)
+
+    def delete(self, location: Union[str, LocalInputFile, LocalOutputFile]):
+        parsed_location = location.parsed_location if isinstance(location, (InputFile, OutputFile)) else urlparse(location)
+        try:
+            os.remove(parsed_location.path)
+        except FileNotFoundError as e:
+            raise FileNotFoundError(f"Cannot delete file, does not exist: {parsed_location.path} - Caused by: " + str(e))
+
+
 @pytest.fixture(scope="session", autouse=True)
 def table_schema_simple():
     return schema.Schema(
@@ -112,3 +223,18 @@ def table_schema_nested():
 @pytest.fixture(scope="session", autouse=True)
 def foo_struct():
     return FooStruct()
+
+
+@pytest.fixture(scope="session", autouse=True)
+def LocalInputFileFixture():
+    return LocalInputFile
+
+
+@pytest.fixture(scope="session", autouse=True)
+def LocalOutputFileFixture():
+    return LocalOutputFile
+
+
+@pytest.fixture(scope="session", autouse=True)
+def LocalFileIOFixture():
+    return LocalFileIO
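Taken together, the three test-only classes mirror the FileIO abstraction end to end. A small usage sketch (the path is a placeholder; any writable local directory works):

    import os
    import tempfile

    file_io = LocalFileIO()  # the test-only FileIO defined in this conftest.py

    with tempfile.TemporaryDirectory() as tmpdir:
        location = f"file:{os.path.join(tmpdir, 'example.txt')}"
        with file_io.new_output(location=location).create() as f:
            f.write(b"foo")
        input_file = file_io.new_input(location=location)
        assert input_file.exists() and len(input_file) == 3
        with input_file.open() as f:
            assert f.read() == b"foo"
        file_io.delete(input_file)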
- """ - return self._parsed_location - - def __len__(self): - return os.path.getsize(self.parsed_location.path) - - def exists(self): - return os.path.exists(self.parsed_location.path) - - def open(self) -> InputStream: - input_file = open(self.parsed_location.path, "rb") - if not isinstance(input_file, InputStream): - raise TypeError("Object returned from LocalInputFile.open() does not match the OutputStream protocol.") - return input_file - - -class LocalOutputFile(OutputFile): - """An OutputFile implementation for local files (for test use only)""" - - def __init__(self, location: str): - - parsed_location = urlparse(location) # Create a ParseResult from the uri - if parsed_location.scheme and parsed_location.scheme != "file": # Validate that a uri is provided with a scheme of `file` - raise ValueError("LocalOutputFile location must have a scheme of `file`") - elif parsed_location.netloc: - raise ValueError(f"Network location is not allowed for LocalOutputFile: {parsed_location.netloc}") - - super().__init__(location=location) - self._parsed_location = parsed_location - - @property - def parsed_location(self) -> ParseResult: - """The parsed location - - Returns: - ParseResult: The parsed results which has attributes `scheme`, `netloc`, `path`, - `params`, `query`, and `fragments`. - """ - return self._parsed_location - - def __len__(self): - return os.path.getsize(self.parsed_location.path) - - def exists(self): - return os.path.exists(self.parsed_location.path) - - def to_input_file(self): - return LocalInputFile(location=self.location) - - def create(self, overwrite: bool = False) -> OutputStream: - output_file = open(self.parsed_location.path, "wb" if overwrite else "xb") - if not isinstance(output_file, OutputStream): - raise TypeError("Object returned from LocalOutputFile.create(...) 
-        return output_file
-
-
-class LocalFileIO(FileIO):
-    """A FileIO implementation for local files (for test use only)"""
-
-    def new_input(self, location: str):
-        return LocalInputFile(location=location)
-
-    def new_output(self, location: str):
-        return LocalOutputFile(location=location)
-
-    def delete(self, location: Union[str, LocalInputFile, LocalOutputFile]):
-        parsed_location = location.parsed_location if isinstance(location, (InputFile, OutputFile)) else urlparse(location)
-        try:
-            os.remove(parsed_location.path)
-        except FileNotFoundError as e:
-            raise FileNotFoundError(f"Cannot delete file, does not exist: {parsed_location.path} - Caused by: " + str(e))
-
-
-@pytest.mark.parametrize("CustomInputFile", [LocalInputFile, PyArrowFile])
-def test_custom_local_input_file(CustomInputFile):
+@pytest.mark.parametrize("CustomInputFileFixture", ["LocalInputFileFixture", PyArrowFile])
+def test_custom_local_input_file(CustomInputFileFixture, request):
     """Test initializing an InputFile implementation to read a local file"""
+    # If a fixture name is used, retrieve the fixture; otherwise assume that an InputFile class was passed directly
+    CustomInputFile = (
+        request.getfixturevalue(CustomInputFileFixture) if isinstance(CustomInputFileFixture, str) else CustomInputFileFixture
+    )
+
     with tempfile.TemporaryDirectory() as tmpdirname:
         file_location = os.path.join(tmpdirname, "foo.txt")
         with open(file_location, "wb") as f:
@@ -142,9 +50,14 @@ def test_custom_local_input_file(CustomInputFile):
         assert len(input_file) == 3
 
 
-@pytest.mark.parametrize("CustomOutputFile", [LocalOutputFile, PyArrowFile])
-def test_custom_local_output_file(CustomOutputFile):
+@pytest.mark.parametrize("CustomOutputFileFixture", ["LocalOutputFileFixture", PyArrowFile])
+def test_custom_local_output_file(CustomOutputFileFixture, request):
     """Test initializing an OutputFile implementation to write to a local file"""
+    # If a fixture name is used, retrieve the fixture; otherwise assume that an OutputFile class was passed directly
+    CustomOutputFile = (
+        request.getfixturevalue(CustomOutputFileFixture) if isinstance(CustomOutputFileFixture, str) else CustomOutputFileFixture
+    )
+
     with tempfile.TemporaryDirectory() as tmpdirname:
         file_location = os.path.join(tmpdirname, "foo.txt")
 
@@ -163,9 +76,14 @@ def test_custom_local_output_file(CustomOutputFile):
         assert len(output_file) == 3
 
 
-@pytest.mark.parametrize("CustomOutputFile", [LocalOutputFile, PyArrowFile])
-def test_custom_local_output_file_with_overwrite(CustomOutputFile):
+@pytest.mark.parametrize("CustomOutputFileFixture", ["LocalOutputFileFixture", PyArrowFile])
+def test_custom_local_output_file_with_overwrite(CustomOutputFileFixture, request):
     """Test initializing an OutputFile implementation to overwrite a local file"""
+    # If a fixture name is used, retrieve the fixture; otherwise assume that an OutputFile class was passed directly
+    CustomOutputFile = (
+        request.getfixturevalue(CustomOutputFileFixture) if isinstance(CustomOutputFileFixture, str) else CustomOutputFileFixture
+    )
+
     with tempfile.TemporaryDirectory() as tmpdirname:
         output_file_location = os.path.join(tmpdirname, "foo.txt")
 
@@ -188,9 +106,12 @@ def test_custom_local_output_file_with_overwrite(CustomOutputFile):
         assert f.read() == b"bar"
 
 
-@pytest.mark.parametrize("CustomFile", [LocalInputFile, LocalOutputFile, PyArrowFile, PyArrowFile])
-def test_custom_file_exists(CustomFile):
+@pytest.mark.parametrize("CustomFileFixture", ["LocalInputFileFixture", "LocalOutputFileFixture", PyArrowFile, PyArrowFile])
"LocalOutputFileFixture", PyArrowFile, PyArrowFile]) +def test_custom_file_exists(CustomFileFixture, request): """Test that the exists property returns the proper value for existing and non-existing files""" + # If a fixture name is used, retrieve the fixture, otherwise assume that a file class was passed directly + CustomFile = request.getfixturevalue(CustomFileFixture) if isinstance(CustomFileFixture, str) else CustomFileFixture + with tempfile.TemporaryDirectory() as tmpdirname: file_location = os.path.join(tmpdirname, "foo.txt") with open(file_location, "wb") as f: @@ -214,9 +135,14 @@ def test_custom_file_exists(CustomFile): assert not non_existent_file.exists() -@pytest.mark.parametrize("CustomOutputFile", [LocalOutputFile, PyArrowFile]) -def test_output_file_to_input_file(CustomOutputFile): +@pytest.mark.parametrize("CustomOutputFileFixture", ["LocalOutputFileFixture", PyArrowFile]) +def test_output_file_to_input_file(CustomOutputFileFixture, request): """Test initializing an InputFile using the `to_input_file()` method on an OutputFile instance""" + # If a fixture name is used, retrieve the fixture, otherwise assume that a OutputFile class was passed directly + CustomOutputFile = ( + request.getfixturevalue(CustomOutputFileFixture) if isinstance(CustomOutputFileFixture, str) else CustomOutputFileFixture + ) + with tempfile.TemporaryDirectory() as tmpdirname: output_file_location = os.path.join(tmpdirname, "foo.txt") @@ -234,18 +160,21 @@ def test_output_file_to_input_file(CustomOutputFile): @pytest.mark.parametrize( - "CustomFileIO,string_uri", + "CustomFileIOFixture,string_uri", [ - (LocalFileIO, "foo/bar.parquet"), - (LocalFileIO, "file:///foo/bar.parquet"), - (LocalFileIO, "file:/foo/bar/baz.parquet"), + ("LocalFileIOFixture", "foo/bar.parquet"), + ("LocalFileIOFixture", "file:///foo/bar.parquet"), + ("LocalFileIOFixture", "file:/foo/bar/baz.parquet"), (PyArrowFileIO, "foo/bar/baz.parquet"), (PyArrowFileIO, "file:/foo/bar/baz.parquet"), (PyArrowFileIO, "file:/foo/bar/baz.parquet"), ], ) -def test_custom_file_io_locations(CustomFileIO, string_uri): +def test_custom_file_io_locations(CustomFileIOFixture, string_uri, request): """Test that the location property is maintained as the value of the location argument""" + # If a fixture name is used, retrieve the fixture, otherwise assume that a FileIO class was passed directly + CustomFileIO = request.getfixturevalue(CustomFileIOFixture) if isinstance(CustomFileIOFixture, str) else CustomFileIOFixture + # Instantiate the file-io and create a new input and output file file_io = CustomFileIO() input_file = file_io.new_input(location=string_uri) @@ -259,8 +188,9 @@ def test_custom_file_io_locations(CustomFileIO, string_uri): "string_uri_w_netloc", ["file://localhost:80/foo/bar.parquet", "file://foo/bar.parquet"], ) -def test_raise_on_network_location_in_input_file(string_uri_w_netloc): +def test_raise_on_network_location_in_input_file(string_uri_w_netloc, request): """Test raising a ValueError when providing a network location to a LocalInputFile""" + LocalInputFile = request.getfixturevalue("LocalInputFileFixture") with pytest.raises(ValueError) as exc_info: LocalInputFile(location=string_uri_w_netloc) @@ -271,17 +201,21 @@ def test_raise_on_network_location_in_input_file(string_uri_w_netloc): "string_uri_w_netloc", ["file://localhost:80/foo/bar.parquet", "file://foo/bar.parquet"], ) -def test_raise_on_network_location_in_output_file(string_uri_w_netloc): +def test_raise_on_network_location_in_output_file(string_uri_w_netloc, request): 
"""Test raising a ValueError when providing a network location to a LocalOutputFile""" + LocalInputFile = request.getfixturevalue("LocalInputFileFixture") with pytest.raises(ValueError) as exc_info: LocalInputFile(location=string_uri_w_netloc) assert ("Network location is not allowed for LocalInputFile") in str(exc_info.value) -@pytest.mark.parametrize("CustomFileIO", [LocalFileIO, PyArrowFileIO]) -def test_deleting_local_file_using_file_io(CustomFileIO): +@pytest.mark.parametrize("CustomFileIOFixture", ["LocalFileIOFixture", PyArrowFileIO]) +def test_deleting_local_file_using_file_io(CustomFileIOFixture, request): """Test deleting a local file using FileIO.delete(...)""" + # If a fixture name is used, retrieve the fixture, otherwise assume that a FileIO class was passed directly + CustomFileIO = request.getfixturevalue(CustomFileIOFixture) if isinstance(CustomFileIOFixture, str) else CustomFileIOFixture + with tempfile.TemporaryDirectory() as tmpdirname: # Write to the temporary file output_file_location = os.path.join(tmpdirname, "foo.txt") @@ -301,9 +235,12 @@ def test_deleting_local_file_using_file_io(CustomFileIO): assert not os.path.exists(output_file_location) -@pytest.mark.parametrize("CustomFileIO", [LocalFileIO, PyArrowFileIO]) -def test_raise_file_not_found_error_for_fileio_delete(CustomFileIO): +@pytest.mark.parametrize("CustomFileIOFixture", ["LocalFileIOFixture", PyArrowFileIO]) +def test_raise_file_not_found_error_for_fileio_delete(CustomFileIOFixture, request): """Test raising a FileNotFound error when trying to delete a non-existent file""" + # If a fixture name is used, retrieve the fixture, otherwise assume that a FileIO class was passed directly + CustomFileIO = request.getfixturevalue(CustomFileIOFixture) if isinstance(CustomFileIOFixture, str) else CustomFileIOFixture + with tempfile.TemporaryDirectory() as tmpdirname: # Write to the temporary file output_file_location = os.path.join(tmpdirname, "foo.txt") @@ -321,9 +258,19 @@ def test_raise_file_not_found_error_for_fileio_delete(CustomFileIO): assert not os.path.exists(output_file_location) -@pytest.mark.parametrize("CustomFileIO, CustomInputFile", [(LocalFileIO, LocalInputFile), (PyArrowFileIO, PyArrowFile)]) -def test_deleting_local_file_using_file_io_input_file(CustomFileIO, CustomInputFile): +@pytest.mark.parametrize( + "CustomFileIOFixture, CustomInputFileFixture", [("LocalFileIOFixture", "LocalInputFileFixture"), (PyArrowFileIO, PyArrowFile)] +) +def test_deleting_local_file_using_file_io_input_file(CustomFileIOFixture, CustomInputFileFixture, request): """Test deleting a local file by passing an InputFile instance to FileIO.delete(...)""" + # If a fixture name is used, retrieve the fixture, otherwise assume that a FileIO class was passed directly + CustomFileIO = request.getfixturevalue(CustomFileIOFixture) if isinstance(CustomFileIOFixture, str) else CustomFileIOFixture + + # If a fixture name is used, retrieve the fixture, otherwise assume that a InputFile class was passed directly + CustomInputFile = ( + request.getfixturevalue(CustomInputFileFixture) if isinstance(CustomInputFileFixture, str) else CustomInputFileFixture + ) + with tempfile.TemporaryDirectory() as tmpdirname: # Write to the temporary file file_location = os.path.join(tmpdirname, "foo.txt") @@ -346,9 +293,20 @@ def test_deleting_local_file_using_file_io_input_file(CustomFileIO, CustomInputF assert not os.path.exists(file_location) -@pytest.mark.parametrize("CustomFileIO, CustomOutputFile", [(LocalFileIO, LocalOutputFile), (PyArrowFileIO, 
-def test_deleting_local_file_using_file_io_output_file(CustomFileIO, CustomOutputFile):
+@pytest.mark.parametrize(
+    "CustomFileIOFixture, CustomOutputFileFixture",
+    [("LocalFileIOFixture", "LocalOutputFileFixture"), (PyArrowFileIO, PyArrowFile)],
+)
+def test_deleting_local_file_using_file_io_output_file(CustomFileIOFixture, CustomOutputFileFixture, request):
     """Test deleting a local file by passing an OutputFile instance to FileIO.delete(...)"""
+    # If a fixture name is used, retrieve the fixture; otherwise assume that a FileIO class was passed directly
+    CustomFileIO = request.getfixturevalue(CustomFileIOFixture) if isinstance(CustomFileIOFixture, str) else CustomFileIOFixture
+
+    # If a fixture name is used, retrieve the fixture; otherwise assume that an OutputFile class was passed directly
+    CustomOutputFile = (
+        request.getfixturevalue(CustomOutputFileFixture) if isinstance(CustomOutputFileFixture, str) else CustomOutputFileFixture
+    )
+
     with tempfile.TemporaryDirectory() as tmpdirname:
         # Write to the temporary file
         file_location = os.path.join(tmpdirname, "foo.txt")
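Every test above follows the same indirection: the local fixture is named as a string inside pytest.mark.parametrize (the fixture object itself is not available at collection time) and resolved at run time through the built-in `request` fixture. A stripped-down sketch of the pattern, with illustrative test and fixture names:

    import pytest

    @pytest.fixture
    def local_file_io():
        return LocalFileIO  # the class itself, not an instance

    @pytest.mark.parametrize("file_io_fixture", ["local_file_io"])
    def test_location_round_trip(file_io_fixture, request):
        CustomFileIO = request.getfixturevalue(file_io_fixture)  # resolve the named fixture
        assert CustomFileIO().new_input(location="foo/bar.txt").location == "foo/bar.txt"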
diff --git a/python/tests/table/test_metadata.py b/python/tests/table/test_metadata.py
new file mode 100644
index 000000000000..b7eb8b93dc38
--- /dev/null
+++ b/python/tests/table/test_metadata.py
@@ -0,0 +1,273 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import io
+import json
+import os
+import tempfile
+
+import pytest
+
+from iceberg.table.metadata import TableMetadata
+
+EXAMPLE_TABLE_METADATA_V1 = {
+    "format-version": 1,
+    "table-uuid": "foo-table-uuid",
+    "location": "s3://foo/bar/baz.metadata.json",
+    "last-updated-ms": 1600000000000,
+    "last-column-id": 4,
+    "schema": {
+        "type": "struct",
+        "schema-id": 0,
+        "fields": [
+            {"id": 1, "name": "foo", "required": True, "type": "string"},
+            {"id": 2, "name": "bar", "required": True, "type": "string"},
+            {"id": 3, "name": "baz", "required": True, "type": "string"},
+            {"id": 4, "name": "qux", "required": True, "type": "string"},
+        ],
+    },
+    "schemas": [
+        {
+            "type": "struct",
+            "schema-id": 0,
+            "fields": [
+                {"id": 1, "name": "foo", "required": True, "type": "string"},
+                {"id": 2, "name": "bar", "required": True, "type": "string"},
+                {"id": 3, "name": "baz", "required": True, "type": "string"},
+                {"id": 4, "name": "qux", "required": True, "type": "string"},
+            ],
+        },
+    ],
+    "current-schema-id": 0,
+    "partition-spec": [],
+    "default-spec-id": 0,
+    "partition-specs": [{"spec-id": 0, "fields": []}],
+    "last-partition-id": 999,
+    "default-sort-order-id": 0,
+    "sort-orders": [{"order-id": 0, "fields": []}],
+    "properties": {"owner": "root", "write.format.default": "parquet"},
+    "current-snapshot-id": 7681945274687743099,
+    "snapshots": [
+        {
+            "snapshot-id": 7681945274687743099,
+            "timestamp-ms": 1637943123188,
+            "summary": {
+                "operation": "append",
+                "added-data-files": "6",
+                "added-records": "237993",
+                "added-files-size": "3386901",
+                "changed-partition-count": "1",
+                "total-records": "237993",
+                "total-files-size": "3386901",
+                "total-data-files": "6",
+                "total-delete-files": "0",
+                "total-position-deletes": "0",
+                "total-equality-deletes": "0",
+            },
+            "manifest-list": "s3://foo/bar/baz/snap-2874264644797652805-1-9cb3c3cf-5a04-40c1-bdd9-d8d7e38cd8e3.avro",
+            "schema-id": 0,
+        },
+    ],
+    "snapshot-log": [
+        {"timestamp-ms": 1637943123188, "snapshot-id": 7681945274687743099},
+    ],
+    "metadata-log": [
+        {
+            "timestamp-ms": 1637943123331,
+            "metadata-file": "s3://foo/bar/baz/00000-907830f8-1a92-4944-965a-ff82c890e912.metadata.json",
+        }
+    ],
+}
+EXAMPLE_TABLE_METADATA_V2 = {
+    "format-version": 2,
+    "table-uuid": "foo-table-uuid",
+    "location": "s3://foo/bar/baz.metadata.json",
+    "last-updated-ms": 1600000000000,
+    "last-column-id": 4,
+    "last-sequence-number": 1,
+    "schemas": [
+        {
+            "type": "struct",
+            "schema-id": 0,
+            "fields": [
+                {"id": 1, "name": "foo", "required": True, "type": "string"},
+                {"id": 2, "name": "bar", "required": True, "type": "string"},
+                {"id": 3, "name": "baz", "required": True, "type": "string"},
+                {"id": 4, "name": "qux", "required": True, "type": "string"},
+            ],
+        }
+    ],
+    "current-schema-id": 0,
+    "default-spec-id": 0,
+    "partition-specs": [{"spec-id": 0, "fields": []}],
+    "last-partition-id": 999,
+    "default-sort-order-id": 0,
+    "sort-orders": [{"order-id": 0, "fields": []}],
+    "properties": {"owner": "root", "write.format.default": "parquet"},
+    "current-snapshot-id": 7681945274687743099,
+    "snapshots": [
+        {
+            "snapshot-id": 7681945274687743099,
+            "timestamp-ms": 1637943123188,
+            "summary": {
+                "operation": "append",
+                "added-data-files": "6",
+                "added-records": "237993",
+                "added-files-size": "3386901",
+                "changed-partition-count": "1",
+                "total-records": "237993",
+                "total-files-size": "3386901",
+                "total-data-files": "6",
+                "total-delete-files": "0",
+                "total-position-deletes": "0",
+                "total-equality-deletes": "0",
+            },
+            "manifest-list": "s3://foo/bar/baz/snap-2874264644797652805-1-9cb3c3cf-5a04-40c1-bdd9-d8d7e38cd8e3.avro",
"s3://foo/bar/baz/snap-2874264644797652805-1-9cb3c3cf-5a04-40c1-bdd9-d8d7e38cd8e3.avro", + "schema-id": 0, + }, + ], + "snapshot-log": [ + {"timestamp-ms": 1637943123188, "snapshot-id": 7681945274687743099}, + ], + "metadata-log": [ + { + "timestamp-ms": 1637943123331, + "metadata-file": "3://foo/bar/baz/00000-907830f8-1a92-4944-965a-ff82c890e912.metadata.json", + } + ], + "properties": {"read.split.target.size": 134217728}, +} + + +@pytest.mark.parametrize( + "metadata", + [ + (EXAMPLE_TABLE_METADATA_V1), + (EXAMPLE_TABLE_METADATA_V2), + ], +) +def test_from_dict(metadata): + """Test initialization of a TableMetadata instance from a dictionary""" + table_metadata = TableMetadata.from_dict(metadata) + + +@pytest.mark.parametrize( + "metadata", + [ + (EXAMPLE_TABLE_METADATA_V1), + (EXAMPLE_TABLE_METADATA_V2), + ], +) +def test_from_input_file(metadata, LocalFileIOFixture): + """Test initialization of a TableMetadata instance from a LocalInputFile instance""" + with tempfile.TemporaryDirectory() as tmpdirname: + file_location = os.path.join(tmpdirname, "table_metadata.json") + file_io = LocalFileIOFixture() + + # Instantiate the output file + absolute_file_location = os.path.abspath(file_location) + output_file = file_io.new_output(location=f"file:{absolute_file_location}") + + # Create the output file and write the metadata file to it + f = output_file.create() + f.write(json.dumps(metadata).encode("utf-8")) + f.close() + + input_file = file_io.new_input(location=f"file:{absolute_file_location}") + table_metadata = TableMetadata.from_input_file(input_file) + + +@pytest.mark.parametrize( + "metadata", + [ + (EXAMPLE_TABLE_METADATA_V1), + (EXAMPLE_TABLE_METADATA_V2), + ], +) +def test_to_output_file(metadata, LocalFileIOFixture): + """Test writing a TableMetadata instance to a LocalOutputFile instance""" + with tempfile.TemporaryDirectory() as tmpdirname: + table_metadata = TableMetadata.from_dict(metadata) # Create TableMetadata instance from dictionary + file_io = LocalFileIOFixture() # Use LocalFileIO fixture defined in conftest.py + + # Create an output file in the temporary directory + file_location = os.path.join(tmpdirname, "table_metadata.json") + absolute_file_location = os.path.abspath(file_location) + output_file = file_io.new_output(location=f"file:{absolute_file_location}") + + # Write the TableMetadata instance to the output file + table_metadata.to_output_file(output_file) + + # Read the raw json file and compare to metadata dictionary + table_metadata_dict = json.load(open(file_location, "r")) + assert table_metadata_dict == metadata + + +def test_from_byte_stream(): + """Test generating a TableMetadata instance from a file-like byte stream""" + data = bytes(json.dumps(EXAMPLE_TABLE_METADATA_V2), encoding="utf-8") + byte_stream = io.BytesIO(data) + TableMetadata.from_byte_stream(byte_stream) + + +def test_v2_metadata_parsing(): + """Test retrieveing values from a TableMetadata instance of version 2""" + table_metadata = TableMetadata.from_dict(EXAMPLE_TABLE_METADATA_V2) + + assert table_metadata.format_version == 2 + assert table_metadata.table_uuid == "foo-table-uuid" + assert table_metadata.location == "s3://foo/bar/baz.metadata.json" + assert table_metadata.last_sequence_number == 1 + assert table_metadata.last_updated_ms == 1600000000000 + assert table_metadata.last_column_id == 4 + assert table_metadata.schemas[0]["schema-id"] == 0 + assert table_metadata.current_schema_id == 0 + assert table_metadata.partition_specs[0]["spec-id"] == 0 + assert 
From 18d09e2dcf3a3690d89223a463598f287c113b79 Mon Sep 17 00:00:00 2001
From: samredai <43911210+samredai@users.noreply.github.com>
Date: Mon, 23 May 2022 18:12:52 -0700
Subject: [PATCH 2/2] Add serializers.py with dict, bytestream, and
 input_file/output_file serialization for table metadata

---
 python/spellcheck-dictionary.txt     |  12 ++-
 python/src/iceberg/serializers.py    | 146 +++++++++++++++++++++
 python/src/iceberg/table/metadata.py | 113 +--------------
 python/tests/conftest.py             |   2 +-
 python/tests/table/test_metadata.py  |  33 +++--
 5 files changed, 181 insertions(+), 125 deletions(-)
 create mode 100644 python/src/iceberg/serializers.py

diff --git a/python/spellcheck-dictionary.txt b/python/spellcheck-dictionary.txt
index 1100dd7036a9..eeb177949de0 100644
--- a/python/spellcheck-dictionary.txt
+++ b/python/spellcheck-dictionary.txt
@@ -46,4 +46,14 @@ Timestamptz
 Timestamptzs
 unscaled
 URI
-
+json
+py
+conftest
+pytest
+parametrize
+uri
+InputFile
+OutputFile
+bytestream
+deserialize
\ No newline at end of file
diff --git a/python/src/iceberg/serializers.py b/python/src/iceberg/serializers.py
new file mode 100644
index 000000000000..153a5fa3427a
--- /dev/null
+++ b/python/src/iceberg/serializers.py
@@ -0,0 +1,146 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import codecs
+import json
+
+from iceberg.io.base import InputFile, OutputFile
+from iceberg.table.metadata import TableMetadata
+
+
+class FromDict:
+    """A collection of methods that deserialize dictionaries into Iceberg objects"""
+
+    @staticmethod
+    def table_metadata(d: dict) -> TableMetadata:
+        """Instantiates a TableMetadata object using a dictionary
+
+        Args:
+            d: A dictionary object that conforms to table metadata specification
+        Returns:
+            TableMetadata: A table metadata instance
+        """
+        return TableMetadata(  # type: ignore
+            format_version=d.get("format-version"),  # type: ignore
+            table_uuid=d.get("table-uuid"),  # type: ignore
+            location=d.get("location"),  # type: ignore
+            last_sequence_number=d.get("last-sequence-number"),  # type: ignore
+            last_updated_ms=d.get("last-updated-ms"),  # type: ignore
+            last_column_id=d.get("last-column-id"),  # type: ignore
+            schema=d.get("schema"),  # type: ignore
+            schemas=d.get("schemas"),  # type: ignore
+            current_schema_id=d.get("current-schema-id"),  # type: ignore
+            partition_spec=d.get("partition-spec") or [],  # type: ignore
+            partition_specs=d.get("partition-specs") or [],  # type: ignore
+            default_spec_id=d.get("default-spec-id"),  # type: ignore
+            last_partition_id=d.get("last-partition-id"),  # type: ignore
+            properties=d.get("properties") or {},  # type: ignore
+            current_snapshot_id=d.get("current-snapshot-id"),  # type: ignore
+            snapshots=d.get("snapshots") or [],  # type: ignore
+            snapshot_log=d.get("snapshot-log") or [],  # type: ignore
+            metadata_log=d.get("metadata-log") or [],  # type: ignore
+            sort_orders=d.get("sort-orders") or [],  # type: ignore
+            default_sort_order_id=d.get("default-sort-order-id"),  # type: ignore
+        )  # type: ignore
+
+
+class ToDict:
+    """A collection of methods that serialize Iceberg objects into dictionaries"""
+
+    @staticmethod
+    def table_metadata(metadata: TableMetadata) -> dict:
+        """Generate a dictionary representation of a TableMetadata instance
+
+        Args:
+            metadata (TableMetadata): The TableMetadata instance to serialize
+
+        Returns:
+            dict: A dictionary representation of a TableMetadata instance
+        """
+        d = {
+            "format-version": metadata.format_version,
+            "table-uuid": metadata.table_uuid,
+            "location": metadata.location,
+            "last-updated-ms": metadata.last_updated_ms,
+            "last-column-id": metadata.last_column_id,
+            "schemas": metadata.schemas,
+            "current-schema-id": metadata.current_schema_id,
+            "partition-specs": metadata.partition_specs,
+            "default-spec-id": metadata.default_spec_id,
+            "last-partition-id": metadata.last_partition_id,
+            "properties": metadata.properties,
+            "current-snapshot-id": metadata.current_snapshot_id,
+            "snapshots": metadata.snapshots,
+            "snapshot-log": metadata.snapshot_log,
+            "metadata-log": metadata.metadata_log,
+            "sort-orders": metadata.sort_orders,
+            "default-sort-order-id": metadata.default_sort_order_id,
+        }
+
+        if metadata.format_version == 1:
+            d["schema"] = metadata.schema
+            d["partition-spec"] = metadata.partition_spec
+        if metadata.format_version == 2:
+            d["last-sequence-number"] = metadata.last_sequence_number
+
+        return d
+
+
+class FromByteStream:
+    """A collection of methods that deserialize byte streams into Iceberg objects"""
+
+    @staticmethod
+    def table_metadata(byte_stream, encoding: str = "utf-8") -> TableMetadata:
+        """Instantiate a TableMetadata object from a byte stream
+
+        Args:
+            byte_stream: A file-like byte stream object
+            encoding (default "utf-8"): The encoding to use when decoding the byte stream
+        """
+        reader = codecs.getreader(encoding)
+        metadata = json.load(reader(byte_stream))
+        return FromDict.table_metadata(metadata)
+
+
+class FromInputFile:
+    """A collection of methods that deserialize InputFiles into Iceberg objects"""
+
+    @staticmethod
+    def table_metadata(input_file: InputFile, encoding: str = "utf-8") -> TableMetadata:
+        """Create a TableMetadata instance from an input file
+
+        Args:
+            input_file (InputFile): A custom implementation of the iceberg.io.file.InputFile abstract base class
+            encoding (str): Encoding to use when loading bytestream
+
+        Returns:
+            TableMetadata: A table metadata instance
+
+        """
+        return FromByteStream.table_metadata(byte_stream=input_file.open(), encoding=encoding)
+
+
+class ToOutputFile:
+    """A collection of methods that serialize Iceberg objects into files given an OutputFile instance"""
+
+    @staticmethod
+    def table_metadata(metadata: TableMetadata, output_file: OutputFile, overwrite: bool = False) -> None:
+        """Write a TableMetadata instance to an output file
+
+        Args:
+            metadata (TableMetadata): The TableMetadata instance to serialize
+            output_file (OutputFile): A custom implementation of the iceberg.io.file.OutputFile abstract base class
+            overwrite (bool): Whether to overwrite the file if it already exists. Defaults to `False`.
+        """
+        f = output_file.create(overwrite=overwrite)
+        f.write(json.dumps(ToDict.table_metadata(metadata)).encode("utf-8"))
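The four entry points compose naturally. A brief sketch, where `metadata_dict` stands in for any spec-conforming metadata dictionary (an assumption for illustration, not something defined in the patch):

    import io
    import json

    from iceberg.serializers import FromByteStream, FromDict, ToDict

    table_metadata = FromDict.table_metadata(d=metadata_dict)

    # Dictionary round trip
    assert ToDict.table_metadata(metadata=table_metadata)["location"] == metadata_dict["location"]

    # Or parse straight from a binary stream
    stream = io.BytesIO(json.dumps(metadata_dict).encode("utf-8"))
    same_metadata = FromByteStream.table_metadata(byte_stream=stream)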
- """ - f = output_file.create(overwrite=overwrite) - f.write(json.dumps(self.to_dict()).encode("utf-8")) - - @classmethod - def from_dict(cls, d: dict) -> "TableMetadata": - """Instantiates a TableMetadata object using a dictionary - - Args: - d: A dictionary object that conforms to table metadata specification - Returns: - TableMetadata: A table metadata instance - """ - return cls( # type: ignore - format_version=d.get("format-version"), # type: ignore - table_uuid=d.get("table-uuid"), # type: ignore - location=d.get("location"), # type: ignore - last_sequence_number=d.get("last-sequence-number"), # type: ignore - last_updated_ms=d.get("last-updated-ms"), # type: ignore - last_column_id=d.get("last-column-id"), # type: ignore - schema=d.get("schema") or {}, # type: ignore - schemas=d.get("schemas") or [], # type: ignore - current_schema_id=d.get("current-schema-id"), # type: ignore - partition_spec=d.get("partition-spec") or [], # type: ignore - partition_specs=d.get("partition-specs") or [], # type: ignore - default_spec_id=d.get("default-spec-id"), # type: ignore - last_partition_id=d.get("last-partition-id"), # type: ignore - properties=d.get("properties") or {}, # type: ignore - current_snapshot_id=d.get("current-snapshot-id"), # type: ignore - snapshots=d.get("snapshots") or [], # type: ignore - snapshot_log=d.get("snapshot-log") or [], # type: ignore - metadata_log=d.get("metadata-log") or [], # type: ignore - sort_orders=d.get("sort-orders") or [], # type: ignore - default_sort_order_id=d.get("default-sort-order-id"), # type: ignore - ) # type: ignore - - def to_dict(self) -> dict: - """Generate a dictionary representation of a TableMetadata instance - - Returns: - dict: A dictionary representation of a TableMetadata instance - """ - d = { - "format-version": self.format_version, - "table-uuid": self.table_uuid, - "location": self.location, - "last-updated-ms": self.last_updated_ms, - "last-column-id": self.last_column_id, - "schemas": self.schemas, - "current-schema-id": self.current_schema_id, - "partition-specs": self.partition_specs, - "default-spec-id": self.default_spec_id, - "last-partition-id": self.last_partition_id, - "properties": self.properties, - "current-snapshot-id": self.current_snapshot_id, - "snapshots": self.snapshots, - "snapshot-log": self.snapshot_log, - "metadata-log": self.metadata_log, - "sort-orders": self.sort_orders, - "default-sort-order-id": self.default_sort_order_id, - } - - if self.format_version == 1: - d["schema"] = self.schema - d["partition-spec"] = self.partition_spec - if self.format_version == 2: - d["last-sequence-number"] = self.last_sequence_number - - return d diff --git a/python/tests/conftest.py b/python/tests/conftest.py index 26b94d666e66..d9fdc224fe45 100644 --- a/python/tests/conftest.py +++ b/python/tests/conftest.py @@ -148,7 +148,7 @@ def delete(self, location: Union[str, LocalInputFile, LocalOutputFile]): try: os.remove(parsed_location.path) except FileNotFoundError as e: - raise FileNotFoundError(f"Cannot delete file, does not exist: {parsed_location.path} - Caused by: " + str(e)) + raise FileNotFoundError(f"Cannot delete file, does not exist: {parsed_location.path} - Caused by: " + str(e)) from e @pytest.fixture(scope="session", autouse=True) diff --git a/python/tests/table/test_metadata.py b/python/tests/table/test_metadata.py index b7eb8b93dc38..fa75f65f43bd 100644 --- a/python/tests/table/test_metadata.py +++ b/python/tests/table/test_metadata.py @@ -22,7 +22,13 @@ import pytest -from iceberg.table.metadata import 
+from iceberg.serializers import (
+    FromByteStream,
+    FromDict,
+    FromInputFile,
+    ToDict,
+    ToOutputFile,
+)
 
 EXAMPLE_TABLE_METADATA_V1 = {
     "format-version": 1,
@@ -117,7 +123,7 @@
     "last-partition-id": 999,
     "default-sort-order-id": 0,
     "sort-orders": [{"order-id": 0, "fields": []}],
-    "properties": {"owner": "root", "write.format.default": "parquet"},
+    "properties": {"owner": "root", "write.format.default": "parquet", "read.split.target.size": 134217728},
     "current-snapshot-id": 7681945274687743099,
     "snapshots": [
         {
@@ -149,7 +155,6 @@
             "metadata-file": "s3://foo/bar/baz/00000-907830f8-1a92-4944-965a-ff82c890e912.metadata.json",
         }
     ],
-    "properties": {"read.split.target.size": 134217728},
 }
 
 
@@ -162,7 +167,7 @@
 )
 def test_from_dict(metadata):
     """Test initialization of a TableMetadata instance from a dictionary"""
-    table_metadata = TableMetadata.from_dict(metadata)
+    FromDict.table_metadata(d=metadata)
 
 
 @pytest.mark.parametrize(
@@ -188,7 +193,7 @@ def test_from_input_file(metadata, LocalFileIOFixture):
         f.close()
 
         input_file = file_io.new_input(location=f"file:{absolute_file_location}")
-        table_metadata = TableMetadata.from_input_file(input_file)
+        FromInputFile.table_metadata(input_file=input_file)
 
 
 @pytest.mark.parametrize(
@@ -201,7 +206,7 @@ def test_from_input_file(metadata, LocalFileIOFixture):
 def test_to_output_file(metadata, LocalFileIOFixture):
     """Test writing a TableMetadata instance to a LocalOutputFile instance"""
     with tempfile.TemporaryDirectory() as tmpdirname:
-        table_metadata = TableMetadata.from_dict(metadata)  # Create TableMetadata instance from dictionary
+        table_metadata = FromDict.table_metadata(d=metadata)  # Create TableMetadata instance from dictionary
         file_io = LocalFileIOFixture()  # Use LocalFileIO fixture defined in conftest.py
 
         # Create an output file in the temporary directory
@@ -210,10 +215,10 @@ def test_to_output_file(metadata, LocalFileIOFixture):
         output_file = file_io.new_output(location=f"file:{absolute_file_location}")
 
         # Write the TableMetadata instance to the output file
-        table_metadata.to_output_file(output_file)
+        ToOutputFile.table_metadata(metadata=table_metadata, output_file=output_file)
 
         # Read the raw json file and compare to metadata dictionary
-        table_metadata_dict = json.load(open(file_location, "r"))
+        table_metadata_dict = json.load(open(file_location, "r", encoding="utf-8"))
         assert table_metadata_dict == metadata
 
 
@@ -221,12 +226,12 @@ def test_from_byte_stream():
     """Test generating a TableMetadata instance from a file-like byte stream"""
     data = bytes(json.dumps(EXAMPLE_TABLE_METADATA_V2), encoding="utf-8")
     byte_stream = io.BytesIO(data)
-    TableMetadata.from_byte_stream(byte_stream)
+    FromByteStream.table_metadata(byte_stream=byte_stream)
 
 
 def test_v2_metadata_parsing():
-    """Test retrieveing values from a TableMetadata instance of version 2"""
-    table_metadata = TableMetadata.from_dict(EXAMPLE_TABLE_METADATA_V2)
+    """Test retrieving values from a TableMetadata instance of version 2"""
+    table_metadata = FromDict.table_metadata(d=EXAMPLE_TABLE_METADATA_V2)
 
     assert table_metadata.format_version == 2
     assert table_metadata.table_uuid == "foo-table-uuid"
@@ -251,7 +256,7 @@ def test_v2_metadata_parsing():
 def test_updating_metadata():
     """Test creating a new TableMetadata instance that's an updated version of
     an existing TableMetadata instance"""
-    table_metadata = TableMetadata.from_dict(EXAMPLE_TABLE_METADATA_V2)
+    table_metadata = FromDict.table_metadata(d=EXAMPLE_TABLE_METADATA_V2)
 
     new_schema = {
         "type": "struct",
@@ -263,11 +268,11 @@ def test_updating_metadata():
         ],
     }
 
-    mutable_table_metadata = table_metadata.to_dict()
+    mutable_table_metadata = ToDict.table_metadata(metadata=table_metadata)
     mutable_table_metadata["schemas"].append(new_schema)
     mutable_table_metadata["current-schema-id"] = 1
 
-    new_table_metadata = TableMetadata.from_dict(mutable_table_metadata)
+    new_table_metadata = FromDict.table_metadata(d=mutable_table_metadata)
 
     assert new_table_metadata.current_schema_id == 1
     assert new_table_metadata.schemas[-1] == new_schema
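Putting both patches together, a full file round trip could look like the sketch below. LocalFileIO is the test-only implementation from conftest.py, the location is a placeholder, and `metadata_dict` is again an assumed spec-conforming dictionary:

    from iceberg.serializers import FromDict, FromInputFile, ToOutputFile

    table_metadata = FromDict.table_metadata(d=metadata_dict)

    file_io = LocalFileIO()  # test-only FileIO from conftest.py
    location = "file:/tmp/example.metadata.json"  # placeholder path
    ToOutputFile.table_metadata(metadata=table_metadata, output_file=file_io.new_output(location=location))

    round_tripped = FromInputFile.table_metadata(input_file=file_io.new_input(location=location))
    assert round_tripped.table_uuid == table_metadata.table_uuid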