Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 159 additions & 2 deletions rust/lance/src/dataset/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2717,14 +2717,44 @@ impl Scanner {

let (match_plan, flat_match_plan) = match &index {
Some(index) => {
// Check if any of the scanner's fragments are covered by the FTS index
let indexed_frags_bitmap = index
.fragment_bitmap
.as_ref()
.map(|bitmap| bitmap & &self.get_fragments_as_bitmap())
.unwrap_or_else(|| self.get_fragments_as_bitmap());

let has_indexed_fragments = !indexed_frags_bitmap.is_empty();

if !has_indexed_fragments {
// No indexed fragments in the scan - do flat search only on scanner's fragments
let fragments_to_search = if let Some(frags) = &self.fragments {
frags.clone()
} else {
self.dataset.fragments().iter().cloned().collect()
};
let flat_match_plan = self
.plan_flat_match_query(fragments_to_search, query, params, filter_plan)
.await?;
return Ok(flat_match_plan);
}

let match_plan: Arc<dyn ExecutionPlan> = Arc::new(MatchQueryExec::new(
self.dataset.clone(),
query.clone(),
params.clone(),
prefilter_source.clone(),
));

let unindexed_fragments = self.dataset.unindexed_fragments(&index.name).await?;
let mut unindexed_fragments = self.dataset.unindexed_fragments(&index.name).await?;

// Filter by scanner's fragment list to respect fragment restrictions
if let Some(scanner_frags) = &self.fragments {
let scanner_ids: std::collections::HashSet<_> =
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can use `self.get_fragments_as_bitmap()` to perform the same filtering here instead of building a `HashSet`.

scanner_frags.iter().map(|f| f.id).collect();
unindexed_fragments.retain(|f| scanner_ids.contains(&f.id));
}

if unindexed_fragments.is_empty() {
(Some(match_plan), None)
} else {
Expand All @@ -2735,7 +2765,11 @@ impl Scanner {
}
}
None => {
let unindexed_fragments = self.dataset.fragments().iter().cloned().collect();
let unindexed_fragments = if let Some(frags) = &self.fragments {
frags.clone()
} else {
self.dataset.fragments().iter().cloned().collect()
};
let flat_match_plan = self
.plan_flat_match_query(unindexed_fragments, query, params, filter_plan)
.await?;
Expand Down Expand Up @@ -8333,4 +8367,127 @@ mod test {
);
}
}

/// Checks that a full text search restricted via `with_fragments` only returns rows
/// whose row IDs belong to the requested fragments, whether those fragments were
/// present when the FTS index was built or were appended afterwards.
#[tokio::test]
async fn test_fts_respects_fragment_list() {
    let mut test_ds = TestVectorDataset::new(LanceFileVersion::Stable, false)
        .await
        .unwrap();

    // Index the initial data first so that everything appended later is unindexed.
    test_ds.make_fts_index().await.unwrap();

    // Snapshot the fragment count while every fragment is still covered by the index.
    let indexed_snapshot = Dataset::open(&test_ds.tmp_dir).await.unwrap();
    let num_indexed_fragments = indexed_snapshot.fragments().len();

    // Appending after the index build yields fragments the index does not cover.
    test_ds.append_new_data().await.unwrap();

    // Re-open so the appended fragments are visible.
    let dataset = Dataset::open(&test_ds.tmp_dir).await.unwrap();
    let total_fragments = dataset.fragments().len();

    assert!(
        total_fragments > num_indexed_fragments,
        "Should have unindexed fragments after append"
    );

    // Partition the fragment list at the position where the appended fragments start.
    let all_fragments: Vec<_> = dataset.fragments().iter().cloned().collect();
    let (indexed_part, unindexed_part) = all_fragments.split_at(num_indexed_fragments);
    let indexed_fragments: Vec<_> = indexed_part.to_vec();
    let unindexed_fragments: Vec<_> = unindexed_part.to_vec();

    // Scenario 1: restrict the scan to the fragments that existed at index time.
    let mut indexed_scan = dataset.scan();
    indexed_scan.with_fragments(indexed_fragments.clone());
    indexed_scan.with_row_id();
    indexed_scan
        .full_text_search(FullTextSearchQuery::new("s-100".to_string()))
        .unwrap();

    let indexed_batch = indexed_scan.try_into_batch().await.unwrap();

    // The fragment ID is taken from the upper 32 bits of each row ID.
    let indexed_row_ids = indexed_batch
        .column_by_name(ROW_ID)
        .unwrap()
        .as_primitive::<arrow::datatypes::UInt64Type>();

    for row_id in (0..indexed_row_ids.len()).map(|i| indexed_row_ids.value(i)) {
        let fragment_id = (row_id >> 32) as u32;
        assert!(
            fragment_id < num_indexed_fragments as u32,
            "Row from fragment {} found, but only indexed fragments 0-{} should be scanned",
            fragment_id,
            num_indexed_fragments - 1
        );
    }

    // Scenario 2: restrict the scan to the appended fragments only (flat search).
    let mut unindexed_scan = dataset.scan();
    unindexed_scan.with_fragments(unindexed_fragments.clone());
    unindexed_scan.with_row_id();
    unindexed_scan
        .full_text_search(FullTextSearchQuery::new("s-400".to_string()))
        .unwrap();

    let unindexed_batch = unindexed_scan.try_into_batch().await.unwrap();

    // The row-ID column is only inspected when the query actually matched something.
    if unindexed_batch.num_rows() > 0 {
        let unindexed_row_ids = unindexed_batch
            .column_by_name(ROW_ID)
            .unwrap()
            .as_primitive::<arrow::datatypes::UInt64Type>();

        for row_id in (0..unindexed_row_ids.len()).map(|i| unindexed_row_ids.value(i)) {
            let fragment_id = (row_id >> 32) as u32;
            assert!(
                fragment_id >= num_indexed_fragments as u32,
                "Row from fragment {} found, but only unindexed fragments {}-{} should be scanned",
                fragment_id,
                num_indexed_fragments,
                total_fragments - 1
            );
        }
    }

    // Scenario 3: two fragments from the indexed range plus one appended fragment.
    let mixed_fragments = vec![
        indexed_fragments[0].clone(),
        indexed_fragments[1].clone(),
        unindexed_fragments[0].clone(),
    ];

    let mut mixed_scan = dataset.scan();
    mixed_scan.with_fragments(mixed_fragments.clone());
    mixed_scan.with_row_id();
    mixed_scan
        .full_text_search(FullTextSearchQuery::new("s".to_string()))
        .unwrap();

    let mixed_batch = mixed_scan.try_into_batch().await.unwrap();

    // IDs of the fragments the scan was allowed to touch.
    let allowed_fragment_ids: std::collections::HashSet<u32> =
        mixed_fragments.iter().map(|f| f.id as u32).collect();

    let mixed_row_ids = mixed_batch
        .column_by_name(ROW_ID)
        .unwrap()
        .as_primitive::<arrow::datatypes::UInt64Type>();

    for row_id in (0..mixed_row_ids.len()).map(|i| mixed_row_ids.value(i)) {
        let fragment_id = (row_id >> 32) as u32;
        assert!(
            allowed_fragment_ids.contains(&fragment_id),
            "Row from fragment {} found, but only fragments {:?} should be scanned",
            fragment_id,
            allowed_fragment_ids
        );
    }
}
}