From 3c760b89dae949ea8d2c9e1e2fb766be4dafbb9d Mon Sep 17 00:00:00 2001
From: Will Jones
Date: Tue, 3 Feb 2026 11:43:55 -0800
Subject: [PATCH 1/3] fix: deduplicate row addresses in TakeStream to prevent panic

TakeStream::map_batch passed duplicate row addresses straight through to
the encoding layer, which requires strictly increasing indices. Duplicates
(e.g. from FTS on List where multiple list elements in the same row match)
caused indices_to_ranges to produce overlapping ranges, panicking in
BinaryPageScheduler with "attempt to subtract with overflow".

Dedup sorted addresses before passing them to fragment readers, then
expand the results back to include duplicates. Also tighten the
schedule_take debug_assert from <= to < to catch this earlier.

Fixes #5260

Co-Authored-By: Claude Opus 4.5
---
 rust/lance-encoding/src/decoder.rs |   2 +-
 rust/lance/src/io/exec/take.rs     | 126 ++++++++++++++++++++++++++++-
 2 files changed, 126 insertions(+), 2 deletions(-)

diff --git a/rust/lance-encoding/src/decoder.rs b/rust/lance-encoding/src/decoder.rs
index a3bcfd3c27b..db7ec3cc709 100644
--- a/rust/lance-encoding/src/decoder.rs
+++ b/rust/lance-encoding/src/decoder.rs
@@ -1302,7 +1302,7 @@ impl DecodeBatchScheduler {
         sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
         scheduler: Arc<dyn EncodingsIo>,
     ) {
-        debug_assert!(indices.windows(2).all(|w| w[0] <= w[1]));
+        debug_assert!(indices.windows(2).all(|w| w[0] < w[1]));
         if indices.is_empty() {
             return;
         }
diff --git a/rust/lance/src/io/exec/take.rs b/rust/lance/src/io/exec/take.rs
index 25ca45e2335..ee694f674aa 100644
--- a/rust/lance/src/io/exec/take.rs
+++ b/rust/lance/src/io/exec/take.rs
@@ -218,11 +218,25 @@ impl TakeStream {
             )
         };
 
+        // Deduplicate sorted addresses. FTS on List can produce duplicate
+        // row addresses when multiple list elements in the same row match. The
+        // encoding layer requires strictly increasing indices, so we dedup here
+        // and expand the results back afterwards.
+        let sorted_values = sorted_addrs.values();
+        let mut unique_addrs: Vec<u64> = Vec::with_capacity(sorted_values.len());
+        let mut sorted_to_unique: Vec<usize> = Vec::with_capacity(sorted_values.len());
+        for &addr in sorted_values.iter() {
+            if unique_addrs.last() != Some(&addr) {
+                unique_addrs.push(addr);
+            }
+            sorted_to_unique.push(unique_addrs.len() - 1);
+        }
+
         let mut futures = FuturesOrdered::new();
         let mut current_offsets = Vec::new();
         let mut current_fragment_id = None;
-        for row_addr in sorted_addrs.values() {
+        for row_addr in &unique_addrs {
             let addr = RowAddress::new_from_u64(*row_addr);
 
             if Some(addr.fragment_id()) != current_fragment_id {
@@ -267,6 +281,17 @@ impl TakeStream {
         let schema = batches.first().expect_ok()?.schema();
         let mut new_data = concat_batches(&schema, batches.iter())?;
 
+        // Expand deduplicated rows back to the sorted order (reinsert duplicates)
+        if unique_addrs.len() < sorted_values.len() {
+            let expand_indices = UInt32Array::from(
+                sorted_to_unique
+                    .iter()
+                    .map(|&i| i as u32)
+                    .collect::<Vec<u32>>(),
+            );
+            new_data = arrow_select::take::take_record_batch(&new_data, &expand_indices).unwrap();
+        }
+
         // Restore previous order (if addresses were out of order originally)
         if let Some(permutation) = permutation {
             new_data = arrow_select::take::take_record_batch(&new_data, &permutation).unwrap();
@@ -820,6 +845,105 @@ mod tests {
         assert_eq!(metrics.find_count("batches_processed").unwrap().value(), 3);
     }
 
+    /// Regression test: FTS on List can produce duplicate row addresses when
+    /// multiple list elements in the same row match.
+    /// These duplicates caused `indices_to_ranges` in the encoding layer to
+    /// produce overlapping ranges, panicking in BinaryPageScheduler with
+    /// "attempt to subtract with overflow".
+    #[tokio::test]
+    async fn test_take_with_duplicate_row_addrs() {
+        let TestFixture {
+            dataset,
+            _tmp_dir_guard,
+        } = test_fixture().await;
+
+        // Simulate duplicate row addresses (same row matched twice),
+        // already sorted as they would be within a single fragment.
+        let row_addrs = UInt64Array::from(vec![0u64, 0, 1, 2, 2]);
+        let schema = Arc::new(ArrowSchema::new(vec![Field::new(
+            ROW_ADDR,
+            DataType::UInt64,
+            true,
+        )]));
+        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(row_addrs)]).unwrap();
+
+        let row_addr_stream = futures::stream::iter(vec![Ok(batch)]);
+        let row_addr_stream = Box::pin(RecordBatchStreamAdapter::new(schema, row_addr_stream));
+        let input = Arc::new(OneShotExec::new(row_addr_stream));
+
+        let projection = dataset
+            .empty_projection()
+            .union_column("s", OnMissing::Error)
+            .unwrap();
+        let take_exec = TakeExec::try_new(dataset, input, projection)
+            .unwrap()
+            .unwrap();
+
+        let stream = take_exec
+            .execute(0, Arc::new(TaskContext::default()))
+            .unwrap();
+        let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
+        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+        assert_eq!(total_rows, 5);
+
+        let all_data = concat_batches(&batches[0].schema(), &batches).unwrap();
+        let s_col = all_data
+            .column_by_name("s")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        // Duplicated rows should have identical values
+        assert_eq!(s_col.value(0), s_col.value(1));
+        assert_eq!(s_col.value(3), s_col.value(4));
+    }
+
+    /// Same as above but with unsorted duplicates, exercising the sort+dedup path.
+    #[tokio::test]
+    async fn test_take_with_unsorted_duplicate_row_addrs() {
+        let TestFixture {
+            dataset,
+            _tmp_dir_guard,
+        } = test_fixture().await;
+
+        let row_addrs = UInt64Array::from(vec![2u64, 0, 1, 0, 2]);
+        let schema = Arc::new(ArrowSchema::new(vec![Field::new(
+            ROW_ADDR,
+            DataType::UInt64,
+            true,
+        )]));
+        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(row_addrs)]).unwrap();
+
+        let row_addr_stream = futures::stream::iter(vec![Ok(batch)]);
+        let row_addr_stream = Box::pin(RecordBatchStreamAdapter::new(schema, row_addr_stream));
+        let input = Arc::new(OneShotExec::new(row_addr_stream));
+
+        let projection = dataset
+            .empty_projection()
+            .union_column("s", OnMissing::Error)
+            .unwrap();
+        let take_exec = TakeExec::try_new(dataset, input, projection)
+            .unwrap()
+            .unwrap();
+
+        let stream = take_exec
+            .execute(0, Arc::new(TaskContext::default()))
+            .unwrap();
+        let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
+        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+        assert_eq!(total_rows, 5);
+
+        let all_data = concat_batches(&batches[0].schema(), &batches).unwrap();
+        let s_col = all_data
+            .column_by_name("s")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        // Original order was [2, 0, 1, 0, 2] — duplicates should match
+        assert_eq!(s_col.value(0), s_col.value(4)); // both row 2
+        assert_eq!(s_col.value(1), s_col.value(3)); // both row 0
+    }
+
     #[tokio::test]
     async fn test_take_struct() {
         // When taking fields into an existing struct, the field order should be maintained
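The dedup-then-expand idea in PATCH 1 is easiest to see outside the diff. The standalone sketch below is not part of the patch: it assumes only the arrow-array and arrow-select crates (take_record_batch is the same helper the patch calls), and the column name "s" and its values are made up for illustration.

// Standalone illustration (not part of the patch): collapse sorted-but-duplicated
// row addresses into a strictly increasing list plus an expand map, fetch the
// unique rows, then re-expand with a take so duplicated addresses get duplicated rows.
use std::sync::Arc;

use arrow_array::{ArrayRef, RecordBatch, StringArray, UInt32Array};
use arrow_select::take::take_record_batch;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Sorted row addresses with duplicates, e.g. two list elements of row 0 matched.
    let sorted_addrs: Vec<u64> = vec![0, 0, 1, 2, 2];

    // Dedup while recording, for every original position, which unique row it maps to.
    let mut unique_addrs: Vec<u64> = Vec::with_capacity(sorted_addrs.len());
    let mut expand_map: Vec<u32> = Vec::with_capacity(sorted_addrs.len());
    for &addr in &sorted_addrs {
        if unique_addrs.last() != Some(&addr) {
            unique_addrs.push(addr);
        }
        expand_map.push((unique_addrs.len() - 1) as u32);
    }
    assert_eq!(unique_addrs, vec![0, 1, 2]);
    assert_eq!(expand_map, vec![0, 0, 1, 2, 2]);

    // Stand-in for what the fragment readers return for the three unique addresses
    // (hypothetical column "s" and values).
    let fetched = RecordBatch::try_from_iter(vec![(
        "s",
        Arc::new(StringArray::from(vec!["row0", "row1", "row2"])) as ArrayRef,
    )])?;

    // Re-expand: five output rows, with rows 0/1 and 3/4 being identical copies.
    let expanded = take_record_batch(&fetched, &UInt32Array::from(expand_map))?;
    assert_eq!(expanded.num_rows(), 5);
    Ok(())
}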
From 97546422338f76cdb5f505b4e0d995db1873a4fc Mon Sep 17 00:00:00 2001
From: Will Jones
Date: Tue, 3 Feb 2026 12:05:23 -0800
Subject: [PATCH 2/3] perf: skip sort/dedup overhead in TakeStream for sorted-unique addresses

The common path (sorted, no duplicates) now does a single-pass
is_sorted_and_unique check and skips all sorting, dedup, and permutation
logic. The unsorted/duplicate paths are unified and the expand + inverse
permutation are composed into a single take.

Co-Authored-By: Claude Opus 4.5
---
 rust/lance/src/io/exec/take.rs | 101 ++++++++++++++++++++-------------
 1 file changed, 63 insertions(+), 38 deletions(-)

diff --git a/rust/lance/src/io/exec/take.rs b/rust/lance/src/io/exec/take.rs
index ee694f674aa..67abe025a99 100644
--- a/rust/lance/src/io/exec/take.rs
+++ b/rust/lance/src/io/exec/take.rs
@@ -191,13 +191,16 @@ impl TakeStream {
             "{} nulls in row addresses",
             row_addrs.null_count()
         );
-        // Check if the row addresses are already sorted to avoid unnecessary reorders
-        let is_sorted = row_addrs.values().is_sorted();
+
+        // Fast path: check if addresses are already sorted with no duplicates (common case).
+        // This avoids all sorting, dedup, and permutation overhead.
+        let is_sorted_and_unique = row_addrs.values().windows(2).all(|w| w[0] < w[1]);
         let sorted_addrs: Arc<dyn Array>;
-        let (sorted_addrs, permutation) = if is_sorted {
-            (row_addrs, None)
+        let (unique_addrs, permutation, sorted_to_unique) = if is_sorted_and_unique {
+            (Cow::Borrowed(row_addrs.values().as_ref()), None, None)
         } else {
+            // Sort and compute inverse permutation to restore original order later
             let permutation = arrow::compute::sort_to_indices(&row_addrs_arr, None, None).unwrap();
             sorted_addrs = arrow::compute::take(
                 &row_addrs_arr,
@@ -207,36 +210,45 @@ impl TakeStream {
                 }),
             )
             .unwrap();
-            // Calculate the inverse permutation to restore the original order
             let mut inverse_permutation = vec![0; permutation.len()];
             for (i, p) in permutation.values().iter().enumerate() {
                 inverse_permutation[*p as usize] = i as u32;
             }
-            (
-                sorted_addrs.as_primitive::<UInt64Type>(),
-                Some(UInt32Array::from(inverse_permutation)),
-            )
-        };
-
-        // Deduplicate sorted addresses. FTS on List can produce duplicate
-        // row addresses when multiple list elements in the same row match. The
-        // encoding layer requires strictly increasing indices, so we dedup here
-        // and expand the results back afterwards.
-        let sorted_values = sorted_addrs.values();
-        let mut unique_addrs: Vec<u64> = Vec::with_capacity(sorted_values.len());
-        let mut sorted_to_unique: Vec<usize> = Vec::with_capacity(sorted_values.len());
-        for &addr in sorted_values.iter() {
-            if unique_addrs.last() != Some(&addr) {
-                unique_addrs.push(addr);
+            let sorted_values = sorted_addrs.as_primitive::<UInt64Type>().values();
+
+            // Deduplicate sorted addresses. FTS on List can produce duplicate
+            // row addresses when multiple list elements in the same row match. The
+            // encoding layer requires strictly increasing indices, so we dedup here
+            // and expand the results back afterwards.
+            let has_duplicates = sorted_values.windows(2).any(|w| w[0] == w[1]);
+            if has_duplicates {
+                let mut deduped: Vec<u64> = Vec::with_capacity(sorted_values.len());
+                let mut mapping: Vec<usize> = Vec::with_capacity(sorted_values.len());
+                for &addr in sorted_values.iter() {
+                    if deduped.last() != Some(&addr) {
+                        deduped.push(addr);
+                    }
+                    mapping.push(deduped.len() - 1);
+                }
+                (
+                    Cow::Owned(deduped),
+                    Some(UInt32Array::from(inverse_permutation)),
+                    Some(mapping),
+                )
+            } else {
+                (
+                    Cow::Borrowed(sorted_values.as_ref()),
+                    Some(UInt32Array::from(inverse_permutation)),
+                    None,
+                )
             }
-            sorted_to_unique.push(unique_addrs.len() - 1);
-        }
+        };
 
         let mut futures = FuturesOrdered::new();
         let mut current_offsets = Vec::new();
         let mut current_fragment_id = None;
-        for row_addr in &unique_addrs {
+        for row_addr in unique_addrs.iter() {
             let addr = RowAddress::new_from_u64(*row_addr);
 
             if Some(addr.fragment_id()) != current_fragment_id {
@@ -281,20 +293,33 @@ impl TakeStream {
         let schema = batches.first().expect_ok()?.schema();
         let mut new_data = concat_batches(&schema, batches.iter())?;
 
-        // Expand deduplicated rows back to the sorted order (reinsert duplicates)
-        if unique_addrs.len() < sorted_values.len() {
-            let expand_indices = UInt32Array::from(
-                sorted_to_unique
-                    .iter()
-                    .map(|&i| i as u32)
-                    .collect::<Vec<u32>>(),
-            );
-            new_data = arrow_select::take::take_record_batch(&new_data, &expand_indices).unwrap();
-        }
-
-        // Restore previous order (if addresses were out of order originally)
-        if let Some(permutation) = permutation {
-            new_data = arrow_select::take::take_record_batch(&new_data, &permutation).unwrap();
+        // Expand deduplicated rows and restore original order.
+        // When both are needed, combine into a single take to avoid two passes.
+        match (sorted_to_unique, permutation) {
+            (Some(expand_map), Some(inv_perm)) => {
+                // Compose: for each original position, look up its sorted position
+                // via the inverse permutation, then map through the dedup expand.
+                let combined = UInt32Array::from(
+                    inv_perm
+                        .values()
+                        .iter()
+                        .map(|&p| expand_map[p as usize] as u32)
+                        .collect::<Vec<u32>>(),
+                );
+                new_data = arrow_select::take::take_record_batch(&new_data, &combined).unwrap();
+            }
+            (None, Some(inv_perm)) => {
+                new_data = arrow_select::take::take_record_batch(&new_data, &inv_perm).unwrap();
+            }
+            (Some(expand_map), None) => {
+                // Sorted and unique was false but no permutation — shouldn't happen,
+                // but handle defensively.
+                let expand_indices =
+                    UInt32Array::from(expand_map.iter().map(|&i| i as u32).collect::<Vec<u32>>());
+                new_data =
+                    arrow_select::take::take_record_batch(&new_data, &expand_indices).unwrap();
+            }
+            (None, None) => {}
         }
 
         self.metrics
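The index composition in PATCH 2 (inverse permutation into expand map, folded into one take) is easiest to verify with concrete numbers. The standalone sketch below is not part of the patch and uses only plain Rust; the vectors correspond to the [2, 0, 1, 0, 2] case exercised by the tests in PATCH 1.

// Compose the inverse permutation (original position -> sorted slot) with the
// expand map (sorted slot -> deduplicated row) into one set of take indices.
fn compose(inverse_permutation: &[u32], expand_map: &[usize]) -> Vec<u32> {
    inverse_permutation
        .iter()
        .map(|&sorted_slot| expand_map[sorted_slot as usize] as u32)
        .collect()
}

fn main() {
    // Original addresses:  [2, 0, 1, 0, 2]
    // Stable sort:         [0, 0, 1, 2, 2]   (permutation = [1, 3, 2, 0, 4])
    // Inverse permutation: [3, 0, 2, 1, 4]   (original position -> sorted slot)
    // Deduplicated fetch:  [0, 1, 2]         (expand map  = [0, 0, 1, 2, 2])
    let inverse_permutation: Vec<u32> = vec![3, 0, 2, 1, 4];
    let expand_map: Vec<usize> = vec![0, 0, 1, 2, 2];

    // Row of the deduplicated batch to copy into each original position:
    // address 2 -> row 2, address 0 -> row 0, address 1 -> row 1, and so on.
    assert_eq!(compose(&inverse_permutation, &expand_map), vec![2, 0, 1, 0, 2]);
}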
From 0da33c12f4382c2f81fff827019f562d73928e8e Mon Sep 17 00:00:00 2001
From: Will Jones
Date: Tue, 3 Feb 2026 14:52:38 -0800
Subject: [PATCH 3/3] lower level fix

---
 rust/lance/src/dataset/fragment.rs | 29 +++++++++++++++++++++++++++--
 1 file changed, 27 insertions(+), 2 deletions(-)

diff --git a/rust/lance/src/dataset/fragment.rs b/rust/lance/src/dataset/fragment.rs
index f8c98eded68..b43d6acae22 100644
--- a/rust/lance/src/dataset/fragment.rs
+++ b/rust/lance/src/dataset/fragment.rs
@@ -2616,18 +2616,43 @@ impl FragmentReader {
 
     /// Take rows from this fragment, will perform a copy if the underlying reader returns multiple
     /// batches. May return an error if the taken rows do not fit into a single batch.
+    ///
+    /// Duplicate indices are allowed and will produce duplicate rows in the output.
     pub async fn take_as_batch(
         &self,
         indices: &[u32],
         take_priority: Option<u32>,
     ) -> Result<RecordBatch> {
+        // The v2 encoding layer requires strictly increasing indices. Deduplicate
+        // here so callers (e.g. FTS with duplicate row matches) don't need to.
+        let has_duplicates = indices.windows(2).any(|w| w[0] == w[1]);
+        let (unique_indices, expand_map) = if has_duplicates {
+            let mut unique: Vec<u32> = Vec::with_capacity(indices.len());
+            let mut mapping: Vec<u32> = Vec::with_capacity(indices.len());
+            for &idx in indices {
+                if unique.last() != Some(&idx) {
+                    unique.push(idx);
+                }
+                mapping.push((unique.len() - 1) as u32);
+            }
+            (Cow::Owned(unique), Some(UInt32Array::from(mapping)))
+        } else {
+            (Cow::Borrowed(indices), None)
+        };
+
         let batches = self
-            .take(indices, u32::MAX, take_priority)
+            .take(&unique_indices, u32::MAX, take_priority)
             .await?
             .buffered(get_num_compute_intensive_cpus())
             .try_collect::<Vec<_>>()
             .await?;
-        concat_batches(&Arc::new(self.output_schema.clone()), batches.iter()).map_err(Error::from)
+        let mut batch = concat_batches(&Arc::new(self.output_schema.clone()), batches.iter())?;
+
+        if let Some(expand_map) = expand_map {
+            batch = arrow_select::take::take_record_batch(&batch, &expand_map)?;
+        }
+
+        Ok(batch)
     }
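For completeness, the failure mode behind all three patches can be modeled with a simplified, hypothetical index-to-range conversion. The function below is not the lance-encoding indices_to_ranges implementation; it only illustrates how a duplicate index yields overlapping, non-monotonic ranges, which downstream code that assumes strictly increasing ranges can then mishandle (the reported subtract-with-overflow panic).

// Hypothetical, simplified range conversion: group consecutive indices into
// half-open ranges. Not the lance-encoding implementation, just the shape of it.
fn indices_to_ranges(indices: &[u64]) -> Vec<std::ops::Range<u64>> {
    let mut ranges: Vec<std::ops::Range<u64>> = Vec::new();
    for &idx in indices {
        match ranges.last_mut() {
            // Extend the current range while indices stay consecutive.
            Some(last) if last.end == idx => last.end += 1,
            // Otherwise start a new range (a duplicate lands here too).
            _ => ranges.push(idx..idx + 1),
        }
    }
    ranges
}

fn main() {
    // Strictly increasing input: disjoint, increasing ranges, as the decoder expects.
    assert_eq!(indices_to_ranges(&[0, 1, 2, 5]), vec![0..3, 5..6]);

    // A duplicate index starts a new range that overlaps the previous one, so the
    // output is no longer monotonic; range arithmetic downstream (for example,
    // subtracting a running offset) can then underflow.
    assert_eq!(indices_to_ranges(&[0, 0, 1]), vec![0..1, 0..2]);
}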