Skip to content

Commit

Permalink
Deprecate Redundant Identifier Support in TableIdentifier, and row_fi…
Browse files Browse the repository at this point in the history
…lter (#994)
  • Loading branch information
sungwy authored Aug 8, 2024
1 parent debda66 commit eca9870
Show file tree
Hide file tree
Showing 12 changed files with 156 additions and 101 deletions.
30 changes: 27 additions & 3 deletions pyiceberg/catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
RecursiveDict,
)
from pyiceberg.utils.config import Config, merge_config
from pyiceberg.utils.deprecated import deprecation_message
from pyiceberg.utils.deprecated import deprecated, deprecation_message

if TYPE_CHECKING:
import pyarrow as pa
Expand Down Expand Up @@ -613,6 +613,11 @@ def update_namespace_properties(
ValueError: If removals and updates have overlapping keys.
"""

@deprecated(
deprecated_in="0.8.0",
removed_in="0.9.0",
help_message="Support for parsing catalog level identifier in Catalog identifiers is deprecated. Please refer to the table using only its namespace and its table name.",
)
def identifier_to_tuple_without_catalog(self, identifier: Union[str, Identifier]) -> Identifier:
"""Convert an identifier to a tuple and drop this catalog's name from the first element.
Expand All @@ -627,6 +632,25 @@ def identifier_to_tuple_without_catalog(self, identifier: Union[str, Identifier]
identifier_tuple = identifier_tuple[1:]
return identifier_tuple

def _identifier_to_tuple_without_catalog(self, identifier: Union[str, Identifier]) -> Identifier:
"""Convert an identifier to a tuple and drop this catalog's name from the first element.
Args:
identifier (str | Identifier): Table identifier.
Returns:
Identifier: a tuple of strings with this catalog's name removed
"""
identifier_tuple = Catalog.identifier_to_tuple(identifier)
if len(identifier_tuple) >= 3 and identifier_tuple[0] == self.name:
deprecation_message(
deprecated_in="0.8.0",
removed_in="0.9.0",
help_message="Support for parsing catalog level identifier in Catalog identifiers is deprecated. Please refer to the table using only its namespace and its table name.",
)
identifier_tuple = identifier_tuple[1:]
return identifier_tuple

@staticmethod
def identifier_to_tuple(identifier: Union[str, Identifier]) -> Identifier:
"""Parse an identifier to a tuple.
Expand Down Expand Up @@ -769,7 +793,7 @@ def table_exists(self, identifier: Union[str, Identifier]) -> bool:
return False

def purge_table(self, identifier: Union[str, Identifier]) -> None:
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
table = self.load_table(identifier_tuple)
self.drop_table(identifier_tuple)
io = load_file_io(self.properties, table.metadata_location)
Expand Down Expand Up @@ -823,7 +847,7 @@ def _create_staged_table(
)
io = self._load_file_io(properties=properties, location=metadata_location)
return StagedTable(
identifier=(self.name, database_name, table_name),
identifier=(database_name, table_name),
metadata=metadata,
metadata_location=metadata_location,
io=io,
Expand Down
8 changes: 4 additions & 4 deletions pyiceberg/catalog/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table:
Raises:
NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
"""
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
dynamo_table_item = self._get_iceberg_table_item(database_name=database_name, table_name=table_name)
return self._convert_dynamo_table_item_to_iceberg_table(dynamo_table_item=dynamo_table_item)
Expand All @@ -260,7 +260,7 @@ def drop_table(self, identifier: Union[str, Identifier]) -> None:
Raises:
NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
"""
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)

try:
Expand Down Expand Up @@ -291,7 +291,7 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U
NoSuchPropertyException: When from table miss some required properties.
NoSuchNamespaceError: When the destination namespace doesn't exist.
"""
from_identifier_tuple = self.identifier_to_tuple_without_catalog(from_identifier)
from_identifier_tuple = self._identifier_to_tuple_without_catalog(from_identifier)
from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier_tuple, NoSuchTableError)
to_database_name, to_table_name = self.identifier_to_database_and_table(to_identifier)

Expand Down Expand Up @@ -638,7 +638,7 @@ def _convert_dynamo_table_item_to_iceberg_table(self, dynamo_table_item: Dict[st
file = io.new_input(metadata_location)
metadata = FromInputFile.table_metadata(file)
return Table(
identifier=(self.name, database_name, table_name),
identifier=(database_name, table_name),
metadata=metadata,
metadata_location=metadata_location,
io=self._load_file_io(metadata.properties, metadata_location),
Expand Down
10 changes: 5 additions & 5 deletions pyiceberg/catalog/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ def _convert_glue_to_iceberg(self, glue_table: TableTypeDef) -> Table:
file = io.new_input(metadata_location)
metadata = FromInputFile.table_metadata(file)
return Table(
identifier=(self.name, database_name, table_name),
identifier=(database_name, table_name),
metadata=metadata,
metadata_location=metadata_location,
io=self._load_file_io(metadata.properties, metadata_location),
Expand Down Expand Up @@ -462,7 +462,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
NoSuchTableError: If a table with the given identifier does not exist.
CommitFailedException: Requirement not met, or a conflict with a concurrent commit.
"""
identifier_tuple = self.identifier_to_tuple_without_catalog(
identifier_tuple = self._identifier_to_tuple_without_catalog(
tuple(table_request.identifier.namespace.root + [table_request.identifier.name])
)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple)
Expand Down Expand Up @@ -541,7 +541,7 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table:
Raises:
NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
"""
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)

return self._convert_glue_to_iceberg(self._get_glue_table(database_name=database_name, table_name=table_name))
Expand All @@ -555,7 +555,7 @@ def drop_table(self, identifier: Union[str, Identifier]) -> None:
Raises:
NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
"""
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
try:
self.glue.delete_table(DatabaseName=database_name, Name=table_name)
Expand All @@ -581,7 +581,7 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U
NoSuchPropertyException: When from table miss some required properties.
NoSuchNamespaceError: When the destination namespace doesn't exist.
"""
from_identifier_tuple = self.identifier_to_tuple_without_catalog(from_identifier)
from_identifier_tuple = self._identifier_to_tuple_without_catalog(from_identifier)
from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier_tuple, NoSuchTableError)
to_database_name, to_table_name = self.identifier_to_database_and_table(to_identifier)
try:
Expand Down
14 changes: 7 additions & 7 deletions pyiceberg/catalog/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,15 +289,15 @@ def _convert_hive_into_iceberg(self, table: HiveTable) -> Table:
file = io.new_input(metadata_location)
metadata = FromInputFile.table_metadata(file)
return Table(
identifier=(self.name, table.dbName, table.tableName),
identifier=(table.dbName, table.tableName),
metadata=metadata,
metadata_location=metadata_location,
io=self._load_file_io(metadata.properties, metadata_location),
catalog=self,
)

def _convert_iceberg_into_hive(self, table: Table) -> HiveTable:
identifier_tuple = self.identifier_to_tuple_without_catalog(table.identifier)
identifier_tuple = self._identifier_to_tuple_without_catalog(table.identifier)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
current_time_millis = int(time.time() * 1000)

Expand Down Expand Up @@ -431,7 +431,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
NoSuchTableError: If a table with the given identifier does not exist.
CommitFailedException: Requirement not met, or a conflict with a concurrent commit.
"""
identifier_tuple = self.identifier_to_tuple_without_catalog(
identifier_tuple = self._identifier_to_tuple_without_catalog(
tuple(table_request.identifier.namespace.root + [table_request.identifier.name])
)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
Expand Down Expand Up @@ -477,7 +477,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
# Table does not exist, create it.
hive_table = self._convert_iceberg_into_hive(
StagedTable(
identifier=(self.name, database_name, table_name),
identifier=(database_name, table_name),
metadata=updated_staged_table.metadata,
metadata_location=updated_staged_table.metadata_location,
io=updated_staged_table.io,
Expand Down Expand Up @@ -509,7 +509,7 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table:
Raises:
NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
"""
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)

with self._client as open_client:
Expand All @@ -526,7 +526,7 @@ def drop_table(self, identifier: Union[str, Identifier]) -> None:
Raises:
NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
"""
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
try:
with self._client as open_client:
Expand Down Expand Up @@ -554,7 +554,7 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U
NoSuchTableError: When a table with the name does not exist.
NoSuchNamespaceError: When the destination namespace doesn't exist.
"""
from_identifier_tuple = self.identifier_to_tuple_without_catalog(from_identifier)
from_identifier_tuple = self._identifier_to_tuple_without_catalog(from_identifier)
from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier_tuple, NoSuchTableError)
to_database_name, to_table_name = self.identifier_to_database_and_table(to_identifier)
try:
Expand Down
12 changes: 6 additions & 6 deletions pyiceberg/catalog/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ def add_headers(self, request: PreparedRequest, **kwargs: Any) -> None: # pylin

def _response_to_table(self, identifier_tuple: Tuple[str, ...], table_response: TableResponse) -> Table:
return Table(
identifier=(self.name,) + identifier_tuple if self.name else identifier_tuple,
identifier=identifier_tuple,
metadata_location=table_response.metadata_location, # type: ignore
metadata=table_response.metadata,
io=self._load_file_io(
Expand All @@ -506,7 +506,7 @@ def _response_to_table(self, identifier_tuple: Tuple[str, ...], table_response:

def _response_to_staged_table(self, identifier_tuple: Tuple[str, ...], table_response: TableResponse) -> StagedTable:
return StagedTable(
identifier=(self.name,) + identifier_tuple if self.name else identifier_tuple,
identifier=identifier_tuple if self.name else identifier_tuple,
metadata_location=table_response.metadata_location, # type: ignore
metadata=table_response.metadata,
io=self._load_file_io(
Expand Down Expand Up @@ -664,7 +664,7 @@ def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]:

@retry(**_RETRY_ARGS)
def load_table(self, identifier: Union[str, Identifier]) -> Table:
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
response = self._session.get(
self.url(Endpoints.load_table, prefixed=True, **self._split_identifier_for_path(identifier_tuple))
)
Expand All @@ -678,7 +678,7 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table:

@retry(**_RETRY_ARGS)
def drop_table(self, identifier: Union[str, Identifier], purge_requested: bool = False) -> None:
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
response = self._session.delete(
self.url(
Endpoints.drop_table, prefixed=True, purge=purge_requested, **self._split_identifier_for_path(identifier_tuple)
Expand All @@ -695,7 +695,7 @@ def purge_table(self, identifier: Union[str, Identifier]) -> None:

@retry(**_RETRY_ARGS)
def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table:
from_identifier_tuple = self.identifier_to_tuple_without_catalog(from_identifier)
from_identifier_tuple = self._identifier_to_tuple_without_catalog(from_identifier)
payload = {
"source": self._split_identifier_for_json(from_identifier_tuple),
"destination": self._split_identifier_for_json(to_identifier),
Expand Down Expand Up @@ -830,7 +830,7 @@ def table_exists(self, identifier: Union[str, Identifier]) -> bool:
Returns:
bool: True if the table exists, False otherwise.
"""
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
response = self._session.head(
self.url(Endpoints.load_table, prefixed=True, **self._split_identifier_for_path(identifier_tuple))
)
Expand Down
16 changes: 8 additions & 8 deletions pyiceberg/catalog/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ def _convert_orm_to_iceberg(self, orm_table: IcebergTables) -> Table:
file = io.new_input(metadata_location)
metadata = FromInputFile.table_metadata(file)
return Table(
identifier=(self.name,) + Catalog.identifier_to_tuple(table_namespace) + (table_name,),
identifier=Catalog.identifier_to_tuple(table_namespace) + (table_name,),
metadata=metadata,
metadata_location=metadata_location,
io=self._load_file_io(metadata.properties, metadata_location),
Expand Down Expand Up @@ -192,7 +192,7 @@ def create_table(
"""
schema: Schema = self._convert_schema_if_needed(schema) # type: ignore

identifier_nocatalog = self.identifier_to_tuple_without_catalog(identifier)
identifier_nocatalog = self._identifier_to_tuple_without_catalog(identifier)
namespace_identifier = Catalog.namespace_from(identifier_nocatalog)
table_name = Catalog.table_name_from(identifier_nocatalog)
if not self._namespace_exists(namespace_identifier):
Expand Down Expand Up @@ -238,7 +238,7 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location:
TableAlreadyExistsError: If the table already exists
NoSuchNamespaceError: If namespace does not exist
"""
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
namespace_tuple = Catalog.namespace_from(identifier_tuple)
namespace = Catalog.namespace_to_string(namespace_tuple)
table_name = Catalog.table_name_from(identifier_tuple)
Expand Down Expand Up @@ -277,7 +277,7 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table:
Raises:
NoSuchTableError: If a table with the name does not exist.
"""
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
namespace_tuple = Catalog.namespace_from(identifier_tuple)
namespace = Catalog.namespace_to_string(namespace_tuple)
table_name = Catalog.table_name_from(identifier_tuple)
Expand All @@ -301,7 +301,7 @@ def drop_table(self, identifier: Union[str, Identifier]) -> None:
Raises:
NoSuchTableError: If a table with the name does not exist.
"""
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
namespace_tuple = Catalog.namespace_from(identifier_tuple)
namespace = Catalog.namespace_to_string(namespace_tuple)
table_name = Catalog.table_name_from(identifier_tuple)
Expand Down Expand Up @@ -348,8 +348,8 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U
TableAlreadyExistsError: If a table with the new name already exist.
NoSuchNamespaceError: If the target namespace does not exist.
"""
from_identifier_tuple = self.identifier_to_tuple_without_catalog(from_identifier)
to_identifier_tuple = self.identifier_to_tuple_without_catalog(to_identifier)
from_identifier_tuple = self._identifier_to_tuple_without_catalog(from_identifier)
to_identifier_tuple = self._identifier_to_tuple_without_catalog(to_identifier)
from_namespace_tuple = Catalog.namespace_from(from_identifier_tuple)
from_namespace = Catalog.namespace_to_string(from_namespace_tuple)
from_table_name = Catalog.table_name_from(from_identifier_tuple)
Expand Down Expand Up @@ -407,7 +407,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
NoSuchTableError: If a table with the given identifier does not exist.
CommitFailedException: Requirement not met, or a conflict with a concurrent commit.
"""
identifier_tuple = self.identifier_to_tuple_without_catalog(
identifier_tuple = self._identifier_to_tuple_without_catalog(
tuple(table_request.identifier.namespace.root + [table_request.identifier.name])
)
namespace_tuple = Catalog.namespace_from(identifier_tuple)
Expand Down
Loading

0 comments on commit eca9870

Please sign in to comment.