Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions python/pyiceberg/catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,21 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table:
NoSuchTableError: If a table with the name does not exist.
"""

@abstractmethod
def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table:
"""Register a new table using existing metadata.

Declared abstract, so it carries no implementation of its own.

Args:
    identifier (Union[str, Identifier]): Table identifier for the table.
    metadata_location (str): The location to the metadata.

Returns:
    Table: The newly registered table.

Raises:
    TableAlreadyExistsError: If the table already exists.
"""

@abstractmethod
def drop_table(self, identifier: Union[str, Identifier]) -> None:
"""Drop a table.
Expand Down
15 changes: 15 additions & 0 deletions python/pyiceberg/catalog/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,21 @@ def create_table(

return self.load_table(identifier=identifier)

def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table:
"""Register a new table using existing metadata.

This catalog does not implement registration: the body unconditionally
raises NotImplementedError.

Args:
    identifier (Union[str, Identifier]): Table identifier for the table.
    metadata_location (str): The location to the metadata.

Returns:
    Table: The newly registered table (never returned by this implementation).

Raises:
    NotImplementedError: Always, because registration is not implemented here.
"""
raise NotImplementedError

def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
"""Update the table.

Expand Down
15 changes: 15 additions & 0 deletions python/pyiceberg/catalog/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,21 @@ def create_table(

return self.load_table(identifier=identifier)

def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table:
"""Register a new table using existing metadata.

This catalog does not implement registration: the body unconditionally
raises NotImplementedError.

Args:
    identifier (Union[str, Identifier]): Table identifier for the table.
    metadata_location (str): The location to the metadata.

Returns:
    Table: The newly registered table (never returned by this implementation).

Raises:
    NotImplementedError: Always, because registration is not implemented here.
"""
raise NotImplementedError

def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
"""Update the table.

Expand Down
15 changes: 15 additions & 0 deletions python/pyiceberg/catalog/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,21 @@ def create_table(

return self._convert_hive_into_iceberg(hive_table, io)

def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table:
"""Register a new table using existing metadata.

This catalog does not implement registration: the body unconditionally
raises NotImplementedError.

Args:
    identifier (Union[str, Identifier]): Table identifier for the table.
    metadata_location (str): The location to the metadata.

Returns:
    Table: The newly registered table (never returned by this implementation).

Raises:
    NotImplementedError: Always, because registration is not implemented here.
"""
raise NotImplementedError

def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
"""Update the table.

Expand Down
15 changes: 15 additions & 0 deletions python/pyiceberg/catalog/noop.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,21 @@ def create_table(
def load_table(self, identifier: Union[str, Identifier]) -> Table:
# Loading a table is not implemented here; calling this always raises.
raise NotImplementedError

def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table:
"""Register a new table using existing metadata.

This catalog does not implement registration: the body unconditionally
raises NotImplementedError.

Args:
    identifier (Union[str, Identifier]): Table identifier for the table.
    metadata_location (str): The location to the metadata.

Returns:
    Table: The newly registered table (never returned by this implementation).

Raises:
    NotImplementedError: Always, because registration is not implemented here.
"""
raise NotImplementedError

def drop_table(self, identifier: Union[str, Identifier]) -> None:
# Dropping a table is not implemented here; calling this always raises.
raise NotImplementedError

Expand Down
37 changes: 37 additions & 0 deletions python/pyiceberg/catalog/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ class Endpoints:
update_namespace_properties: str = "namespaces/{namespace}/properties"
list_tables: str = "namespaces/{namespace}/tables"
create_table: str = "namespaces/{namespace}/tables"
register_table: str = "namespaces/{namespace}/register"
load_table: str = "namespaces/{namespace}/tables/{table}"
update_table: str = "namespaces/{namespace}/tables/{table}"
drop_table: str = "namespaces/{namespace}/tables/{table}?purgeRequested={purge}"
Expand Down Expand Up @@ -127,6 +128,11 @@ class CreateTableRequest(IcebergBaseModel):
properties: Properties = Field(default_factory=dict)


# Request model carrying the two values needed to register an existing table:
# the table name and the location of its metadata.
class RegisterTableRequest(IcebergBaseModel):
# Name of the table to register.
name: str
# Location of the existing metadata; required, declared with the "metadata-location" alias.
metadata_location: str = Field(..., alias="metadata-location")


class TokenResponse(IcebergBaseModel):
access_token: str = Field()
token_type: str = Field()
Expand Down Expand Up @@ -443,6 +449,37 @@ def create_table(
table_response = TableResponse(**response.json())
return self._response_to_table(self.identifier_to_tuple(identifier), table_response)

def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table:
    """Register an existing metadata file as a table through the REST catalog.

    Posts the table name and metadata location to the namespace's register
    endpoint, then builds the table from the response body.

    Args:
        identifier (Union[str, Identifier]): Table identifier for the table.
        metadata_location (str): The location to the metadata.

    Returns:
        Table: The newly registered table.

    Raises:
        TableAlreadyExistsError: If the table already exists.
    """
    path_parts = self._split_identifier_for_path(identifier)
    payload = RegisterTableRequest(name=path_parts["table"], metadata_location=metadata_location)
    body = payload.model_dump_json().encode("utf-8")
    endpoint = self.url(Endpoints.register_table, namespace=path_parts["namespace"])
    http_response = self._session.post(endpoint, data=body)

    try:
        http_response.raise_for_status()
    except HTTPError as http_err:
        # A 409 status is mapped to TableAlreadyExistsError by the shared handler.
        self._handle_non_200_response(http_err, {409: TableAlreadyExistsError})

    parsed = TableResponse(**http_response.json())
    return self._response_to_table(self.identifier_to_tuple(identifier), parsed)

def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]:
namespace_tuple = self._check_valid_namespace_identifier(namespace)
namespace_concat = NAMESPACE_SEPARATOR.join(namespace_tuple)
Expand Down
15 changes: 15 additions & 0 deletions python/pyiceberg/catalog/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,21 @@ def create_table(

return self.load_table(identifier=identifier)

def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table:
"""Register a new table using existing metadata.

This catalog does not implement registration: the body unconditionally
raises NotImplementedError.

Args:
    identifier (Union[str, Identifier]): Table identifier for the table.
    metadata_location (str): The location to the metadata.

Returns:
    Table: The newly registered table (never returned by this implementation).

Raises:
    NotImplementedError: Always, because registration is not implemented here.
"""
raise NotImplementedError

def load_table(self, identifier: Union[str, Identifier]) -> Table:
"""Load the table's metadata and return the table instance.

Expand Down
3 changes: 3 additions & 0 deletions python/tests/catalog/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ def create_table(
self.__tables[identifier] = table
return table

def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table:
# Table registration is not implemented here; calling this always raises.
raise NotImplementedError

def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
new_metadata: Optional[TableMetadata] = None
metadata_location = ""
Expand Down
136 changes: 136 additions & 0 deletions python/tests/catalog/test_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,142 @@ def test_create_table_409(rest_mock: Mocker, table_schema_simple: Schema) -> Non
assert "Table already exists" in str(e.value)


def test_register_table_200(rest_mock: Mocker, table_schema_simple: Schema) -> None:
    """A 200 from the mocked register endpoint yields the expected table.

    The registered table's metadata, metadata location and identifier are
    compared against a hand-built expected Table.
    """
    metadata_location = "s3://warehouse/database/table/metadata.json"

    # JSON form of the three-column schema, used for both "schema" and "schemas".
    schema_json = {
        "type": "struct",
        "schema-id": 0,
        "identifier-field-ids": [2],
        "fields": [
            {"id": 1, "name": "foo", "required": False, "type": "string"},
            {"id": 2, "name": "bar", "required": True, "type": "int"},
            {"id": 3, "name": "baz", "required": False, "type": "boolean"},
        ],
    }

    def build_schema() -> Schema:
        # A fresh Schema per call so the expected metadata never shares instances.
        return Schema(
            NestedField(field_id=1, name="foo", field_type=StringType(), required=False),
            NestedField(field_id=2, name="bar", field_type=IntegerType(), required=True),
            NestedField(field_id=3, name="baz", field_type=BooleanType(), required=False),
            schema_id=0,
            identifier_field_ids=[2],
        )

    response_metadata = {
        "format-version": 1,
        "table-uuid": "bf289591-dcc0-4234-ad4f-5c3eed811a29",
        "location": "s3://warehouse/database/table",
        "last-updated-ms": 1657810967051,
        "last-column-id": 3,
        "schema": schema_json,
        "current-schema-id": 0,
        "schemas": [schema_json],
        "partition-spec": [],
        "default-spec-id": 0,
        "last-partition-id": 999,
        "default-sort-order-id": 0,
        "sort-orders": [{"order-id": 0, "fields": []}],
        "properties": {
            "write.delete.parquet.compression-codec": "zstd",
            "write.metadata.compression-codec": "gzip",
            "write.summary.partition-limit": "100",
            "write.parquet.compression-codec": "zstd",
        },
        "current-snapshot-id": -1,
        "refs": {},
        "snapshots": [],
        "snapshot-log": [],
        "metadata-log": [],
    }
    rest_mock.post(
        f"{TEST_URI}v1/namespaces/default/register",
        json={
            "metadata-location": metadata_location,
            "metadata": response_metadata,
            "config": {
                "client.factory": "io.tabular.iceberg.catalog.TabularAwsClientFactory",
                "region": "us-west-2",
            },
        },
        status_code=200,
        request_headers=TEST_HEADERS,
    )

    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
    registered = catalog.register_table(
        identifier=("default", "registered_table"),
        metadata_location=metadata_location,
    )

    expected_metadata = TableMetadataV1(
        location="s3://warehouse/database/table",
        table_uuid=UUID("bf289591-dcc0-4234-ad4f-5c3eed811a29"),
        last_updated_ms=1657810967051,
        last_column_id=3,
        schemas=[build_schema()],
        current_schema_id=0,
        default_spec_id=0,
        last_partition_id=999,
        properties={
            "write.delete.parquet.compression-codec": "zstd",
            "write.metadata.compression-codec": "gzip",
            "write.summary.partition-limit": "100",
            "write.parquet.compression-codec": "zstd",
        },
        current_snapshot_id=None,
        snapshots=[],
        snapshot_log=[],
        metadata_log=[],
        sort_orders=[SortOrder(order_id=0)],
        default_sort_order_id=0,
        refs={},
        format_version=1,
        schema_=build_schema(),
        partition_spec=[],
    )
    wanted = Table(
        identifier=("rest", "default", "registered_table"),
        metadata_location=metadata_location,
        metadata=expected_metadata,
        io=load_file_io(),
        catalog=catalog,
    )

    assert registered.metadata.model_dump() == wanted.metadata.model_dump()
    assert registered.metadata_location == wanted.metadata_location
    assert registered.identifier == wanted.identifier


def test_register_table_409(rest_mock: Mocker, table_schema_simple: Schema) -> None:
    """A 409 from the mocked register endpoint raises TableAlreadyExistsError."""
    conflict_payload = {
        "error": {
            "message": "Table already exists: fokko.fokko2 in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e",
            "type": "AlreadyExistsException",
            "code": 409,
        }
    }
    rest_mock.post(
        f"{TEST_URI}v1/namespaces/default/register",
        json=conflict_payload,
        status_code=409,
        request_headers=TEST_HEADERS,
    )

    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
    with pytest.raises(TableAlreadyExistsError) as exc_info:
        catalog.register_table(
            identifier=("default", "registered_table"),
            metadata_location="s3://warehouse/database/table/metadata.json",
        )
    # The server-provided message must be visible on the raised error.
    assert "Table already exists" in str(exc_info.value)


def test_delete_namespace_204(rest_mock: Mocker) -> None:
namespace = "example"
rest_mock.delete(
Expand Down