Merged
112 changes: 58 additions & 54 deletions docs/src/guide/distributed_indexing.md
@@ -15,63 +15,65 @@ write:

1. multiple workers build index data in parallel
2. the caller invokes Lance segment build APIs for one distributed build
-3. Lance discovers the relevant worker outputs, then plans and builds index artifacts
+3. Lance plans and builds index artifacts from the worker outputs supplied by the caller
4. the built artifacts are committed into the dataset manifest

-For vector indices, the worker outputs are temporary shard directories under a
-shared UUID. Internally, Lance can turn these shard outputs into one or more
-built physical segments.
+For vector indices, the worker outputs are segments stored directly
+under `indices/<segment_uuid>/`. Lance can turn these outputs into one or more
+physical segments and then commit them as one logical index.

![Distributed Vector Segment Build](../images/distributed_vector_segment_build.svg)

## Terminology

This guide uses the following terms consistently:

-- **Staging root**: the shared UUID directory used during distributed index build
-- **Partial shard**: one worker output written under the staging root as
-  `partial_<uuid>/`
-- **Built segment**: one physical index segment produced during segment build and
-  ready to be committed into the manifest
+- **Segment**: one worker output written by `execute_uncommitted()` under
+  `indices/<segment_uuid>/`
+- **Physical segment**: one index segment that is ready to be committed into
+  the manifest
- **Logical index**: the user-visible index identified by name; a logical index
-  may contain one or more built segments
+  may contain one or more physical segments

For example, a distributed vector build may create a layout like:

```text
-indices/<staging_uuid>/
-├── partial_<shard_0>/
-│   ├── index.idx
-│   └── auxiliary.idx
-├── partial_<shard_1>/
-│   ├── index.idx
-│   └── auxiliary.idx
-└── partial_<shard_2>/
-    ├── index.idx
-    └── auxiliary.idx
+indices/<segment_uuid_0>/
+├── index.idx
+└── auxiliary.idx
+
+indices/<segment_uuid_1>/
+├── index.idx
+└── auxiliary.idx
+
+indices/<segment_uuid_2>/
+├── index.idx
+└── auxiliary.idx
```

After segment build, Lance produces one or more segment directories:

```text
-indices/<segment_uuid_0>/
+indices/<physical_segment_uuid_0>/
├── index.idx
└── auxiliary.idx

-indices/<segment_uuid_1>/
+indices/<physical_segment_uuid_1>/
├── index.idx
└── auxiliary.idx
```

-These physical segments are then committed together as one logical index.
+These physical segments are then committed together as one logical index. In the
+common no-merge case, the input segments are already the physical
+segments and `build_all()` returns them unchanged.

## Roles

There are two parties involved in distributed indexing:

-- **Workers** build partial shards
-- **The caller** launches workers, chooses when a distributed build should be
-  turned into built segments, provides any additional inputs requested by the
+- **Workers** build segments
+- **The caller** launches workers, chooses how those segments should be turned
+  into physical segments, provides any additional inputs requested by the
segment build APIs, and
commits the final result

@@ -82,21 +84,22 @@ launching workers and driving the overall workflow.

The current model for distributed vector indexing has two layers of parallelism.

-### Shard Build
+### Worker Build

-First, multiple workers build partial shards in parallel:
+First, multiple workers build segments in parallel:

-1. on each worker, call an uncommitted shard-build API such as
-   `create_index_builder(...).fragments(...).index_uuid(staging_index_uuid).execute_uncommitted()`
-   or Python `create_index_uncommitted(..., fragment_ids=..., index_uuid=...)`
-2. each worker writes one `partial_<uuid>/` under the shared staging root
+1. on each worker, call a shard-build API such as
+   `create_index_builder(...).fragments(...).execute_uncommitted()`
+   or Python `create_index_uncommitted(..., fragment_ids=...)`
+2. each worker writes one segment under `indices/<segment_uuid>/`
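The fan-out above can be pictured with a small self-contained sketch: each worker receives a disjoint slice of fragment ids and produces one segment directory. The `assign_fragments` chunking scheme and `worker_segment_path` helper are illustrative assumptions, not Lance APIs.

```rust
// Illustrative only: split a dataset's fragment ids into disjoint per-worker
// assignments, and show the directory each worker's output would land under.
fn assign_fragments(fragment_ids: &[u64], num_workers: usize) -> Vec<Vec<u64>> {
    // Chunk size rounds up so every fragment is assigned exactly once.
    let chunk = fragment_ids.len().div_ceil(num_workers);
    fragment_ids.chunks(chunk.max(1)).map(|c| c.to_vec()).collect()
}

// Hypothetical path helper matching the layout described in this guide.
fn worker_segment_path(segment_uuid: &str) -> String {
    format!("indices/{segment_uuid}/")
}

fn main() {
    let fragments: Vec<u64> = (0..5).collect();
    for (i, assigned) in assign_fragments(&fragments, 3).iter().enumerate() {
        // On a real worker, this is where the shard-build API would run over
        // `assigned` and write one segment directory.
        let uuid = format!("segment_uuid_{i}");
        println!("worker {i}: {assigned:?} -> {}", worker_segment_path(&uuid));
    }
}
```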

### Segment Build

-Then the caller turns that staging root into one or more built segments:
+Then the caller turns those existing segments into one or more physical
+segments:

-1. open the staging root with `create_index_segment_builder(staging_index_uuid)`
-2. provide partial index metadata with `with_partial_indices(...)`
+1. create a builder with `create_index_segment_builder()`
+2. provide segment metadata with `with_segments(...)`
3. optionally choose a grouping policy with `with_target_segment_bytes(...)`
4. call `plan()` to get `Vec<IndexSegmentPlan>`

@@ -105,51 +108,52 @@ At that point the caller has two execution choices:
- call `build(plan)` for each plan and run those builds in parallel
- call `build_all()` to let Lance build every planned segment on the current node

-After the segments are built, publish them with
+After the physical segments are built, publish them with
`commit_existing_index_segments(...)`.
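Putting the steps together, the caller-side flow looks roughly like the sketch below. `Segment`, `IndexSegmentPlan`, and `SegmentBuilder` here are local stand-ins that only mirror the documented method names (`with_segments`, `plan`, `build_all`); they are not the real Lance types.

```rust
// Stand-in types mirroring the documented builder flow.
#[derive(Debug, Clone)]
struct Segment {
    uuid: String,
}

#[derive(Debug)]
struct IndexSegmentPlan {
    inputs: Vec<Segment>,
}

struct SegmentBuilder {
    segments: Vec<Segment>,
}

impl SegmentBuilder {
    fn new() -> Self {
        Self { segments: Vec::new() }
    }

    fn with_segments(mut self, segments: Vec<Segment>) -> Self {
        self.segments = segments;
        self
    }

    // Plan step: decide which inputs become each physical segment. This
    // sketch keeps segment boundaries (the common no-merge case).
    fn plan(&self) -> Vec<IndexSegmentPlan> {
        self.segments
            .iter()
            .cloned()
            .map(|s| IndexSegmentPlan { inputs: vec![s] })
            .collect()
    }

    // Build step: with no merging, each plan's single input is already the
    // physical segment.
    fn build_all(&self) -> Vec<Segment> {
        self.plan().into_iter().flat_map(|p| p.inputs).collect()
    }
}

fn main() {
    let worker_outputs = vec![
        Segment { uuid: "segment_uuid_0".into() },
        Segment { uuid: "segment_uuid_1".into() },
    ];
    let physical = SegmentBuilder::new()
        .with_segments(worker_outputs)
        .build_all();
    // At this point the caller would publish `physical` with
    // `commit_existing_index_segments(...)`.
    println!("built {} physical segments", physical.len());
}
```

Running the plans from `plan()` individually instead of calling `build_all()` is what lets a caller spread the build work across nodes.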

## Internal Segmented Finalize Model

Internally, Lance models distributed vector segment build as:

-1. **plan** which partial shards should become each built segment
-2. **build** each segment from its selected partial shards
+1. **plan** which input segments should become each physical segment
+2. **build** each segment from its selected input segments
3. **commit** the resulting physical segments as one logical index

-The plan step is driven by the staging root and any additional shard metadata
-required by the segment build APIs.
+The plan step is driven by the segment metadata returned from
+`execute_uncommitted()` and any additional inputs requested by the segment
+build APIs.

This is intentionally a storage-level model:

-- partial shards are temporary worker outputs
-- built segments are durable physical artifacts
+- segments are worker outputs that are not yet published
+- physical segments are durable artifacts referenced by the manifest
- the logical index identity is attached only at commit time

## Segment Grouping

-When Lance builds segments from a staging root, it may either:
+When Lance builds segments from existing inputs, it may either:

-- keep shard boundaries, so each partial shard becomes one built segment
-- group multiple partial shards into a larger built segment
+- keep segment boundaries, so each input segment becomes one physical segment
+- group multiple input segments into a larger physical segment

-The grouping decision is separate from shard build. Workers only build partial
-shards; Lance applies the segment build policy when it plans built segments.
+The grouping decision is separate from worker build. Workers only build
+segments; Lance applies the segment build policy when it plans
+physical segments.
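A size-based grouping policy like the one `with_target_segment_bytes(...)` suggests can be approximated with a greedy sketch. This is illustrative only: `SegmentInfo`, `group_segments`, and the packing rule are assumptions, not the actual Lance planner.

```rust
// Hypothetical sketch of a size-based grouping policy: input segments are
// packed greedily into groups no larger than `target_bytes`; with no target,
// every input segment keeps its own boundary.
#[derive(Debug, Clone, PartialEq)]
struct SegmentInfo {
    uuid: String,
    size_bytes: u64,
}

fn group_segments(segments: &[SegmentInfo], target_bytes: Option<u64>) -> Vec<Vec<SegmentInfo>> {
    // No target: one physical segment per input (the no-merge case).
    let Some(target) = target_bytes else {
        return segments.iter().cloned().map(|s| vec![s]).collect();
    };
    let mut groups: Vec<Vec<SegmentInfo>> = Vec::new();
    let mut current: Vec<SegmentInfo> = Vec::new();
    let mut current_bytes = 0u64;
    for segment in segments {
        // Start a new group when adding this segment would exceed the target.
        if !current.is_empty() && current_bytes + segment.size_bytes > target {
            groups.push(std::mem::take(&mut current));
            current_bytes = 0;
        }
        current_bytes += segment.size_bytes;
        current.push(segment.clone());
    }
    if !current.is_empty() {
        groups.push(current);
    }
    groups
}

fn main() {
    let segments: Vec<SegmentInfo> = (0..4)
        .map(|i| SegmentInfo { uuid: format!("segment_{i}"), size_bytes: 60 })
        .collect();
    // Two 60-byte segments fit under a 128-byte target, so the four inputs
    // pair up into two planned physical segments.
    let grouped = group_segments(&segments, Some(128));
    println!("{} planned physical segments", grouped.len());
}
```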

## Responsibility Boundaries

The caller is expected to know:

-- which distributed build is ready for segment build
-- any additional shard metadata requested by the segment build APIs
-- how the resulting built segments should be published
+- the segment metadata returned by worker builds
+- how the resulting physical segments should be published

Lance is responsible for:

-- writing partial shard artifacts
-- discovering partial shards under the staging root
-- planning built segments from the discovered shard set
-- merging shard storage into built segment artifacts
-- committing built segments into the manifest
+- writing segment artifacts
+- planning physical segments from the supplied segment set
+- merging segment storage into physical segment artifacts
+- committing physical segments into the manifest

This split keeps distributed scheduling outside the storage engine while still
letting Lance own the on-disk index format.
180 changes: 178 additions & 2 deletions java/lance-jni/src/blocking_dataset.rs
@@ -8,7 +8,7 @@ use crate::namespace::{
};
use crate::session::{handle_from_session, session_from_handle};
use crate::storage_options::JavaStorageOptionsProvider;
-use crate::traits::{FromJObjectWithEnv, FromJString, export_vec, import_vec};
+use crate::traits::{FromJObjectWithEnv, FromJString, export_vec, import_vec, import_vec_to_rust};
use crate::utils::{
build_compaction_options, extract_storage_options, extract_write_params,
get_scalar_index_params, get_vector_index_params, to_rust_map,
@@ -49,7 +49,7 @@ use lance_index::DatasetIndexExt;
use lance_index::IndexCriteria as RustIndexCriteria;
use lance_index::optimize::OptimizeOptions;
use lance_index::scalar::btree::BTreeParameters;
-use lance_index::{IndexParams, IndexType};
+use lance_index::{IndexParams, IndexSegment, IndexType};
use lance_io::object_store::ObjectStoreRegistry;
use lance_io::object_store::StorageOptionsProvider;
use lance_namespace::LanceNamespace;
@@ -1070,6 +1070,182 @@ fn inner_merge_index_metadata(
Ok(())
}

#[unsafe(no_mangle)]
pub extern "system" fn Java_org_lance_Dataset_nativeBuildIndexSegments<'local>(
mut env: JNIEnv<'local>,
java_dataset: JObject,
java_segments: JObject,
target_segment_bytes_jobj: JObject,
) -> JObject<'local> {
ok_or_throw!(
env,
inner_build_index_segments(
&mut env,
java_dataset,
java_segments,
target_segment_bytes_jobj
)
)
}

fn inner_build_index_segments<'local>(
env: &mut JNIEnv<'local>,
java_dataset: JObject,
java_segments: JObject,
target_segment_bytes_jobj: JObject,
) -> Result<JObject<'local>> {
let segments = import_vec_to_rust(env, &java_segments, |env, obj| obj.extract_object(env))?;
let target_segment_bytes = env
.get_long_opt(&target_segment_bytes_jobj)?
.map(|v| v as u64);
let template = segment_template(&segments)?;

let built_segments = {
let dataset_guard =
unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }?;
let mut builder = dataset_guard
.inner
.create_index_segment_builder()
.with_segments(segments);
if let Some(target_segment_bytes) = target_segment_bytes {
builder = builder.with_target_segment_bytes(target_segment_bytes);
}
RT.block_on(builder.build_all())?
};

let built_metadata = built_segments
.into_iter()
.map(|segment| index_segment_to_metadata(&template, segment))
.collect::<Vec<_>>();
export_vec(env, &built_metadata)
}

#[unsafe(no_mangle)]
pub extern "system" fn Java_org_lance_Dataset_nativeCommitExistingIndexSegments<'local>(
mut env: JNIEnv<'local>,
java_dataset: JObject,
index_name: JString,
column: JString,
java_segments: JObject,
) -> JObject<'local> {
ok_or_throw!(
env,
inner_commit_existing_index_segments(
&mut env,
java_dataset,
index_name,
column,
java_segments
)
)
}

fn inner_commit_existing_index_segments<'local>(
env: &mut JNIEnv<'local>,
java_dataset: JObject,
index_name: JString,
column: JString,
java_segments: JObject,
) -> Result<JObject<'local>> {
let index_name = index_name.extract(env)?;
let column = column.extract(env)?;
let segment_metadata =
import_vec_to_rust(env, &java_segments, |env, obj| obj.extract_object(env))?;
let segments = segment_metadata
.iter()
.map(index_metadata_to_segment)
.collect::<Result<Vec<_>>>()?;

let committed = {
let mut dataset_guard =
unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }?;
RT.block_on(dataset_guard.inner.commit_existing_index_segments(
&index_name,
&column,
segments,
))?;
RT.block_on(dataset_guard.inner.load_indices_by_name(&index_name))?
};

export_vec(env, &committed)
}

struct SegmentTemplate {
name: String,
fields: Vec<i32>,
dataset_version: u64,
}

fn segment_template(segments: &[IndexMetadata]) -> Result<SegmentTemplate> {
let first = segments
.first()
.ok_or_else(|| Error::input_error("segments cannot be empty".to_string()))?;
for segment in &segments[1..] {
if segment.name != first.name {
return Err(Error::input_error(format!(
"All segments must share the same index name, got '{}' and '{}'",
first.name, segment.name
)));
}
if segment.fields != first.fields {
return Err(Error::input_error(format!(
"All segments must target the same field ids, got {:?} and {:?}",
first.fields, segment.fields
)));
}
if segment.dataset_version != first.dataset_version {
return Err(Error::input_error(format!(
"All segments must share the same dataset version, got {} and {}",
first.dataset_version, segment.dataset_version
)));
}
}

Ok(SegmentTemplate {
name: first.name.clone(),
fields: first.fields.clone(),
dataset_version: first.dataset_version,
})
}

fn index_metadata_to_segment(metadata: &IndexMetadata) -> Result<IndexSegment> {
let fragment_bitmap = metadata.fragment_bitmap.clone().ok_or_else(|| {
Error::input_error(format!(
"Segment '{}' is missing fragment coverage metadata",
metadata.uuid
))
})?;
let index_details = metadata.index_details.clone().ok_or_else(|| {
Error::input_error(format!(
"Segment '{}' is missing index details metadata",
metadata.uuid
))
})?;

Ok(IndexSegment::new(
metadata.uuid,
fragment_bitmap,
index_details,
metadata.index_version,
))
}

fn index_segment_to_metadata(template: &SegmentTemplate, segment: IndexSegment) -> IndexMetadata {
let (uuid, fragment_bitmap, index_details, index_version) = segment.into_parts();
IndexMetadata {
uuid,
fields: template.fields.clone(),
name: template.name.clone(),
dataset_version: template.dataset_version,
fragment_bitmap: Some(fragment_bitmap),
index_details: Some(index_details),
index_version,
created_at: Some(Utc::now()),
base_id: None,
files: None,
}
}

#[unsafe(no_mangle)]
pub extern "system" fn Java_org_lance_Dataset_nativeOptimizeIndices(
mut env: JNIEnv,