Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion python/pyiceberg/catalog/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,28 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location:

Raises:
TableAlreadyExistsError: If the table already exists
NoSuchNamespaceError: If namespace does not exist
"""
raise NotImplementedError
database_name, table_name = self.identifier_to_database_and_table(identifier)
if not self._namespace_exists(database_name):
raise NoSuchNamespaceError(f"Namespace does not exist: {database_name}")

with Session(self.engine) as session:
try:
session.add(
IcebergTables(
catalog_name=self.name,
table_namespace=database_name,
table_name=table_name,
metadata_location=metadata_location,
previous_metadata_location=None,
)
)
session.commit()
except IntegrityError as e:
raise TableAlreadyExistsError(f"Table {database_name}.{table_name} already exists") from e

return self.load_table(identifier=identifier)

def load_table(self, identifier: Union[str, Identifier]) -> Table:
"""Load the table's metadata and return the table instance.
Expand Down
29 changes: 29 additions & 0 deletions python/tests/catalog/test_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,35 @@ def test_create_table_without_namespace(test_catalog: SqlCatalog, table_schema_n
test_catalog.create_table(table_name, table_schema_nested)


def test_register_table(test_catalog: SqlCatalog, random_identifier: Identifier, metadata_location: str) -> None:
database_name, _table_name = random_identifier
test_catalog.create_namespace(database_name)
table = test_catalog.register_table(random_identifier, metadata_location)
assert table.identifier == (test_catalog.name,) + random_identifier
assert table.metadata_location == metadata_location
assert os.path.exists(metadata_location)
test_catalog.drop_table(random_identifier)


def test_register_existing_table(test_catalog: SqlCatalog, random_identifier: Identifier, metadata_location: str) -> None:
database_name, _table_name = random_identifier
test_catalog.create_namespace(database_name)
test_catalog.register_table(random_identifier, metadata_location)
with pytest.raises(TableAlreadyExistsError):
test_catalog.register_table(random_identifier, metadata_location)


def test_register_table_with_non_existing_namespace(test_catalog: SqlCatalog, metadata_location: str, table_name: str) -> None:
identifier = ("invalid", table_name)
with pytest.raises(NoSuchNamespaceError):
test_catalog.register_table(identifier, metadata_location)


def test_register_table_without_namespace(test_catalog: SqlCatalog, metadata_location: str, table_name: str) -> None:
with pytest.raises(ValueError):
test_catalog.register_table(table_name, metadata_location)


def test_load_table(test_catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: Identifier) -> None:
database_name, _table_name = random_identifier
test_catalog.create_namespace(database_name)
Expand Down