From 7f17d38705134e98541b55d9d9fe3d4c2f509c7e Mon Sep 17 00:00:00 2001
From: chenghao
Date: Tue, 30 Dec 2025 14:28:02 -0600
Subject: [PATCH 1/4] feat: support ray distributed IVF index builder

---
 lance_ray/__init__.py              |   3 +-
 lance_ray/index.py                 | 668 +++++++++++++++++++++++------
 tests/test_distributed_indexing.py |  89 ++++
 3 files changed, 620 insertions(+), 140 deletions(-)

diff --git a/lance_ray/__init__.py b/lance_ray/__init__.py
index d5480ea9..97a2b712 100644
--- a/lance_ray/__init__.py
+++ b/lance_ray/__init__.py
@@ -15,7 +15,7 @@
 # Fragment API imports
 from .fragment import LanceFragmentWriter
-from .index import create_scalar_index
+from .index import create_index, create_scalar_index
 from .io import add_columns, read_lance, write_lance

 __all__ = [
@@ -23,6 +23,7 @@
     "write_lance",
     "add_columns",
     "create_scalar_index",
+    "create_index",
     "compact_files",
     "LanceFragmentWriter",
     "LanceFragmentCommitter",

diff --git a/lance_ray/index.py b/lance_ray/index.py
index 8fab46d9..b8d3b999 100755
--- a/lance_ray/index.py
+++ b/lance_ray/index.py
@@ -3,11 +3,12 @@

 import logging
 import uuid
-from typing import Any, Literal, Optional
+from typing import Any, Literal, Optional, Union

 import lance
 import pyarrow as pa
 from lance.dataset import Index, IndexConfig, LanceDataset
+from lance.indices import IndicesBuilder
 from packaging import version
 from ray.util.multiprocessing import Pool

@@ -23,89 +24,114 @@ def _distribute_fragments_balanced(
     fragments: list[Any], num_workers: int, logger: logging.Logger
 ) -> list[list[int]]:
-    """
-    Distribute fragments across workers using a balanced algorithm that considers fragment sizes.
-
-    This function implements a greedy algorithm that assigns fragments to the worker
-    with the currently smallest total workload, helping to balance the processing
-    time across workers.
-
-    Args:
-        fragments: List of Lance fragment objects
-        num_workers: Number of workers to distribute fragments across
-        logger: Logger instance for debugging information
-
-    Returns:
-        List of lists, where each inner list contains fragment IDs for one worker
+    """Distribute fragments across workers using a balanced algorithm.
+
+    This function implements a greedy algorithm that assigns fragments to the
+    worker with the currently smallest total workload, helping to balance the
+    processing time across workers.
+
+    Parameters
+    ----------
+    fragments : list
+        List of Lance fragment objects.
+    num_workers : int
+        Number of workers to distribute fragments across.
+    logger : logging.Logger
+        Logger instance for debugging information.
+
+    Returns
+    -------
+    list[list[int]]
+        Each inner list contains fragment IDs for one worker.
     """
     if not fragments:
         return [[] for _ in range(num_workers)]

-    # Get fragment information (ID and size)
-    fragment_info = []
+    fragment_info: list[dict[str, int]] = []
     for fragment in fragments:
         try:
             # Try to get fragment size information
             # fragment.count_rows() gives us the number of rows in the fragment
             row_count = fragment.count_rows()
-            fragment_info.append(
-                {
-                    "id": fragment.fragment_id,
-                    "size": row_count,
-                }
-            )
-        except Exception as e:
-            # If we can't get size info, use fragment_id as a fallback
+            fragment_info.append({"id": fragment.fragment_id, "size": row_count})
+        except Exception as exc:  # pragma: no cover - defensive
             logger.warning(
-                f"Could not get size for fragment {fragment.fragment_id}: {e}. "
-                "Using fragment_id as size estimate."
+                "Could not get size for fragment %s: %s. "
+                "Using fragment_id as size estimate.",
+                fragment.fragment_id,
+                exc,
             )
             fragment_info.append(
-                {
-                    "id": fragment.fragment_id,
-                    "size": fragment.fragment_id,  # Fallback to fragment_id
-                }
+                {"id": fragment.fragment_id, "size": fragment.fragment_id}
             )

     # Sort fragments by size in descending order (largest first)
     # This helps with better load balancing using the greedy algorithm
     fragment_info.sort(key=lambda x: x["size"], reverse=True)

-    # Initialize worker batches and their current workloads
-    worker_batches = [[] for _ in range(num_workers)]
+    worker_batches: list[list[int]] = [[] for _ in range(num_workers)]
     worker_workloads = [0] * num_workers

     # Greedy assignment: assign each fragment to the worker with minimum workload
     for frag_info in fragment_info:
         # Find the worker with the minimum current workload
         min_workload_idx = min(range(num_workers), key=lambda i: worker_workloads[i])
-
-        # Assign fragment to this worker
         worker_batches[min_workload_idx].append(frag_info["id"])
         worker_workloads[min_workload_idx] += frag_info["size"]

-    # Log distribution statistics for debugging
-    total_size = sum(frag_info["size"] for frag_info in fragment_info)
+    total_size = sum(info["size"] for info in fragment_info)
     logger.info("Fragment distribution statistics:")
-    logger.info(f"  Total fragments: {len(fragment_info)}")
-    logger.info(f"  Total size: {total_size}")
-    logger.info(f"  Workers: {num_workers}")
+    logger.info("  Total fragments: %d", len(fragment_info))
+    logger.info("  Total size: %d", total_size)
+    logger.info("  Workers: %d", num_workers)
     for i, (batch, workload) in enumerate(
         zip(worker_batches, worker_workloads, strict=False)
     ):
         percentage = (workload / total_size * 100) if total_size > 0 else 0
         logger.info(
-            f"  Worker {i}: {len(batch)} fragments, "
-            f"workload: {workload} ({percentage:.1f}%)"
+            "  Worker %d: %d fragments, workload: %d (%.1f%%)",
+            i,
+            len(batch),
+            workload,
+            percentage,
         )

-    # Filter out empty batches (shouldn't happen with proper input validation)
     non_empty_batches = [batch for batch in worker_batches if batch]
-
     return non_empty_batches


+def _map_async_with_pool(
    fragment_handler: Any,
    fragment_batches: list[list[int]],
    *,
    num_workers: int,
    ray_remote_args: Optional[dict[str, Any]],
    error_prefix: str,
) -> list[dict[str, Any]]:
    """Run fragment tasks in a Ray-backed multiprocessing Pool.

    This helper encapsulates the common Pool.map_async + get + error wrapping
    logic so that both scalar and vector distributed index builders can share
    the same implementation.
    """
    pool = Pool(processes=num_workers, ray_remote_args=ray_remote_args)
    rst_futures = pool.map_async(
        fragment_handler,
        fragment_batches,
        chunksize=1,
    )

    try:
        results = rst_futures.get()
    except Exception as exc:  # pragma: no cover - exercised via integration tests
        pool.close()
        raise RuntimeError(f"{error_prefix}: {exc}") from exc
    finally:
        pool.close()

    return results


 def _handle_fragment_index(
     dataset_uri: str,
     column: str,
@@ -120,31 +146,17 @@ def _handle_fragment_index(
     table_id: Optional[list[str]] = None,
     **kwargs: Any,
 ):
-    """
-    Create a function to handle fragment index building for use with Pool.
-    This function returns a callable that can be used with Pool.map_async
-    to build indices for specific fragments.
+    """Create a fragment handler closure for scalar index builds.
+
+    The returned callable can be used with :func:`Pool.map_async` to build
+    indices for specific fragments.
+    """

     def func(fragment_ids: list[int]) -> dict[str, Any]:
-        """
-        Handle fragment index building using the distributed API.
-
-        This function calls create_scalar_index directly for specific fragments.
-        After execution, fragment-level indices are automatically built.
-
-        Args:
-            fragment_ids: List of fragment IDs to build index for
-
-        Returns:
-            Dictionary with status and result information
-        """
         try:
-            # Basic input validation
             if not fragment_ids:
                 raise ValueError("fragment_ids cannot be empty")

-            # Validate fragment_id ranges
             for fragment_id in fragment_ids:
                 if fragment_id < 0 or fragment_id > 0xFFFFFFFF:
                     raise ValueError(f"Invalid fragment_id: {fragment_id}")
@@ -161,19 +173,16 @@ def func(fragment_ids: list[int]) -> dict[str, Any]:
                 storage_options_provider=storage_options_provider,
             )

-            # Validate fragments exist
             available_fragments = {f.fragment_id for f in dataset.get_fragments()}
             invalid_fragments = set(fragment_ids) - available_fragments
             if invalid_fragments:
                 raise ValueError(f"Fragment IDs {invalid_fragments} do not exist")

-            # Use the distributed index building API - Phase 1: Fragment index creation
             logger.info(
-                f"Building distributed index for fragments {fragment_ids} using create_scalar_index"
+                "Building distributed scalar index for fragments %s using create_scalar_index",
+                fragment_ids,
             )

-            # Call create_scalar_index directly - no return value expected
-            # After execution, fragment-level indices are automatically built
             dataset.create_scalar_index(
                 column=column,
                 index_type=index_type,
@@ -185,11 +194,11 @@ def func(fragment_ids: list[int]) -> dict[str, Any]:
                 storage_options=storage_options,
                 **kwargs,
             )

-            # Get field ID for the indexed column
             field_id = dataset.schema.get_field_index(column)

             logger.info(
-                f"Fragment index created successfully for fragments {fragment_ids}"
+                "Fragment scalar index created successfully for fragments %s",
+                fragment_ids,
             )

             return {
                 "status": "success",
                 "fragment_ids": fragment_ids,
                 "fields": [field_id],
                 "uuid": index_uuid,
             }

-        except Exception as e:
+        except Exception as exc:  # pragma: no cover - exercised via integration tests
             logger.error(
-                f"Fragment index task failed for fragments {fragment_ids}: {e}"
+                "Fragment scalar index task failed for fragments %s: %s",
+                fragment_ids,
+                exc,
             )
             return {
                 "status": "error",
                 "fragment_ids": fragment_ids,
-                "error": str(e),
+                "error": str(exc),
             }

     return func


 def merge_index_metadata_compat(dataset, index_id, index_type, **kwargs):
+    """Call ``merge_index_metadata`` with a backwards-compatible signature."""
     try:
         return dataset.merge_index_metadata(
             index_id, index_type, batch_readhead=kwargs.get("batch_readhead")
         )
@@ -246,12 +258,7 @@ def create_scalar_index(
     ray_remote_args: Optional[dict[str, Any]] = None,
     **kwargs: Any,
 ) -> "lance.LanceDataset":
-    """
-    Build scalar indices with Ray in a distributed workflow (supports FTS/INVERTED and BTREE).
-
-    This function distributes the index building process across multiple Ray workers,
-    with each worker building indices for a subset of fragments. The indices are then
-    merged and committed as a single index.
+    """Build scalar indices with Ray in a distributed workflow.

     Args:
         uri: The URI of the Lance dataset to build index on. Either uri OR
@@ -291,34 +298,33 @@ def create_scalar_index(
         if lance_version < min_required_version:
             raise RuntimeError(
-                f"Distributed indexing requires pylance >= 0.36.0, but found {lance.__version__}. "
-                "The distribute-related interfaces are not available in older versions. "
-                "Please upgrade pylance by running: pip install --upgrade pylance"
+                "Distributed indexing requires pylance >= 0.36.0, but found "
+                f"{lance.__version__}. The distribute-related interfaces are "
+                "not available in older versions. Please upgrade pylance by "
+                "running: pip install --upgrade pylance"
             )

-        logger.info(f"Pylance version check passed: {lance.__version__} >= 0.36.0")
+        logger.info("Pylance version check passed: %s >= 0.36.0", lance.__version__)

-    except AttributeError as err:
-        # If lance.__version__ doesn't exist, assume it's too old
+    except AttributeError as err:  # pragma: no cover - defensive
         raise RuntimeError(
-            "Cannot determine pylance version. Distributed indexing requires pylance >= 0.36.0. "
-            "Please upgrade pylance by running: pip install --upgrade pylance"
+            "Cannot determine pylance version. Distributed indexing requires "
+            "pylance >= 0.36.0. Please upgrade pylance by running: "
+            "pip install --upgrade pylance"
        ) from err

     index_id = str(uuid.uuid4())
-    logger.info(f"Starting distributed index build with ID: {index_id}")
+    logger.info("Starting distributed scalar index build with ID: %s", index_id)

     # Validate uri or namespace params
     validate_uri_or_namespace(uri, namespace_impl, table_id)

-    # Basic input validation
     if not column:
         raise ValueError("Column name cannot be empty")

     if num_workers <= 0:
         raise ValueError(f"num_workers must be positive, got {num_workers}")

-    # Handle index_type validation
     if isinstance(index_type, str):
         valid_index_types = [
             "BTREE",
@@ -334,16 +340,17 @@ def create_scalar_index(
             f"Index type must be one of {valid_index_types}, not '{index_type}'"
         )

-        # Validate distributed indexing support
         supported_distributed_types = {"INVERTED", "FTS", "BTREE"}
         if index_type not in supported_distributed_types:
             raise ValueError(
-                f"Distributed indexing currently supports {sorted(supported_distributed_types)} index types, "
+                "Distributed indexing currently supports "
+                f"{sorted(supported_distributed_types)} index types, "
                 f"not '{index_type}'"
             )
     elif not isinstance(index_type, IndexConfig):
         raise ValueError(
-            f"index_type must be a string literal or IndexConfig object, got {type(index_type)}"
+            "index_type must be a string literal or IndexConfig object, got "
+            f"{type(index_type)}"
        )

     # Note: Ray initialization is now handled by the Pool, following the pattern from io.py
@@ -375,31 +382,28 @@ def create_scalar_index(
         storage_options_provider=storage_options_provider,
     )

-    # Validate column exists and has correct type
     try:
         field = dataset.schema.field(column)
-    except KeyError as e:
+    except KeyError as exc:
         available_columns = [field.name for field in dataset.schema]
         raise ValueError(
             f"Column '{column}' not found. Available: {available_columns}"
        ) from exc

     # Check column type according to index type
     value_type = field.type
     if pa.types.is_list(field.type) or pa.types.is_large_list(field.type):
         value_type = field.type.value_type

-    # Validate column type based on index type requirements
     if isinstance(index_type, str):
         match index_type:
             case "INVERTED" | "FTS":
-                # Text-based indexes require string types
                 if not pa.types.is_string(value_type):
                     raise TypeError(
-                        f"Column {column} must be string type for {index_type} index, got {value_type}"
+                        f"Column {column} must be string type for {index_type} "
+                        f"index, got {value_type}"
                     )
             case "BTREE":
-                # B-Tree indexes support both numeric and string types
                 is_supported = (
                     pa.types.is_integer(value_type)
                     or pa.types.is_floating(value_type)
@@ -407,7 +411,8 @@ def create_scalar_index(
                 )
                 if not is_supported:
                     raise TypeError(
-                        f"Column {column} must be numeric or string type for BTREE index, got {value_type}"
+                        f"Column {column} must be numeric or string type for BTREE "
+                        f"index, got {value_type}"
                     )
             case _:
                 # For other index types, skip strict validation to maintain compatibility
@@ -416,25 +421,22 @@ def create_scalar_index(
     if name is None:
         name = f"{column}_idx"

-    # Handle replace parameter - check for existing index with same name
     if not replace:
         try:
             existing_indices = dataset.list_indices()
             existing_names = {idx["name"] for idx in existing_indices}
             if name in existing_names:
                 raise ValueError(
-                    f"Index with name '{name}' already exists. Set replace=True to replace it."
+                    f"Index with name '{name}' already exists. Set replace=True "
+                    "to replace it."
                 )
-        except Exception:
-            # If we can't check existing indices, continue
+        except Exception:  # pragma: no cover - best effort safeguard
             pass

-    # Get fragments
     fragments = dataset.get_fragments()
     if not fragments:
         raise ValueError("Dataset contains no fragments")

-    # Handle fragment_ids parameter - if provided, filter fragments
     if fragment_ids is not None:
         available_fragment_ids = {f.fragment_id for f in fragments}
         invalid_fragments = set(fragment_ids) - available_fragment_ids
@@ -442,25 +444,17 @@ def create_scalar_index(
             raise ValueError(
                 f"Fragment IDs {invalid_fragments} do not exist in dataset"
             )
-        # Filter fragments to only include requested ones
         fragments = [f for f in fragments if f.fragment_id in fragment_ids]
         fragment_ids_to_use = fragment_ids
     else:
         fragment_ids_to_use = [fragment.fragment_id for fragment in fragments]

-    # Adjust num_workers if needed
     if num_workers > len(fragment_ids_to_use):
         num_workers = len(fragment_ids_to_use)
-        logger.info(f"Adjusted num_workers to {num_workers} to match fragment count")
+        logger.info("Adjusted num_workers to %d to match fragment count", num_workers)

-    # Distribute fragments to workers using balanced distribution algorithm
     fragment_batches = _distribute_fragments_balanced(fragments, num_workers, logger)

-    # Phase 1: Fragment index creation using Pool pattern (similar to io.py)
-    # Use Pool to distribute work instead of direct Ray task submission
-    pool = Pool(processes=num_workers, ray_remote_args=ray_remote_args)
-
-    # Create the fragment handler function
     fragment_handler = _handle_fragment_index(
         dataset_uri=uri,
         column=column,
@@ -476,23 +470,14 @@ def create_scalar_index(
         **kwargs,
     )

-    # Submit tasks using Pool.map_async
-    rst_futures = pool.map_async(
-        fragment_handler,
-        fragment_batches,
-        chunksize=1,
+    results = _map_async_with_pool(
+        fragment_handler=fragment_handler,
+        fragment_batches=fragment_batches,
+        num_workers=num_workers,
+        ray_remote_args=ray_remote_args,
+        error_prefix="Failed to complete distributed index building",
     )

-    # Wait for results
-    try:
-        results = rst_futures.get()
-    except Exception as e:
-        pool.close()
-        raise RuntimeError(f"Failed to complete distributed index building: {e}") from e
-    finally:
-        pool.close()
-
-    # Check for failures
     failed_results = [r for r in results if r["status"] == "error"]
     if failed_results:
         error_messages = [r["error"] for r in failed_results]
@@ -505,21 +490,17 @@ def create_scalar_index(
         storage_options_provider=storage_options_provider,
     )

-    # Phase 2: Merge index metadata using the distributed API
-    logger.info(f"Phase 2: Merging index metadata for index ID: {index_id}")
+    logger.info("Phase 2: Merging index metadata for index ID: %s", index_id)
     merge_index_metadata_compat(dataset, index_id, index_type=index_type, **kwargs)

-    # Phase 3: Create Index object and commit the operation
-    logger.info(f"Phase 3: Creating and committing index '{name}'")
+    logger.info("Phase 3: Creating and committing scalar index '%s'", name)

-    # Get field information from successful results
     successful_results = [r for r in results if r["status"] == "success"]
     if not successful_results:
         raise RuntimeError("No successful index creation results found")

     fields = successful_results[0]["fields"]

-    # Create Index object
     index = Index(
         uuid=index_id,
         name=name,
@@ -529,7 +510,6 @@ def create_scalar_index(
         index_version=0,
     )

-    # Create and commit the index operation
     create_index_op = lance.LanceOperation.CreateIndex(
         new_indices=[index],
         removed_indices=[],
@@ -544,9 +524,419 @@ def create_scalar_index(
     )

     logger.info(
-        f"Successfully created distributed index '{name}' with three-phase workflow"
+        "Successfully created distributed scalar index '%s' with three-phase workflow",
+        name,
+    )
+    logger.info(
+        "Index ID: %s, Fragments: %d, Workers: %d",
+        index_id,
+        len(fragment_ids_to_use),
+        len(fragment_batches),
     )
     return updated_dataset


# ---------------------------------------------------------------------------
# Distributed vector index support (IVF_* and IVF_HNSW_* families)
# ---------------------------------------------------------------------------

# Vector index types supported by the distributed merge pipeline.
_VECTOR_INDEX_TYPES = {
    "IVF_FLAT",
    "IVF_PQ",
    "IVF_SQ",
    "IVF_HNSW_FLAT",
    "IVF_HNSW_PQ",
    "IVF_HNSW_SQ",
}


def _normalize_index_type(index_type: Any) -> str:
    """Normalize index type to upper-case string and validate support.

    Parameters
    ----------
    index_type : str or enum-like
        Vector index type. Must be one of the distributed vector index
        types supported by Lance.
    """

    if hasattr(index_type, "value") and isinstance(index_type.value, str):
        index_type_name = index_type.value.upper()
    elif isinstance(index_type, str):
        index_type_name = index_type.upper()
    else:
        raise TypeError(
            "index_type must be a string or an enum-like object with a string 'value' "
            f"attribute, got {type(index_type)}"
        )

    if index_type_name not in _VECTOR_INDEX_TYPES:
        raise ValueError(
            "Distributed vector indexing only supports the following index types: "
            f"{sorted(_VECTOR_INDEX_TYPES)}, not '{index_type_name}'"
        )

    return index_type_name


def _check_pylance_version() -> None:
    """Ensure pylance (lance) provides distributed vector APIs."""

    try:
        lance_version = version.parse(lance.__version__)
        min_required_version = version.parse("0.36.0")

        if lance_version < min_required_version:
            raise RuntimeError(
                "Distributed vector indexing requires pylance >= 0.36.0, but found "
                f"{lance.__version__}. The distributed vector interfaces are not "
                "available in older versions. Please upgrade pylance by running: "
                "pip install --upgrade pylance"
            )

        logger.info("Pylance version check passed: %s >= 0.36.0", lance.__version__)

    except AttributeError as err:  # pragma: no cover - defensive
        raise RuntimeError(
            "Cannot determine pylance version. Distributed vector indexing requires "
            "pylance >= 0.36.0. Please upgrade pylance by running: "
            "pip install --upgrade pylance"
        ) from err


def _validate_metric(metric: str) -> str:
    """Normalize and validate the distance metric string."""

    if not isinstance(metric, str):
        raise TypeError(f"Metric must be a string, got {type(metric)}")

    metric_lower = metric.lower()
    valid_metrics = {"l2", "cosine", "euclidean", "dot", "hamming"}
    if metric_lower not in valid_metrics:
        raise ValueError(
            f"Metric {metric} not supported. Valid: {sorted(valid_metrics)}"
        )
    return metric_lower


def _handle_vector_fragment_index(
    dataset_uri: str,
    column: str,
    index_type: str,
    name: str,
    index_uuid: str,
    replace: bool,
    metric: str,
    num_partitions: Optional[int],
    num_sub_vectors: Optional[int],
    ivf_centroids: pa.Array | pa.FixedSizeListArray | pa.FixedShapeTensorArray | None,
    pq_codebook: pa.Array | pa.FixedSizeListArray | pa.FixedShapeTensorArray | None,
    storage_options: Optional[dict[str, str]] = None,
    **kwargs: Any,
):
    """Create a fragment handler closure for vector index builds."""

    def func(fragment_ids: list[int]) -> dict[str, Any]:
        try:
            if not fragment_ids:
                raise ValueError("fragment_ids cannot be empty")

            for fragment_id in fragment_ids:
                if fragment_id < 0 or fragment_id > 0xFFFFFFFF:
                    raise ValueError(f"Invalid fragment_id: {fragment_id}")

            dataset = LanceDataset(dataset_uri, storage_options=storage_options)
            available_fragments = {f.fragment_id for f in dataset.get_fragments()}
            invalid_fragments = set(fragment_ids) - available_fragments
            if invalid_fragments:
                raise ValueError(f"Fragment IDs {invalid_fragments} do not exist")

            logger.info(
                "Building distributed vector index for fragments %s using "
                "LanceDataset.create_index",
                fragment_ids,
            )

            dataset.create_index(
                column=column,
                index_type=index_type,
                name=name,
                metric=metric,
                replace=replace,
                num_partitions=num_partitions,
                ivf_centroids=ivf_centroids,
                pq_codebook=pq_codebook,
                num_sub_vectors=num_sub_vectors,
                storage_options=storage_options,
                train=True,
                fragment_ids=fragment_ids,
                index_uuid=index_uuid,
                **kwargs,
            )

            lance_field = dataset.lance_schema.field(column)
            if lance_field is None:
                raise KeyError(f"{column} not found in schema")
            field_id = lance_field.id()

            logger.info(
                "Fragment vector index created successfully for fragments %s",
                fragment_ids,
            )

            return {
                "status": "success",
                "fragment_ids": fragment_ids,
                "fields": [field_id],
                "uuid": index_uuid,
            }

        except Exception as exc:  # pragma: no cover - exercised via integration tests
            logger.error(
                "Fragment vector index task failed for fragments %s: %s",
                fragment_ids,
                exc,
            )
            return {
                "status": "error",
                "fragment_ids": fragment_ids,
                "error": str(exc),
            }

    return func


def create_index(
    dataset: Union[str, "lance.LanceDataset"],
    column: str,
    index_type: str | Any,
    name: Optional[str] = None,
    *,
    replace: bool = True,
    num_workers: int = 4,
    storage_options: Optional[dict[str, str]] = None,
    ray_remote_args: Optional[dict[str, Any]] = None,
    metric: str = "l2",
    num_partitions: Optional[int] = None,
    num_sub_vectors: Optional[int] = None,
    ivf_centroids: Optional[
        pa.Array | pa.FixedSizeListArray | pa.FixedShapeTensorArray
    ] = None,
    pq_codebook: Optional[
        pa.Array | pa.FixedSizeListArray | pa.FixedShapeTensorArray
    ] = None,
    **kwargs: Any,
) -> "lance.LanceDataset":
    """Build distributed vector indices with Ray.

    This function mirrors :func:`create_scalar_index` but targets the
    vector index families supported by Lance's distributed merge pipeline.

    Args:
        dataset: Lance dataset or URI to build index on
        column: Column name to index
        index_type: Type of index to build (e.g., "IVF_PQ", "IVF_HNSW_PQ")
        name: Name of the index (generated if None)
        replace: Whether to replace existing index with the same name (default: True)
        num_workers: Number of Ray workers to use (keyword-only)
        storage_options: Storage options for the dataset (keyword-only)
        ray_remote_args: Options for Ray tasks (keyword-only)
        metric: Distance metric to use (default: "l2")
        num_partitions: Number of IVF partitions (optional)
        num_sub_vectors: Number of PQ sub-vectors (optional)
        ivf_centroids: Pre-computed IVF centroids (optional)
        pq_codebook: Pre-computed PQ codebook (optional)
        **kwargs: Additional arguments to pass to create_index and train_pq (e.g., sample_rate)

    Returns:
        Updated Lance dataset with the index created
    """

    _check_pylance_version()

    if not column:
        raise ValueError("Column name cannot be empty")

    if num_workers <= 0:
        raise ValueError(f"num_workers must be positive, got {num_workers}")

    index_type_name = _normalize_index_type(index_type)
    metric_lower = _validate_metric(metric)

    index_id = str(uuid.uuid4())
    logger.info("Starting distributed vector index build with ID: %s", index_id)

    if isinstance(dataset, str):
        dataset_uri = dataset
        dataset_obj = LanceDataset(dataset_uri, storage_options=storage_options)
    else:
        dataset_obj = dataset
        dataset_uri = dataset.uri
        if storage_options is None:
            storage_options = getattr(dataset_obj, "_storage_options", None)

    try:
        dataset_obj.schema.field(column)
    except KeyError as exc:
        available_columns = [field.name for field in dataset_obj.schema]
        raise ValueError(
            f"Column '{column}' not found. Available: {available_columns}"
        ) from exc

    if name is None:
        name = f"{column}_idx"

    if not replace:
        try:
            existing_indices = dataset_obj.list_indices()
            existing_names = {idx["name"] for idx in existing_indices}
            if name in existing_names:
                raise ValueError(
                    f"Index with name '{name}' already exists. Set replace=True "
                    "to replace it."
                )
        except Exception:  # pragma: no cover - best effort safeguard
            pass

    fragments = dataset_obj.get_fragments()
    if not fragments:
        raise ValueError("Dataset contains no fragments")

    fragment_ids_to_use = [fragment.fragment_id for fragment in fragments]

    if num_workers > len(fragment_ids_to_use):
        num_workers = len(fragment_ids_to_use)
        logger.info("Adjusted num_workers to %d to match fragment count", num_workers)

    ivf_centroids_artifact = ivf_centroids
    pq_codebook_artifact = pq_codebook

    pq_index_types = {"IVF_PQ", "IVF_HNSW_PQ"}
    needs_pq = index_type_name in pq_index_types

    # Always perform global IVF training up front so that all shards share the
    # same centroids and number of partitions. The Ray entrypoint owns the
    # lifecycle of these artifacts and distributes them to workers.
    builder = IndicesBuilder(dataset_obj, column)
    num_rows = dataset_obj.count_rows()
    dimension = builder.dimension

    computed_num_partitions = builder._determine_num_partitions(
        num_partitions, num_rows
    )
    ivf_model = builder.train_ivf(
        num_partitions=computed_num_partitions,
        distance_type=metric_lower,
    )
    ivf_centroids_artifact = ivf_model.centroids
    num_partitions = ivf_model.num_partitions

    if needs_pq:
        computed_num_sub_vectors = builder._normalize_pq_params(
            num_sub_vectors, dimension
        )
        pq_model = builder.train_pq(
            ivf_model,
            computed_num_sub_vectors,
            sample_rate=kwargs.get("sample_rate", 256),
        )
        pq_codebook_artifact = pq_model.codebook
        num_sub_vectors = computed_num_sub_vectors

    if ivf_centroids_artifact is None:
        raise ValueError(
            "ivf_centroids must be provided or trainable for IVF-based "
            "distributed vector indices"
        )

    if needs_pq and pq_codebook_artifact is None:
        raise ValueError(
            "pq_codebook must be provided or trainable for PQ-based "
            "distributed vector indices"
        )

    fragment_batches = _distribute_fragments_balanced(
        fragments, num_workers=num_workers, logger=logger
    )

    fragment_handler = _handle_vector_fragment_index(
        dataset_uri=dataset_uri,
        column=column,
        index_type=index_type_name,
        name=name,
        index_uuid=index_id,
        replace=replace,
        metric=metric_lower,
        num_partitions=num_partitions,
        num_sub_vectors=num_sub_vectors,
        ivf_centroids=ivf_centroids_artifact,
        pq_codebook=pq_codebook_artifact,
        storage_options=storage_options,
        **kwargs,
    )

    results = _map_async_with_pool(
        fragment_handler=fragment_handler,
        fragment_batches=fragment_batches,
        num_workers=num_workers,
        ray_remote_args=ray_remote_args,
        error_prefix="Failed to complete distributed vector index building",
    )

    failed_results = [r for r in results if r.get("status") == "error"]
    if failed_results:
        error_messages = [r["error"] for r in failed_results if "error" in r]
        raise RuntimeError("Vector index building failed: " + "; ".join(error_messages))

    dataset_obj = LanceDataset(dataset_uri, storage_options=storage_options)

    logger.info("Phase 3: Merging index metadata for index ID: %s", index_id)
    merge_index_metadata_compat(
        dataset_obj,
        index_id,
        index_type=index_type_name,
        **kwargs,
    )

    logger.info("Phase 4: Creating and committing vector index '%s'", name)

    successful_results = [r for r in results if r.get("status") == "success"]
    if not successful_results:
        raise RuntimeError("No successful vector index creation results found")

    fields = successful_results[0]["fields"]

    index = Index(
        uuid=index_id,
        name=name,
        fields=fields,
        dataset_version=dataset_obj.version,
        fragment_ids=set(fragment_ids_to_use),
        index_version=0,
    )

    create_index_op = lance.LanceOperation.CreateIndex(
        new_indices=[index],
        removed_indices=[],
    )

    updated_dataset = lance.LanceDataset.commit(
        dataset_uri,
        create_index_op,
        read_version=dataset_obj.version,
        storage_options=storage_options,
    )

    logger.info(
        "Successfully created distributed vector index '%s' with multi-phase workflow",
        name,
    )
    logger.info(
-        f"Index ID: {index_id}, Fragments: {len(fragment_ids_to_use)}, Workers: {len(fragment_batches)}"
+        "Index ID: %s, Fragments: %d, Workers: %d",
+        index_id,
+        len(fragment_ids_to_use),
+        len(fragment_batches),
     )
+
     return updated_dataset
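A minimal usage sketch of the new entrypoint (illustrative only; it mirrors the integration test below, and the dataset path is hypothetical, assuming an existing `vector` column):

```python
import lance_ray as lr

# IVF (and PQ, where applicable) training happens once up front;
# per-fragment index builds then fan out to Ray workers.
ds = lr.create_index(
    dataset="/tmp/vectors.lance",  # hypothetical local dataset
    column="vector",
    index_type="IVF_PQ",
    num_workers=2,
    num_partitions=4,
    num_sub_vectors=16,
)
print([idx["name"] for idx in ds.list_indices()])
```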
diff --git a/tests/test_distributed_indexing.py b/tests/test_distributed_indexing.py
index 7d693fe2..68015b2e 100755
--- a/tests/test_distributed_indexing.py
+++ b/tests/test_distributed_indexing.py
@@ -1,10 +1,12 @@
 """Test cases for lance_ray.indexing module."""

+import random
 import tempfile
 from pathlib import Path

 import lance
 import lance_ray as lr
+import pyarrow as pa
 import pytest
 import ray
 from packaging import version
@@ -989,3 +991,90 @@
         result = updated_dataset.scanner(filter="id = 100", columns=["id"]).to_table()
         assert result.num_rows == 1, "BTREE index query should return 1 row"
+
+
+def generate_multi_fragment_vector_dataset(
+    tmp_path, num_fragments: int = 4, rows_per_fragment: int = 64, dim: int = 128
+) -> str:
+    """Generate a Lance dataset with a vector column and multiple fragments.
+
+    The dataset is written with ``max_rows_per_file`` capped at the
+    per-fragment row count so that each fragment has the same number of rows,
+    which makes it suitable for distributed vector index building tests.
+    """
+    num_rows = num_fragments * rows_per_fragment
+    # Generate random vectors
+    data = [random.gauss(0, 1) for _ in range(num_rows * dim)]
+
+    # Manually construct a FixedSizeList "vector" column compatible with
+    # Lance's vector index requirements.
+    values = pa.array(data, type=pa.float32())
+    vector_array = pa.FixedSizeListArray.from_arrays(values, dim)
+    tbl = pa.Table.from_arrays([vector_array], names=["vector"])
+    tbl = tbl.append_column("id", pa.array(range(num_rows), type=pa.int64()))
+
+    path = Path(tmp_path) / "multi_fragment_vector.lance"
+    lance.write_dataset(
+        tbl,
+        str(path),
+        max_rows_per_file=rows_per_fragment,
+    )
+
+    return str(path)
+
+
+@pytest.mark.parametrize("index_type", ["IVF_FLAT", "IVF_SQ","IVF_PQ"])
+def test_build_distributed_vector_index(tmp_path, index_type):
+    """Build a distributed vector index and verify nearest search works."""
+    dataset_uri = generate_multi_fragment_vector_dataset(
+        tmp_path, num_fragments=4, rows_per_fragment=1024, dim=128
+    )
+
+    # Build distributed vector index using the high-level Ray entrypoint.
+    try:
+        updated_dataset = lr.create_index(
+            dataset=dataset_uri,
+            column="vector",
+            index_type=index_type,
+            name=f"idx_{index_type}",
+            num_workers=2,
+            num_partitions=4,
+            num_sub_vectors=16,
+            sample_rate=16,
+        )
+    except RuntimeError as exc:
+        # Older pylance builds may not yet support creating empty distributed
+        # vector indices with train=False. In that case we skip the functional
+        # verification while still ensuring the Ray entrypoint is wired
+        # correctly.
+        msg = str(exc)
+        if (
+            "Creating empty vector indices with train=False is not yet implemented"
+            in msg
+        ):
+            pytest.skip(
+                "Current pylance build does not yet support distributed vector "
+                "indices with train=False; skipping functional test."
+            )
+        raise
+
+    indices = updated_dataset.list_indices()
+    assert len(indices) > 0, "No indices found after distributed vector index build"
+
+    # Find the index with the name we specified
+    vec_index = next(
+        (idx for idx in indices if idx["name"] == f"idx_{index_type}"), None
+    )
+    assert vec_index is not None, f"Index with name idx_{index_type} not found"
+    assert vec_index["type"] == index_type, (
+        f"Expected {index_type} vector index, got {vec_index['type']}"
+    )
+
+    # Run a simple nearest-neighbor query to ensure the index is usable.
+    q = [random.gauss(0, 1) for _ in range(128)]
+    result = updated_dataset.to_table(
+        nearest={"column": "vector", "q": q, "k": 5},
+        columns=["id", "vector"],
+    )
+
+    assert result.num_rows == 5

From e1aaf26d805a3f82bec9c990dcc579d5526a869d Mon Sep 17 00:00:00 2001
From: chenghao
Date: Tue, 30 Dec 2025 15:04:00 -0600
Subject: [PATCH 2/4] test: skip testing distribute ivf index when pylance version<2.0.0

---
 tests/test_distributed_indexing.py | 18 +++++++++++++++++-
 1 file changed, 17 insertions(+), 1 deletion(-)

diff --git a/tests/test_distributed_indexing.py b/tests/test_distributed_indexing.py
index 68015b2e..62323397 100755
--- a/tests/test_distributed_indexing.py
+++ b/tests/test_distributed_indexing.py
@@ -652,6 +652,16 @@ def check_btree_version_compatibility():
         return False


+def check_pylance_v2_compatibility():
+    """Check if lance version is >= 2.0.0."""
+    try:
+        lance_version = version.parse(lance.__version__)
+        v2_version = version.parse("2.0.0")
+        return lance_version >= v2_version
+    except (AttributeError, Exception):
+        return False
+
+
 @pytest.mark.skipif(
     not check_btree_version_compatibility(),
     reason="B-tree indexing requires pylance >= 0.37.0. Current version: {}".format(
@@ -1023,7 +1033,13 @@
-@pytest.mark.parametrize("index_type", ["IVF_FLAT", "IVF_SQ","IVF_PQ"])
+@pytest.mark.skipif(
+    not check_pylance_v2_compatibility(),
+    reason="Distributed vector indexing requires pylance >= 2.0.0. Current version: {}".format(
+        getattr(lance, "__version__", "unknown")
+    ),
+)
+@pytest.mark.parametrize("index_type", ["IVF_FLAT", "IVF_SQ", "IVF_PQ"])
 def test_build_distributed_vector_index(tmp_path, index_type):
     """Build a distributed vector index and verify nearest search works."""
     dataset_uri = generate_multi_fragment_vector_dataset(

From b25d2c903775f4db97e5e5efe7ae78d791ca6d7d Mon Sep 17 00:00:00 2001
From: chenghao
Date: Thu, 15 Jan 2026 14:23:23 -0600
Subject: [PATCH 3/4] Revert "test: skip testing distribute ivf index when pylance version<2.0.0"

This reverts commit 2fcd606a58e2728dd35da776d8495dd466049d4f.
---
 tests/test_distributed_indexing.py | 18 +----------------
 1 file changed, 1 insertion(+), 17 deletions(-)

diff --git a/tests/test_distributed_indexing.py b/tests/test_distributed_indexing.py
index 62323397..68015b2e 100755
--- a/tests/test_distributed_indexing.py
+++ b/tests/test_distributed_indexing.py
@@ -652,16 +652,6 @@ def check_btree_version_compatibility():
     return False


-def check_pylance_v2_compatibility():
-    """Check if lance version is >= 2.0.0."""
-    try:
-        lance_version = version.parse(lance.__version__)
-        v2_version = version.parse("2.0.0")
-        return lance_version >= v2_version
-    except (AttributeError, Exception):
-        return False
-
-
 @pytest.mark.skipif(
     not check_btree_version_compatibility(),
     reason="B-tree indexing requires pylance >= 0.37.0. Current version: {}".format(
@@ -1033,13 +1023,7 @@
-@pytest.mark.skipif(
-    not check_pylance_v2_compatibility(),
-    reason="Distributed vector indexing requires pylance >= 2.0.0. Current version: {}".format(
-        getattr(lance, "__version__", "unknown")
-    ),
-)
-@pytest.mark.parametrize("index_type", ["IVF_FLAT", "IVF_SQ", "IVF_PQ"])
+@pytest.mark.parametrize("index_type", ["IVF_FLAT", "IVF_SQ","IVF_PQ"])
 def test_build_distributed_vector_index(tmp_path, index_type):
     """Build a distributed vector index and verify nearest search works."""
     dataset_uri = generate_multi_fragment_vector_dataset(

From fb05425ea2f35d86fa21a9221709ca151d2fea47 Mon Sep 17 00:00:00 2001
From: chenghao
Date: Fri, 16 Jan 2026 14:13:18 -0600
Subject: [PATCH 4/4] fix: update uri and doc change

---
 docs/src/distributed-indexing.md   | 133 +++++++++++++++++++++++------
 lance_ray/index.py                 |  12 +--
 tests/test_distributed_indexing.py |   2 +-
 3 files changed, 115 insertions(+), 32 deletions(-)

diff --git a/docs/src/distributed-indexing.md b/docs/src/distributed-indexing.md
index 4d5f6d0d..c96d3ec1 100755
--- a/docs/src/distributed-indexing.md
+++ b/docs/src/distributed-indexing.md
@@ -3,11 +3,13 @@

Lance-Ray provides distributed index building functionality that leverages Ray's distributed computing capabilities to efficiently create scalar and vector indices for Lance datasets. This is particularly useful for large-scale datasets as it can distribute index building work across multiple Ray workers.

## Distributed APIs

### Scalar Indexing

`create_scalar_index()` - Build scalar indices in a distributed fashion using Ray. Currently only INVERTED/FTS/BTREE are supported; more index types will be added in the future.

#### How It Works
The `create_scalar_index` function allows you to create full-text search indices for Lance datasets using the Ray distributed computing framework. This function distributes the index building process across multiple Ray workers, with each worker responsible for building indices for a subset of dataset fragments. These indices are then merged and committed as a single index.

**Backward Compatibility**:

#### `create_scalar_index`

```python
def create_scalar_index(
    uri: Optional[str] = None,
    *,
    column: str,
    index_type: Union[
        Literal["BTREE"],
        Literal["BITMAP"],
        Literal["LABEL_LIST"],
        Literal["INVERTED"],
        Literal["FTS"],
        Literal["NGRAM"],
        Literal["ZONEMAP"],
        IndexConfig,
    ],
    table_id: Optional[list[str]] = None,
    name: Optional[str] = None,
    replace: bool = True,
    train: bool = True,
    fragment_ids: Optional[list[int]] = None,
    index_uuid: Optional[str] = None,
    num_workers: int = 4,
    storage_options: Optional[dict[str, str]] = None,
    namespace_impl: Optional[str] = None,
    namespace_properties: Optional[dict[str, str]] = None,
    ray_remote_args: Optional[dict[str, Any]] = None,
    **kwargs: Any,
) -> "lance.LanceDataset":
```

#### Parameters

| Parameter | Type | Description |
|-----------|------|-------------|
| `uri` | `str`, optional | The URI of the Lance dataset. Either `uri` OR (`namespace_impl` + `table_id`) must be provided. |
| `column` | `str` | Column name to index |
| `index_type` | `str` or `IndexConfig` | Index type, can be `"INVERTED"`, `"FTS"`, `"BTREE"`, `"BITMAP"`, `"LABEL_LIST"`, `"NGRAM"`, `"ZONEMAP"`, or `IndexConfig` object |
| `table_id` | `list[str]`, optional | The table identifier as a list of strings. |
| `name` | `str`, optional | Index name, auto-generated if not provided |
| `replace` | `bool`, optional | Whether to replace existing index with the same name, default is `True` |
| `train` | `bool`, optional | Whether to train the index, default is `True` |
| `fragment_ids` | `list[int]`, optional | Specific fragment IDs to index |
| `index_uuid` | `str`, optional | Optional fragment UUID for distributed indexing |
| `num_workers` | `int`, optional | Number of Ray workers to use, default is 4 |
| `storage_options` | `Dict[str, str]`, optional | Storage options for the dataset |
| `namespace_impl` | `str`, optional | The namespace implementation type (e.g., `"rest"`, `"dir"`) |
| `namespace_properties` | `Dict[str, str]`, optional | Properties for connecting to the namespace |
| `ray_remote_args` | `Dict[str, Any]`, optional | Ray task options (e.g., `num_cpus`, `resources`) |
| `**kwargs` | `Any` | Additional arguments passed to `create_scalar_index` |

**Note:** For distributed scalar indexing, currently only `"INVERTED"`, `"FTS"` and `"BTREE"` index types are supported.

#### Return Value

The function returns an updated Lance dataset with the newly created index.
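The returned dataset already reflects the committed index, so it can be verified immediately. A minimal sketch, assuming an existing dataset with a `text` column (the index name here is hypothetical):

```python
import lance_ray as lr

updated_dataset = lr.create_scalar_index(
    uri="path/to/dataset",
    column="text",
    index_type="INVERTED",
    name="text_idx",  # hypothetical index name
    num_workers=4,
)
# The committed index is visible on the returned dataset.
assert any(idx["name"] == "text_idx" for idx in updated_dataset.list_indices())
```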
### Vector Indexing

`create_index()` - Build vector indices in a distributed fashion using Ray, parallelizing the index building process across multiple workers.

#### Supported Index Types
The following vector index types are supported for distributed building:
- `IVF_FLAT`
- `IVF_SQ`
- `IVF_PQ`

#### `create_index`

```python
def create_index(
    uri: Union[str, "lance.LanceDataset"],
    column: str,
    index_type: str,
    name: Optional[str] = None,
    *,
    replace: bool = True,
    num_workers: int = 4,
    storage_options: Optional[dict[str, str]] = None,
    ray_remote_args: Optional[dict[str, Any]] = None,
    metric: str = "l2",
    num_partitions: Optional[int] = None,
    num_sub_vectors: Optional[int] = None,
    **kwargs: Any,
) -> "lance.LanceDataset":
```

#### Parameters

| Parameter | Type | Description |
|-----------|------|-------------|
| `uri` | `str` or `lance.LanceDataset` | Lance dataset or its URI |
| `column` | `str` | Vector column name to index |
| `index_type` | `str` | Vector index type (e.g., `"IVF_PQ"`, `"IVF_SQ"`, `"IVF_FLAT"`) |
| `name` | `str`, optional | Index name, auto-generated if not provided |
| `replace` | `bool`, optional | Whether to replace existing index, default is `True` |
| `num_workers` | `int`, optional | Number of Ray workers to use, default is 4 |
| `storage_options` | `Dict[str, str]`, optional | Storage options for the dataset |
| `ray_remote_args` | `Dict[str, Any]`, optional | Ray task options (e.g., `num_cpus`, `resources`) |
| `metric` | `str`, optional | Distance metric to use (e.g., `"l2"`, `"cosine"`, `"dot"`, `"hamming"`), default is `"l2"` |
| `num_partitions` | `int`, optional | Number of IVF partitions |
| `num_sub_vectors` | `int`, optional | Number of PQ sub-vectors |
| `**kwargs` | `Any` | Additional arguments to pass (e.g., `sample_rate`) |

#### Return Value

The function returns an updated Lance dataset with the newly created vector index.
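As a quick sanity check, the dataset returned by `create_index` can immediately serve nearest-neighbor queries through the standard Lance scanner API. A minimal sketch, assuming a 128-dimensional `vector` column (see the full examples below):

```python
import random

q = [random.gauss(0, 1) for _ in range(128)]
result = updated_dataset.to_table(
    nearest={"column": "vector", "q": q, "k": 5},
    columns=["id", "vector"],
)
assert result.num_rows == 5
```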
## Examples

### FTS Index (Scalar)
```python
import lance
import lance_ray as lr

dataset = lance.dataset("path/to/dataset")

# Build distributed index
updated_dataset = lr.create_scalar_index(
    uri=dataset.uri,
    column="text",
    index_type="INVERTED",
    num_workers=4
)

# Search using the index
results = updated_dataset.scanner(
    ...
).to_table()
print(f"Search results: {results}")
```

### BTREE Index (Scalar)
```python
# Assume a LanceDataset with a numeric column "id" exists at this path
import lance_ray as lr

updated_dataset = lr.create_scalar_index(
    uri="path/to/dataset",
    column="id",
    index_type="BTREE",
    name="btree_multiple_fragment_idx",
    num_workers=4,
)

# Point query
updated_dataset.scanner(filter="id = 100", columns=["id", "text"]).to_table()
# Range query
updated_dataset.scanner(filter="id >= 200 AND id < 800", columns=["id", "text"]).to_table()
```

### Vector Index (IVF_PQ / IVF_SQ / IVF_FLAT)
```python
import lance_ray as lr

# Build a distributed IVF_PQ index
updated_dataset = lr.create_index(
    uri="path/to/dataset.lance",
    column="vector",
    index_type="IVF_PQ",
    name="idx_ivf_pq",
    num_workers=4,
    num_partitions=256,
    num_sub_vectors=16,
    metric="l2"
)

# Build a distributed IVF_SQ index
updated_dataset = lr.create_index(
    uri="path/to/dataset.lance",
    column="vector",
    index_type="IVF_SQ",
    name="idx_ivf_sq",
    num_workers=4,
    num_partitions=256,
)

# Build a distributed IVF_FLAT index
updated_dataset = lr.create_index(
    uri="path/to/dataset.lance",
    column="vector",
    index_type="IVF_FLAT",
    name="idx_ivf_flat",
    num_workers=4,
    num_partitions=256,
)
```

### Custom Index Name

```python
updated_dataset = lr.create_scalar_index(
    uri="path/to/dataset",
    column="text",
    index_type="INVERTED",
    name="custom_text_index",
    num_workers=4
)
```

### With Ray Remote Args

```python
updated_dataset = lr.create_scalar_index(
    uri="path/to/dataset",
    column="text",
    index_type="INVERTED",
    num_workers=4,
    ray_remote_args={"num_cpus": 2}
)
```

### Replace Existing Index

```python
# Create index with custom name
updated_dataset = lr.create_scalar_index(
    uri="path/to/dataset",
    column="text",
    index_type="INVERTED",
    name="my_text_index",
    num_workers=4
)

# Try to create another index with the same name (will replace by default)
updated_dataset = lr.create_scalar_index(
    uri="path/to/dataset",
    column="text",
    index_type="INVERTED",
    name="my_text_index",  # Same name as before
```

```python
import lance_ray as lr

try:
    updated_dataset = lr.create_scalar_index(
        uri="path/to/dataset",
        column="text",
        index_type="INVERTED",
        name="my_text_index",  # Same name as existing index

diff --git a/lance_ray/index.py b/lance_ray/index.py
index b8d3b999..a51db4b5 100755
--- a/lance_ray/index.py
+++ b/lance_ray/index.py
@@ -708,7 +708,7 @@

 def create_index(
-    dataset: Union[str, "lance.LanceDataset"],
+    uri: Union[str, "lance.LanceDataset"],
     column: str,
     index_type: str | Any,
     name: Optional[str] = None,
@@ -734,7 +734,7 @@ def create_index(
     vector index families supported by Lance's distributed merge pipeline.

     Args:
-        dataset: Lance dataset or URI to build index on
+        uri: Lance dataset or URI to build index on
         column: Column name to index
         index_type: Type of index to build (e.g., "IVF_PQ", "IVF_HNSW_PQ")
         name: Name of the index (generated if None)
@@ -767,12 +767,12 @@ def create_index(
     index_id = str(uuid.uuid4())
     logger.info("Starting distributed vector index build with ID: %s", index_id)

-    if isinstance(dataset, str):
-        dataset_uri = dataset
+    if isinstance(uri, str):
+        dataset_uri = uri
         dataset_obj = LanceDataset(dataset_uri, storage_options=storage_options)
     else:
-        dataset_obj = dataset
-        dataset_uri = dataset.uri
+        dataset_obj = uri
+        dataset_uri = dataset_obj.uri
         if storage_options is None:
             storage_options = getattr(dataset_obj, "_storage_options", None)

diff --git a/tests/test_distributed_indexing.py b/tests/test_distributed_indexing.py
index 68015b2e..0a8b0ef0 100755
--- a/tests/test_distributed_indexing.py
+++ b/tests/test_distributed_indexing.py
@@ -1033,7 +1033,7 @@ def test_build_distributed_vector_index(tmp_path, index_type):
     # Build distributed vector index using the high-level Ray entrypoint.
     try:
         updated_dataset = lr.create_index(
-            dataset=dataset_uri,
+            uri=dataset_uri,
             column="vector",
             index_type=index_type,
             name=f"idx_{index_type}",