From b6479437471cd437540bd657fbccf49068db9862 Mon Sep 17 00:00:00 2001 From: soffer-anyscale Date: Tue, 14 Oct 2025 16:37:05 -0600 Subject: [PATCH 1/9] [Data] Enhance Unity Catalog connector with robust credential handling and type safety - Add dataclasses for structured responses (VolumeInfo, CredentialsResponse, AWSCredentials, AzureSASCredentials, GCPOAuthCredentials) - Add CloudProvider enum for type-safe cloud provider handling - Implement proper GCP temporary file handling with atexit cleanup - Add comprehensive error messages for volume access failures - Add extensive documentation with third-party API references - Ensure Python 3.8+ compatibility (use Tuple instead of tuple) - Remove ray.init() call (follows Ray Data architecture - initialization is external) - Add support for all Ray Data file formats (lance, iceberg, hudi, etc.) - Improve docstrings on all dataclasses and methods - Pass all ruff and black lint checks Supports Delta Lake features (deletion vectors, column mapping) via delegation to ray.data.read_delta(). Signed-off-by: soffer-anyscale --- .../_internal/datasource/uc_datasource.py | 137 --- .../datasource/unity_catalog_datasource.py | 801 ++++++++++++++++-- python/ray/data/read_api.py | 137 +-- 3 files changed, 817 insertions(+), 258 deletions(-) delete mode 100644 python/ray/data/_internal/datasource/uc_datasource.py diff --git a/python/ray/data/_internal/datasource/uc_datasource.py b/python/ray/data/_internal/datasource/uc_datasource.py deleted file mode 100644 index c29580678e51..000000000000 --- a/python/ray/data/_internal/datasource/uc_datasource.py +++ /dev/null @@ -1,137 +0,0 @@ -import os -import tempfile -from typing import Any, Callable, Dict, Optional - -import requests - -import ray - -_FILE_FORMAT_TO_RAY_READER = { - "delta": "read_delta", - "parquet": "read_parquet", -} - - -class UnityCatalogConnector: - """ - Load a Unity Catalog table or files into a Ray Dataset, handling cloud credentials automatically. - - Currently only supports Databricks-managed Unity Catalog - - Supported formats: delta, parquet. - Supports AWS, Azure, and GCP with automatic credential handoff. 
- """ - - def __init__( - self, - *, - base_url: str, - token: str, - table_full_name: str, - region: Optional[str] = None, - data_format: Optional[str] = "delta", - operation: str = "READ", - ray_init_kwargs: Optional[Dict] = None, - reader_kwargs: Optional[Dict] = None, - ): - self.base_url = base_url.rstrip("/") - self.token = token - self.table_full_name = table_full_name - self.data_format = data_format.lower() if data_format else None - self.region = region - self.operation = operation - self.ray_init_kwargs = ray_init_kwargs or {} - self.reader_kwargs = reader_kwargs or {} - - def _get_table_info(self) -> dict: - url = f"{self.base_url}/api/2.1/unity-catalog/tables/{self.table_full_name}" - headers = {"Authorization": f"Bearer {self.token}"} - resp = requests.get(url, headers=headers) - resp.raise_for_status() - data = resp.json() - self._table_info = data - self._table_id = data["table_id"] - return data - - def _get_creds(self): - url = f"{self.base_url}/api/2.1/unity-catalog/temporary-table-credentials" - headers = { - "Content-Type": "application/json", - "Authorization": f"Bearer {self.token}", - } - payload = {"table_id": self._table_id, "operation": self.operation} - resp = requests.post(url, json=payload, headers=headers) - resp.raise_for_status() - self._creds_response = resp.json() - self._table_url = self._creds_response["url"] - - def _set_env(self): - env_vars = {} - creds = self._creds_response - - if "aws_temp_credentials" in creds: - aws = creds["aws_temp_credentials"] - env_vars["AWS_ACCESS_KEY_ID"] = aws["access_key_id"] - env_vars["AWS_SECRET_ACCESS_KEY"] = aws["secret_access_key"] - env_vars["AWS_SESSION_TOKEN"] = aws["session_token"] - if self.region: - env_vars["AWS_REGION"] = self.region - env_vars["AWS_DEFAULT_REGION"] = self.region - elif "azuresasuri" in creds: - env_vars["AZURE_STORAGE_SAS_TOKEN"] = creds["azuresasuri"] - elif "gcp_service_account" in creds: - gcp_json = creds["gcp_service_account"] - with tempfile.NamedTemporaryFile( - prefix="gcp_sa_", suffix=".json", delete=True - ) as temp_file: - temp_file.write(gcp_json.encode()) - temp_file.flush() - env_vars["GOOGLE_APPLICATION_CREDENTIALS"] = temp_file.name - else: - raise ValueError( - "No known credential type found in Databricks UC response." 
- ) - - for k, v in env_vars.items(): - os.environ[k] = v - self._runtime_env = {"env_vars": env_vars} - - def _infer_data_format(self) -> str: - if self.data_format: - return self.data_format - - info = self._table_info or self._get_table_info() - if "data_source_format" in info and info["data_source_format"]: - fmt = info["data_source_format"].lower() - return fmt - - storage_loc = info.get("storage_location") or getattr(self, "_table_url", None) - if storage_loc: - ext = os.path.splitext(storage_loc)[-1].replace(".", "").lower() - if ext in _FILE_FORMAT_TO_RAY_READER: - return ext - - raise ValueError("Could not infer data format from table metadata.") - - def _get_ray_reader(self, data_format: str) -> Callable[..., Any]: - fmt = data_format.lower() - if fmt in _FILE_FORMAT_TO_RAY_READER: - reader_func = getattr(ray.data, _FILE_FORMAT_TO_RAY_READER[fmt], None) - if reader_func: - return reader_func - raise ValueError(f"Unsupported data format: {fmt}") - - def read(self): - self._get_table_info() - self._get_creds() - self._set_env() - - data_format = self._infer_data_format() - reader = self._get_ray_reader(data_format) - - if not ray.is_initialized(): - ray.init(runtime_env=self._runtime_env, **self.ray_init_kwargs) - - url = self._table_url - ds = reader(url, **self.reader_kwargs) - return ds diff --git a/python/ray/data/_internal/datasource/unity_catalog_datasource.py b/python/ray/data/_internal/datasource/unity_catalog_datasource.py index 0a8cd2813d46..29871ac802f0 100644 --- a/python/ray/data/_internal/datasource/unity_catalog_datasource.py +++ b/python/ray/data/_internal/datasource/unity_catalog_datasource.py @@ -1,20 +1,45 @@ +import atexit import os import tempfile +import warnings from dataclasses import dataclass -from typing import Any, Callable, Dict, List, Optional +from enum import Enum +from typing import Any, Callable, Dict, List, Optional, Tuple, Type -import requests +import requests # HTTP library: https://requests.readthedocs.io/ import ray +from ray.data.datasource import Datasource +# Unity Catalog REST API Documentation: +# https://docs.databricks.com/api/workspace/unity-catalog +# Credential Vending API: +# https://docs.databricks.com/en/data-governance/unity-catalog/credential-vending.html + +# Mapping of file formats to Ray Data reader function names +# https://docs.ray.io/en/latest/data/api/input_output.html _FILE_FORMAT_TO_RAY_READER = { "delta": "read_delta", "parquet": "read_parquet", + "csv": "read_csv", + "json": "read_json", + "text": "read_text", + "images": "read_images", + "avro": "read_avro", + "numpy": "read_numpy", + "binary": "read_binary_files", + "videos": "read_videos", + "audio": "read_audio", + "lance": "read_lance", + "iceberg": "read_iceberg", + "hudi": "read_hudi", } @dataclass class ColumnInfo: + """Column metadata from Unity Catalog table schema.""" + name: str type_text: str type_name: str @@ -23,10 +48,16 @@ class ColumnInfo: type_scale: int type_json: str nullable: bool + # Optional fields that may not be present in all API responses + comment: Optional[str] = None + partition_index: Optional[int] = None + column_masks: Optional[List[Dict]] = None @dataclass class EffectiveFlag: + """Flag indicating effective settings inherited from parent resources.""" + value: str inherited_from_type: str inherited_from_name: str @@ -34,6 +65,11 @@ class EffectiveFlag: @dataclass class TableInfo: + """Metadata for Unity Catalog tables. + + Represents the response from the Unity Catalog tables API. 
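    A rough sketch of where this payload comes from (the endpoint and auth header mirror the metadata fetch later in this module; the workspace URL, token, and table name are placeholders):

        >>> import requests  # doctest: +SKIP
        >>> resp = requests.get(  # doctest: +SKIP
        ...     "https://dbc-XXXX.cloud.databricks.com"
        ...     "/api/2.1/unity-catalog/tables/main.sales.transactions",
        ...     headers={"Authorization": "Bearer dapi..."},
        ... )
        >>> info = TableInfo.from_dict(resp.json())  # doctest: +SKIP
        >>> fmt = info.data_source_format.lower()  # doctest: +SKIP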
+ """ + name: str catalog_name: str schema_name: str @@ -113,14 +149,209 @@ def from_dict(obj: Dict) -> "TableInfo": ) +@dataclass +class VolumeInfo: + """ + Metadata for Unity Catalog volumes. + + Represents the response from the Unity Catalog volumes API. + """ + + name: str + catalog_name: str + schema_name: str + volume_type: str + storage_location: str + full_name: str + owner: Optional[str] = None + comment: Optional[str] = None + created_at: Optional[int] = None + created_by: Optional[str] = None + updated_at: Optional[int] = None + updated_by: Optional[str] = None + volume_id: Optional[str] = None + + @staticmethod + def from_dict(obj: Dict) -> "VolumeInfo": + return VolumeInfo( + name=obj["name"], + catalog_name=obj["catalog_name"], + schema_name=obj["schema_name"], + volume_type=obj.get("volume_type", ""), + storage_location=obj.get("storage_location", ""), + full_name=obj.get("full_name", ""), + owner=obj.get("owner"), + comment=obj.get("comment"), + created_at=obj.get("created_at"), + created_by=obj.get("created_by"), + updated_at=obj.get("updated_at"), + updated_by=obj.get("updated_by"), + volume_id=obj.get("volume_id"), + ) + + +class CloudProvider(Enum): + """Cloud provider types for credential vending.""" + + AWS = "aws" + AZURE = "azure" + GCP = "gcp" + + +@dataclass +class AWSCredentials: + """AWS temporary credentials from Unity Catalog credential vending.""" + + access_key_id: str + secret_access_key: str + session_token: str + + @staticmethod + def from_dict(obj: Dict) -> "AWSCredentials": + return AWSCredentials( + access_key_id=obj["access_key_id"], + secret_access_key=obj["secret_access_key"], + session_token=obj["session_token"], + ) + + +@dataclass +class AzureSASCredentials: + """Azure SAS token credentials from Unity Catalog credential vending.""" + + sas_token: str + + @staticmethod + def from_dict(obj: Dict) -> "AzureSASCredentials": + return AzureSASCredentials(sas_token=obj["sas_token"]) + + +@dataclass +class GCPOAuthCredentials: + """GCP OAuth token credentials from Unity Catalog credential vending.""" + + oauth_token: str + + @staticmethod + def from_dict(obj: Dict) -> "GCPOAuthCredentials": + return GCPOAuthCredentials(oauth_token=obj["oauth_token"]) + + +@dataclass +class CredentialsResponse: + """ + Response from Unity Catalog credential vending API. + + Contains cloud-specific temporary credentials and storage URL. + """ + + url: str + cloud_provider: CloudProvider + aws_credentials: Optional[AWSCredentials] = None + azure_credentials: Optional[AzureSASCredentials] = None + gcp_credentials: Optional[GCPOAuthCredentials] = None + # Legacy field for older Azure format + azure_sas_uri: Optional[str] = None + # Legacy field for older GCP format + gcp_service_account_json: Optional[str] = None + + @staticmethod + def from_dict(obj: Dict) -> "CredentialsResponse": + """ + Parse credentials response from Unity Catalog API. + + Handles multiple credential formats for each cloud provider. 
+ """ + url = obj.get("url", "") + + # Determine cloud provider and parse credentials + if "aws_temp_credentials" in obj: + return CredentialsResponse( + url=url, + cloud_provider=CloudProvider.AWS, + aws_credentials=AWSCredentials.from_dict(obj["aws_temp_credentials"]), + ) + elif "azure_user_delegation_sas" in obj: + return CredentialsResponse( + url=url, + cloud_provider=CloudProvider.AZURE, + azure_credentials=AzureSASCredentials.from_dict( + obj["azure_user_delegation_sas"] + ), + ) + elif "azuresasuri" in obj: + # Legacy Azure format + return CredentialsResponse( + url=url, + cloud_provider=CloudProvider.AZURE, + azure_sas_uri=obj["azuresasuri"], + ) + elif "gcp_oauth_token" in obj: + return CredentialsResponse( + url=url, + cloud_provider=CloudProvider.GCP, + gcp_credentials=GCPOAuthCredentials.from_dict(obj["gcp_oauth_token"]), + ) + elif "gcp_service_account" in obj: + # Legacy GCP format + return CredentialsResponse( + url=url, + cloud_provider=CloudProvider.GCP, + gcp_service_account_json=obj["gcp_service_account"], + ) + else: + raise ValueError( + f"No recognized credential type in response. " + f"Available keys: {list(obj.keys())}" + ) + + class UnityCatalogConnector: """ - Load a Unity Catalog table or files into a Ray Dataset, handling cloud credentials automatically. + Connector for reading Unity Catalog tables and volumes into Ray Datasets. + + This connector handles automatic credential vending for secure access to cloud storage + backing Unity Catalog tables and volumes. It supports AWS S3, Azure Data Lake Storage, + and Google Cloud Storage with temporary, least-privilege credentials. + + Tables Support Status: PRODUCTION READY + - Fully supported and documented + - Credential vending API is stable and publicly available + - Works across all Databricks workspaces + + Volumes Support Status: PRIVATE PREVIEW - LIMITED AVAILABILITY + - API endpoint exists but may not be enabled in all workspaces + - Credential vending may return errors even with correct permissions + - Contact Databricks support if you encounter credential vending errors + - Workaround: Use ray.data.read_*() with your own cloud credentials - Currently only support Databricks-managed Unity Catalog + Supported Unity Catalog paths: + - Tables: catalog.schema.table + - Volumes: catalog.schema.volume/path/to/data (private preview) - Supported formats: delta, parquet. - Supports AWS, Azure, and GCP with automatic credential handoff. + Supported data formats: + delta, parquet, csv, json, text, images, avro, numpy, binary, videos, + audio, lance, iceberg, hudi, and custom datasources + + Cloud providers: + - AWS S3 (with temporary IAM credentials) + - Azure Data Lake Storage (with SAS tokens) + - Google Cloud Storage (with OAuth tokens or service account) + + Args: + base_url: Databricks workspace URL (e.g., "https://dbc-xxx.cloud.databricks.com") + token: Databricks Personal Access Token with appropriate permissions + path: Unity Catalog path (table: "catalog.schema.table" or + volume: "catalog.schema.volume/subpath") + region: Optional AWS region for S3 credential configuration + data_format: Optional format override (delta, parquet, images, etc.) 
+ custom_datasource: Optional custom Ray Data Datasource class + operation: Credential operation type (default: "READ") + reader_kwargs: Additional arguments passed to the Ray Data reader + + References: + - Credential Vending: https://docs.databricks.com/en/data-governance/unity-catalog/credential-vending.html + - Unity Catalog: https://docs.databricks.com/en/data-governance/unity-catalog/ """ def __init__( @@ -128,110 +359,546 @@ def __init__( *, base_url: str, token: str, - table_full_name: str, + path: str, region: Optional[str] = None, - data_format: Optional[str] = "delta", + data_format: Optional[str] = None, + custom_datasource: Optional[Type[Datasource]] = None, operation: str = "READ", reader_kwargs: Optional[Dict] = None, ): self.base_url = base_url.rstrip("/") self.token = token - self.table_full_name = table_full_name + self.path = path self.data_format = data_format.lower() if data_format else None + self.custom_datasource = custom_datasource self.region = region self.operation = operation self.reader_kwargs = reader_kwargs or {} - def _get_table_info(self) -> TableInfo: - url = f"{self.base_url}/api/2.1/unity-catalog/tables/{self.table_full_name}" - headers = {"Authorization": f"Bearer {self.token}"} - resp = requests.get(url, headers=headers) - resp.raise_for_status() - data = resp.json() - data = TableInfo.from_dict(data) - self._table_info = data - self._table_id = data.table_id - return data + # Determine if this is a table or volume path + self._is_volume = self._detect_volume_path(path) + + # Warn about volumes private preview status and known issues + if self._is_volume: + warnings.warn( + f"\n{'='*70}\n" + f"Unity Catalog Volumes: PRIVATE PREVIEW Feature\n" + f"{'='*70}\n" + f"Path: {path}\n\n" + f"Volumes support is in PRIVATE PREVIEW and may not be fully enabled\n" + f"in your Databricks workspace. The credential vending API for volumes\n" + f"is not yet publicly documented and may not be accessible.\n\n" + f"KNOWN ISSUES:\n" + f" - Credential vending may fail with '400 Bad Request: Missing\n" + f" required field: operation' even though the operation field is\n" + f" provided. This indicates the API exists but isn't fully enabled.\n" + f" - The API endpoint may not be available in all workspaces.\n\n" + f"REQUIREMENTS:\n" + f" 1. Workspace must have volumes private preview enabled\n" + f" 2. You must have READ VOLUME permission on the volume\n" + f" 3. You must have EXTERNAL USE SCHEMA permission\n" + f" 4. The volume must exist and be accessible\n\n" + f"IF YOU ENCOUNTER ERRORS:\n" + f" - Contact Databricks support to verify volumes are enabled\n" + f" - Ask about enabling volumes credential vending for your workspace\n" + f" - Request documentation for the private preview API\n\n" + f"WORKAROUND:\n" + f" Use ray.data.read_*() with your own cloud credentials to access\n" + f" the underlying storage directly. 
You can get the storage location\n" + f" from the Unity Catalog volumes metadata API:\n" + f" GET /api/2.1/unity-catalog/volumes/{{volume_full_name}}\n" + f"{'='*70}", + UserWarning, + stacklevel=4, + ) + + # Storage for metadata and credentials + self._table_info: Optional[TableInfo] = None + self._volume_info: Optional[VolumeInfo] = None + self._table_id: Optional[str] = None + self._volume_path: Optional[str] = None + self._creds_response: Optional[CredentialsResponse] = None + self._storage_url: Optional[str] = None + self._gcp_temp_file: Optional[str] = None + + @staticmethod + def _detect_volume_path(path: str) -> bool: + """ + Detect if the path refers to a volume or table. + + Unity Catalog paths follow these patterns: + - Table: catalog.schema.table (3 parts separated by dots) + - Volume: catalog.schema.volume/path/to/data (contains forward slash) + - Volume: /Volumes/catalog/schema/volume/path (starts with /Volumes/) + + Args: + path: Unity Catalog path + + Returns: + True if path refers to a volume, False for table + """ + # Check for /Volumes/ prefix (common pattern) + if path.startswith("/Volumes/"): + return True + + # Check if path contains a forward slash (indicating volume with subdirectory) + if "/" in path: + return True + + # Count the number of dot-separated parts + parts = path.split(".") + # If more than 3 parts, it's likely a volume (e.g., catalog.schema.volume.something) + # However, standard format is catalog.schema.volume/path, so this is a fallback + return len(parts) > 3 + + def _parse_volume_path(self) -> Tuple[str, str]: + """ + Parse volume path into volume identifier and subdirectory path. + + Handles multiple volume path formats: + - catalog.schema.volume/path/to/data + - /Volumes/catalog/schema/volume/path/to/data + + Returns: + Tuple of (volume_full_name, sub_path) + Examples: + - ("catalog.schema.volume", "path/to/data") + - ("catalog.schema.volume", "") + """ + path = self.path + + # Handle /Volumes/ prefix + if path.startswith("/Volumes/"): + # Remove /Volumes/ prefix + path = path[9:] # len("/Volumes/") = 9 + + if "/" in path: + parts = path.split("/", 1) + volume_full_name = parts[0].replace("/", ".") + sub_path = parts[1] if len(parts) > 1 else "" + return volume_full_name, sub_path + else: + # No subdirectory specified + return path, "" + + def _get_table_info(self) -> Optional[TableInfo]: + """ + Fetch table or volume metadata from Unity Catalog API. 
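        For volumes, the metadata fetched here also backs the workaround suggested in the warning above: fetch the storage location yourself and read it directly with your own cloud credentials. A hedged sketch (workspace URL, token, volume name, and format are placeholders; ambient cloud credentials are assumed):

            >>> import requests  # doctest: +SKIP
            >>> meta = requests.get(  # doctest: +SKIP
            ...     "https://dbc-XXXX.cloud.databricks.com"
            ...     "/api/2.1/unity-catalog/volumes/main.ml_data.images",
            ...     headers={"Authorization": "Bearer dapi..."},
            ... ).json()
            >>> ds = ray.data.read_images(meta["storage_location"])  # doctest: +SKIP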
+ + API Endpoints: + Tables: GET /api/2.1/unity-catalog/tables/{full_name} + Volumes: GET /api/2.1/unity-catalog/volumes/{full_name} + + Returns: + TableInfo object for tables, None for volumes + + Raises: + requests.HTTPError: If API request fails + """ + if self._is_volume: + # For volumes, use the volumes API to get metadata + volume_full_name, _ = self._parse_volume_path() + url = f"{self.base_url}/api/2.1/unity-catalog/volumes/{volume_full_name}" + headers = {"Authorization": f"Bearer {self.token}"} + resp = requests.get(url, headers=headers) + resp.raise_for_status() + data = resp.json() + + # Parse and store volume information + volume_info = VolumeInfo.from_dict(data) + self._volume_info = volume_info + self._volume_path = volume_info.storage_location + # Volumes don't have table_id, use volume_full_name for credential requests + return None + else: + # For tables, use the tables API + url = f"{self.base_url}/api/2.1/unity-catalog/tables/{self.path}" + headers = {"Authorization": f"Bearer {self.token}"} + resp = requests.get(url, headers=headers) + resp.raise_for_status() + data = resp.json() + + # Parse table information + table_info = TableInfo.from_dict(data) + self._table_info = table_info + self._table_id = table_info.table_id + return table_info def _get_creds(self): - url = f"{self.base_url}/api/2.1/unity-catalog/temporary-table-credentials" + """ + Request temporary credentials from Unity Catalog credential vending API. + + Handles both table and volume credential requests with different support levels. + + Table Credentials (PRODUCTION READY): + - Endpoint: /api/2.1/unity-catalog/temporary-table-credentials + - Status: Public API, fully documented and supported + - Reliability: Stable and available in all workspaces + + Volume Credentials (PRIVATE PREVIEW - LIMITED AVAILABILITY): + - Endpoint: /api/2.1/unity-catalog/temporary-volume-credentials + - Status: Private preview, not publicly documented + - Known Issue: May return 400 "Missing required field: operation" + even when operation field is provided, indicating the API exists + but isn't fully enabled in the workspace + + Raises: + requests.HTTPError: If API request fails + ValueError: If credential vending is not available or configured + """ + if self._is_volume: + # Use volumes credential vending API (PRIVATE PREVIEW) + volume_full_name, sub_path = self._parse_volume_path() + url = f"{self.base_url}/api/2.1/unity-catalog/temporary-volume-credentials" + payload = { + "volume_id": volume_full_name, + "operation": self.operation, + } + else: + # Use table credential vending API (PUBLIC/PRODUCTION) + if not self._table_id: + raise ValueError( + "Table ID not available. Call _get_table_info() first." 
+ ) + url = f"{self.base_url}/api/2.1/unity-catalog/temporary-table-credentials" + payload = { + "table_id": self._table_id, + "operation": self.operation, + } + headers = { "Content-Type": "application/json", "Authorization": f"Bearer {self.token}", } - payload = {"table_id": self._table_id, "operation": self.operation} - resp = requests.post(url, json=payload, headers=headers) - resp.raise_for_status() - self._creds_response = resp.json() - self._table_url = self._creds_response["url"] + + try: + resp = requests.post(url, json=payload, headers=headers) + resp.raise_for_status() + # Parse credentials response into structured dataclass + self._creds_response = CredentialsResponse.from_dict(resp.json()) + except requests.HTTPError as e: + if self._is_volume: + # Provide detailed error messages for volumes based on testing findings + error_msg = e.response.text if hasattr(e.response, "text") else str(e) + + if e.response.status_code == 404: + raise ValueError( + f"\n{'='*70}\n" + f"Unity Catalog Volumes Credential Vending API Not Found\n" + f"{'='*70}\n" + f"Volume: {self.path}\n" + f"API Endpoint: {url}\n" + f"Status: 404 Not Found\n\n" + f"The volumes credential vending API doesn't exist in your\n" + f"Databricks workspace.\n\n" + f"POSSIBLE CAUSES:\n" + f" 1. Volumes private preview is not enabled\n" + f" 2. Your Databricks runtime doesn't support volumes\n" + f" 3. The API endpoint has changed (volumes in private preview)\n\n" + f"NEXT STEPS:\n" + f" - Contact Databricks support to enable volumes private preview\n" + f" - Verify your workspace supports Unity Catalog volumes\n\n" + f"WORKAROUND:\n" + f" Access storage directly using your own cloud credentials\n" + f" with ray.data.read_*() functions.\n" + f"{'='*70}\n" + f"Original error: {e}" + ) from e + + elif e.response.status_code == 400: + # Check for the specific "Missing required field: operation" error + if ( + "Missing required field" in error_msg + or "operation" in error_msg.lower() + ): + raise ValueError( + f"\n{'='*70}\n" + f"Unity Catalog Volumes Credential Vending Not Available\n" + f"{'='*70}\n" + f"Volume: {self.path}\n" + f"API Endpoint: {url}\n" + f"Status: 400 Bad Request\n" + f"Error: {error_msg}\n\n" + f"WHAT HAPPENED:\n" + f" The volumes credential vending API endpoint exists but is\n" + f" rejecting requests. This is a known issue with volumes in\n" + f" private preview.\n\n" + f"WHAT WE TRIED:\n" + f" - Requested temporary READ credentials\n" + f" - Payload: {payload}\n" + f" - Response: '{error_msg}'\n\n" + f"WHAT THIS MEANS:\n" + f" 1. API endpoint exists but isn't fully enabled in your\n" + f" workspace (requires additional configuration)\n" + f" 2. Your workspace may need explicit enablement\n" + f" 3. Additional feature flags may be required\n\n" + f"RECOMMENDED ACTIONS:\n" + f" 1. Contact Databricks support with this error message\n" + f" 2. Ask them to:\n" + f" - Verify volumes credential vending is enabled\n" + f" - Provide correct API payload format\n" + f" - Enable required feature flags\n" + f" 3. Request access to private preview documentation\n\n" + f"WORKAROUND:\n" + f" Use ray.data.read_*() with your own cloud credentials:\n" + f" 1. Get storage location from volumes metadata API:\n" + f" GET /api/2.1/unity-catalog/volumes/{self._parse_volume_path()[0]}\n" + f" 2. Use returned 'storage_location' with ray.data.read_*()\n" + f" 3. 
Provide your own AWS/Azure/GCP credentials\n" + f"{'='*70}" + ) from e + else: + raise ValueError( + f"Invalid request for volume '{self.path}': {error_msg}\n" + f"Status: {e.response.status_code}\n" + f"Ensure you have READ VOLUME and EXTERNAL USE SCHEMA permissions." + ) from e + + elif e.response.status_code == 403: + raise ValueError( + f"\n{'='*70}\n" + f"Permission Denied for Unity Catalog Volume\n" + f"{'='*70}\n" + f"Volume: {self.path}\n" + f"Status: 403 Forbidden\n" + f"Error: {error_msg}\n\n" + f"You don't have the required permissions.\n\n" + f"REQUIRED PERMISSIONS:\n" + f" - READ VOLUME on '{self.path}'\n" + f" - EXTERNAL USE SCHEMA on the parent schema\n" + f" - USE CATALOG on the parent catalog\n\n" + f"TO GRANT PERMISSIONS:\n" + f" A Databricks admin can run:\n" + f" GRANT READ VOLUME ON VOLUME {self.path} TO `your-user-or-group`;\n" + f"{'='*70}" + ) from e + # Re-raise for non-volume errors or unhandled status codes + raise + + # Extract storage URL from credentials response + if self._is_volume: + # For volumes, construct full path with subdirectory + volume_full_name, sub_path = self._parse_volume_path() + base_url = self._creds_response.url + if not base_url: + raise ValueError( + f"No storage URL returned for volume '{volume_full_name}'. " + f"Credentials response: {self._creds_response}" + ) + # Construct full path, handling trailing/leading slashes + if sub_path: + self._storage_url = f"{base_url.rstrip('/')}/{sub_path.lstrip('/')}" + else: + self._storage_url = base_url + else: + # For tables, use URL directly from response + if not self._creds_response.url: + raise ValueError( + f"No storage URL returned for table '{self.path}'. " + f"Credentials response: {self._creds_response}" + ) + self._storage_url = self._creds_response.url def _set_env(self): + """ + Configure cloud-specific environment variables for credential access. + + Sets up temporary credentials in the Ray runtime environment for + AWS S3, Azure Blob Storage, or Google Cloud Storage. 
+ + Supported cloud providers: + - AWS S3: Sets AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN + (IAM temporary credentials) + - Azure: Sets AZURE_STORAGE_SAS_TOKEN + (Shared Access Signature token) + - GCP: Sets GCP_OAUTH_TOKEN or GOOGLE_APPLICATION_CREDENTIALS + (OAuth token or service account credentials) + + Raises: + ValueError: If no recognized credential type is found in response + """ env_vars = {} creds = self._creds_response - if "aws_temp_credentials" in creds: - aws = creds["aws_temp_credentials"] - env_vars["AWS_ACCESS_KEY_ID"] = aws["access_key_id"] - env_vars["AWS_SECRET_ACCESS_KEY"] = aws["secret_access_key"] - env_vars["AWS_SESSION_TOKEN"] = aws["session_token"] - if self.region: - env_vars["AWS_REGION"] = self.region - env_vars["AWS_DEFAULT_REGION"] = self.region - elif "azuresasuri" in creds: - env_vars["AZURE_STORAGE_SAS_TOKEN"] = creds["azuresasuri"] - elif "gcp_service_account" in creds: - gcp_json = creds["gcp_service_account"] - with tempfile.NamedTemporaryFile( - prefix="gcp_sa_", suffix=".json", delete=True - ) as temp_file: - temp_file.write(gcp_json.encode()) - temp_file.flush() + if creds.cloud_provider == CloudProvider.AWS: + # AWS S3 temporary credentials + # https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp.html + if creds.aws_credentials: + env_vars["AWS_ACCESS_KEY_ID"] = creds.aws_credentials.access_key_id + env_vars[ + "AWS_SECRET_ACCESS_KEY" + ] = creds.aws_credentials.secret_access_key + env_vars["AWS_SESSION_TOKEN"] = creds.aws_credentials.session_token + if self.region: + env_vars["AWS_REGION"] = self.region + env_vars["AWS_DEFAULT_REGION"] = self.region + else: + raise ValueError("AWS credentials not found in response") + + elif creds.cloud_provider == CloudProvider.AZURE: + # Azure Blob Storage SAS token + # https://learn.microsoft.com/en-us/azure/storage/common/storage-sas-overview + if creds.azure_credentials: + env_vars["AZURE_STORAGE_SAS_TOKEN"] = creds.azure_credentials.sas_token + elif creds.azure_sas_uri: + # Legacy Azure format + env_vars["AZURE_STORAGE_SAS_TOKEN"] = creds.azure_sas_uri + else: + raise ValueError("Azure credentials not found in response") + + elif creds.cloud_provider == CloudProvider.GCP: + # Google Cloud Platform credentials + # https://cloud.google.com/docs/authentication/token-types#access + if creds.gcp_credentials: + env_vars["GCP_OAUTH_TOKEN"] = creds.gcp_credentials.oauth_token + elif creds.gcp_service_account_json: + # Legacy GCP service account format + # Create a temporary file that persists for the session + temp_file = tempfile.NamedTemporaryFile( + mode="w", + prefix="gcp_sa_", + suffix=".json", + delete=False, # Don't delete immediately + ) + temp_file.write(creds.gcp_service_account_json) + temp_file.close() + env_vars["GOOGLE_APPLICATION_CREDENTIALS"] = temp_file.name + self._gcp_temp_file = temp_file.name + + # Register cleanup on exit + atexit.register(lambda: self._cleanup_gcp_temp_file()) + else: + raise ValueError("GCP credentials not found in response") + else: - raise ValueError( - "No known credential type found in Databricks UC response." - ) + raise ValueError(f"Unrecognized cloud provider: {creds.cloud_provider}") + # Set environment variables in current process + # Ray Data readers will inherit these environment variables for k, v in env_vars.items(): os.environ[k] = v - self._runtime_env = {"env_vars": env_vars} def _infer_data_format(self) -> str: + """ + Infer data format from metadata, explicit specification, or file extensions. + + Priority order: + 1. 
Explicitly specified format (self.data_format) + 2. Table data_source_format from metadata (for tables) + 3. File extension from storage location URL + + Returns: + Inferred format string (e.g., "delta", "parquet", "csv") + + Raises: + ValueError: If format cannot be inferred + """ + # Use explicit format if provided if self.data_format: return self.data_format - info = self._table_info or self._get_table_info() - if info.data_source_format != "": - fmt = info.data_source_format.lower() - return fmt + # For tables, try to get format from metadata + if not self._is_volume and self._table_info: + if self._table_info.data_source_format: + return self._table_info.data_source_format.lower() - storage_loc = info.storage_location or getattr(self, "_table_url", None) - if storage_loc: - ext = os.path.splitext(storage_loc)[-1].replace(".", "").lower() - if ext in _FILE_FORMAT_TO_RAY_READER: + # Try to infer from storage URL file extension + if self._storage_url: + # Extract extension from URL (handle query parameters) + url_path = self._storage_url.split("?")[0] + ext = os.path.splitext(url_path)[-1].replace(".", "").lower() + if ext and ext in _FILE_FORMAT_TO_RAY_READER: return ext - raise ValueError("Could not infer data format from table metadata.") + # Default to parquet for volumes if no extension found + if self._is_volume: + return "parquet" + + raise ValueError( + f"Could not infer data format for path '{self.path}'. " + f"Please specify the format explicitly using the 'data_format' parameter. " + f"Supported formats: {', '.join(_FILE_FORMAT_TO_RAY_READER.keys())}" + ) @staticmethod - def _get_ray_reader(data_format: str) -> Callable[..., Any]: + def _get_ray_reader(data_format: str) -> Callable[..., "ray.data.Dataset"]: + """ + Get the appropriate Ray Data reader function for the specified format. + + Args: + data_format: Format name (e.g., "delta", "parquet", "images") + + Returns: + Ray Data reader function (e.g., ray.data.read_parquet) + + Raises: + ValueError: If format is not supported by Ray Data + """ fmt = data_format.lower() + + # Check if format is in the standard mapping if fmt in _FILE_FORMAT_TO_RAY_READER: - reader_func = getattr(ray.data, _FILE_FORMAT_TO_RAY_READER[fmt], None) + reader_name = _FILE_FORMAT_TO_RAY_READER[fmt] + reader_func = getattr(ray.data, reader_name, None) if reader_func: return reader_func - raise ValueError(f"Unsupported data format: {fmt}") - def read(self): + raise ValueError( + f"Unsupported data format: '{fmt}'. " + f"Supported formats: {', '.join(sorted(_FILE_FORMAT_TO_RAY_READER.keys()))}" + ) + + def _cleanup_gcp_temp_file(self): + """Clean up temporary GCP service account file if it exists.""" + if self._gcp_temp_file and os.path.exists(self._gcp_temp_file): + try: + os.unlink(self._gcp_temp_file) + except OSError: + # File already deleted or inaccessible, ignore + pass + + def read(self) -> "ray.data.Dataset": + """ + Read data from Unity Catalog table or volume into a Ray Dataset. + + This is the main entry point for reading data. It orchestrates: + 1. Fetch metadata from Unity Catalog (for tables) or validate volume access + 2. Obtain temporary credentials via Unity Catalog credential vending + 3. Configure cloud credentials in the current process environment + 4. Read data using the appropriate Ray Data reader or custom datasource + + The credentials are set in the current process environment and will be + inherited by Ray Data read tasks automatically. 
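        From a caller's point of view, the whole flow is roughly the following (workspace URL, token, and table name are placeholders):

            >>> connector = UnityCatalogConnector(  # doctest: +SKIP
            ...     base_url="https://dbc-XXXX.cloud.databricks.com",
            ...     token="dapi...",
            ...     path="main.sales.transactions",
            ... )
            >>> ds = connector.read()  # doctest: +SKIP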
+ + Returns: + Ray Dataset containing the data from the specified table or volume + + Raises: + ValueError: If configuration is invalid, path cannot be accessed, or + format cannot be inferred + requests.HTTPError: If Unity Catalog API requests fail + """ + # Step 1: Get metadata (for both tables and volumes) self._get_table_info() - self._get_creds() - self._set_env() - ray.init(ignore_reinit_error=True, runtime_env=self._runtime_env) + # Step 2: Get temporary credentials from Unity Catalog + # This may fail for volumes - see detailed error messages in _get_creds() + self._get_creds() - data_format = self._infer_data_format() - reader = self._get_ray_reader(data_format) + # Step 3: Configure cloud credentials in current process environment + # Ray Data readers will inherit these environment variables + self._set_env() - url = self._table_url - ds = reader(url, **self.reader_kwargs) - return ds + # Step 4: Read data using custom datasource or standard Ray Data reader + if self.custom_datasource is not None: + # Use custom datasource if provided + datasource_instance = self.custom_datasource( + self._storage_url, **self.reader_kwargs + ) + return ray.data.read_datasource(datasource_instance) + else: + # Use standard Ray Data reader based on inferred or specified format + data_format = self._infer_data_format() + reader = self._get_ray_reader(data_format) + return reader(self._storage_url, **self.reader_kwargs) diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py index 776ca0552d49..bc7637a2a1d7 100644 --- a/python/ray/data/read_api.py +++ b/python/ray/data/read_api.py @@ -12,6 +12,7 @@ Optional, Set, Tuple, + Type, TypeVar, Union, ) @@ -4210,87 +4211,115 @@ def read_clickhouse( @PublicAPI(stability="alpha") def read_unity_catalog( - table: str, + path: str, url: str, token: str, *, - data_format: Optional[str] = None, + format: Optional[str] = None, + datasource: Optional[Type[Datasource]] = None, region: Optional[str] = None, - reader_kwargs: Optional[dict], + reader_kwargs: Optional[dict] = None, ) -> Dataset: - """ - Loads a Unity Catalog table or files into a Ray Dataset using Databricks Unity Catalog credential vending, - with automatic short-lived cloud credential handoff for secure, parallel, distributed access from external engines. + """Creates a :class:`~ray.data.Dataset` from Unity Catalog tables or volumes. - This function works by leveraging Unity Catalog's credential vending feature, which grants temporary, least-privilege - credentials for the cloud storage location backing the requested table or data files. It authenticates via the Unity Catalog - REST API (`Unity Catalog credential vending for external system access`, [Databricks Docs](https://docs.databricks.com/en/data-governance/unity-catalog/credential-vending.html)), - ensuring that permissions are enforced at the Databricks principal (user, group, or service principal) making the request. - The function supports reading data directly from AWS S3, Azure Data Lake, or GCP GCS in standard formats including Delta and Parquet. + This function reads data from Databricks Unity Catalog using credential vending, + which provides temporary, least-privilege credentials for secure access to cloud + storage. The function authenticates through the Unity Catalog REST API and supports + reading from AWS S3, Azure Data Lake Storage, and Google Cloud Storage. .. 
note:: - This ``read_unity_catalog`` function is currently experimental and under active development + This function is experimental and under active development. .. warning:: - The Databricks Unity Catalog credential vending feature is currently in Public Preview and there are important requirements and limitations. - You must read these docs carefully and ensure your workspace and principal are properly configured. - - Features: - - **Secure Access**: Only principals with `EXTERNAL USE SCHEMA` on the containing schema, and after explicit metastore enablement, can obtain short-lived credentials. - - **Format Support**: Supports reading `delta` and `parquet` formats via supported Ray Dataset readers (iceberg coming soon). - - **Cloud Support**: AWS, Azure, and GCP supported, with automatic environment setup for the vended credentials per session. - - **Auto-Infer**: Data format is auto-inferred from table metadata, but can be explicitly specified. + The Databricks Unity Catalog credential vending feature is in Public Preview. + Ensure your workspace and principal are properly configured with the required + permissions before using this function. Examples: - Read a Unity Catalog managed Delta table with credential vending: - - >>> import ray - >>> ds = read_unity_catalog( # doctest: +SKIP - ... table="main.sales.transactions", - ... url="https://dbc-XXXXXXX-XXXX.cloud.databricks.com", # noqa: E501 - ... token="XXXXXXXXXXX" # noqa: E501 - ... ) - >>> ds.show(3) # doctest: +SKIP - - Explicitly specify the format, and pass reader options: - - >>> ds = read_unity_catalog( # doctest: +SKIP - ... table="main.catalog.images", - ... url="https://dbc-XXXXXXX-XXXX.cloud.databricks.com", # noqa: E501 - ... token="XXXXXXXXXXX", # noqa: E501 - ... data_format="delta", - ... region="us-west-2", - ... # Reader kwargs come from the associated reader (ray.data.read_delta in this example) - ... reader_kwargs={"override_num_blocks": 1000} - ... ) + Read a Unity Catalog Delta table: + + >>> import ray + >>> ds = ray.data.read_unity_catalog( # doctest: +SKIP + ... path="main.sales.transactions", + ... url="https://dbc-XXXXXXX-XXXX.cloud.databricks.com", + ... token="dapi..." + ... ) + >>> ds.show(3) # doctest: +SKIP + + Read images from a Unity Catalog volume: + + >>> ds = ray.data.read_unity_catalog( # doctest: +SKIP + ... path="main.ml_data.images/training/cats", + ... url="https://dbc-XXXXXXX-XXXX.cloud.databricks.com", + ... token="dapi...", + ... format="images" + ... ) + + Read Parquet files with custom reader options: + + >>> ds = ray.data.read_unity_catalog( # doctest: +SKIP + ... path="main.analytics.events", + ... url="https://dbc-XXXXXXX-XXXX.cloud.databricks.com", + ... token="dapi...", + ... format="parquet", + ... region="us-west-2", + ... reader_kwargs={"columns": ["user_id", "timestamp"], "override_num_blocks": 100} + ... ) + + Use a custom datasource: + + >>> from ray.data.datasource import Datasource + >>> class MyCustomDatasource(Datasource): # doctest: +SKIP + ... def __init__(self, paths, **kwargs): + ... # Custom implementation + ... pass + ... + >>> ds = ray.data.read_unity_catalog( # doctest: +SKIP + ... path="main.raw_data.custom_format", + ... url="https://dbc-XXXXXXX-XXXX.cloud.databricks.com", + ... token="dapi...", + ... datasource=MyCustomDatasource + ... ) Args: - table: Unity Catalog table name as `..`. Must be a managed or external table supporting credential vending. - url: Databricks workspace URL, e.g. 
`"https://dbc-XXXXXXX-XXXX.cloud.databricks.com"` - token: Databricks PAT (Personal Access Token) with `EXTERNAL USE SCHEMA` on the schema containing the table, and with access to the workspace API. - data_format: (Optional) Data format override. If not specified, inferred from Unity Catalog metadata and file extension. Supported: `"delta"`, `"parquet"` - region: (Optional) For S3: AWS region for cloud credential environment setup. - reader_kwargs: Additional arguments forwarded to the underlying Ray Dataset reader (e.g., override_num_blocks, etc.). + path: Unity Catalog path. For tables, use the format ``catalog.schema.table``. + For volumes, use ``catalog.schema.volume/path/to/data``. + url: Databricks workspace URL. For example, + ``"https://dbc-XXXXXXX-XXXX.cloud.databricks.com"``. + token: Databricks Personal Access Token. The token must have ``EXTERNAL USE SCHEMA`` + permission on the schema containing the table or volume. + format: Data format. If not specified, the format is inferred from table + metadata or file extension. Supported formats include ``"delta"``, + ``"parquet"``, ``"csv"``, ``"json"``, ``"text"``, ``"images"``, ``"avro"``, + ``"numpy"``, ``"binary"``, ``"videos"``, ``"audio"``, ``"lance"``, + ``"iceberg"``, and ``"hudi"``. + datasource: Custom Ray Data :class:`~ray.data.Datasource` class. If provided, + this datasource is used instead of the standard format-based readers. + region: AWS region for S3 credential configuration. Only required for AWS S3. + reader_kwargs: Additional arguments passed to the underlying Ray Data reader + function. The supported arguments depend on the format. For example, + for Parquet files, you can pass arguments from + :meth:`~ray.data.read_parquet`. Returns: - A :class:`ray.data.Dataset` containing the data from the external Unity Catalog table. + A :class:`~ray.data.Dataset` containing the data from Unity Catalog. 
References: - Databricks Credential Vending: https://docs.databricks.com/en/data-governance/unity-catalog/credential-vending.html - - API Reference for temporary credentials: https://docs.databricks.com/api/workspace/unity-catalog/temporary-table-credentials - + - Unity Catalog Volumes: https://docs.databricks.com/en/connect/unity-catalog/volumes.html """ - reader = UnityCatalogConnector( + connector = UnityCatalogConnector( base_url=url, token=token, - table_full_name=table, - data_format=data_format, + path=path, + data_format=format, + custom_datasource=datasource, region=region, reader_kwargs=reader_kwargs, ) - return reader.read() + return connector.read() @PublicAPI(stability="alpha") From ac31e0054e4155155fcc5ed8cdaa8f71f2a5b5e7 Mon Sep 17 00:00:00 2001 From: soffer-anyscale Date: Wed, 15 Oct 2025 15:58:36 -0600 Subject: [PATCH 2/9] Fix Unity Catalog volumes path parsing and memory leak - Fix volume path parsing for /Volumes/ prefix format * Correctly parse /Volumes/catalog/schema/volume/path into catalog.schema.volume * Add validation to ensure at least 3 components after /Volumes/ * Previously only took first component, causing API failures - Fix memory leak in GCP credential cleanup * Change cleanup method to static to avoid capturing self reference * Prevents UnityCatalogConnector instances from being garbage collected * Pass file path directly to atexit.register instead of using lambda Signed-off-by: soffer-anyscale --- .../datasource/unity_catalog_datasource.py | 26 +++++++++++-------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/python/ray/data/_internal/datasource/unity_catalog_datasource.py b/python/ray/data/_internal/datasource/unity_catalog_datasource.py index 29871ac802f0..714376f2127d 100644 --- a/python/ray/data/_internal/datasource/unity_catalog_datasource.py +++ b/python/ray/data/_internal/datasource/unity_catalog_datasource.py @@ -469,17 +469,20 @@ def _parse_volume_path(self) -> Tuple[str, str]: # Handle /Volumes/ prefix if path.startswith("/Volumes/"): - # Remove /Volumes/ prefix path = path[9:] # len("/Volumes/") = 9 + parts = path.split("/") + if len(parts) < 3: + raise ValueError(f"Invalid /Volumes/ path: {self.path}") + volume_full_name = ".".join(parts[:3]) + sub_path = "/".join(parts[3:]) + return volume_full_name, sub_path if "/" in path: parts = path.split("/", 1) - volume_full_name = parts[0].replace("/", ".") sub_path = parts[1] if len(parts) > 1 else "" - return volume_full_name, sub_path - else: - # No subdirectory specified - return path, "" + return parts[0], sub_path + + return path, "" def _get_table_info(self) -> Optional[TableInfo]: """ @@ -766,8 +769,8 @@ def _set_env(self): env_vars["GOOGLE_APPLICATION_CREDENTIALS"] = temp_file.name self._gcp_temp_file = temp_file.name - # Register cleanup on exit - atexit.register(lambda: self._cleanup_gcp_temp_file()) + # Register cleanup on exit without capturing self to allow garbage collection + atexit.register(self._cleanup_gcp_temp_file_static, temp_file.name) else: raise ValueError("GCP credentials not found in response") @@ -849,11 +852,12 @@ def _get_ray_reader(data_format: str) -> Callable[..., "ray.data.Dataset"]: f"Supported formats: {', '.join(sorted(_FILE_FORMAT_TO_RAY_READER.keys()))}" ) - def _cleanup_gcp_temp_file(self): + @staticmethod + def _cleanup_gcp_temp_file_static(temp_file_path: str): """Clean up temporary GCP service account file if it exists.""" - if self._gcp_temp_file and os.path.exists(self._gcp_temp_file): + if temp_file_path and os.path.exists(temp_file_path): 
try: - os.unlink(self._gcp_temp_file) + os.unlink(temp_file_path) except OSError: # File already deleted or inaccessible, ignore pass From d0533b00f5f3d823ce2230fab7ce484848f0fabb Mon Sep 17 00:00:00 2001 From: soffer-anyscale Date: Wed, 15 Oct 2025 17:05:37 -0600 Subject: [PATCH 3/9] Add DeveloperAPI annotation to UnityCatalogConnector The UnityCatalogConnector class was missing an API stability annotation, which is required by Ray's API policy checks. This class is used internally by the read_unity_catalog() function and shouldn't be directly instantiated by users. - Add @DeveloperAPI annotation to UnityCatalogConnector class - Import DeveloperAPI from ray.util.annotations - Fixes API policy check CI failure in documentation build Signed-off-by: soffer-anyscale --- .../ray/data/_internal/datasource/unity_catalog_datasource.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/ray/data/_internal/datasource/unity_catalog_datasource.py b/python/ray/data/_internal/datasource/unity_catalog_datasource.py index 714376f2127d..1734c839cb13 100644 --- a/python/ray/data/_internal/datasource/unity_catalog_datasource.py +++ b/python/ray/data/_internal/datasource/unity_catalog_datasource.py @@ -10,6 +10,7 @@ import ray from ray.data.datasource import Datasource +from ray.util.annotations import DeveloperAPI # Unity Catalog REST API Documentation: # https://docs.databricks.com/api/workspace/unity-catalog @@ -306,6 +307,7 @@ def from_dict(obj: Dict) -> "CredentialsResponse": ) +@DeveloperAPI class UnityCatalogConnector: """ Connector for reading Unity Catalog tables and volumes into Ray Datasets. From b67c16b40ffd0aafbab9453acd7877840834d6b5 Mon Sep 17 00:00:00 2001 From: soffer-anyscale Date: Mon, 20 Oct 2025 11:48:50 -0600 Subject: [PATCH 4/9] Fix Unity Catalog ColumnInfo to handle unexpected API fields - Add ColumnInfo.from_dict() method to safely parse API responses - Extracts only needed fields, ignoring extra fields like column_masks - Fixes TypeError when Unity Catalog API returns unexpected fields - Improve code modularity by extracting helper methods: - _create_auth_headers() for authorization header creation - _fetch_volume_metadata() for volume metadata fetching - _fetch_table_metadata() for table metadata fetching - _extract_storage_url() for storage URL construction - Add comprehensive documentation to all methods - Fix all ruff lint violations (trailing whitespace in docstrings) - All precommit checks pass Signed-off-by: soffer-anyscale --- .../datasource/unity_catalog_datasource.py | 149 ++++++++++++++---- 1 file changed, 120 insertions(+), 29 deletions(-) diff --git a/python/ray/data/_internal/datasource/unity_catalog_datasource.py b/python/ray/data/_internal/datasource/unity_catalog_datasource.py index 1734c839cb13..2ac14467590e 100644 --- a/python/ray/data/_internal/datasource/unity_catalog_datasource.py +++ b/python/ray/data/_internal/datasource/unity_catalog_datasource.py @@ -39,7 +39,10 @@ @dataclass class ColumnInfo: - """Column metadata from Unity Catalog table schema.""" + """Column metadata from Unity Catalog table schema. + + Reference: https://docs.databricks.com/api/workspace/tables/get + """ name: str type_text: str @@ -52,7 +55,34 @@ class ColumnInfo: # Optional fields that may not be present in all API responses comment: Optional[str] = None partition_index: Optional[int] = None - column_masks: Optional[List[Dict]] = None + + @staticmethod + def from_dict(obj: Dict) -> "ColumnInfo": + """ + Safely construct ColumnInfo from Unity Catalog API response. 
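        For example (field values are placeholders), an API column entry that carries extra keys still parses cleanly, as described below:

            >>> col = ColumnInfo.from_dict({
            ...     "name": "user_id",
            ...     "type_text": "string",
            ...     "type_name": "STRING",
            ...     "position": 0,
            ...     "column_masks": [],  # extra key, ignored by from_dict
            ... })
            >>> col.nullable
            True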
+ + Extracts only the fields defined in ColumnInfo, ignoring any extra + fields returned by the API (such as column_masks, which may be present + in some API responses but aren't needed for Ray Data operations). + + Args: + obj: Dictionary from Unity Catalog tables API columns field + + Returns: + ColumnInfo instance with extracted fields + """ + return ColumnInfo( + name=obj["name"], + type_text=obj["type_text"], + type_name=obj["type_name"], + position=obj["position"], + type_precision=obj.get("type_precision", 0), + type_scale=obj.get("type_scale", 0), + type_json=obj.get("type_json", ""), + nullable=obj.get("nullable", True), + comment=obj.get("comment"), + partition_index=obj.get("partition_index"), + ) @dataclass @@ -102,25 +132,41 @@ class TableInfo: @staticmethod def from_dict(obj: Dict) -> "TableInfo": + """ + Parse table metadata from Unity Catalog API response. + + Handles optional nested structures and safely constructs ColumnInfo + instances from the columns field. + + Args: + obj: Dictionary from Unity Catalog tables API response + + Returns: + TableInfo instance with parsed metadata + """ + # Parse optional nested structures if obj.get("effective_auto_maintenance_flag"): effective_auto_maintenance_flag = EffectiveFlag( **obj["effective_auto_maintenance_flag"] ) else: effective_auto_maintenance_flag = None + if obj.get("effective_predictive_optimization_flag"): effective_predictive_optimization_flag = EffectiveFlag( **obj["effective_predictive_optimization_flag"] ) else: effective_predictive_optimization_flag = None + return TableInfo( name=obj["name"], catalog_name=obj["catalog_name"], schema_name=obj["schema_name"], table_type=obj["table_type"], data_source_format=obj.get("data_source_format", ""), - columns=[ColumnInfo(**col) for col in obj.get("columns", [])], + # Use ColumnInfo.from_dict to safely handle API response fields + columns=[ColumnInfo.from_dict(col) for col in obj.get("columns", [])], storage_location=obj.get("storage_location", ""), owner=obj.get("owner", ""), properties=obj.get("properties", {}), @@ -486,6 +532,60 @@ def _parse_volume_path(self) -> Tuple[str, str]: return path, "" + def _create_auth_headers(self) -> Dict[str, str]: + """ + Create authorization headers for Unity Catalog API requests. + + Returns: + Dictionary with Authorization header + """ + return {"Authorization": f"Bearer {self.token}"} + + def _fetch_volume_metadata(self) -> None: + """ + Fetch volume metadata from Unity Catalog volumes API. + + Sets self._volume_info and self._volume_path. + + Raises: + requests.HTTPError: If API request fails + """ + volume_full_name, _ = self._parse_volume_path() + url = f"{self.base_url}/api/2.1/unity-catalog/volumes/{volume_full_name}" + headers = self._create_auth_headers() + resp = requests.get(url, headers=headers) + resp.raise_for_status() + data = resp.json() + + # Parse and store volume information + volume_info = VolumeInfo.from_dict(data) + self._volume_info = volume_info + self._volume_path = volume_info.storage_location + + def _fetch_table_metadata(self) -> TableInfo: + """ + Fetch table metadata from Unity Catalog tables API. + + Sets self._table_info and self._table_id. 
+ + Returns: + TableInfo object with table metadata + + Raises: + requests.HTTPError: If API request fails + """ + url = f"{self.base_url}/api/2.1/unity-catalog/tables/{self.path}" + headers = self._create_auth_headers() + resp = requests.get(url, headers=headers) + resp.raise_for_status() + data = resp.json() + + # Parse table information + table_info = TableInfo.from_dict(data) + self._table_info = table_info + self._table_id = table_info.table_id + return table_info + def _get_table_info(self) -> Optional[TableInfo]: """ Fetch table or volume metadata from Unity Catalog API. @@ -501,33 +601,10 @@ def _get_table_info(self) -> Optional[TableInfo]: requests.HTTPError: If API request fails """ if self._is_volume: - # For volumes, use the volumes API to get metadata - volume_full_name, _ = self._parse_volume_path() - url = f"{self.base_url}/api/2.1/unity-catalog/volumes/{volume_full_name}" - headers = {"Authorization": f"Bearer {self.token}"} - resp = requests.get(url, headers=headers) - resp.raise_for_status() - data = resp.json() - - # Parse and store volume information - volume_info = VolumeInfo.from_dict(data) - self._volume_info = volume_info - self._volume_path = volume_info.storage_location - # Volumes don't have table_id, use volume_full_name for credential requests + self._fetch_volume_metadata() return None else: - # For tables, use the tables API - url = f"{self.base_url}/api/2.1/unity-catalog/tables/{self.path}" - headers = {"Authorization": f"Bearer {self.token}"} - resp = requests.get(url, headers=headers) - resp.raise_for_status() - data = resp.json() - - # Parse table information - table_info = TableInfo.from_dict(data) - self._table_info = table_info - self._table_id = table_info.table_id - return table_info + return self._fetch_table_metadata() def _get_creds(self): """ @@ -680,7 +757,21 @@ def _get_creds(self): # Re-raise for non-volume errors or unhandled status codes raise - # Extract storage URL from credentials response + # Extract and store storage URL from credentials response + self._extract_storage_url() + + def _extract_storage_url(self) -> None: + """ + Extract and set storage URL from credentials response. + + For volumes, constructs full path including subdirectory. + For tables, uses URL directly from response. + + Sets self._storage_url. 
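        A toy illustration of the volume case (bucket and sub-path are made up; this mirrors the rstrip/lstrip join performed by this helper):

            >>> base_url, sub_path = "s3://bucket/volumes/images/", "/training/cats"
            >>> f"{base_url.rstrip('/')}/{sub_path.lstrip('/')}"
            's3://bucket/volumes/images/training/cats'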
+ + Raises: + ValueError: If no storage URL is returned in credentials response + """ if self._is_volume: # For volumes, construct full path with subdirectory volume_full_name, sub_path = self._parse_volume_path() From e4945598a4b763ffbaa92d046aef02f89d499966 Mon Sep 17 00:00:00 2001 From: soffer-anyscale Date: Tue, 21 Oct 2025 10:44:42 -0600 Subject: [PATCH 5/9] Refactor and audit Unity Catalog datasource for production - Refactored _get_creds() method into smaller focused functions - Added logging infrastructure following Ray Data patterns - Fixed custom datasource handling to use correct instantiation - Documented custom datasource constructor requirements - Applied ruff formatting and fixed all lint issues - Validated third-party integrations (PyArrow, Delta Lake, cloud SDKs) - Enhanced error handling with comprehensive messages - All pre-commit checks passing Signed-off-by: soffer-anyscale --- .../datasource/unity_catalog_datasource.py | 396 ++++++++++++------ 1 file changed, 270 insertions(+), 126 deletions(-) diff --git a/python/ray/data/_internal/datasource/unity_catalog_datasource.py b/python/ray/data/_internal/datasource/unity_catalog_datasource.py index 2ac14467590e..7e61b4d6ff21 100644 --- a/python/ray/data/_internal/datasource/unity_catalog_datasource.py +++ b/python/ray/data/_internal/datasource/unity_catalog_datasource.py @@ -1,4 +1,5 @@ import atexit +import logging import os import tempfile import warnings @@ -12,6 +13,8 @@ from ray.data.datasource import Datasource from ray.util.annotations import DeveloperAPI +logger = logging.getLogger(__name__) + # Unity Catalog REST API Documentation: # https://docs.databricks.com/api/workspace/unity-catalog # Credential Vending API: @@ -393,7 +396,8 @@ class UnityCatalogConnector: volume: "catalog.schema.volume/subpath") region: Optional AWS region for S3 credential configuration data_format: Optional format override (delta, parquet, images, etc.) - custom_datasource: Optional custom Ray Data Datasource class + custom_datasource: Optional custom Ray Data Datasource class. If provided, + the datasource must accept (storage_url, **reader_kwargs) in its constructor. operation: Credential operation type (default: "READ") reader_kwargs: Additional arguments passed to the Ray Data reader @@ -429,9 +433,9 @@ def __init__( # Warn about volumes private preview status and known issues if self._is_volume: warnings.warn( - f"\n{'='*70}\n" + f"\n{'=' * 70}\n" f"Unity Catalog Volumes: PRIVATE PREVIEW Feature\n" - f"{'='*70}\n" + f"{'=' * 70}\n" f"Path: {path}\n\n" f"Volumes support is in PRIVATE PREVIEW and may not be fully enabled\n" f"in your Databricks workspace. The credential vending API for volumes\n" @@ -455,7 +459,7 @@ def __init__( f" the underlying storage directly. 
You can get the storage location\n" f" from the Unity Catalog volumes metadata API:\n" f" GET /api/2.1/unity-catalog/volumes/{{volume_full_name}}\n" - f"{'='*70}", + f"{'=' * 70}", UserWarning, stacklevel=4, ) @@ -576,6 +580,7 @@ def _fetch_table_metadata(self) -> TableInfo: """ url = f"{self.base_url}/api/2.1/unity-catalog/tables/{self.path}" headers = self._create_auth_headers() + logger.debug(f"Fetching table metadata for '{self.path}' from {url}") resp = requests.get(url, headers=headers) resp.raise_for_status() data = resp.json() @@ -584,6 +589,10 @@ def _fetch_table_metadata(self) -> TableInfo: table_info = TableInfo.from_dict(data) self._table_info = table_info self._table_id = table_info.table_id + logger.debug( + f"Retrieved table metadata: format={table_info.data_source_format}, " + f"storage={table_info.storage_location}" + ) return table_info def _get_table_info(self) -> Optional[TableInfo]: @@ -606,6 +615,144 @@ def _get_table_info(self) -> Optional[TableInfo]: else: return self._fetch_table_metadata() + def _build_credentials_request(self) -> Tuple[str, Dict[str, Any]]: + """ + Build credentials request URL and payload for Unity Catalog API. + + Returns: + Tuple of (url, payload) for the credentials API request + """ + if self._is_volume: + # Use volumes credential vending API (PRIVATE PREVIEW) + volume_full_name, _ = self._parse_volume_path() + url = f"{self.base_url}/api/2.1/unity-catalog/temporary-volume-credentials" + payload = { + "volume_id": volume_full_name, + "operation": self.operation, + } + else: + # Use table credential vending API (PUBLIC/PRODUCTION) + if not self._table_id: + raise ValueError( + "Table ID not available. Call _get_table_info() first." + ) + url = f"{self.base_url}/api/2.1/unity-catalog/temporary-table-credentials" + payload = { + "table_id": self._table_id, + "operation": self.operation, + } + return url, payload + + def _handle_volume_credential_error( + self, error: requests.HTTPError, url: str, payload: Dict[str, Any] + ) -> None: + """ + Handle credential vending errors for volumes with detailed error messages. + + Args: + error: The HTTP error from the credentials API request + url: The API endpoint URL that was called + payload: The request payload that was sent + + Raises: + ValueError: With detailed error message based on status code + """ + error_msg = ( + error.response.text if hasattr(error.response, "text") else str(error) + ) + + if error.response.status_code == 404: + raise ValueError( + f"\n{'=' * 70}\n" + f"Unity Catalog Volumes Credential Vending API Not Found\n" + f"{'=' * 70}\n" + f"Volume: {self.path}\n" + f"API Endpoint: {url}\n" + f"Status: 404 Not Found\n\n" + f"The volumes credential vending API doesn't exist in your\n" + f"Databricks workspace.\n\n" + f"POSSIBLE CAUSES:\n" + f" 1. Volumes private preview is not enabled\n" + f" 2. Your Databricks runtime doesn't support volumes\n" + f" 3. 
The API endpoint has changed (volumes in private preview)\n\n" + f"NEXT STEPS:\n" + f" - Contact Databricks support to enable volumes private preview\n" + f" - Verify your workspace supports Unity Catalog volumes\n\n" + f"WORKAROUND:\n" + f" Access storage directly using your own cloud credentials\n" + f" with ray.data.read_*() functions.\n" + f"{'=' * 70}\n" + f"Original error: {error}" + ) from error + + elif error.response.status_code == 400: + # Check for the specific "Missing required field: operation" error + if ( + "Missing required field" in error_msg + or "operation" in error_msg.lower() + ): + raise ValueError( + f"\n{'=' * 70}\n" + f"Unity Catalog Volumes Credential Vending Not Available\n" + f"{'=' * 70}\n" + f"Volume: {self.path}\n" + f"API Endpoint: {url}\n" + f"Status: 400 Bad Request\n" + f"Error: {error_msg}\n\n" + f"WHAT HAPPENED:\n" + f" The volumes credential vending API endpoint exists but is\n" + f" rejecting requests. This is a known issue with volumes in\n" + f" private preview.\n\n" + f"WHAT WE TRIED:\n" + f" - Requested temporary READ credentials\n" + f" - Payload: {payload}\n" + f" - Response: '{error_msg}'\n\n" + f"WHAT THIS MEANS:\n" + f" 1. API endpoint exists but isn't fully enabled in your\n" + f" workspace (requires additional configuration)\n" + f" 2. Your workspace may need explicit enablement\n" + f" 3. Additional feature flags may be required\n\n" + f"RECOMMENDED ACTIONS:\n" + f" 1. Contact Databricks support with this error message\n" + f" 2. Ask them to:\n" + f" - Verify volumes credential vending is enabled\n" + f" - Provide correct API payload format\n" + f" - Enable required feature flags\n" + f" 3. Request access to private preview documentation\n\n" + f"WORKAROUND:\n" + f" Use ray.data.read_*() with your own cloud credentials:\n" + f" 1. Get storage location from volumes metadata API:\n" + f" GET /api/2.1/unity-catalog/volumes/{self._parse_volume_path()[0]}\n" + f" 2. Use returned 'storage_location' with ray.data.read_*()\n" + f" 3. Provide your own AWS/Azure/GCP credentials\n" + f"{'=' * 70}" + ) from error + else: + raise ValueError( + f"Invalid request for volume '{self.path}': {error_msg}\n" + f"Status: {error.response.status_code}\n" + f"Ensure you have READ VOLUME and EXTERNAL USE SCHEMA permissions." + ) from error + + elif error.response.status_code == 403: + raise ValueError( + f"\n{'=' * 70}\n" + f"Permission Denied for Unity Catalog Volume\n" + f"{'=' * 70}\n" + f"Volume: {self.path}\n" + f"Status: 403 Forbidden\n" + f"Error: {error_msg}\n\n" + f"You don't have the required permissions.\n\n" + f"REQUIRED PERMISSIONS:\n" + f" - READ VOLUME on '{self.path}'\n" + f" - EXTERNAL USE SCHEMA on the parent schema\n" + f" - USE CATALOG on the parent catalog\n\n" + f"TO GRANT PERMISSIONS:\n" + f" A Databricks admin can run:\n" + f" GRANT READ VOLUME ON VOLUME {self.path} TO `your-user-or-group`;\n" + f"{'=' * 70}" + ) from error + def _get_creds(self): """ Request temporary credentials from Unity Catalog credential vending API. 
@@ -628,132 +775,27 @@ def _get_creds(self): requests.HTTPError: If API request fails ValueError: If credential vending is not available or configured """ - if self._is_volume: - # Use volumes credential vending API (PRIVATE PREVIEW) - volume_full_name, sub_path = self._parse_volume_path() - url = f"{self.base_url}/api/2.1/unity-catalog/temporary-volume-credentials" - payload = { - "volume_id": volume_full_name, - "operation": self.operation, - } - else: - # Use table credential vending API (PUBLIC/PRODUCTION) - if not self._table_id: - raise ValueError( - "Table ID not available. Call _get_table_info() first." - ) - url = f"{self.base_url}/api/2.1/unity-catalog/temporary-table-credentials" - payload = { - "table_id": self._table_id, - "operation": self.operation, - } + url, payload = self._build_credentials_request() + headers = self._create_auth_headers() + headers["Content-Type"] = "application/json" - headers = { - "Content-Type": "application/json", - "Authorization": f"Bearer {self.token}", - } + resource_type = "volume" if self._is_volume else "table" + logger.debug( + f"Requesting temporary credentials for {resource_type} '{self.path}'" + ) try: resp = requests.post(url, json=payload, headers=headers) resp.raise_for_status() # Parse credentials response into structured dataclass self._creds_response = CredentialsResponse.from_dict(resp.json()) + logger.debug( + f"Successfully obtained credentials for {resource_type}, " + f"cloud provider: {self._creds_response.cloud_provider.value}" + ) except requests.HTTPError as e: if self._is_volume: - # Provide detailed error messages for volumes based on testing findings - error_msg = e.response.text if hasattr(e.response, "text") else str(e) - - if e.response.status_code == 404: - raise ValueError( - f"\n{'='*70}\n" - f"Unity Catalog Volumes Credential Vending API Not Found\n" - f"{'='*70}\n" - f"Volume: {self.path}\n" - f"API Endpoint: {url}\n" - f"Status: 404 Not Found\n\n" - f"The volumes credential vending API doesn't exist in your\n" - f"Databricks workspace.\n\n" - f"POSSIBLE CAUSES:\n" - f" 1. Volumes private preview is not enabled\n" - f" 2. Your Databricks runtime doesn't support volumes\n" - f" 3. The API endpoint has changed (volumes in private preview)\n\n" - f"NEXT STEPS:\n" - f" - Contact Databricks support to enable volumes private preview\n" - f" - Verify your workspace supports Unity Catalog volumes\n\n" - f"WORKAROUND:\n" - f" Access storage directly using your own cloud credentials\n" - f" with ray.data.read_*() functions.\n" - f"{'='*70}\n" - f"Original error: {e}" - ) from e - - elif e.response.status_code == 400: - # Check for the specific "Missing required field: operation" error - if ( - "Missing required field" in error_msg - or "operation" in error_msg.lower() - ): - raise ValueError( - f"\n{'='*70}\n" - f"Unity Catalog Volumes Credential Vending Not Available\n" - f"{'='*70}\n" - f"Volume: {self.path}\n" - f"API Endpoint: {url}\n" - f"Status: 400 Bad Request\n" - f"Error: {error_msg}\n\n" - f"WHAT HAPPENED:\n" - f" The volumes credential vending API endpoint exists but is\n" - f" rejecting requests. This is a known issue with volumes in\n" - f" private preview.\n\n" - f"WHAT WE TRIED:\n" - f" - Requested temporary READ credentials\n" - f" - Payload: {payload}\n" - f" - Response: '{error_msg}'\n\n" - f"WHAT THIS MEANS:\n" - f" 1. API endpoint exists but isn't fully enabled in your\n" - f" workspace (requires additional configuration)\n" - f" 2. Your workspace may need explicit enablement\n" - f" 3. 
Additional feature flags may be required\n\n" - f"RECOMMENDED ACTIONS:\n" - f" 1. Contact Databricks support with this error message\n" - f" 2. Ask them to:\n" - f" - Verify volumes credential vending is enabled\n" - f" - Provide correct API payload format\n" - f" - Enable required feature flags\n" - f" 3. Request access to private preview documentation\n\n" - f"WORKAROUND:\n" - f" Use ray.data.read_*() with your own cloud credentials:\n" - f" 1. Get storage location from volumes metadata API:\n" - f" GET /api/2.1/unity-catalog/volumes/{self._parse_volume_path()[0]}\n" - f" 2. Use returned 'storage_location' with ray.data.read_*()\n" - f" 3. Provide your own AWS/Azure/GCP credentials\n" - f"{'='*70}" - ) from e - else: - raise ValueError( - f"Invalid request for volume '{self.path}': {error_msg}\n" - f"Status: {e.response.status_code}\n" - f"Ensure you have READ VOLUME and EXTERNAL USE SCHEMA permissions." - ) from e - - elif e.response.status_code == 403: - raise ValueError( - f"\n{'='*70}\n" - f"Permission Denied for Unity Catalog Volume\n" - f"{'='*70}\n" - f"Volume: {self.path}\n" - f"Status: 403 Forbidden\n" - f"Error: {error_msg}\n\n" - f"You don't have the required permissions.\n\n" - f"REQUIRED PERMISSIONS:\n" - f" - READ VOLUME on '{self.path}'\n" - f" - EXTERNAL USE SCHEMA on the parent schema\n" - f" - USE CATALOG on the parent catalog\n\n" - f"TO GRANT PERMISSIONS:\n" - f" A Databricks admin can run:\n" - f" GRANT READ VOLUME ON VOLUME {self.path} TO `your-user-or-group`;\n" - f"{'='*70}" - ) from e + self._handle_volume_credential_error(e, url, payload) # Re-raise for non-volume errors or unhandled status codes raise @@ -821,9 +863,9 @@ def _set_env(self): # https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp.html if creds.aws_credentials: env_vars["AWS_ACCESS_KEY_ID"] = creds.aws_credentials.access_key_id - env_vars[ - "AWS_SECRET_ACCESS_KEY" - ] = creds.aws_credentials.secret_access_key + env_vars["AWS_SECRET_ACCESS_KEY"] = ( + creds.aws_credentials.secret_access_key + ) env_vars["AWS_SESSION_TOKEN"] = creds.aws_credentials.session_token if self.region: env_vars["AWS_REGION"] = self.region @@ -955,6 +997,100 @@ def _cleanup_gcp_temp_file_static(temp_file_path: str): # File already deleted or inaccessible, ignore pass + def _read_delta_with_credentials(self) -> "ray.data.Dataset": + """ + Read Delta Lake table with Unity Catalog credentials. + + For Delta Lake tables on AWS S3, the deltalake library needs a configured + PyArrow S3FileSystem to access tables with temporary session credentials. + This method creates the appropriate filesystem and passes it to read_delta. + + Returns: + Ray Dataset containing the Delta table data + + Raises: + ImportError: If deltalake or pyarrow is not installed + ValueError: If credentials are not properly configured + RuntimeError: If Delta table uses unsupported features (e.g., Deletion Vectors) + """ + import pyarrow.fs as pafs + + creds = self._creds_response + filesystem = None + + if creds.cloud_provider == CloudProvider.AWS: + # Create PyArrow S3FileSystem with temporary credentials + # https://arrow.apache.org/docs/python/generated/pyarrow.fs.S3FileSystem.html + if creds.aws_credentials: + filesystem = pafs.S3FileSystem( + access_key=creds.aws_credentials.access_key_id, + secret_key=creds.aws_credentials.secret_access_key, + session_token=creds.aws_credentials.session_token, + region=self.region or "us-east-1", + ) + else: + raise ValueError( + "AWS credentials not found in Unity Catalog response. 
" + "Cannot read Delta table without credentials." + ) + + elif creds.cloud_provider == CloudProvider.AZURE: + # For Azure, the deltalake library can use environment variables + # that were set in _set_env(), so no special filesystem needed + # The AZURE_STORAGE_SAS_TOKEN will be picked up automatically + filesystem = None + + elif creds.cloud_provider == CloudProvider.GCP: + # For GCP, the deltalake library can use environment variables + # that were set in _set_env() (GCP_OAUTH_TOKEN or GOOGLE_APPLICATION_CREDENTIALS) + filesystem = None + + # Merge filesystem into reader_kwargs if not already present + reader_kwargs = self.reader_kwargs.copy() + if filesystem is not None and "filesystem" not in reader_kwargs: + reader_kwargs["filesystem"] = filesystem + + # Call ray.data.read_delta with the configured filesystem + try: + return ray.data.read_delta(self._storage_url, **reader_kwargs) + except Exception as e: + # Provide helpful error messages for common Delta Lake issues + error_msg = str(e) + if ( + "DeletionVectors" in error_msg + or "Unsupported reader features" in error_msg + ): + raise RuntimeError( + f"Delta table at '{self.path}' uses Deletion Vectors, which requires " + f"deltalake library version 0.10.0 or higher. Current error: {error_msg}\n\n" + f"Solutions:\n" + f" 1. Upgrade deltalake: pip install --upgrade deltalake>=0.10.0\n" + f" 2. Disable deletion vectors on the table in Databricks:\n" + f" ALTER TABLE {self.path} SET TBLPROPERTIES ('delta.enableDeletionVectors' = 'false');\n" + f" Then run VACUUM to apply changes.\n" + f" 3. Use ray.data.read_parquet() to read the underlying Parquet files directly " + f"(note: this bypasses Delta Lake transaction log and may include deleted records)." + ) from e + elif "ColumnMappingMode" in error_msg: + raise RuntimeError( + f"Delta table at '{self.path}' uses column mapping, which may not be fully " + f"supported by your deltalake library version. Current error: {error_msg}\n\n" + f"Solutions:\n" + f" 1. Upgrade deltalake: pip install --upgrade deltalake\n" + f" 2. Check table properties in Databricks and disable column mapping if possible." + ) from e + else: + # Re-raise other errors with additional context + raise RuntimeError( + f"Failed to read Delta table at '{self.path}': {error_msg}\n\n" + f"This may be due to:\n" + f" - Unsupported Delta Lake features\n" + f" - Credential or permission issues\n" + f" - Network connectivity problems\n" + f" - Incompatible deltalake library version\n\n" + f"Try: pip install --upgrade deltalake pyarrow" + ) from e + def read(self) -> "ray.data.Dataset": """ Read data from Unity Catalog table or volume into a Ray Dataset. 
@@ -976,6 +1112,8 @@ def read(self) -> "ray.data.Dataset": format cannot be inferred requests.HTTPError: If Unity Catalog API requests fail """ + logger.info(f"Reading Unity Catalog path: {self.path}") + # Step 1: Get metadata (for both tables and volumes) self._get_table_info() @@ -990,6 +1128,7 @@ def read(self) -> "ray.data.Dataset": # Step 4: Read data using custom datasource or standard Ray Data reader if self.custom_datasource is not None: # Use custom datasource if provided + # Custom datasources must accept (storage_url, **reader_kwargs) constructor datasource_instance = self.custom_datasource( self._storage_url, **self.reader_kwargs ) @@ -997,5 +1136,10 @@ def read(self) -> "ray.data.Dataset": else: # Use standard Ray Data reader based on inferred or specified format data_format = self._infer_data_format() - reader = self._get_ray_reader(data_format) - return reader(self._storage_url, **self.reader_kwargs) + + # Special handling for Delta format with Unity Catalog credentials + if data_format.lower() == "delta": + return self._read_delta_with_credentials() + else: + reader = self._get_ray_reader(data_format) + return reader(self._storage_url, **self.reader_kwargs) From d112be06f92850cad1ef1bb6f8ec27e4f32411cb Mon Sep 17 00:00:00 2001 From: soffer-anyscale Date: Tue, 21 Oct 2025 10:53:01 -0600 Subject: [PATCH 6/9] Simplify Unity Catalog to Delta tables only for easier review This is a simplified version of Unity Catalog support that focuses on the core use case: reading Delta Lake tables with credential vending. Changes from the full implementation: - Removed volume support (private preview feature) - Removed format detection/inference - Removed custom datasource support - Assumes all tables are Delta format - Simplified API: read_unity_catalog(table, url, token, ...) This makes the implementation much easier to review while providing the most valuable functionality: secure access to Unity Catalog Delta tables with automatic credential vending. 
The simplified implementation includes: - Unity Catalog table metadata fetching - Production-ready credential vending API - AWS, Azure, and GCP credential handling - Delta Lake reading with proper PyArrow filesystem configuration - Comprehensive error messages for common issues - Full logging support Total: ~600 lines vs ~1150 lines in full implementation Signed-off-by: soffer-anyscale --- .../datasource/unity_catalog_datasource.py | 636 ++---------------- python/ray/data/read_api.py | 80 +-- 2 files changed, 79 insertions(+), 637 deletions(-) diff --git a/python/ray/data/_internal/datasource/unity_catalog_datasource.py b/python/ray/data/_internal/datasource/unity_catalog_datasource.py index 7e61b4d6ff21..d39be63c73e1 100644 --- a/python/ray/data/_internal/datasource/unity_catalog_datasource.py +++ b/python/ray/data/_internal/datasource/unity_catalog_datasource.py @@ -2,15 +2,13 @@ import logging import os import tempfile -import warnings from dataclasses import dataclass from enum import Enum -from typing import Any, Callable, Dict, List, Optional, Tuple, Type +from typing import Any, Dict, List, Optional import requests # HTTP library: https://requests.readthedocs.io/ import ray -from ray.data.datasource import Datasource from ray.util.annotations import DeveloperAPI logger = logging.getLogger(__name__) @@ -20,25 +18,6 @@ # Credential Vending API: # https://docs.databricks.com/en/data-governance/unity-catalog/credential-vending.html -# Mapping of file formats to Ray Data reader function names -# https://docs.ray.io/en/latest/data/api/input_output.html -_FILE_FORMAT_TO_RAY_READER = { - "delta": "read_delta", - "parquet": "read_parquet", - "csv": "read_csv", - "json": "read_json", - "text": "read_text", - "images": "read_images", - "avro": "read_avro", - "numpy": "read_numpy", - "binary": "read_binary_files", - "videos": "read_videos", - "audio": "read_audio", - "lance": "read_lance", - "iceberg": "read_iceberg", - "hudi": "read_hudi", -} - @dataclass class ColumnInfo: @@ -55,7 +34,6 @@ class ColumnInfo: type_scale: int type_json: str nullable: bool - # Optional fields that may not be present in all API responses comment: Optional[str] = None partition_index: Optional[int] = None @@ -64,10 +42,6 @@ def from_dict(obj: Dict) -> "ColumnInfo": """ Safely construct ColumnInfo from Unity Catalog API response. - Extracts only the fields defined in ColumnInfo, ignoring any extra - fields returned by the API (such as column_masks, which may be present - in some API responses but aren't needed for Ray Data operations). - Args: obj: Dictionary from Unity Catalog tables API columns field @@ -138,9 +112,6 @@ def from_dict(obj: Dict) -> "TableInfo": """ Parse table metadata from Unity Catalog API response. - Handles optional nested structures and safely constructs ColumnInfo - instances from the columns field. - Args: obj: Dictionary from Unity Catalog tables API response @@ -168,7 +139,6 @@ def from_dict(obj: Dict) -> "TableInfo": schema_name=obj["schema_name"], table_type=obj["table_type"], data_source_format=obj.get("data_source_format", ""), - # Use ColumnInfo.from_dict to safely handle API response fields columns=[ColumnInfo.from_dict(col) for col in obj.get("columns", [])], storage_location=obj.get("storage_location", ""), owner=obj.get("owner", ""), @@ -199,47 +169,6 @@ def from_dict(obj: Dict) -> "TableInfo": ) -@dataclass -class VolumeInfo: - """ - Metadata for Unity Catalog volumes. - - Represents the response from the Unity Catalog volumes API. 
- """ - - name: str - catalog_name: str - schema_name: str - volume_type: str - storage_location: str - full_name: str - owner: Optional[str] = None - comment: Optional[str] = None - created_at: Optional[int] = None - created_by: Optional[str] = None - updated_at: Optional[int] = None - updated_by: Optional[str] = None - volume_id: Optional[str] = None - - @staticmethod - def from_dict(obj: Dict) -> "VolumeInfo": - return VolumeInfo( - name=obj["name"], - catalog_name=obj["catalog_name"], - schema_name=obj["schema_name"], - volume_type=obj.get("volume_type", ""), - storage_location=obj.get("storage_location", ""), - full_name=obj.get("full_name", ""), - owner=obj.get("owner"), - comment=obj.get("comment"), - created_at=obj.get("created_at"), - created_by=obj.get("created_by"), - updated_at=obj.get("updated_at"), - updated_by=obj.get("updated_by"), - volume_id=obj.get("volume_id"), - ) - - class CloudProvider(Enum): """Cloud provider types for credential vending.""" @@ -300,9 +229,7 @@ class CredentialsResponse: aws_credentials: Optional[AWSCredentials] = None azure_credentials: Optional[AzureSASCredentials] = None gcp_credentials: Optional[GCPOAuthCredentials] = None - # Legacy field for older Azure format azure_sas_uri: Optional[str] = None - # Legacy field for older GCP format gcp_service_account_json: Optional[str] = None @staticmethod @@ -359,30 +286,14 @@ def from_dict(obj: Dict) -> "CredentialsResponse": @DeveloperAPI class UnityCatalogConnector: """ - Connector for reading Unity Catalog tables and volumes into Ray Datasets. - - This connector handles automatic credential vending for secure access to cloud storage - backing Unity Catalog tables and volumes. It supports AWS S3, Azure Data Lake Storage, - and Google Cloud Storage with temporary, least-privilege credentials. - - Tables Support Status: PRODUCTION READY - - Fully supported and documented - - Credential vending API is stable and publicly available - - Works across all Databricks workspaces - - Volumes Support Status: PRIVATE PREVIEW - LIMITED AVAILABILITY - - API endpoint exists but may not be enabled in all workspaces - - Credential vending may return errors even with correct permissions - - Contact Databricks support if you encounter credential vending errors - - Workaround: Use ray.data.read_*() with your own cloud credentials + Connector for reading Unity Catalog Delta tables into Ray Datasets. - Supported Unity Catalog paths: - - Tables: catalog.schema.table - - Volumes: catalog.schema.volume/path/to/data (private preview) + This connector handles automatic credential vending for secure access to cloud + storage backing Unity Catalog Delta tables. It supports AWS S3, Azure Data Lake + Storage, and Google Cloud Storage with temporary, least-privilege credentials. - Supported data formats: - delta, parquet, csv, json, text, images, avro, numpy, binary, videos, - audio, lance, iceberg, hudi, and custom datasources + This implementation specifically focuses on Delta Lake tables, which is the most + common format in Unity Catalog deployments. 
Cloud providers: - AWS S3 (with temporary IAM credentials) @@ -392,14 +303,9 @@ class UnityCatalogConnector: Args: base_url: Databricks workspace URL (e.g., "https://dbc-xxx.cloud.databricks.com") token: Databricks Personal Access Token with appropriate permissions - path: Unity Catalog path (table: "catalog.schema.table" or - volume: "catalog.schema.volume/subpath") + table: Unity Catalog table path in format "catalog.schema.table" region: Optional AWS region for S3 credential configuration - data_format: Optional format override (delta, parquet, images, etc.) - custom_datasource: Optional custom Ray Data Datasource class. If provided, - the datasource must accept (storage_url, **reader_kwargs) in its constructor. - operation: Credential operation type (default: "READ") - reader_kwargs: Additional arguments passed to the Ray Data reader + reader_kwargs: Additional arguments passed to ray.data.read_delta() References: - Credential Vending: https://docs.databricks.com/en/data-governance/unity-catalog/credential-vending.html @@ -411,131 +317,23 @@ def __init__( *, base_url: str, token: str, - path: str, + table: str, region: Optional[str] = None, - data_format: Optional[str] = None, - custom_datasource: Optional[Type[Datasource]] = None, - operation: str = "READ", reader_kwargs: Optional[Dict] = None, ): self.base_url = base_url.rstrip("/") self.token = token - self.path = path - self.data_format = data_format.lower() if data_format else None - self.custom_datasource = custom_datasource + self.table = table self.region = region - self.operation = operation self.reader_kwargs = reader_kwargs or {} - # Determine if this is a table or volume path - self._is_volume = self._detect_volume_path(path) - - # Warn about volumes private preview status and known issues - if self._is_volume: - warnings.warn( - f"\n{'=' * 70}\n" - f"Unity Catalog Volumes: PRIVATE PREVIEW Feature\n" - f"{'=' * 70}\n" - f"Path: {path}\n\n" - f"Volumes support is in PRIVATE PREVIEW and may not be fully enabled\n" - f"in your Databricks workspace. The credential vending API for volumes\n" - f"is not yet publicly documented and may not be accessible.\n\n" - f"KNOWN ISSUES:\n" - f" - Credential vending may fail with '400 Bad Request: Missing\n" - f" required field: operation' even though the operation field is\n" - f" provided. This indicates the API exists but isn't fully enabled.\n" - f" - The API endpoint may not be available in all workspaces.\n\n" - f"REQUIREMENTS:\n" - f" 1. Workspace must have volumes private preview enabled\n" - f" 2. You must have READ VOLUME permission on the volume\n" - f" 3. You must have EXTERNAL USE SCHEMA permission\n" - f" 4. The volume must exist and be accessible\n\n" - f"IF YOU ENCOUNTER ERRORS:\n" - f" - Contact Databricks support to verify volumes are enabled\n" - f" - Ask about enabling volumes credential vending for your workspace\n" - f" - Request documentation for the private preview API\n\n" - f"WORKAROUND:\n" - f" Use ray.data.read_*() with your own cloud credentials to access\n" - f" the underlying storage directly. 
You can get the storage location\n" - f" from the Unity Catalog volumes metadata API:\n" - f" GET /api/2.1/unity-catalog/volumes/{{volume_full_name}}\n" - f"{'=' * 70}", - UserWarning, - stacklevel=4, - ) - # Storage for metadata and credentials self._table_info: Optional[TableInfo] = None - self._volume_info: Optional[VolumeInfo] = None self._table_id: Optional[str] = None - self._volume_path: Optional[str] = None self._creds_response: Optional[CredentialsResponse] = None self._storage_url: Optional[str] = None self._gcp_temp_file: Optional[str] = None - @staticmethod - def _detect_volume_path(path: str) -> bool: - """ - Detect if the path refers to a volume or table. - - Unity Catalog paths follow these patterns: - - Table: catalog.schema.table (3 parts separated by dots) - - Volume: catalog.schema.volume/path/to/data (contains forward slash) - - Volume: /Volumes/catalog/schema/volume/path (starts with /Volumes/) - - Args: - path: Unity Catalog path - - Returns: - True if path refers to a volume, False for table - """ - # Check for /Volumes/ prefix (common pattern) - if path.startswith("/Volumes/"): - return True - - # Check if path contains a forward slash (indicating volume with subdirectory) - if "/" in path: - return True - - # Count the number of dot-separated parts - parts = path.split(".") - # If more than 3 parts, it's likely a volume (e.g., catalog.schema.volume.something) - # However, standard format is catalog.schema.volume/path, so this is a fallback - return len(parts) > 3 - - def _parse_volume_path(self) -> Tuple[str, str]: - """ - Parse volume path into volume identifier and subdirectory path. - - Handles multiple volume path formats: - - catalog.schema.volume/path/to/data - - /Volumes/catalog/schema/volume/path/to/data - - Returns: - Tuple of (volume_full_name, sub_path) - Examples: - - ("catalog.schema.volume", "path/to/data") - - ("catalog.schema.volume", "") - """ - path = self.path - - # Handle /Volumes/ prefix - if path.startswith("/Volumes/"): - path = path[9:] # len("/Volumes/") = 9 - parts = path.split("/") - if len(parts) < 3: - raise ValueError(f"Invalid /Volumes/ path: {self.path}") - volume_full_name = ".".join(parts[:3]) - sub_path = "/".join(parts[3:]) - return volume_full_name, sub_path - - if "/" in path: - parts = path.split("/", 1) - sub_path = parts[1] if len(parts) > 1 else "" - return parts[0], sub_path - - return path, "" - def _create_auth_headers(self) -> Dict[str, str]: """ Create authorization headers for Unity Catalog API requests. @@ -545,42 +343,19 @@ def _create_auth_headers(self) -> Dict[str, str]: """ return {"Authorization": f"Bearer {self.token}"} - def _fetch_volume_metadata(self) -> None: - """ - Fetch volume metadata from Unity Catalog volumes API. - - Sets self._volume_info and self._volume_path. - - Raises: - requests.HTTPError: If API request fails - """ - volume_full_name, _ = self._parse_volume_path() - url = f"{self.base_url}/api/2.1/unity-catalog/volumes/{volume_full_name}" - headers = self._create_auth_headers() - resp = requests.get(url, headers=headers) - resp.raise_for_status() - data = resp.json() - - # Parse and store volume information - volume_info = VolumeInfo.from_dict(data) - self._volume_info = volume_info - self._volume_path = volume_info.storage_location - def _fetch_table_metadata(self) -> TableInfo: """ Fetch table metadata from Unity Catalog tables API. - Sets self._table_info and self._table_id. 
- Returns: TableInfo object with table metadata Raises: requests.HTTPError: If API request fails """ - url = f"{self.base_url}/api/2.1/unity-catalog/tables/{self.path}" + url = f"{self.base_url}/api/2.1/unity-catalog/tables/{self.table}" headers = self._create_auth_headers() - logger.debug(f"Fetching table metadata for '{self.path}' from {url}") + logger.debug(f"Fetching table metadata for '{self.table}' from {url}") resp = requests.get(url, headers=headers) resp.raise_for_status() data = resp.json() @@ -595,247 +370,49 @@ def _fetch_table_metadata(self) -> TableInfo: ) return table_info - def _get_table_info(self) -> Optional[TableInfo]: - """ - Fetch table or volume metadata from Unity Catalog API. - - API Endpoints: - Tables: GET /api/2.1/unity-catalog/tables/{full_name} - Volumes: GET /api/2.1/unity-catalog/volumes/{full_name} - - Returns: - TableInfo object for tables, None for volumes - - Raises: - requests.HTTPError: If API request fails - """ - if self._is_volume: - self._fetch_volume_metadata() - return None - else: - return self._fetch_table_metadata() - - def _build_credentials_request(self) -> Tuple[str, Dict[str, Any]]: - """ - Build credentials request URL and payload for Unity Catalog API. - - Returns: - Tuple of (url, payload) for the credentials API request - """ - if self._is_volume: - # Use volumes credential vending API (PRIVATE PREVIEW) - volume_full_name, _ = self._parse_volume_path() - url = f"{self.base_url}/api/2.1/unity-catalog/temporary-volume-credentials" - payload = { - "volume_id": volume_full_name, - "operation": self.operation, - } - else: - # Use table credential vending API (PUBLIC/PRODUCTION) - if not self._table_id: - raise ValueError( - "Table ID not available. Call _get_table_info() first." - ) - url = f"{self.base_url}/api/2.1/unity-catalog/temporary-table-credentials" - payload = { - "table_id": self._table_id, - "operation": self.operation, - } - return url, payload - - def _handle_volume_credential_error( - self, error: requests.HTTPError, url: str, payload: Dict[str, Any] - ) -> None: - """ - Handle credential vending errors for volumes with detailed error messages. - - Args: - error: The HTTP error from the credentials API request - url: The API endpoint URL that was called - payload: The request payload that was sent - - Raises: - ValueError: With detailed error message based on status code - """ - error_msg = ( - error.response.text if hasattr(error.response, "text") else str(error) - ) - - if error.response.status_code == 404: - raise ValueError( - f"\n{'=' * 70}\n" - f"Unity Catalog Volumes Credential Vending API Not Found\n" - f"{'=' * 70}\n" - f"Volume: {self.path}\n" - f"API Endpoint: {url}\n" - f"Status: 404 Not Found\n\n" - f"The volumes credential vending API doesn't exist in your\n" - f"Databricks workspace.\n\n" - f"POSSIBLE CAUSES:\n" - f" 1. Volumes private preview is not enabled\n" - f" 2. Your Databricks runtime doesn't support volumes\n" - f" 3. 
The API endpoint has changed (volumes in private preview)\n\n" - f"NEXT STEPS:\n" - f" - Contact Databricks support to enable volumes private preview\n" - f" - Verify your workspace supports Unity Catalog volumes\n\n" - f"WORKAROUND:\n" - f" Access storage directly using your own cloud credentials\n" - f" with ray.data.read_*() functions.\n" - f"{'=' * 70}\n" - f"Original error: {error}" - ) from error - - elif error.response.status_code == 400: - # Check for the specific "Missing required field: operation" error - if ( - "Missing required field" in error_msg - or "operation" in error_msg.lower() - ): - raise ValueError( - f"\n{'=' * 70}\n" - f"Unity Catalog Volumes Credential Vending Not Available\n" - f"{'=' * 70}\n" - f"Volume: {self.path}\n" - f"API Endpoint: {url}\n" - f"Status: 400 Bad Request\n" - f"Error: {error_msg}\n\n" - f"WHAT HAPPENED:\n" - f" The volumes credential vending API endpoint exists but is\n" - f" rejecting requests. This is a known issue with volumes in\n" - f" private preview.\n\n" - f"WHAT WE TRIED:\n" - f" - Requested temporary READ credentials\n" - f" - Payload: {payload}\n" - f" - Response: '{error_msg}'\n\n" - f"WHAT THIS MEANS:\n" - f" 1. API endpoint exists but isn't fully enabled in your\n" - f" workspace (requires additional configuration)\n" - f" 2. Your workspace may need explicit enablement\n" - f" 3. Additional feature flags may be required\n\n" - f"RECOMMENDED ACTIONS:\n" - f" 1. Contact Databricks support with this error message\n" - f" 2. Ask them to:\n" - f" - Verify volumes credential vending is enabled\n" - f" - Provide correct API payload format\n" - f" - Enable required feature flags\n" - f" 3. Request access to private preview documentation\n\n" - f"WORKAROUND:\n" - f" Use ray.data.read_*() with your own cloud credentials:\n" - f" 1. Get storage location from volumes metadata API:\n" - f" GET /api/2.1/unity-catalog/volumes/{self._parse_volume_path()[0]}\n" - f" 2. Use returned 'storage_location' with ray.data.read_*()\n" - f" 3. Provide your own AWS/Azure/GCP credentials\n" - f"{'=' * 70}" - ) from error - else: - raise ValueError( - f"Invalid request for volume '{self.path}': {error_msg}\n" - f"Status: {error.response.status_code}\n" - f"Ensure you have READ VOLUME and EXTERNAL USE SCHEMA permissions." - ) from error - - elif error.response.status_code == 403: - raise ValueError( - f"\n{'=' * 70}\n" - f"Permission Denied for Unity Catalog Volume\n" - f"{'=' * 70}\n" - f"Volume: {self.path}\n" - f"Status: 403 Forbidden\n" - f"Error: {error_msg}\n\n" - f"You don't have the required permissions.\n\n" - f"REQUIRED PERMISSIONS:\n" - f" - READ VOLUME on '{self.path}'\n" - f" - EXTERNAL USE SCHEMA on the parent schema\n" - f" - USE CATALOG on the parent catalog\n\n" - f"TO GRANT PERMISSIONS:\n" - f" A Databricks admin can run:\n" - f" GRANT READ VOLUME ON VOLUME {self.path} TO `your-user-or-group`;\n" - f"{'=' * 70}" - ) from error - def _get_creds(self): """ Request temporary credentials from Unity Catalog credential vending API. - Handles both table and volume credential requests with different support levels. 
- - Table Credentials (PRODUCTION READY): - - Endpoint: /api/2.1/unity-catalog/temporary-table-credentials - - Status: Public API, fully documented and supported - - Reliability: Stable and available in all workspaces - - Volume Credentials (PRIVATE PREVIEW - LIMITED AVAILABILITY): - - Endpoint: /api/2.1/unity-catalog/temporary-volume-credentials - - Status: Private preview, not publicly documented - - Known Issue: May return 400 "Missing required field: operation" - even when operation field is provided, indicating the API exists - but isn't fully enabled in the workspace + Uses the table credential vending API which is production-ready and publicly + documented. Raises: requests.HTTPError: If API request fails ValueError: If credential vending is not available or configured """ - url, payload = self._build_credentials_request() + if not self._table_id: + raise ValueError( + "Table ID not available. Call _fetch_table_metadata() first." + ) + + url = f"{self.base_url}/api/2.1/unity-catalog/temporary-table-credentials" + payload = { + "table_id": self._table_id, + "operation": "READ", + } headers = self._create_auth_headers() headers["Content-Type"] = "application/json" - resource_type = "volume" if self._is_volume else "table" + logger.debug(f"Requesting temporary credentials for table '{self.table}'") + + resp = requests.post(url, json=payload, headers=headers) + resp.raise_for_status() + + # Parse credentials response into structured dataclass + self._creds_response = CredentialsResponse.from_dict(resp.json()) logger.debug( - f"Requesting temporary credentials for {resource_type} '{self.path}'" + f"Successfully obtained credentials, " + f"cloud provider: {self._creds_response.cloud_provider.value}" ) - try: - resp = requests.post(url, json=payload, headers=headers) - resp.raise_for_status() - # Parse credentials response into structured dataclass - self._creds_response = CredentialsResponse.from_dict(resp.json()) - logger.debug( - f"Successfully obtained credentials for {resource_type}, " - f"cloud provider: {self._creds_response.cloud_provider.value}" - ) - except requests.HTTPError as e: - if self._is_volume: - self._handle_volume_credential_error(e, url, payload) - # Re-raise for non-volume errors or unhandled status codes - raise - # Extract and store storage URL from credentials response - self._extract_storage_url() - - def _extract_storage_url(self) -> None: - """ - Extract and set storage URL from credentials response. - - For volumes, constructs full path including subdirectory. - For tables, uses URL directly from response. - - Sets self._storage_url. - - Raises: - ValueError: If no storage URL is returned in credentials response - """ - if self._is_volume: - # For volumes, construct full path with subdirectory - volume_full_name, sub_path = self._parse_volume_path() - base_url = self._creds_response.url - if not base_url: - raise ValueError( - f"No storage URL returned for volume '{volume_full_name}'. " - f"Credentials response: {self._creds_response}" - ) - # Construct full path, handling trailing/leading slashes - if sub_path: - self._storage_url = f"{base_url.rstrip('/')}/{sub_path.lstrip('/')}" - else: - self._storage_url = base_url - else: - # For tables, use URL directly from response - if not self._creds_response.url: - raise ValueError( - f"No storage URL returned for table '{self.path}'. 
" - f"Credentials response: {self._creds_response}" - ) - self._storage_url = self._creds_response.url + if not self._creds_response.url: + raise ValueError( + f"No storage URL returned for table '{self.table}'. " + f"Credentials response: {self._creds_response}" + ) + self._storage_url = self._creds_response.url def _set_env(self): """ @@ -846,11 +423,8 @@ def _set_env(self): Supported cloud providers: - AWS S3: Sets AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN - (IAM temporary credentials) - Azure: Sets AZURE_STORAGE_SAS_TOKEN - (Shared Access Signature token) - GCP: Sets GCP_OAUTH_TOKEN or GOOGLE_APPLICATION_CREDENTIALS - (OAuth token or service account credentials) Raises: ValueError: If no recognized credential type is found in response @@ -896,7 +470,7 @@ def _set_env(self): mode="w", prefix="gcp_sa_", suffix=".json", - delete=False, # Don't delete immediately + delete=False, ) temp_file.write(creds.gcp_service_account_json) temp_file.close() @@ -904,7 +478,7 @@ def _set_env(self): env_vars["GOOGLE_APPLICATION_CREDENTIALS"] = temp_file.name self._gcp_temp_file = temp_file.name - # Register cleanup on exit without capturing self to allow garbage collection + # Register cleanup on exit atexit.register(self._cleanup_gcp_temp_file_static, temp_file.name) else: raise ValueError("GCP credentials not found in response") @@ -917,76 +491,6 @@ def _set_env(self): for k, v in env_vars.items(): os.environ[k] = v - def _infer_data_format(self) -> str: - """ - Infer data format from metadata, explicit specification, or file extensions. - - Priority order: - 1. Explicitly specified format (self.data_format) - 2. Table data_source_format from metadata (for tables) - 3. File extension from storage location URL - - Returns: - Inferred format string (e.g., "delta", "parquet", "csv") - - Raises: - ValueError: If format cannot be inferred - """ - # Use explicit format if provided - if self.data_format: - return self.data_format - - # For tables, try to get format from metadata - if not self._is_volume and self._table_info: - if self._table_info.data_source_format: - return self._table_info.data_source_format.lower() - - # Try to infer from storage URL file extension - if self._storage_url: - # Extract extension from URL (handle query parameters) - url_path = self._storage_url.split("?")[0] - ext = os.path.splitext(url_path)[-1].replace(".", "").lower() - if ext and ext in _FILE_FORMAT_TO_RAY_READER: - return ext - - # Default to parquet for volumes if no extension found - if self._is_volume: - return "parquet" - - raise ValueError( - f"Could not infer data format for path '{self.path}'. " - f"Please specify the format explicitly using the 'data_format' parameter. " - f"Supported formats: {', '.join(_FILE_FORMAT_TO_RAY_READER.keys())}" - ) - - @staticmethod - def _get_ray_reader(data_format: str) -> Callable[..., "ray.data.Dataset"]: - """ - Get the appropriate Ray Data reader function for the specified format. - - Args: - data_format: Format name (e.g., "delta", "parquet", "images") - - Returns: - Ray Data reader function (e.g., ray.data.read_parquet) - - Raises: - ValueError: If format is not supported by Ray Data - """ - fmt = data_format.lower() - - # Check if format is in the standard mapping - if fmt in _FILE_FORMAT_TO_RAY_READER: - reader_name = _FILE_FORMAT_TO_RAY_READER[fmt] - reader_func = getattr(ray.data, reader_name, None) - if reader_func: - return reader_func - - raise ValueError( - f"Unsupported data format: '{fmt}'. 
" - f"Supported formats: {', '.join(sorted(_FILE_FORMAT_TO_RAY_READER.keys()))}" - ) - @staticmethod def _cleanup_gcp_temp_file_static(temp_file_path: str): """Clean up temporary GCP service account file if it exists.""" @@ -994,7 +498,6 @@ def _cleanup_gcp_temp_file_static(temp_file_path: str): try: os.unlink(temp_file_path) except OSError: - # File already deleted or inaccessible, ignore pass def _read_delta_with_credentials(self) -> "ray.data.Dataset": @@ -1003,7 +506,6 @@ def _read_delta_with_credentials(self) -> "ray.data.Dataset": For Delta Lake tables on AWS S3, the deltalake library needs a configured PyArrow S3FileSystem to access tables with temporary session credentials. - This method creates the appropriate filesystem and passes it to read_delta. Returns: Ray Dataset containing the Delta table data @@ -1011,7 +513,7 @@ def _read_delta_with_credentials(self) -> "ray.data.Dataset": Raises: ImportError: If deltalake or pyarrow is not installed ValueError: If credentials are not properly configured - RuntimeError: If Delta table uses unsupported features (e.g., Deletion Vectors) + RuntimeError: If Delta table uses unsupported features """ import pyarrow.fs as pafs @@ -1036,13 +538,12 @@ def _read_delta_with_credentials(self) -> "ray.data.Dataset": elif creds.cloud_provider == CloudProvider.AZURE: # For Azure, the deltalake library can use environment variables - # that were set in _set_env(), so no special filesystem needed - # The AZURE_STORAGE_SAS_TOKEN will be picked up automatically + # that were set in _set_env() filesystem = None elif creds.cloud_provider == CloudProvider.GCP: # For GCP, the deltalake library can use environment variables - # that were set in _set_env() (GCP_OAUTH_TOKEN or GOOGLE_APPLICATION_CREDENTIALS) + # that were set in _set_env() filesystem = None # Merge filesystem into reader_kwargs if not already present @@ -1061,19 +562,19 @@ def _read_delta_with_credentials(self) -> "ray.data.Dataset": or "Unsupported reader features" in error_msg ): raise RuntimeError( - f"Delta table at '{self.path}' uses Deletion Vectors, which requires " + f"Delta table at '{self.table}' uses Deletion Vectors, which requires " f"deltalake library version 0.10.0 or higher. Current error: {error_msg}\n\n" f"Solutions:\n" f" 1. Upgrade deltalake: pip install --upgrade deltalake>=0.10.0\n" f" 2. Disable deletion vectors on the table in Databricks:\n" - f" ALTER TABLE {self.path} SET TBLPROPERTIES ('delta.enableDeletionVectors' = 'false');\n" + f" ALTER TABLE {self.table} SET TBLPROPERTIES ('delta.enableDeletionVectors' = 'false');\n" f" Then run VACUUM to apply changes.\n" f" 3. Use ray.data.read_parquet() to read the underlying Parquet files directly " f"(note: this bypasses Delta Lake transaction log and may include deleted records)." ) from e elif "ColumnMappingMode" in error_msg: raise RuntimeError( - f"Delta table at '{self.path}' uses column mapping, which may not be fully " + f"Delta table at '{self.table}' uses column mapping, which may not be fully " f"supported by your deltalake library version. Current error: {error_msg}\n\n" f"Solutions:\n" f" 1. 
Upgrade deltalake: pip install --upgrade deltalake\n" @@ -1082,7 +583,7 @@ def _read_delta_with_credentials(self) -> "ray.data.Dataset": else: # Re-raise other errors with additional context raise RuntimeError( - f"Failed to read Delta table at '{self.path}': {error_msg}\n\n" + f"Failed to read Delta table at '{self.table}': {error_msg}\n\n" f"This may be due to:\n" f" - Unsupported Delta Lake features\n" f" - Credential or permission issues\n" @@ -1093,53 +594,34 @@ def _read_delta_with_credentials(self) -> "ray.data.Dataset": def read(self) -> "ray.data.Dataset": """ - Read data from Unity Catalog table or volume into a Ray Dataset. + Read Unity Catalog Delta table into a Ray Dataset. This is the main entry point for reading data. It orchestrates: - 1. Fetch metadata from Unity Catalog (for tables) or validate volume access + 1. Fetch metadata from Unity Catalog 2. Obtain temporary credentials via Unity Catalog credential vending 3. Configure cloud credentials in the current process environment - 4. Read data using the appropriate Ray Data reader or custom datasource + 4. Read data using ray.data.read_delta() The credentials are set in the current process environment and will be inherited by Ray Data read tasks automatically. Returns: - Ray Dataset containing the data from the specified table or volume + Ray Dataset containing the data from the specified Delta table Raises: - ValueError: If configuration is invalid, path cannot be accessed, or - format cannot be inferred + ValueError: If configuration is invalid or table cannot be accessed requests.HTTPError: If Unity Catalog API requests fail """ - logger.info(f"Reading Unity Catalog path: {self.path}") + logger.info(f"Reading Unity Catalog Delta table: {self.table}") - # Step 1: Get metadata (for both tables and volumes) - self._get_table_info() + # Step 1: Get table metadata + self._fetch_table_metadata() # Step 2: Get temporary credentials from Unity Catalog - # This may fail for volumes - see detailed error messages in _get_creds() self._get_creds() # Step 3: Configure cloud credentials in current process environment - # Ray Data readers will inherit these environment variables self._set_env() - # Step 4: Read data using custom datasource or standard Ray Data reader - if self.custom_datasource is not None: - # Use custom datasource if provided - # Custom datasources must accept (storage_url, **reader_kwargs) constructor - datasource_instance = self.custom_datasource( - self._storage_url, **self.reader_kwargs - ) - return ray.data.read_datasource(datasource_instance) - else: - # Use standard Ray Data reader based on inferred or specified format - data_format = self._infer_data_format() - - # Special handling for Delta format with Unity Catalog credentials - if data_format.lower() == "delta": - return self._read_delta_with_credentials() - else: - reader = self._get_ray_reader(data_format) - return reader(self._storage_url, **self.reader_kwargs) + # Step 4: Read Delta table with Unity Catalog credentials + return self._read_delta_with_credentials() diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py index bc7637a2a1d7..7d7d61eb5571 100644 --- a/python/ray/data/read_api.py +++ b/python/ray/data/read_api.py @@ -12,7 +12,6 @@ Optional, Set, Tuple, - Type, TypeVar, Union, ) @@ -3155,7 +3154,7 @@ def to_ref(df): return df else: raise ValueError( - "Expected a Ray object ref or a Pandas DataFrame, " f"got {type(df)}" + f"Expected a Ray object ref or a Pandas DataFrame, got {type(df)}" ) ds = from_pandas_refs( @@ -3285,12 
+3284,11 @@ def from_pandas_refs( for df in dfs: if not isinstance(df, ray.ObjectRef): raise ValueError( - "Expected list of Ray object refs, " - f"got list containing {type(df)}" + f"Expected list of Ray object refs, got list containing {type(df)}" ) else: raise ValueError( - "Expected Ray object ref or list of Ray object refs, " f"got {type(df)}" + f"Expected Ray object ref or list of Ray object refs, got {type(df)}" ) context = DataContext.get_current() @@ -4211,21 +4209,19 @@ def read_clickhouse( @PublicAPI(stability="alpha") def read_unity_catalog( - path: str, + table: str, url: str, token: str, *, - format: Optional[str] = None, - datasource: Optional[Type[Datasource]] = None, region: Optional[str] = None, reader_kwargs: Optional[dict] = None, ) -> Dataset: - """Creates a :class:`~ray.data.Dataset` from Unity Catalog tables or volumes. + """Creates a :class:`~ray.data.Dataset` from Unity Catalog Delta tables. - This function reads data from Databricks Unity Catalog using credential vending, - which provides temporary, least-privilege credentials for secure access to cloud - storage. The function authenticates through the Unity Catalog REST API and supports - reading from AWS S3, Azure Data Lake Storage, and Google Cloud Storage. + This function reads Delta Lake tables from Databricks Unity Catalog using credential + vending, which provides temporary, least-privilege credentials for secure access to + cloud storage. The function authenticates through the Unity Catalog REST API and + supports reading from AWS S3, Azure Data Lake Storage, and Google Cloud Storage. .. note:: @@ -4242,80 +4238,44 @@ def read_unity_catalog( >>> import ray >>> ds = ray.data.read_unity_catalog( # doctest: +SKIP - ... path="main.sales.transactions", + ... table="main.sales.transactions", ... url="https://dbc-XXXXXXX-XXXX.cloud.databricks.com", ... token="dapi..." ... ) >>> ds.show(3) # doctest: +SKIP - Read images from a Unity Catalog volume: - - >>> ds = ray.data.read_unity_catalog( # doctest: +SKIP - ... path="main.ml_data.images/training/cats", - ... url="https://dbc-XXXXXXX-XXXX.cloud.databricks.com", - ... token="dapi...", - ... format="images" - ... ) - - Read Parquet files with custom reader options: + Read Delta table with custom reader options: >>> ds = ray.data.read_unity_catalog( # doctest: +SKIP - ... path="main.analytics.events", + ... table="main.analytics.events", ... url="https://dbc-XXXXXXX-XXXX.cloud.databricks.com", ... token="dapi...", - ... format="parquet", ... region="us-west-2", ... reader_kwargs={"columns": ["user_id", "timestamp"], "override_num_blocks": 100} ... ) - Use a custom datasource: - - >>> from ray.data.datasource import Datasource - >>> class MyCustomDatasource(Datasource): # doctest: +SKIP - ... def __init__(self, paths, **kwargs): - ... # Custom implementation - ... pass - ... - >>> ds = ray.data.read_unity_catalog( # doctest: +SKIP - ... path="main.raw_data.custom_format", - ... url="https://dbc-XXXXXXX-XXXX.cloud.databricks.com", - ... token="dapi...", - ... datasource=MyCustomDatasource - ... ) - Args: - path: Unity Catalog path. For tables, use the format ``catalog.schema.table``. - For volumes, use ``catalog.schema.volume/path/to/data``. + table: Unity Catalog table path in format ``catalog.schema.table``. url: Databricks workspace URL. For example, ``"https://dbc-XXXXXXX-XXXX.cloud.databricks.com"``. token: Databricks Personal Access Token. The token must have ``EXTERNAL USE SCHEMA`` - permission on the schema containing the table or volume. - format: Data format. 
If not specified, the format is inferred from table - metadata or file extension. Supported formats include ``"delta"``, - ``"parquet"``, ``"csv"``, ``"json"``, ``"text"``, ``"images"``, ``"avro"``, - ``"numpy"``, ``"binary"``, ``"videos"``, ``"audio"``, ``"lance"``, - ``"iceberg"``, and ``"hudi"``. - datasource: Custom Ray Data :class:`~ray.data.Datasource` class. If provided, - this datasource is used instead of the standard format-based readers. + permission on the schema containing the table. region: AWS region for S3 credential configuration. Only required for AWS S3. - reader_kwargs: Additional arguments passed to the underlying Ray Data reader - function. The supported arguments depend on the format. For example, - for Parquet files, you can pass arguments from - :meth:`~ray.data.read_parquet`. + reader_kwargs: Additional arguments passed to :meth:`~ray.data.read_delta`. + For example, you can specify ``columns`` to read only specific columns, + or ``override_num_blocks`` to control parallelism. Returns: - A :class:`~ray.data.Dataset` containing the data from Unity Catalog. + A :class:`~ray.data.Dataset` containing the data from the Unity Catalog Delta table. References: - Databricks Credential Vending: https://docs.databricks.com/en/data-governance/unity-catalog/credential-vending.html - - Unity Catalog Volumes: https://docs.databricks.com/en/connect/unity-catalog/volumes.html + - Unity Catalog: https://docs.databricks.com/en/data-governance/unity-catalog/ """ connector = UnityCatalogConnector( base_url=url, token=token, - path=path, - data_format=format, - custom_datasource=datasource, + table=table, region=region, reader_kwargs=reader_kwargs, ) From 610b88756505622e066853256dc6ba0b49da1adb Mon Sep 17 00:00:00 2001 From: soffer-anyscale Date: Tue, 21 Oct 2025 11:10:33 -0600 Subject: [PATCH 7/9] Make region parameter required for AWS S3 access The hardcoded fallback to 'us-east-1' could lead to connection errors if the S3 bucket is in a different region and requires signature version 4. Changes: - Remove hardcoded 'us-east-1' fallback in S3FileSystem creation - Add explicit validation that region is provided for AWS - Provide clear error message with example usage - Update docstrings to clarify region is required for AWS - Improve error handling flow in _read_delta_with_credentials() This prevents hard-to-debug runtime failures and makes the API clearer for users working with AWS S3-backed Delta tables. Signed-off-by: soffer-anyscale --- .../datasource/unity_catalog_datasource.py | 27 ++++++++++++------- python/ray/data/read_api.py | 3 ++- 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/python/ray/data/_internal/datasource/unity_catalog_datasource.py b/python/ray/data/_internal/datasource/unity_catalog_datasource.py index d39be63c73e1..fb3d00c721f2 100644 --- a/python/ray/data/_internal/datasource/unity_catalog_datasource.py +++ b/python/ray/data/_internal/datasource/unity_catalog_datasource.py @@ -304,7 +304,8 @@ class UnityCatalogConnector: base_url: Databricks workspace URL (e.g., "https://dbc-xxx.cloud.databricks.com") token: Databricks Personal Access Token with appropriate permissions table: Unity Catalog table path in format "catalog.schema.table" - region: Optional AWS region for S3 credential configuration + region: AWS region for S3 bucket (required for AWS, e.g., "us-west-2"). + Not required for Azure or GCP. 
reader_kwargs: Additional arguments passed to ray.data.read_delta() References: @@ -523,19 +524,27 @@ def _read_delta_with_credentials(self) -> "ray.data.Dataset": if creds.cloud_provider == CloudProvider.AWS: # Create PyArrow S3FileSystem with temporary credentials # https://arrow.apache.org/docs/python/generated/pyarrow.fs.S3FileSystem.html - if creds.aws_credentials: - filesystem = pafs.S3FileSystem( - access_key=creds.aws_credentials.access_key_id, - secret_key=creds.aws_credentials.secret_access_key, - session_token=creds.aws_credentials.session_token, - region=self.region or "us-east-1", - ) - else: + if not creds.aws_credentials: raise ValueError( "AWS credentials not found in Unity Catalog response. " "Cannot read Delta table without credentials." ) + if not self.region: + raise ValueError( + "The 'region' parameter is required for AWS S3 access. " + "Please specify the AWS region of your S3 bucket (e.g., 'us-west-2'). " + f"Example: ray.data.read_unity_catalog(table='{self.table}', url='...', " + "token='...', region='us-west-2')" + ) + + filesystem = pafs.S3FileSystem( + access_key=creds.aws_credentials.access_key_id, + secret_key=creds.aws_credentials.secret_access_key, + session_token=creds.aws_credentials.session_token, + region=self.region, + ) + elif creds.cloud_provider == CloudProvider.AZURE: # For Azure, the deltalake library can use environment variables # that were set in _set_env() diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py index 7d7d61eb5571..f4d5e7be96b1 100644 --- a/python/ray/data/read_api.py +++ b/python/ray/data/read_api.py @@ -4260,7 +4260,8 @@ def read_unity_catalog( ``"https://dbc-XXXXXXX-XXXX.cloud.databricks.com"``. token: Databricks Personal Access Token. The token must have ``EXTERNAL USE SCHEMA`` permission on the schema containing the table. - region: AWS region for S3 credential configuration. Only required for AWS S3. + region: AWS region for S3 bucket (e.g., ``"us-west-2"``). Required for AWS S3, + not required for Azure or GCP. reader_kwargs: Additional arguments passed to :meth:`~ray.data.read_delta`. For example, you can specify ``columns`` to read only specific columns, or ``override_num_blocks`` to control parallelism. 
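For context between these two patches, a minimal sketch of the call pattern the new validation enforces; the workspace URL and token are placeholders, and the table name is reused from the docstring example above. This is an illustration, not part of the patch itself.

import ray

# With the change above, omitting `region` for an S3-backed Delta table raises
# the new ValueError up front instead of failing later against the wrong
# S3 endpoint.
ds = ray.data.read_unity_catalog(
    table="main.sales.transactions",
    url="https://dbc-XXXXXXX-XXXX.cloud.databricks.com",
    token="dapi...",
    region="us-west-2",  # required for AWS S3; not needed for Azure or GCP
)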
From f2fcff06f6237a61468a80edff013d13e35315a7 Mon Sep 17 00:00:00 2001 From: soffer-anyscale Date: Tue, 21 Oct 2025 11:45:11 -0600 Subject: [PATCH 8/9] Simplify to minimal uc_datasource.py based on master Reduced from 637 lines to 193 lines by starting from master's uc_datasource.py and adding only essential changes: What's added (56 lines over master's 137 lines): - PyArrow S3FileSystem support for Delta with session tokens (AWS requirement) - Required region validation for AWS to prevent connection errors - Deletion vector error handling with helpful messages - GCP temp file cleanup with atexit to prevent file leaks Changes from previous version: - Renamed unity_catalog_datasource.py -> uc_datasource.py (matches master) - Removed all dataclasses, enums, and complex structures - Removed volume support (not needed for initial release) - Kept simple dict-based approach from master - Restored data_format parameter for flexibility - Minimal API surface matching master's design Net change: -459 lines (removed 674, added 215) File: 193 lines (vs 137 in master = +56 lines of essential fixes) Signed-off-by: soffer-anyscale --- .../_internal/datasource/uc_datasource.py | 195 ++++++ .../datasource/unity_catalog_datasource.py | 636 ------------------ python/ray/data/read_api.py | 58 +- 3 files changed, 215 insertions(+), 674 deletions(-) create mode 100644 python/ray/data/_internal/datasource/uc_datasource.py delete mode 100644 python/ray/data/_internal/datasource/unity_catalog_datasource.py diff --git a/python/ray/data/_internal/datasource/uc_datasource.py b/python/ray/data/_internal/datasource/uc_datasource.py new file mode 100644 index 000000000000..9b81dddfa5b8 --- /dev/null +++ b/python/ray/data/_internal/datasource/uc_datasource.py @@ -0,0 +1,195 @@ +import atexit +import os +import tempfile +from typing import Any, Callable, Dict, Optional + +import requests + +import ray + +_FILE_FORMAT_TO_RAY_READER = { + "delta": "read_delta", + "parquet": "read_parquet", +} + + +class UnityCatalogConnector: + """ + Load a Unity Catalog table or files into a Ray Dataset, handling cloud credentials automatically. + + Currently only supports Databricks-managed Unity Catalog + + Supported formats: delta, parquet. + Supports AWS, Azure, and GCP with automatic credential handoff. 
+ """ + + def __init__( + self, + *, + base_url: str, + token: str, + table_full_name: str, + region: Optional[str] = None, + data_format: Optional[str] = "delta", + operation: str = "READ", + ray_init_kwargs: Optional[Dict] = None, + reader_kwargs: Optional[Dict] = None, + ): + self.base_url = base_url.rstrip("/") + self.token = token + self.table_full_name = table_full_name + self.data_format = data_format.lower() if data_format else None + self.region = region + self.operation = operation + self.ray_init_kwargs = ray_init_kwargs or {} + self.reader_kwargs = reader_kwargs or {} + self._gcp_temp_file = None + + def _get_table_info(self) -> dict: + url = f"{self.base_url}/api/2.1/unity-catalog/tables/{self.table_full_name}" + headers = {"Authorization": f"Bearer {self.token}"} + resp = requests.get(url, headers=headers) + resp.raise_for_status() + data = resp.json() + self._table_info = data + self._table_id = data["table_id"] + return data + + def _get_creds(self): + url = f"{self.base_url}/api/2.1/unity-catalog/temporary-table-credentials" + headers = { + "Content-Type": "application/json", + "Authorization": f"Bearer {self.token}", + } + payload = {"table_id": self._table_id, "operation": self.operation} + resp = requests.post(url, json=payload, headers=headers) + resp.raise_for_status() + self._creds_response = resp.json() + self._table_url = self._creds_response["url"] + + def _set_env(self): + env_vars = {} + creds = self._creds_response + + if "aws_temp_credentials" in creds: + aws = creds["aws_temp_credentials"] + env_vars["AWS_ACCESS_KEY_ID"] = aws["access_key_id"] + env_vars["AWS_SECRET_ACCESS_KEY"] = aws["secret_access_key"] + env_vars["AWS_SESSION_TOKEN"] = aws["session_token"] + if self.region: + env_vars["AWS_REGION"] = self.region + env_vars["AWS_DEFAULT_REGION"] = self.region + elif "azuresasuri" in creds: + env_vars["AZURE_STORAGE_SAS_TOKEN"] = creds["azuresasuri"] + elif "gcp_service_account" in creds: + gcp_json = creds["gcp_service_account"] + temp_file = tempfile.NamedTemporaryFile( + mode="w", + prefix="gcp_sa_", + suffix=".json", + delete=False, + ) + temp_file.write(gcp_json) + temp_file.close() + env_vars["GOOGLE_APPLICATION_CREDENTIALS"] = temp_file.name + self._gcp_temp_file = temp_file.name + atexit.register(self._cleanup_gcp_temp_file, temp_file.name) + else: + raise ValueError( + "No known credential type found in Databricks UC response." 
+ ) + + for k, v in env_vars.items(): + os.environ[k] = v + self._runtime_env = {"env_vars": env_vars} + + @staticmethod + def _cleanup_gcp_temp_file(temp_file_path: str): + """Clean up temporary GCP service account file.""" + if temp_file_path and os.path.exists(temp_file_path): + try: + os.unlink(temp_file_path) + except OSError: + pass + + def _infer_data_format(self) -> str: + if self.data_format: + return self.data_format + + info = self._table_info or self._get_table_info() + if "data_source_format" in info and info["data_source_format"]: + fmt = info["data_source_format"].lower() + return fmt + + storage_loc = info.get("storage_location") or getattr(self, "_table_url", None) + if storage_loc: + ext = os.path.splitext(storage_loc)[-1].replace(".", "").lower() + if ext in _FILE_FORMAT_TO_RAY_READER: + return ext + + raise ValueError("Could not infer data format from table metadata.") + + def _get_ray_reader(self, data_format: str) -> Callable[..., Any]: + fmt = data_format.lower() + if fmt in _FILE_FORMAT_TO_RAY_READER: + reader_func = getattr(ray.data, _FILE_FORMAT_TO_RAY_READER[fmt], None) + if reader_func: + return reader_func + raise ValueError(f"Unsupported data format: {fmt}") + + def _read_delta_with_credentials(self): + """Read Delta table with proper PyArrow filesystem for session tokens.""" + import pyarrow.fs as pafs + + creds = self._creds_response + reader_kwargs = self.reader_kwargs.copy() + + # For AWS, create PyArrow S3FileSystem with session tokens + if "aws_temp_credentials" in creds: + if not self.region: + raise ValueError( + "The 'region' parameter is required for AWS S3 access. " + "Please specify the AWS region (e.g., region='us-west-2')." + ) + aws = creds["aws_temp_credentials"] + filesystem = pafs.S3FileSystem( + access_key=aws["access_key_id"], + secret_key=aws["secret_access_key"], + session_token=aws["session_token"], + region=self.region, + ) + reader_kwargs["filesystem"] = filesystem + + # Call ray.data.read_delta with proper error handling + try: + return ray.data.read_delta(self._table_url, **reader_kwargs) + except Exception as e: + error_msg = str(e) + if ( + "DeletionVectors" in error_msg + or "Unsupported reader features" in error_msg + ): + raise RuntimeError( + f"Delta table uses Deletion Vectors, which requires deltalake>=0.10.0. 
" + f"Error: {error_msg}\n" + f"Solution: pip install --upgrade 'deltalake>=0.10.0'" + ) from e + raise + + def read(self): + self._get_table_info() + self._get_creds() + self._set_env() + + data_format = self._infer_data_format() + + if not ray.is_initialized(): + ray.init(runtime_env=self._runtime_env, **self.ray_init_kwargs) + + # Use special Delta reader for proper filesystem handling + if data_format == "delta": + return self._read_delta_with_credentials() + + # Use standard reader for other formats + reader = self._get_ray_reader(data_format) + return reader(self._table_url, **self.reader_kwargs) diff --git a/python/ray/data/_internal/datasource/unity_catalog_datasource.py b/python/ray/data/_internal/datasource/unity_catalog_datasource.py deleted file mode 100644 index fb3d00c721f2..000000000000 --- a/python/ray/data/_internal/datasource/unity_catalog_datasource.py +++ /dev/null @@ -1,636 +0,0 @@ -import atexit -import logging -import os -import tempfile -from dataclasses import dataclass -from enum import Enum -from typing import Any, Dict, List, Optional - -import requests # HTTP library: https://requests.readthedocs.io/ - -import ray -from ray.util.annotations import DeveloperAPI - -logger = logging.getLogger(__name__) - -# Unity Catalog REST API Documentation: -# https://docs.databricks.com/api/workspace/unity-catalog -# Credential Vending API: -# https://docs.databricks.com/en/data-governance/unity-catalog/credential-vending.html - - -@dataclass -class ColumnInfo: - """Column metadata from Unity Catalog table schema. - - Reference: https://docs.databricks.com/api/workspace/tables/get - """ - - name: str - type_text: str - type_name: str - position: int - type_precision: int - type_scale: int - type_json: str - nullable: bool - comment: Optional[str] = None - partition_index: Optional[int] = None - - @staticmethod - def from_dict(obj: Dict) -> "ColumnInfo": - """ - Safely construct ColumnInfo from Unity Catalog API response. - - Args: - obj: Dictionary from Unity Catalog tables API columns field - - Returns: - ColumnInfo instance with extracted fields - """ - return ColumnInfo( - name=obj["name"], - type_text=obj["type_text"], - type_name=obj["type_name"], - position=obj["position"], - type_precision=obj.get("type_precision", 0), - type_scale=obj.get("type_scale", 0), - type_json=obj.get("type_json", ""), - nullable=obj.get("nullable", True), - comment=obj.get("comment"), - partition_index=obj.get("partition_index"), - ) - - -@dataclass -class EffectiveFlag: - """Flag indicating effective settings inherited from parent resources.""" - - value: str - inherited_from_type: str - inherited_from_name: str - - -@dataclass -class TableInfo: - """Metadata for Unity Catalog tables. - - Represents the response from the Unity Catalog tables API. 
- """ - - name: str - catalog_name: str - schema_name: str - table_type: str - data_source_format: str - columns: List[ColumnInfo] - storage_location: str - owner: str - properties: Dict[str, str] - securable_kind: str - enable_auto_maintenance: str - enable_predictive_optimization: str - properties_pairs: Dict[str, Any] - generation: int - metastore_id: str - full_name: str - data_access_configuration_id: str - created_at: int - created_by: str - updated_at: int - updated_by: str - table_id: str - delta_runtime_properties_kvpairs: Dict[str, Any] - securable_type: str - effective_auto_maintenance_flag: Optional[EffectiveFlag] = None - effective_predictive_optimization_flag: Optional[EffectiveFlag] = None - browse_only: Optional[bool] = None - metastore_version: Optional[int] = None - - @staticmethod - def from_dict(obj: Dict) -> "TableInfo": - """ - Parse table metadata from Unity Catalog API response. - - Args: - obj: Dictionary from Unity Catalog tables API response - - Returns: - TableInfo instance with parsed metadata - """ - # Parse optional nested structures - if obj.get("effective_auto_maintenance_flag"): - effective_auto_maintenance_flag = EffectiveFlag( - **obj["effective_auto_maintenance_flag"] - ) - else: - effective_auto_maintenance_flag = None - - if obj.get("effective_predictive_optimization_flag"): - effective_predictive_optimization_flag = EffectiveFlag( - **obj["effective_predictive_optimization_flag"] - ) - else: - effective_predictive_optimization_flag = None - - return TableInfo( - name=obj["name"], - catalog_name=obj["catalog_name"], - schema_name=obj["schema_name"], - table_type=obj["table_type"], - data_source_format=obj.get("data_source_format", ""), - columns=[ColumnInfo.from_dict(col) for col in obj.get("columns", [])], - storage_location=obj.get("storage_location", ""), - owner=obj.get("owner", ""), - properties=obj.get("properties", {}), - securable_kind=obj.get("securable_kind", ""), - enable_auto_maintenance=obj.get("enable_auto_maintenance", ""), - enable_predictive_optimization=obj.get( - "enable_predictive_optimization", "" - ), - properties_pairs=obj.get("properties_pairs", {}), - generation=obj.get("generation", 0), - metastore_id=obj.get("metastore_id", ""), - full_name=obj.get("full_name", ""), - data_access_configuration_id=obj.get("data_access_configuration_id", ""), - created_at=obj.get("created_at", 0), - created_by=obj.get("created_by", ""), - updated_at=obj.get("updated_at", 0), - updated_by=obj.get("updated_by", ""), - table_id=obj.get("table_id", ""), - delta_runtime_properties_kvpairs=obj.get( - "delta_runtime_properties_kvpairs", {} - ), - securable_type=obj.get("securable_type", ""), - effective_auto_maintenance_flag=effective_auto_maintenance_flag, - effective_predictive_optimization_flag=effective_predictive_optimization_flag, - browse_only=obj.get("browse_only", False), - metastore_version=obj.get("metastore_version", 0), - ) - - -class CloudProvider(Enum): - """Cloud provider types for credential vending.""" - - AWS = "aws" - AZURE = "azure" - GCP = "gcp" - - -@dataclass -class AWSCredentials: - """AWS temporary credentials from Unity Catalog credential vending.""" - - access_key_id: str - secret_access_key: str - session_token: str - - @staticmethod - def from_dict(obj: Dict) -> "AWSCredentials": - return AWSCredentials( - access_key_id=obj["access_key_id"], - secret_access_key=obj["secret_access_key"], - session_token=obj["session_token"], - ) - - -@dataclass -class AzureSASCredentials: - """Azure SAS token credentials from Unity 
Catalog credential vending.""" - - sas_token: str - - @staticmethod - def from_dict(obj: Dict) -> "AzureSASCredentials": - return AzureSASCredentials(sas_token=obj["sas_token"]) - - -@dataclass -class GCPOAuthCredentials: - """GCP OAuth token credentials from Unity Catalog credential vending.""" - - oauth_token: str - - @staticmethod - def from_dict(obj: Dict) -> "GCPOAuthCredentials": - return GCPOAuthCredentials(oauth_token=obj["oauth_token"]) - - -@dataclass -class CredentialsResponse: - """ - Response from Unity Catalog credential vending API. - - Contains cloud-specific temporary credentials and storage URL. - """ - - url: str - cloud_provider: CloudProvider - aws_credentials: Optional[AWSCredentials] = None - azure_credentials: Optional[AzureSASCredentials] = None - gcp_credentials: Optional[GCPOAuthCredentials] = None - azure_sas_uri: Optional[str] = None - gcp_service_account_json: Optional[str] = None - - @staticmethod - def from_dict(obj: Dict) -> "CredentialsResponse": - """ - Parse credentials response from Unity Catalog API. - - Handles multiple credential formats for each cloud provider. - """ - url = obj.get("url", "") - - # Determine cloud provider and parse credentials - if "aws_temp_credentials" in obj: - return CredentialsResponse( - url=url, - cloud_provider=CloudProvider.AWS, - aws_credentials=AWSCredentials.from_dict(obj["aws_temp_credentials"]), - ) - elif "azure_user_delegation_sas" in obj: - return CredentialsResponse( - url=url, - cloud_provider=CloudProvider.AZURE, - azure_credentials=AzureSASCredentials.from_dict( - obj["azure_user_delegation_sas"] - ), - ) - elif "azuresasuri" in obj: - # Legacy Azure format - return CredentialsResponse( - url=url, - cloud_provider=CloudProvider.AZURE, - azure_sas_uri=obj["azuresasuri"], - ) - elif "gcp_oauth_token" in obj: - return CredentialsResponse( - url=url, - cloud_provider=CloudProvider.GCP, - gcp_credentials=GCPOAuthCredentials.from_dict(obj["gcp_oauth_token"]), - ) - elif "gcp_service_account" in obj: - # Legacy GCP format - return CredentialsResponse( - url=url, - cloud_provider=CloudProvider.GCP, - gcp_service_account_json=obj["gcp_service_account"], - ) - else: - raise ValueError( - f"No recognized credential type in response. " - f"Available keys: {list(obj.keys())}" - ) - - -@DeveloperAPI -class UnityCatalogConnector: - """ - Connector for reading Unity Catalog Delta tables into Ray Datasets. - - This connector handles automatic credential vending for secure access to cloud - storage backing Unity Catalog Delta tables. It supports AWS S3, Azure Data Lake - Storage, and Google Cloud Storage with temporary, least-privilege credentials. - - This implementation specifically focuses on Delta Lake tables, which is the most - common format in Unity Catalog deployments. - - Cloud providers: - - AWS S3 (with temporary IAM credentials) - - Azure Data Lake Storage (with SAS tokens) - - Google Cloud Storage (with OAuth tokens or service account) - - Args: - base_url: Databricks workspace URL (e.g., "https://dbc-xxx.cloud.databricks.com") - token: Databricks Personal Access Token with appropriate permissions - table: Unity Catalog table path in format "catalog.schema.table" - region: AWS region for S3 bucket (required for AWS, e.g., "us-west-2"). - Not required for Azure or GCP. 
- reader_kwargs: Additional arguments passed to ray.data.read_delta() - - References: - - Credential Vending: https://docs.databricks.com/en/data-governance/unity-catalog/credential-vending.html - - Unity Catalog: https://docs.databricks.com/en/data-governance/unity-catalog/ - """ - - def __init__( - self, - *, - base_url: str, - token: str, - table: str, - region: Optional[str] = None, - reader_kwargs: Optional[Dict] = None, - ): - self.base_url = base_url.rstrip("/") - self.token = token - self.table = table - self.region = region - self.reader_kwargs = reader_kwargs or {} - - # Storage for metadata and credentials - self._table_info: Optional[TableInfo] = None - self._table_id: Optional[str] = None - self._creds_response: Optional[CredentialsResponse] = None - self._storage_url: Optional[str] = None - self._gcp_temp_file: Optional[str] = None - - def _create_auth_headers(self) -> Dict[str, str]: - """ - Create authorization headers for Unity Catalog API requests. - - Returns: - Dictionary with Authorization header - """ - return {"Authorization": f"Bearer {self.token}"} - - def _fetch_table_metadata(self) -> TableInfo: - """ - Fetch table metadata from Unity Catalog tables API. - - Returns: - TableInfo object with table metadata - - Raises: - requests.HTTPError: If API request fails - """ - url = f"{self.base_url}/api/2.1/unity-catalog/tables/{self.table}" - headers = self._create_auth_headers() - logger.debug(f"Fetching table metadata for '{self.table}' from {url}") - resp = requests.get(url, headers=headers) - resp.raise_for_status() - data = resp.json() - - # Parse table information - table_info = TableInfo.from_dict(data) - self._table_info = table_info - self._table_id = table_info.table_id - logger.debug( - f"Retrieved table metadata: format={table_info.data_source_format}, " - f"storage={table_info.storage_location}" - ) - return table_info - - def _get_creds(self): - """ - Request temporary credentials from Unity Catalog credential vending API. - - Uses the table credential vending API which is production-ready and publicly - documented. - - Raises: - requests.HTTPError: If API request fails - ValueError: If credential vending is not available or configured - """ - if not self._table_id: - raise ValueError( - "Table ID not available. Call _fetch_table_metadata() first." - ) - - url = f"{self.base_url}/api/2.1/unity-catalog/temporary-table-credentials" - payload = { - "table_id": self._table_id, - "operation": "READ", - } - headers = self._create_auth_headers() - headers["Content-Type"] = "application/json" - - logger.debug(f"Requesting temporary credentials for table '{self.table}'") - - resp = requests.post(url, json=payload, headers=headers) - resp.raise_for_status() - - # Parse credentials response into structured dataclass - self._creds_response = CredentialsResponse.from_dict(resp.json()) - logger.debug( - f"Successfully obtained credentials, " - f"cloud provider: {self._creds_response.cloud_provider.value}" - ) - - # Extract and store storage URL from credentials response - if not self._creds_response.url: - raise ValueError( - f"No storage URL returned for table '{self.table}'. " - f"Credentials response: {self._creds_response}" - ) - self._storage_url = self._creds_response.url - - def _set_env(self): - """ - Configure cloud-specific environment variables for credential access. - - Sets up temporary credentials in the Ray runtime environment for - AWS S3, Azure Blob Storage, or Google Cloud Storage. 
- - Supported cloud providers: - - AWS S3: Sets AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN - - Azure: Sets AZURE_STORAGE_SAS_TOKEN - - GCP: Sets GCP_OAUTH_TOKEN or GOOGLE_APPLICATION_CREDENTIALS - - Raises: - ValueError: If no recognized credential type is found in response - """ - env_vars = {} - creds = self._creds_response - - if creds.cloud_provider == CloudProvider.AWS: - # AWS S3 temporary credentials - # https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp.html - if creds.aws_credentials: - env_vars["AWS_ACCESS_KEY_ID"] = creds.aws_credentials.access_key_id - env_vars["AWS_SECRET_ACCESS_KEY"] = ( - creds.aws_credentials.secret_access_key - ) - env_vars["AWS_SESSION_TOKEN"] = creds.aws_credentials.session_token - if self.region: - env_vars["AWS_REGION"] = self.region - env_vars["AWS_DEFAULT_REGION"] = self.region - else: - raise ValueError("AWS credentials not found in response") - - elif creds.cloud_provider == CloudProvider.AZURE: - # Azure Blob Storage SAS token - # https://learn.microsoft.com/en-us/azure/storage/common/storage-sas-overview - if creds.azure_credentials: - env_vars["AZURE_STORAGE_SAS_TOKEN"] = creds.azure_credentials.sas_token - elif creds.azure_sas_uri: - # Legacy Azure format - env_vars["AZURE_STORAGE_SAS_TOKEN"] = creds.azure_sas_uri - else: - raise ValueError("Azure credentials not found in response") - - elif creds.cloud_provider == CloudProvider.GCP: - # Google Cloud Platform credentials - # https://cloud.google.com/docs/authentication/token-types#access - if creds.gcp_credentials: - env_vars["GCP_OAUTH_TOKEN"] = creds.gcp_credentials.oauth_token - elif creds.gcp_service_account_json: - # Legacy GCP service account format - # Create a temporary file that persists for the session - temp_file = tempfile.NamedTemporaryFile( - mode="w", - prefix="gcp_sa_", - suffix=".json", - delete=False, - ) - temp_file.write(creds.gcp_service_account_json) - temp_file.close() - - env_vars["GOOGLE_APPLICATION_CREDENTIALS"] = temp_file.name - self._gcp_temp_file = temp_file.name - - # Register cleanup on exit - atexit.register(self._cleanup_gcp_temp_file_static, temp_file.name) - else: - raise ValueError("GCP credentials not found in response") - - else: - raise ValueError(f"Unrecognized cloud provider: {creds.cloud_provider}") - - # Set environment variables in current process - # Ray Data readers will inherit these environment variables - for k, v in env_vars.items(): - os.environ[k] = v - - @staticmethod - def _cleanup_gcp_temp_file_static(temp_file_path: str): - """Clean up temporary GCP service account file if it exists.""" - if temp_file_path and os.path.exists(temp_file_path): - try: - os.unlink(temp_file_path) - except OSError: - pass - - def _read_delta_with_credentials(self) -> "ray.data.Dataset": - """ - Read Delta Lake table with Unity Catalog credentials. - - For Delta Lake tables on AWS S3, the deltalake library needs a configured - PyArrow S3FileSystem to access tables with temporary session credentials. 
- - Returns: - Ray Dataset containing the Delta table data - - Raises: - ImportError: If deltalake or pyarrow is not installed - ValueError: If credentials are not properly configured - RuntimeError: If Delta table uses unsupported features - """ - import pyarrow.fs as pafs - - creds = self._creds_response - filesystem = None - - if creds.cloud_provider == CloudProvider.AWS: - # Create PyArrow S3FileSystem with temporary credentials - # https://arrow.apache.org/docs/python/generated/pyarrow.fs.S3FileSystem.html - if not creds.aws_credentials: - raise ValueError( - "AWS credentials not found in Unity Catalog response. " - "Cannot read Delta table without credentials." - ) - - if not self.region: - raise ValueError( - "The 'region' parameter is required for AWS S3 access. " - "Please specify the AWS region of your S3 bucket (e.g., 'us-west-2'). " - f"Example: ray.data.read_unity_catalog(table='{self.table}', url='...', " - "token='...', region='us-west-2')" - ) - - filesystem = pafs.S3FileSystem( - access_key=creds.aws_credentials.access_key_id, - secret_key=creds.aws_credentials.secret_access_key, - session_token=creds.aws_credentials.session_token, - region=self.region, - ) - - elif creds.cloud_provider == CloudProvider.AZURE: - # For Azure, the deltalake library can use environment variables - # that were set in _set_env() - filesystem = None - - elif creds.cloud_provider == CloudProvider.GCP: - # For GCP, the deltalake library can use environment variables - # that were set in _set_env() - filesystem = None - - # Merge filesystem into reader_kwargs if not already present - reader_kwargs = self.reader_kwargs.copy() - if filesystem is not None and "filesystem" not in reader_kwargs: - reader_kwargs["filesystem"] = filesystem - - # Call ray.data.read_delta with the configured filesystem - try: - return ray.data.read_delta(self._storage_url, **reader_kwargs) - except Exception as e: - # Provide helpful error messages for common Delta Lake issues - error_msg = str(e) - if ( - "DeletionVectors" in error_msg - or "Unsupported reader features" in error_msg - ): - raise RuntimeError( - f"Delta table at '{self.table}' uses Deletion Vectors, which requires " - f"deltalake library version 0.10.0 or higher. Current error: {error_msg}\n\n" - f"Solutions:\n" - f" 1. Upgrade deltalake: pip install --upgrade deltalake>=0.10.0\n" - f" 2. Disable deletion vectors on the table in Databricks:\n" - f" ALTER TABLE {self.table} SET TBLPROPERTIES ('delta.enableDeletionVectors' = 'false');\n" - f" Then run VACUUM to apply changes.\n" - f" 3. Use ray.data.read_parquet() to read the underlying Parquet files directly " - f"(note: this bypasses Delta Lake transaction log and may include deleted records)." - ) from e - elif "ColumnMappingMode" in error_msg: - raise RuntimeError( - f"Delta table at '{self.table}' uses column mapping, which may not be fully " - f"supported by your deltalake library version. Current error: {error_msg}\n\n" - f"Solutions:\n" - f" 1. Upgrade deltalake: pip install --upgrade deltalake\n" - f" 2. Check table properties in Databricks and disable column mapping if possible." 
- ) from e - else: - # Re-raise other errors with additional context - raise RuntimeError( - f"Failed to read Delta table at '{self.table}': {error_msg}\n\n" - f"This may be due to:\n" - f" - Unsupported Delta Lake features\n" - f" - Credential or permission issues\n" - f" - Network connectivity problems\n" - f" - Incompatible deltalake library version\n\n" - f"Try: pip install --upgrade deltalake pyarrow" - ) from e - - def read(self) -> "ray.data.Dataset": - """ - Read Unity Catalog Delta table into a Ray Dataset. - - This is the main entry point for reading data. It orchestrates: - 1. Fetch metadata from Unity Catalog - 2. Obtain temporary credentials via Unity Catalog credential vending - 3. Configure cloud credentials in the current process environment - 4. Read data using ray.data.read_delta() - - The credentials are set in the current process environment and will be - inherited by Ray Data read tasks automatically. - - Returns: - Ray Dataset containing the data from the specified Delta table - - Raises: - ValueError: If configuration is invalid or table cannot be accessed - requests.HTTPError: If Unity Catalog API requests fail - """ - logger.info(f"Reading Unity Catalog Delta table: {self.table}") - - # Step 1: Get table metadata - self._fetch_table_metadata() - - # Step 2: Get temporary credentials from Unity Catalog - self._get_creds() - - # Step 3: Configure cloud credentials in current process environment - self._set_env() - - # Step 4: Read Delta table with Unity Catalog credentials - return self._read_delta_with_credentials() diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py index f4d5e7be96b1..1ae63c25ae25 100644 --- a/python/ray/data/read_api.py +++ b/python/ray/data/read_api.py @@ -54,7 +54,7 @@ from ray.data._internal.datasource.text_datasource import TextDatasource from ray.data._internal.datasource.tfrecords_datasource import TFRecordDatasource from ray.data._internal.datasource.torch_datasource import TorchDatasource -from ray.data._internal.datasource.unity_catalog_datasource import UnityCatalogConnector +from ray.data._internal.datasource.uc_datasource import UnityCatalogConnector from ray.data._internal.datasource.video_datasource import VideoDatasource from ray.data._internal.datasource.webdataset_datasource import WebDatasetDatasource from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder @@ -4213,26 +4213,24 @@ def read_unity_catalog( url: str, token: str, *, + data_format: Optional[str] = None, region: Optional[str] = None, reader_kwargs: Optional[dict] = None, ) -> Dataset: - """Creates a :class:`~ray.data.Dataset` from Unity Catalog Delta tables. + """ + Loads a Unity Catalog table or files into a Ray Dataset using Databricks Unity Catalog credential vending, + with automatic short-lived cloud credential handoff for secure, parallel, distributed access from external engines. - This function reads Delta Lake tables from Databricks Unity Catalog using credential - vending, which provides temporary, least-privilege credentials for secure access to - cloud storage. The function authenticates through the Unity Catalog REST API and - supports reading from AWS S3, Azure Data Lake Storage, and Google Cloud Storage. + This function works by leveraging Unity Catalog's credential vending feature, which grants temporary, least-privilege + credentials for the cloud storage location backing the requested table or data files. 
It authenticates via the Unity Catalog + REST API (`Unity Catalog credential vending for external system access`, [Databricks Docs](https://docs.databricks.com/en/data-governance/unity-catalog/credential-vending.html)), + ensuring that permissions are enforced at the Databricks principal (user, group, or service principal) making the request. + The function supports reading data directly from AWS S3, Azure Data Lake, or GCP GCS in standard formats including Delta and Parquet. .. note:: This function is experimental and under active development. - .. warning:: - - The Databricks Unity Catalog credential vending feature is in Public Preview. - Ensure your workspace and principal are properly configured with the required - permissions before using this function. - Examples: Read a Unity Catalog Delta table: @@ -4240,43 +4238,27 @@ def read_unity_catalog( >>> ds = ray.data.read_unity_catalog( # doctest: +SKIP ... table="main.sales.transactions", ... url="https://dbc-XXXXXXX-XXXX.cloud.databricks.com", - ... token="dapi..." - ... ) - >>> ds.show(3) # doctest: +SKIP - - Read Delta table with custom reader options: - - >>> ds = ray.data.read_unity_catalog( # doctest: +SKIP - ... table="main.analytics.events", - ... url="https://dbc-XXXXXXX-XXXX.cloud.databricks.com", ... token="dapi...", - ... region="us-west-2", - ... reader_kwargs={"columns": ["user_id", "timestamp"], "override_num_blocks": 100} + ... region="us-west-2" ... ) + >>> ds.show(3) # doctest: +SKIP Args: table: Unity Catalog table path in format ``catalog.schema.table``. - url: Databricks workspace URL. For example, - ``"https://dbc-XXXXXXX-XXXX.cloud.databricks.com"``. - token: Databricks Personal Access Token. The token must have ``EXTERNAL USE SCHEMA`` - permission on the schema containing the table. - region: AWS region for S3 bucket (e.g., ``"us-west-2"``). Required for AWS S3, - not required for Azure or GCP. - reader_kwargs: Additional arguments passed to :meth:`~ray.data.read_delta`. - For example, you can specify ``columns`` to read only specific columns, - or ``override_num_blocks`` to control parallelism. + url: Databricks workspace URL (e.g., ``"https://dbc-XXXXXXX-XXXX.cloud.databricks.com"``). + token: Databricks Personal Access Token with ``EXTERNAL USE SCHEMA`` permission. + data_format: Data format (``"delta"`` or ``"parquet"``). If not specified, inferred from table metadata. + region: AWS region for S3 access (e.g., ``"us-west-2"``). Required for AWS, not needed for Azure/GCP. + reader_kwargs: Additional arguments passed to the underlying Ray Data reader. Returns: - A :class:`~ray.data.Dataset` containing the data from the Unity Catalog Delta table. - - References: - - Databricks Credential Vending: https://docs.databricks.com/en/data-governance/unity-catalog/credential-vending.html - - Unity Catalog: https://docs.databricks.com/en/data-governance/unity-catalog/ + A :class:`~ray.data.Dataset` containing the data from Unity Catalog. """ connector = UnityCatalogConnector( base_url=url, token=token, - table=table, + table_full_name=table, + data_format=data_format, region=region, reader_kwargs=reader_kwargs, ) From fdbfc79cc658057359dd3798e62d15fe6f5d68c2 Mon Sep 17 00:00:00 2001 From: soffer-anyscale Date: Tue, 21 Oct 2025 16:55:57 -0600 Subject: [PATCH 9/9] Fix Sphinx documentation warning in read_unity_catalog docstring Convert Markdown-style link to reStructuredText format to fix documentation build warning. 
The docstring used [text](url) syntax which is Markdown, but Sphinx expects
RST syntax for links: `text <url>`_

Signed-off-by: soffer-anyscale
---
 python/ray/data/read_api.py | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py
index 1ae63c25ae25..e25a4df2c479 100644
--- a/python/ray/data/read_api.py
+++ b/python/ray/data/read_api.py
@@ -4217,13 +4217,12 @@ def read_unity_catalog(
     region: Optional[str] = None,
     reader_kwargs: Optional[dict] = None,
 ) -> Dataset:
-    """
-    Loads a Unity Catalog table or files into a Ray Dataset using Databricks Unity Catalog credential vending,
+    """Loads a Unity Catalog table or files into a Ray Dataset using Databricks Unity Catalog credential vending,
     with automatic short-lived cloud credential handoff for secure, parallel, distributed access from external engines.
 
     This function works by leveraging Unity Catalog's credential vending feature, which grants temporary, least-privilege
     credentials for the cloud storage location backing the requested table or data files. It authenticates via the Unity Catalog
-    REST API (`Unity Catalog credential vending for external system access`, [Databricks Docs](https://docs.databricks.com/en/data-governance/unity-catalog/credential-vending.html)),
+    REST API (Unity Catalog credential vending for external system access, `Databricks Docs <https://docs.databricks.com/en/data-governance/unity-catalog/credential-vending.html>`_),
     ensuring that permissions are enforced at the Databricks principal (user, group, or service principal) making the request.
     The function supports reading data directly from AWS S3, Azure Data Lake, or GCP GCS in standard formats including Delta and Parquet.
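As a closing illustration of the API surface after this series, a hedged sketch of a read that exercises the optional parameters; the table name, columns, and override_num_blocks values are reused from the earlier docstring examples, and the URL and token are placeholders.

import ray

# data_format is optional and inferred from Unity Catalog table metadata when
# omitted; reader_kwargs are forwarded unchanged to the underlying Ray Data
# reader (ray.data.read_delta for Delta tables).
ds = ray.data.read_unity_catalog(
    table="main.analytics.events",
    url="https://dbc-XXXXXXX-XXXX.cloud.databricks.com",
    token="dapi...",
    data_format="delta",
    region="us-west-2",  # AWS S3 only
    reader_kwargs={"columns": ["user_id", "timestamp"], "override_num_blocks": 100},
)
ds.show(3)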