Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 170 additions & 5 deletions rust/lance-index/src/scalar/bitmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ use crate::{metrics::MetricsCollector, Index, IndexType};
use crate::{scalar::expression::ScalarQueryParser, scalar::IndexReader};

pub const BITMAP_LOOKUP_NAME: &str = "bitmap_page_lookup.lance";
pub const INDEX_STATS_METADATA_KEY: &str = "lance:index_stats";

const MAX_BITMAP_ARRAY_LENGTH: usize = i32::MAX as usize - 1024 * 1024; // leave headroom

Expand Down Expand Up @@ -601,6 +602,7 @@ impl BitmapIndexPlugin {
index_store: &dyn IndexStore,
value_type: &DataType,
) -> Result<()> {
let num_bitmaps = state.len();
let schema = Arc::new(Schema::new(vec![
Field::new("keys", value_type.clone(), true),
Field::new("bitmaps", DataType::Binary, true),
Expand Down Expand Up @@ -653,8 +655,17 @@ impl BitmapIndexPlugin {
bitmap_index_file.write_record_batch(record_batch).await?;
}

// Finish file once at the end - this creates the file even if we wrote no batches
bitmap_index_file.finish().await?;
// Finish file with metadata that allows lightweight statistics reads
let stats_json = serde_json::to_string(&BitmapStatistics { num_bitmaps }).map_err(|e| {
Error::Internal {
message: format!("failed to serialize bitmap statistics: {e}"),
location: location!(),
}
})?;
let mut metadata = HashMap::new();
metadata.insert(INDEX_STATS_METADATA_KEY.to_string(), stats_json);

bitmap_index_file.finish_with_metadata(metadata).await?;

Ok(())
}
Expand Down Expand Up @@ -715,6 +726,10 @@ impl ScalarIndexPlugin for BitmapIndexPlugin {
true
}

fn index_type(&self) -> IndexType {
IndexType::Bitmap
}
Comment thread
Xuanwo marked this conversation as resolved.
Outdated

fn version(&self) -> u32 {
BITMAP_INDEX_VERSION
}
Expand Down Expand Up @@ -759,19 +774,169 @@ impl ScalarIndexPlugin for BitmapIndexPlugin {
) -> Result<Arc<dyn ScalarIndex>> {
Ok(BitmapIndex::load(index_store, frag_reuse_index, cache).await? as Arc<dyn ScalarIndex>)
}

async fn load_statistics(
&self,
index_store: Arc<dyn IndexStore>,
_index_details: &prost_types::Any,
) -> Result<Option<serde_json::Value>> {
let reader = index_store.open_index_file(BITMAP_LOOKUP_NAME).await?;
if let Some(value) = reader.schema().metadata.get(INDEX_STATS_METADATA_KEY) {
let stats = serde_json::from_str(value).map_err(|e| Error::Internal {
message: format!("failed to parse bitmap statistics metadata: {e}"),
location: location!(),
})?;
Ok(Some(stats))
} else {
Ok(None)
}
}
}

#[cfg(test)]
pub mod tests {
use super::*;
use crate::metrics::NoOpMetricsCollector;
use crate::scalar::lance_format::LanceIndexStore;
use crate::scalar::{lance_format::LanceIndexStore, IndexStore, IndexWriter};
use arrow_array::{RecordBatch, StringArray, UInt64Array};
use arrow_schema::{Field, Schema};
use arrow_schema::{DataType, Field, Schema};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use deepsize::DeepSizeOf;
use futures::stream;
use lance_core::utils::{address::RowAddress, tempfile::TempObjDir};
use lance_core::{
datatypes::Schema as LanceSchema,
utils::{address::RowAddress, tempfile::TempObjDir},
};
use lance_io::object_store::ObjectStore;
use std::{any::Any, collections::HashMap};

#[derive(Debug)]
struct MetadataOnlyStore {
schema: Arc<LanceSchema>,
}

impl DeepSizeOf for MetadataOnlyStore {
fn deep_size_of_children(&self, ctx: &mut deepsize::Context) -> usize {
self.schema.deep_size_of_children(ctx)
}
}

#[derive(Debug)]
struct MetadataOnlyReader {
schema: Arc<LanceSchema>,
}

#[async_trait]
impl IndexReader for MetadataOnlyReader {
async fn read_record_batch(&self, _offset: u64, _batch_size: u64) -> Result<RecordBatch> {
panic!("metadata reader should not read record batches")
}

async fn read_range(
&self,
_range: std::ops::Range<usize>,
_projection: Option<&[&str]>,
) -> Result<RecordBatch> {
panic!("metadata reader should not read ranges")
}

async fn num_batches(&self, _batch_size: u64) -> u32 {
0
}

fn num_rows(&self) -> usize {
0
}

fn schema(&self) -> &LanceSchema {
&self.schema
}
}

#[async_trait]
impl IndexStore for MetadataOnlyStore {
fn as_any(&self) -> &dyn Any {
self
}

fn io_parallelism(&self) -> usize {
1
}

async fn new_index_file(
&self,
_name: &str,
_schema: Arc<Schema>,
) -> Result<Box<dyn IndexWriter>> {
panic!("metadata store does not support writing")
}

async fn open_index_file(&self, _name: &str) -> Result<Arc<dyn IndexReader>> {
Ok(Arc::new(MetadataOnlyReader {
schema: self.schema.clone(),
}))
}

async fn copy_index_file(&self, _name: &str, _dest_store: &dyn IndexStore) -> Result<()> {
panic!("metadata store does not support copy")
}

async fn rename_index_file(&self, _name: &str, _new_name: &str) -> Result<()> {
panic!("metadata store does not support rename")
}

async fn delete_index_file(&self, _name: &str) -> Result<()> {
panic!("metadata store does not support delete")
}
}

#[tokio::test]
async fn test_bitmap_metadata_statistics_minimal_io() {
Comment thread
Xuanwo marked this conversation as resolved.
Outdated
let tmpdir = TempObjDir::default();
let store = Arc::new(LanceIndexStore::new(
Arc::new(ObjectStore::local()),
tmpdir.clone(),
Arc::new(LanceCache::no_cache()),
));

let colors = vec![
"red", "blue", "green", "red", "yellow", "blue", "red", "green",
];
let row_ids = (0u64..colors.len() as u64).collect::<Vec<_>>();
let schema = Arc::new(Schema::new(vec![
Field::new("value", DataType::Utf8, false),
Field::new("_rowid", DataType::UInt64, false),
]));
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(StringArray::from(colors.clone())),
Arc::new(UInt64Array::from(row_ids.clone())),
],
)
.unwrap();
let stream = stream::once(async move { Ok(batch) });
let stream = Box::pin(RecordBatchStreamAdapter::new(schema, stream));
BitmapIndexPlugin::train_bitmap_index(stream, store.as_ref())
.await
.unwrap();

let reader = store.open_index_file(BITMAP_LOOKUP_NAME).await.unwrap();
let schema = Arc::new(reader.schema().clone());

let metadata_store = MetadataOnlyStore { schema };
let stats = BitmapIndexPlugin
.load_statistics(Arc::new(metadata_store), &prost_types::Any::default())
.await
.unwrap()
.expect("bitmap metadata statistics should exist");

assert_eq!(
stats.get("num_bitmaps").and_then(|v| v.as_u64()).unwrap(),
4,
"num_bitmaps should equal number of distinct values",
);
}

#[tokio::test]
async fn test_bitmap_lazy_loading_and_cache() {
Expand Down
12 changes: 12 additions & 0 deletions rust/lance-index/src/scalar/bloomfilter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1235,6 +1235,10 @@ impl ScalarIndexPlugin for BloomFilterIndexPlugin {
false
}

fn index_type(&self) -> IndexType {
IndexType::BloomFilter
}

fn version(&self) -> u32 {
BLOOMFILTER_INDEX_VERSION
}
Expand All @@ -1259,6 +1263,14 @@ impl ScalarIndexPlugin for BloomFilterIndexPlugin {
as Arc<dyn ScalarIndex>,
)
}

async fn load_statistics(
&self,
_index_store: Arc<dyn IndexStore>,
_index_details: &prost_types::Any,
) -> Result<Option<serde_json::Value>> {
Ok(None)
}
}

