Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions mkdocs/docs/multi-part-namespace.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Multi-Part Namespace Support

Some catalog implementations support multi-part namespaces, which allows for hierarchical organization of tables. The following table summarizes the support for multi-part namespaces across different catalog implementations in Iceberg Python.

| Catalog Implementation | Multi-Part Namespace Support | Notes |
|------------------------|------------------------------|-------|
| REST Catalog | ✅ Yes | Fully supports multi-part namespace as defined by the REST catalog specification. |
| Hive Catalog | ❌ No | Spark does not support multi-part namespace. |
| DynamoDB Catalog | ✅ Yes | Namespace is represented as a composite key in DynamoDB. |
| Glue Catalog | ❌ No | Uses AWS Glue databases which don't support multi-part namespace. |
| File Catalog | ✅ Yes | Namespace parts are represented as directory hierarchies in the file system. |
| In-Memory Catalog | ✅ Yes | Supports multi-part namespace for testing purposes. |

## Usage Example

```python
from pyiceberg.catalog import load_catalog

# Using a catalog with multi-part namespace support
catalog = load_catalog("my_catalog")

# Creating a table with a multi-part namespace
catalog.create_table("default.multi.table_name", schema, spec)

# Listing tables in a multi-part namespace
tables = catalog.list_tables("default.multi")
```

## Configuration

When using catalogs that support multi-part namespaces, make sure to use the appropriate delimiter (typically `.`) when referencing namespaces in your code.
38 changes: 29 additions & 9 deletions pyiceberg/catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
cast,
)

