diff --git a/docs/src/guide/performance.md b/docs/src/guide/performance.md index 67253fc94b2..fc2a011f0c1 100644 --- a/docs/src/guide/performance.md +++ b/docs/src/guide/performance.md @@ -165,3 +165,74 @@ across the entire process. This limit is specified by the `LANCE_PROCESS_IO_THRE The default is 128 which is more than enough for most workloads. You can increase this limit if you are working with a high-throughput workload. You can even disable this limit entirely by setting it to zero. Note that this can often lead to issues with excessive retries and timeouts from the object store.
+
+## Indexes
+
+Training and searching indexes can have unique requirements for compute and memory. This section provides some
+guidance on what can be expected for different index types.
+
+### BTree Index
+
+The BTree index is a two-level structure that provides efficient range queries and sorted access.
+It strikes a balance between an expensive in-memory structure containing all values and a disk-only
+structure that can't be efficiently searched.
+
+Training a BTree index is done by sorting the column, using an [external sort](https://en.wikipedia.org/wiki/External_sorting) to constrain the total memory usage to a reasonable amount. Updating a BTree index does not
+require re-sorting the entire column. The new values are sorted and then merged with the existing sorted
+values in linear time.
+
+#### Storage Requirements
+
+The BTree index is essentially a sorted copy of a column. The storage requirements are therefore the same as the column,
+but an additional 4 bytes per value are required to store the row ID, and there is a small lookup structure which
+should be roughly 0.001% of the size of the column.
+
+#### Memory Requirements
+
+Training a BTree index requires some RAM, but the current implementation spills to disk rather aggressively, so the
+total memory usage is fairly low.
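The spill-to-disk behavior used during training can be illustrated with a minimal external sort. This is a stdlib-only sketch, not Lance's implementation; the chunk size, pickle spill format, and helper names are all illustrative. Sorted runs are spilled to temporary files so peak memory is bounded by the chunk size, and the runs are then merged with a k-way heap merge:

```python
import heapq
import pickle
import tempfile


def _spill(sorted_chunk):
    # Write one sorted run to a temporary file and rewind for reading
    f = tempfile.TemporaryFile()
    pickle.dump(sorted_chunk, f)
    f.seek(0)
    return f


def _read(f):
    # Stream a spilled run back as an iterator
    yield from pickle.load(f)


def external_sort(values, chunk_size=1024):
    """Sort an iterable with bounded memory: sort chunks, spill, k-way merge."""
    runs = []
    chunk = []
    for v in values:
        chunk.append(v)
        if len(chunk) >= chunk_size:
            runs.append(_spill(sorted(chunk)))
            chunk = []
    if chunk:
        runs.append(_spill(sorted(chunk)))
    # heapq.merge combines the already-sorted runs in linear time
    return list(heapq.merge(*(_read(r) for r in runs)))
```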
+
+When searching a BTree index, the index is loaded into the index cache in pages. Each page contains 4096 values.
+
+#### Performance
+
+The sort stage is the most expensive step in training a BTree index. The time complexity is O(n log n), where n is the number of rows in the column. At very large scales this can be a bottleneck and a distributed sort may be necessary. Lance currently does
+not have anything built in for this, but work is underway to add this functionality. Training an index in parts as the data grows
+may be slightly more efficient than training the entire index at once, if you have the flexibility to do so.
+
+When the BTree index is fully loaded into the index cache, the search time scales linearly with the number of rows that match the
+query. When the BTree index is not fully loaded into the index cache, the search time will be controlled by the number of pages
+that need to be loaded from disk and the speed of storage. The `parts_loaded` metric in the execution metrics can tell you how many
+pages were loaded from disk to satisfy a query.
+
+### Bitmap Index
+
+The Bitmap index is an inverted lookup table that stores a bitmap for each possible value in the column. These bitmaps are compressed and serialized as [Roaring Bitmaps](https://roaringbitmap.org/).
+
+A bitmap index is currently trained by accumulating the column into a hash map from value to a vector of row ids. Each value
+is then serialized into a bitmap and stored in a file.
+
+#### Storage Requirements
+
+The size of a bitmap index is difficult to calculate precisely, but it will generally scale with the number of unique values in the
+column, since a unique bitmap is required for each value and a single bitmap with all rows will compress more efficiently than
+many bitmaps with a small number of rows.
+
+#### Memory Requirements
+
+Since training a bitmap index requires collecting the values into a hash map, you will need at least 8 bytes of memory per row.
+In addition, if you have many unique values, you will need additional memory for the keys of the hash map. Training
+bitmap indexes over columns with many unique values at scale can be memory intensive.
+
+When a bitmap index is searched, bitmaps are loaded into the session cache individually. The size of each bitmap will depend on
+the number of rows that match the value.
+
+#### Performance
+
+When the bitmap index is fully loaded into the index cache, the search time scales linearly with the number of values that the
+query requires. This makes the bitmap index very fast for equality queries or very small ranges. Queries against large ranges are
+currently extremely slow, and the BTree index is much faster for large range queries.
+
+When a bitmap index is not fully loaded into the index cache, the search time will be controlled by the number of bitmaps that
+need to be loaded from disk and the speed of storage. The `parts_loaded` metric in the execution metrics can tell you how many
+bitmaps were loaded from disk to satisfy a query.
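The search behavior described above can be illustrated with a toy inverted index. Plain Python sets stand in for the compressed Roaring bitmaps, and all function names here are illustrative, not Lance's API. Equality is a single bitmap lookup, while a range must union one bitmap per matching value, which is why large ranges are slow:

```python
from collections import defaultdict


def train_bitmap(column):
    """Map each distinct value to the set of row ids that contain it."""
    index = defaultdict(set)
    for row_id, value in enumerate(column):
        index[value].add(row_id)
    return index


def search_eq(index, value):
    # Equality: one bitmap lookup, independent of the number of rows
    return index.get(value, set())


def search_range(index, lo, hi):
    # Ranges union one bitmap per value in [lo, hi]; cost grows with
    # the number of distinct values the query touches
    result = set()
    for value, rows in index.items():
        if lo <= value <= hi:
            result |= rows
    return result
```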
diff --git a/python/python/ci_benchmarks/benchmarks/test_search.py b/python/python/ci_benchmarks/benchmarks/test_search.py index 484b6cacbcd..f7ab517701d 100644 --- a/python/python/ci_benchmarks/benchmarks/test_search.py +++ b/python/python/ci_benchmarks/benchmarks/test_search.py @@ -6,6 +6,7 @@ import lance import pytest from ci_benchmarks.datasets import get_dataset_uri +from ci_benchmarks.utils import wipe_os_cache COLUMN_LABELS = ["bools", "normals"] COLUMNS = [["bools"], ["normals"]] @@ -38,24 +39,33 @@ def bench(): benchmark.pedantic(bench, rounds=1, iterations=1) +LARGE_IN_FILTER = ( + "image_widths IN (" + ", ".join([str(i) for i in range(3990, 4100)]) + ")" +) + BTREE_FILTERS = [ None, "image_widths = 3997", "image_widths >= 3990 AND image_widths <= 3997", "image_widths != 3997", + LARGE_IN_FILTER, ] BTREE_FILTER_LABELS = [ None, "equal", "small_range", "not_equal", + "large_in", ] # These tests benchmark a variety of filtered read patterns @pytest.mark.parametrize("filt", BTREE_FILTERS, ids=BTREE_FILTER_LABELS) @pytest.mark.parametrize("payload", [None, "image_widths"], ids=["none", "integers"]) -def test_eda_btree_search(benchmark, filt: str | None, payload: str | None): +@pytest.mark.parametrize("use_cache", [True, False], ids=["cache", "no_cache"]) +def test_eda_btree_search( + benchmark, filt: str | None, payload: str | None, use_cache: bool +): dataset_uri = get_dataset_uri("image_eda") ds = lance.dataset(dataset_uri) @@ -66,7 +76,8 @@ def test_eda_btree_search(benchmark, filt: str | None, payload: str | None): columns = [payload] def bench(): - ds.to_table( + to_search = ds if use_cache else lance.dataset(dataset_uri) + to_search.to_table( columns=columns, filter=filt, with_row_id=True, @@ -80,14 +91,22 @@ def bench(): iterations = 100 # We warmup so we can test hot index performance - benchmark.pedantic(bench, warmup_rounds=1, rounds=1, iterations=iterations) + warmup_rounds = 1 if use_cache else 0 + + benchmark.pedantic( + bench, 
warmup_rounds=warmup_rounds, rounds=1, iterations=iterations + ) +BASIC_LARGE_IN_FILTER = ( + "row_number IN (" + ", ".join([str(i) for i in range(100000, 100100)]) + ")" +) BASIC_BTREE_FILTERS = [ None, "row_number = 100000", "row_number != 100000", "row_number >= 100000 AND row_number <= 100007", + BASIC_LARGE_IN_FILTER, ] BASIC_BTREE_FILTER_LABELS = [ @@ -95,14 +114,11 @@ def bench(): "equal", "not_equal", "small_range", + "large_in", ] -# Repeats the same test for the basic dataset which is easier to test with locally -# This benchmark is not part of the CI job as the EDA dataset is better for that -@pytest.mark.parametrize("filt", BASIC_BTREE_FILTERS, ids=BASIC_BTREE_FILTER_LABELS) -@pytest.mark.parametrize("payload", [None, "small_strings", "integers"]) -def test_basic_btree_search(benchmark, filt: str | None, payload: str | None): +def do_basic_search(benchmark, filt: str | None, payload: str | None, use_cache: bool): dataset_uri = get_dataset_uri("basic") ds = lance.dataset(dataset_uri) @@ -110,15 +126,66 @@ def test_basic_btree_search(benchmark, filt: str | None, payload: str | None): if payload is not None: columns = [payload] + def clear_cache(): + wipe_os_cache(dataset_uri) + def bench(): - ds.to_table( + to_search = ds if use_cache else lance.dataset(dataset_uri) + to_search.to_table( columns=columns, filter=filt, with_row_id=True, batch_size=32 * 1024, ) - benchmark.pedantic(bench, warmup_rounds=1, rounds=1, iterations=10) + setup = None if use_cache else clear_cache + + warmup_rounds = 1 if use_cache else 0 + benchmark.pedantic( + bench, warmup_rounds=warmup_rounds, rounds=10, iterations=1, setup=setup + ) + + +# Repeats the same test for the basic dataset which is easier to test with locally +# This benchmark is not part of the CI job as the EDA dataset is better for that +@pytest.mark.parametrize("filt", BASIC_BTREE_FILTERS, ids=BASIC_BTREE_FILTER_LABELS) +@pytest.mark.parametrize("payload", [None, "small_strings", "integers"]) 
+@pytest.mark.parametrize("use_cache", [True, False], ids=["cache", "no_cache"]) +def test_basic_btree_search( + benchmark, filt: str | None, payload: str | None, use_cache: bool +): + do_basic_search(benchmark, filt, payload, use_cache) + + +BASIC_LARGE_IN_FILTER_BITMAP = ( + "row_number_bitmap IN (" + ", ".join([str(i) for i in range(100000, 100100)]) + ")" +) +BASIC_BITMAP_FILTERS = [ + None, + "row_number_bitmap = 100000", + "row_number_bitmap != 100000", + # "row_number_bitmap >= 100000 AND row_number_bitmap <= 100007", + # BASIC_LARGE_IN_FILTER_BITMAP, +] + +BASIC_BITMAP_FILTER_LABELS = [ + "none", + "equal", + "not_equal", + # "small_range", + # "large_in", +] + + +# Repeats the same test for the basic dataset which is easier to test with locally +# This benchmark is not part of the CI job as the EDA dataset is better for that +@pytest.mark.parametrize("filt", BASIC_BITMAP_FILTERS, ids=BASIC_BITMAP_FILTER_LABELS) +@pytest.mark.parametrize("payload", [None, "small_strings", "integers"]) +@pytest.mark.parametrize("use_cache", [True, False], ids=["cache", "no_cache"]) +def test_basic_bitmap_search( + benchmark, filt: str | None, payload: str | None, use_cache: bool +): + do_basic_search(benchmark, filt, payload, use_cache) IOPS = 0.0 diff --git a/python/python/ci_benchmarks/datagen/basic.py b/python/python/ci_benchmarks/datagen/basic.py index cd115675540..c14d7dcb47a 100644 --- a/python/python/ci_benchmarks/datagen/basic.py +++ b/python/python/ci_benchmarks/datagen/basic.py @@ -19,6 +19,7 @@ SCHEMA = pa.schema( { "row_number": pa.uint64(), + "row_number_bitmap": pa.uint64(), "integers": pa.int64(), "small_strings": pa.string(), } @@ -36,9 +37,12 @@ def _gen_data(): pa.array( [batch_idx * ROWS_PER_BATCH + i for i in range(ROWS_PER_BATCH)] ), + pa.array( + [batch_idx * ROWS_PER_BATCH + i for i in range(ROWS_PER_BATCH)] + ), pa.array([f"payload_{i}" for i in range(ROWS_PER_BATCH)]), ], - names=["row_number", "integers", "small_strings"], + names=["row_number", 
"row_number_bitmap", "integers", "small_strings"], ) yield batch @@ -72,6 +76,7 @@ def _create(dataset_uri: str): ) if ds.list_indices() == []: ds.create_scalar_index("row_number", "BTREE") + ds.create_scalar_index("row_number_bitmap", "BITMAP") def gen_basic(): diff --git a/python/python/ci_benchmarks/utils.py b/python/python/ci_benchmarks/utils.py new file mode 100644 index 00000000000..17d04c8b72e --- /dev/null +++ b/python/python/ci_benchmarks/utils.py @@ -0,0 +1,33 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright The Lance Authors + +import os +from pathlib import Path + + +def wipe_os_cache(dataset_uri: str): + if dataset_uri.startswith("/"): + path = dataset_uri + elif dataset_uri.startswith("file://"): + path = Path(dataset_uri.removeprefix("file://")) + else: + return + + if not hasattr(os, "posix_fadvise"): + raise NotImplementedError("posix_fadvise not available on this platform") + + POSIX_FADV_DONTNEED = 4 # Tell kernel we don't need this data in cache + + directory = Path(path) + + file_iterator = directory.rglob("*") + + for filepath in file_iterator: + # Skip directories, symlinks, and non-regular files + if not filepath.is_file(): + continue + + with open(filepath, "rb") as f: + fd = f.fileno() + # offset=0, length=0 means drop entire file from cache + os.posix_fadvise(fd, 0, 0, POSIX_FADV_DONTNEED) diff --git a/rust/lance-index/Cargo.toml b/rust/lance-index/Cargo.toml index 3cb6c435d7f..f5bcd55e858 100644 --- a/rust/lance-index/Cargo.toml +++ b/rust/lance-index/Cargo.toml @@ -144,5 +144,13 @@ harness = false name = "rq" harness = false +[[bench]] +name = "btree" +harness = false + +[[bench]] +name = "bitmap" +harness = false + [lints] workspace = true diff --git a/rust/lance-index/benches/bitmap.rs b/rust/lance-index/benches/bitmap.rs new file mode 100644 index 00000000000..d75cb88e2a5 --- /dev/null +++ b/rust/lance-index/benches/bitmap.rs @@ -0,0 +1,476 @@ +// SPDX-License-Identifier: Apache-2.0 +// 
SPDX-FileCopyrightText: Copyright The Lance Authors + +//! Benchmark of Bitmap scalar index. +//! +//! This benchmark measures the performance of Bitmap index with: +//! - 50 million data points +//! - Int64 and String data types +//! - High cardinality (unique values) and low cardinality (100 unique values) +//! - Equality filters +//! - IN filters with varying size (1, 3, 5 values) + +mod common; + +use std::{ + sync::{Arc, OnceLock}, + time::Duration, +}; + +use common::{LOW_CARDINALITY_COUNT, TOTAL_ROWS}; +use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion}; +use datafusion_common::ScalarValue; +use lance_core::cache::LanceCache; +use lance_index::metrics::NoOpMetricsCollector; +use lance_index::pbold; +use lance_index::scalar::lance_format::LanceIndexStore; +use lance_index::scalar::registry::ScalarIndexPlugin; +use lance_index::scalar::{bitmap::BitmapIndexPlugin, SargableQuery, ScalarIndex}; +use lance_io::object_store::ObjectStore; +use object_store::path::Path; +#[cfg(target_os = "linux")] +use pprof::criterion::{Output, PProfProfiler}; + +// Lazy static runtime - only created once +static RUNTIME: OnceLock = OnceLock::new(); + +// Lazy static cache - only created when cached benchmarks are run +static CACHE: OnceLock> = OnceLock::new(); + +// Lazy static indices - only created when first accessed +// Separate indices for cached and uncached variants +static INT_UNIQUE_INDEX_NO_CACHE: OnceLock> = OnceLock::new(); +static INT_UNIQUE_INDEX_CACHED: OnceLock> = OnceLock::new(); +static INT_LOW_CARD_INDEX_NO_CACHE: OnceLock> = OnceLock::new(); +static INT_LOW_CARD_INDEX_CACHED: OnceLock> = OnceLock::new(); +static STRING_UNIQUE_INDEX_NO_CACHE: OnceLock> = OnceLock::new(); +static STRING_UNIQUE_INDEX_CACHED: OnceLock> = OnceLock::new(); +static STRING_LOW_CARD_INDEX_NO_CACHE: OnceLock> = OnceLock::new(); +static STRING_LOW_CARD_INDEX_CACHED: OnceLock> = OnceLock::new(); + +/// Get or create the tokio runtime +fn get_runtime() -> 
&'static tokio::runtime::Runtime { + RUNTIME.get_or_init(|| tokio::runtime::Builder::new_multi_thread().build().unwrap()) +} + +/// Get the cache - either a singleton cache or no_cache based on use_cache parameter +fn get_cache(use_cache: bool, key_prefix: &str) -> Arc { + if use_cache { + Arc::new( + CACHE + .get_or_init(|| Arc::new(LanceCache::with_capacity(1024 * 1024 * 1024))) + .with_key_prefix(key_prefix), + ) + } else { + Arc::new(LanceCache::no_cache()) + } +} + +/// Create and train a Bitmap index for int64 data with unique values +async fn create_int_unique_index( + store: Arc, + use_cache: bool, +) -> Arc { + let stream = common::generate_int_unique_stream(); + + BitmapIndexPlugin::train_bitmap_index(stream, store.as_ref()) + .await + .unwrap(); + + let details = prost_types::Any::from_msg(&pbold::BitmapIndexDetails::default()).unwrap(); + let index = BitmapIndexPlugin + .load_index(store, &details, None, &get_cache(use_cache, "int_unique")) + .await + .unwrap(); + + index +} + +/// Create and train a Bitmap index for int64 data with low cardinality +async fn create_int_low_card_index( + store: Arc, + use_cache: bool, +) -> Arc { + let stream = common::generate_int_low_cardinality_stream(); + + BitmapIndexPlugin::train_bitmap_index(stream, store.as_ref()) + .await + .unwrap(); + + let details = prost_types::Any::from_msg(&pbold::BitmapIndexDetails::default()).unwrap(); + let index = BitmapIndexPlugin + .load_index(store, &details, None, &get_cache(use_cache, "int_low_card")) + .await + .unwrap(); + + index +} + +/// Create and train a Bitmap index for string data with unique values +async fn create_string_unique_index( + store: Arc, + use_cache: bool, +) -> Arc { + let stream = common::generate_string_unique_stream(); + + BitmapIndexPlugin::train_bitmap_index(stream, store.as_ref()) + .await + .unwrap(); + + let details = prost_types::Any::from_msg(&pbold::BitmapIndexDetails::default()).unwrap(); + let index = BitmapIndexPlugin + .load_index( + store, + 
&details, + None, + &get_cache(use_cache, "string_unique"), + ) + .await + .unwrap(); + + index +} + +/// Create and train a Bitmap index for string data with low cardinality +async fn create_string_low_card_index( + store: Arc, + use_cache: bool, +) -> Arc { + let stream = common::generate_string_low_cardinality_stream(); + + BitmapIndexPlugin::train_bitmap_index(stream, store.as_ref()) + .await + .unwrap(); + + let details = prost_types::Any::from_msg(&pbold::BitmapIndexDetails::default()).unwrap(); + let index = BitmapIndexPlugin + .load_index( + store, + &details, + None, + &get_cache(use_cache, "string_low_card"), + ) + .await + .unwrap(); + + index +} + +/// Set up all benchmark indices +/// Setup function for int unique index - creates it only once per cache variant +fn setup_int_unique_index(rt: &tokio::runtime::Runtime, use_cache: bool) -> Arc { + let static_ref = if use_cache { + &INT_UNIQUE_INDEX_CACHED + } else { + &INT_UNIQUE_INDEX_NO_CACHE + }; + + static_ref + .get_or_init(|| { + rt.block_on(async { + let tempdir = tempfile::tempdir().unwrap(); + let store = Arc::new(LanceIndexStore::new( + Arc::new(ObjectStore::local()), + Path::from_filesystem_path(tempdir.path()).unwrap(), + get_cache(use_cache, "int_unique"), + )); + let index = create_int_unique_index(store, use_cache).await; + let _ = tempdir.keep(); + index + }) + }) + .clone() +} + +/// Setup function for int low cardinality index - creates it only once per cache variant +fn setup_int_low_card_index(rt: &tokio::runtime::Runtime, use_cache: bool) -> Arc { + let static_ref = if use_cache { + &INT_LOW_CARD_INDEX_CACHED + } else { + &INT_LOW_CARD_INDEX_NO_CACHE + }; + + static_ref + .get_or_init(|| { + rt.block_on(async { + let tempdir = tempfile::tempdir().unwrap(); + let store = Arc::new(LanceIndexStore::new( + Arc::new(ObjectStore::local()), + Path::from_filesystem_path(tempdir.path()).unwrap(), + get_cache(use_cache, "int_low_card"), + )); + let index = create_int_low_card_index(store, 
use_cache).await; + let _ = tempdir.keep(); + index + }) + }) + .clone() +} + +/// Setup function for string unique index - creates it only once per cache variant +fn setup_string_unique_index( + rt: &tokio::runtime::Runtime, + use_cache: bool, +) -> Arc { + let static_ref = if use_cache { + &STRING_UNIQUE_INDEX_CACHED + } else { + &STRING_UNIQUE_INDEX_NO_CACHE + }; + + static_ref + .get_or_init(|| { + rt.block_on(async { + let tempdir = tempfile::tempdir().unwrap(); + let store = Arc::new(LanceIndexStore::new( + Arc::new(ObjectStore::local()), + Path::from_filesystem_path(tempdir.path()).unwrap(), + get_cache(use_cache, "string_unique"), + )); + let index = create_string_unique_index(store, use_cache).await; + let _ = tempdir.keep(); + index + }) + }) + .clone() +} + +/// Setup function for string low cardinality index - creates it only once per cache variant +fn setup_string_low_card_index( + rt: &tokio::runtime::Runtime, + use_cache: bool, +) -> Arc { + let static_ref = if use_cache { + &STRING_LOW_CARD_INDEX_CACHED + } else { + &STRING_LOW_CARD_INDEX_NO_CACHE + }; + + static_ref + .get_or_init(|| { + rt.block_on(async { + let tempdir = tempfile::tempdir().unwrap(); + let store = Arc::new(LanceIndexStore::new( + Arc::new(ObjectStore::local()), + Path::from_filesystem_path(tempdir.path()).unwrap(), + get_cache(use_cache, "string_low_card"), + )); + let index = create_string_low_card_index(store, use_cache).await; + let _ = tempdir.keep(); + index + }) + }) + .clone() +} + +fn bench_equality(c: &mut Criterion) { + let rt = get_runtime(); + + // Calculate test values from constants (middle of range) + let int_unique_value = (TOTAL_ROWS / 2) as i64; + let string_unique_value = format!("string_{:010}", TOTAL_ROWS / 2); + let int_low_card_value = (LOW_CARDINALITY_COUNT / 2) as i64; + let string_low_card_value = format!("value_{:03}", LOW_CARDINALITY_COUNT / 2); + + let mut group = c.benchmark_group("bitmap_equality"); + group + .sample_size(10) + 
.measurement_time(Duration::from_secs(10)); + + // Benchmark both cached and uncached variants + for use_cache in [false, true] { + let cache_label = if use_cache { "cached" } else { "no_cache" }; + + // int unique + group.bench_function(BenchmarkId::new("int_unique", cache_label), |b| { + let index = setup_int_unique_index(rt, use_cache); + b.to_async(rt).iter(|| { + let index = index.clone(); + let value = int_unique_value; + async move { + let query = SargableQuery::Equals(ScalarValue::Int64(Some(value))); + black_box(index.search(&query, &NoOpMetricsCollector).await.unwrap()); + } + }) + }); + + // int low cardinality + group.bench_function(BenchmarkId::new("int_low_card", cache_label), |b| { + let index = setup_int_low_card_index(rt, use_cache); + b.to_async(rt).iter(|| { + let index = index.clone(); + let value = int_low_card_value; + async move { + let query = SargableQuery::Equals(ScalarValue::Int64(Some(value))); + black_box(index.search(&query, &NoOpMetricsCollector).await.unwrap()); + } + }) + }); + + // String unique + group.bench_function(BenchmarkId::new("string_unique", cache_label), |b| { + let index = setup_string_unique_index(rt, use_cache); + let value = string_unique_value.clone(); + b.to_async(rt).iter(|| { + let index = index.clone(); + let value = value.clone(); + async move { + let query = SargableQuery::Equals(ScalarValue::Utf8(Some(value))); + black_box(index.search(&query, &NoOpMetricsCollector).await.unwrap()); + } + }) + }); + + // String low cardinality + group.bench_function(BenchmarkId::new("string_low_card", cache_label), |b| { + let index = setup_string_low_card_index(rt, use_cache); + let value = string_low_card_value.clone(); + b.to_async(rt).iter(|| { + let index = index.clone(); + let value = value.clone(); + async move { + let query = SargableQuery::Equals(ScalarValue::Utf8(Some(value))); + black_box(index.search(&query, &NoOpMetricsCollector).await.unwrap()); + } + }) + }); + } + + group.finish(); +} + +fn bench_in(c: &mut 
Criterion) { + let rt = get_runtime(); + + // Test with different numbers of values in the IN clause + let value_counts = [1, 3, 5]; + + for &num_values in &value_counts { + let mut group = c.benchmark_group(format!("bitmap_in_{}", num_values)); + group + .sample_size(10) + .measurement_time(Duration::from_secs(10)); + + // Calculate values around the middle of the range + let mid_int = (TOTAL_ROWS / 2) as i64; + let mid_string = TOTAL_ROWS / 2; + let mid_low_card = LOW_CARDINALITY_COUNT / 2; + + // Int unique - IN query + let int_values: Vec = (0..num_values) + .map(|i| ScalarValue::Int64(Some(mid_int + i as i64 - num_values as i64 / 2))) + .collect(); + + // Int low cardinality - IN query + let int_low_card_values: Vec = (0..num_values) + .map(|i| ScalarValue::Int64(Some((mid_low_card + i - num_values / 2) as i64))) + .collect(); + + // String unique - IN query + let string_values: Vec = (0..num_values) + .map(|i| { + ScalarValue::Utf8(Some(format!( + "string_{:010}", + (mid_string as i64 + i as i64 - num_values as i64 / 2) as u64 + ))) + }) + .collect(); + + // String low cardinality - IN query + let string_low_card_values: Vec = (0..num_values) + .map(|i| { + ScalarValue::Utf8(Some(format!( + "value_{:03}", + (mid_low_card as i32 + i as i32 - num_values as i32 / 2) as usize + ))) + }) + .collect(); + + // Benchmark both cached and uncached variants + for use_cache in [false, true] { + let cache_label = if use_cache { "cached" } else { "no_cache" }; + + group.bench_function(BenchmarkId::new("int_unique", cache_label), |b| { + let index = setup_int_unique_index(rt, use_cache); + let values = int_values.clone(); + b.to_async(rt).iter(|| { + let index = index.clone(); + let values = values.clone(); + async move { + let query = SargableQuery::IsIn(values); + black_box(index.search(&query, &NoOpMetricsCollector).await.unwrap()); + } + }) + }); + + group.bench_function(BenchmarkId::new("int_low_card", cache_label), |b| { + let index = setup_int_low_card_index(rt, 
use_cache); + let values = int_low_card_values.clone(); + b.to_async(rt).iter(|| { + let index = index.clone(); + let values = values.clone(); + async move { + let query = SargableQuery::IsIn(values); + black_box(index.search(&query, &NoOpMetricsCollector).await.unwrap()); + } + }) + }); + + group.bench_function(BenchmarkId::new("string_unique", cache_label), |b| { + let index = setup_string_unique_index(rt, use_cache); + let values = string_values.clone(); + b.to_async(rt).iter(|| { + let index = index.clone(); + let values = values.clone(); + async move { + let query = SargableQuery::IsIn(values); + black_box(index.search(&query, &NoOpMetricsCollector).await.unwrap()); + } + }) + }); + + group.bench_function(BenchmarkId::new("string_low_card", cache_label), |b| { + let index = setup_string_low_card_index(rt, use_cache); + let values = string_low_card_values.clone(); + b.to_async(rt).iter(|| { + let index = index.clone(); + let values = values.clone(); + async move { + let query = SargableQuery::IsIn(values); + black_box(index.search(&query, &NoOpMetricsCollector).await.unwrap()); + } + }) + }); + } + + group.finish(); + } +} + +fn bench_bitmap(c: &mut Criterion) { + // Run equality benchmarks + bench_equality(c); + + // Run IN query benchmarks + bench_in(c); +} + +#[cfg(target_os = "linux")] +criterion_group!( + name=benches; + config = Criterion::default() + .measurement_time(Duration::from_secs(10)) + .sample_size(10) + .with_profiler(PProfProfiler::new(100, Output::Flamegraph(None))); + targets = bench_bitmap); + +// Non-linux version does not support pprof. 
+#[cfg(not(target_os = "linux"))] +criterion_group!( + name=benches; + config = Criterion::default() + .measurement_time(Duration::from_secs(10)) + .sample_size(10); + targets = bench_bitmap); + +criterion_main!(benches); diff --git a/rust/lance-index/benches/btree.rs b/rust/lance-index/benches/btree.rs new file mode 100644 index 00000000000..bac06ecd449 --- /dev/null +++ b/rust/lance-index/benches/btree.rs @@ -0,0 +1,746 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! Benchmark of BTree scalar index. +//! +//! This benchmark measures the performance of BTree index with: +//! - 50 million data points +//! - int and String data types +//! - High cardinality (unique values) and low cardinality (100 unique values) +//! - Equality filters +//! - Range filters with varying selectivity (few/many/most rows match) +//! - IN filters with varying size (10, 20, 30 values) + +mod common; + +use std::{ + ops::Bound, + sync::{Arc, OnceLock}, + time::Duration, +}; + +use arrow_schema::DataType; +use common::{LOW_CARDINALITY_COUNT, TOTAL_ROWS}; +use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion}; +use datafusion_common::ScalarValue; +use lance_core::cache::LanceCache; +use lance_index::metrics::NoOpMetricsCollector; +use lance_index::pbold; +use lance_index::scalar::btree::{train_btree_index, BTreeIndexPlugin, DEFAULT_BTREE_BATCH_SIZE}; +use lance_index::scalar::flat::FlatIndexMetadata; +use lance_index::scalar::lance_format::LanceIndexStore; +use lance_index::scalar::registry::ScalarIndexPlugin; +use lance_index::scalar::{SargableQuery, ScalarIndex}; +use lance_io::object_store::ObjectStore; +use object_store::path::Path; +#[cfg(target_os = "linux")] +use pprof::criterion::{Output, PProfProfiler}; + +/// Selectivity level for range queries +#[derive(Clone, Copy, Debug)] +enum Selectivity { + Few, // ~0.1% of rows + Many, // ~10% of rows + Most, // ~90% of rows +} + +impl Selectivity { + fn 
name(&self) -> &'static str { + match self { + Self::Few => "few", + Self::Many => "many", + Self::Most => "most", + } + } + + /// Get the approximate percentage of rows that should match + fn percentage(&self) -> f64 { + match self { + Self::Few => 0.001, + Self::Many => 0.10, + Self::Most => 0.90, + } + } +} + +// Lazy static runtime - only created once +static RUNTIME: OnceLock = OnceLock::new(); + +// Lazy static cache - only created when cached benchmarks are run +static CACHE: OnceLock> = OnceLock::new(); + +// Lazy static indices - only created when first accessed +// Separate indices for cached and uncached variants +static INT_UNIQUE_INDEX_NO_CACHE: OnceLock> = OnceLock::new(); +static INT_UNIQUE_INDEX_CACHED: OnceLock> = OnceLock::new(); +static INT_LOW_CARD_INDEX_NO_CACHE: OnceLock> = OnceLock::new(); +static INT_LOW_CARD_INDEX_CACHED: OnceLock> = OnceLock::new(); +static STRING_UNIQUE_INDEX_NO_CACHE: OnceLock> = OnceLock::new(); +static STRING_UNIQUE_INDEX_CACHED: OnceLock> = OnceLock::new(); +static STRING_LOW_CARD_INDEX_NO_CACHE: OnceLock> = OnceLock::new(); +static STRING_LOW_CARD_INDEX_CACHED: OnceLock> = OnceLock::new(); + +// Keep temp directories alive for the lifetime of the program +static TEMP_DIRS: OnceLock> = OnceLock::new(); + +/// Get or create the tokio runtime +fn get_runtime() -> &'static tokio::runtime::Runtime { + RUNTIME.get_or_init(|| tokio::runtime::Builder::new_multi_thread().build().unwrap()) +} + +/// Get the cache - either a singleton cache or no_cache based on use_cache parameter +fn get_cache(use_cache: bool, key_prefix: &str) -> Arc { + if use_cache { + Arc::new( + CACHE + .get_or_init(|| Arc::new(LanceCache::with_capacity(1024 * 1024 * 1024))) + .with_key_prefix(key_prefix), + ) + } else { + Arc::new(LanceCache::no_cache()) + } +} + +/// Create and train a BTree index for int64 data with unique values +async fn create_int_unique_index( + store: Arc, + use_cache: bool, +) -> Arc { + let stream = 
common::generate_int_unique_stream(); + let sub_index = FlatIndexMetadata::new(DataType::Int64); + + train_btree_index( + stream, + &sub_index, + store.as_ref(), + DEFAULT_BTREE_BATCH_SIZE, + None, + ) + .await + .unwrap(); + + let cache = get_cache(use_cache, "int_unique"); + let details = prost_types::Any::from_msg(&pbold::BTreeIndexDetails::default()).unwrap(); + let index = BTreeIndexPlugin + .load_index(store, &details, None, &cache) + .await + .unwrap(); + + index +} + +/// Create and train a BTree index for int64 data with low cardinality +async fn create_int_low_card_index( + store: Arc, + use_cache: bool, +) -> Arc { + let stream = common::generate_int_low_cardinality_stream(); + let sub_index = FlatIndexMetadata::new(DataType::Int64); + + train_btree_index( + stream, + &sub_index, + store.as_ref(), + DEFAULT_BTREE_BATCH_SIZE, + None, + ) + .await + .unwrap(); + + let cache = get_cache(use_cache, "int_low_card"); + let details = prost_types::Any::from_msg(&pbold::BTreeIndexDetails::default()).unwrap(); + let index = BTreeIndexPlugin + .load_index(store, &details, None, &cache) + .await + .unwrap(); + + index +} + +/// Create and train a BTree index for string data with unique values +async fn create_string_unique_index( + store: Arc, + use_cache: bool, +) -> Arc { + let stream = common::generate_string_unique_stream(); + let sub_index = FlatIndexMetadata::new(DataType::Utf8); + + train_btree_index( + stream, + &sub_index, + store.as_ref(), + DEFAULT_BTREE_BATCH_SIZE, + None, + ) + .await + .unwrap(); + + let cache = get_cache(use_cache, "string_unique"); + let details = prost_types::Any::from_msg(&pbold::BTreeIndexDetails::default()).unwrap(); + let index = BTreeIndexPlugin + .load_index(store, &details, None, &cache) + .await + .unwrap(); + + index +} + +/// Create and train a BTree index for string data with low cardinality +async fn create_string_low_card_index( + store: Arc, + use_cache: bool, +) -> Arc { + let stream = 
+        common::generate_string_low_cardinality_stream();
+    let sub_index = FlatIndexMetadata::new(DataType::Utf8);
+
+    train_btree_index(
+        stream,
+        &sub_index,
+        store.as_ref(),
+        DEFAULT_BTREE_BATCH_SIZE,
+        None,
+    )
+    .await
+    .unwrap();
+
+    let cache = get_cache(use_cache, "string_low_card");
+    let details = prost_types::Any::from_msg(&pbold::BTreeIndexDetails::default()).unwrap();
+    let index = BTreeIndexPlugin
+        .load_index(store, &details, None, &cache)
+        .await
+        .unwrap();
+
+    index
+}
+
+/// Setup function for int unique index - creates it only once per cache variant
+fn setup_int_unique_index(rt: &tokio::runtime::Runtime, use_cache: bool) -> Arc<dyn ScalarIndex> {
+    let static_ref = if use_cache {
+        &INT_UNIQUE_INDEX_CACHED
+    } else {
+        &INT_UNIQUE_INDEX_NO_CACHE
+    };
+
+    static_ref
+        .get_or_init(|| {
+            rt.block_on(async {
+                let tempdir = tempfile::tempdir().unwrap();
+                let store = Arc::new(LanceIndexStore::new(
+                    Arc::new(ObjectStore::local()),
+                    Path::from_filesystem_path(tempdir.path()).unwrap(),
+                    get_cache(use_cache, "int_unique"),
+                ));
+                let index = create_int_unique_index(store, use_cache).await;
+
+                // Store the temp directory to keep it alive
+                TEMP_DIRS.get_or_init(Vec::new);
+                // Note: We can't modify TEMP_DIRS after init, but the tempdir staying in scope here
+                // should keep it alive for the program duration due to the static lifetime
+                let _ = tempdir.keep();
+
+                index
+            })
+        })
+        .clone()
+}
+
+/// Setup function for int low cardinality index - creates it only once per cache variant
+fn setup_int_low_card_index(rt: &tokio::runtime::Runtime, use_cache: bool) -> Arc<dyn ScalarIndex> {
+    let static_ref = if use_cache {
+        &INT_LOW_CARD_INDEX_CACHED
+    } else {
+        &INT_LOW_CARD_INDEX_NO_CACHE
+    };
+
+    static_ref
+        .get_or_init(|| {
+            rt.block_on(async {
+                let tempdir = tempfile::tempdir().unwrap();
+                let store = Arc::new(LanceIndexStore::new(
+                    Arc::new(ObjectStore::local()),
+                    Path::from_filesystem_path(tempdir.path()).unwrap(),
+                    get_cache(use_cache, "int_low_card"),
+                ));
+                let
+                    index = create_int_low_card_index(store, use_cache).await;
+                let _ = tempdir.keep();
+                index
+            })
+        })
+        .clone()
+}
+
+/// Setup function for string unique index - creates it only once per cache variant
+fn setup_string_unique_index(
+    rt: &tokio::runtime::Runtime,
+    use_cache: bool,
+) -> Arc<dyn ScalarIndex> {
+    let static_ref = if use_cache {
+        &STRING_UNIQUE_INDEX_CACHED
+    } else {
+        &STRING_UNIQUE_INDEX_NO_CACHE
+    };
+
+    static_ref
+        .get_or_init(|| {
+            rt.block_on(async {
+                let tempdir = tempfile::tempdir().unwrap();
+                let store = Arc::new(LanceIndexStore::new(
+                    Arc::new(ObjectStore::local()),
+                    Path::from_filesystem_path(tempdir.path()).unwrap(),
+                    get_cache(use_cache, "string_unique"),
+                ));
+                let index = create_string_unique_index(store, use_cache).await;
+                let _ = tempdir.keep();
+                index
+            })
+        })
+        .clone()
+}
+
+/// Setup function for string low cardinality index - creates it only once per cache variant
+fn setup_string_low_card_index(
+    rt: &tokio::runtime::Runtime,
+    use_cache: bool,
+) -> Arc<dyn ScalarIndex> {
+    let static_ref = if use_cache {
+        &STRING_LOW_CARD_INDEX_CACHED
+    } else {
+        &STRING_LOW_CARD_INDEX_NO_CACHE
+    };
+
+    static_ref
+        .get_or_init(|| {
+            rt.block_on(async {
+                let tempdir = tempfile::tempdir().unwrap();
+                let store = Arc::new(LanceIndexStore::new(
+                    Arc::new(ObjectStore::local()),
+                    Path::from_filesystem_path(tempdir.path()).unwrap(),
+                    get_cache(use_cache, "string_low_card"),
+                ));
+                let index = create_string_low_card_index(store, use_cache).await;
+                let _ = tempdir.keep();
+                index
+            })
+        })
+        .clone()
+}
+
+fn bench_equality(c: &mut Criterion) {
+    let rt = get_runtime();
+
+    // Calculate test values from constants (middle of range)
+    let int_unique_value = (TOTAL_ROWS / 2) as i64;
+    let string_unique_value = format!("string_{:010}", TOTAL_ROWS / 2);
+    let int_low_card_value = (LOW_CARDINALITY_COUNT / 2) as i64;
+    let string_low_card_value = format!("value_{:03}", LOW_CARDINALITY_COUNT / 2);
+
+    let mut group = c.benchmark_group("btree_equality");
+    group
+        .sample_size(10)
+        .measurement_time(Duration::from_secs(10));
+
+    // Benchmark both cached and uncached variants
+    for use_cache in [false, true] {
+        let cache_label = if use_cache { "cached" } else { "no_cache" };
+
+        // int unique
+        group.bench_function(BenchmarkId::new("int_unique", cache_label), |b| {
+            let index = setup_int_unique_index(rt, use_cache);
+            b.to_async(rt).iter(|| {
+                let index = index.clone();
+                let value = int_unique_value;
+                async move {
+                    let query = SargableQuery::Equals(ScalarValue::Int64(Some(value)));
+                    black_box(index.search(&query, &NoOpMetricsCollector).await.unwrap());
+                }
+            })
+        });
+
+        // int low cardinality
+        group.bench_function(BenchmarkId::new("int_low_card", cache_label), |b| {
+            let index = setup_int_low_card_index(rt, use_cache);
+            b.to_async(rt).iter(|| {
+                let index = index.clone();
+                let value = int_low_card_value;
+                async move {
+                    let query = SargableQuery::Equals(ScalarValue::Int64(Some(value)));
+                    black_box(index.search(&query, &NoOpMetricsCollector).await.unwrap());
+                }
+            })
+        });
+
+        // String unique
+        group.bench_function(BenchmarkId::new("string_unique", cache_label), |b| {
+            let index = setup_string_unique_index(rt, use_cache);
+            let value = string_unique_value.clone();
+            b.to_async(rt).iter(|| {
+                let index = index.clone();
+                let value = value.clone();
+                async move {
+                    let query = SargableQuery::Equals(ScalarValue::Utf8(Some(value)));
+                    black_box(index.search(&query, &NoOpMetricsCollector).await.unwrap());
+                }
+            })
+        });
+
+        // String low cardinality
+        group.bench_function(BenchmarkId::new("string_low_card", cache_label), |b| {
+            let index = setup_string_low_card_index(rt, use_cache);
+            let value = string_low_card_value.clone();
+            b.to_async(rt).iter(|| {
+                let index = index.clone();
+                let value = value.clone();
+                async move {
+                    let query = SargableQuery::Equals(ScalarValue::Utf8(Some(value)));
+                    black_box(index.search(&query, &NoOpMetricsCollector).await.unwrap());
+                }
+            })
+        });
+    }
+
+    group.finish();
+}
+
+/// Helper function to count results from a range query
+fn count_range_results(
+    rt: &tokio::runtime::Runtime,
+    index: &Arc<dyn ScalarIndex>,
+    query: SargableQuery,
+) -> usize {
+    rt.block_on(async {
+        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
+        match result {
+            lance_index::scalar::SearchResult::Exact(row_ids) => {
+                row_ids.len().expect("Expected exact row count") as usize
+            }
+            _ => panic!("Expected exact search result"),
+        }
+    })
+}
+
+fn bench_range(c: &mut Criterion, selectivity: Selectivity) {
+    let rt = get_runtime();
+
+    let group_name = format!("btree_range_{}", selectivity.name());
+    let mut group = c.benchmark_group(&group_name);
+    group
+        .sample_size(10)
+        .measurement_time(Duration::from_secs(10));
+
+    let pct = selectivity.percentage();
+
+    // Int unique - range queries
+    let int_range_size = (TOTAL_ROWS as f64 * pct) as u64;
+    let int_start = (TOTAL_ROWS / 2) - (int_range_size / 2);
+    let int_end = int_start + int_range_size;
+
+    // Benchmark both cached and uncached variants
+    for use_cache in [false, true] {
+        let cache_label = if use_cache { "cached" } else { "no_cache" };
+
+        group.bench_function(BenchmarkId::new("int_unique", cache_label), |b| {
+            // Setup index and run sanity check
+            let index = setup_int_unique_index(rt, use_cache);
+
+            // Sanity check: verify int unique range returns expected count
+            let int_unique_query = SargableQuery::Range(
+                Bound::Included(ScalarValue::Int64(Some(int_start as i64))),
+                Bound::Included(ScalarValue::Int64(Some(int_end as i64))),
+            );
+            let int_unique_count = count_range_results(rt, &index, int_unique_query);
+            let expected_count = (int_end - int_start + 1) as usize; // +1 because range is inclusive
+            assert!(
+                (int_unique_count as f64 - expected_count as f64).abs() / (expected_count as f64)
+                    < 0.01,
+                "int unique count mismatch: expected {}, got {}",
+                expected_count,
+                int_unique_count
+            );
+            b.to_async(rt).iter(|| {
+                let index = index.clone();
+                async move {
+                    let query =
+                        SargableQuery::Range(
+                        Bound::Included(ScalarValue::Int64(Some(int_start as i64))),
+                        Bound::Included(ScalarValue::Int64(Some(int_end as i64))),
+                    );
+                    black_box(index.search(&query, &NoOpMetricsCollector).await.unwrap());
+                }
+            })
+        });
+
+        // int low cardinality - range queries
+        // With 100 unique values, select appropriate range
+        let low_card_range_size = (LOW_CARDINALITY_COUNT as f64 * pct) as usize;
+        let low_card_start = (LOW_CARDINALITY_COUNT / 2) - (low_card_range_size / 2);
+        let low_card_end = low_card_start + low_card_range_size;
+
+        group.bench_function(BenchmarkId::new("int_low_card", cache_label), |b| {
+            // Setup index and run sanity check
+            let index = setup_int_low_card_index(rt, use_cache);
+
+            // Sanity check: verify int low cardinality range returns expected count
+            let int_low_card_query = SargableQuery::Range(
+                Bound::Included(ScalarValue::Int64(Some(low_card_start as i64))),
+                Bound::Included(ScalarValue::Int64(Some(low_card_end as i64))),
+            );
+            let int_low_card_count = count_range_results(rt, &index, int_low_card_query);
+            let rows_per_value = TOTAL_ROWS / LOW_CARDINALITY_COUNT as u64;
+            let expected_low_card_count =
+                ((low_card_end - low_card_start + 1) as u64 * rows_per_value) as usize;
+            assert!(
+                (int_low_card_count as f64 - expected_low_card_count as f64).abs()
+                    / (expected_low_card_count as f64)
+                    < 0.01,
+                "int low cardinality count mismatch: expected {}, got {}",
+                expected_low_card_count,
+                int_low_card_count
+            );
+            b.to_async(rt).iter(|| {
+                let index = index.clone();
+                async move {
+                    let query = SargableQuery::Range(
+                        Bound::Included(ScalarValue::Int64(Some(low_card_start as i64))),
+                        Bound::Included(ScalarValue::Int64(Some(low_card_end as i64))),
+                    );
+                    black_box(index.search(&query, &NoOpMetricsCollector).await.unwrap());
+                }
+            })
+        });
+
+        // String unique - range queries
+        let string_start_row = int_start;
+        let string_end_row = int_end;
+
+        group.bench_function(BenchmarkId::new("string_unique", cache_label), |b| {
+            // Setup index and run sanity check
+            let index = setup_string_unique_index(rt, use_cache);
+
+            // Sanity check: verify string unique range returns expected count
+            let string_unique_query = SargableQuery::Range(
+                Bound::Included(ScalarValue::Utf8(Some(format!(
+                    "string_{:010}",
+                    string_start_row
+                )))),
+                Bound::Included(ScalarValue::Utf8(Some(format!(
+                    "string_{:010}",
+                    string_end_row
+                )))),
+            );
+            let string_unique_count = count_range_results(rt, &index, string_unique_query);
+            let expected_string_count = (string_end_row - string_start_row + 1) as usize;
+            assert!(
+                (string_unique_count as f64 - expected_string_count as f64).abs()
+                    / (expected_string_count as f64)
+                    < 0.01,
+                "String unique count mismatch: expected {}, got {}",
+                expected_string_count,
+                string_unique_count
+            );
+            b.to_async(rt).iter(|| {
+                let index = index.clone();
+                async move {
+                    let query = SargableQuery::Range(
+                        Bound::Included(ScalarValue::Utf8(Some(format!(
+                            "string_{:010}",
+                            string_start_row
+                        )))),
+                        Bound::Included(ScalarValue::Utf8(Some(format!(
+                            "string_{:010}",
+                            string_end_row
+                        )))),
+                    );
+                    black_box(index.search(&query, &NoOpMetricsCollector).await.unwrap());
+                }
+            })
+        });
+
+        // String low cardinality - range queries
+        group.bench_function(BenchmarkId::new("string_low_card", cache_label), |b| {
+            // Setup index and run sanity check
+            let index = setup_string_low_card_index(rt, use_cache);
+
+            // Sanity check: verify string low cardinality range returns expected count
+            let string_low_card_query = SargableQuery::Range(
+                Bound::Included(ScalarValue::Utf8(Some(format!(
+                    "value_{:03}",
+                    low_card_start
+                )))),
+                Bound::Included(ScalarValue::Utf8(Some(format!(
+                    "value_{:03}",
+                    low_card_end
+                )))),
+            );
+            let string_low_card_count = count_range_results(rt, &index, string_low_card_query);
+            let rows_per_value = TOTAL_ROWS / LOW_CARDINALITY_COUNT as u64;
+            let expected_string_low_card_count =
+                ((low_card_end - low_card_start + 1) as u64 * rows_per_value) as usize;
+            assert!(
+                (string_low_card_count as f64 - expected_string_low_card_count as f64).abs()
+                    / (expected_string_low_card_count as f64)
+                    < 0.01,
+                "String low cardinality count mismatch: expected {}, got {}",
+                expected_string_low_card_count,
+                string_low_card_count
+            );
+            b.to_async(rt).iter(|| {
+                let index = index.clone();
+                async move {
+                    let query = SargableQuery::Range(
+                        Bound::Included(ScalarValue::Utf8(Some(format!(
+                            "value_{:03}",
+                            low_card_start
+                        )))),
+                        Bound::Included(ScalarValue::Utf8(Some(format!(
+                            "value_{:03}",
+                            low_card_end
+                        )))),
+                    );
+                    black_box(index.search(&query, &NoOpMetricsCollector).await.unwrap());
+                }
+            })
+        });
+    }
+
+    group.finish();
+}
+
+fn bench_in(c: &mut Criterion) {
+    let rt = get_runtime();
+
+    // Test with different numbers of values in the IN clause
+    let value_counts = [10, 20, 30];
+
+    for &num_values in &value_counts {
+        let mut group = c.benchmark_group(format!("btree_in_{}", num_values));
+        group
+            .sample_size(10)
+            .measurement_time(Duration::from_secs(10));
+
+        // Calculate values around the middle of the range
+        let mid_int = (TOTAL_ROWS / 2) as i64;
+        let mid_string = TOTAL_ROWS / 2;
+        let mid_low_card = LOW_CARDINALITY_COUNT / 2;
+
+        // Int unique - IN query
+        let int_values: Vec<ScalarValue> = (0..num_values)
+            .map(|i| ScalarValue::Int64(Some(mid_int + i as i64 - num_values as i64 / 2)))
+            .collect();
+
+        // Int low cardinality - IN query
+        let int_low_card_values: Vec<ScalarValue> = (0..num_values)
+            .map(|i| ScalarValue::Int64(Some((mid_low_card + i - num_values / 2) as i64)))
+            .collect();
+
+        // String unique - IN query
+        let string_values: Vec<ScalarValue> = (0..num_values)
+            .map(|i| {
+                ScalarValue::Utf8(Some(format!(
+                    "string_{:010}",
+                    (mid_string as i64 + i as i64 - num_values as i64 / 2) as u64
+                )))
+            })
+            .collect();
+
+        // String low cardinality - IN query
+        let string_low_card_values: Vec<ScalarValue> = (0..num_values)
+            .map(|i| {
+                ScalarValue::Utf8(Some(format!(
+                    "value_{:03}",
+                    (mid_low_card as i32 + i as i32 - num_values as i32 / 2) as
+                        usize
+                )))
+            })
+            .collect();
+
+        // Benchmark both cached and uncached variants
+        for use_cache in [false, true] {
+            let cache_label = if use_cache { "cached" } else { "no_cache" };
+
+            group.bench_function(BenchmarkId::new("int_unique", cache_label), |b| {
+                let index = setup_int_unique_index(rt, use_cache);
+                let values = int_values.clone();
+                b.to_async(rt).iter(|| {
+                    let index = index.clone();
+                    let values = values.clone();
+                    async move {
+                        let query = SargableQuery::IsIn(values);
+                        black_box(index.search(&query, &NoOpMetricsCollector).await.unwrap());
+                    }
+                })
+            });
+
+            group.bench_function(BenchmarkId::new("int_low_card", cache_label), |b| {
+                let index = setup_int_low_card_index(rt, use_cache);
+                let values = int_low_card_values.clone();
+                b.to_async(rt).iter(|| {
+                    let index = index.clone();
+                    let values = values.clone();
+                    async move {
+                        let query = SargableQuery::IsIn(values);
+                        black_box(index.search(&query, &NoOpMetricsCollector).await.unwrap());
+                    }
+                })
+            });
+
+            group.bench_function(BenchmarkId::new("string_unique", cache_label), |b| {
+                let index = setup_string_unique_index(rt, use_cache);
+                let values = string_values.clone();
+                b.to_async(rt).iter(|| {
+                    let index = index.clone();
+                    let values = values.clone();
+                    async move {
+                        let query = SargableQuery::IsIn(values);
+                        black_box(index.search(&query, &NoOpMetricsCollector).await.unwrap());
+                    }
+                })
+            });
+
+            group.bench_function(BenchmarkId::new("string_low_card", cache_label), |b| {
+                let index = setup_string_low_card_index(rt, use_cache);
+                let values = string_low_card_values.clone();
+                b.to_async(rt).iter(|| {
+                    let index = index.clone();
+                    let values = values.clone();
+                    async move {
+                        let query = SargableQuery::IsIn(values);
+                        black_box(index.search(&query, &NoOpMetricsCollector).await.unwrap());
+                    }
+                })
+            });
+        }
+
+        group.finish();
+    }
+}
+
+fn bench_btree(c: &mut Criterion) {
+    // Run equality benchmarks
+    bench_equality(c);
+
+    // Run IN query benchmarks
+    bench_in(c);
+
+    // Run range
+    // benchmarks with different selectivities
+    bench_range(c, Selectivity::Few);
+    bench_range(c, Selectivity::Many);
+    bench_range(c, Selectivity::Most);
+}
+
+#[cfg(target_os = "linux")]
+criterion_group!(
+    name = benches;
+    config = Criterion::default()
+        .measurement_time(Duration::from_secs(10))
+        .sample_size(10)
+        .with_profiler(PProfProfiler::new(100, Output::Flamegraph(None)));
+    targets = bench_btree);
+
+// Non-linux version does not support pprof.
+#[cfg(not(target_os = "linux"))]
+criterion_group!(
+    name = benches;
+    config = Criterion::default()
+        .measurement_time(Duration::from_secs(10))
+        .sample_size(10);
+    targets = bench_btree);
+
+criterion_main!(benches);
diff --git a/rust/lance-index/benches/common.rs b/rust/lance-index/benches/common.rs
new file mode 100644
index 00000000000..8cf94d7b806
--- /dev/null
+++ b/rust/lance-index/benches/common.rs
@@ -0,0 +1,155 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright The Lance Authors
+
+//! Common utilities and data generation for scalar index benchmarks.
+use std::sync::Arc;
+
+use arrow::datatypes::{Int64Type, UInt64Type};
+use arrow_array::{Int64Array, RecordBatch, StringArray, UInt64Array};
+use arrow_schema::{DataType, Field, Schema};
+use datafusion::physical_plan::SendableRecordBatchStream;
+use lance_datafusion::datagen::DatafusionDatagenExt;
+use lance_datagen::{array, gen_batch, BatchCount, RowCount};
+
+/// Total number of rows in the dataset
+pub const TOTAL_ROWS: u64 = 1_000_000;
+
+/// Number of unique values for low cardinality tests
+pub const LOW_CARDINALITY_COUNT: usize = 100;
+
+/// Batch size for streaming data
+pub const BATCH_SIZE: u64 = 10_000;
+
+/// Number of batches in the dataset
+pub const NUM_BATCHES: u64 = TOTAL_ROWS / BATCH_SIZE;
+
+/// Generate a stream of int64 data with unique values (sequential)
+pub fn generate_int_unique_stream() -> SendableRecordBatchStream {
+    gen_batch()
+        .col("value", array::step::<Int64Type>())
+        .col("_rowid", array::step::<UInt64Type>())
+        .into_df_stream(
+            RowCount::from(BATCH_SIZE),
+            BatchCount::from(NUM_BATCHES as u32),
+        )
+}
+
+/// Generate sorted int64 data with low cardinality (100 unique values)
+/// Each value appears 10,000 times consecutively
+pub fn generate_int_low_cardinality_stream() -> SendableRecordBatchStream {
+    let rows_per_value = TOTAL_ROWS / LOW_CARDINALITY_COUNT as u64;
+    let mut batches = Vec::new();
+    let mut current_row = 0u64;
+
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("value", DataType::Int64, false),
+        Field::new("_rowid", DataType::UInt64, false),
+    ]));
+
+    for value_idx in 0..LOW_CARDINALITY_COUNT {
+        let value = value_idx as i64;
+        let value_end_row = current_row + rows_per_value;
+
+        while current_row < value_end_row {
+            let batch_end = (current_row + BATCH_SIZE).min(value_end_row);
+            let batch_size = (batch_end - current_row) as usize;
+
+            // Manually create arrays with proper row IDs
+            let values = vec![value; batch_size];
+            let row_ids: Vec<u64> = (current_row..batch_end).collect();
+
+            let batch = RecordBatch::try_new(
+                schema.clone(),
+                vec![
+                    Arc::new(Int64Array::from(values)),
+                    Arc::new(UInt64Array::from(row_ids)),
+                ],
+            )
+            .unwrap();
+
+            batches.push(Ok(batch));
+            current_row = batch_end;
+        }
+    }
+
+    let stream = futures::stream::iter(batches);
+    Box::pin(datafusion::physical_plan::stream::RecordBatchStreamAdapter::new(schema, stream))
+}
+
+/// Generate a stream of string data with unique values
+/// Strings are zero-padded to 10 digits for proper lexicographic sorting
+pub fn generate_string_unique_stream() -> SendableRecordBatchStream {
+    let mut batches = Vec::new();
+    let mut current_row = 0u64;
+
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("value", DataType::Utf8, false),
+        Field::new("_rowid", DataType::UInt64, false),
+    ]));
+
+    while current_row < TOTAL_ROWS {
+        let batch_end = (current_row + BATCH_SIZE).min(TOTAL_ROWS);
+
+        // Generate zero-padded strings for proper lexicographic sorting
+        let values: Vec<String> = (current_row..batch_end)
+            .map(|i| format!("string_{:010}", i))
+            .collect();
+        let row_ids: Vec<u64> = (current_row..batch_end).collect();
+
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(StringArray::from(values)),
+                Arc::new(UInt64Array::from(row_ids)),
+            ],
+        )
+        .unwrap();
+
+        batches.push(Ok(batch));
+        current_row = batch_end;
+    }
+
+    let stream = futures::stream::iter(batches);
+    Box::pin(datafusion::physical_plan::stream::RecordBatchStreamAdapter::new(schema, stream))
+}
+
+/// Generate sorted string data with low cardinality (100 unique values)
+pub fn generate_string_low_cardinality_stream() -> SendableRecordBatchStream {
+    let rows_per_value = TOTAL_ROWS / LOW_CARDINALITY_COUNT as u64;
+    let mut batches = Vec::new();
+    let mut current_row = 0u64;
+
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("value", DataType::Utf8, false),
+        Field::new("_rowid", DataType::UInt64, false),
+    ]));
+
+    for value_idx in 0..LOW_CARDINALITY_COUNT {
+        let value = format!("value_{:03}", value_idx);
+        let value_end_row =
+            current_row + rows_per_value;
+
+        while current_row < value_end_row {
+            let batch_end = (current_row + BATCH_SIZE).min(value_end_row);
+            let batch_size = (batch_end - current_row) as usize;
+
+            // Manually create arrays with proper row IDs
+            let values = vec![value.as_str(); batch_size];
+            let row_ids: Vec<u64> = (current_row..batch_end).collect();
+
+            let batch = RecordBatch::try_new(
+                schema.clone(),
+                vec![
+                    Arc::new(StringArray::from(values)),
+                    Arc::new(UInt64Array::from(row_ids)),
+                ],
+            )
+            .unwrap();
+
+            batches.push(Ok(batch));
+            current_row = batch_end;
+        }
+    }
+
+    let stream = futures::stream::iter(batches);
+    Box::pin(datafusion::physical_plan::stream::RecordBatchStreamAdapter::new(schema, stream))
+}