Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rust/lance-encoding/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1302,7 +1302,7 @@ impl DecodeBatchScheduler {
sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
scheduler: Arc<dyn EncodingsIo>,
) {
debug_assert!(indices.windows(2).all(|w| w[0] <= w[1]));
debug_assert!(indices.windows(2).all(|w| w[0] < w[1]));
if indices.is_empty() {
return;
}
Expand Down
29 changes: 27 additions & 2 deletions rust/lance/src/dataset/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2616,18 +2616,43 @@ impl FragmentReader {

/// Take rows from this fragment, will perform a copy if the underlying reader returns multiple
/// batches. May return an error if the taken rows do not fit into a single batch.
///
/// Duplicate indices are allowed and will produce duplicate rows in the output.
///
/// NOTE(review): only *adjacent* duplicates are detected and expanded, so
/// `indices` is assumed to be sorted in ascending order — confirm callers
/// uphold this, since unsorted input would reach the encoding layer's
/// strictly-increasing index assertion.
pub async fn take_as_batch(
    &self,
    indices: &[u32],
    take_priority: Option<u32>,
) -> Result<RecordBatch> {
    // The v2 encoding layer requires strictly increasing indices. Deduplicate
    // here so callers (e.g. FTS with duplicate row matches) don't need to.
    let has_duplicates = indices.windows(2).any(|w| w[0] == w[1]);
    let (unique_indices, expand_map) = if has_duplicates {
        let mut unique: Vec<u32> = Vec::with_capacity(indices.len());
        // mapping[i] = position within `unique` that holds the value of
        // indices[i]; used afterwards to re-expand the deduplicated batch.
        let mut mapping: Vec<u32> = Vec::with_capacity(indices.len());
        for &idx in indices {
            if unique.last() != Some(&idx) {
                unique.push(idx);
            }
            mapping.push((unique.len() - 1) as u32);
        }
        (Cow::Owned(unique), Some(UInt32Array::from(mapping)))
    } else {
        (Cow::Borrowed(indices), None)
    };

    let batches = self
        .take(&unique_indices, u32::MAX, take_priority)
        .await?
        .buffered(get_num_compute_intensive_cpus())
        .try_collect::<Vec<_>>()
        .await?;
    let mut batch = concat_batches(&Arc::new(self.output_schema.clone()), batches.iter())?;

    // Re-expand deduplicated rows back to the caller's requested multiplicity.
    if let Some(expand_map) = expand_map {
        batch = arrow_select::take::take_record_batch(&batch, &expand_map)?;
    }

    Ok(batch)
}
}

Expand Down
175 changes: 162 additions & 13 deletions rust/lance/src/io/exec/take.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,13 +191,16 @@ impl TakeStream {
"{} nulls in row addresses",
row_addrs.null_count()
);
// Check if the row addresses are already sorted to avoid unnecessary reorders
let is_sorted = row_addrs.values().is_sorted();

// Fast path: check if addresses are already sorted with no duplicates (common case).
// This avoids all sorting, dedup, and permutation overhead.
let is_sorted_and_unique = row_addrs.values().windows(2).all(|w| w[0] < w[1]);

let sorted_addrs: Arc<dyn Array>;
let (sorted_addrs, permutation) = if is_sorted {
(row_addrs, None)
let (unique_addrs, permutation, sorted_to_unique) = if is_sorted_and_unique {
(Cow::Borrowed(row_addrs.values().as_ref()), None, None)
} else {
// Sort and compute inverse permutation to restore original order later
let permutation = arrow::compute::sort_to_indices(&row_addrs_arr, None, None).unwrap();
sorted_addrs = arrow::compute::take(
&row_addrs_arr,
Expand All @@ -207,22 +210,45 @@ impl TakeStream {
}),
)
.unwrap();
// Calculate the inverse permutation to restore the original order
let mut inverse_permutation = vec![0; permutation.len()];
for (i, p) in permutation.values().iter().enumerate() {
inverse_permutation[*p as usize] = i as u32;
}
(
sorted_addrs.as_primitive::<UInt64Type>(),
Some(UInt32Array::from(inverse_permutation)),
)
let sorted_values = sorted_addrs.as_primitive::<UInt64Type>().values();

// Deduplicate sorted addresses. FTS on List<Utf8> can produce duplicate
// row addresses when multiple list elements in the same row match. The
// encoding layer requires strictly increasing indices, so we dedup here
// and expand the results back afterwards.
let has_duplicates = sorted_values.windows(2).any(|w| w[0] == w[1]);
if has_duplicates {
let mut deduped: Vec<u64> = Vec::with_capacity(sorted_values.len());
let mut mapping: Vec<usize> = Vec::with_capacity(sorted_values.len());
for &addr in sorted_values.iter() {
if deduped.last() != Some(&addr) {
deduped.push(addr);
}
mapping.push(deduped.len() - 1);
}
(
Cow::Owned(deduped),
Some(UInt32Array::from(inverse_permutation)),
Some(mapping),
)
} else {
(
Cow::Borrowed(sorted_values.as_ref()),
Some(UInt32Array::from(inverse_permutation)),
None,
)
}
};

let mut futures = FuturesOrdered::new();
let mut current_offsets = Vec::new();
let mut current_fragment_id = None;

for row_addr in sorted_addrs.values() {
for row_addr in unique_addrs.iter() {
let addr = RowAddress::new_from_u64(*row_addr);

if Some(addr.fragment_id()) != current_fragment_id {
Expand Down Expand Up @@ -267,9 +293,33 @@ impl TakeStream {
let schema = batches.first().expect_ok()?.schema();
let mut new_data = concat_batches(&schema, batches.iter())?;

// Restore previous order (if addresses were out of order originally)
if let Some(permutation) = permutation {
new_data = arrow_select::take::take_record_batch(&new_data, &permutation).unwrap();
// Expand deduplicated rows and restore original order.
// When both are needed, combine into a single take to avoid two passes.
match (sorted_to_unique, permutation) {
(Some(expand_map), Some(inv_perm)) => {
// Compose: for each original position, look up its sorted position
// via the inverse permutation, then map through the dedup expand.
let combined = UInt32Array::from(
inv_perm
.values()
.iter()
.map(|&p| expand_map[p as usize] as u32)
.collect::<Vec<_>>(),
);
new_data = arrow_select::take::take_record_batch(&new_data, &combined).unwrap();
}
(None, Some(inv_perm)) => {
new_data = arrow_select::take::take_record_batch(&new_data, &inv_perm).unwrap();
}
(Some(expand_map), None) => {
// Sorted and unique was false but no permutation — shouldn't happen,
// but handle defensively.
let expand_indices =
UInt32Array::from(expand_map.iter().map(|&i| i as u32).collect::<Vec<_>>());
new_data =
arrow_select::take::take_record_batch(&new_data, &expand_indices).unwrap();
}
(None, None) => {}
}

self.metrics
Expand Down Expand Up @@ -820,6 +870,105 @@ mod tests {
assert_eq!(metrics.find_count("batches_processed").unwrap().value(), 3);
}

/// Regression test: FTS on List<Utf8> can produce duplicate row addresses when
/// multiple list elements in the same row match. These duplicates caused
/// `indices_to_ranges` in the encoding layer to produce overlapping ranges,
/// panicking in BinaryPageScheduler with "attempt to subtract with overflow".
#[tokio::test]
async fn test_take_with_duplicate_row_addrs() {
    let TestFixture {
        dataset,
        _tmp_dir_guard,
    } = test_fixture().await;

    // Duplicate row addresses, pre-sorted, as they would appear when the
    // same row is matched more than once within a single fragment.
    let addr_schema = Arc::new(ArrowSchema::new(vec![Field::new(
        ROW_ADDR,
        DataType::UInt64,
        true,
    )]));
    let addrs = UInt64Array::from(vec![0u64, 0, 1, 2, 2]);
    let addr_batch = RecordBatch::try_new(addr_schema.clone(), vec![Arc::new(addrs)]).unwrap();

    let input = Arc::new(OneShotExec::new(Box::pin(RecordBatchStreamAdapter::new(
        addr_schema,
        futures::stream::iter(vec![Ok(addr_batch)]),
    ))));

    let projection = dataset
        .empty_projection()
        .union_column("s", OnMissing::Error)
        .unwrap();
    let take_exec = TakeExec::try_new(dataset, input, projection)
        .unwrap()
        .unwrap();

    let output: Vec<RecordBatch> = take_exec
        .execute(0, Arc::new(TaskContext::default()))
        .unwrap()
        .try_collect()
        .await
        .unwrap();
    let row_count: usize = output.iter().map(RecordBatch::num_rows).sum();
    assert_eq!(row_count, 5);

    let combined = concat_batches(&output[0].schema(), &output).unwrap();
    let s_col = combined
        .column_by_name("s")
        .unwrap()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    // Duplicated rows should have identical values
    assert_eq!(s_col.value(0), s_col.value(1));
    assert_eq!(s_col.value(3), s_col.value(4));
}

/// Same as above but with unsorted duplicates, exercising the sort+dedup path.
#[tokio::test]
async fn test_take_with_unsorted_duplicate_row_addrs() {
    let TestFixture {
        dataset,
        _tmp_dir_guard,
    } = test_fixture().await;

    // Out-of-order addresses with repeats: forces the sort, the dedup, and
    // the order-restoring permutation to compose correctly.
    let addr_schema = Arc::new(ArrowSchema::new(vec![Field::new(
        ROW_ADDR,
        DataType::UInt64,
        true,
    )]));
    let addrs = UInt64Array::from(vec![2u64, 0, 1, 0, 2]);
    let addr_batch = RecordBatch::try_new(addr_schema.clone(), vec![Arc::new(addrs)]).unwrap();

    let input = Arc::new(OneShotExec::new(Box::pin(RecordBatchStreamAdapter::new(
        addr_schema,
        futures::stream::iter(vec![Ok(addr_batch)]),
    ))));

    let projection = dataset
        .empty_projection()
        .union_column("s", OnMissing::Error)
        .unwrap();
    let take_exec = TakeExec::try_new(dataset, input, projection)
        .unwrap()
        .unwrap();

    let output: Vec<RecordBatch> = take_exec
        .execute(0, Arc::new(TaskContext::default()))
        .unwrap()
        .try_collect()
        .await
        .unwrap();
    let row_count: usize = output.iter().map(RecordBatch::num_rows).sum();
    assert_eq!(row_count, 5);

    let combined = concat_batches(&output[0].schema(), &output).unwrap();
    let s_col = combined
        .column_by_name("s")
        .unwrap()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    // Original order was [2, 0, 1, 0, 2] — duplicates should match
    assert_eq!(s_col.value(0), s_col.value(4)); // both row 2
    assert_eq!(s_col.value(1), s_col.value(3)); // both row 0
}

#[tokio::test]
async fn test_take_struct() {
// When taking fields into an existing struct, the field order should be maintained
Expand Down