from pyiceberg.catalog._meta import CatalogABCMeta
from pyiceberg.exceptions import (
NamespaceAlreadyExistsError,
NoSuchNamespaceError,
Expand Down Expand Up @@ -331,7 +332,7 @@ class PropertiesUpdateSummary:
missing: list[str]


class Catalog(ABC):
class Catalog(ABC, metaclass=CatalogABCMeta):
"""Base Catalog for table operations like - create, drop, load, list and others.

The catalog table APIs accept a table identifier, which is fully classified table name. The identifier can be a string or
Expand Down Expand Up @@ -723,25 +724,44 @@ def namespace_to_string(identifier: str | Identifier, err: type[ValueError] | ty
return ".".join(segment.strip() for segment in tuple_identifier)

@staticmethod
def namespace_level(identifier: str | Identifier) -> int:
"""Get the level of a namespace identifier.

Args:
identifier (Union[str, Identifier]): a namespace identifier.

Returns:
int: The level of the namespace.
"""
if not identifier:
return 1
tuple_identifier = Catalog.identifier_to_tuple(identifier)
return len(tuple_identifier) + 1

@classmethod
def identifier_to_database(
identifier: str | Identifier, err: type[ValueError] | type[NoSuchNamespaceError] = ValueError
cls, identifier: str | Identifier, err: type[ValueError] | type[NoSuchNamespaceError] = ValueError
) -> str:
tuple_identifier = Catalog.identifier_to_tuple(identifier)
if len(tuple_identifier) != 1:
raise err(f"Invalid database, hierarchical namespaces are not supported: {identifier}")

return tuple_identifier[0]
if not cls._support_namespaces:
if len(tuple_identifier) != 1:
raise err(f"Invalid database, hierarchical namespaces are not supported: {identifier}")
else:
return tuple_identifier[0]

return ".".join(tuple_identifier)

@staticmethod
@classmethod
def identifier_to_database_and_table(
cls,
identifier: str | Identifier,
err: type[ValueError] | type[NoSuchTableError] | type[NoSuchNamespaceError] = ValueError,
) -> tuple[str, str]:
tuple_identifier = Catalog.identifier_to_tuple(identifier)
if len(tuple_identifier) != 2:
if not cls._support_namespaces and len(tuple_identifier) != 2:
raise err(f"Invalid path, hierarchical namespaces are not supported: {identifier}")

return tuple_identifier[0], tuple_identifier[1]
return ".".join(tuple_identifier[:-1]), tuple_identifier[-1]

def _load_file_io(self, properties: Properties = EMPTY_DICT, location: str | None = None) -> FileIO:
return load_file_io({**self.properties, **properties}, location)
Expand Down
76 changes: 76 additions & 0 deletions pyiceberg/catalog/_meta.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from abc import ABCMeta
from typing import TYPE_CHECKING, Any, Callable, TypeVar

if TYPE_CHECKING:
from pyiceberg.catalog import Catalog

_T = TypeVar("_T", bound="Catalog")


class NamespaceMeta(type):
"""Metaclass for managing namespace support configuration in catalog implementations.

This metaclass automatically handles the inheritance and initialization of namespace-related
attributes for catalog classes. It ensures that namespace support configuration is properly
propagated through class inheritance hierarchies.

Attributes:
_support_namespaces (bool): Indicates whether the catalog supports nested namespaces.
Defaults to False. When True, the catalog can handle hierarchical namespace
structures beyond simple flat namespaces.
_max_namespace_depth (int): Maximum depth allowed for nested namespaces.
Defaults to -1 (unlimited depth). When set to a positive integer,
restricts the nesting level of namespaces that can be created.
"""

_support_namespaces: bool = False
_max_namespace_depth: int = -1

def __new__(mcls, name: str, bases: tuple[type, ...], atrrs: dict[str, Any], /, **kwargs: Any) -> type:
cls = super().__new__(mcls, name, bases, atrrs, **kwargs)
if "_support_namespaces" in atrrs:
pass # Already defined in the class
elif hasattr(bases[0], "_support_namespaces"):
cls._support_namespaces = bases[0]._support_namespaces
else:
cls._support_namespaces = NamespaceMeta._support_namespaces
return cls


class CatalogABCMeta(NamespaceMeta, ABCMeta):
"""Metaclass for catalog implementations that combines namespace and abstract base class functionality.

This metaclass inherits from both NamespaceMeta and ABCMeta.
"""


def multiple_namespaces(
_cls: type[_T] | None = None, /, disabled: bool = False, max_depth: int = -1
) -> type[_T] | Callable[[type[_T]], type[_T]]:
def wrapper(cls: type[_T]) -> type[_T]:
if not hasattr(cls, "_support_namespaces"):
raise ValueError(f"{cls.__name__} must inherit Catalog with CatalogABCMeta and NamespaceMeta to use this decorator")
if max_depth >= 0 and max_depth <= 1 or disabled:
cls._support_namespaces = False
else:
cls._support_namespaces = True
cls._max_namespace_depth = max_depth
return cls

return wrapper if _cls is None else wrapper(_cls)
33 changes: 19 additions & 14 deletions pyiceberg/catalog/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@


class DynamoDbCatalog(MetastoreCatalog):
_support_namespaces: bool = True

def __init__(self, name: str, client: Optional["DynamoDBClient"] = None, **properties: str):
"""Dynamodb catalog.

Expand Down Expand Up @@ -441,29 +443,32 @@ def list_tables(self, namespace: str | Identifier) -> list[Identifier]:
return table_identifiers

def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]:
"""List top-level namespaces from the catalog.

We do not support hierarchical namespace.
"""List namespaces from the catalog.

Returns:
List[Identifier]: a List of namespace identifiers.
"""
# Hierarchical namespace is not supported. Return an empty list
level = self.namespace_level(namespace)
conditions = f"{DYNAMODB_COL_IDENTIFIER} = :identifier"
expression_attribute_values = {
":identifier": {
"S": DYNAMODB_NAMESPACE,
}
}
if namespace:
return []
conditions += f" AND begins_with({DYNAMODB_COL_NAMESPACE},:ns)"
expression_attribute_values[":ns"] = {
"S": self.namespace_to_string(namespace) + ".",
}

paginator = self.dynamodb.get_paginator("query")

try:
page_iterator = paginator.paginate(
TableName=self.dynamodb_table_name,
ConsistentRead=True,
KeyConditionExpression=f"{DYNAMODB_COL_IDENTIFIER} = :identifier",
ExpressionAttributeValues={
":identifier": {
"S": DYNAMODB_NAMESPACE,
}
},
KeyConditionExpression=conditions,
ExpressionAttributeValues=expression_attribute_values,
)
except (
self.dynamodb.exceptions.ProvisionedThroughputExceededException,
Expand All @@ -473,14 +478,14 @@ def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]:
) as e:
raise GenericDynamoDbError(e.message) from e

database_identifiers = []
database_identifiers = set()
for page in page_iterator:
for item in page["Items"]:
_dict = _convert_dynamo_item_to_regular_dict(item)
namespace_col = _dict[DYNAMODB_COL_NAMESPACE]
database_identifiers.append(self.identifier_to_tuple(namespace_col))
database_identifiers.add(self.identifier_to_tuple(namespace_col)[:level])

return database_identifiers
return list(database_identifiers)

def load_namespace_properties(self, namespace: str | Identifier) -> Properties:
"""
Expand Down
1 change: 1 addition & 0 deletions pyiceberg/catalog/rest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ class ListViewsResponse(IcebergBaseModel):
class RestCatalog(Catalog):
uri: str
_session: Session
_support_namespaces: bool = True

