-
Notifications
You must be signed in to change notification settings - Fork 443
Upsert merge strategy for iceberg #2671
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
ccef83e
0643a51
fb67b79
3909e58
f07f995
b8f57be
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -10,7 +10,8 @@ | |
| from dlt.common.libs.pyarrow import cast_arrow_schema_types | ||
| from dlt.common.libs.utils import load_open_tables | ||
| from dlt.common.pipeline import SupportsPipeline | ||
| from dlt.common.schema.typing import TWriteDisposition | ||
| from dlt.common.schema.typing import TWriteDisposition, TTableSchema | ||
| from dlt.common.schema.utils import get_first_column_name_with_prop, get_columns_names_with_prop | ||
| from dlt.common.utils import assert_min_pkg_version | ||
| from dlt.common.exceptions import MissingDependencyException | ||
| from dlt.common.storages.configuration import FileSystemCredentials, FilesystemConfiguration | ||
|
|
@@ -25,6 +26,7 @@ | |
| from pyiceberg.catalog import Catalog as IcebergCatalog | ||
| from pyiceberg.exceptions import NoSuchTableError | ||
| import pyarrow as pa | ||
| import pyiceberg.io.pyarrow as _pio | ||
| except ModuleNotFoundError: | ||
| raise MissingDependencyException( | ||
| "dlt pyiceberg helpers", | ||
|
|
@@ -33,6 +35,20 @@ | |
| ) | ||
|
|
||
|
|
||
| # TODO: remove with pyiceberg's release after 0.9.1 | ||
| _orig_get_kwargs = _pio._get_parquet_writer_kwargs | ||
|
|
||
|
|
||
| def _patched_get_parquet_writer_kwargs(table_properties): # type: ignore[no-untyped-def] | ||
| """Return the original kwargs **plus** store_decimal_as_integer=True.""" | ||
| kwargs = _orig_get_kwargs(table_properties) | ||
| kwargs.setdefault("store_decimal_as_integer", True) | ||
| return kwargs | ||
|
|
||
|
|
||
| _pio._get_parquet_writer_kwargs = _patched_get_parquet_writer_kwargs | ||
|
|
||
|
|
||
| def ensure_iceberg_compatible_arrow_schema(schema: pa.Schema) -> pa.Schema: | ||
| ARROW_TO_ICEBERG_COMPATIBLE_ARROW_TYPE_MAP = { | ||
| pa.types.is_time32: pa.time64("us"), | ||
|
|
@@ -63,6 +79,43 @@ def write_iceberg_table( | |
| ) | ||
|
|
||
|
|
||
| def merge_iceberg_table( | ||
| table: IcebergTable, | ||
| data: pa.Table, | ||
| schema: TTableSchema, | ||
| load_table_name: str, | ||
| ) -> None: | ||
| """Merges in-memory Arrow data into on-disk Iceberg table.""" | ||
| strategy = schema["x-merge-strategy"] # type: ignore[typeddict-item] | ||
| if strategy == "upsert": | ||
| # evolve schema | ||
| with table.update_schema() as update: | ||
| update.union_by_name(ensure_iceberg_compatible_arrow_schema(data.schema)) | ||
|
|
||
| if "parent" in schema: | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I'm not sure about this child table loading strategy. I know you took it from delta, but it seems to me that the first unique column will be the …
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
|
||
| join_cols = [get_first_column_name_with_prop(schema, "unique")] | ||
| else: | ||
| join_cols = get_columns_names_with_prop(schema, "primary_key") | ||
|
|
||
| # TODO: replace the batching method with transaction with pyiceberg's release after 0.9.1 | ||
| for rb in data.to_batches(max_chunksize=1_000): | ||
anuunchin marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| batch_tbl = pa.Table.from_batches([rb]) | ||
| batch_tbl = ensure_iceberg_compatible_arrow_data(batch_tbl) | ||
|
|
||
| table.upsert( | ||
| df=batch_tbl, | ||
| join_cols=join_cols, | ||
| when_matched_update_all=True, | ||
| when_not_matched_insert_all=True, | ||
| case_sensitive=True, | ||
| ) | ||
| else: | ||
| raise ValueError( | ||
| f'Merge strategy "{strategy}" is not supported for Iceberg tables. ' | ||
| f'Table: "{load_table_name}".' | ||
| ) | ||
|
|
||
|
|
||
| def get_sql_catalog( | ||
| catalog_name: str, | ||
| uri: str, | ||
|
|
||
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Uh oh!
There was an error while loading. Please reload this page.