Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 23 additions & 5 deletions docs/src/distributed-indexing.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ Lance-Ray provides distributed index building functionality that leverages Ray's

## New Distributed APIs

`create_scalar_index()` - Distributedly create scalar index index using ray. Currently only Inverted/FTS are supported. Will add more index type support in the future.
`create_scalar_index()` - Create a scalar index in a distributed fashion using Ray. Currently only Inverted/FTS/BTREE index types are supported; more index types will be added in the future.

### How It Works
The `create_scalar_index` function allows you to create scalar indices (such as full-text search or B-tree indices) for Lance datasets using the Ray distributed computing framework. This function distributes the index building process across multiple Ray worker nodes, with each node responsible for building indices for a subset of dataset fragments. These indices are then merged and committed as a single index.
Expand Down Expand Up @@ -62,7 +62,7 @@ def create_scalar_index(
| `ray_remote_args` | `Dict[str, Any]`, optional | Ray task options (e.g., `num_cpus`, `resources`) |
| `**kwargs` | `Any` | Additional arguments passed to `create_scalar_index` |

**Note:** For distributed indexing, currently only `"INVERTED"` and `"FTS"` index types are supported.
**Note:** For distributed indexing, currently only `"INVERTED"`, `"FTS"`, and `"BTREE"` index types are supported.

### Return Value

Expand All @@ -71,8 +71,7 @@ The function returns an updated Lance dataset with the newly created index.

## Examples

### Basic Usage

### FTS Index
```python
import lance
import lance_ray as lr
Expand All @@ -99,6 +98,25 @@ results = updated_dataset.scanner(
).to_table()
print(f"Search results: {results}")
```
### BTREE Index
```python
# Assume a LanceDataset with a numeric column "id" exists at this path
import lance_ray as lr

updated_dataset = lr.create_scalar_index(
dataset="path/to/dataset",
column="id",
index_type="BTREE",
name="btree_multiple_fragment_idx",
replace=False,
num_workers=4,
)

# Example queries
updated_dataset.scanner(filter="id = 100", columns=["id", "text"]).to_table()
updated_dataset.scanner(filter="id >= 200 AND id < 800", columns=["id", "text"]).to_table()
```


### Custom Index Name

Expand Down Expand Up @@ -171,7 +189,7 @@ except ValueError as e:

### Important Notes

- **Index Type Support**: For distributed indexing, currently only `"INVERTED"` and `"FTS"` index types are supported, even though the function signature accepts other index types.
- **Index Type Support**: For distributed indexing, currently only `"INVERTED"`/`"FTS"`/`"BTREE"` index types are supported, even though the function signature accepts other index types.
- **Default Behavior**: The `replace` parameter defaults to `True`, meaning existing indices with the same name will be replaced without warning. Set `replace=False` to prevent accidental overwrites.
- **Fragment Selection**: Use `fragment_ids` parameter to build indices on specific fragments only. This is useful for incremental index building or testing.
- **Error Handling**: When `replace=False` and an index with the same name exists, a `ValueError` or `RuntimeError` will be raised depending on the execution context.
40 changes: 30 additions & 10 deletions lance_ray/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,9 @@ def func(fragment_ids: list[int]) -> dict[str, Any]:

return func

def merge_index_metadata_compat(dataset, index_id, index_type, **kwargs):
    """Merge per-fragment index metadata, tolerating older lance APIs.

    Newer lance versions accept ``merge_index_metadata(index_id, index_type,
    batch_readhead=...)``; older ones take fewer arguments. Try the richest
    signature first and degrade one step at a time on ``TypeError`` instead of
    jumping straight to the bare ``(index_id)`` form, which would silently
    discard ``index_type`` on mid-vintage versions that do accept it.

    Args:
        dataset: Lance dataset whose fragment-level index metadata is merged.
        index_id: Identifier of the index whose metadata should be merged.
        index_type: Index type string (e.g. "INVERTED", "BTREE") forwarded
            to lance when the installed version supports it.
        **kwargs: May carry ``batch_readhead`` for lance versions that
            support it; other keys are ignored here.

    Returns:
        Whatever ``dataset.merge_index_metadata`` returns for the newest
        signature the installed lance version supports.
    """
    try:
        return dataset.merge_index_metadata(
            index_id, index_type, batch_readhead=kwargs.get("batch_readhead")
        )
    except TypeError:
        pass
    try:
        # Mid-vintage API: positional index_type, no batch_readhead kwarg.
        return dataset.merge_index_metadata(index_id, index_type)
    except TypeError:
        # Oldest API: index_id only.
        return dataset.merge_index_metadata(index_id)

Expand All @@ -202,7 +202,7 @@ def create_scalar_index(
**kwargs: Any,
) -> "lance.LanceDataset":
"""
Build a distributed full-text search index using Ray.
Build scalar indices with Ray in a distributed workflow (supports FTS/INVERTED and BTREE).

This function distributes the index building process across multiple Ray workers,
with each worker building indices for a subset of fragments. The indices are then
Expand Down Expand Up @@ -266,9 +266,14 @@ def create_scalar_index(
valid_index_types = ["BTREE", "BITMAP", "LABEL_LIST", "INVERTED", "FTS", "NGRAM", "ZONEMAP"]
if index_type not in valid_index_types:
raise ValueError(f"Index type must be one of {valid_index_types}, not '{index_type}'")
# For distributed indexing, currently only support text-based indexes
if index_type not in ["INVERTED", "FTS"]:
raise ValueError(f"Distributed indexing currently only supports 'INVERTED' and 'FTS' index types, not '{index_type}'")

# Validate distributed indexing support
supported_distributed_types = {"INVERTED", "FTS", "BTREE"}
if index_type not in supported_distributed_types:
raise ValueError(
f"Distributed indexing currently supports {sorted(supported_distributed_types)} index types, "
f"not '{index_type}'"
)
elif not isinstance(index_type, IndexConfig):
raise ValueError(f"index_type must be a string literal or IndexConfig object, got {type(index_type)}")

Expand All @@ -292,13 +297,28 @@ def create_scalar_index(
if storage_options is None:
storage_options = dataset._storage_options

# Check column type
# Check column type according to index type
value_type = field.type
if pa.types.is_list(field.type) or pa.types.is_large_list(field.type):
value_type = field.type.value_type

if not pa.types.is_string(value_type) and not pa.types.is_large_string(value_type):
raise TypeError(f"Column {column} must be string type, got {value_type}")
# Validate column type based on index type requirements
if isinstance(index_type, str):
match index_type:
case "INVERTED" | "FTS":
# Text-based indexes require string types
if not pa.types.is_string(value_type):
raise TypeError(f"Column {column} must be string type for {index_type} index, got {value_type}")
case "BTREE":
# B-Tree indexes support both numeric and string types
is_supported = (pa.types.is_integer(value_type) or
pa.types.is_floating(value_type) or
pa.types.is_string(value_type))
if not is_supported:
raise TypeError(f"Column {column} must be numeric or string type for BTREE index, got {value_type}")
case _:
# For other index types, skip strict validation to maintain compatibility
pass

if name is None:
name = f"{column}_idx"
Expand Down Expand Up @@ -385,7 +405,7 @@ def create_scalar_index(

# Phase 2: Merge index metadata using the distributed API
logger.info(f"Phase 2: Merging index metadata for index ID: {index_id}")
merge_index_metadata_compat(dataset, index_id)
merge_index_metadata_compat(dataset, index_id, index_type=index_type, **kwargs)

# Phase 3: Create Index object and commit the operation
logger.info(f"Phase 3: Creating and committing index '{name}'")
Expand Down
125 changes: 119 additions & 6 deletions tests/test_distributed_indexing.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ def check_lance_version_compatibility():
except (AttributeError, Exception):
return False


# Skip all distributed indexing tests if lance version is incompatible
pytestmark = pytest.mark.skipif(
not check_lance_version_compatibility(),
Expand Down Expand Up @@ -517,11 +516,6 @@ def test_build_distributed_index_auto_adjust_workers(self, temp_dir):
assert len(indices) > 0, "No indices found after building"




class TestDistributedIndexingNewAPI:
"""Test cases for the new distributed indexing API from PR #4578."""

def test_distributed_fts_index_new_api(self, temp_dir):
"""
Test distributed FTS index building using the new API from PR #4578.
Expand Down Expand Up @@ -638,3 +632,122 @@ def test_distributed_index_error_handling_new_api(self, temp_dir):
index_type="INVALID_TYPE",
num_workers=2,
)

def check_btree_version_compatibility():
    """Check if lance version supports distributed B-tree indexing (>= 0.37.0).

    Returns:
        bool: True when the installed lance reports a version >= 0.37.0,
        False otherwise (including when lance is missing, has no
        ``__version__`` attribute, or the version string cannot be parsed).
    """
    try:
        # `except (AttributeError, Exception)` in the original was redundant:
        # AttributeError is already an Exception subclass. Any failure here
        # simply means BTREE support cannot be relied upon.
        return version.parse(lance.__version__) >= version.parse("0.37.0")
    except Exception:
        return False

@pytest.mark.skipif(
    not check_btree_version_compatibility(),
    reason="B-tree indexing requires pylance >= 0.37.0. Current version: {}".format(
        getattr(lance, "__version__", "unknown")
    ),
)
class TestDistributedBTreeIndexing:
    """Distributed BTREE indexing tests using the unified lr.create_scalar_index entrypoint."""

    def test_distributed_btree_index_basic(self, temp_dir):
        """Build a distributed BTREE index and verify search works and type is BTree."""
        source = generate_multi_fragment_dataset(
            temp_dir, num_fragments=3, rows_per_fragment=500
        )

        indexed = lr.create_scalar_index(
            dataset=source,
            column="id",
            index_type="BTREE",
            name="btree_multiple_fragment_idx",
            replace=False,
            num_workers=3,
        )

        # The freshly built index must appear under the requested name.
        index_entries = indexed.list_indices()
        assert len(index_entries) > 0, "No indices found after distributed BTREE build"

        match = next(
            (
                entry
                for entry in index_entries
                if entry["name"] == "btree_multiple_fragment_idx"
            ),
            None,
        )
        assert match is not None, "BTREE index not found by name"
        assert match["type"] == "BTree", f"Expected BTree index, got {match['type']}"

        # Spot-check one equality lookup and one range scan through the index.
        point_lookup = indexed.scanner(
            filter="id = 100", columns=["id", "text"]
        ).to_table()
        assert point_lookup.num_rows == 1

        range_scan = indexed.scanner(
            filter="id >= 200 AND id < 800",
            columns=["id", "text"],
        ).to_table()
        assert range_scan.num_rows > 0

    @pytest.fixture
    def btree_comp_datasets(self, tmp_path):
        """Build two datasets: one with a distributed BTREE index and one without index as baseline."""
        baseline = generate_multi_fragment_dataset(
            tmp_path / "without_index", num_fragments=3, rows_per_fragment=500
        )
        indexed = generate_multi_fragment_dataset(
            tmp_path / "with_index", num_fragments=3, rows_per_fragment=500
        )

        # Only the first dataset gets the BTREE index; the second stays plain
        # so query results can be cross-checked against an unindexed scan.
        indexed = lr.create_scalar_index(
            dataset=indexed,
            column="id",
            index_type="BTREE",
            name="btree_comp_idx",
            replace=True,
            num_workers=2,
        )

        return {"with_index": indexed, "without_index": baseline}

    @pytest.mark.parametrize(
        "test_name,filter_expr",
        [
            ("First value", "id = 0"),
            ("Fragment 0 last value", "id = 499"),
            ("Fragment 1 first value", "id = 500"),
            ("Fragment 1 last value", "id = 999"),
            ("Fragment 2 first value", "id = 1000"),
            ("Last value", "id = 1499"),
            ("Fragment 0 middle", "id = 250"),
            ("Fragment 1 middle", "id = 750"),
            ("Fragment 2 middle", "id = 1250"),
            ("Range within fragment 0", "id >= 10 AND id < 20"),
            ("Range within fragment 1", "id >= 510 AND id < 520"),
            ("Range within fragment 2", "id >= 1010 AND id < 1020"),
            ("Cross fragment 0-1", "id >= 495 AND id < 505"),
            ("Cross fragment 1-2", "id >= 995 AND id < 1005"),
            ("Cross all fragments", "id >= 250 AND id < 1250"),
            ("Non-existent small value", "id = -1"),
            ("Non-existent large value", "id = 2000"),
            ("Large range", "id >= 0 AND id < 1500"),
            ("Less than boundary", "id < 500"),
            ("Greater than boundary", "id > 999"),
            ("Less than or equal", "id <= 505"),
            ("Greater than or equal", "id >= 995"),
        ],
    )
    def test_btree_query_results_match_baseline(self, btree_comp_datasets, test_name, filter_expr):
        """Compare query results between an indexed dataset and an identical baseline dataset without index."""
        indexed = btree_comp_datasets["with_index"]
        baseline = btree_comp_datasets["without_index"]

        got = indexed.scanner(filter=filter_expr, columns=["id", "text"]).to_table()
        expected = baseline.scanner(filter=filter_expr, columns=["id", "text"]).to_table()

        assert got.num_rows == expected.num_rows, (
            f"Test '{test_name}' failed: indexed returned {got.num_rows} rows, "
            f"baseline returned {expected.num_rows} rows for filter: {filter_expr}"
        )

        if got.num_rows > 0:
            got_ids = sorted(got.column("id").to_pylist())
            expected_ids = sorted(expected.column("id").to_pylist())
            assert got_ids == expected_ids, (
                f"Test '{test_name}' failed: indexed and baseline results differ for filter: {filter_expr}"
            )