Skip to content
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 40 additions & 8 deletions rust/lance-index/src/scalar/inverted/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{
};
use std::{collections::HashMap, ops::Range};

use crate::metrics::NoOpMetricsCollector;
use crate::metrics::{LocalMetricsCollector, NoOpMetricsCollector};
use crate::prefilter::NoFilter;
use crate::scalar::registry::{TrainingCriteria, TrainingOrdering};
use arrow::datatypes::{self, Float32Type, Int32Type, UInt64Type};
Expand Down Expand Up @@ -40,7 +40,10 @@ use lance_core::utils::{
mask::RowIdMask,
tracing::{IO_TYPE_LOAD_SCALAR_PART, TRACE_IO_EVENTS},
};
use lance_core::{container::list::ExpLinkedList, utils::tokio::get_num_compute_intensive_cpus};
use lance_core::{
container::list::ExpLinkedList,
utils::tokio::{get_num_compute_intensive_cpus, spawn_cpu},
};
use lance_core::{Error, Result, ROW_ID, ROW_ID_FIELD};
use roaring::RoaringBitmap;
use snafu::location;
Expand Down Expand Up @@ -251,7 +254,7 @@ impl InvertedIndex {
let params = params.clone();
let mask = mask.clone();
let metrics = metrics.clone();
tokio::spawn(async move {
async move {
part.bm25_search(
tokens.as_ref(),
params.as_ref(),
Expand All @@ -260,7 +263,7 @@ impl InvertedIndex {
metrics.as_ref(),
)
.await
})
}
})
.collect::<Vec<_>>();
let mut parts = stream::iter(parts).buffer_unordered(get_num_compute_intensive_cpus());
Expand All @@ -270,7 +273,7 @@ impl InvertedIndex {
row_id,
freqs,
doc_length,
} in res?
} in res
{
let mut score = 0.0;
for (token, freq) in freqs.into_iter() {
Expand Down Expand Up @@ -621,6 +624,24 @@ pub struct InvertedPartition {
token_set_format: TokenSetFormat,
}

/// Copyable, lifetime-erased handle to an `InvertedPartition`.
///
/// The wrapper stores nothing but a raw `*const` pointer, so the borrow it was
/// created from is not tracked by the compiler. The only way to read through
/// it is the `unsafe` method `deref`, whose contract is on the caller.
///
/// NOTE(review): this type does not keep the partition alive. If a task that
/// holds the handle can keep running after the original borrow has ended
/// (for example because the future awaiting that task was dropped), calling
/// `deref` is a use-after-free. An owning handle such as
/// `Arc<InvertedPartition>` would remove the hazard -- confirm against the
/// call sites.
#[derive(Clone, Copy)]
struct PartitionPtr(*const InvertedPartition);

// NOTE(review): both impls are unconditional. They are only sound if sharing
// `&InvertedPartition` across threads is sound (i.e. `InvertedPartition: Sync`)
// and the pointee stays alive for every cross-thread use -- neither is
// enforced here; TODO confirm.
unsafe impl Sync for PartitionPtr {}
unsafe impl Send for PartitionPtr {}

impl PartitionPtr {
    /// Records the address of `partition`, discarding its lifetime.
    fn new(partition: &InvertedPartition) -> Self {
        // A shared reference coerces to a `*const` pointer to the same object.
        let raw: *const InvertedPartition = partition;
        Self(raw)
    }

    /// Reborrows the pointee as a shared reference.
    ///
    /// # Safety
    /// Caller must guarantee the referenced partition lives at least as long as the returned borrow.
    unsafe fn deref(&self) -> &InvertedPartition {
        // SAFETY: validity of the pointer for the returned lifetime is the
        // caller's obligation, as stated in the contract above.
        unsafe { &*self.0 }
    }
}

impl InvertedPartition {
/// Check if this partition belongs to the specified fragment.
///
Expand Down Expand Up @@ -770,9 +791,20 @@ impl InvertedPartition {
.buffered(self.store.io_parallelism())
.try_collect::<Vec<_>>()
.await?;
let scorer = IndexBM25Scorer::new(std::iter::once(self));
let mut wand = Wand::new(operator, postings.into_iter(), &self.docs, scorer);
wand.search(params, mask, metrics)
let params = params.clone();
let partition_ptr = PartitionPtr::new(self);
let (candidates, local_metrics) = spawn_cpu(move || {
let local_metrics = LocalMetricsCollector::default();
// SAFETY: `partition_ptr` points to `self`, which outlives this task because we await it.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we avoid this? This can't be guaranteed since the future itself could be dropped at any time.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering if you could refactor this function.

Move the postings calculation out and await it separately, since bm25_search is just a pure blocking function.

let partition = unsafe { partition_ptr.deref() };

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Keep partition alive for spawn_cpu WAND task

The WAND search is now offloaded to spawn_cpu using a raw PartitionPtr to self, but the background CPU task is not tied to the async future’s cancellation. If the bm25_search future is dropped (e.g., request cancellation) while the index is concurrently torn down, the blocking task will continue running and dereference a pointer to a freed InvertedPartition, leading to potential use-after-free/UB. Consider holding an Arc<InvertedPartition> inside the closure or otherwise ensuring the partition outlives the spawned CPU job under cancellation.

Useful? React with 👍 / 👎.

let scorer = IndexBM25Scorer::new(std::iter::once(partition));
let mut wand = Wand::new(operator, postings.into_iter(), &partition.docs, scorer);
let hits = wand.search(&params, mask, &local_metrics)?;
Result::Ok((hits, local_metrics))
})
.await?;
local_metrics.dump_into(metrics);
Ok(candidates)
}

pub async fn into_builder(self) -> Result<InnerBuilder> {
Expand Down
Loading