Add support for write.metadata.path #1642

Merged 10 commits on Feb 14, 2025
11 changes: 8 additions & 3 deletions mkdocs/docs/configuration.md
@@ -68,6 +68,7 @@ Iceberg tables support table properties to configure table behavior.
| `write.object-storage.partitioned-paths` | Boolean | True | Controls whether [partition values are included in file paths](configuration.md#partition-exclusion) when object storage is enabled |
| `write.py-location-provider.impl` | String of form `module.ClassName` | null | Optional, [custom `LocationProvider`](configuration.md#loading-a-custom-location-provider) implementation |
| `write.data.path` | String pointing to location | `{metadata.location}/data` | Sets the location under which data is written. |
| `write.metadata.path` | String pointing to location | `{metadata.location}/metadata` | Sets the location under which metadata is written. |

### Table behavior options

@@ -203,12 +204,16 @@ PyIceberg uses [S3FileSystem](https://arrow.apache.org/docs/python/generated/pya

## Location Providers

Apache Iceberg uses the concept of a `LocationProvider` to manage file paths for a table's data. In PyIceberg, the
Apache Iceberg uses the concept of a `LocationProvider` to manage file paths for a table's data and metadata files. In PyIceberg, the
`LocationProvider` module is designed to be pluggable, allowing customization for specific use cases. The
`LocationProvider` for a table can be specified through table properties.

PyIceberg defaults to the [`ObjectStoreLocationProvider`](configuration.md#object-store-location-provider), which generates
file paths that are optimized for object storage.
Both data file and metadata file locations can be customized by configuring the table properties [write.data.path and write.metadata.path](#write-options), respectively.

For more granular control, you can override the `LocationProvider`'s `new_data_location` and `new_metadata_location` methods to define custom logic for generating file paths.

PyIceberg defaults to the [`ObjectStoreLocationProvider`](configuration.md#object-store-location-provider), which generates file paths for
data files that are optimized for object storage.
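The docs above describe two override points: the `write.data.path` / `write.metadata.path` table properties set the path prefixes, and the `new_data_location` / `new_metadata_location` methods control full path construction. A standalone sketch of how the metadata-path resolution behaves (plain Python with no pyiceberg imports; the class name is invented for illustration, and the path logic mirrors the `LocationProvider` changes in this PR's diff):

```python
import uuid

# Table property key taken from the diff; everything else here is a sketch.
WRITE_METADATA_PATH = "write.metadata.path"

class SketchLocationProvider:
    def __init__(self, table_location: str, table_properties: dict):
        self.table_location = table_location
        # write.metadata.path wins if set; otherwise default to <table location>/metadata
        if path := table_properties.get(WRITE_METADATA_PATH):
            self.metadata_path = path.rstrip("/")
        else:
            self.metadata_path = f"{table_location.rstrip('/')}/metadata"

    def new_metadata_location(self, metadata_file_name: str) -> str:
        # Full path = resolved prefix + file name; overridable for custom layouts.
        return f"{self.metadata_path}/{metadata_file_name}"

    def new_table_metadata_file_location(self, new_version: int = 0) -> str:
        # Versioned table-metadata file name: zero-padded version + random UUID.
        if new_version < 0:
            raise ValueError(f"Table metadata version: `{new_version}` must be a non-negative integer")
        return self.new_metadata_location(f"{new_version:05d}-{uuid.uuid4()}.metadata.json")

default = SketchLocationProvider("s3://bucket/tbl", {})
print(default.new_metadata_location("v1.metadata.json"))
# → s3://bucket/tbl/metadata/v1.metadata.json

custom = SketchLocationProvider("s3://bucket/tbl", {WRITE_METADATA_PATH: "s3://other/meta/"})
print(custom.new_metadata_location("v1.metadata.json"))
# → s3://other/meta/v1.metadata.json  (trailing slash on the property is stripped)
```

Note the design split: the property customizes only the prefix, while overriding the methods customizes the entire path shape.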

### Simple Location Provider

14 changes: 5 additions & 9 deletions pyiceberg/catalog/__init__.py
@@ -57,6 +57,7 @@
Table,
TableProperties,
)
from pyiceberg.table.locations import load_location_provider
from pyiceberg.table.metadata import TableMetadata, TableMetadataV1, new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.table.update import (
@@ -857,7 +858,8 @@ def _create_staged_table(
database_name, table_name = self.identifier_to_database_and_table(identifier)

location = self._resolve_table_location(location, database_name, table_name)
metadata_location = self._get_metadata_location(location=location)
provider = load_location_provider(location, properties)
metadata_location = provider.new_table_metadata_file_location()
metadata = new_table_metadata(
location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties
)
@@ -888,7 +890,8 @@ def _update_and_stage_table(
)

new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 if current_table else 0
new_metadata_location = self._get_metadata_location(updated_metadata.location, new_metadata_version)
provider = load_location_provider(updated_metadata.location, updated_metadata.properties)
new_metadata_location = provider.new_table_metadata_file_location(new_metadata_version)

return StagedTable(
identifier=table_identifier,
@@ -945,13 +948,6 @@ def _get_default_warehouse_location(self, database_name: str, table_name: str) -
def _write_metadata(metadata: TableMetadata, io: FileIO, metadata_path: str) -> None:
ToOutputFile.table_metadata(metadata, io.new_output(metadata_path))

@staticmethod
def _get_metadata_location(location: str, new_version: int = 0) -> str:
if new_version < 0:
raise ValueError(f"Table metadata version: `{new_version}` must be a non-negative integer")
version_str = f"{new_version:05d}"
return f"{location}/metadata/{version_str}-{uuid.uuid4()}.metadata.json"

@staticmethod
def _parse_metadata_version(metadata_location: str) -> int:
"""Parse the version from the metadata location.
5 changes: 4 additions & 1 deletion pyiceberg/catalog/dynamodb.py
@@ -54,6 +54,7 @@
from pyiceberg.schema import Schema
from pyiceberg.serializers import FromInputFile
from pyiceberg.table import CommitTableResponse, Table
from pyiceberg.table.locations import load_location_provider
from pyiceberg.table.metadata import new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.table.update import (
@@ -173,7 +174,9 @@ def create_table(
database_name, table_name = self.identifier_to_database_and_table(identifier)

location = self._resolve_table_location(location, database_name, table_name)
metadata_location = self._get_metadata_location(location=location)
provider = load_location_provider(table_location=location, table_properties=properties)
metadata_location = provider.new_table_metadata_file_location()

metadata = new_table_metadata(
location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties
)
4 changes: 3 additions & 1 deletion pyiceberg/catalog/sql.py
@@ -62,6 +62,7 @@
from pyiceberg.schema import Schema
from pyiceberg.serializers import FromInputFile
from pyiceberg.table import CommitTableResponse, Table
from pyiceberg.table.locations import load_location_provider
from pyiceberg.table.metadata import new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.table.update import (
@@ -207,7 +208,8 @@ def create_table(

namespace = Catalog.namespace_to_string(namespace_identifier)
location = self._resolve_table_location(location, namespace, table_name)
metadata_location = self._get_metadata_location(location=location)
location_provider = load_location_provider(table_location=location, table_properties=properties)
metadata_location = location_provider.new_table_metadata_file_location()
metadata = new_table_metadata(
location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties
)
6 changes: 6 additions & 0 deletions pyiceberg/table/__init__.py
@@ -79,6 +79,7 @@
)
from pyiceberg.schema import Schema
from pyiceberg.table.inspect import InspectTable
from pyiceberg.table.locations import LocationProvider, load_location_provider
from pyiceberg.table.metadata import (
INITIAL_SEQUENCE_NUMBER,
TableMetadata,
@@ -200,6 +201,7 @@ class TableProperties:
WRITE_OBJECT_STORE_PARTITIONED_PATHS_DEFAULT = True

WRITE_DATA_PATH = "write.data.path"
WRITE_METADATA_PATH = "write.metadata.path"

DELETE_MODE = "write.delete.mode"
DELETE_MODE_COPY_ON_WRITE = "copy-on-write"
@@ -1000,6 +1002,10 @@ def location(self) -> str:
"""Return the table's base location."""
return self.metadata.location

def location_provider(self) -> LocationProvider:
"""Return the table's location provider."""
return load_location_provider(table_location=self.metadata.location, table_properties=self.metadata.properties)

@property
def last_sequence_number(self) -> int:
return self.metadata.last_sequence_number
38 changes: 37 additions & 1 deletion pyiceberg/table/locations.py
@@ -16,6 +16,7 @@
# under the License.
import importlib
import logging
import uuid
from abc import ABC, abstractmethod
from typing import Optional

@@ -29,7 +30,7 @@


class LocationProvider(ABC):
"""A base class for location providers, that provide data file locations for a table's write tasks.
"""A base class for location providers, that provide file locations for a table's write tasks.

Args:
table_location (str): The table's base storage location.
@@ -40,6 +41,7 @@ class LocationProvider(ABC):
table_properties: Properties

data_path: str
metadata_path: str

def __init__(self, table_location: str, table_properties: Properties):
self.table_location = table_location
@@ -52,6 +54,11 @@ def __init__(self, table_location: str, table_properties: Properties):
else:
self.data_path = f"{self.table_location.rstrip('/')}/data"

if path := table_properties.get(TableProperties.WRITE_METADATA_PATH):
self.metadata_path = path.rstrip("/")
else:
self.metadata_path = f"{self.table_location.rstrip('/')}/metadata"

@abstractmethod
def new_data_location(self, data_file_name: str, partition_key: Optional[PartitionKey] = None) -> str:
"""Return a fully-qualified data file location for the given filename.
@@ -64,6 +71,35 @@ def new_data_location(self, data_file_name: str, partition_key: Optional[Partiti
str: A fully-qualified location URI for the data file.
"""

def new_table_metadata_file_location(self, new_version: int = 0) -> str:
    """Return a fully-qualified metadata file location for a new table version.

    Args:
        new_version (int): Version number of the metadata file.

    Returns:
        str: fully-qualified URI for the new table metadata file.

    Raises:
        ValueError: If the version is negative.
    """
    if new_version < 0:
        raise ValueError(f"Table metadata version: `{new_version}` must be a non-negative integer")

    file_name = f"{new_version:05d}-{uuid.uuid4()}.metadata.json"
    return self.new_metadata_location(file_name)

def new_metadata_location(self, metadata_file_name: str) -> str:
    """Return a fully-qualified metadata file location for the given filename.

    Args:
        metadata_file_name (str): Name of the metadata file.

    Returns:
        str: A fully-qualified location URI for the metadata file.
    """
    return f"{self.metadata_path}/{metadata_file_name}"

Review thread on new_table_metadata_file_location (resolved):

@smaheshwar-pltr (Contributor, Feb 13, 2025): This generally looks fine to me (and configuring metadata locations on LocationProviders is cool)!

  1. Spitballing (/nits): I wonder if custom location providers that technically didn't subclass LocationProvider but would've worked before would now fail for metadata writing, because they don't have these new methods. If that's the case, it's maybe fine, because we can expect them to subclass LocationProvider as per the docs.
  2. The way this is written makes me wonder whether users might want to customise their metadata locations under a table instead of providing a hard-coded path, in a similar way to what custom LocationProviders do for data files (with mass updates they can have a large number of json/avro files, so they could want to do the object-storage hashing inside the metadata folder too, to reduce throttling). I don't see that strong a use case FWIW, but if I'm wrong we could add this to the docs about custom location providers or start a larger discussion. Any thoughts, folks?

Contributor: For 1, we haven't released a version with LocationProvider, so any references going forward should also have the new methods. We probably want to add a note in the location-provider docs about the new metadata-location feature. For 2, does "under a table" mean a path relative to the table's base location? The SimpleLocationProvider uses an absolute path. The ObjectStoreLocationProvider is special in that it injects a hash after the base location, but I think a custom metadata location provider can do the same.

Contributor: Yeah, exactly; I was just curious about user demand for a custom metadata location provider. It exceeds the capability of Java's custom location providers and poses the question of whether it makes sense for other implementations. To clarify, no objections from my end. Updating the docs sounds good!

Contributor (author): @smaheshwar-pltr Regarding point 1, I had the same question while working on this. Centralizing locations/paths would definitely make future changes easier, especially with the ongoing discussions around relative-path work. I found a related issue in the main repo: apache/iceberg#6809. It looks like it didn't get any traction but still holds some value.

Contributor: Oh, I see the semantic difference. new_data_location is an abstract method, which custom LocationProviders can override; load_location_provider returns SimpleLocationProvider by default, which then uses SimpleLocationProvider's new_data_location method in most cases. We could do the same for new_metadata_location, making it an abstract method so that custom LocationProviders can override it, while also providing a default implementation for both SimpleLocationProvider and ObjectStoreLocationProvider. We could then mention in the LocationProvider documentation that new_metadata_location can be overridden. Does this make sense? @smaheshwar-pltr, is this what you were pointing out?

Contributor: This is interesting because, for the data file path, TableProperties.WRITE_DATA_PATH overrides the LocationProvider's self.data_path, which is the prefix of the data file path, and new_data_location overrides the behavior of the fully-qualified data file location. We're trying to recreate the same behavior for the metadata path: TableProperties.WRITE_METADATA_PATH overrides the LocationProvider's self.metadata_path, which is the prefix of the metadata file path, and new_metadata_location overrides the behavior of the fully-qualified metadata file location.

Contributor: OK, brushing up on my Python: I think the code is fine as is. Both new_metadata_location and new_table_metadata_file_location provide default implementations and can also be overridden by custom implementations.

Contributor: Thanks for digging into apache/iceberg#6809; I wasn't aware of that discussion. Quoting from it, "Object storage mode to not just data files but also metadata files" is what I was getting at here. It's interesting


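The review thread above floats object-storage-style hashing for metadata files to reduce throttling. This is not part of the PR, but a purely hypothetical override of `new_metadata_location` (all names here are invented for illustration) might look like:

```python
import hashlib

# Hypothetical sketch: inject a short hash prefix before each metadata file,
# analogous to what ObjectStoreLocationProvider does for data files.
class HashedMetadataPaths:
    def __init__(self, metadata_path: str):
        self.metadata_path = metadata_path.rstrip("/")

    def new_metadata_location(self, metadata_file_name: str) -> str:
        # Deterministic 8-hex-char prefix derived from the file name.
        prefix = hashlib.sha256(metadata_file_name.encode()).hexdigest()[:8]
        return f"{self.metadata_path}/{prefix}/{metadata_file_name}"

p = HashedMetadataPaths("s3://bucket/tbl/metadata")
loc = p.new_metadata_location("snap-1-0-abc.avro")
# loc is s3://bucket/tbl/metadata/<8-hex-chars>/snap-1-0-abc.avro
```

Because both `new_metadata_location` and `new_table_metadata_file_location` have default implementations on `LocationProvider`, a subclass only needs to override the one method to change the layout.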
class SimpleLocationProvider(LocationProvider):
def __init__(self, table_location: str, table_properties: Properties):
25 changes: 11 additions & 14 deletions pyiceberg/table/update/snapshot.py
@@ -84,14 +84,14 @@
from pyiceberg.table import Transaction


def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str:
return f"{location}/metadata/{commit_uuid}-m{num}.avro"
def _new_manifest_file_name(num: int, commit_uuid: uuid.UUID) -> str:
return f"{commit_uuid}-m{num}.avro"


def _generate_manifest_list_path(location: str, snapshot_id: int, attempt: int, commit_uuid: uuid.UUID) -> str:
def _new_manifest_list_file_name(snapshot_id: int, attempt: int, commit_uuid: uuid.UUID) -> str:
# Mimics the behavior in Java:
# https://github.com/apache/iceberg/blob/c862b9177af8e2d83122220764a056f3b96fd00c/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L491
return f"{location}/metadata/snap-{snapshot_id}-{attempt}-{commit_uuid}.avro"
return f"snap-{snapshot_id}-{attempt}-{commit_uuid}.avro"


class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]):
@@ -243,13 +243,13 @@ def _commit(self) -> UpdatesAndRequirements:
next_sequence_number = self._transaction.table_metadata.next_sequence_number()

summary = self._summary(self.snapshot_properties)

manifest_list_file_path = _generate_manifest_list_path(
location=self._transaction.table_metadata.location,
file_name = _new_manifest_list_file_name(
snapshot_id=self._snapshot_id,
attempt=0,
commit_uuid=self.commit_uuid,
)
location_provider = self._transaction._table.location_provider()
manifest_list_file_path = location_provider.new_metadata_location(file_name)
with write_manifest_list(
format_version=self._transaction.table_metadata.format_version,
output_file=self._io.new_output(manifest_list_file_path),
@@ -295,13 +295,10 @@ def new_manifest_writer(self, spec: PartitionSpec) -> ManifestWriter:
)

def new_manifest_output(self) -> OutputFile:
return self._io.new_output(
_new_manifest_path(
location=self._transaction.table_metadata.location,
num=next(self._manifest_num_counter),
commit_uuid=self.commit_uuid,
)
)
location_provider = self._transaction._table.location_provider()
file_name = _new_manifest_file_name(num=next(self._manifest_num_counter), commit_uuid=self.commit_uuid)
file_path = location_provider.new_metadata_location(file_name)
return self._io.new_output(file_path)

def fetch_manifest_entry(self, manifest: ManifestFile, discard_deleted: bool = True) -> List[ManifestEntry]:
return manifest.fetch_manifest_entry(io=self._io, discard_deleted=discard_deleted)
59 changes: 59 additions & 0 deletions tests/catalog/test_base.py
@@ -35,10 +35,12 @@
TableAlreadyExistsError,
)
from pyiceberg.io import WAREHOUSE
from pyiceberg.io.pyarrow import schema_to_pyarrow
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table import (
Table,
TableProperties,
)
from pyiceberg.table.update import (
AddSchemaUpdate,
@@ -563,3 +565,60 @@ def test_table_properties_raise_for_none_value(catalog: InMemoryCatalog) -> None
with pytest.raises(ValidationError) as exc_info:
_ = given_catalog_has_a_table(catalog, properties=property_with_none)
assert "None type is not a supported value in properties: property_name" in str(exc_info.value)


def test_table_writes_metadata_to_custom_location(catalog: InMemoryCatalog) -> None:
metadata_path = f"{catalog._warehouse_location}/custom/path"
catalog.create_namespace(TEST_TABLE_NAMESPACE)
table = catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
partition_spec=TEST_TABLE_PARTITION_SPEC,
properties={TableProperties.WRITE_METADATA_PATH: metadata_path},
)
df = pa.Table.from_pylist([{"x": 123, "y": 456, "z": 789}], schema=schema_to_pyarrow(TEST_TABLE_SCHEMA))
table.append(df)
manifests = table.current_snapshot().manifests(table.io) # type: ignore
location_provider = table.location_provider()

assert location_provider.new_metadata_location("").startswith(metadata_path)
assert manifests[0].manifest_path.startswith(metadata_path)
assert table.location() != metadata_path
assert table.metadata_location.startswith(metadata_path)


def test_table_writes_metadata_to_default_path(catalog: InMemoryCatalog) -> None:
catalog.create_namespace(TEST_TABLE_NAMESPACE)
table = catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
partition_spec=TEST_TABLE_PARTITION_SPEC,
properties=TEST_TABLE_PROPERTIES,
)
metadata_path = f"{table.location()}/metadata"
df = pa.Table.from_pylist([{"x": 123, "y": 456, "z": 789}], schema=schema_to_pyarrow(TEST_TABLE_SCHEMA))
table.append(df)
manifests = table.current_snapshot().manifests(table.io) # type: ignore
location_provider = table.location_provider()

assert location_provider.new_metadata_location("").startswith(metadata_path)
assert manifests[0].manifest_path.startswith(metadata_path)
assert table.metadata_location.startswith(metadata_path)


def test_table_metadata_writes_reflect_latest_path(catalog: InMemoryCatalog) -> None:
catalog.create_namespace(TEST_TABLE_NAMESPACE)
table = catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
partition_spec=TEST_TABLE_PARTITION_SPEC,
)

initial_metadata_path = f"{table.location()}/metadata"
assert table.location_provider().new_metadata_location("metadata.json") == f"{initial_metadata_path}/metadata.json"

# update table with new path for metadata
new_metadata_path = f"{table.location()}/custom/path"
table.transaction().set_properties({TableProperties.WRITE_METADATA_PATH: new_metadata_path}).commit_transaction()

assert table.location_provider().new_metadata_location("metadata.json") == f"{new_metadata_path}/metadata.json"
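The last test above checks that a provider is rebuilt from the table's current properties, so a changed `write.metadata.path` takes effect immediately. A standalone sketch of that semantic (class and method names invented; not the pyiceberg API):

```python
WRITE_METADATA_PATH = "write.metadata.path"  # property key from the PR

class SketchTable:
    """Toy table that re-resolves its metadata prefix on every call."""
    def __init__(self, location: str):
        self.location = location
        self.properties: dict = {}

    def metadata_location_for(self, file_name: str) -> str:
        # Reads properties at call time, so updates are picked up immediately.
        base = self.properties.get(WRITE_METADATA_PATH, f"{self.location}/metadata").rstrip("/")
        return f"{base}/{file_name}"

t = SketchTable("s3://bucket/tbl")
assert t.metadata_location_for("m.json") == "s3://bucket/tbl/metadata/m.json"

# Simulate set_properties(...).commit_transaction() updating the path.
t.properties[WRITE_METADATA_PATH] = "s3://bucket/tbl/custom/path"
assert t.metadata_location_for("m.json") == "s3://bucket/tbl/custom/path/m.json"
```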
24 changes: 24 additions & 0 deletions tests/table/test_locations.py
@@ -157,3 +157,27 @@ def test_simple_location_provider_write_data_path() -> None:
)

assert provider.new_data_location("file.parquet") == "s3://table-location/custom/data/path/file.parquet"


def test_location_provider_metadata_default_location() -> None:
provider = load_location_provider(table_location="table_location", table_properties=EMPTY_DICT)

assert provider.new_metadata_location("manifest.avro") == "table_location/metadata/manifest.avro"


def test_location_provider_metadata_location_with_custom_path() -> None:
provider = load_location_provider(
table_location="table_location",
table_properties={TableProperties.WRITE_METADATA_PATH: "s3://table-location/custom/path"},
)

assert provider.new_metadata_location("metadata.json") == "s3://table-location/custom/path/metadata.json"


def test_metadata_location_with_trailing_slash() -> None:
provider = load_location_provider(
table_location="table_location",
table_properties={TableProperties.WRITE_METADATA_PATH: "s3://table-location/custom/path/"},
)

assert provider.new_metadata_location("metadata.json") == "s3://table-location/custom/path/metadata.json"