Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 58 additions & 54 deletions docs/src/guide/distributed_indexing.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,63 +15,65 @@ write:

1. multiple workers build index data in parallel
2. the caller invokes Lance segment build APIs for one distributed build
3. Lance discovers the relevant worker outputs, then plans and builds index artifacts
3. Lance plans and builds index artifacts from the worker outputs supplied by the caller
4. the built artifacts are committed into the dataset manifest

For vector indices, the worker outputs are temporary shard directories under a
shared UUID. Internally, Lance can turn these shard outputs into one or more
built physical segments.
For vector indices, the worker outputs are segments stored directly
under `indices/<segment_uuid>/`. Lance can turn these outputs into one or more
physical segments and then commit them as one logical index.

![Distributed Vector Segment Build](../images/distributed_vector_segment_build.svg)

## Terminology

This guide uses the following terms consistently:

- **Staging root**: the shared UUID directory used during distributed index build
- **Partial shard**: one worker output written under the staging root as
`partial_<uuid>/`
- **Built segment**: one physical index segment produced during segment build and
ready to be committed into the manifest
- **Segment**: one worker output written by `execute_uncommitted()` under
`indices/<segment_uuid>/`
- **Physical segment**: one index segment that is ready to be committed into
the manifest
- **Logical index**: the user-visible index identified by name; a logical index
may contain one or more built segments
may contain one or more physical segments

For example, a distributed vector build may create a layout like:

```text
indices/<staging_uuid>/
├── partial_<shard_0>/
│ ├── index.idx
│ └── auxiliary.idx
├── partial_<shard_1>/
│ ├── index.idx
│ └── auxiliary.idx
└── partial_<shard_2>/
├── index.idx
└── auxiliary.idx
indices/<segment_uuid_0>/
├── index.idx
└── auxiliary.idx

indices/<segment_uuid_1>/
├── index.idx
└── auxiliary.idx

indices/<segment_uuid_2>/
├── index.idx
└── auxiliary.idx
```

After segment build, Lance produces one or more segment directories:

```text
indices/<segment_uuid_0>/
indices/<physical_segment_uuid_0>/
├── index.idx
└── auxiliary.idx

indices/<segment_uuid_1>/
indices/<physical_segment_uuid_1>/
├── index.idx
└── auxiliary.idx
```

These physical segments are then committed together as one logical index.
These physical segments are then committed together as one logical index. In the
common no-merge case, the input segments are already the physical
segments and `build_all()` returns them unchanged.

## Roles

There are two parties involved in distributed indexing:

- **Workers** build partial shards
- **The caller** launches workers, chooses when a distributed build should be
turned into built segments, provides any additional inputs requested by the
- **Workers** build segments
- **The caller** launches workers, chooses how those segments should be turned
into physical segments, provides any additional inputs requested by the
segment build APIs, and
commits the final result

Expand All @@ -82,21 +84,22 @@ launching workers and driving the overall workflow.

The current model for distributed vector indexing has two layers of parallelism.

### Shard Build
### Worker Build

First, multiple workers build partial shards in parallel:
First, multiple workers build segments in parallel:

1. on each worker, call an uncommitted shard-build API such as
`create_index_builder(...).fragments(...).index_uuid(staging_index_uuid).execute_uncommitted()`
or Python `create_index_uncommitted(..., fragment_ids=..., index_uuid=...)`
2. each worker writes one `partial_<uuid>/` under the shared staging root
1. on each worker, call a shard-build API such as
`create_index_builder(...).fragments(...).execute_uncommitted()`
or Python `create_index_uncommitted(..., fragment_ids=...)`
2. each worker writes one segment under `indices/<segment_uuid>/`

### Segment Build

Then the caller turns that staging root into one or more built segments:
Then the caller turns those existing segments into one or more physical
segments:

1. open the staging root with `create_index_segment_builder(staging_index_uuid)`
2. provide partial index metadata with `with_partial_indices(...)`
1. create a builder with `create_index_segment_builder()`
2. provide segment metadata with `with_segments(...)`
3. optionally choose a grouping policy with `with_target_segment_bytes(...)`
4. call `plan()` to get `Vec<IndexSegmentPlan>`

Expand All @@ -105,51 +108,52 @@ At that point the caller has two execution choices:
- call `build(plan)` for each plan and run those builds in parallel
- call `build_all()` to let Lance build every planned segment on the current node

