From 19dde4270cd2979fa654dbd1d551ebc9fc67454e Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Sat, 21 Feb 2026 09:01:01 -0800 Subject: [PATCH 1/2] don't use buffered without spawn --- .../lance-index/src/scalar/inverted/merger.rs | 19 ++++--------------- 1 file changed, 4 insertions(+), 15 deletions(-) diff --git a/rust/lance-index/src/scalar/inverted/merger.rs b/rust/lance-index/src/scalar/inverted/merger.rs index 21b46d0d453..af81b7aa10f 100644 --- a/rust/lance-index/src/scalar/inverted/merger.rs +++ b/rust/lance-index/src/scalar/inverted/merger.rs @@ -2,8 +2,7 @@ // SPDX-FileCopyrightText: Copyright The Lance Authors use fst::Streamer; -use futures::{stream, StreamExt, TryStreamExt}; -use lance_core::{cache::LanceCache, utils::tokio::get_num_compute_intensive_cpus, Error, Result}; +use lance_core::{cache::LanceCache, Error, Result}; use snafu::location; use std::sync::Arc; @@ -252,21 +251,11 @@ impl Merger for SizeBasedMerger<'_> { let start = std::time::Instant::now(); let parts = std::mem::take(&mut self.input); let num_parts = parts.len(); - let buffer_size = std::cmp::max( - 1, - std::cmp::min(get_num_compute_intensive_cpus(), num_parts), - ); let cache = LanceCache::no_cache(); let token_set_format = self.token_set_format; - let mut stream = stream::iter(parts.into_iter().map(|part| { - let cache = cache.clone(); - async move { part.load(&cache, token_set_format).await } - })) - .buffered(buffer_size); - - let mut idx = 0; - while let Some(part) = stream.try_next().await? { - idx += 1; + + for (idx, part) in parts.into_iter().enumerate() { + let part = part.load(&cache, token_set_format).await?; self.merge_partition(part, &mut estimated_size).await?; self.progress .stage_progress("merge_partitions", idx as u64) From b8b00f358a0bb9adf25ca339819a9f7d4ead161f Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Sat, 21 Feb 2026 11:37:53 -0800 Subject: [PATCH 2/2] Add a spawn to the part load in FTS training --- .../lance-index/src/scalar/inverted/merger.rs | 20 +++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/rust/lance-index/src/scalar/inverted/merger.rs b/rust/lance-index/src/scalar/inverted/merger.rs index af81b7aa10f..ea0e3414d8e 100644 --- a/rust/lance-index/src/scalar/inverted/merger.rs +++ b/rust/lance-index/src/scalar/inverted/merger.rs @@ -2,7 +2,8 @@ // SPDX-FileCopyrightText: Copyright The Lance Authors use fst::Streamer; -use lance_core::{cache::LanceCache, Error, Result}; +use futures::{stream, StreamExt, TryStreamExt}; +use lance_core::{cache::LanceCache, utils::tokio::get_num_compute_intensive_cpus, Error, Result}; use snafu::location; use std::sync::Arc; @@ -251,11 +252,22 @@ impl Merger for SizeBasedMerger<'_> { let start = std::time::Instant::now(); let parts = std::mem::take(&mut self.input); let num_parts = parts.len(); + let buffer_size = std::cmp::max( + 1, + std::cmp::min(get_num_compute_intensive_cpus(), num_parts), + ); let cache = LanceCache::no_cache(); let token_set_format = self.token_set_format; - - for (idx, part) in parts.into_iter().enumerate() { - let part = part.load(&cache, token_set_format).await?; + let mut stream = stream::iter(parts.into_iter().map(|part| { + let cache = cache.clone(); + tokio::task::spawn(async move { part.load(&cache, token_set_format).await }) + })) + .buffered(buffer_size); + + let mut idx = 0; + while let Some(part) = stream.try_next().await? { + let part = part?; + idx += 1; self.merge_partition(part, &mut estimated_size).await?; self.progress .stage_progress("merge_partitions", idx as u64)