From 710f913340b6bfd9baff127086e6690541f37192 Mon Sep 17 00:00:00 2001 From: Sung Yun <107272191+syun64@users.noreply.github.com> Date: Mon, 26 Feb 2024 16:38:50 +0000 Subject: [PATCH 1/9] current-snapshot-id as -1 --- Makefile | 2 +- pyiceberg/table/metadata.py | 4 ++-- tests/cli/test_console.py | 2 +- tests/table/test_metadata.py | 4 ++-- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/Makefile b/Makefile index c3e816ebd5..b31f7cee1e 100644 --- a/Makefile +++ b/Makefile @@ -30,7 +30,7 @@ lint: poetry run pre-commit run --all-files test: - poetry run pytest tests/ -m "(unmarked or parametrize) and not integration" ${PYTEST_ARGS} + poetry run pytest tests/ -m "(unmarked or parametrize) and not integration" -vv ${PYTEST_ARGS} test-s3: sh ./dev/run-minio.sh diff --git a/pyiceberg/table/metadata.py b/pyiceberg/table/metadata.py index a5dfb6ce4c..ea2aa6ec5c 100644 --- a/pyiceberg/table/metadata.py +++ b/pyiceberg/table/metadata.py @@ -77,7 +77,7 @@ def cleanup_snapshot_id(data: Dict[str, Any]) -> Dict[str, Any]: if CURRENT_SNAPSHOT_ID in data and data[CURRENT_SNAPSHOT_ID] == -1: # We treat -1 and None the same, by cleaning this up # in a pre-validator, we can simplify the logic later on - data[CURRENT_SNAPSHOT_ID] = None + data[CURRENT_SNAPSHOT_ID] = -1 return data @@ -120,7 +120,7 @@ def check_sort_orders(table_metadata: TableMetadata) -> TableMetadata: def construct_refs(table_metadata: TableMetadata) -> TableMetadata: """Set the main branch if missing.""" - if table_metadata.current_snapshot_id is not None: + if table_metadata.current_snapshot_id is not None and table_metadata.current_snapshot_id != -1: if MAIN_BRANCH not in table_metadata.refs: table_metadata.refs[MAIN_BRANCH] = SnapshotRef( snapshot_id=table_metadata.current_snapshot_id, snapshot_ref_type=SnapshotRefType.BRANCH diff --git a/tests/cli/test_console.py b/tests/cli/test_console.py index d77b290ec6..c227f05592 100644 --- a/tests/cli/test_console.py +++ b/tests/cli/test_console.py @@ -579,7 +579,7 @@ def test_json_describe_table(catalog: InMemoryCatalog) -> None: assert result.exit_code == 0 assert ( result.output - == """{"identifier":["default","my_table"],"metadata_location":"s3://warehouse/default/my_table/metadata/metadata.json","metadata":{"location":"s3://bucket/test/location","table-uuid":"d20125c8-7284-442c-9aea-15fee620737c","last-updated-ms":1602638573874,"last-column-id":3,"schemas":[{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true},{"id":2,"name":"y","type":"long","required":true,"doc":"comment"},{"id":3,"name":"z","type":"long","required":true}],"schema-id":0,"identifier-field-ids":[]}],"current-schema-id":0,"partition-specs":[{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"transform":"identity","name":"x"}]}],"default-spec-id":0,"last-partition-id":1000,"properties":{},"snapshots":[{"snapshot-id":1925,"timestamp-ms":1602638573822}],"snapshot-log":[],"metadata-log":[],"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0,"refs":{},"format-version":1,"schema":{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true},{"id":2,"name":"y","type":"long","required":true,"doc":"comment"},{"id":3,"name":"z","type":"long","required":true}],"schema-id":0,"identifier-field-ids":[]},"partition-spec":[{"source-id":1,"field-id":1000,"transform":"identity","name":"x"}]}}\n""" + == 
"""{"identifier":["default","my_table"],"metadata_location":"s3://warehouse/default/my_table/metadata/metadata.json","metadata":{"location":"s3://bucket/test/location","table-uuid":"d20125c8-7284-442c-9aea-15fee620737c","last-updated-ms":1602638573874,"last-column-id":3,"schemas":[{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true},{"id":2,"name":"y","type":"long","required":true,"doc":"comment"},{"id":3,"name":"z","type":"long","required":true}],"schema-id":0,"identifier-field-ids":[]}],"current-schema-id":0,"partition-specs":[{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"transform":"identity","name":"x"}]}],"default-spec-id":0,"last-partition-id":1000,"properties":{},"current-snapshot-id":-1,"snapshots":[{"snapshot-id":1925,"timestamp-ms":1602638573822}],"snapshot-log":[],"metadata-log":[],"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0,"refs":{},"format-version":1,"schema":{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true},{"id":2,"name":"y","type":"long","required":true,"doc":"comment"},{"id":3,"name":"z","type":"long","required":true}],"schema-id":0,"identifier-field-ids":[]},"partition-spec":[{"source-id":1,"field-id":1000,"transform":"identity","name":"x"}]}}\n""" ) diff --git a/tests/table/test_metadata.py b/tests/table/test_metadata.py index 97a7931cbb..2483bc6fe5 100644 --- a/tests/table/test_metadata.py +++ b/tests/table/test_metadata.py @@ -131,7 +131,7 @@ def test_v1_metadata_parsing_directly(example_table_metadata_v1: Dict[str, Any]) ] assert table_metadata.default_spec_id == 0 assert table_metadata.last_partition_id == 1000 - assert table_metadata.current_snapshot_id is None + assert table_metadata.current_snapshot_id == -1 assert table_metadata.default_sort_order_id == 0 @@ -170,7 +170,7 @@ def test_updating_metadata(example_table_metadata_v2: Dict[str, Any]) -> None: def test_serialize_v1(example_table_metadata_v1: Dict[str, Any]) -> None: table_metadata = TableMetadataV1(**example_table_metadata_v1) table_metadata_json = table_metadata.model_dump_json() - expected = """{"location":"s3://bucket/test/location","table-uuid":"d20125c8-7284-442c-9aea-15fee620737c","last-updated-ms":1602638573874,"last-column-id":3,"schemas":[{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true},{"id":2,"name":"y","type":"long","required":true,"doc":"comment"},{"id":3,"name":"z","type":"long","required":true}],"schema-id":0,"identifier-field-ids":[]}],"current-schema-id":0,"partition-specs":[{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"transform":"identity","name":"x"}]}],"default-spec-id":0,"last-partition-id":1000,"properties":{},"snapshots":[{"snapshot-id":1925,"timestamp-ms":1602638573822}],"snapshot-log":[],"metadata-log":[],"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0,"refs":{},"format-version":1,"schema":{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true},{"id":2,"name":"y","type":"long","required":true,"doc":"comment"},{"id":3,"name":"z","type":"long","required":true}],"schema-id":0,"identifier-field-ids":[]},"partition-spec":[{"name":"x","transform":"identity","source-id":1,"field-id":1000}]}""" + expected = 
"""{"location":"s3://bucket/test/location","table-uuid":"d20125c8-7284-442c-9aea-15fee620737c","last-updated-ms":1602638573874,"last-column-id":3,"schemas":[{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true},{"id":2,"name":"y","type":"long","required":true,"doc":"comment"},{"id":3,"name":"z","type":"long","required":true}],"schema-id":0,"identifier-field-ids":[]}],"current-schema-id":0,"partition-specs":[{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"transform":"identity","name":"x"}]}],"default-spec-id":0,"last-partition-id":1000,"properties":{},"current-snapshot-id":-1,"snapshots":[{"snapshot-id":1925,"timestamp-ms":1602638573822}],"snapshot-log":[],"metadata-log":[],"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0,"refs":{},"format-version":1,"schema":{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true},{"id":2,"name":"y","type":"long","required":true,"doc":"comment"},{"id":3,"name":"z","type":"long","required":true}],"schema-id":0,"identifier-field-ids":[]},"partition-spec":[{"name":"x","transform":"identity","source-id":1,"field-id":1000}]}""" assert table_metadata_json == expected From d4010454ca2c201eedd7271cc42c23af6e4caa06 Mon Sep 17 00:00:00 2001 From: Sung Yun <107272191+syun64@users.noreply.github.com> Date: Mon, 26 Feb 2024 16:40:53 +0000 Subject: [PATCH 2/9] fix --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index b31f7cee1e..c3e816ebd5 100644 --- a/Makefile +++ b/Makefile @@ -30,7 +30,7 @@ lint: poetry run pre-commit run --all-files test: - poetry run pytest tests/ -m "(unmarked or parametrize) and not integration" -vv ${PYTEST_ARGS} + poetry run pytest tests/ -m "(unmarked or parametrize) and not integration" ${PYTEST_ARGS} test-s3: sh ./dev/run-minio.sh From cd38c5a1858074e8a4ccc15c1e80d909ea9e1e2c Mon Sep 17 00:00:00 2001 From: Sung Yun <107272191+syun64@users.noreply.github.com> Date: Thu, 29 Feb 2024 18:36:47 +0000 Subject: [PATCH 3/9] legacy-current-snapshot-id --- mkdocs/docs/configuration.md | 4 +++ pyiceberg/serializers.py | 11 ++++++-- pyiceberg/table/metadata.py | 2 +- pyiceberg/utils/concurrent.py | 11 +------- pyiceberg/utils/config.py | 17 ++++++++++++ tests/cli/test_console.py | 2 +- tests/table/test_metadata.py | 4 +-- tests/test_serializers.py | 50 +++++++++++++++++++++++++++++++++++ tests/utils/test_config.py | 17 ++++++++++++ 9 files changed, 102 insertions(+), 16 deletions(-) create mode 100644 tests/test_serializers.py diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md index 8acc0a98cb..0eaf960df7 100644 --- a/mkdocs/docs/configuration.md +++ b/mkdocs/docs/configuration.md @@ -249,3 +249,7 @@ catalog: # Concurrency PyIceberg uses multiple threads to parallelize operations. The number of workers can be configured by supplying a `max-workers` entry in the configuration file, or by setting the `PYICEBERG_MAX_WORKERS` environment variable. The default value depends on the system hardware and Python version. See [the Python documentation](https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor) for more details. + +# Backward Compatibility + +Previous versions of Java implementations incorrectly assumes the optional attribute `current-snapshot-id` to be a required attribute in the TableMetadata. Which means that if `current-snapshot-id` is missing in the metadata file (e.g. on table creation), the application will throw an exception without being able to load the table. 
This assumption has been corrected in more recent Iceberg versions. However, it is possible to force PyIceberg to create table with a metadata file that will be compatible with previous versions. This can be configured by setting the `legacy-current-snapshot-id` entry as "True" in the configuration file, or by setting the `LEGACY_CURRENT_SNAPSHOT_ID` environment variable. Refer to the [PR discussion](https://github.com/apache/iceberg-python/pull/473) for more details on the issue diff --git a/pyiceberg/serializers.py b/pyiceberg/serializers.py index 6a580ead80..0f5a6bb777 100644 --- a/pyiceberg/serializers.py +++ b/pyiceberg/serializers.py @@ -18,12 +18,14 @@ import codecs import gzip +import json from abc import ABC, abstractmethod from typing import Callable from pyiceberg.io import InputFile, InputStream, OutputFile -from pyiceberg.table.metadata import TableMetadata, TableMetadataUtil +from pyiceberg.table.metadata import CURRENT_SNAPSHOT_ID, TableMetadata, TableMetadataUtil from pyiceberg.typedef import UTF8 +from pyiceberg.utils.config import Config GZIP = "gzip" @@ -127,6 +129,11 @@ def table_metadata(metadata: TableMetadata, output_file: OutputFile, overwrite: overwrite (bool): Where to overwrite the file if it already exists. Defaults to `False`. """ with output_file.create(overwrite=overwrite) as output_stream: - json_bytes = metadata.model_dump_json().encode(UTF8) + model_dump = metadata.model_dump_json() + if Config().get_bool("legacy-current-snapshot-id") and metadata.current_snapshot_id is None: + model_dict = json.loads(model_dump) + model_dict[CURRENT_SNAPSHOT_ID] = -1 + model_dump = json.dumps(model_dict) + json_bytes = model_dump.encode(UTF8) json_bytes = Compressor.get_compressor(output_file.location).bytes_compressor()(json_bytes) output_stream.write(json_bytes) diff --git a/pyiceberg/table/metadata.py b/pyiceberg/table/metadata.py index ea2aa6ec5c..34d97b9bb4 100644 --- a/pyiceberg/table/metadata.py +++ b/pyiceberg/table/metadata.py @@ -77,7 +77,7 @@ def cleanup_snapshot_id(data: Dict[str, Any]) -> Dict[str, Any]: if CURRENT_SNAPSHOT_ID in data and data[CURRENT_SNAPSHOT_ID] == -1: # We treat -1 and None the same, by cleaning this up # in a pre-validator, we can simplify the logic later on - data[CURRENT_SNAPSHOT_ID] = -1 + data[CURRENT_SNAPSHOT_ID] = None return data diff --git a/pyiceberg/utils/concurrent.py b/pyiceberg/utils/concurrent.py index f6c0a23a9c..805599bf41 100644 --- a/pyiceberg/utils/concurrent.py +++ b/pyiceberg/utils/concurrent.py @@ -37,13 +37,4 @@ def get_or_create() -> Executor: @staticmethod def max_workers() -> Optional[int]: """Return the max number of workers configured.""" - config = Config() - val = config.config.get("max-workers") - - if val is None: - return None - - try: - return int(val) # type: ignore - except ValueError as err: - raise ValueError(f"Max workers should be an integer or left unset. Current value: {val}") from err + return Config().get_int("max-workers") diff --git a/pyiceberg/utils/config.py b/pyiceberg/utils/config.py index e038005469..8b1b81d3a7 100644 --- a/pyiceberg/utils/config.py +++ b/pyiceberg/utils/config.py @@ -16,6 +16,7 @@ # under the License. 
import logging import os +from distutils.util import strtobool from typing import List, Optional import strictyaml @@ -154,3 +155,19 @@ def get_catalog_config(self, catalog_name: str) -> Optional[RecursiveDict]: assert isinstance(catalog_conf, dict), f"Configuration path catalogs.{catalog_name_lower} needs to be an object" return catalog_conf return None + + def get_int(self, key: str) -> Optional[int]: + if (val := self.config.get(key)) is not None: + try: + return int(val) # type: ignore + except ValueError as err: + raise ValueError(f"{key} should be an integer or left unset. Current value: {val}") from err + return None + + def get_bool(self, key: str) -> Optional[bool]: + if (val := self.config.get(key)) is not None: + try: + return strtobool(val) # type: ignore + except ValueError as err: + raise ValueError(f"{key} should be a boolean or left unset. Current value: {val}") from err + return None diff --git a/tests/cli/test_console.py b/tests/cli/test_console.py index c227f05592..d77b290ec6 100644 --- a/tests/cli/test_console.py +++ b/tests/cli/test_console.py @@ -579,7 +579,7 @@ def test_json_describe_table(catalog: InMemoryCatalog) -> None: assert result.exit_code == 0 assert ( result.output - == """{"identifier":["default","my_table"],"metadata_location":"s3://warehouse/default/my_table/metadata/metadata.json","metadata":{"location":"s3://bucket/test/location","table-uuid":"d20125c8-7284-442c-9aea-15fee620737c","last-updated-ms":1602638573874,"last-column-id":3,"schemas":[{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true},{"id":2,"name":"y","type":"long","required":true,"doc":"comment"},{"id":3,"name":"z","type":"long","required":true}],"schema-id":0,"identifier-field-ids":[]}],"current-schema-id":0,"partition-specs":[{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"transform":"identity","name":"x"}]}],"default-spec-id":0,"last-partition-id":1000,"properties":{},"current-snapshot-id":-1,"snapshots":[{"snapshot-id":1925,"timestamp-ms":1602638573822}],"snapshot-log":[],"metadata-log":[],"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0,"refs":{},"format-version":1,"schema":{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true},{"id":2,"name":"y","type":"long","required":true,"doc":"comment"},{"id":3,"name":"z","type":"long","required":true}],"schema-id":0,"identifier-field-ids":[]},"partition-spec":[{"source-id":1,"field-id":1000,"transform":"identity","name":"x"}]}}\n""" + == 
"""{"identifier":["default","my_table"],"metadata_location":"s3://warehouse/default/my_table/metadata/metadata.json","metadata":{"location":"s3://bucket/test/location","table-uuid":"d20125c8-7284-442c-9aea-15fee620737c","last-updated-ms":1602638573874,"last-column-id":3,"schemas":[{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true},{"id":2,"name":"y","type":"long","required":true,"doc":"comment"},{"id":3,"name":"z","type":"long","required":true}],"schema-id":0,"identifier-field-ids":[]}],"current-schema-id":0,"partition-specs":[{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"transform":"identity","name":"x"}]}],"default-spec-id":0,"last-partition-id":1000,"properties":{},"snapshots":[{"snapshot-id":1925,"timestamp-ms":1602638573822}],"snapshot-log":[],"metadata-log":[],"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0,"refs":{},"format-version":1,"schema":{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true},{"id":2,"name":"y","type":"long","required":true,"doc":"comment"},{"id":3,"name":"z","type":"long","required":true}],"schema-id":0,"identifier-field-ids":[]},"partition-spec":[{"source-id":1,"field-id":1000,"transform":"identity","name":"x"}]}}\n""" ) diff --git a/tests/table/test_metadata.py b/tests/table/test_metadata.py index 2483bc6fe5..97a7931cbb 100644 --- a/tests/table/test_metadata.py +++ b/tests/table/test_metadata.py @@ -131,7 +131,7 @@ def test_v1_metadata_parsing_directly(example_table_metadata_v1: Dict[str, Any]) ] assert table_metadata.default_spec_id == 0 assert table_metadata.last_partition_id == 1000 - assert table_metadata.current_snapshot_id == -1 + assert table_metadata.current_snapshot_id is None assert table_metadata.default_sort_order_id == 0 @@ -170,7 +170,7 @@ def test_updating_metadata(example_table_metadata_v2: Dict[str, Any]) -> None: def test_serialize_v1(example_table_metadata_v1: Dict[str, Any]) -> None: table_metadata = TableMetadataV1(**example_table_metadata_v1) table_metadata_json = table_metadata.model_dump_json() - expected = """{"location":"s3://bucket/test/location","table-uuid":"d20125c8-7284-442c-9aea-15fee620737c","last-updated-ms":1602638573874,"last-column-id":3,"schemas":[{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true},{"id":2,"name":"y","type":"long","required":true,"doc":"comment"},{"id":3,"name":"z","type":"long","required":true}],"schema-id":0,"identifier-field-ids":[]}],"current-schema-id":0,"partition-specs":[{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"transform":"identity","name":"x"}]}],"default-spec-id":0,"last-partition-id":1000,"properties":{},"current-snapshot-id":-1,"snapshots":[{"snapshot-id":1925,"timestamp-ms":1602638573822}],"snapshot-log":[],"metadata-log":[],"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0,"refs":{},"format-version":1,"schema":{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true},{"id":2,"name":"y","type":"long","required":true,"doc":"comment"},{"id":3,"name":"z","type":"long","required":true}],"schema-id":0,"identifier-field-ids":[]},"partition-spec":[{"name":"x","transform":"identity","source-id":1,"field-id":1000}]}""" + expected = 
"""{"location":"s3://bucket/test/location","table-uuid":"d20125c8-7284-442c-9aea-15fee620737c","last-updated-ms":1602638573874,"last-column-id":3,"schemas":[{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true},{"id":2,"name":"y","type":"long","required":true,"doc":"comment"},{"id":3,"name":"z","type":"long","required":true}],"schema-id":0,"identifier-field-ids":[]}],"current-schema-id":0,"partition-specs":[{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"transform":"identity","name":"x"}]}],"default-spec-id":0,"last-partition-id":1000,"properties":{},"snapshots":[{"snapshot-id":1925,"timestamp-ms":1602638573822}],"snapshot-log":[],"metadata-log":[],"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0,"refs":{},"format-version":1,"schema":{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true},{"id":2,"name":"y","type":"long","required":true,"doc":"comment"},{"id":3,"name":"z","type":"long","required":true}],"schema-id":0,"identifier-field-ids":[]},"partition-spec":[{"name":"x","transform":"identity","source-id":1,"field-id":1000}]}""" assert table_metadata_json == expected diff --git a/tests/test_serializers.py b/tests/test_serializers.py new file mode 100644 index 0000000000..02f3af804e --- /dev/null +++ b/tests/test_serializers.py @@ -0,0 +1,50 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +import uuid +import importlib +import pytest +import json +from typing import Any, Dict +import os +from pytest_mock import MockFixture + +from pyiceberg.utils.config import Config +from pyiceberg.table import StaticTable +from pyiceberg.table.metadata import TableMetadataV1 +from pyiceberg import serializers +from pyiceberg.serializers import ToOutputFile + + +def test_legacy_current_snapshot_id(mocker: MockFixture, tmp_path_factory: pytest.TempPathFactory, example_table_metadata_no_snapshot_v1: Dict[str, Any]) -> None: + from pyiceberg.io.pyarrow import PyArrowFileIO + + metadata_location = str(tmp_path_factory.mktemp("metadata") / f"{uuid.uuid4()}.metadata.json") + metadata = TableMetadataV1(**example_table_metadata_no_snapshot_v1) + ToOutputFile.table_metadata(metadata, PyArrowFileIO().new_output(location=metadata_location), overwrite=True) + static_table = StaticTable.from_metadata(metadata_location) + assert static_table.metadata.current_snapshot_id is None + + mocker.patch.dict(os.environ, values={"PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID": "True"}) + + ToOutputFile.table_metadata(metadata, PyArrowFileIO().new_output(location=metadata_location), overwrite=True) + with PyArrowFileIO().new_input(location=metadata_location).open() as input_stream: + metadata = input_stream.read() + assert json.loads(metadata)['current-snapshot-id'] == -1 + backwards_compatible_static_table = StaticTable.from_metadata(metadata_location) + assert backwards_compatible_static_table.metadata.current_snapshot_id is None + assert backwards_compatible_static_table.metadata == static_table.metadata diff --git a/tests/utils/test_config.py b/tests/utils/test_config.py index 5e3f72ccc6..2f15bb56d8 100644 --- a/tests/utils/test_config.py +++ b/tests/utils/test_config.py @@ -76,3 +76,20 @@ def test_merge_config() -> None: rhs: RecursiveDict = {"common_key": "xyz789"} result = merge_config(lhs, rhs) assert result["common_key"] == rhs["common_key"] + + +def test_from_configuration_files_get_typed_value(tmp_path_factory: pytest.TempPathFactory) -> None: + config_path = str(tmp_path_factory.mktemp("config")) + with open(f"{config_path}/.pyiceberg.yaml", "w", encoding=UTF8) as file: + yaml_str = as_document({"max-workers": "4", "legacy-current-snapshot-id": "True"}).as_yaml() + file.write(yaml_str) + + os.environ["PYICEBERG_HOME"] = config_path + with pytest.raises(ValueError): + Config().get_bool("max-workers") + + with pytest.raises(ValueError): + Config().get_int("legacy-current-snapshot-id") + + assert Config().get_bool("legacy-current-snapshot-id") + assert Config().get_int("max-workers") == 4 From 3a4ada5b8955d21e6ff7d0446a831d61a3650516 Mon Sep 17 00:00:00 2001 From: Sung Yun <107272191+syun64@users.noreply.github.com> Date: Thu, 29 Feb 2024 18:47:48 +0000 Subject: [PATCH 4/9] lint --- tests/test_serializers.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/tests/test_serializers.py b/tests/test_serializers.py index 02f3af804e..140db02700 100644 --- a/tests/test_serializers.py +++ b/tests/test_serializers.py @@ -15,22 +15,22 @@ # specific language governing permissions and limitations # under the License. 
-import uuid -import importlib -import pytest import json -from typing import Any, Dict import os +import uuid +from typing import Any, Dict + +import pytest from pytest_mock import MockFixture -from pyiceberg.utils.config import Config +from pyiceberg.serializers import ToOutputFile from pyiceberg.table import StaticTable from pyiceberg.table.metadata import TableMetadataV1 -from pyiceberg import serializers -from pyiceberg.serializers import ToOutputFile -def test_legacy_current_snapshot_id(mocker: MockFixture, tmp_path_factory: pytest.TempPathFactory, example_table_metadata_no_snapshot_v1: Dict[str, Any]) -> None: +def test_legacy_current_snapshot_id( + mocker: MockFixture, tmp_path_factory: pytest.TempPathFactory, example_table_metadata_no_snapshot_v1: Dict[str, Any] +) -> None: from pyiceberg.io.pyarrow import PyArrowFileIO metadata_location = str(tmp_path_factory.mktemp("metadata") / f"{uuid.uuid4()}.metadata.json") @@ -40,11 +40,11 @@ def test_legacy_current_snapshot_id(mocker: MockFixture, tmp_path_factory: pytes assert static_table.metadata.current_snapshot_id is None mocker.patch.dict(os.environ, values={"PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID": "True"}) - + ToOutputFile.table_metadata(metadata, PyArrowFileIO().new_output(location=metadata_location), overwrite=True) with PyArrowFileIO().new_input(location=metadata_location).open() as input_stream: - metadata = input_stream.read() - assert json.loads(metadata)['current-snapshot-id'] == -1 + metadata_json_bytes = input_stream.read() + assert json.loads(metadata_json_bytes)['current-snapshot-id'] == -1 backwards_compatible_static_table = StaticTable.from_metadata(metadata_location) assert backwards_compatible_static_table.metadata.current_snapshot_id is None assert backwards_compatible_static_table.metadata == static_table.metadata From 7b810c767dc7852c482edfee016eda4ef2c0ff93 Mon Sep 17 00:00:00 2001 From: Sung Yun <107272191+syun64@users.noreply.github.com> Date: Thu, 29 Feb 2024 16:32:39 -0500 Subject: [PATCH 5/9] Update mkdocs/docs/configuration.md Co-authored-by: Fokko Driesprong --- mkdocs/docs/configuration.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md index 0eaf960df7..3c719aa844 100644 --- a/mkdocs/docs/configuration.md +++ b/mkdocs/docs/configuration.md @@ -252,4 +252,4 @@ PyIceberg uses multiple threads to parallelize operations. The number of workers # Backward Compatibility -Previous versions of Java implementations incorrectly assumes the optional attribute `current-snapshot-id` to be a required attribute in the TableMetadata. Which means that if `current-snapshot-id` is missing in the metadata file (e.g. on table creation), the application will throw an exception without being able to load the table. This assumption has been corrected in more recent Iceberg versions. However, it is possible to force PyIceberg to create table with a metadata file that will be compatible with previous versions. This can be configured by setting the `legacy-current-snapshot-id` entry as "True" in the configuration file, or by setting the `LEGACY_CURRENT_SNAPSHOT_ID` environment variable. Refer to the [PR discussion](https://github.com/apache/iceberg-python/pull/473) for more details on the issue +Previous versions of Java (`<1.4.0`) implementations incorrectly assume the optional attribute `current-snapshot-id` to be a required attribute in TableMetadata. This means that if `current-snapshot-id` is missing in the metadata file (e.g. 
on table creation), the application will throw an exception without being able to load the table. This assumption has been corrected in more recent Iceberg versions. However, it is possible to force PyIceberg to create a table with a metadata file that will be compatible with previous versions. This can be configured by setting the `legacy-current-snapshot-id` entry as "True" in the configuration file, or by setting the `PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID` environment variable. Refer to the [PR discussion](https://github.com/apache/iceberg-python/pull/473) for more details on the issue. From fecbd4d8ba3ef8ebf5af26a95c56a44cb371684e Mon Sep 17 00:00:00 2001 From: Sung Yun <107272191+syun64@users.noreply.github.com> Date: Thu, 29 Feb 2024 22:06:36 +0000 Subject: [PATCH 6/9] adopt review feedback --- pyiceberg/serializers.py | 11 ++--------- pyiceberg/table/metadata.py | 9 ++++++++- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/pyiceberg/serializers.py b/pyiceberg/serializers.py index 0f5a6bb777..4fd2fefb6a 100644 --- a/pyiceberg/serializers.py +++ b/pyiceberg/serializers.py @@ -18,14 +18,12 @@ import codecs import gzip -import json from abc import ABC, abstractmethod from typing import Callable from pyiceberg.io import InputFile, InputStream, OutputFile -from pyiceberg.table.metadata import CURRENT_SNAPSHOT_ID, TableMetadata, TableMetadataUtil +from pyiceberg.table.metadata import TableMetadata, TableMetadataUtil from pyiceberg.typedef import UTF8 -from pyiceberg.utils.config import Config GZIP = "gzip" @@ -129,11 +127,6 @@ def table_metadata(metadata: TableMetadata, output_file: OutputFile, overwrite: overwrite (bool): Where to overwrite the file if it already exists. Defaults to `False`. """ with output_file.create(overwrite=overwrite) as output_stream: - model_dump = metadata.model_dump_json() - if Config().get_bool("legacy-current-snapshot-id") and metadata.current_snapshot_id is None: - model_dict = json.loads(model_dump) - model_dict[CURRENT_SNAPSHOT_ID] = -1 - model_dump = json.dumps(model_dict) - json_bytes = model_dump.encode(UTF8) + json_bytes = metadata.model_dump_json(exclude_none=False).encode(UTF8) json_bytes = Compressor.get_compressor(output_file.location).bytes_compressor()(json_bytes) output_stream.write(json_bytes) diff --git a/pyiceberg/table/metadata.py b/pyiceberg/table/metadata.py index 34d97b9bb4..0d2e023922 100644 --- a/pyiceberg/table/metadata.py +++ b/pyiceberg/table/metadata.py @@ -28,7 +28,7 @@ Union, ) -from pydantic import Field, model_validator +from pydantic import Field, field_serializer, model_validator from pydantic import ValidationError as PydanticValidationError from typing_extensions import Annotated @@ -49,6 +49,7 @@ IcebergRootModel, Properties, ) +from pyiceberg.utils.config import Config from pyiceberg.utils.datetime import datetime_to_millis CURRENT_SNAPSHOT_ID = "current-snapshot-id" @@ -226,6 +227,12 @@ def schema_by_id(self, schema_id: int) -> Optional[Schema]: """Get the schema by schema_id.""" return next((schema for schema in self.schemas if schema.schema_id == schema_id), None) + @field_serializer('current_snapshot_id') + def serialize_current_snapshot_id(self, current_snapshot_id: Optional[int]) -> Optional[int]: + if current_snapshot_id is None and Config().get_bool("legacy-current-snapshot-id"): + return -1 + return current_snapshot_id + class TableMetadataV1(TableMetadataCommonFields, IcebergBaseModel): """Represents version 1 of the Table Metadata.
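
Patch 6/9 above moves the legacy handling out of the JSON post-processing step and into a pydantic `field_serializer` on `current_snapshot_id`. The snippet below is a self-contained sketch of that pattern (it is not part of the patch series): `ExampleMetadata` and the module-level `LEGACY_CURRENT_SNAPSHOT_ID` constant are hypothetical stand-ins for the real model and for the `Config().get_bool("legacy-current-snapshot-id")` lookup used in the actual code, assuming pydantic v2.

```python
# Minimal sketch of the field_serializer approach used in patch 6/9.
from typing import Optional

from pydantic import BaseModel, Field, field_serializer

LEGACY_CURRENT_SNAPSHOT_ID = True  # stand-in for Config().get_bool("legacy-current-snapshot-id")


class ExampleMetadata(BaseModel):
    current_snapshot_id: Optional[int] = Field(alias="current-snapshot-id", default=None)

    @field_serializer("current_snapshot_id")
    def serialize_current_snapshot_id(self, current_snapshot_id: Optional[int]) -> Optional[int]:
        # Readers that require current-snapshot-id receive -1 instead of null.
        if current_snapshot_id is None and LEGACY_CURRENT_SNAPSHOT_ID:
            return -1
        return current_snapshot_id


print(ExampleMetadata().model_dump_json(by_alias=True))
# {"current-snapshot-id":-1}
print(ExampleMetadata().model_dump_json(by_alias=True, exclude_none=True))
# {}  (a field whose raw value is None is still excluded; this is the behaviour the
#      final patch in the series works around by keeping exclude_none=False only
#      when the legacy flag is set)
```
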
From a1024674efeb416dae058b6919a0bda13c3007d5 Mon Sep 17 00:00:00 2001 From: Sung Yun <107272191+syun64@users.noreply.github.com> Date: Thu, 29 Feb 2024 22:31:46 +0000 Subject: [PATCH 7/9] lint --- pyiceberg/table/metadata.py | 1 + tests/integration/test_writes.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/pyiceberg/table/metadata.py b/pyiceberg/table/metadata.py index e59a7879a9..2850c68224 100644 --- a/pyiceberg/table/metadata.py +++ b/pyiceberg/table/metadata.py @@ -270,6 +270,7 @@ def serialize_current_snapshot_id(self, current_snapshot_id: Optional[int]) -> O return -1 return current_snapshot_id + def _generate_snapshot_id() -> int: """Generate a new Snapshot ID from a UUID. diff --git a/tests/integration/test_writes.py b/tests/integration/test_writes.py index 2b851e14e9..f0d1c85797 100644 --- a/tests/integration/test_writes.py +++ b/tests/integration/test_writes.py @@ -32,7 +32,7 @@ from pyspark.sql import SparkSession from pytest_mock.plugin import MockerFixture -from pyiceberg.catalog import Catalog, Properties, Table, load_catalog +from pyiceberg.catalog import Catalog, Properties, Table from pyiceberg.catalog.sql import SqlCatalog from pyiceberg.exceptions import NoSuchTableError from pyiceberg.schema import Schema From f4a302b716284c8f4be419634ea663b498a22836 Mon Sep 17 00:00:00 2001 From: Sung Yun <107272191+syun64@users.noreply.github.com> Date: Thu, 29 Feb 2024 20:13:54 -0500 Subject: [PATCH 8/9] Update pyiceberg/table/metadata.py --- pyiceberg/table/metadata.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyiceberg/table/metadata.py b/pyiceberg/table/metadata.py index 2850c68224..1e5f0fdcec 100644 --- a/pyiceberg/table/metadata.py +++ b/pyiceberg/table/metadata.py @@ -122,7 +122,7 @@ def check_sort_orders(table_metadata: TableMetadata) -> TableMetadata: def construct_refs(table_metadata: TableMetadata) -> TableMetadata: """Set the main branch if missing.""" - if table_metadata.current_snapshot_id is not None and table_metadata.current_snapshot_id != -1: + if table_metadata.current_snapshot_id is not None: if MAIN_BRANCH not in table_metadata.refs: table_metadata.refs[MAIN_BRANCH] = SnapshotRef( snapshot_id=table_metadata.current_snapshot_id, snapshot_ref_type=SnapshotRefType.BRANCH From 130b3cc0f756bd487ba182e04a74808cd65e2587 Mon Sep 17 00:00:00 2001 From: Sung Yun <107272191+syun64@users.noreply.github.com> Date: Mon, 4 Mar 2024 15:41:41 +0000 Subject: [PATCH 9/9] adopt review comment --- pyiceberg/serializers.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pyiceberg/serializers.py b/pyiceberg/serializers.py index 4fd2fefb6a..e2994884c6 100644 --- a/pyiceberg/serializers.py +++ b/pyiceberg/serializers.py @@ -24,6 +24,7 @@ from pyiceberg.io import InputFile, InputStream, OutputFile from pyiceberg.table.metadata import TableMetadata, TableMetadataUtil from pyiceberg.typedef import UTF8 +from pyiceberg.utils.config import Config GZIP = "gzip" @@ -127,6 +128,9 @@ def table_metadata(metadata: TableMetadata, output_file: OutputFile, overwrite: overwrite (bool): Where to overwrite the file if it already exists. Defaults to `False`. 
""" with output_file.create(overwrite=overwrite) as output_stream: - json_bytes = metadata.model_dump_json(exclude_none=False).encode(UTF8) + # We need to serialize None values, in order to dump `None` current-snapshot-id as `-1` + exclude_none = False if Config().get_bool("legacy-current-snapshot-id") else True + + json_bytes = metadata.model_dump_json(exclude_none=exclude_none).encode(UTF8) json_bytes = Compressor.get_compressor(output_file.location).bytes_compressor()(json_bytes) output_stream.write(json_bytes)