def __init__(self, name: str, **properties: str):
"""Rest Catalog.
Expand Down
2 changes: 2 additions & 0 deletions pyiceberg/catalog/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ class SqlCatalog(MetastoreCatalog):
The `SqlCatalog` has a different convention where a `TableIdentifier` requires a `Namespace`.
"""

_support_namespaces: bool = True

def __init__(self, name: str, **properties: str):
super().__init__(name, **properties)

Expand Down
56 changes: 54 additions & 2 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@
TYPE_CHECKING,
Any,
Callable,
ClassVar,
Iterable,
Iterator,
Sequence,
TypeVar,
)

Expand Down Expand Up @@ -1020,20 +1022,70 @@ def commit_transaction(self) -> Table:
return self._table


class Namespace(IcebergRootModel[list[str]]):
class Namespace(IcebergRootModel[tuple[str, ...]]):
"""Reference to one or more levels of a namespace."""

root: list[str] = Field(
root: tuple[str, ...] = Field(
...,
description="Reference to one or more levels of a namespace",
)

def __len__(self) -> int:
"""Fetch the size of Namespace."""
return len(self.root)

def __getitem__(self, index: int) -> str:
"""Fetch a value from a Namespace."""
return self.root[index]

def __iter__(self) -> Iterator[str]:
"""Return an iterator over the elements in the root of the table."""
return iter(self.root)

def levels(self) -> int:
"""Return the number of levels in this namespace."""
return len(self.root)

def __repr__(self) -> str:
"""Return a string representation of the namespace."""
return f"Namespace({self.root})"


class TableIdentifier(IcebergBaseModel):
"""Fully Qualified identifier to a table."""

namespace: Namespace
name: str
_separator: ClassVar[str] = "."

@classmethod
def from_string(cls, identifier: str) -> TableIdentifier:
"""Create a TableIdentifier from a separator.

Args:
identifier: A separator representing the table identifier, e.g., "db.schema.table".

Returns:
A TableIdentifier instance.
"""
parts = identifier.split(cls._separator)
return cls.from_tuple(parts)

@classmethod
def from_tuple(cls, identifier: Sequence[str]) -> TableIdentifier:
"""Create a TableIdentifier from a tuple.

Args:
identifier: A tuple representing the table identifier, e.g., ("db", "schema", "table").

Returns:
A TableIdentifier instance.
"""
if len(identifier) < 2:
raise ValueError("Identifier must include at least a namespace and a table name.")
namespace = Namespace(root=tuple(identifier[:-1]))
name = identifier[-1]
return cls(namespace=namespace, name=name)


class CommitTableRequest(IcebergBaseModel):
Expand Down
17 changes: 17 additions & 0 deletions tests/catalog/test_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,23 @@ def test_list_namespaces_200(rest_mock: Mocker) -> None:
]


def test_list_multipart_namespaces_200(rest_mock: Mocker) -> None:
rest_mock.get(
f"{TEST_URI}v1/namespaces",
json={"namespaces": [["default"], ["multipart"]]},
status_code=200,
request_headers=TEST_HEADERS,
)
assert RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_namespaces() == [("default",), ("multipart",)]

rest_mock.get(
f"{TEST_URI}v1/namespaces?parent=multipart",
json={"namespaces": [["multipart", "namespace1"], ["multipart", "namespace2"]]},
status_code=200,
request_headers=TEST_HEADERS,
)


def test_list_namespace_with_parent_200(rest_mock: Mocker) -> None:
rest_mock.get(
f"{TEST_URI}v1/namespaces?parent=accounting",
Expand Down