After the segments are built, publish them with
After the physical segments are built, publish them with
`commit_existing_index_segments(...)`.

## Internal Segmented Finalize Model

Internally, Lance models distributed vector segment build as:

1. **plan** which partial shards should become each built segment
2. **build** each segment from its selected partial shards
1. **plan** which input segments should become each physical segment
2. **build** each segment from its selected input segments
3. **commit** the resulting physical segments as one logical index

The plan step is driven by the staging root and any additional shard metadata
required by the segment build APIs.
The plan step is driven by the segment metadata returned from
`execute_uncommitted()` and any additional inputs requested by the segment
build APIs.

This is intentionally a storage-level model:

- partial shards are temporary worker outputs
- built segments are durable physical artifacts
- segments are worker outputs that are not yet published
- physical segments are durable artifacts referenced by the manifest
- the logical index identity is attached only at commit time

## Segment Grouping

When Lance builds segments from a staging root, it may either:
When Lance builds segments from existing inputs, it may either:

- keep shard boundaries, so each partial shard becomes one built segment
- group multiple partial shards into a larger built segment
- keep segment boundaries, so each input segment becomes one physical segment
- group multiple input segments into a larger physical segment

The grouping decision is separate from shard build. Workers only build partial
shards; Lance applies the segment build policy when it plans built segments.
The grouping decision is separate from worker build. Workers only build
segments; Lance applies the segment build policy when it plans
physical segments.

## Responsibility Boundaries

The caller is expected to know:

- which distributed build is ready for segment build
- any additional shard metadata requested by the segment build APIs
- how the resulting built segments should be published
- the segment metadata returned by worker builds
- how the resulting physical segments should be published

Lance is responsible for:

- writing partial shard artifacts
- discovering partial shards under the staging root
- planning built segments from the discovered shard set
- merging shard storage into built segment artifacts
- committing built segments into the manifest
- writing segment artifacts
- planning physical segments from the supplied segment set
- merging segment storage into physical segment artifacts
- committing physical segments into the manifest

