diff --git a/python/src/dataset.rs b/python/src/dataset.rs index 672a52f937..86a3e6ad46 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -31,17 +31,15 @@ use lance::dataset::{ scanner::Scanner as LanceScanner, transaction::Operation as LanceOperation, Dataset as LanceDataset, ReadParams, Version, WriteMode, WriteParams, }; -use lance::index::IndexParams; use lance::index::{ scalar::ScalarIndexParams, vector::{diskann::DiskANNParams, VectorIndexParams}, - DatasetIndexExt, }; use lance_arrow::as_fixed_size_list_array; use lance_core::{datatypes::Schema, format::Fragment, io::object_store::ObjectStoreParams}; use lance_index::{ vector::{ivf::IvfBuildParams, pq::PQBuildParams}, - IndexType, + DatasetIndexExt, IndexParams, IndexType, }; use lance_linalg::distance::MetricType; use pyo3::exceptions::PyStopIteration; @@ -808,7 +806,9 @@ impl Dataset { } fn count_unindexed_rows(&self, index_name: String) -> PyResult> { - let idx = RT.block_on(None, self.ds.load_index_by_name(index_name.as_str()))?; + let idx = RT + .block_on(None, self.ds.load_index_by_name(index_name.as_str()))? + .map_err(|err| PyIOError::new_err(err.to_string()))?; if let Some(index) = idx { RT.block_on( None, @@ -825,7 +825,9 @@ impl Dataset { } fn count_indexed_rows(&self, index_name: String) -> PyResult> { - let idx = RT.block_on(None, self.ds.load_index_by_name(index_name.as_str()))?; + let idx = RT + .block_on(None, self.ds.load_index_by_name(index_name.as_str()))? + .map_err(|err| PyIOError::new_err(err.to_string()))?; if let Some(index) = idx { RT.block_on( None, diff --git a/rust/lance-core/src/format/index.rs b/rust/lance-core/src/format/index.rs index 3ee81e2ff1..42a25335a9 100644 --- a/rust/lance-core/src/format/index.rs +++ b/rust/lance-core/src/format/index.rs @@ -1,19 +1,16 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at +// Copyright 2024 Lance Developers. // -// http://www.apache.org/licenses/LICENSE-2.0 +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at // -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. //! Metadata for index diff --git a/rust/lance-core/src/lib.rs b/rust/lance-core/src/lib.rs index 173ae2a21f..2ad7b0417c 100644 --- a/rust/lance-core/src/lib.rs +++ b/rust/lance-core/src/lib.rs @@ -34,3 +34,6 @@ lazy_static::lazy_static! { } pub(crate) const DELETION_DIRS: &str = "_deletions"; + +/// Trait for a Lance Dataset +pub trait Dataset {} diff --git a/rust/lance-index/src/lib.rs b/rust/lance-index/src/lib.rs index 8827c3fba9..b00b416a72 100644 --- a/rust/lance-index/src/lib.rs +++ b/rust/lance-index/src/lib.rs @@ -1,4 +1,4 @@ -// Copyright 2023 Lance Developers. +// Copyright 2024 Lance Developers. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -23,7 +23,9 @@ use lance_core::Result; use roaring::RoaringBitmap; pub mod scalar; +pub mod traits; pub mod vector; +pub use crate::traits::*; pub const INDEX_FILE_NAME: &str = "index.idx"; @@ -33,16 +35,21 @@ pub mod pb { } /// Generic methods common across all types of secondary indices +/// #[async_trait] pub trait Index: Send + Sync { /// Cast to [Any]. fn as_any(&self) -> &dyn Any; + /// Cast to [Index] fn as_index(self: Arc) -> Arc; + /// Retrieve index statistics as a JSON string fn statistics(&self) -> Result; + /// Get the type of the index fn index_type(&self) -> IndexType; + /// Read through the index and determine which fragment ids are covered by the index /// /// This is a kind of slow operation. It's better to use the fragment_bitmap. This @@ -67,3 +74,7 @@ impl std::fmt::Display for IndexType { } } } + +pub trait IndexParams: Send + Sync { + fn as_any(&self) -> &dyn Any; +} diff --git a/rust/lance-index/src/traits.rs b/rust/lance-index/src/traits.rs new file mode 100644 index 0000000000..33c943c624 --- /dev/null +++ b/rust/lance-index/src/traits.rs @@ -0,0 +1,84 @@ +// Copyright 2024 Lance Developers. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use async_trait::async_trait; + +use lance_core::{format::Index, Result}; + +use crate::{IndexParams, IndexType}; + +// Extends Lance Dataset with secondary index. +/// +#[async_trait] +pub trait DatasetIndexExt { + /// Create indices on columns. + /// + /// Upon finish, a new dataset version is generated. + /// + /// Parameters: + /// + /// - `columns`: the columns to build the indices on. + /// - `index_type`: specify [`IndexType`]. + /// - `name`: optional index name. Must be unique in the dataset. + /// if not provided, it will auto-generate one. + /// - `params`: index parameters. + /// - `replace`: replace the existing index if it exists. + async fn create_index( + &mut self, + columns: &[&str], + index_type: IndexType, + name: Option, + params: &dyn IndexParams, + replace: bool, + ) -> Result<()>; + + /// Read all indices of this Dataset version. + async fn load_indices(&self) -> Result>; + + /// Loads all the indies of a given UUID. + /// + /// Note that it is possible to have multiple indices with the same UUID, + /// as they are the deltas of the same index. + async fn load_index(&self, uuid: &str) -> Result> { + self.load_indices() + .await + .map(|indices| indices.into_iter().find(|idx| idx.uuid.to_string() == uuid)) + } + + /// Loads a specific index with the given index name + async fn load_index_by_name(&self, name: &str) -> Result> { + self.load_indices() + .await + .map(|indices| indices.into_iter().find(|idx| idx.name == name)) + } + + /// Loads a specific index with the given index name. + async fn load_scalar_index_for_column(&self, col: &str) -> Result>; + + /// Optimize indices. + async fn optimize_indices(&mut self) -> Result<()>; + + /// Find index with a given index_name and return its serialized statistics. + async fn index_statistics(&self, index_name: &str) -> Result>; + + /// Count the rows that are not indexed by the given index. + /// + /// TODO: move to [DatasetInternalExt] + async fn count_unindexed_rows(&self, index_name: &str) -> Result>; + + /// Count the rows that are indexed by the given index. + /// + /// TODO: move to [DatasetInternalExt] + async fn count_indexed_rows(&self, index_name: &str) -> Result>; +} diff --git a/rust/lance/benches/ivf_pq.rs b/rust/lance/benches/ivf_pq.rs index 8cb02694f3..600a6ea296 100644 --- a/rust/lance/benches/ivf_pq.rs +++ b/rust/lance/benches/ivf_pq.rs @@ -17,14 +17,14 @@ use std::sync::Arc; use arrow_array::{FixedSizeListArray, RecordBatch, RecordBatchIterator}; use arrow_schema::{DataType, Field, FieldRef, Schema}; use criterion::{criterion_group, criterion_main, Criterion}; + use lance::{ arrow::*, dataset::{WriteMode, WriteParams}, - index::{vector::VectorIndexParams, DatasetIndexExt}, + index::vector::VectorIndexParams, Dataset, }; - -use lance_index::IndexType; +use lance_index::{DatasetIndexExt, IndexType}; use lance_linalg::distance::MetricType; use lance_testing::datagen::generate_random_array; #[cfg(target_os = "linux")] diff --git a/rust/lance/benches/vector_index.rs b/rust/lance/benches/vector_index.rs index 07af8407df..12e4e21855 100644 --- a/rust/lance/benches/vector_index.rs +++ b/rust/lance/benches/vector_index.rs @@ -20,18 +20,17 @@ use arrow_array::{ use arrow_schema::{DataType, Field, FieldRef, Schema as ArrowSchema}; use criterion::{criterion_group, criterion_main, Criterion}; use futures::TryStreamExt; -use lance::dataset::builder::DatasetBuilder; -use lance_index::IndexType; #[cfg(target_os = "linux")] use pprof::criterion::{Output, PProfProfiler}; use rand::{self, Rng}; -use lance::dataset::{WriteMode, WriteParams}; +use lance::dataset::{builder::DatasetBuilder, Dataset, WriteMode, WriteParams}; use lance::index::vector::VectorIndexParams; -use lance::index::DatasetIndexExt; -use lance::{arrow::as_fixed_size_list_array, dataset::Dataset}; -use lance_arrow::FixedSizeListArrayExt; -use lance_index::vector::{ivf::IvfBuildParams, pq::PQBuildParams}; +use lance_arrow::{as_fixed_size_list_array, FixedSizeListArrayExt}; +use lance_index::{ + vector::{ivf::IvfBuildParams, pq::PQBuildParams}, + DatasetIndexExt, IndexType, +}; use lance_linalg::distance::MetricType; fn bench_ivf_pq_index(c: &mut Criterion) { diff --git a/rust/lance/src/bin/lq.rs b/rust/lance/src/bin/lq.rs index c66c867da0..ecf64de968 100644 --- a/rust/lance/src/bin/lq.rs +++ b/rust/lance/src/bin/lq.rs @@ -20,8 +20,9 @@ use futures::TryStreamExt; use snafu::{location, Location}; use lance::dataset::Dataset; -use lance::index::{vector::VectorIndexParams, DatasetIndexExt}; +use lance::index::vector::VectorIndexParams; use lance::{Error, Result}; +use lance_index::DatasetIndexExt; use lance_linalg::distance::MetricType; #[derive(Parser)] diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 9111d911ae..11988d4965 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -35,7 +35,7 @@ use lance_core::io::{ commit::CommitError, object_store::{ObjectStore, ObjectStoreParams}, read_metadata_offset, read_struct, - reader::{read_manifest, read_manifest_indexes}, + reader::read_manifest, write_manifest, ObjectWriter, WriteExt, }; use log::warn; @@ -69,11 +69,9 @@ use self::fragment::FileFragment; use self::scanner::{DatasetRecordBatchStream, Scanner}; use self::transaction::{Operation, Transaction}; use self::write::{reader_to_stream, write_fragments_internal}; -use crate::dataset::index::unindexed_fragments; use crate::datatypes::Schema; use crate::error::box_error; use crate::format::{Fragment, Index, Manifest}; -use crate::index::DatasetIndexInternalExt; use crate::io::commit::{commit_new_dataset, commit_transaction}; use crate::session::Session; @@ -1261,7 +1259,7 @@ impl Dataset { &self.object_store } - async fn manifest_file(&self, version: u64) -> Result { + pub(crate) async fn manifest_file(&self, version: u64) -> Result { self.object_store .commit_handler .resolve_version(&self.base, version, &self.object_store.inner) @@ -1356,94 +1354,6 @@ impl Dataset { &self.manifest.fragments } - /// Read all indices of this Dataset version. - pub async fn load_indices(&self) -> Result> { - let manifest_file = self.manifest_file(self.version().version).await?; - read_manifest_indexes(&self.object_store, &manifest_file, &self.manifest).await - } - - /// Loads a specific index with the given id - pub async fn load_index(&self, uuid: &str) -> Option { - self.load_indices() - .await - .unwrap() - .into_iter() - .find(|idx| idx.uuid.to_string() == uuid) - } - - pub async fn load_index_by_name(&self, name: &str) -> Option { - self.load_indices() - .await - .unwrap() - .into_iter() - .find(|idx| idx.name == name) - } - - pub(crate) async fn load_scalar_index_for_column(&self, col: &str) -> Result> { - Ok(self - .load_indices() - .await? - .into_iter() - .filter(|idx| idx.fields.len() == 1) - .find(|idx| { - let field = self.schema().field_by_id(idx.fields[0]); - if let Some(field) = field { - field.name == col - } else { - false - } - })) - } - - /// Find index with a given index_name and return its serialized statistics. - pub async fn index_statistics(&self, index_name: &str) -> Result> { - let index_uuid = self - .load_index_by_name(index_name) - .await - .map(|idx| idx.uuid.to_string()); - - if let Some(index_uuid) = index_uuid { - let index_statistics = self - .open_generic_index("vector", &index_uuid) - .await? - .statistics() - .unwrap(); - Ok(Some(index_statistics)) - } else { - Ok(None) - } - } - - pub async fn count_unindexed_rows(&self, index_uuid: &str) -> Result> { - let index = self.load_index(index_uuid).await; - - if let Some(index) = index { - let unindexed_frags = unindexed_fragments(&index, self).await?; - let unindexed_rows = unindexed_frags - .iter() - .map(Fragment::num_rows) - // sum the number of rows in each fragment if no fragment returned None from row_count - .try_fold(0, |a, b| b.map(|b| a + b).ok_or(())); - - Ok(unindexed_rows.ok()) - } else { - Ok(None) - } - } - - pub async fn count_indexed_rows(&self, index_uuid: &str) -> Result> { - let count_rows = self.count_rows(); - let count_unindexed_rows = self.count_unindexed_rows(index_uuid); - - let (count_rows, count_unindexed_rows) = - futures::try_join!(count_rows, count_unindexed_rows)?; - - match count_unindexed_rows { - Some(count_unindexed_rows) => Ok(Some(count_rows - count_unindexed_rows)), - None => Ok(None), - } - } - /// Gets the number of files that are so small they don't even have a full /// group. These are considered too small because reading many of them is /// much less efficient than reading a single file because the separate files @@ -1590,7 +1500,7 @@ mod tests { use crate::dataset::WriteMode::Overwrite; use crate::datatypes::Schema; use crate::index::scalar::ScalarIndexParams; - use crate::index::{vector::VectorIndexParams, DatasetIndexExt}; + use crate::index::vector::VectorIndexParams; use crate::io::deletion::read_deletion_file; use arrow_array::{ @@ -1607,8 +1517,7 @@ mod tests { use futures::stream::TryStreamExt; use lance_core::format::WriterVersion; use lance_datagen::{array, gen, BatchCount, RowCount}; - use lance_index::vector::DIST_COL; - use lance_index::IndexType; + use lance_index::{vector::DIST_COL, DatasetIndexExt, IndexType}; use lance_linalg::distance::MetricType; use lance_testing::datagen::generate_random_array; use tempfile::{tempdir, TempDir}; @@ -2610,7 +2519,11 @@ mod tests { .await .unwrap(); - let index = dataset.load_index_by_name(&index_name).await.unwrap(); + let index = dataset + .load_index_by_name(&index_name) + .await + .unwrap() + .unwrap(); assert_eq!(index.dataset_version, 1); assert_eq!(index.fields, vec![0]); @@ -3430,87 +3343,6 @@ mod tests { } } - #[tokio::test] - async fn test_count_index_rows() { - let test_dir = tempdir().unwrap(); - let dimensions = 16; - let column_name = "vec"; - let field = Field::new( - column_name, - DataType::FixedSizeList( - Arc::new(Field::new("item", DataType::Float32, true)), - dimensions, - ), - false, - ); - let schema = Arc::new(ArrowSchema::new(vec![field])); - - let float_arr = generate_random_array(512 * dimensions as usize); - - let vectors = - arrow_array::FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap(); - - let record_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(vectors)]).unwrap(); - - let reader = - RecordBatchIterator::new(vec![record_batch].into_iter().map(Ok), schema.clone()); - - let test_uri = test_dir.path().to_str().unwrap(); - let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap(); - dataset.validate().await.unwrap(); - - // Make sure it returns None if there's no index with the passed identifier - assert_eq!(dataset.count_unindexed_rows("bad_id").await.unwrap(), None); - assert_eq!(dataset.count_indexed_rows("bad_id").await.unwrap(), None); - - // Create an index - let params = VectorIndexParams::ivf_pq(10, 8, 2, false, MetricType::L2, 10); - dataset - .create_index( - &[column_name], - IndexType::Vector, - Some("vec_idx".into()), - ¶ms, - true, - ) - .await - .unwrap(); - - let index = dataset.load_index_by_name("vec_idx").await.unwrap(); - let index_uuid = &index.uuid.to_string(); - - // Make sure there are no unindexed rows - assert_eq!( - dataset.count_unindexed_rows(index_uuid).await.unwrap(), - Some(0) - ); - assert_eq!( - dataset.count_indexed_rows(index_uuid).await.unwrap(), - Some(512) - ); - - // Now we'll append some rows which shouldn't be indexed and see the - // count change - let float_arr = generate_random_array(512 * dimensions as usize); - let vectors = - arrow_array::FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap(); - - let record_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(vectors)]).unwrap(); - - let reader = RecordBatchIterator::new(vec![record_batch].into_iter().map(Ok), schema); - dataset.append(reader, None).await.unwrap(); - - // Make sure the new rows are not indexed - assert_eq!( - dataset.count_unindexed_rows(index_uuid).await.unwrap(), - Some(512) - ); - assert_eq!( - dataset.count_indexed_rows(index_uuid).await.unwrap(), - Some(512) - ); - } - #[tokio::test] async fn test_num_small_files() { let test_dir = tempdir().unwrap(); diff --git a/rust/lance/src/dataset/cleanup.rs b/rust/lance/src/dataset/cleanup.rs index 7a6f2db9e2..98778cd421 100644 --- a/rust/lance/src/dataset/cleanup.rs +++ b/rust/lance/src/dataset/cleanup.rs @@ -461,7 +461,7 @@ mod tests { utils::testing::{MockClock, ProxyObjectStore, ProxyObjectStorePolicy}, Error, Result, }; - use lance_index::IndexType; + use lance_index::{DatasetIndexExt, IndexType}; use lance_linalg::distance::MetricType; use lance_testing::datagen::{some_batch, BatchGenerator, IncrementingInt32}; use snafu::{location, Location}; @@ -469,10 +469,7 @@ mod tests { use crate::{ dataset::{builder::DatasetBuilder, ReadParams, WriteMode, WriteParams}, - index::{ - vector::{StageParams, VectorIndexParams}, - DatasetIndexExt, - }, + index::vector::{StageParams, VectorIndexParams}, io::{ object_store::{ObjectStoreParams, WrappingObjectStore}, ObjectStore, diff --git a/rust/lance/src/dataset/index.rs b/rust/lance/src/dataset/index.rs index accd012bff..b9848f7721 100644 --- a/rust/lance/src/dataset/index.rs +++ b/rust/lance/src/dataset/index.rs @@ -20,6 +20,7 @@ use lance_core::{ format::{Fragment, Index}, Error, Result, }; +use lance_index::DatasetIndexExt; use serde::{Deserialize, Serialize}; use snafu::{location, Location}; diff --git a/rust/lance/src/dataset/optimize.rs b/rust/lance/src/dataset/optimize.rs index 0a8b12c038..ede2ba5edf 100644 --- a/rust/lance/src/dataset/optimize.rs +++ b/rust/lance/src/dataset/optimize.rs @@ -99,6 +99,7 @@ use std::sync::{Arc, RwLock}; use async_trait::async_trait; use datafusion::physical_plan::SendableRecordBatchStream; use futures::{StreamExt, TryStreamExt}; +use lance_index::DatasetIndexExt; use roaring::{RoaringBitmap, RoaringTreemap}; use serde::{Deserialize, Serialize}; use uuid::Uuid; diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index a341b277bf..73f7a504d3 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -40,8 +40,8 @@ use futures::TryStreamExt; use lance_arrow::floats::{coerce_float_vector, FloatType}; use lance_core::ROW_ID_FIELD; use lance_datafusion::exec::execute_plan; -use lance_index::scalar::expression::ScalarIndexExpr; use lance_index::vector::{Query, DIST_COL}; +use lance_index::{scalar::expression::ScalarIndexExpr, DatasetIndexExt}; use lance_linalg::distance::MetricType; use log::debug; use roaring::RoaringBitmap; @@ -1338,8 +1338,7 @@ mod test { use futures::TryStreamExt; use lance_core::ROW_ID; use lance_datagen::{array, gen, BatchCount, Dimension, RowCount}; - use lance_index::vector::DIST_COL; - use lance_index::IndexType; + use lance_index::{vector::DIST_COL, DatasetIndexExt, IndexType}; use lance_testing::datagen::{BatchGenerator, IncrementingInt32}; use tempfile::{tempdir, TempDir}; @@ -1349,7 +1348,7 @@ mod test { use crate::dataset::WriteMode; use crate::dataset::WriteParams; use crate::index::scalar::ScalarIndexParams; - use crate::index::{vector::VectorIndexParams, DatasetIndexExt}; + use crate::index::vector::VectorIndexParams; #[tokio::test] async fn test_batch_size() { diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index 2d81c71dcd..8f9081c10f 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -15,18 +15,22 @@ //! Secondary Index //! -use std::any::Any; use std::collections::HashMap; use std::sync::Arc; use arrow_schema::DataType; use async_trait::async_trait; -use lance_core::io::{read_message, read_message_from_buf, read_metadata_offset, Reader}; +use lance_core::format::Fragment; +use lance_core::io::{ + read_message, read_message_from_buf, read_metadata_offset, reader::read_manifest_indexes, + Reader, +}; use lance_index::pb::index::Implementation; use lance_index::scalar::expression::IndexInformationProvider; use lance_index::scalar::lance_format::LanceIndexStore; use lance_index::scalar::ScalarIndex; -use lance_index::{pb, Index, IndexType, INDEX_FILE_NAME}; +pub use lance_index::IndexParams; +use lance_index::{pb, DatasetIndexExt, Index, IndexType, INDEX_FILE_NAME}; use snafu::{location, Location}; use tracing::instrument; use uuid::Uuid; @@ -37,6 +41,7 @@ pub(crate) mod prefilter; pub mod scalar; pub mod vector; +use crate::dataset::index::unindexed_fragments; use crate::dataset::transaction::{Operation, Transaction}; use crate::format::Index as IndexMetadata; use crate::index::append::append_index; @@ -55,10 +60,6 @@ pub trait IndexBuilder { async fn build(&self) -> Result<()>; } -pub trait IndexParams: Send + Sync { - fn as_any(&self) -> &dyn Any; -} - pub(crate) async fn remap_index( dataset: &Dataset, index_id: &Uuid, @@ -130,34 +131,6 @@ impl IndexInformationProvider for ScalarIndexInfo { } } -/// Extends Dataset with secondary index. -#[async_trait] -pub trait DatasetIndexExt { - /// Create indices on columns. - /// - /// Upon finish, a new dataset version is generated. - /// - /// Parameters: - /// - /// - `columns`: the columns to build the indices on. - /// - `index_type`: specify [`IndexType`]. - /// - `name`: optional index name. Must be unique in the dataset. - /// if not provided, it will auto-generate one. - /// - `params`: index parameters. - /// - `replace`: replace the existing index if it exists. - async fn create_index( - &mut self, - columns: &[&str], - index_type: IndexType, - name: Option, - params: &dyn IndexParams, - replace: bool, - ) -> Result<()>; - - /// Optimize indices. - async fn optimize_indices(&mut self) -> Result<()>; -} - async fn open_index_proto(dataset: &Dataset, reader: &dyn Reader) -> Result { let object_store = dataset.object_store(); @@ -279,6 +252,27 @@ impl DatasetIndexExt for Dataset { Ok(()) } + async fn load_indices(&self) -> Result> { + let manifest_file = self.manifest_file(self.version().version).await?; + read_manifest_indexes(&self.object_store, &manifest_file, &self.manifest).await + } + + async fn load_scalar_index_for_column(&self, col: &str) -> Result> { + Ok(self + .load_indices() + .await? + .into_iter() + .filter(|idx| idx.fields.len() == 1) + .find(|idx| { + let field = self.schema().field_by_id(idx.fields[0]); + if let Some(field) = field { + field.name == col + } else { + false + } + })) + } + #[instrument(skip_all)] async fn optimize_indices(&mut self) -> Result<()> { let dataset = Arc::new(self.clone()); @@ -331,6 +325,53 @@ impl DatasetIndexExt for Dataset { self.manifest = Arc::new(new_manifest); Ok(()) } + + async fn index_statistics(&self, index_name: &str) -> Result> { + let index_uuid = self + .load_index_by_name(index_name) + .await? + .map(|idx| idx.uuid.to_string()); + + if let Some(index_uuid) = index_uuid { + let index_statistics = self + .open_generic_index("vector", &index_uuid) + .await? + .statistics()?; + Ok(Some(index_statistics)) + } else { + Ok(None) + } + } + + async fn count_unindexed_rows(&self, index_uuid: &str) -> Result> { + let index = self.load_index(index_uuid).await?; + + if let Some(index) = index { + let unindexed_frags = unindexed_fragments(&index, self).await?; + let unindexed_rows = unindexed_frags + .iter() + .map(Fragment::num_rows) + // sum the number of rows in each fragment if no fragment returned None from row_count + .try_fold(0, |a, b| b.map(|b| a + b).ok_or(())); + + Ok(unindexed_rows.ok()) + } else { + Ok(None) + } + } + + async fn count_indexed_rows(&self, index_uuid: &str) -> Result> { + let count_rows = self.count_rows(); + let count_unindexed_rows = self.count_unindexed_rows(index_uuid); + + let (count_rows, count_unindexed_rows) = + futures::try_join!(count_rows, count_unindexed_rows)?; + + match count_unindexed_rows { + Some(count_unindexed_rows) => Ok(Some(count_rows - count_unindexed_rows)), + None => Ok(None), + } + } } /// A trait for internal dataset utilities @@ -503,4 +544,89 @@ mod tests { .await .is_err()); } + + #[tokio::test] + async fn test_count_index_rows() { + let test_dir = tempdir().unwrap(); + let dimensions = 16; + let column_name = "vec"; + let field = Field::new( + column_name, + DataType::FixedSizeList( + Arc::new(Field::new("item", DataType::Float32, true)), + dimensions, + ), + false, + ); + let schema = Arc::new(Schema::new(vec![field])); + + let float_arr = generate_random_array(512 * dimensions as usize); + + let vectors = + arrow_array::FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap(); + + let record_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(vectors)]).unwrap(); + + let reader = + RecordBatchIterator::new(vec![record_batch].into_iter().map(Ok), schema.clone()); + + let test_uri = test_dir.path().to_str().unwrap(); + let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap(); + dataset.validate().await.unwrap(); + + // Make sure it returns None if there's no index with the passed identifier + assert_eq!(dataset.count_unindexed_rows("bad_id").await.unwrap(), None); + assert_eq!(dataset.count_indexed_rows("bad_id").await.unwrap(), None); + + // Create an index + let params = VectorIndexParams::ivf_pq(10, 8, 2, false, MetricType::L2, 10); + dataset + .create_index( + &[column_name], + IndexType::Vector, + Some("vec_idx".into()), + ¶ms, + true, + ) + .await + .unwrap(); + + let index = dataset + .load_index_by_name("vec_idx") + .await + .unwrap() + .unwrap(); + let index_uuid = &index.uuid.to_string(); + + // Make sure there are no unindexed rows + assert_eq!( + dataset.count_unindexed_rows(index_uuid).await.unwrap(), + Some(0) + ); + assert_eq!( + dataset.count_indexed_rows(index_uuid).await.unwrap(), + Some(512) + ); + + // Now we'll append some rows which shouldn't be indexed and see the + // count change + let float_arr = generate_random_array(512 * dimensions as usize); + let vectors = + arrow_array::FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap(); + + let record_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(vectors)]).unwrap(); + + let reader = RecordBatchIterator::new(vec![record_batch].into_iter().map(Ok), schema); + dataset.append(reader, None).await.unwrap(); + + // Make sure the new rows are not indexed + assert_eq!( + dataset.count_unindexed_rows(index_uuid).await.unwrap(), + Some(512) + ); + assert_eq!( + dataset.count_indexed_rows(index_uuid).await.unwrap(), + Some(512) + ); + } } diff --git a/rust/lance/src/io/commit.rs b/rust/lance/src/io/commit.rs index c3a47c8f4b..2795d1caa3 100644 --- a/rust/lance/src/io/commit.rs +++ b/rust/lance/src/io/commit.rs @@ -44,6 +44,7 @@ use lance_core::{ io::commit::{CommitConfig, CommitError}, Error, Result, }; +use lance_index::DatasetIndexExt; use object_store::path::Path; use prost::Message; @@ -413,14 +414,14 @@ mod tests { CommitError, CommitHandler, CommitLease, CommitLock, RenameCommitHandler, UnsafeCommitHandler, }; - use lance_index::IndexType; + use lance_index::{DatasetIndexExt, IndexType}; use lance_linalg::distance::MetricType; use lance_testing::datagen::generate_random_array; use super::*; use crate::dataset::{transaction::Operation, WriteMode, WriteParams}; - use crate::index::{vector::VectorIndexParams, DatasetIndexExt}; + use crate::index::vector::VectorIndexParams; use crate::io::object_store::ObjectStoreParams; use crate::Dataset; diff --git a/rust/lance/src/io/exec/scalar_index.rs b/rust/lance/src/io/exec/scalar_index.rs index d55d84d315..d4b05c5ea7 100644 --- a/rust/lance/src/io/exec/scalar_index.rs +++ b/rust/lance/src/io/exec/scalar_index.rs @@ -25,9 +25,12 @@ use lance_core::{ format::{Fragment, RowAddress}, Error, Result, ROW_ID_FIELD, }; -use lance_index::scalar::{ - expression::{ScalarIndexExpr, ScalarIndexLoader}, - ScalarIndex, +use lance_index::{ + scalar::{ + expression::{ScalarIndexExpr, ScalarIndexLoader}, + ScalarIndex, + }, + DatasetIndexExt, }; use pin_project::pin_project; use roaring::RoaringBitmap;