diff --git a/python/mkdocs/docs/api.md b/python/mkdocs/docs/api.md index ddd5ca180f73..d3b8fceee5c5 100644 --- a/python/mkdocs/docs/api.md +++ b/python/mkdocs/docs/api.md @@ -78,110 +78,15 @@ catalog.load_table(("nyc", "taxis")) # The tuple syntax can be used if the namespace or table contains a dot. ``` -This returns a `Table` that represents an Iceberg table: - -```python -Table( - identifier=('nyc', 'taxis'), - metadata_location='s3a://warehouse/wh/nyc.db/taxis/metadata/00002-6ea51ce3-62aa-4197-9cf8-43d07c3440ca.metadata.json', - metadata=TableMetadataV2( - location='s3a://warehouse/wh/nyc.db/taxis', - table_uuid=UUID('ebd5d172-2162-453d-b586-1cdce52c1116'), - last_updated_ms=1662633437826, - last_column_id=19, - schemas=[Schema( - NestedField(field_id=1, name='VendorID', field_type=LongType(), required=False), - NestedField(field_id=2, name='tpep_pickup_datetime', field_type=TimestamptzType(), required=False), - NestedField(field_id=3, name='tpep_dropoff_datetime', field_type=TimestamptzType(), required=False), - NestedField(field_id=4, name='passenger_count', field_type=DoubleType(), required=False), - NestedField(field_id=5, name='trip_distance', field_type=DoubleType(), required=False), - NestedField(field_id=6, name='RatecodeID', field_type=DoubleType(), required=False), - NestedField(field_id=7, name='store_and_fwd_flag', field_type=StringType(), required=False), - NestedField(field_id=8, name='PULocationID', field_type=LongType(), required=False), - NestedField(field_id=9, name='DOLocationID', field_type=LongType(), required=False), - NestedField(field_id=10, name='payment_type', field_type=LongType(), required=False), - NestedField(field_id=11, name='fare_amount', field_type=DoubleType(), required=False), - NestedField(field_id=12, name='extra', field_type=DoubleType(), required=False), - NestedField(field_id=13, name='mta_tax', field_type=DoubleType(), required=False), - NestedField(field_id=14, name='tip_amount', field_type=DoubleType(), required=False), - NestedField(field_id=15, name='tolls_amount', field_type=DoubleType(), required=False), - NestedField(field_id=16, name='improvement_surcharge', field_type=DoubleType(), required=False), - NestedField(field_id=17, name='total_amount', field_type=DoubleType(), required=False), - NestedField(field_id=18, name='congestion_surcharge', field_type=DoubleType(), required=False), - NestedField(field_id=19, name='airport_fee', field_type=DoubleType(), required=False) - ), - schema_id=0, - identifier_field_ids=[] - )], - current_schema_id=0, - partition_specs=[PartitionSpec(spec_id=0)], - default_spec_id=0, - last_partition_id=999, - properties={ - 'owner': 'root', - 'write.format.default': 'parquet' - }, - current_snapshot_id=8334458494559715805, - snapshots=[ - Snapshot( - snapshot_id=7910949481055846233, - parent_snapshot_id=None, - sequence_number=None, - timestamp_ms=1662489306555, - manifest_list='s3a://warehouse/wh/nyc.db/taxis/metadata/snap-7910949481055846233-1-3eb7a2e1-5b7a-4e76-a29a-3e29c176eea4.avro', - summary=Summary( - Operation.APPEND, - **{ - 'spark.app.id': 'local-1662489289173', - 'added-data-files': '1', - 'added-records': '2979431', - 'added-files-size': '46600777', - 'changed-partition-count': '1', - 'total-records': '2979431', - 'total-files-size': '46600777', - 'total-data-files': '1', - 'total-delete-files': '0', - 'total-position-deletes': '0', - 'total-equality-deletes': '0' - } - ), - schema_id=0 - ), - ], - snapshot_log=[ - SnapshotLogEntry( - snapshot_id='7910949481055846233', - timestamp_ms=1662489306555 
- ) - ], - metadata_log=[ - MetadataLogEntry( - metadata_file='s3a://warehouse/wh/nyc.db/taxis/metadata/00000-b58341ba-6a63-4eea-9b2f-e85e47c7d09f.metadata.json', - timestamp_ms=1662489306555 - ) - ], - sort_orders=[SortOrder(order_id=0)], - default_sort_order_id=0, - refs={ - 'main': SnapshotRef( - snapshot_id=8334458494559715805, - snapshot_ref_type=SnapshotRefType.BRANCH, - min_snapshots_to_keep=None, - max_snapshot_age_ms=None, - max_ref_age_ms=None - ) - }, - format_version=2, - last_sequence_number=1 - ) -) -``` +This returns a `Table` that represents an Iceberg table that can be queried and altered. ### Directly from a metadata file To load a table directly from a metadata file (i.e., **without** using a catalog), you can use a `StaticTable` as follows: ```python +from pyiceberg.table import StaticTable + table = StaticTable.from_metadata( "s3a://warehouse/wh/nyc.db/taxis/metadata/00002-6ea51ce3-62aa-4197-9cf8-43d07c3440ca.metadata.json" ) @@ -241,52 +146,37 @@ catalog.create_table( ) ``` -Which returns a newly created table: +### Update table properties + +Set and remove properties through the `Transaction` API: ```python -Table( - identifier=('default', 'bids'), - metadata_location='/Users/fokkodriesprong/Desktop/docker-spark-iceberg/wh/bids//metadata/00000-c8cd93ab-f784-474d-a167-b1a86b05195f.metadata.json', - metadata=TableMetadataV2( - location='/Users/fokkodriesprong/Desktop/docker-spark-iceberg/wh/bids/', - table_uuid=UUID('38d4cb39-4945-4bf2-b374-984b5c4984d2'), - last_updated_ms=1661847562069, - last_column_id=4, - schemas=[ - Schema( - NestedField(field_id=1, name='datetime', field_type=TimestampType(), required=False), - NestedField(field_id=2, name='bid', field_type=DoubleType(), required=False), - NestedField(field_id=3, name='ask', field_type=DoubleType(), required=False), - NestedField(field_id=4, name='symbol', field_type=StringType(), required=False)), - schema_id=1, - identifier_field_ids=[]) - ], - current_schema_id=1, - partition_specs=[ - PartitionSpec( - PartitionField(source_id=1, field_id=1000, transform=DayTransform(), name='datetime_day'),)) - ], - default_spec_id=0, - last_partition_id=1000, - properties={}, - current_snapshot_id=None, - snapshots=[], - snapshot_log=[], - metadata_log=[], - sort_orders=[ - SortOrder(order_id=1, fields=[SortField(source_id=4, transform=IdentityTransform(), direction=SortDirection.ASC, null_order=NullOrder.NULLS_FIRST)]) - ], - default_sort_order_id=1, - refs={}, - format_version=2, - last_sequence_number=0 - ) -) +with table.transaction() as transaction: + transaction.set_properties(abc="def") + +assert table.properties == {"abc": "def"} + +with table.transaction() as transaction: + transaction.remove_properties("abc") + +assert table.properties == {} +``` + +Or, without a context manager: + +```python +table = table.transaction().set_properties(abc="def").commit_transaction() + +assert table.properties == {"abc": "def"} + +table = table.transaction().remove_properties("abc").commit_transaction() + +assert table.properties == {} ``` -## Query a table +## Query the data -To query a table, a table scan is needed. A table scan accepts a filter, columns and optionally a limit and a snapshot ID: +To query a table, a table scan is needed. 
A table scan accepts a filter, columns, optionally a limit and a snapshot ID: ```python from pyiceberg.catalog import load_catalog diff --git a/python/poetry.lock b/python/poetry.lock index 729f4fc5eabc..74dbe1683bc9 100644 --- a/python/poetry.lock +++ b/python/poetry.lock @@ -1703,13 +1703,13 @@ files = [ [[package]] name = "platformdirs" -version = "3.6.0" +version = "3.7.0" description = "A small Python package for determining appropriate platform-specific dirs, e.g. a \"user data dir\"." optional = false python-versions = ">=3.7" files = [ - {file = "platformdirs-3.6.0-py3-none-any.whl", hash = "sha256:ffa199e3fbab8365778c4a10e1fbf1b9cd50707de826eb304b50e57ec0cc8d38"}, - {file = "platformdirs-3.6.0.tar.gz", hash = "sha256:57e28820ca8094678b807ff529196506d7a21e17156cb1cddb3e74cebce54640"}, + {file = "platformdirs-3.7.0-py3-none-any.whl", hash = "sha256:cfd065ba43133ff103ab3bd10aecb095c2a0035fcd1f07217c9376900d94ba07"}, + {file = "platformdirs-3.7.0.tar.gz", hash = "sha256:87fbf6473e87c078d536980ba970a472422e94f17b752cfad17024c18876d481"}, ] [package.extras] @@ -1718,13 +1718,13 @@ test = ["appdirs (==1.4.4)", "covdefaults (>=2.3)", "pytest (>=7.3.1)", "pytest- [[package]] name = "pluggy" -version = "1.0.0" +version = "1.2.0" description = "plugin and hook calling mechanisms for python" optional = false -python-versions = ">=3.6" +python-versions = ">=3.7" files = [ - {file = "pluggy-1.0.0-py2.py3-none-any.whl", hash = "sha256:74134bbf457f031a36d68416e1509f34bd5ccc019f0bcc952c7b909d06b37bd3"}, - {file = "pluggy-1.0.0.tar.gz", hash = "sha256:4224373bacce55f955a878bf9cfa763c1e360858e330072059e10bad68531159"}, + {file = "pluggy-1.2.0-py3-none-any.whl", hash = "sha256:c2fd55a7d7a3863cba1a013e4e2414658b1d07b6bc57b3919e0c63c9abb99849"}, + {file = "pluggy-1.2.0.tar.gz", hash = "sha256:d12f0c4b579b15f5e054301bb226ee85eeeba08ffec228092f8defbaa3a4c4b3"}, ] [package.extras] @@ -2186,32 +2186,27 @@ files = [ [[package]] name = "ray" -version = "2.5.0" +version = "2.5.1" description = "Ray provides a simple, universal API for building distributed applications." 
optional = true python-versions = "*" files = [ - {file = "ray-2.5.0-cp310-cp310-macosx_10_15_universal2.whl", hash = "sha256:d1bebc874e896880c1215f4c1a11697ada49fa1595d6d99d7c5b4dc03030df36"}, - {file = "ray-2.5.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:c0285df2d24cacc36ca64b7852178a9bf37e3fc88545752fc2b46c27396965c1"}, - {file = "ray-2.5.0-cp310-cp310-manylinux2014_aarch64.whl", hash = "sha256:38935d46c2597c1d1f113e1c8f88e2716c67052c480de5b2a0265e0a1a5ce88f"}, - {file = "ray-2.5.0-cp310-cp310-manylinux2014_x86_64.whl", hash = "sha256:d53a07c9a9dbc134945a26980f557e9ff0f591bf8cabed1a6ebf921768d1c8bd"}, - {file = "ray-2.5.0-cp310-cp310-win_amd64.whl", hash = "sha256:ef26ba24461dad98365b48ef01e27e70bc9737f4cf4734115804153d7d9195dc"}, - {file = "ray-2.5.0-cp311-cp311-manylinux2014_aarch64.whl", hash = "sha256:d714175a5000ca91f82646a9b72521118bb6d2db5568e1b7ae9ceb64769716b6"}, - {file = "ray-2.5.0-cp311-cp311-manylinux2014_x86_64.whl", hash = "sha256:0cde929e63497ed5f1c8626e5ccf7595ef6acaf1e7e270ad7c12f8e1c7695244"}, - {file = "ray-2.5.0-cp37-cp37m-macosx_10_15_x86_64.whl", hash = "sha256:7e5512abf62c05c9ff90b1c89a4e0f2e45ee00e73f816eb8265e3ebd92fe4064"}, - {file = "ray-2.5.0-cp37-cp37m-manylinux2014_aarch64.whl", hash = "sha256:3bf36beb213f89c0eb1ec5ac6ffddc8f53e616be745167f00ca017abd8672a2d"}, - {file = "ray-2.5.0-cp37-cp37m-manylinux2014_x86_64.whl", hash = "sha256:59c2448b07f45d9a9d8e594bb5337bd35a5fea04e42cb4211a3346c2c0d066b0"}, - {file = "ray-2.5.0-cp37-cp37m-win_amd64.whl", hash = "sha256:63008dd659d9ef25b0e20f0e1a285e8266e0af68b1178bca1b6ae43e49a68104"}, - {file = "ray-2.5.0-cp38-cp38-macosx_10_15_x86_64.whl", hash = "sha256:e9464e93d6b72e0da69b9c5ab0501cc40f2db14801e22c6b97fa4e8039647892"}, - {file = "ray-2.5.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:7dc00fac119bfa1c2f8ac456d50a728346d6f2722fb7a21bf70841fc7476c285"}, - {file = "ray-2.5.0-cp38-cp38-manylinux2014_aarch64.whl", hash = "sha256:d76051519bd4ae39fda4a87536978cafdebf2843c1c29a9f734c503d8ea676cd"}, - {file = "ray-2.5.0-cp38-cp38-manylinux2014_x86_64.whl", hash = "sha256:9a8e06dc5d4201129c28b6768a971c474b82a23935b2e40461ffc7f1c2f4942a"}, - {file = "ray-2.5.0-cp38-cp38-win_amd64.whl", hash = "sha256:849014b62ca50ff106b7a5d41430346e2762b1c4c803673af076209925b8f912"}, - {file = "ray-2.5.0-cp39-cp39-macosx_10_15_x86_64.whl", hash = "sha256:a1b52c12a3349d8e37357df31438b6f1b12c7719ef41bdf5089fc7e78e8ab212"}, - {file = "ray-2.5.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:25f3d50c27c4c4756259d093d152381c6604bb96684a0cf43c55ddcc2eb73f79"}, - {file = "ray-2.5.0-cp39-cp39-manylinux2014_aarch64.whl", hash = "sha256:1cb4f6ef9cfdb69d2ae582f357e977527944390e2f5cbbf51efd8252ed4c9a11"}, - {file = "ray-2.5.0-cp39-cp39-manylinux2014_x86_64.whl", hash = "sha256:662cff303c086369a29283badcd7445b7f911874d8407b2c589b1ccbf6028d2e"}, - {file = "ray-2.5.0-cp39-cp39-win_amd64.whl", hash = "sha256:a2cea10981dad7cfd187edf5e225a667eb114269afc5f2321b52113ef2d86123"}, + {file = "ray-2.5.1-cp310-cp310-macosx_10_15_universal2.whl", hash = "sha256:43c1f3c662692bca6a03a7eb118e310c563ac64901b838c74ec1fe34c28dfce7"}, + {file = "ray-2.5.1-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:bd47d617fcfc969d8f22814ebe283f2e7d3de63b3d4d34b968d905b7055f85cd"}, + {file = "ray-2.5.1-cp310-cp310-manylinux2014_x86_64.whl", hash = "sha256:75f2f91c80cf2cbfee630175bd0fff6ba109bcc054e378229134dd2736083dd7"}, + {file = "ray-2.5.1-cp310-cp310-win_amd64.whl", hash = "sha256:03bc24f865da7ad94b142c5c12b3827a95505cc1968366c825990b183d54a797"}, + {file = 
"ray-2.5.1-cp311-cp311-manylinux2014_x86_64.whl", hash = "sha256:8f561bb6067e5c10af4eb0f8c9185dedda5a0214fae4b187675632220dab5b69"}, + {file = "ray-2.5.1-cp37-cp37m-macosx_10_15_x86_64.whl", hash = "sha256:2f3a3667b5f40ed984ddab9854f3a0e9d88e12b8c078dd1b7964a7276693570d"}, + {file = "ray-2.5.1-cp37-cp37m-manylinux2014_x86_64.whl", hash = "sha256:9c4ede34d265fe52da74c8966e45c5d2f4c27a2d5fb68d4828bc3b428050ca39"}, + {file = "ray-2.5.1-cp37-cp37m-win_amd64.whl", hash = "sha256:e507339b6bb0bb85433dbce4e073f125a6e1da1009658963de0eaeb7ef4b7a93"}, + {file = "ray-2.5.1-cp38-cp38-macosx_10_15_x86_64.whl", hash = "sha256:4019a5000b23909c004dba4cc6b7457ffc798ace2db5489d2a2d7c37cb053323"}, + {file = "ray-2.5.1-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:0289d3b6ecc86a260e51d23f7b04875ace5f9716ffd9a5a9f0072120d0866c37"}, + {file = "ray-2.5.1-cp38-cp38-manylinux2014_x86_64.whl", hash = "sha256:e7e91a93fced3695405d61849dff4efb8cec74f669b22c0276164136992f4528"}, + {file = "ray-2.5.1-cp38-cp38-win_amd64.whl", hash = "sha256:0e92c41c9326378bf2cdab3ae401a6279cda496da58d7a60564999de5dedcd2f"}, + {file = "ray-2.5.1-cp39-cp39-macosx_10_15_x86_64.whl", hash = "sha256:c7ca8df922d2a090c4f4a6e017e43e1da98b30e3ed086fc01f25624874c6b64a"}, + {file = "ray-2.5.1-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:3cbe4284ee952bd9379210f08a2917bc7e74efb6ec71bd4f156e282e031cdbec"}, + {file = "ray-2.5.1-cp39-cp39-manylinux2014_x86_64.whl", hash = "sha256:a9b8bdcfce1646df2cdc89456bf6e1ff57091c433789e1fc34a8caf3a874b8c8"}, + {file = "ray-2.5.1-cp39-cp39-win_amd64.whl", hash = "sha256:5d6689e95706ba7ba4d13f9e09aed1498cee5736e0b74c61331585a3c1b086bf"}, ] [package.dependencies] @@ -2239,8 +2234,8 @@ requests = "*" [package.extras] air = ["aiohttp (>=3.7)", "aiohttp-cors", "aiorwlock", "colorful", "fastapi", "fsspec", "gpustat (>=1.0.0)", "numpy (>=1.20)", "opencensus", "pandas", "pandas (>=1.3)", "prometheus-client (>=0.7.1)", "py-spy (>=0.2.0)", "pyarrow (>=6.0.1)", "pydantic", "requests", "smart-open", "starlette", "tensorboardX (>=1.9)", "uvicorn", "virtualenv (>=20.0.24,<20.21.1)"] -all = ["aiohttp (>=3.7)", "aiohttp-cors", "aiorwlock", "colorful", "dm-tree", "fastapi", "fsspec", "gpustat (>=1.0.0)", "gymnasium (==0.26.3)", "kubernetes", "lz4", "numpy (>=1.20)", "opencensus", "opentelemetry-api", "opentelemetry-exporter-otlp", "opentelemetry-sdk", "pandas", "pandas (>=1.3)", "prometheus-client (>=0.7.1)", "py-spy (>=0.2.0)", "pyarrow (>=6.0.1)", "pydantic", "pyyaml", "ray-cpp (==2.5.0)", "requests", "rich", "scikit-image", "scipy", "smart-open", "starlette", "tensorboardX (>=1.9)", "typer", "urllib3", "uvicorn", "virtualenv (>=20.0.24,<20.21.1)"] -cpp = ["ray-cpp (==2.5.0)"] +all = ["aiohttp (>=3.7)", "aiohttp-cors", "aiorwlock", "colorful", "dm-tree", "fastapi", "fsspec", "gpustat (>=1.0.0)", "gymnasium (==0.26.3)", "kubernetes", "lz4", "numpy (>=1.20)", "opencensus", "opentelemetry-api", "opentelemetry-exporter-otlp", "opentelemetry-sdk", "pandas", "pandas (>=1.3)", "prometheus-client (>=0.7.1)", "py-spy (>=0.2.0)", "pyarrow (>=6.0.1)", "pydantic", "pyyaml", "ray-cpp (==2.5.1)", "requests", "rich", "scikit-image", "scipy", "smart-open", "starlette", "tensorboardX (>=1.9)", "typer", "urllib3", "uvicorn", "virtualenv (>=20.0.24,<20.21.1)"] +cpp = ["ray-cpp (==2.5.1)"] data = ["fsspec", "numpy (>=1.20)", "pandas (>=1.3)", "pyarrow (>=6.0.1)"] default = ["aiohttp (>=3.7)", "aiohttp-cors", "colorful", "gpustat (>=1.0.0)", "opencensus", "prometheus-client (>=0.7.1)", "py-spy (>=0.2.0)", "pydantic", "requests", "smart-open", 
"virtualenv (>=20.0.24,<20.21.1)"] k8s = ["kubernetes", "urllib3"] diff --git a/python/pyiceberg/catalog/__init__.py b/python/pyiceberg/catalog/__init__.py index 490d9c2060df..f1be22b99e82 100644 --- a/python/pyiceberg/catalog/__init__.py +++ b/python/pyiceberg/catalog/__init__.py @@ -40,7 +40,12 @@ from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec from pyiceberg.schema import Schema from pyiceberg.serializers import ToOutputFile -from pyiceberg.table import Table, TableMetadata +from pyiceberg.table import ( + CommitTableRequest, + CommitTableResponse, + Table, + TableMetadata, +) from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder from pyiceberg.typedef import ( EMPTY_DICT, @@ -322,6 +327,20 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U NoSuchTableError: If a table with the name does not exist. """ + @abstractmethod + def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse: + """Updates one or more tables. + + Args: + table_request (CommitTableRequest): The table requests to be carried out. + + Returns: + CommitTableResponse: The updated metadata. + + Raises: + NoSuchTableError: If a table with the given identifier does not exist. + """ + @abstractmethod def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None: """Create a namespace in the catalog. @@ -392,7 +411,7 @@ def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Proper @abstractmethod def update_namespace_properties( - self, namespace: Union[str, Identifier], removals: set[str] | None = None, updates: Properties = EMPTY_DICT + self, namespace: Union[str, Identifier], removals: Optional[Set[str]] = None, updates: Properties = EMPTY_DICT ) -> PropertiesUpdateSummary: """Removes provided property keys and updates properties for a namespace. diff --git a/python/pyiceberg/catalog/dynamodb.py b/python/pyiceberg/catalog/dynamodb.py index 642f6fbabf06..d11ffb5d3941 100644 --- a/python/pyiceberg/catalog/dynamodb.py +++ b/python/pyiceberg/catalog/dynamodb.py @@ -52,7 +52,7 @@ from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec from pyiceberg.schema import Schema from pyiceberg.serializers import FromInputFile -from pyiceberg.table import Table +from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table from pyiceberg.table.metadata import new_table_metadata from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder from pyiceberg.typedef import EMPTY_DICT @@ -168,6 +168,20 @@ def create_table( return self.load_table(identifier=identifier) + def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse: + """Updates the table. + + Args: + table_request (CommitTableRequest): The table requests to be carried out. + + Returns: + CommitTableResponse: The updated metadata. + + Raises: + NoSuchTableError: If a table with the given identifier does not exist. + """ + raise NotImplementedError + def load_table(self, identifier: Union[str, Identifier]) -> Table: """ Loads the table's metadata and returns the table instance. 
@@ -577,6 +591,7 @@ def _convert_dynamo_table_item_to_iceberg_table(self, dynamo_table_item: Dict[st metadata=metadata, metadata_location=metadata_location, io=self._load_file_io(metadata.properties, metadata_location), + catalog=self, ) diff --git a/python/pyiceberg/catalog/glue.py b/python/pyiceberg/catalog/glue.py index 24c077615e62..7e06f2e47f0d 100644 --- a/python/pyiceberg/catalog/glue.py +++ b/python/pyiceberg/catalog/glue.py @@ -51,7 +51,7 @@ from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec from pyiceberg.schema import Schema from pyiceberg.serializers import FromInputFile -from pyiceberg.table import Table +from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table from pyiceberg.table.metadata import new_table_metadata from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder from pyiceberg.typedef import EMPTY_DICT @@ -170,6 +170,7 @@ def _convert_glue_to_iceberg(self, glue_table: Dict[str, Any]) -> Table: metadata=metadata, metadata_location=metadata_location, io=self._load_file_io(metadata.properties, metadata_location), + catalog=self, ) def _create_glue_table(self, database_name: str, table_name: str, table_input: Dict[str, Any]) -> None: @@ -224,6 +225,20 @@ def create_table( return self.load_table(identifier=identifier) + def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse: + """Updates the table. + + Args: + table_request (CommitTableRequest): The table requests to be carried out. + + Returns: + CommitTableResponse: The updated metadata. + + Raises: + NoSuchTableError: If a table with the given identifier does not exist. + """ + raise NotImplementedError + def load_table(self, identifier: Union[str, Identifier]) -> Table: """Loads the table's metadata and returns the table instance. diff --git a/python/pyiceberg/catalog/hive.py b/python/pyiceberg/catalog/hive.py index b8c91796a21d..839fb2a3d570 100644 --- a/python/pyiceberg/catalog/hive.py +++ b/python/pyiceberg/catalog/hive.py @@ -66,7 +66,7 @@ from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec from pyiceberg.schema import Schema, SchemaVisitor, visit from pyiceberg.serializers import FromInputFile -from pyiceberg.table import Table +from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table from pyiceberg.table.metadata import new_table_metadata from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder from pyiceberg.typedef import EMPTY_DICT @@ -243,6 +243,7 @@ def _convert_hive_into_iceberg(self, table: HiveTable, io: FileIO) -> Table: metadata=metadata, metadata_location=metadata_location, io=self._load_file_io(metadata.properties, metadata_location), + catalog=self, ) def create_table( @@ -302,6 +303,20 @@ def create_table( return self._convert_hive_into_iceberg(hive_table, io) + def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse: + """Updates the table. + + Args: + table_request (CommitTableRequest): The table requests to be carried out. + + Returns: + CommitTableResponse: The updated metadata. + + Raises: + NoSuchTableError: If a table with the given identifier does not exist. + """ + raise NotImplementedError + def load_table(self, identifier: Union[str, Identifier]) -> Table: """Loads the table's metadata and returns the table instance. 
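
A recurring change across the DynamoDB, Glue, and Hive catalogs above is that every `Table` is now constructed with `catalog=self`. That back-reference is what lets `Table.transaction()` and `Table.refresh()` route through the owning catalog without the caller passing it again. A rough usage sketch under that assumption (the `nyc.taxis` identifier and `owner` property are illustrative):

```python
from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")
table = catalog.load_table("nyc.taxis")  # the catalog injects catalog=self here

# commit_transaction() internally calls table.catalog._commit_table(...),
# so no catalog handle is needed at commit time.
table = table.transaction().set_properties(owner="data-eng").commit_transaction()

# refresh() uses the same back-reference to reload the latest metadata in place.
table = table.refresh()
```

For tables with no real catalog behind them, such as a `StaticTable` loaded straight from a metadata file, the new `NoopCatalog` in the file below fills the slot and fails fast with `NotImplementedError` on any catalog operation.
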
diff --git a/python/pyiceberg/catalog/noop.py b/python/pyiceberg/catalog/noop.py new file mode 100644 index 000000000000..bb93772aa759 --- /dev/null +++ b/python/pyiceberg/catalog/noop.py @@ -0,0 +1,79 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from typing import ( + List, + Optional, + Set, + Union, +) + +from pyiceberg.catalog import Catalog, PropertiesUpdateSummary +from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec +from pyiceberg.schema import Schema +from pyiceberg.table import ( + CommitTableRequest, + CommitTableResponse, + SortOrder, + Table, +) +from pyiceberg.table.sorting import UNSORTED_SORT_ORDER +from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties + + +class NoopCatalog(Catalog): + def create_table( + self, + identifier: Union[str, Identifier], + schema: Schema, + location: Optional[str] = None, + partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, + sort_order: SortOrder = UNSORTED_SORT_ORDER, + properties: Properties = EMPTY_DICT, + ) -> Table: + raise NotImplementedError + + def load_table(self, identifier: Union[str, Identifier]) -> Table: + raise NotImplementedError + + def drop_table(self, identifier: Union[str, Identifier]) -> None: + raise NotImplementedError + + def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table: + raise NotImplementedError + + def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse: + raise NotImplementedError + + def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None: + raise NotImplementedError + + def drop_namespace(self, namespace: Union[str, Identifier]) -> None: + raise NotImplementedError + + def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]: + raise NotImplementedError + + def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]: + raise NotImplementedError + + def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties: + raise NotImplementedError + + def update_namespace_properties( + self, namespace: Union[str, Identifier], removals: Optional[Set[str]] = None, updates: Properties = EMPTY_DICT + ) -> PropertiesUpdateSummary: + raise NotImplementedError diff --git a/python/pyiceberg/catalog/rest.py b/python/pyiceberg/catalog/rest.py index 4c36390171a3..56b8d9316ebf 100644 --- a/python/pyiceberg/catalog/rest.py +++ b/python/pyiceberg/catalog/rest.py @@ -22,6 +22,7 @@ Literal, Optional, Set, + Tuple, Type, Union, ) @@ -42,6 +43,8 @@ from pyiceberg.exceptions import ( AuthorizationExpiredError, BadRequestError, + CommitFailedException, + CommitStateUnknownException, ForbiddenError, NamespaceAlreadyExistsError, NoSuchNamespaceError, @@ 
-55,7 +58,12 @@ ) from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec from pyiceberg.schema import Schema -from pyiceberg.table import Table, TableMetadata +from pyiceberg.table import ( + CommitTableRequest, + CommitTableResponse, + Table, + TableMetadata, +) from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder from pyiceberg.typedef import EMPTY_DICT, IcebergBaseModel @@ -68,7 +76,7 @@ class Endpoints: create_namespace: str = "namespaces" load_namespace_metadata: str = "namespaces/{namespace}" drop_namespace: str = "namespaces/{namespace}" - update_properties: str = "namespaces/{namespace}/properties" + update_namespace_properties: str = "namespaces/{namespace}/properties" list_tables: str = "namespaces/{namespace}/tables" create_table: str = "namespaces/{namespace}/tables" load_table: str = "namespaces/{namespace}/tables/{table}" @@ -393,6 +401,17 @@ def add_headers(self, request: PreparedRequest, **kwargs: Any) -> None: # pylin session.mount(self.uri, SigV4Adapter(**self.properties)) + def _response_to_table(self, identifier_tuple: Tuple[str, ...], table_response: TableResponse) -> Table: + return Table( + identifier=(self.name,) + identifier_tuple if self.name else identifier_tuple, + metadata_location=table_response.metadata_location, + metadata=table_response.metadata, + io=self._load_file_io( + {**table_response.metadata.properties, **table_response.config}, table_response.metadata_location + ), + catalog=self, + ) + def create_table( self, identifier: Union[str, Identifier], @@ -422,15 +441,7 @@ def create_table( self._handle_non_200_response(exc, {409: TableAlreadyExistsError}) table_response = TableResponse(**response.json()) - - return Table( - identifier=(self.name,) + self.identifier_to_tuple(identifier), - metadata_location=table_response.metadata_location, - metadata=table_response.metadata, - io=self._load_file_io( - {**table_response.metadata.properties, **table_response.config}, table_response.metadata_location - ), - ) + return self._response_to_table(self.identifier_to_tuple(identifier), table_response) def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]: namespace_tuple = self._check_valid_namespace_identifier(namespace) @@ -455,14 +466,7 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table: self._handle_non_200_response(exc, {404: NoSuchTableError}) table_response = TableResponse(**response.json()) - return Table( - identifier=(self.name,) + identifier_tuple if self.name else identifier_tuple, - metadata_location=table_response.metadata_location, - metadata=table_response.metadata, - io=self._load_file_io( - {**table_response.metadata.properties, **table_response.config}, table_response.metadata_location - ), - ) + return self._response_to_table(identifier_tuple, table_response) def drop_table(self, identifier: Union[str, Identifier], purge_requested: bool = False) -> None: response = self._session.delete( @@ -489,6 +493,36 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U return self.load_table(to_identifier) + def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse: + """Updates the table. + + Args: + table_request (CommitTableRequest): The table requests to be carried out. + + Returns: + CommitTableResponse: The updated metadata. + + Raises: + NoSuchTableError: If a table with the given identifier does not exist. 
+ """ + response = self._session.post( + self.url(Endpoints.update_table, prefixed=True, **self._split_identifier_for_path(table_request.identifier)), + data=table_request.json(), + ) + try: + response.raise_for_status() + except HTTPError as exc: + self._handle_non_200_response( + exc, + { + 409: CommitFailedException, + 500: CommitStateUnknownException, + 502: CommitStateUnknownException, + 504: CommitStateUnknownException, + }, + ) + return CommitTableResponse(**response.json()) + def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None: namespace_tuple = self._check_valid_namespace_identifier(namespace) payload = {"namespace": namespace_tuple, "properties": properties} @@ -541,7 +575,7 @@ def update_namespace_properties( namespace_tuple = self._check_valid_namespace_identifier(namespace) namespace = NAMESPACE_SEPARATOR.join(namespace_tuple) payload = {"removals": list(removals or []), "updates": updates} - response = self._session.post(self.url(Endpoints.update_properties, namespace=namespace), json=payload) + response = self._session.post(self.url(Endpoints.update_namespace_properties, namespace=namespace), json=payload) try: response.raise_for_status() except HTTPError as exc: diff --git a/python/pyiceberg/exceptions.py b/python/pyiceberg/exceptions.py index 65b20caa8891..f5555437234d 100644 --- a/python/pyiceberg/exceptions.py +++ b/python/pyiceberg/exceptions.py @@ -102,3 +102,11 @@ class ConditionalCheckFailedException(DynamoDbError): class GenericDynamoDbError(DynamoDbError): pass + + +class CommitFailedException(RESTError): + """Commit failed, refresh and try again.""" + + +class CommitStateUnknownException(RESTError): + """Commit failed due to unknown reason.""" diff --git a/python/pyiceberg/table/__init__.py b/python/pyiceberg/table/__init__.py index 9c095a5319e5..0d433a65c6b2 100644 --- a/python/pyiceberg/table/__init__.py +++ b/python/pyiceberg/table/__init__.py @@ -18,6 +18,7 @@ from abc import ABC, abstractmethod from dataclasses import dataclass +from enum import Enum from functools import cached_property from itertools import chain from multiprocessing.pool import ThreadPool @@ -28,6 +29,7 @@ Dict, Iterable, List, + Literal, Optional, Set, Tuple, @@ -63,6 +65,7 @@ from pyiceberg.table.sorting import SortOrder from pyiceberg.typedef import ( EMPTY_DICT, + IcebergBaseModel, Identifier, KeyDefaultDict, Properties, @@ -74,24 +77,327 @@ import ray from duckdb import DuckDBPyConnection + from pyiceberg.catalog import Catalog + + ALWAYS_TRUE = AlwaysTrue() +class Transaction: + _table: Table + _updates: Tuple[TableUpdate, ...] + _requirements: Tuple[TableRequirement, ...] + + def __init__( + self, + table: Table, + actions: Optional[Tuple[TableUpdate, ...]] = None, + requirements: Optional[Tuple[TableRequirement, ...]] = None, + ): + self._table = table + self._updates = actions or () + self._requirements = requirements or () + + def __enter__(self) -> Transaction: + return self + + def __exit__(self, _: Any, value: Any, traceback: Any) -> None: + fresh_table = self.commit_transaction() + # Update the new data in place + self._table.metadata = fresh_table.metadata + self._table.metadata_location = fresh_table.metadata_location + + def _append_updates(self, *new_updates: TableUpdate) -> Transaction: + """Appends updates to the set of staged updates. + + Args: + *new_updates: Any new updates. + + Raises: + ValueError: When the type of update is not unique. + + Returns: + A new AlterTable object with the new updates appended. 
+ """ + for new_update in new_updates: + type_new_update = type(new_update) + if any(type(update) == type_new_update for update in self._updates): + raise ValueError(f"Updates in a single commit need to be unique, duplicate: {type_new_update}") + self._updates = self._updates + new_updates + return self + + def set_table_version(self, format_version: Literal[1, 2]) -> Transaction: + """Sets the table to a certain version. + + Args: + format_version: The newly set version. + + Returns: + The alter table builder. + """ + raise NotImplementedError("Not yet implemented") + + def set_properties(self, **updates: str) -> Transaction: + """Set properties. + + When a property is already set, it will be overwritten. + + Args: + updates: The properties set on the table. + + Returns: + The alter table builder. + """ + return self._append_updates(SetPropertiesUpdate(updates=updates)) + + def remove_properties(self, *removals: str) -> Transaction: + """Removes properties. + + Args: + removals: Properties to be removed. + + Returns: + The alter table builder. + """ + return self._append_updates(RemovePropertiesUpdate(removals=removals)) + + def update_location(self, location: str) -> Transaction: + """Sets the new table location. + + Args: + location: The new location of the table. + + Returns: + The alter table builder. + """ + raise NotImplementedError("Not yet implemented") + + def commit_transaction(self) -> Table: + """Commits the changes to the catalog. + + Returns: + The table with the updates applied. + """ + # Strip the catalog name + if len(self._updates) > 0: + response = self._table.catalog._commit_table( # pylint: disable=W0212 + CommitTableRequest( + identifier=self._table.identifier[1:], + requirements=self._requirements, + updates=self._updates, + ) + ) + # Update the metadata with the new one + self._table.metadata = response.metadata + self._table.metadata_location = response.metadata_location + + return self._table + else: + return self._table + + +class TableUpdateAction(Enum): + upgrade_format_version = "upgrade-format-version" + add_schema = "add-schema" + set_current_schema = "set-current-schema" + add_spec = "add-spec" + set_default_spec = "set-default-spec" + add_sort_order = "add-sort-order" + set_default_sort_order = "set-default-sort-order" + add_snapshot = "add-snapshot" + set_snapshot_ref = "set-snapshot-ref" + remove_snapshots = "remove-snapshots" + remove_snapshot_ref = "remove-snapshot-ref" + set_location = "set-location" + set_properties = "set-properties" + remove_properties = "remove-properties" + + +class TableUpdate(IcebergBaseModel): + action: TableUpdateAction + + +class UpgradeFormatVersionUpdate(TableUpdate): + action = TableUpdateAction.upgrade_format_version + format_version: int = Field(alias="format-version") + + +class AddSchemaUpdate(TableUpdate): + action = TableUpdateAction.add_schema + schema_: Schema = Field(alias="schema") + + +class SetCurrentSchemaUpdate(TableUpdate): + action = TableUpdateAction.set_current_schema + schema_id: int = Field( + alias="schema-id", description="Schema ID to set as current, or -1 to set last added schema", default=-1 + ) + + +class AddPartitionSpecUpdate(TableUpdate): + action = TableUpdateAction.add_spec + spec: PartitionSpec + + +class SetDefaultSpecUpdate(TableUpdate): + action = TableUpdateAction.set_default_spec + spec_id: int = Field( + alias="spec-id", description="Partition spec ID to set as the default, or -1 to set last added spec", default=-1 + ) + + +class AddSortOrderUpdate(TableUpdate): + action = 
TableUpdateAction.add_sort_order + sort_order: SortOrder = Field(alias="sort-order") + + +class SetDefaultSortOrderUpdate(TableUpdate): + action = TableUpdateAction.set_default_sort_order + sort_order_id: int = Field( + alias="sort-order-id", description="Sort order ID to set as the default, or -1 to set last added sort order", default=-1 + ) + + +class AddSnapshotUpdate(TableUpdate): + action = TableUpdateAction.add_snapshot + snapshot: Snapshot + + +class SetSnapshotRefUpdate(TableUpdate): + action = TableUpdateAction.set_snapshot_ref + ref_name: str = Field(alias="ref-name") + type: Literal["tag", "branch"] + snapshot_id: int = Field(alias="snapshot-id") + max_age_ref_ms: int = Field(alias="max-ref-age-ms") + max_snapshot_age_ms: int = Field(alias="max-snapshot-age-ms") + min_snapshots_to_keep: int = Field(alias="min-snapshots-to-keep") + + +class RemoveSnapshotsUpdate(TableUpdate): + action = TableUpdateAction.remove_snapshots + snapshot_ids: List[int] = Field(alias="snapshot-ids") + + +class RemoveSnapshotRefUpdate(TableUpdate): + action = TableUpdateAction.remove_snapshot_ref + ref_name: str = Field(alias="ref-name") + + +class SetLocationUpdate(TableUpdate): + action = TableUpdateAction.set_location + location: str + + +class SetPropertiesUpdate(TableUpdate): + action = TableUpdateAction.set_properties + updates: Dict[str, str] + + +class RemovePropertiesUpdate(TableUpdate): + action = TableUpdateAction.remove_properties + removals: List[str] + + +class TableRequirement(IcebergBaseModel): + type: str + + +class AssertCreate(TableRequirement): + """The table must not already exist; used for create transactions.""" + + type: Literal["assert-create"] + + +class AssertTableUUID(TableRequirement): + """The table UUID must match the requirement's `uuid`.""" + + type: Literal["assert-table-uuid"] + uuid: str + + +class AssertRefSnapshotId(TableRequirement): + """The table branch or tag identified by the requirement's `ref` must reference the requirement's `snapshot-id`. + + if `snapshot-id` is `null` or missing, the ref must not already exist. 
+ """ + + type: Literal["assert-ref-snapshot-id"] + ref: str + snapshot_id: int = Field(..., alias="snapshot-id") + + +class AssertLastAssignedFieldId(TableRequirement): + """The table's last assigned column id must match the requirement's `last-assigned-field-id`.""" + + type: Literal["assert-last-assigned-field-id"] + last_assigned_field_id: int = Field(..., alias="last-assigned-field-id") + + +class AssertCurrentSchemaId(TableRequirement): + """The table's current schema id must match the requirement's `current-schema-id`.""" + + type: Literal["assert-current-schema-id"] + current_schema_id: int = Field(..., alias="current-schema-id") + + +class AssertLastAssignedPartitionId(TableRequirement): + """The table's last assigned partition id must match the requirement's `last-assigned-partition-id`.""" + + type: Literal["assert-last-assigned-partition-id"] + last_assigned_partition_id: int = Field(..., alias="last-assigned-partition-id") + + +class AssertDefaultSpecId(TableRequirement): + """The table's default spec id must match the requirement's `default-spec-id`.""" + + type: Literal["assert-default-spec-id"] + default_spec_id: int = Field(..., alias="default-spec-id") + + +class AssertDefaultSortOrderId(TableRequirement): + """The table's default sort order id must match the requirement's `default-sort-order-id`.""" + + type: Literal["assert-default-sort-order-id"] + default_sort_order_id: int = Field(..., alias="default-sort-order-id") + + +class CommitTableRequest(IcebergBaseModel): + identifier: Identifier = Field() + requirements: List[TableRequirement] = Field(default_factory=list) + updates: List[TableUpdate] = Field(default_factory=list) + + +class CommitTableResponse(IcebergBaseModel): + metadata: TableMetadata = Field() + metadata_location: str = Field(alias="metadata-location") + + class Table: identifier: Identifier = Field() metadata: TableMetadata = Field() metadata_location: str = Field() io: FileIO + catalog: Catalog - def __init__(self, identifier: Identifier, metadata: TableMetadata, metadata_location: str, io: FileIO) -> None: + def __init__( + self, identifier: Identifier, metadata: TableMetadata, metadata_location: str, io: FileIO, catalog: Catalog + ) -> None: self.identifier = identifier self.metadata = metadata self.metadata_location = metadata_location self.io = io + self.catalog = catalog + + def transaction(self) -> Transaction: + return Transaction(self) def refresh(self) -> Table: """Refresh the current table metadata.""" - raise NotImplementedError("To be implemented") + fresh = self.catalog.load_table(self.identifier[1:]) + self.metadata = fresh.metadata + self.io = fresh.io + self.metadata_location = fresh.metadata_location + return self def name(self) -> Identifier: """Return the identifier of this table.""" @@ -142,6 +448,11 @@ def sort_orders(self) -> Dict[int, SortOrder]: """Return a dict of the sort orders of this table.""" return {sort_order.order_id: sort_order for sort_order in self.metadata.sort_orders} + @property + def properties(self) -> Dict[str, str]: + """Properties of the table.""" + return self.metadata.properties + def location(self) -> str: """Return the table's base location.""" return self.metadata.location @@ -195,11 +506,14 @@ def from_metadata(cls, metadata_location: str, properties: Properties = EMPTY_DI metadata = FromInputFile.table_metadata(file) + from pyiceberg.catalog.noop import NoopCatalog + return cls( identifier=("static-table", metadata_location), metadata_location=metadata_location, metadata=metadata, 
io=load_file_io({**properties, **metadata.properties}), + catalog=NoopCatalog("static-table"), ) diff --git a/python/tests/catalog/test_base.py b/python/tests/catalog/test_base.py index 742549a6543c..29b63e0900e8 100644 --- a/python/tests/catalog/test_base.py +++ b/python/tests/catalog/test_base.py @@ -42,7 +42,7 @@ from pyiceberg.io import load_file_io from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionField, PartitionSpec from pyiceberg.schema import Schema -from pyiceberg.table import Table +from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table from pyiceberg.table.metadata import TableMetadataV1 from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder from pyiceberg.transforms import IdentityTransform @@ -103,10 +103,14 @@ def create_table( ), metadata_location=f's3://warehouse/{"/".join(identifier)}/metadata/metadata.json', io=load_file_io(), + catalog=self, ) self.__tables[identifier] = table return table + def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse: + raise NotImplementedError + def load_table(self, identifier: Union[str, Identifier]) -> Table: identifier = Catalog.identifier_to_tuple(identifier) try: @@ -141,6 +145,7 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U metadata=table.metadata, metadata_location=table.metadata_location, io=load_file_io(), + catalog=self, ) return self.__tables[to_identifier] diff --git a/python/tests/catalog/test_rest.py b/python/tests/catalog/test_rest.py index 6f3cafffc175..a7663ac5117f 100644 --- a/python/tests/catalog/test_rest.py +++ b/python/tests/catalog/test_rest.py @@ -408,7 +408,8 @@ def test_load_table_200(rest_mock: Mocker) -> None: status_code=200, request_headers=TEST_HEADERS, ) - actual = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).load_table(("fokko", "table")) + catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN) + actual = catalog.load_table(("fokko", "table")) expected = Table( identifier=("rest", "fokko", "table"), metadata_location="s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json", @@ -484,6 +485,7 @@ def test_load_table_200(rest_mock: Mocker) -> None: partition_spec=[], ), io=load_file_io(), + catalog=catalog, ) assert actual == expected @@ -585,7 +587,8 @@ def test_create_table_200(rest_mock: Mocker, table_schema_simple: Schema) -> Non status_code=200, request_headers=TEST_HEADERS, ) - table = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).create_table( + catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN) + table = catalog.create_table( identifier=("fokko", "fokko2"), schema=table_schema_simple, location=None, @@ -639,6 +642,7 @@ def test_create_table_200(rest_mock: Mocker, table_schema_simple: Schema) -> Non partition_spec=[], ), io=load_file_io(), + catalog=catalog, ) diff --git a/python/tests/cli/test_console.py b/python/tests/cli/test_console.py index 555071dd0dc2..4aaec26a78ee 100644 --- a/python/tests/cli/test_console.py +++ b/python/tests/cli/test_console.py @@ -26,12 +26,13 @@ from click.testing import CliRunner from pyiceberg.catalog import Catalog, PropertiesUpdateSummary +from pyiceberg.catalog.noop import NoopCatalog from pyiceberg.cli.console import run from pyiceberg.exceptions import NoSuchNamespaceError, NoSuchTableError from pyiceberg.io import load_file_io from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec from pyiceberg.schema import Schema -from pyiceberg.table import Table 
+from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table from pyiceberg.table.metadata import TableMetadataV2 from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties @@ -115,8 +116,12 @@ def create_table( metadata_location="s3://tmp/", metadata=TableMetadataV2(**EXAMPLE_TABLE_METADATA_V2), io=load_file_io(), + catalog=self, ) + def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse: + raise NotImplementedError + def load_table(self, identifier: Union[str, Identifier]) -> Table: tuple_identifier = Catalog.identifier_to_tuple(identifier) if tuple_identifier == ("default", "foo"): @@ -125,6 +130,7 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table: metadata_location="s3://tmp/", metadata=TableMetadataV2(**EXAMPLE_TABLE_METADATA_V2), io=load_file_io(), + catalog=NoopCatalog("NoopCatalog"), ) else: raise NoSuchTableError(f"Table does not exist: {'.'.join(tuple_identifier)}") @@ -147,6 +153,7 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U metadata_location="s3://tmp/", metadata=TableMetadataV2(**EXAMPLE_TABLE_METADATA_V2), io=load_file_io(), + catalog=NoopCatalog("NoopCatalog"), ) else: raise NoSuchTableError(f"Table does not exist: {from_identifier}") diff --git a/python/tests/io/test_pyarrow.py b/python/tests/io/test_pyarrow.py index 3792aea03e9d..7a07532c6632 100644 --- a/python/tests/io/test_pyarrow.py +++ b/python/tests/io/test_pyarrow.py @@ -27,6 +27,7 @@ from pyarrow.fs import FileType, LocalFileSystem from pyiceberg.avro.resolver import ResolveError +from pyiceberg.catalog.noop import NoopCatalog from pyiceberg.expressions import ( AlwaysFalse, AlwaysTrue, @@ -821,6 +822,7 @@ def project( ), metadata_location="file://a/b/c.json", io=PyArrowFileIO(), + catalog=NoopCatalog("NoopCatalog"), ), expr or AlwaysTrue(), schema, @@ -1232,6 +1234,7 @@ def test_delete(deletes_file: str, example_task: FileScanTask, table_schema_simp ), metadata_location=metadata_location, io=load_file_io(), + catalog=NoopCatalog("noop"), ), row_filter=AlwaysTrue(), projected_schema=table_schema_simple, @@ -1274,6 +1277,7 @@ def test_delete_duplicates(deletes_file: str, example_task: FileScanTask, table_ ), metadata_location=metadata_location, io=load_file_io(), + catalog=NoopCatalog("noop"), ), row_filter=AlwaysTrue(), projected_schema=table_schema_simple, @@ -1308,6 +1312,7 @@ def test_pyarrow_wrap_fsspec(example_task: FileScanTask, table_schema_simple: Sc ), metadata_location=metadata_location, io=load_file_io(properties={"py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO"}, location=metadata_location), + catalog=NoopCatalog("NoopCatalog"), ), case_sensitive=True, projected_schema=table_schema_simple, diff --git a/python/tests/table/test_init.py b/python/tests/table/test_init.py index 36a518076b0c..b421f8149376 100644 --- a/python/tests/table/test_init.py +++ b/python/tests/table/test_init.py @@ -20,6 +20,7 @@ import pytest from sortedcontainers import SortedList +from pyiceberg.catalog.noop import NoopCatalog from pyiceberg.expressions import ( AlwaysTrue, And, @@ -62,6 +63,7 @@ def table(example_table_metadata_v2: Dict[str, Any]) -> Table: metadata=table_metadata, metadata_location=f"{table_metadata.location}/uuid.metadata.json", io=load_file_io(), + catalog=NoopCatalog("NoopCatalog"), ) diff --git a/python/tests/test_integration.py b/python/tests/test_integration.py index 14b3a36bdae6..36099208190b 100644 --- 
a/python/tests/test_integration.py +++ b/python/tests/test_integration.py @@ -24,6 +24,7 @@ from pyarrow.fs import S3FileSystem from pyiceberg.catalog import Catalog, load_catalog +from pyiceberg.exceptions import NoSuchTableError from pyiceberg.expressions import ( And, GreaterThanOrEqual, @@ -34,6 +35,13 @@ from pyiceberg.io.pyarrow import pyarrow_to_schema from pyiceberg.schema import Schema from pyiceberg.table import Table +from pyiceberg.types import ( + BooleanType, + IntegerType, + NestedField, + StringType, + TimestampType, +) @pytest.fixture() @@ -70,6 +78,50 @@ def table_test_all_types(catalog: Catalog) -> Table: return catalog.load_table("default.test_all_types") +TABLE_NAME = ("default", "t1") + + +@pytest.fixture() +def table(catalog: Catalog) -> Table: + try: + catalog.drop_table(TABLE_NAME) + except NoSuchTableError: + pass # Just to make sure that the table doesn't exist + + schema = Schema( + NestedField(field_id=1, name="str", field_type=StringType(), required=False), + NestedField(field_id=2, name="int", field_type=IntegerType(), required=True), + NestedField(field_id=3, name="bool", field_type=BooleanType(), required=False), + NestedField(field_id=4, name="datetime", field_type=TimestampType(), required=False), + schema_id=1, + ) + + return catalog.create_table(identifier=TABLE_NAME, schema=schema) + + +@pytest.mark.integration +def test_table_properties(table: Table) -> None: + assert table.properties == {} + + with table.transaction() as transaction: + transaction.set_properties(abc="🤪") + + assert table.properties == {"abc": "🤪"} + + with table.transaction() as transaction: + transaction.remove_properties("abc") + + assert table.properties == {} + + table = table.transaction().set_properties(abc="def").commit_transaction() + + assert table.properties == {"abc": "def"} + + table = table.transaction().remove_properties("abc").commit_transaction() + + assert table.properties == {} + + @pytest.fixture() def test_positional_mor_deletes(catalog: Catalog) -> Table: """Table that has positional deletes"""
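
To tie the pieces together: when `test_table_properties` runs against the REST catalog, each `commit_transaction()` boils down to a single POST of the serialized `CommitTableRequest`. A minimal sketch of that payload, assuming the models and aliases defined in this diff (the exact JSON field order and encoding may vary):

```python
from pyiceberg.table import CommitTableRequest, SetPropertiesUpdate

request = CommitTableRequest(
    identifier=("default", "t1"),
    updates=[SetPropertiesUpdate(updates={"abc": "def"})],
)

# RestCatalog._commit_table posts this body to namespaces/default/tables/t1;
# a 409 response maps to CommitFailedException, while 500/502/504 map to
# CommitStateUnknownException.
print(request.json())
# Expected shape: {"identifier": ["default", "t1"], "requirements": [],
#                  "updates": [{"action": "set-properties", "updates": {"abc": "def"}}]}
```
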