#[derive(Debug)]
Expand Down
4 changes: 4 additions & 0 deletions rust/lance-index/src/scalar/btree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1935,6 +1935,10 @@ impl ScalarIndexPlugin for BTreeIndexPlugin {
true
}

fn index_type(&self) -> IndexType {
IndexType::BTree
}

fn version(&self) -> u32 {
BTREE_INDEX_VERSION
}
Expand Down
5 changes: 5 additions & 0 deletions rust/lance-index/src/scalar/inverted.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use crate::{
registry::{ScalarIndexPlugin, TrainingCriteria, TrainingOrdering, TrainingRequest},
CreatedIndex, ScalarIndex,
},
IndexType,
};

use super::IndexStore;
Expand Down Expand Up @@ -133,6 +134,10 @@ impl ScalarIndexPlugin for InvertedIndexPlugin {
false
}

fn index_type(&self) -> IndexType {
IndexType::Inverted
}

fn version(&self) -> u32 {
INVERTED_INDEX_VERSION
}
Expand Down
4 changes: 4 additions & 0 deletions rust/lance-index/src/scalar/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -740,6 +740,10 @@ impl ScalarIndexPlugin for JsonIndexPlugin {
true
}

fn index_type(&self) -> IndexType {
IndexType::Scalar
}

fn attach_registry(&self, registry: Arc<ScalarIndexPluginRegistry>) {
let mut reg_ref = self.registry.lock().unwrap();
*reg_ref = Some(registry);
Expand Down
4 changes: 4 additions & 0 deletions rust/lance-index/src/scalar/label_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,10 @@ impl ScalarIndexPlugin for LabelListIndexPlugin {
true
}

fn index_type(&self) -> IndexType {
IndexType::LabelList
}

fn version(&self) -> u32 {
LABEL_LIST_INDEX_VERSION
}
Expand Down
4 changes: 4 additions & 0 deletions rust/lance-index/src/scalar/ngram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1274,6 +1274,10 @@ impl ScalarIndexPlugin for NGramIndexPlugin {
false
}

fn index_type(&self) -> IndexType {
IndexType::NGram
}

fn version(&self) -> u32 {
NGRAM_INDEX_VERSION
}
Expand Down
13 changes: 13 additions & 0 deletions rust/lance-index/src/scalar/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::{
label_list::LabelListIndexPlugin, ngram::NGramIndexPlugin, zonemap::ZoneMapIndexPlugin,
CreatedIndex, IndexStore, ScalarIndex,
},
IndexType,
};

