Skip to content

Commit dd2dc23

Browse files
feat: Unity Catalog writes using daft.DataFrame.write_deltalake() (#3522)
This pull request covers the below 4 workflows that were tested internally (on Databricks on Azure and AWS) after building the package in a local environment: - Load existing table in Unity Catalog and append to it without schema change: `df.write_deltalake(uc_table, mode='append')` to existing table in UC retrieved using `unity.load_table(table_name)` - Load existing table in Unity Catalog and overwrite it without schema change: `df.write_deltalake(uc_table, mode='overwrite')` overwrite existing table in UC retrieved using `unity.load_table(table_name)` - Load existing table in Unity Catalog and overwrite it with schema change: `df.write_deltalake(uc_table, mode='overwrite', schema_mode='overwrite')` overwrite existing table, with schema change, in UC retrieved using `unity.load_table(table_name)` - Create new table in Unity Catalog using Daft engine and populate it with data: Register a new table in UC without any schema using `unity.load_table(table_name, new_table_storage_path="<some_valid_cloud_storage_path>")` and `df.write_deltalake(uc_table, mode='overwrite', schema_mode='overwrite')` A few notes: - `deltalake` (0.22.3) does not support writes to tables with deletion vectors enabled. For appends to an existing table, to avoid `CommitFailedError: Unsupported reader features required: [DeletionVectors]`, ensure the tables being written to do not have deletion vectors enabled. - The `httpx==0.27.2` pinned dependency is due to a defect with unitycatalog-python, which is affecting Daft as well for all the previous versions. Fixing it from this PR. - If schema updates are performed by Daft, readers will immediately see the new schema since the Delta log is self-contained. However, for the schema to update in the Unity Catalog UI, you will need to run `REPAIR TABLE catalog.schema.table SYNC METADATA;` from Databricks compute to update UC metadata to match what is in the Delta log. - In this version, append to an existing table after changing schema is not supported. 
Only overwrites are supported. - For AWS, needed to set an environment variable using `export AWS_S3_ALLOW_UNSAFE_RENAME=true`. - There appears to be a defect with the `allow_unsafe_rename` parameter in `df.write_deltalake`, as it did not work during internal testing. This could be a new issue to log, once confirmed. --------- Co-authored-by: Kev Wang <[email protected]>
1 parent c057493 commit dd2dc23

File tree

3 files changed

+61
-7
lines changed

3 files changed

+61
-7
lines changed

daft/dataframe/dataframe.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import torch
5454

5555
from daft.io import DataCatalogTable
56+
from daft.unity_catalog import UnityCatalogTable
5657

5758
from daft.logical.schema import Schema
5859

@@ -826,7 +827,7 @@ def write_iceberg(self, table: "pyiceberg.table.Table", mode: str = "append") ->
826827
@DataframePublicAPI
827828
def write_deltalake(
828829
self,
829-
table: Union[str, pathlib.Path, "DataCatalogTable", "deltalake.DeltaTable"],
830+
table: Union[str, pathlib.Path, "DataCatalogTable", "deltalake.DeltaTable", "UnityCatalogTable"],
830831
partition_cols: Optional[List[str]] = None,
831832
mode: Literal["append", "overwrite", "error", "ignore"] = "append",
832833
schema_mode: Optional[Literal["merge", "overwrite"]] = None,
@@ -844,7 +845,7 @@ def write_deltalake(
844845
This call is **blocking** and will execute the DataFrame when called
845846
846847
Args:
847-
table (Union[str, pathlib.Path, DataCatalogTable, deltalake.DeltaTable]): Destination `Delta Lake Table <https://delta-io.github.io/delta-rs/api/delta_table/>`__ or table URI to write dataframe to.
848+
table (Union[str, pathlib.Path, DataCatalogTable, deltalake.DeltaTable, UnityCatalogTable]): Destination `Delta Lake Table <https://delta-io.github.io/delta-rs/api/delta_table/>`__ or table URI to write dataframe to.
848849
partition_cols (List[str], optional): How to subpartition each partition further. If table exists, expected to match table's existing partitioning scheme, otherwise creates the table with specified partition columns. Defaults to None.
849850
mode (str, optional): Operation mode of the write. `append` will add new data, `overwrite` will replace table with new data, `error` will raise an error if table already exists, and `ignore` will not write anything if table already exists. Defaults to "append".
850851
schema_mode (str, optional): Schema mode of the write. If set to `overwrite`, allows replacing the schema of the table when doing `mode=overwrite`. Schema mode `merge` is currently not supported.
@@ -872,6 +873,7 @@ def write_deltalake(
872873
from daft.io import DataCatalogTable
873874
from daft.io._deltalake import large_dtypes_kwargs
874875
from daft.io.object_store_options import io_config_to_storage_options
876+
from daft.unity_catalog import UnityCatalogTable
875877

876878
if schema_mode == "merge":
877879
raise ValueError("Schema mode' merge' is not currently supported for write_deltalake.")
@@ -881,14 +883,21 @@ def write_deltalake(
881883

882884
io_config = get_context().daft_planning_config.default_io_config if io_config is None else io_config
883885

884-
if isinstance(table, (str, pathlib.Path, DataCatalogTable)):
886+
if isinstance(table, (str, pathlib.Path, DataCatalogTable, UnityCatalogTable)):
885887
if isinstance(table, str):
886888
table_uri = table
887889
elif isinstance(table, pathlib.Path):
888890
table_uri = str(table)
891+
elif isinstance(table, UnityCatalogTable):
892+
table_uri = table.table_uri
893+
io_config = table.io_config
889894
else:
890895
table_uri = table.table_uri(io_config)
891896

897+
if io_config is None:
898+
raise ValueError(
899+
"io_config was not provided to write_deltalake and could not be retrieved from the default configuration."
900+
)
892901
storage_options = io_config_to_storage_options(io_config, table_uri) or {}
893902
table = try_get_deltatable(table_uri, storage_options=storage_options)
894903
elif isinstance(table, deltalake.DeltaTable):

daft/unity_catalog/unity_catalog.py

Lines changed: 46 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -90,14 +90,56 @@ def _paginated_list_tables(client: unitycatalog.Unitycatalog, page_token: str |
9090

9191
return self._paginate_to_completion(_paginated_list_tables)
9292

93-
def load_table(self, table_name: str) -> UnityCatalogTable:
93+
def load_table(self, table_name: str, new_table_storage_path: str | None = None) -> UnityCatalogTable:
94+
"""Loads an existing Unity Catalog table. If the table is not found, and information is provided in the method to create a new table, a new table will be attempted to be registered.
95+
96+
Args:
97+
table_name (str): Name of the table in Unity Catalog in the form of dot-separated, 3-level namespace
98+
new_table_storage_path (str, optional): Cloud storage path URI to register a new external table using this path. Unity Catalog will validate if the path is valid and authorized for the principal, else will raise an exception.
99+
100+
Returns:
101+
UnityCatalogTable
102+
"""
94103
# Load the table ID
95-
table_info = self._client.tables.retrieve(table_name)
104+
try:
105+
table_info = self._client.tables.retrieve(table_name)
106+
if new_table_storage_path:
107+
warnings.warn(
108+
f"Table {table_name} is an existing storage table with a valid storage path. The 'new_table_storage_path' argument provided will be ignored."
109+
)
110+
except unitycatalog.NotFoundError:
111+
if not new_table_storage_path:
112+
raise ValueError(
113+
f"Table {table_name} is not an existing table. If a new table needs to be created, provide 'new_table_storage_path' value."
114+
)
115+
try:
116+
three_part_namesplit = table_name.split(".")
117+
if len(three_part_namesplit) != 3 or not all(three_part_namesplit):
118+
raise ValueError(
119+
f"Expected table name to be in the format of 'catalog.schema.table', received: {table_name}"
120+
)
121+
122+
params = {
123+
"catalog_name": three_part_namesplit[0],
124+
"schema_name": three_part_namesplit[1],
125+
"name": three_part_namesplit[2],
126+
"columns": None,
127+
"data_source_format": "DELTA",
128+
"table_type": "EXTERNAL",
129+
"storage_location": new_table_storage_path,
130+
"comment": None,
131+
}
132+
133+
table_info = self._client.tables.create(**params)
134+
except Exception as e:
135+
raise Exception(f"An error occurred while registering the table in Unity Catalog: {e}")
136+
96137
table_id = table_info.table_id
97138
storage_location = table_info.storage_location
98-
99139
# Grab credentials from Unity catalog and place it into the Table
100-
temp_table_credentials = self._client.temporary_table_credentials.create(operation="READ", table_id=table_id)
140+
temp_table_credentials = self._client.temporary_table_credentials.create(
141+
operation="READ_WRITE", table_id=table_id
142+
)
101143

102144
scheme = urlparse(storage_location).scheme
103145
if scheme == "s3" or scheme == "s3a":

requirements-dev.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ docker
77
# Pinned requests due to docker-py issue: https://github.com/docker/docker-py/issues/3256
88
requests<2.32.0
99

10+
# Pinned httpx due to unitycatalog-python issue: https://github.com/unitycatalog/unitycatalog-python/issues/9
11+
httpx==0.27.2
12+
1013
# Tracing
1114
orjson==3.10.12 # orjson recommended for viztracer
1215
py-spy==0.3.14

0 commit comments

Comments
 (0)