diff --git a/dlt/common/libs/deltalake.py b/dlt/common/libs/deltalake.py
index 9af14b8dac..cb552505e8 100644
--- a/dlt/common/libs/deltalake.py
+++ b/dlt/common/libs/deltalake.py
@@ -118,6 +118,7 @@ def merge_delta_table(
     table: DeltaTable,
     data: Union[pa.Table, pa.RecordBatchReader],
     schema: TTableSchema,
+    load_table_name: str,
 ) -> None:
     """Merges in-memory Arrow data into on-disk Delta table."""
 
@@ -149,7 +150,10 @@ def merge_delta_table(
         qry.execute()
     else:
-        ValueError(f'Merge strategy "{strategy}" not supported.')
+        raise ValueError(
+            f'Merge strategy "{strategy}" is not supported for Delta tables. '
+            f'Table: "{load_table_name}".'
+        )
 
 
 def get_delta_tables(
diff --git a/dlt/common/libs/pyiceberg.py b/dlt/common/libs/pyiceberg.py
index ddfc89cce7..e4c0e9bee8 100644
--- a/dlt/common/libs/pyiceberg.py
+++ b/dlt/common/libs/pyiceberg.py
@@ -10,7 +10,8 @@
 from dlt.common.libs.pyarrow import cast_arrow_schema_types
 from dlt.common.libs.utils import load_open_tables
 from dlt.common.pipeline import SupportsPipeline
-from dlt.common.schema.typing import TWriteDisposition
+from dlt.common.schema.typing import TWriteDisposition, TTableSchema
+from dlt.common.schema.utils import get_first_column_name_with_prop, get_columns_names_with_prop
 from dlt.common.utils import assert_min_pkg_version
 from dlt.common.exceptions import MissingDependencyException
 from dlt.common.storages.configuration import FileSystemCredentials, FilesystemConfiguration
@@ -25,6 +26,7 @@
     from pyiceberg.catalog import Catalog as IcebergCatalog
     from pyiceberg.exceptions import NoSuchTableError
     import pyarrow as pa
+    import pyiceberg.io.pyarrow as _pio
 except ModuleNotFoundError:
     raise MissingDependencyException(
         "dlt pyiceberg helpers",
@@ -33,6 +35,20 @@
     )
 
 
+# TODO: remove once a pyiceberg release newer than 0.9.1 is available
+_orig_get_kwargs = _pio._get_parquet_writer_kwargs
+
+
+def _patched_get_parquet_writer_kwargs(table_properties):  # type: ignore[no-untyped-def]
+    """Return the original kwargs **plus** store_decimal_as_integer=True."""
+    kwargs = _orig_get_kwargs(table_properties)
+    kwargs.setdefault("store_decimal_as_integer", True)
+    return kwargs
+
+
+_pio._get_parquet_writer_kwargs = _patched_get_parquet_writer_kwargs
+
+
 def ensure_iceberg_compatible_arrow_schema(schema: pa.Schema) -> pa.Schema:
     ARROW_TO_ICEBERG_COMPATIBLE_ARROW_TYPE_MAP = {
         pa.types.is_time32: pa.time64("us"),
@@ -63,6 +79,43 @@
     )
 
 
+def merge_iceberg_table(
+    table: IcebergTable,
+    data: pa.Table,
+    schema: TTableSchema,
+    load_table_name: str,
+) -> None:
+    """Merges in-memory Arrow data into an on-disk Iceberg table."""
+    strategy = schema["x-merge-strategy"]  # type: ignore[typeddict-item]
+    if strategy == "upsert":
+        # evolve schema
+        with table.update_schema() as update:
+            update.union_by_name(ensure_iceberg_compatible_arrow_schema(data.schema))
+
+        if "parent" in schema:
+            join_cols = [get_first_column_name_with_prop(schema, "unique")]
+        else:
+            join_cols = get_columns_names_with_prop(schema, "primary_key")
+
+        # TODO: replace the batching approach with a transaction once a pyiceberg release newer than 0.9.1 is available
+        for rb in data.to_batches(max_chunksize=1_000):
+            batch_tbl = pa.Table.from_batches([rb])
+            batch_tbl = ensure_iceberg_compatible_arrow_data(batch_tbl)
+
+            table.upsert(
+                df=batch_tbl,
+                join_cols=join_cols,
+                when_matched_update_all=True,
+                when_not_matched_insert_all=True,
+                case_sensitive=True,
+            )
+    else:
+        raise ValueError(
+            f'Merge strategy "{strategy}" is not supported for Iceberg tables. '
+            f'Table: "{load_table_name}".'
+        )
+
+
 def get_sql_catalog(
     catalog_name: str,
     uri: str,
diff --git a/dlt/destinations/impl/filesystem/factory.py b/dlt/destinations/impl/filesystem/factory.py
index aa075ed7b9..9e0d62257e 100644
--- a/dlt/destinations/impl/filesystem/factory.py
+++ b/dlt/destinations/impl/filesystem/factory.py
@@ -30,7 +30,7 @@ def filesystem_merge_strategies_selector(
     *,
     table_schema: TTableSchema,
 ) -> Sequence[TLoaderMergeStrategy]:
-    if table_schema.get("table_format") == "delta":
+    if table_schema.get("table_format") in ["delta", "iceberg"]:
         return supported_merge_strategies
     else:
         return []
diff --git a/dlt/destinations/impl/filesystem/filesystem.py b/dlt/destinations/impl/filesystem/filesystem.py
index 0a9a3acafd..71c90d0192 100644
--- a/dlt/destinations/impl/filesystem/filesystem.py
+++ b/dlt/destinations/impl/filesystem/filesystem.py
@@ -190,6 +190,7 @@ def run(self) -> None:
                 table=delta_table,
                 data=arrow_rbr,
                 schema=self._load_table,
+                load_table_name=self.load_table_name,
             )
         else:
             location = self._job_client.get_open_table_location("delta", self.load_table_name)
@@ -212,7 +213,7 @@ def run(self) -> None:
 
 class IcebergLoadFilesystemJob(TableFormatLoadFilesystemJob):
     def run(self) -> None:
-        from dlt.common.libs.pyiceberg import write_iceberg_table, create_table
+        from dlt.common.libs.pyiceberg import write_iceberg_table, merge_iceberg_table, create_table
 
         try:
             table = self._job_client.load_open_table(
@@ -234,11 +235,19 @@ def run(self) -> None:
                 self.run()
                 return
 
-        write_iceberg_table(
-            table=table,
-            data=self.arrow_dataset.to_table(),
-            write_disposition=self._load_table["write_disposition"],
-        )
+        if self._load_table["write_disposition"] == "merge" and table is not None:
+            merge_iceberg_table(
+                table=table,
+                data=self.arrow_dataset.to_table(),
+                schema=self._load_table,
+                load_table_name=self.load_table_name,
+            )
+        else:
+            write_iceberg_table(
+                table=table,
+                data=self.arrow_dataset.to_table(),
+                write_disposition=self._load_table["write_disposition"],
+            )
 
 
 class FilesystemLoadJobWithFollowup(HasFollowupJobs, FilesystemLoadJob):
diff --git a/docs/website/docs/dlt-ecosystem/destinations/iceberg.md b/docs/website/docs/dlt-ecosystem/destinations/iceberg.md
index 273065a7d4..1220c63669 100644
--- a/docs/website/docs/dlt-ecosystem/destinations/iceberg.md
+++ b/docs/website/docs/dlt-ecosystem/destinations/iceberg.md
@@ -120,4 +120,24 @@ The [S3-compatible](./filesystem.md#using-s3-compatible-storage) interface for G
 The `az` [scheme](./filesystem.md#supported-schemes) is not supported when using the `iceberg` table format. Please use the `abfss` scheme. This is because `pyiceberg`, which dlt uses under the hood, currently does not support `az`.
 
 ## Table format `merge` support
-The `merge` write disposition is not supported for Iceberg and falls back to `append`. If you're interested in support for the `merge` write disposition with Iceberg, check out [dlt+ Iceberg destination](../../plus/ecosystem/iceberg.md).
+The [`upsert`](../../general-usage/merge-loading.md#upsert-strategy) merge strategy is supported for `iceberg`. This strategy requires that the input data contains no duplicate rows based on the key columns, and that the target table also contains no duplicates on those keys.
+
+:::warning
+Until _pyiceberg_ > 0.9.1 is released, upsert is executed in chunks of **1000** rows.
+:::
+
+```py
+@dlt.resource(
+    write_disposition={"disposition": "merge", "strategy": "upsert"},
+    primary_key="my_primary_key",
+    table_format="iceberg"
+)
+def my_upsert_resource():
+    ...
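+
+# A minimal usage sketch (hypothetical names: the pipeline name and
+# `my_primary_key` are illustrative): repeated runs update rows that
+# match the primary key and insert the rest.
+pipeline = dlt.pipeline("iceberg_upsert_demo", destination="filesystem")
+pipeline.run(my_upsert_resource())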
+```
diff --git a/docs/website/docs/general-usage/merge-loading.md b/docs/website/docs/general-usage/merge-loading.md
index 7da28b7876..42037d404f 100644
--- a/docs/website/docs/general-usage/merge-loading.md
+++ b/docs/website/docs/general-usage/merge-loading.md
@@ -554,7 +554,7 @@ The `upsert` merge strategy is currently supported for these destinations:
 - `mssql`
 - `postgres`
 - `snowflake`
-- `filesystem` with `delta` table format (see limitations [here](../dlt-ecosystem/destinations/delta-iceberg#known-limitations))
+- `filesystem` with `delta` table format (see limitations [here](../dlt-ecosystem/destinations/delta-iceberg#known-limitations)) and `iceberg` table format
 :::
 
 The `upsert` merge strategy does primary-key based *upserts*:
diff --git a/poetry.lock b/poetry.lock
index 72add2a121..d56732a0f1 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -1,4 +1,4 @@
-# This file is automatically @generated by Poetry 1.8.5 and should not be changed by hand.
+# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand.
 
 [[package]]
 name = "about-time"
@@ -8378,40 +8378,40 @@ plugins = ["importlib-metadata"]
 
 [[package]]
 name = "pyiceberg"
-version = "0.9.0"
+version = "0.9.1"
 description = "Apache Iceberg is an open table format for huge analytic datasets"
 optional = true
 python-versions = "!=2.7.*,!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,!=3.7.*,!=3.8.*,>=3.9"
 files = [
-    {file = "pyiceberg-0.9.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:b454d186c33aa3f0d03e4fa888df50d4861ffa4cdcc7c6f766237485d9a091d9"},
-    {file = "pyiceberg-0.9.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:e4f6800f8bd5cb30fd095cf58498b45d8c42709330a0ce72df4e92e030eba402"},
-    {file = "pyiceberg-0.9.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:c7a7f83805dfc3af8aaaa88ac7d208aafe5005400cb9238d2195d8b7113927ef"},
-    {file = "pyiceberg-0.9.0-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:218d31b81c91cd3acf775bd796f8c02740b4bdb8a7bde7278029710c94eb136a"},
-    {file = "pyiceberg-0.9.0-cp310-cp310-win_amd64.whl", hash = "sha256:f3680ac4aa6bada5a6823d4ded1e78ac86207fd3b275ca1a688bad5cb9191c3b"},
-    {file = "pyiceberg-0.9.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:0e37f2dc0fef4fba1a51e5a7c87d3aee5bb98bdd82cde9f219b5542201919055"},
-    {file = "pyiceberg-0.9.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:b9d4939c41daf94562b9a29ef322fe42e1aa2c886a23cefe23b5f013f27b3854"},
-    {file = "pyiceberg-0.9.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:91c86e00684427d02ace00fb765af13f75bbff3dd813a6e3928f2974b0ff150c"},
-    {file = "pyiceberg-0.9.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:d5c4d6819b2668c3da82683a8f0e69b282b8092c390d7b2c2c99d6234905574c"},
-    {file = "pyiceberg-0.9.0-cp311-cp311-win_amd64.whl", hash = "sha256:a1832f49831d92aac3f62462f2d5fbad05eeb5e93f25e0e308c0d8053cab9fa6"},
-    {file = "pyiceberg-0.9.0-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:6b868726045ccc013a723130aaa7cf2f2ddeae359930b0c54de8bc29f7103326"},
-    {file = "pyiceberg-0.9.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:785b5ee8d00b1f38c8643f9c1ca22f2dd034cf9610804972fddfc6ac97ced002"},
-    {file = "pyiceberg-0.9.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d6630cac07feb5894c2311be5ca62ffa3432803878fb112ae47c1d3edbd08609"},
-    {file = "pyiceberg-0.9.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:ac640aa29f57b2cb282f9a25427b73373d6fb54e82a589e8cc616f90e6f5e5b7"},
-    {file = "pyiceberg-0.9.0-cp312-cp312-win_amd64.whl", hash = "sha256:c13328f6b5bd5210e845e6a69977b38f2d0272ed431d27c825c587b6d7999b5e"},
-    {file = "pyiceberg-0.9.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:868c795b9bb49cea30b32cee4ba3fceb346664e24abbba5a3c0330a0015388c2"},
-    {file = "pyiceberg-0.9.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:58ceef4fbacf4eda19e2b84a9a850ffc661b489e08d5010a2c206583f387df83"},
-    {file = "pyiceberg-0.9.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:38d221a963907a4f706fbd811e638e451efd4491952166550664df156e1ca02c"},
-    {file = "pyiceberg-0.9.0-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:b7b4de0b94d5f4c83bab443aa449a1714f784953d56f415380a8bc4b5e14c988"},
-    {file = "pyiceberg-0.9.0-cp39-cp39-win_amd64.whl", hash = "sha256:c3bca11ccabfa98a17962b4ffe6d3eaaa83f66d6b997b79c20966907b9c7ccb0"},
-    {file = "pyiceberg-0.9.0-pp310-pypy310_pp73-macosx_10_15_x86_64.whl", hash = "sha256:6d818b01ab259f4892e486b960e999b7a724b6829f9e3919d2ec454f5f3f857b"},
-    {file = "pyiceberg-0.9.0-pp310-pypy310_pp73-macosx_11_0_arm64.whl", hash = "sha256:8161dc350e885d7bdc46f4fb4e9698bf1a84861056687823d53eaeed217e4324"},
-    {file = "pyiceberg-0.9.0-pp310-pypy310_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f3bf765b91e96f66a01205a87cd8fd0eb8ffb148fdd9bf621d9a2a3249336116"},
-    {file = "pyiceberg-0.9.0-pp310-pypy310_pp73-win_amd64.whl", hash = "sha256:a9a8699dbdec4ee81ac4dfc77d7489bffac3a7625a28df296657cec1edf79d6d"},
-    {file = "pyiceberg-0.9.0-pp39-pypy39_pp73-macosx_10_15_x86_64.whl", hash = "sha256:821c8ff026819038780559207cd32ee0500f719fd51ed2a1ab919b21a60ce5f2"},
-    {file = "pyiceberg-0.9.0-pp39-pypy39_pp73-macosx_11_0_arm64.whl", hash = "sha256:2ed7af929ba1b8faef98113b8da0512914450bdcb90d2fb46efe5319800c36ad"},
-    {file = "pyiceberg-0.9.0-pp39-pypy39_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:936fea58f468359a58e9fd03b7d6b1136bf6c5163a5a666e5ea43ebe70a0dba0"},
-    {file = "pyiceberg-0.9.0-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:76581d226ae67d8be5210bdab60dcdd8fc3a4d6745192a2b446eb746201abdb3"},
-    {file = "pyiceberg-0.9.0.tar.gz", hash = "sha256:70d255903dda31ed1f7753d41fec0c031aae36ef95e8a824cdae7df593439d8b"},
+    {file = "pyiceberg-0.9.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:0a183d9217eb82159c01b23c683057f96c8b2375f592b921721d1c157895e2df"},
+    {file = "pyiceberg-0.9.1-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:57030bb15c397b0379242907c5611f5b4338fb799e972353fd0edafde6cfd2ef"},
+    {file = "pyiceberg-0.9.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3ba4cd9a8f6a04cfbc68e0c83f2db3ffd14244da8601a142cc05965d4b343645"},
+    {file = "pyiceberg-0.9.1-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:d5a48c6a2016d0dcde8c9079cc5e6b6d2e2ac663eddfe4697e7ea03a0edc40b7"},
+    {file = "pyiceberg-0.9.1-cp310-cp310-win_amd64.whl", hash = "sha256:8bebfa5a804a95a9f3d98d88cbeb37430b09add04592238bba2a2b2e0466d60d"},
+    {file = "pyiceberg-0.9.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:0e75c502dd56ac3d77036ce8a3b2566348da5ff4367c7c671981616ef6dcc883"},
+    {file = "pyiceberg-0.9.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:0a8189c9b3ba81dd12493d6bb874a656a4d4909904552b97a629d1d43b3a0e90"},
+    {file = "pyiceberg-0.9.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7c03065d5c5b704444ab8fb18cdd232ec43994db95b9e53444008ebc2cf9dc2c"},
+    {file = "pyiceberg-0.9.1-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:93f2586a5da737de6e4643bf096a01772f068d1eedb7ffde6b36c60b6b9e6bd3"},
+    {file = "pyiceberg-0.9.1-cp311-cp311-win_amd64.whl", hash = "sha256:94e45c10051110ba7a43b85a1f0a680b4a31d1d6cee593c8e62e14d22d18c47d"},
+    {file = "pyiceberg-0.9.1-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:b8a958e3bbe919026533cee1f0fb6b7040928fce8d42c2ecea228de7c17578fa"},
+    {file = "pyiceberg-0.9.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:b7e956b35c6822600c45fd8f3ea8cfea328cc406fefa534afeb6fdb325d05406"},
+    {file = "pyiceberg-0.9.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:1e4e585164d7d86f5c9a609a1bc2abeae2f0ea0680a11a2064d3a945866b5311"},
+    {file = "pyiceberg-0.9.1-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:5fee08dac30e8524526f7d18468f9670f8606905b850b261314c597c6633f3b4"},
+    {file = "pyiceberg-0.9.1-cp312-cp312-win_amd64.whl", hash = "sha256:124793c54a0c2fb5ac4ab19c38da116c068e277c85cbaa7e4064e635a70b595e"},
+    {file = "pyiceberg-0.9.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:a6e29eb5ce63e8a14738f3efeb54022093456e02b681f0b8c815f7ef9e20ddcb"},
+    {file = "pyiceberg-0.9.1-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:1ebd4f74da8a3f7b78ad746c1d91d8cd9aa9cf97f4d36da164e3550f6a06b00e"},
+    {file = "pyiceberg-0.9.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b357638a58d9b0a5d7018fbe88fa84469c980c80d86441b7b9cd99871512447d"},
+    {file = "pyiceberg-0.9.1-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:f8a93c1e4ab35195018ce8fbbb6d973e099194ffe06d859bdf069d7b846da7aa"},
+    {file = "pyiceberg-0.9.1-cp39-cp39-win_amd64.whl", hash = "sha256:5c1b3598d521476ffce13949ae762a3dec49287198b26de445caa0daf2e395fa"},
+    {file = "pyiceberg-0.9.1-pp310-pypy310_pp73-macosx_10_15_x86_64.whl", hash = "sha256:77aec1c77d675603e0c5358e74adcae8d13b323753d702011be3f309d26af355"},
+    {file = "pyiceberg-0.9.1-pp310-pypy310_pp73-macosx_11_0_arm64.whl", hash = "sha256:cf567438bf6267bbb67fdfdfc72ac500d523725fca9a6a38f93e8acd4146190e"},
+    {file = "pyiceberg-0.9.1-pp310-pypy310_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:5992db7c00d789a33ff117700d453126803e769507a5edeb79bb6510ff72fc00"},
+    {file = "pyiceberg-0.9.1-pp310-pypy310_pp73-win_amd64.whl", hash = "sha256:c9e460fca26162a3822c0e8d50b49c80928a0e35cb41698748d7a26f8c016215"},
+    {file = "pyiceberg-0.9.1-pp39-pypy39_pp73-macosx_10_15_x86_64.whl", hash = "sha256:037aa7a8bfaf7f1482e6a3532217b5f4281bc81db6698c3ea87771d0453a8232"},
+    {file = "pyiceberg-0.9.1-pp39-pypy39_pp73-macosx_11_0_arm64.whl", hash = "sha256:5150464428a0568c4f46405884bc777dde37935580fb72b0030dfa28805d82e7"},
+    {file = "pyiceberg-0.9.1-pp39-pypy39_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:af2a6c273cfaf2b21b319fcf79489f87604220a0497942303b2a715a9d0f29e9"},
+    {file = "pyiceberg-0.9.1-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:023c3fcee36a441b7e20418b6e9cdc6f904141bfda09f8580dfe022d7faa7a53"},
+    {file = "pyiceberg-0.9.1.tar.gz", hash = "sha256:3634134ce33859a441768b39df179b2c6f3de2bbbf506622884f553b013ee799"},
 ]
 
 [package.dependencies]
@@ -8435,7 +8435,7 @@ dynamodb = ["boto3 (>=1.24.59)"]
 gcsfs = ["gcsfs (>=2023.1.0)"]
 glue = ["boto3 (>=1.24.59)", "mypy-boto3-glue (>=1.28.18)"]
 hive = ["thrift (>=0.13.0,<1.0.0)"]
-hive-kerberos = ["thrift (>=0.13.0,<1.0.0)", "thrift-sasl (>=0.4.3)"]
+hive-kerberos = ["kerberos (>=1.3.1,<2.0.0)", "thrift (>=0.13.0,<1.0.0)", "thrift-sasl (>=0.4.3)"]
 pandas = ["pandas (>=1.0.0,<3.0.0)", "pyarrow (>=17.0.0,<20.0.0)"]
 polars = ["polars (>=1.21.0,<2.0.0)"]
 pyarrow = ["pyarrow (>=17.0.0,<20.0.0)"]
@@ -11887,5 +11887,5 @@ weaviate = ["weaviate-client"]
 
 [metadata]
 lock-version = "2.0"
-python-versions = ">=3.9,<3.14"
-content-hash = "e24fd97cbc5cfa7b289ca89f68bcbb027e523c42f3bedeb7ddbfa63cf1187e96"
+python-versions = ">=3.9.2, <3.14, !=3.9.7"
+content-hash = "3ddf7fef3a7f660e6c39f5b1c92db4bfc18e8a94d9f3026a9002ccabb5a870bc"
diff --git a/pyproject.toml b/pyproject.toml
index c3cae9de8a..99784b1659 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -29,7 +29,7 @@ packages = [
 ]
 
 [tool.poetry.dependencies]
-python = ">=3.9,<3.14"
+python = ">=3.9.2, <3.14, !=3.9.7"
 requests = ">=2.26.0"
 pendulum = ">=2.1.2"
 simplejson = ">=3.17.5"
@@ -104,7 +104,8 @@ db-dtypes = { version = ">=1.2.0", optional = true }
 # https://github.com/apache/airflow/issues/28723
 # pyiceberg = { version = ">=0.7.1", optional = true, extras = ["sql-sqlite"] }
 # we will rely on manual installation of `sqlalchemy>=2.0.18` instead
-pyiceberg = { version = ">=0.9.0", optional = true }
+pyiceberg = { version = ">=0.9.1", optional = true }
+
 databricks-sdk = {version = ">=0.38.0", optional = true}
 pywin32 = {version = ">=306", optional = true, platform = "win32"}
 rich-argparse = "^1.6.0"
diff --git a/tests/common/destination/test_destination_capabilities.py b/tests/common/destination/test_destination_capabilities.py
index eb66e762e6..57bcd41e3a 100644
--- a/tests/common/destination/test_destination_capabilities.py
+++ b/tests/common/destination/test_destination_capabilities.py
@@ -42,7 +42,10 @@ def test_resolve_merge_strategy() -> None:
     )
 
     # unknown table formats
-    assert resolve_merge_strategy(schema.tables, iceberg_table, filesystem().capabilities()) is None
+    assert (
+        resolve_merge_strategy(schema.tables, iceberg_table, filesystem().capabilities())
+        == "upsert"
+    )
     assert resolve_merge_strategy(schema.tables, delta_table, athena().capabilities()) is None
 
     # not supported strategy
diff --git a/tests/load/pipeline/test_merge_disposition.py b/tests/load/pipeline/test_merge_disposition.py
index ecf9661e56..f7b3553b9a 100644
--- a/tests/load/pipeline/test_merge_disposition.py
+++ b/tests/load/pipeline/test_merge_disposition.py
@@ -440,7 +440,10 @@ def r(data):
     table_data = load_tables_to_dicts(p, "parent", "parent__child", exclude_system_cols=True)
     if merge_strategy == "upsert":
         # merge keys will not apply and parent will not be deleted
-        if destination_config.table_format == "delta":
+        if (
+            destination_config.table_format in ["delta", "iceberg"]
+            and destination_config.destination_type != "athena"
+        ):
             # delta merges cannot delete from nested tables
             assert table_counts == {
                 "parent": 3,  # id == 3 not deleted (not present in the data)
@@ -825,11 +828,22 @@ def test_pipeline_load_parquet(destination_config: DestinationTestConfiguration)
     github_data.max_table_nesting = 2
     github_data_copy = github()
     github_data_copy.max_table_nesting = 2
-    info = p.run(
-        [github_data, github_data_copy],
-        write_disposition="merge",
-        **destination_config.run_kwargs,
-    )
+    # iceberg filesystem requires input data without duplicates
+    if (
+        destination_config.table_format == "iceberg"
+        and destination_config.destination_type == "filesystem"
+    ):
+        info = p.run(
+            github_data,
+            write_disposition="merge",
+            **destination_config.run_kwargs,
+        )
+    else:
+        info = p.run(
+            [github_data, github_data_copy],
+            write_disposition="merge",
+            **destination_config.run_kwargs,
+        )
     assert_load_info(info)
     # make sure it was parquet or sql transforms
     expected_formats = ["parquet"]
@@ -841,10 +855,9 @@ def test_pipeline_load_parquet(destination_config: DestinationTestConfiguration)
     github_1_counts = load_table_counts(p)
 
     expected_rows = 100
-    # if table_format is set we use upsert which does not deduplicate input data
-    if not destination_config.supports_merge or (
-        destination_config.table_format and destination_config.destination_type != "athena"
-    ):
+    # if table_format is delta, we use upsert, which does not deduplicate input data
+    # otherwise the data is either deduplicated or, for iceberg on filesystem, no duplicates were passed in at all
+    if destination_config.table_format == "delta":
         expected_rows *= 2
 
     assert github_1_counts["issues"] == expected_rows
diff --git a/tests/load/pipeline/test_open_table_pipeline.py b/tests/load/pipeline/test_open_table_pipeline.py
index 1672bd3534..140bf04d45 100644
--- a/tests/load/pipeline/test_open_table_pipeline.py
+++ b/tests/load/pipeline/test_open_table_pipeline.py
@@ -375,16 +375,15 @@ def nested_table():
     assert len(rows_dict["nested_table__child"]) == 3
     assert len(rows_dict["nested_table__child__grandchild"]) == 5
 
-    if destination_config.supports_merge:
-        # now drop children and grandchildren, use merge write disposition to create and pass full table chain
-        # also for tables that do not have jobs
-        info = pipeline.run(
-            [{"foo": 3}] * 10000,
-            table_name="nested_table",
-            primary_key="foo",
-            write_disposition="merge",
-        )
-        assert_load_info(info)
+    # now drop children and grandchildren, use merge write disposition to create and pass full table chain
+    # also for tables that do not have jobs
+    info = pipeline.run(
+        [{"foo": i} for i in range(3, 10003)],
+        table_name="nested_table",
+        primary_key="foo",
+        write_disposition="merge",
+    )
+    assert_load_info(info)
 
 
 @pytest.mark.parametrize(
diff --git a/tests/load/utils.py b/tests/load/utils.py
index 0cbd857150..0163977192 100644
--- a/tests/load/utils.py
+++ b/tests/load/utils.py
@@ -651,7 +651,7 @@ def destinations_configs(
                 bucket_url=bucket,
                 extra_info=bucket,
                 table_format="iceberg",
-                supports_merge=False,
+                supports_merge=True,
                 file_format="parquet",
                 destination_name="fsgcpoauth" if bucket == GCS_BUCKET else None,
             )