diff --git a/python/pyiceberg/catalog/__init__.py b/python/pyiceberg/catalog/__init__.py
index 7d42527a1c7c..dafef71287ab 100644
--- a/python/pyiceberg/catalog/__init__.py
+++ b/python/pyiceberg/catalog/__init__.py
@@ -314,6 +314,21 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table:
             NoSuchTableError: If a table with the name does not exist.
         """
 
+    @abstractmethod
+    def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table:
+        """Register a new table using existing metadata.
+
+        Args:
+            identifier (Union[str, Identifier]): Table identifier for the table
+            metadata_location (str): The location to the metadata
+
+        Returns:
+            Table: The newly registered table
+
+        Raises:
+            TableAlreadyExistsError: If the table already exists
+        """
+
     @abstractmethod
     def drop_table(self, identifier: Union[str, Identifier]) -> None:
         """Drop a table.
diff --git a/python/pyiceberg/catalog/dynamodb.py b/python/pyiceberg/catalog/dynamodb.py
index bf5dc8067a4a..848ec0312615 100644
--- a/python/pyiceberg/catalog/dynamodb.py
+++ b/python/pyiceberg/catalog/dynamodb.py
@@ -168,6 +168,21 @@ def create_table(
 
         return self.load_table(identifier=identifier)
 
+    def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table:
+        """Register a new table using existing metadata.
+
+        Args:
+            identifier (Union[str, Identifier]): Table identifier for the table
+            metadata_location (str): The location to the metadata
+
+        Returns:
+            Table: The newly registered table
+
+        Raises:
+            NotImplementedError: Always, since register_table is not implemented for this catalog
+        """
+        raise NotImplementedError
+
     def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
         """Update the table.
 
diff --git a/python/pyiceberg/catalog/glue.py b/python/pyiceberg/catalog/glue.py
index a191cfe25acf..405df6720fd9 100644
--- a/python/pyiceberg/catalog/glue.py
+++ b/python/pyiceberg/catalog/glue.py
@@ -225,6 +225,21 @@ def create_table(
 
         return self.load_table(identifier=identifier)
 
+    def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table:
+        """Register a new table using existing metadata.
+
+        Args:
+            identifier (Union[str, Identifier]): Table identifier for the table
+            metadata_location (str): The location to the metadata
+
+        Returns:
+            Table: The newly registered table
+
+        Raises:
+            NotImplementedError: Always, since register_table is not implemented for this catalog
+        """
+        raise NotImplementedError
+
     def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
         """Update the table.
 
diff --git a/python/pyiceberg/catalog/hive.py b/python/pyiceberg/catalog/hive.py
index 8942169db0df..7563270d09bb 100644
--- a/python/pyiceberg/catalog/hive.py
+++ b/python/pyiceberg/catalog/hive.py
@@ -303,6 +303,21 @@ def create_table(
 
         return self._convert_hive_into_iceberg(hive_table, io)
 
+    def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table:
+        """Register a new table using existing metadata.
+
+        Args:
+            identifier (Union[str, Identifier]): Table identifier for the table
+            metadata_location (str): The location to the metadata
+
+        Returns:
+            Table: The newly registered table
+
+        Raises:
+            NotImplementedError: Always, since register_table is not implemented for this catalog
+        """
+        raise NotImplementedError
+
     def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
         """Update the table.
 
diff --git a/python/pyiceberg/catalog/noop.py b/python/pyiceberg/catalog/noop.py
index bb93772aa759..083f851d1cbe 100644
--- a/python/pyiceberg/catalog/noop.py
+++ b/python/pyiceberg/catalog/noop.py
@@ -49,6 +49,21 @@ def create_table(
     def load_table(self, identifier: Union[str, Identifier]) -> Table:
         raise NotImplementedError
 
+    def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table:
+        """Register a new table using existing metadata.
+
+        Args:
+            identifier (Union[str, Identifier]): Table identifier for the table
+            metadata_location (str): The location to the metadata
+
+        Returns:
+            Table: The newly registered table
+
+        Raises:
+            NotImplementedError: Always, since register_table is not implemented for this catalog
+        """
+        raise NotImplementedError
+
     def drop_table(self, identifier: Union[str, Identifier]) -> None:
         raise NotImplementedError
 
diff --git a/python/pyiceberg/catalog/rest.py b/python/pyiceberg/catalog/rest.py
index 60c0e5748446..0023e1898464 100644
--- a/python/pyiceberg/catalog/rest.py
+++ b/python/pyiceberg/catalog/rest.py
@@ -79,6 +79,7 @@ class Endpoints:
     update_namespace_properties: str = "namespaces/{namespace}/properties"
     list_tables: str = "namespaces/{namespace}/tables"
     create_table: str = "namespaces/{namespace}/tables"
+    register_table: str = "namespaces/{namespace}/register"
     load_table: str = "namespaces/{namespace}/tables/{table}"
     update_table: str = "namespaces/{namespace}/tables/{table}"
     drop_table: str = "namespaces/{namespace}/tables/{table}?purgeRequested={purge}"
@@ -127,6 +128,11 @@ class CreateTableRequest(IcebergBaseModel):
     properties: Properties = Field(default_factory=dict)
 
 
+class RegisterTableRequest(IcebergBaseModel):
+    name: str
+    metadata_location: str = Field(..., alias="metadata-location")
+
+
 class TokenResponse(IcebergBaseModel):
     access_token: str = Field()
     token_type: str = Field()
@@ -443,6 +449,37 @@ def create_table(
         table_response = TableResponse(**response.json())
         return self._response_to_table(self.identifier_to_tuple(identifier), table_response)
 
+    def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table:
+        """Register a new table using existing metadata.
+
+        Args:
+            identifier (Union[str, Identifier]): Table identifier for the table
+            metadata_location (str): The location to the metadata
+
+        Returns:
+            Table: The newly registered table
+
+        Raises:
+            TableAlreadyExistsError: If the table already exists
+        """
+        namespace_and_table = self._split_identifier_for_path(identifier)
+        request = RegisterTableRequest(
+            name=namespace_and_table["table"],
+            metadata_location=metadata_location,
+        )
+        serialized_json = request.model_dump_json().encode("utf-8")
+        response = self._session.post(
+            self.url(Endpoints.register_table, namespace=namespace_and_table["namespace"]),
+            data=serialized_json,
+        )
+        try:
+            response.raise_for_status()
+        except HTTPError as exc:
+            self._handle_non_200_response(exc, {409: TableAlreadyExistsError})
+
+        table_response = TableResponse(**response.json())
+        return self._response_to_table(self.identifier_to_tuple(identifier), table_response)
+
     def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]:
         namespace_tuple = self._check_valid_namespace_identifier(namespace)
         namespace_concat = NAMESPACE_SEPARATOR.join(namespace_tuple)
diff --git a/python/pyiceberg/catalog/sql.py b/python/pyiceberg/catalog/sql.py
index 4cc5b224a271..56e460c49eb8 100644
--- a/python/pyiceberg/catalog/sql.py
+++ b/python/pyiceberg/catalog/sql.py
@@ -181,6 +181,21 @@ def create_table(
 
         return self.load_table(identifier=identifier)
 
+    def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table:
+        """Register a new table using existing metadata.
+
+        Args:
+            identifier (Union[str, Identifier]): Table identifier for the table
+            metadata_location (str): The location to the metadata
+
+        Returns:
+            Table: The newly registered table
+
+        Raises:
+            NotImplementedError: Always, since register_table is not implemented for this catalog
+        """
+        raise NotImplementedError
+
     def load_table(self, identifier: Union[str, Identifier]) -> Table:
         """Load the table's metadata and return the table instance.
 
diff --git a/python/tests/catalog/test_base.py b/python/tests/catalog/test_base.py
index 29e93d0c9d05..562f1bf89db9 100644
--- a/python/tests/catalog/test_base.py
+++ b/python/tests/catalog/test_base.py
@@ -110,6 +110,9 @@ def create_table(
             self.__tables[identifier] = table
             return table
 
+    def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table:
+        raise NotImplementedError
+
     def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
         new_metadata: Optional[TableMetadata] = None
         metadata_location = ""
diff --git a/python/tests/catalog/test_rest.py b/python/tests/catalog/test_rest.py
index 829611e9889d..1c7581d24a6e 100644
--- a/python/tests/catalog/test_rest.py
+++ b/python/tests/catalog/test_rest.py
@@ -677,6 +677,142 @@ def test_create_table_409(rest_mock: Mocker, table_schema_simple: Schema) -> Non
     assert "Table already exists" in str(e.value)
 
 
+def test_register_table_200(rest_mock: Mocker) -> None:
+    rest_mock.post(
+        f"{TEST_URI}v1/namespaces/default/register",
+        json={
+            "metadata-location": "s3://warehouse/database/table/metadata.json",
+            "metadata": {
+                "format-version": 1,
+                "table-uuid": "bf289591-dcc0-4234-ad4f-5c3eed811a29",
+                "location": "s3://warehouse/database/table",
+                "last-updated-ms": 1657810967051,
+                "last-column-id": 3,
+                "schema": {
+                    "type": "struct",
+                    "schema-id": 0,
+                    "identifier-field-ids": [2],
+                    "fields": [
+                        {"id": 1, "name": "foo", "required": False, "type": "string"},
+                        {"id": 2, "name": "bar", "required": True, "type": "int"},
+                        {"id": 3, "name": "baz", "required": False, "type": "boolean"},
+                    ],
+                },
+                "current-schema-id": 0,
+                "schemas": [
+                    {
+                        "type": "struct",
+                        "schema-id": 0,
+                        "identifier-field-ids": [2],
+                        "fields": [
+                            {"id": 1, "name": "foo", "required": False, "type": "string"},
+                            {"id": 2, "name": "bar", "required": True, "type": "int"},
+                            {"id": 3, "name": "baz", "required": False, "type": "boolean"},
+                        ],
+                    }
+                ],
+                "partition-spec": [],
+                "default-spec-id": 0,
+                "last-partition-id": 999,
+                "default-sort-order-id": 0,
+                "sort-orders": [{"order-id": 0, "fields": []}],
+                "properties": {
+                    "write.delete.parquet.compression-codec": "zstd",
+                    "write.metadata.compression-codec": "gzip",
+                    "write.summary.partition-limit": "100",
+                    "write.parquet.compression-codec": "zstd",
+                },
+                "current-snapshot-id": -1,
+                "refs": {},
+                "snapshots": [],
+                "snapshot-log": [],
+                "metadata-log": [],
+            },
+            "config": {
+                "client.factory": "io.tabular.iceberg.catalog.TabularAwsClientFactory",
+                "region": "us-west-2",
+            },
+        },
+        status_code=200,
+        request_headers=TEST_HEADERS,
+    )
+    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
+    actual = catalog.register_table(
+        identifier=("default", "registered_table"), metadata_location="s3://warehouse/database/table/metadata.json"
+    )
+    expected = Table(
+        identifier=("rest", "default", "registered_table"),
+        metadata_location="s3://warehouse/database/table/metadata.json",
+        metadata=TableMetadataV1(
+            location="s3://warehouse/database/table",
+            table_uuid=UUID("bf289591-dcc0-4234-ad4f-5c3eed811a29"),
+            last_updated_ms=1657810967051,
+            last_column_id=3,
+            schemas=[
+                Schema(
+                    NestedField(field_id=1, name="foo", field_type=StringType(), required=False),
+                    NestedField(field_id=2, name="bar", field_type=IntegerType(), required=True),
+                    NestedField(field_id=3, name="baz", field_type=BooleanType(), required=False),
+                    schema_id=0,
+                    identifier_field_ids=[2],
+                )
+            ],
+            current_schema_id=0,
+            default_spec_id=0,
+            last_partition_id=999,
+            properties={
+                "write.delete.parquet.compression-codec": "zstd",
+                "write.metadata.compression-codec": "gzip",
+                "write.summary.partition-limit": "100",
+                "write.parquet.compression-codec": "zstd",
+            },
+            current_snapshot_id=None,
+            snapshots=[],
+            snapshot_log=[],
+            metadata_log=[],
+            sort_orders=[SortOrder(order_id=0)],
+            default_sort_order_id=0,
+            refs={},
+            format_version=1,
+            schema_=Schema(
+                NestedField(field_id=1, name="foo", field_type=StringType(), required=False),
+                NestedField(field_id=2, name="bar", field_type=IntegerType(), required=True),
+                NestedField(field_id=3, name="baz", field_type=BooleanType(), required=False),
+                schema_id=0,
+                identifier_field_ids=[2],
+            ),
+            partition_spec=[],
+        ),
+        io=load_file_io(),
+        catalog=catalog,
+    )
+    assert actual.metadata.model_dump() == expected.metadata.model_dump()
+    assert actual.metadata_location == expected.metadata_location
+    assert actual.identifier == expected.identifier
+
+
+def test_register_table_409(rest_mock: Mocker) -> None:
+    rest_mock.post(
+        f"{TEST_URI}v1/namespaces/default/register",
+        json={
+            "error": {
+                "message": "Table already exists: fokko.fokko2 in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e",
+                "type": "AlreadyExistsException",
+                "code": 409,
+            }
+        },
+        status_code=409,
+        request_headers=TEST_HEADERS,
+    )
+
+    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
+    with pytest.raises(TableAlreadyExistsError) as e:
+        catalog.register_table(
+            identifier=("default", "registered_table"), metadata_location="s3://warehouse/database/table/metadata.json"
+        )
+    assert "Table already exists" in str(e.value)
+
+
 def test_delete_namespace_204(rest_mock: Mocker) -> None:
     namespace = "example"
     rest_mock.delete(