pub const VALUE_COLUMN_NAME: &str = "value";
Expand Down Expand Up @@ -126,6 +127,9 @@ pub trait ScalarIndexPlugin: Send + Sync + std::fmt::Debug {
/// Returns true if the index returns an exact answer (e.g. not AtMost)
fn provides_exact_answer(&self) -> bool;

/// Returns the index type for this plugin
fn index_type(&self) -> IndexType;
Comment thread
Xuanwo marked this conversation as resolved.
Outdated

/// The version of the index plugin
///
/// We assume that indexes are not forwards compatible. If an index was written with a
Expand Down Expand Up @@ -153,6 +157,15 @@ pub trait ScalarIndexPlugin: Send + Sync + std::fmt::Debug {
cache: &LanceCache,
) -> Result<Arc<dyn ScalarIndex>>;

/// Optional hook allowing a plugin to provide statistics without loading the index.
async fn load_statistics(
&self,
_index_store: Arc<dyn IndexStore>,
_index_details: &prost_types::Any,
) -> Result<Option<serde_json::Value>> {
Ok(None)
}

/// Optional hook that plugins can use if they need to be aware of the registry
fn attach_registry(&self, _registry: Arc<ScalarIndexPluginRegistry>) {}
}
Expand Down
4 changes: 4 additions & 0 deletions rust/lance-index/src/scalar/zonemap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -969,6 +969,10 @@ impl ScalarIndexPlugin for ZoneMapIndexPlugin {
false
}

fn index_type(&self) -> IndexType {
IndexType::ZoneMap
}

fn version(&self) -> u32 {
ZONEMAP_INDEX_VERSION
}
Expand Down
Loading
Loading