Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions python/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use arrow_schema::{DataType, Schema as ArrowSchema};
use async_trait::async_trait;
use chrono::Duration;

use futures::StreamExt;
use futures::{StreamExt, TryFutureExt};
use lance::dataset::builder::DatasetBuilder;
use lance::dataset::UpdateBuilder;
use lance::dataset::{
Expand All @@ -37,6 +37,7 @@ use lance::index::{
};
use lance_arrow::as_fixed_size_list_array;
use lance_core::{datatypes::Schema, format::Fragment, io::object_store::ObjectStoreParams};
use lance_index::optimize::OptimizeOptions;
use lance_index::{
vector::{ivf::IvfBuildParams, pq::PQBuildParams},
DatasetIndexExt, IndexParams, IndexType,
Expand Down Expand Up @@ -639,10 +640,22 @@ impl Dataset {
})
}

fn optimize_indices(&mut self, _kwargs: Option<&PyDict>) -> PyResult<()> {
#[pyo3(signature = (**kwargs))]
fn optimize_indices(&mut self, kwargs: Option<&PyDict>) -> PyResult<()> {
let mut new_self = self.ds.as_ref().clone();
RT.block_on(None, new_self.optimize_indices())?
.map_err(|err| PyIOError::new_err(err.to_string()))?;
let mut options: OptimizeOptions = Default::default();
if let Some(kwargs) = kwargs {
if let Some(num_indices_to_merge) = kwargs.get_item("num_indices_to_merge")? {
options.num_indices_to_merge = num_indices_to_merge.extract()?;
}
}
RT.block_on(
None,
new_self
.optimize_indices(&options)
.map_err(|err| PyIOError::new_err(err.to_string())),
)??;

self.ds = Arc::new(new_self);
Ok(())
}
Expand Down
3 changes: 2 additions & 1 deletion rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,13 @@ datafusion-physical-expr = "34.0"
either = "1.0"
futures = "0.3"
http = "0.2.9"
itertools = "0.12"
lazy_static = "1"
log = "0.4"
mock_instant = { version = "0.3.1", features = ["sync"] }
moka = "0.11"
num_cpus = "1.0"
num-traits = "0.2"
num_cpus = "1.0"
object_store = { version = "0.9.0", features = ["aws", "gcp", "azure"] }
parquet = "49.0"
pin-project = "1.0"
Expand Down
2 changes: 2 additions & 0 deletions rust/lance-index/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use async_trait::async_trait;
use lance_core::Result;
use roaring::RoaringBitmap;

pub mod optimize;
pub mod scalar;
pub mod traits;
pub mod vector;
Expand Down Expand Up @@ -58,6 +59,7 @@ pub trait Index: Send + Sync {
}

/// Index Type
#[derive(Debug, PartialEq)]
pub enum IndexType {
// Preserve 0-100 for simple indices.
Scalar = 0,
Expand Down
39 changes: 39 additions & 0 deletions rust/lance-index/src/optimize.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2024 Lance Developers.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/// Options for optimizing all indices.
#[derive(Debug)]
pub struct OptimizeOptions {
/// Number of delta indices to merge for one column. Default: 1.
///
/// If `num_indices_to_merge` is 0, a new delta index will be created.
/// If `num_indices_to_merge` is 1, the delta updates will be merged into the latest index.
/// If `num_indices_to_merge` is more than 1, the delta updates and latest N indices
/// will be merged into one single index.
Comment on lines +20 to +23
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to explain to the user why they would change this parameter.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated the doc

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't worry too much. We can tweak this if/when the parameter becomes public. I'm guessing that there is some kind of cost-to-compute/accuracy-of-index tradeoff here? Or is it a cost-to-compute/cost-to-search tradeoff?

In other words, "why wouldn't I merge the indicies into one big index every time?" or "why wouldn't I make every index a delta index?"

I'm still not sure it is clear from the comment.

///
/// It is up to the caller to decide how many indices to merge / keep. Callers can
/// find out how many indices are there by calling [`Dataset::index_statistics`].
///
/// A common usage pattern will be that, the caller can keep a large snapshot of the index of the base version,
/// and accumulate a few delta indices, then merge them into the snapshot.
pub num_indices_to_merge: usize,
}

impl Default for OptimizeOptions {
fn default() -> Self {
Self {
num_indices_to_merge: 1,
}
}
}
4 changes: 2 additions & 2 deletions rust/lance-index/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use lance_core::{format::Index, Result};

use crate::{IndexParams, IndexType};
use crate::{optimize::OptimizeOptions, IndexParams, IndexType};

// Extends Lance Dataset with secondary index.
///
Expand Down Expand Up @@ -85,7 +85,7 @@ pub trait DatasetIndexExt {
async fn load_scalar_index_for_column(&self, col: &str) -> Result<Option<Index>>;

/// Optimize indices.
async fn optimize_indices(&mut self) -> Result<()>;
async fn optimize_indices(&mut self, options: &OptimizeOptions) -> Result<()>;

/// Find index with a given index_name and return its serialized statistics.
///
Expand Down
3 changes: 1 addition & 2 deletions rust/lance/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ clap = { version = "4.1.1", features = ["derive"], optional = true }
dashmap = "5"
# matches arrow-rs use
half.workspace = true
itertools.workspace = true
http.workspace = true
object_store.workspace = true
aws-config.workspace = true
Expand Down Expand Up @@ -103,10 +104,8 @@ prost-build.workspace = true
[dev-dependencies]
lance-test-macros = { workspace = true }
pretty_assertions = { workspace = true }

clap = { version = "4.1.1", features = ["derive"] }
criterion = { workspace = true }

approx.workspace = true
dirs = "5.0.0"
all_asserts = "2.3.1"
Expand Down
2 changes: 1 addition & 1 deletion rust/lance/src/dataset/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2757,7 +2757,7 @@ mod test {

// UPDATE

dataset.optimize_indices().await.unwrap();
dataset.optimize_indices(&Default::default()).await.unwrap();
let updated_version = dataset.version().version;

// APPEND -> DELETE
Expand Down
133 changes: 121 additions & 12 deletions rust/lance/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ use std::sync::Arc;
use arrow_schema::DataType;
use async_trait::async_trait;
use futures::{stream, StreamExt, TryStreamExt};
use itertools::Itertools;
use lance_core::format::Fragment;
use lance_core::io::{
read_message, read_message_from_buf, read_metadata_offset, reader::read_manifest_indexes,
Reader,
};
use lance_index::optimize::OptimizeOptions;
use lance_index::pb::index::Implementation;
use lance_index::scalar::expression::IndexInformationProvider;
use lance_index::scalar::lance_format::LanceIndexStore;
Expand All @@ -46,7 +48,7 @@ pub mod vector;

use crate::dataset::transaction::{Operation, Transaction};
use crate::format::Index as IndexMetadata;
use crate::index::append::append_index;
use crate::index::append::merge_indices;
use crate::index::vector::remap_vector_index;
use crate::io::commit::commit_transaction;
use crate::{dataset::Dataset, Error, Result};
Expand Down Expand Up @@ -296,29 +298,43 @@ impl DatasetIndexExt for Dataset {
}

#[instrument(skip_all)]
async fn optimize_indices(&mut self) -> Result<()> {
async fn optimize_indices(&mut self, options: &OptimizeOptions) -> Result<()> {
let dataset = Arc::new(self.clone());
// Append index
let indices = self.load_indices().await?;

let name_to_indices = indices
.iter()
.map(|idx| (idx.name.clone(), idx))
.into_group_map();

let mut new_indices = vec![];
let mut removed_indices = vec![];
for idx in indices.as_slice() {
if idx.dataset_version == self.manifest.version {
continue;
}
let Some((new_id, new_frag_ids)) = append_index(dataset.clone(), idx).await? else {
for deltas in name_to_indices.values() {
let Some((new_id, removed, mut new_frag_ids)) =
merge_indices(dataset.clone(), deltas.as_slice(), options).await?
else {
continue;
};
for removed_idx in removed.iter() {
new_frag_ids |= removed_idx.fragment_bitmap.as_ref().unwrap();
}

let last_idx = deltas.last().expect("Delte indices should not be empty");
let new_idx = IndexMetadata {
uuid: new_id,
name: idx.name.clone(),
fields: idx.fields.clone(),
name: last_idx.name.clone(), // Keep the same name
fields: last_idx.fields.clone(),
dataset_version: self.manifest.version,
fragment_bitmap: new_frag_ids,
fragment_bitmap: Some(new_frag_ids),
};
removed_indices.push(idx.clone());
removed_indices.extend(removed.iter().map(|&idx| idx.clone()));
if deltas.len() > removed.len() {
new_indices.extend(
deltas[0..(deltas.len() - removed.len())]
.iter()
.map(|&idx| idx.clone()),
);
}
new_indices.push(new_idx);
}

Expand Down Expand Up @@ -533,6 +549,8 @@ impl DatasetIndexInternalExt for Dataset {

#[cfg(test)]
mod tests {
use crate::dataset::builder::DatasetBuilder;

use super::*;

use arrow_array::{FixedSizeListArray, RecordBatch, RecordBatchIterator};
Expand Down Expand Up @@ -666,4 +684,95 @@ mod tests {
assert_eq!(stats["num_unindexed_rows"], 512);
assert_eq!(stats["num_indexed_rows"], 512);
}

#[tokio::test]
async fn test_optimize_delta_indices() {
let test_dir = tempdir().unwrap();
let dimensions = 16;
let column_name = "vec";
let field = Field::new(
column_name,
DataType::FixedSizeList(
Arc::new(Field::new("item", DataType::Float32, true)),
dimensions,
),
false,
);
let schema = Arc::new(Schema::new(vec![field]));

let float_arr = generate_random_array(512 * dimensions as usize);

let vectors =
arrow_array::FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap();

let record_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(vectors)]).unwrap();

let reader = RecordBatchIterator::new(
vec![record_batch.clone()].into_iter().map(Ok),
schema.clone(),
);

let test_uri = test_dir.path().to_str().unwrap();
let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
let params = VectorIndexParams::ivf_pq(10, 8, 2, false, MetricType::L2, 10);
dataset
.create_index(
&[column_name],
IndexType::Vector,
Some("vec_idx".into()),
&params,
true,
)
.await
.unwrap();

let stats: serde_json::Value =
serde_json::from_str(&dataset.index_statistics("vec_idx").await.unwrap()).unwrap();
assert_eq!(stats["num_unindexed_rows"], 0);
assert_eq!(stats["num_indexed_rows"], 512);
assert_eq!(stats["num_indexed_fragments"], 1);
assert_eq!(stats["num_indices"], 1);

let reader =
RecordBatchIterator::new(vec![record_batch].into_iter().map(Ok), schema.clone());
dataset.append(reader, None).await.unwrap();
let mut dataset = DatasetBuilder::from_uri(test_uri).load().await.unwrap();
let stats: serde_json::Value =
serde_json::from_str(&dataset.index_statistics("vec_idx").await.unwrap()).unwrap();
assert_eq!(stats["num_unindexed_rows"], 512);
assert_eq!(stats["num_indexed_rows"], 512);
assert_eq!(stats["num_indexed_fragments"], 1);
assert_eq!(stats["num_unindexed_fragments"], 1);
assert_eq!(stats["num_indices"], 1);

dataset
.optimize_indices(&OptimizeOptions {
num_indices_to_merge: 0, // Just create index for delta
})
.await
.unwrap();
let mut dataset = DatasetBuilder::from_uri(test_uri).load().await.unwrap();

let stats: serde_json::Value =
serde_json::from_str(&dataset.index_statistics("vec_idx").await.unwrap()).unwrap();
assert_eq!(stats["num_unindexed_rows"], 0);
assert_eq!(stats["num_indexed_rows"], 1024);
assert_eq!(stats["num_indexed_fragments"], 2);
assert_eq!(stats["num_unindexed_fragments"], 0);
assert_eq!(stats["num_indices"], 2);

dataset
.optimize_indices(&OptimizeOptions {
num_indices_to_merge: 2,
})
.await
.unwrap();
let stats: serde_json::Value =
serde_json::from_str(&dataset.index_statistics("vec_idx").await.unwrap()).unwrap();
assert_eq!(stats["num_unindexed_rows"], 0);
assert_eq!(stats["num_indexed_rows"], 1024);
assert_eq!(stats["num_indexed_fragments"], 2);
assert_eq!(stats["num_unindexed_fragments"], 0);
assert_eq!(stats["num_indices"], 1);
}
}
Loading