diff --git a/rust/lance-encoding/src/decoder.rs b/rust/lance-encoding/src/decoder.rs index a3bcfd3c27b..db7ec3cc709 100644 --- a/rust/lance-encoding/src/decoder.rs +++ b/rust/lance-encoding/src/decoder.rs @@ -1302,7 +1302,7 @@ impl DecodeBatchScheduler { sink: mpsc::UnboundedSender>, scheduler: Arc, ) { - debug_assert!(indices.windows(2).all(|w| w[0] <= w[1])); + debug_assert!(indices.windows(2).all(|w| w[0] < w[1])); if indices.is_empty() { return; } diff --git a/rust/lance/src/dataset/fragment.rs b/rust/lance/src/dataset/fragment.rs index f8c98eded68..b43d6acae22 100644 --- a/rust/lance/src/dataset/fragment.rs +++ b/rust/lance/src/dataset/fragment.rs @@ -2616,18 +2616,43 @@ impl FragmentReader { /// Take rows from this fragment, will perform a copy if the underlying reader returns multiple /// batches. May return an error if the taken rows do not fit into a single batch. + /// + /// Duplicate indices are allowed and will produce duplicate rows in the output. pub async fn take_as_batch( &self, indices: &[u32], take_priority: Option, ) -> Result { + // The v2 encoding layer requires strictly increasing indices. Deduplicate + // here so callers (e.g. FTS with duplicate row matches) don't need to. + let has_duplicates = indices.windows(2).any(|w| w[0] == w[1]); + let (unique_indices, expand_map) = if has_duplicates { + let mut unique: Vec = Vec::with_capacity(indices.len()); + let mut mapping: Vec = Vec::with_capacity(indices.len()); + for &idx in indices { + if unique.last() != Some(&idx) { + unique.push(idx); + } + mapping.push((unique.len() - 1) as u32); + } + (Cow::Owned(unique), Some(UInt32Array::from(mapping))) + } else { + (Cow::Borrowed(indices), None) + }; + let batches = self - .take(indices, u32::MAX, take_priority) + .take(&unique_indices, u32::MAX, take_priority) .await? .buffered(get_num_compute_intensive_cpus()) .try_collect::>() .await?; - concat_batches(&Arc::new(self.output_schema.clone()), batches.iter()).map_err(Error::from) + let mut batch = concat_batches(&Arc::new(self.output_schema.clone()), batches.iter())?; + + if let Some(expand_map) = expand_map { + batch = arrow_select::take::take_record_batch(&batch, &expand_map)?; + } + + Ok(batch) } } diff --git a/rust/lance/src/io/exec/take.rs b/rust/lance/src/io/exec/take.rs index 25ca45e2335..67abe025a99 100644 --- a/rust/lance/src/io/exec/take.rs +++ b/rust/lance/src/io/exec/take.rs @@ -191,13 +191,16 @@ impl TakeStream { "{} nulls in row addresses", row_addrs.null_count() ); - // Check if the row addresses are already sorted to avoid unnecessary reorders - let is_sorted = row_addrs.values().is_sorted(); + + // Fast path: check if addresses are already sorted with no duplicates (common case). + // This avoids all sorting, dedup, and permutation overhead. + let is_sorted_and_unique = row_addrs.values().windows(2).all(|w| w[0] < w[1]); let sorted_addrs: Arc; - let (sorted_addrs, permutation) = if is_sorted { - (row_addrs, None) + let (unique_addrs, permutation, sorted_to_unique) = if is_sorted_and_unique { + (Cow::Borrowed(row_addrs.values().as_ref()), None, None) } else { + // Sort and compute inverse permutation to restore original order later let permutation = arrow::compute::sort_to_indices(&row_addrs_arr, None, None).unwrap(); sorted_addrs = arrow::compute::take( &row_addrs_arr, @@ -207,22 +210,45 @@ impl TakeStream { }), ) .unwrap(); - // Calculate the inverse permutation to restore the original order let mut inverse_permutation = vec![0; permutation.len()]; for (i, p) in permutation.values().iter().enumerate() { inverse_permutation[*p as usize] = i as u32; } - ( - sorted_addrs.as_primitive::(), - Some(UInt32Array::from(inverse_permutation)), - ) + let sorted_values = sorted_addrs.as_primitive::().values(); + + // Deduplicate sorted addresses. FTS on List can produce duplicate + // row addresses when multiple list elements in the same row match. The + // encoding layer requires strictly increasing indices, so we dedup here + // and expand the results back afterwards. + let has_duplicates = sorted_values.windows(2).any(|w| w[0] == w[1]); + if has_duplicates { + let mut deduped: Vec = Vec::with_capacity(sorted_values.len()); + let mut mapping: Vec = Vec::with_capacity(sorted_values.len()); + for &addr in sorted_values.iter() { + if deduped.last() != Some(&addr) { + deduped.push(addr); + } + mapping.push(deduped.len() - 1); + } + ( + Cow::Owned(deduped), + Some(UInt32Array::from(inverse_permutation)), + Some(mapping), + ) + } else { + ( + Cow::Borrowed(sorted_values.as_ref()), + Some(UInt32Array::from(inverse_permutation)), + None, + ) + } }; let mut futures = FuturesOrdered::new(); let mut current_offsets = Vec::new(); let mut current_fragment_id = None; - for row_addr in sorted_addrs.values() { + for row_addr in unique_addrs.iter() { let addr = RowAddress::new_from_u64(*row_addr); if Some(addr.fragment_id()) != current_fragment_id { @@ -267,9 +293,33 @@ impl TakeStream { let schema = batches.first().expect_ok()?.schema(); let mut new_data = concat_batches(&schema, batches.iter())?; - // Restore previous order (if addresses were out of order originally) - if let Some(permutation) = permutation { - new_data = arrow_select::take::take_record_batch(&new_data, &permutation).unwrap(); + // Expand deduplicated rows and restore original order. + // When both are needed, combine into a single take to avoid two passes. + match (sorted_to_unique, permutation) { + (Some(expand_map), Some(inv_perm)) => { + // Compose: for each original position, look up its sorted position + // via the inverse permutation, then map through the dedup expand. + let combined = UInt32Array::from( + inv_perm + .values() + .iter() + .map(|&p| expand_map[p as usize] as u32) + .collect::>(), + ); + new_data = arrow_select::take::take_record_batch(&new_data, &combined).unwrap(); + } + (None, Some(inv_perm)) => { + new_data = arrow_select::take::take_record_batch(&new_data, &inv_perm).unwrap(); + } + (Some(expand_map), None) => { + // Sorted and unique was false but no permutation — shouldn't happen, + // but handle defensively. + let expand_indices = + UInt32Array::from(expand_map.iter().map(|&i| i as u32).collect::>()); + new_data = + arrow_select::take::take_record_batch(&new_data, &expand_indices).unwrap(); + } + (None, None) => {} } self.metrics @@ -820,6 +870,105 @@ mod tests { assert_eq!(metrics.find_count("batches_processed").unwrap().value(), 3); } + /// Regression test: FTS on List can produce duplicate row addresses when + /// multiple list elements in the same row match. These duplicates caused + /// `indices_to_ranges` in the encoding layer to produce overlapping ranges, + /// panicking in BinaryPageScheduler with "attempt to subtract with overflow". + #[tokio::test] + async fn test_take_with_duplicate_row_addrs() { + let TestFixture { + dataset, + _tmp_dir_guard, + } = test_fixture().await; + + // Simulate duplicate row addresses (same row matched twice), + // already sorted as they would be within a single fragment. + let row_addrs = UInt64Array::from(vec![0u64, 0, 1, 2, 2]); + let schema = Arc::new(ArrowSchema::new(vec![Field::new( + ROW_ADDR, + DataType::UInt64, + true, + )])); + let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(row_addrs)]).unwrap(); + + let row_addr_stream = futures::stream::iter(vec![Ok(batch)]); + let row_addr_stream = Box::pin(RecordBatchStreamAdapter::new(schema, row_addr_stream)); + let input = Arc::new(OneShotExec::new(row_addr_stream)); + + let projection = dataset + .empty_projection() + .union_column("s", OnMissing::Error) + .unwrap(); + let take_exec = TakeExec::try_new(dataset, input, projection) + .unwrap() + .unwrap(); + + let stream = take_exec + .execute(0, Arc::new(TaskContext::default())) + .unwrap(); + let batches: Vec = stream.try_collect().await.unwrap(); + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 5); + + let all_data = concat_batches(&batches[0].schema(), &batches).unwrap(); + let s_col = all_data + .column_by_name("s") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + // Duplicated rows should have identical values + assert_eq!(s_col.value(0), s_col.value(1)); + assert_eq!(s_col.value(3), s_col.value(4)); + } + + /// Same as above but with unsorted duplicates, exercising the sort+dedup path. + #[tokio::test] + async fn test_take_with_unsorted_duplicate_row_addrs() { + let TestFixture { + dataset, + _tmp_dir_guard, + } = test_fixture().await; + + let row_addrs = UInt64Array::from(vec![2u64, 0, 1, 0, 2]); + let schema = Arc::new(ArrowSchema::new(vec![Field::new( + ROW_ADDR, + DataType::UInt64, + true, + )])); + let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(row_addrs)]).unwrap(); + + let row_addr_stream = futures::stream::iter(vec![Ok(batch)]); + let row_addr_stream = Box::pin(RecordBatchStreamAdapter::new(schema, row_addr_stream)); + let input = Arc::new(OneShotExec::new(row_addr_stream)); + + let projection = dataset + .empty_projection() + .union_column("s", OnMissing::Error) + .unwrap(); + let take_exec = TakeExec::try_new(dataset, input, projection) + .unwrap() + .unwrap(); + + let stream = take_exec + .execute(0, Arc::new(TaskContext::default())) + .unwrap(); + let batches: Vec = stream.try_collect().await.unwrap(); + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 5); + + let all_data = concat_batches(&batches[0].schema(), &batches).unwrap(); + let s_col = all_data + .column_by_name("s") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + // Original order was [2, 0, 1, 0, 2] — duplicates should match + assert_eq!(s_col.value(0), s_col.value(4)); // both row 2 + assert_eq!(s_col.value(1), s_col.value(3)); // both row 0 + } + #[tokio::test] async fn test_take_struct() { // When taking fields into an existing struct, the field order should be maintained