diff --git a/docs/src/distributed-indexing.md b/docs/src/distributed-indexing.md index 1936334c..f5a742b5 100755 --- a/docs/src/distributed-indexing.md +++ b/docs/src/distributed-indexing.md @@ -5,7 +5,7 @@ Lance-Ray provides distributed index building functionality that leverages Ray's ## New Distributed APIs -`create_scalar_index()` - Distributedly create scalar index index using ray. Currently only Inverted/FTS are supported. Will add more index type support in the future. +`create_scalar_index()` - Distributedly create scalar index using Ray. Currently only Inverted/FTS/BTREE are supported. Will add more index type support in the future. ### How It Works The `create_scalar_index` function allows you to create full-text search indices for Lance datasets using the Ray distributed computing framework. This function distributes the index building process across multiple Ray worker nodes, with each node responsible for building indices for a subset of dataset fragments. These indices are then merged and committed as a single index. @@ -62,7 +62,7 @@ def create_scalar_index( | `ray_remote_args` | `Dict[str, Any]`, optional | Ray task options (e.g., `num_cpus`, `resources`) | | `**kwargs` | `Any` | Additional arguments passed to `create_scalar_index` | -**Note:** For distributed indexing, currently only `"INVERTED"` and `"FTS"` index types are supported. +**Note:** For distributed indexing, currently only `"INVERTED"`, `"FTS"`, and `"BTREE"` index types are supported. ### Return Value @@ -71,8 +71,7 @@ The function returns an updated Lance dataset with the newly created index. 
## Examples -### Basic Usage - +### FTS Index ```python import lance import lance_ray as lr @@ -99,6 +98,25 @@ results = updated_dataset.scanner( ).to_table() print(f"Search results: {results}") ``` +### BTREE Index +```python +# Assume a LanceDataset with a numeric column "id" exists at this path +import lance_ray as lr + +updated_dataset = lr.create_scalar_index( +    dataset="path/to/dataset", +    column="id", +    index_type="BTREE", +    name="btree_multiple_fragment_idx", +    replace=False, +    num_workers=4, +) + +# Example queries +updated_dataset.scanner(filter="id = 100", columns=["id", "text"]).to_table() +updated_dataset.scanner(filter="id >= 200 AND id < 800", columns=["id", "text"]).to_table() +``` + ### Custom Index Name @@ -171,7 +189,7 @@ except ValueError as e: ### Important Notes -- **Index Type Support**: For distributed indexing, currently only `"INVERTED"` and `"FTS"` index types are supported, even though the function signature accepts other index types. +- **Index Type Support**: For distributed indexing, currently only `"INVERTED"`, `"FTS"`, and `"BTREE"` index types are supported, even though the function signature accepts other index types. - **Default Behavior**: The `replace` parameter defaults to `True`, meaning existing indices with the same name will be replaced without warning. Set `replace=False` to prevent accidental overwrites. - **Fragment Selection**: Use `fragment_ids` parameter to build indices on specific fragments only. This is useful for incremental index building or testing. - **Error Handling**: When `replace=False` and an index with the same name exists, a `ValueError` or `RuntimeError` will be raised depending on the execution context. 
diff --git a/lance_ray/index.py b/lance_ray/index.py index e532ecf6..8bf3d9dc 100755 --- a/lance_ray/index.py +++ b/lance_ray/index.py @@ -180,9 +180,9 @@ def func(fragment_ids: list[int]) -> dict[str, Any]: return func -def merge_index_metadata_compat(dataset, index_id, default_index_type="INVERTED"): +def merge_index_metadata_compat(dataset, index_id, index_type, **kwargs): try: - return dataset.merge_index_metadata(index_id, default_index_type) + return dataset.merge_index_metadata(index_id, index_type, batch_readhead=kwargs.get("batch_readhead")) except TypeError: return dataset.merge_index_metadata(index_id) @@ -202,7 +202,7 @@ def create_scalar_index( **kwargs: Any, ) -> "lance.LanceDataset": """ - Build a distributed full-text search index using Ray. + Build scalar indices with Ray in a distributed workflow (supports FTS/INVERTED and BTREE). This function distributes the index building process across multiple Ray workers, with each worker building indices for a subset of fragments. 
The indices are then @@ -266,9 +266,14 @@ def create_scalar_index( valid_index_types = ["BTREE", "BITMAP", "LABEL_LIST", "INVERTED", "FTS", "NGRAM", "ZONEMAP"] if index_type not in valid_index_types: raise ValueError(f"Index type must be one of {valid_index_types}, not '{index_type}'") - # For distributed indexing, currently only support text-based indexes - if index_type not in ["INVERTED", "FTS"]: - raise ValueError(f"Distributed indexing currently only supports 'INVERTED' and 'FTS' index types, not '{index_type}'") + + # Validate distributed indexing support + supported_distributed_types = {"INVERTED", "FTS", "BTREE"} + if index_type not in supported_distributed_types: + raise ValueError( + f"Distributed indexing currently supports {sorted(supported_distributed_types)} index types, " + f"not '{index_type}'" + ) elif not isinstance(index_type, IndexConfig): raise ValueError(f"index_type must be a string literal or IndexConfig object, got {type(index_type)}") @@ -292,13 +297,28 @@ def create_scalar_index( if storage_options is None: storage_options = dataset._storage_options - # Check column type + # Check column type according to index type value_type = field.type if pa.types.is_list(field.type) or pa.types.is_large_list(field.type): value_type = field.type.value_type - if not pa.types.is_string(value_type) and not pa.types.is_large_string(value_type): - raise TypeError(f"Column {column} must be string type, got {value_type}") + # Validate column type based on index type requirements + if isinstance(index_type, str): + match index_type: + case "INVERTED" | "FTS": + # Text-based indexes require string types + if not pa.types.is_string(value_type): + raise TypeError(f"Column {column} must be string type for {index_type} index, got {value_type}") + case "BTREE": + # B-Tree indexes support both numeric and string types + is_supported = (pa.types.is_integer(value_type) or + pa.types.is_floating(value_type) or + pa.types.is_string(value_type)) + if not is_supported: + 
raise TypeError(f"Column {column} must be numeric or string type for BTREE index, got {value_type}") + case _: + # For other index types, skip strict validation to maintain compatibility + pass if name is None: name = f"{column}_idx" @@ -385,7 +405,7 @@ def create_scalar_index( # Phase 2: Merge index metadata using the distributed API logger.info(f"Phase 2: Merging index metadata for index ID: {index_id}") - merge_index_metadata_compat(dataset, index_id) + merge_index_metadata_compat(dataset, index_id, index_type=index_type, **kwargs) # Phase 3: Create Index object and commit the operation logger.info(f"Phase 3: Creating and committing index '{name}'") diff --git a/tests/test_distributed_indexing.py b/tests/test_distributed_indexing.py index 2aa09985..58e49a70 100755 --- a/tests/test_distributed_indexing.py +++ b/tests/test_distributed_indexing.py @@ -20,7 +20,6 @@ def check_lance_version_compatibility(): except (AttributeError, Exception): return False - # Skip all distributed indexing tests if lance version is incompatible pytestmark = pytest.mark.skipif( not check_lance_version_compatibility(), @@ -517,11 +516,6 @@ def test_build_distributed_index_auto_adjust_workers(self, temp_dir): assert len(indices) > 0, "No indices found after building" - - -class TestDistributedIndexingNewAPI: - """Test cases for the new distributed indexing API from PR #4578.""" - def test_distributed_fts_index_new_api(self, temp_dir): """ Test distributed FTS index building using the new API from PR #4578. 
@@ -638,3 +632,122 @@ def test_distributed_index_error_handling_new_api(self, temp_dir): index_type="INVALID_TYPE", num_workers=2, ) + +def check_btree_version_compatibility(): + """Check if lance version supports distributed B-tree indexing (>= 0.37.0).""" + try: + lance_version = version.parse(lance.__version__) + btree_min_version = version.parse("0.37.0") + return lance_version >= btree_min_version + except (AttributeError, Exception): + return False + +@pytest.mark.skipif( + not check_btree_version_compatibility(), + reason="B-tree indexing requires pylance >= 0.37.0. Current version: {}".format( + getattr(lance, "__version__", "unknown") + ), +) +class TestDistributedBTreeIndexing: + """Distributed BTREE indexing tests using the unified lr.create_scalar_index entrypoint.""" + + def test_distributed_btree_index_basic(self, temp_dir): + """Build a distributed BTREE index and verify search works and type is BTree.""" + ds = generate_multi_fragment_dataset(temp_dir, num_fragments=3, rows_per_fragment=500) + + updated_dataset = lr.create_scalar_index( + dataset=ds, + column="id", + index_type="BTREE", + name="btree_multiple_fragment_idx", + replace=False, + num_workers=3, + ) + + # Verify index + indices = updated_dataset.list_indices() + assert len(indices) > 0, "No indices found after distributed BTREE build" + + our_index = None + for idx in indices: + if idx["name"] == "btree_multiple_fragment_idx": + our_index = idx + break + assert our_index is not None, "BTREE index not found by name" + assert our_index["type"] == "BTree", f"Expected BTree index, got {our_index['type']}" + + # Spot-check equality and range queries + eq_id = 100 + eq_tbl = updated_dataset.scanner(filter=f"id = {eq_id}", columns=["id", "text"]).to_table() + assert eq_tbl.num_rows == 1 + + rg_tbl = updated_dataset.scanner( + filter="id >= 200 AND id < 800", + columns=["id", "text"], + ).to_table() + assert rg_tbl.num_rows > 0 + + @pytest.fixture + def btree_comp_datasets(self, tmp_path): + 
"""Build two datasets: one with a distributed BTREE index and one without index as baseline.""" + with_index = generate_multi_fragment_dataset(tmp_path / "with_index", num_fragments=3, rows_per_fragment=500) + without_index = generate_multi_fragment_dataset(tmp_path / "without_index", num_fragments=3, rows_per_fragment=500) + + # Build BTREE index on the first dataset using unified API + with_index = lr.create_scalar_index( + dataset=with_index, + column="id", + index_type="BTREE", + name="btree_comp_idx", + replace=True, + num_workers=2, + ) + + return {"with_index": with_index, "without_index": without_index} + + @pytest.mark.parametrize( + "test_name,filter_expr", + [ + ("First value", "id = 0"), + ("Fragment 0 last value", "id = 499"), + ("Fragment 1 first value", "id = 500"), + ("Fragment 1 last value", "id = 999"), + ("Fragment 2 first value", "id = 1000"), + ("Last value", "id = 1499"), + ("Fragment 0 middle", "id = 250"), + ("Fragment 1 middle", "id = 750"), + ("Fragment 2 middle", "id = 1250"), + ("Range within fragment 0", "id >= 10 AND id < 20"), + ("Range within fragment 1", "id >= 510 AND id < 520"), + ("Range within fragment 2", "id >= 1010 AND id < 1020"), + ("Cross fragment 0-1", "id >= 495 AND id < 505"), + ("Cross fragment 1-2", "id >= 995 AND id < 1005"), + ("Cross all fragments", "id >= 250 AND id < 1250"), + ("Non-existent small value", "id = -1"), + ("Non-existent large value", "id = 2000"), + ("Large range", "id >= 0 AND id < 1500"), + ("Less than boundary", "id < 500"), + ("Greater than boundary", "id > 999"), + ("Less than or equal", "id <= 505"), + ("Greater than or equal", "id >= 995"), + ], + ) + def test_btree_query_results_match_baseline(self, btree_comp_datasets, test_name, filter_expr): + """Compare query results between an indexed dataset and an identical baseline dataset without index.""" + with_index = btree_comp_datasets["with_index"] + without_index = btree_comp_datasets["without_index"] + + res_idx = 
with_index.scanner(filter=filter_expr, columns=["id", "text"]).to_table() + res_base = without_index.scanner(filter=filter_expr, columns=["id", "text"]).to_table() + + assert res_idx.num_rows == res_base.num_rows, ( + f"Test '{test_name}' failed: indexed returned {res_idx.num_rows} rows, " + f"baseline returned {res_base.num_rows} rows for filter: {filter_expr}" + ) + + if res_idx.num_rows > 0: + ids_idx = sorted(res_idx.column("id").to_pylist()) + ids_base = sorted(res_base.column("id").to_pylist()) + assert ids_idx == ids_base, ( + f"Test '{test_name}' failed: indexed and baseline results differ for filter: {filter_expr}" + )