This split keeps distributed scheduling outside the storage engine while still
letting Lance own the on-disk index format.
79 changes: 39 additions & 40 deletions python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -2591,14 +2591,14 @@ def create_scalar_index(
fragment_ids : List[int], optional
If provided, the index will be created only on the specified fragments.
This enables distributed/fragment-level indexing. When provided, the
method returns an IndexMetadata object but does not commit the index
to the dataset. The index can be committed later using the commit API.
method returns metadata for one segment but does not commit
the index to the dataset. The segment can be planned, merged, and
committed later using the segment builder and commit APIs.
This parameter is passed via kwargs internally.
index_uuid : str, optional
A UUID to use for fragment-level distributed indexing
multiple fragment-level indices need to share UUID for later merging.
If not provided, a new UUID will be generated. This parameter is passed via
kwargs internally.
A UUID to use for the segment written by this call.
If not provided, a new UUID will be generated. This parameter is
passed via kwargs internally.

with_position: bool, default False
This is for the ``INVERTED`` index. If True, the index will store the
Expand Down Expand Up @@ -3258,12 +3258,12 @@ def create_index(
fragment_ids : List[int], optional
If provided, the index will be created only on the specified fragments.
This enables distributed/fragment-level indexing. When provided, the
method creates temporary index metadata but does not commit the index
to the dataset. The index can be committed later using
merge_index_metadata(index_uuid, "VECTOR", column=..., index_name=...).
method creates one segment but does not commit the index
to the dataset. The returned metadata can be passed to
``create_index_segment_builder().with_segments(...)``
and then committed with ``commit_existing_index_segments(...)``.
index_uuid : str, optional
A UUID to use for fragment-level distributed indexing. Multiple
fragment-level indices need to share UUID for later merging.
A UUID to use for the segment written by this call.
If not provided, a new UUID will be generated.
target_partition_size: int, optional
The target partition size. If set, the number of partitions will be computed
Expand Down Expand Up @@ -3426,33 +3426,32 @@ def create_index_uncommitted(
**kwargs,
) -> Index:
"""
Create one uncommitted partial index and return its metadata.
Create one segment without publishing it and return its metadata.

This is the public shard-build API for distributed index construction.
Unlike :meth:`create_index`, this method does not publish the index into
the dataset manifest. Instead, it writes one partial index under the
staging UUID and returns the resulting :class:`Index` metadata.
This is the public distributed-build API for vector index
construction. Unlike :meth:`create_index`, this method does not publish
the index into the dataset manifest. Instead, it writes one segment
under ``_indices/<segment_uuid>/`` and returns the resulting
:class:`Index` metadata.

Callers should:

1. run :meth:`create_index_uncommitted` on each worker with the worker's
assigned ``fragment_ids`` and a shared ``index_uuid``
1. run :meth:`create_index_uncommitted` on each worker with that worker's
assigned ``fragment_ids``
2. collect the returned :class:`Index` objects
3. pass them to :meth:`IndexSegmentBuilder.with_partial_indices`
4. build one or more segments and commit them with
3. pass them to :meth:`IndexSegmentBuilder.with_segments`
4. build one or more physical segments and commit them with
:meth:`commit_existing_index_segments`

Parameters are the same as :meth:`create_index`, with two additional
requirements for distributed shard build:
Parameters are the same as :meth:`create_index`, with one additional
requirement:

- ``fragment_ids`` must be provided
- workers that belong to the same distributed build must share the same
``index_uuid``

Returns
-------
Index
Metadata for the partial index that was written by this call.
Metadata for the segment that was written by this call.
"""
return self._create_index_impl(
column,
Expand Down Expand Up @@ -3514,28 +3513,29 @@ def merge_index_metadata(
batch_readhead: Optional[int] = None,
):
"""
Merge distributed index metadata for supported scalar
and vector index types.
Merge distributed scalar index metadata.

This method supports all index types defined in
:class:`lance.indices.SupportedDistributedIndices`,
including scalar indices and precise vector index types.
Vector distributed indexing no longer uses this API. For vector indices,
build segments with :meth:`create_index_uncommitted`, plan or
merge them with :meth:`create_index_segment_builder`, and publish them
with :meth:`commit_existing_index_segments`.

This method does NOT commit changes.

This API merges temporary index files (e.g., per-fragment partials).
This API merges temporary scalar index files (for example per-fragment
BTree or inverted index outputs).
After this method returns, callers MUST explicitly commit
the index manifest using lance.LanceDataset.commit(...)
with a LanceOperation.CreateIndex.

Parameters
----------
index_uuid: str
The shared UUID used when building fragment-level indices.
The shared UUID used when building fragment-level scalar indices.
index_type: str
Index type name. Must be one of the enum values in
:class:`lance.indices.SupportedDistributedIndices`
(for example ``"IVF_PQ"``).
supported by scalar distributed merge.
batch_readhead: int, optional
Prefetch concurrency used by BTREE merge reader. Default: 1.
"""
Expand All @@ -3552,16 +3552,15 @@ def merge_index_metadata(
self._ds.merge_index_metadata(index_uuid, t, batch_readhead)
return None

def create_index_segment_builder(self, staging_index_uuid: str):
def create_index_segment_builder(self):
"""
Create a builder for turning partial index outputs into committed segments.
Create a builder for turning existing segments into physical segments.

The caller should pass the shared index UUID used during
:meth:`create_index` with ``fragment_ids=...`` and ``index_uuid=...``.
Then provide the returned partial index metadata through
:meth:`IndexSegmentBuilder.with_partial_indices`.
Provide the segment metadata returned by
:meth:`create_index_uncommitted` through
:meth:`IndexSegmentBuilder.with_segments`.
"""
return self._ds.create_index_segment_builder(staging_index_uuid)
return self._ds.create_index_segment_builder()

def commit_existing_index_segments(
self, index_name: str, column: str, segments: List[IndexSegment]
Expand Down
2 changes: 2 additions & 0 deletions python/python/lance/indices/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@ class SupportedDistributedIndices(str, Enum):
# Scalar index types
BTREE = "BTREE"
INVERTED = "INVERTED"

# Precise vector index types supported by distributed merge
IVF_FLAT = "IVF_FLAT"
IVF_PQ = "IVF_PQ"
IVF_SQ = "IVF_SQ"

# Deprecated generic placeholder (kept for backward compatibility)
VECTOR = "VECTOR"
Loading
Loading