diff --git a/Cargo.toml b/Cargo.toml index 93416fbaf9a..fd891f3bdc5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -194,4 +194,4 @@ redundant_clone = "deny" print_stdout = "deny" print_stderr = "deny" # not too much we can do to avoid multiple crate versions -multiple-crate-versions = "allow" +multiple-crate-versions = "allow" \ No newline at end of file diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index 5f90691c898..bc2c879f7e8 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -656,6 +656,9 @@ async fn resolve_commit_handler( /// data. Otherwise, the source will be spilled to a temporary file on disk. /// /// This is used to support retries on write operations. +// TODO: we will use this again once we bring back retries; see +// https://github.com/lancedb/lance/issues/3798 +#[allow(dead_code)] async fn new_source_iter( source: SendableRecordBatchStream, enable_retries: bool, @@ -685,6 +688,9 @@ async fn new_source_iter( } } +// We will use this again once we address this: +// https://github.com/lancedb/lance/issues/3798 +#[allow(dead_code)] struct SpillStreamIter { receiver: SpillReceiver, #[allow(dead_code)] // Exists to keep the SpillSender alive diff --git a/rust/lance/src/dataset/write/merge_insert.rs b/rust/lance/src/dataset/write/merge_insert.rs index b8d74cb854f..e338e7159ee 100644 --- a/rust/lance/src/dataset/write/merge_insert.rs +++ b/rust/lance/src/dataset/write/merge_insert.rs @@ -16,12 +16,10 @@ //! key columns are identical in both the source and the target. This means that you will need some kind of //! meaningful key column to be able to perform a merge insert. 
-use futures::FutureExt; use std::{ collections::BTreeMap, - future::Future, sync::{Arc, Mutex}, - time::{Duration, Instant}, + time::Duration, }; use arrow_array::{ @@ -55,14 +53,13 @@ use lance_datafusion::{ use datafusion_physical_expr::expressions::Column; use futures::{ - future::Either, stream::{self}, - Stream, StreamExt, TryFutureExt, TryStreamExt, + Stream, StreamExt, TryStreamExt, }; use lance_core::{ datatypes::{OnMissing, OnTypeMismatch, SchemaCompareOptions}, error::{box_error, InvalidInputSnafu}, - utils::{backoff::SlotBackoff, futures::Capacity, tokio::get_num_compute_intensive_cpus}, + utils::{futures::Capacity, tokio::get_num_compute_intensive_cpus}, Error, Result, ROW_ADDR, ROW_ADDR_FIELD, ROW_ID, ROW_ID_FIELD, }; use lance_datafusion::{ @@ -340,6 +337,8 @@ impl MergeInsertBuilder { /// Set number of times to retry the operation if there is contention. /// + /// Note: this is disabled until we address <https://github.com/lancedb/lance/issues/3798> + /// /// If this is set > 0, then the operation will keep a copy of the input data /// either in memory or on disk (depending on the size of the data) and will /// retry the operation if there is contention. @@ -352,6 +351,8 @@ impl MergeInsertBuilder { /// Set the timeout used to limit retries. /// + /// Note: this is disabled until we address <https://github.com/lancedb/lance/issues/3798> + /// /// This is the maximum time to spend on the operation before giving up. At /// least one attempt will be made, regardless of how long it takes to complete. /// Subsequent attempts will be cancelled once this timeout is reached. 
If @@ -1029,101 +1030,13 @@ impl MergeInsertJob { /// This will take in the source, merge it with the existing target data, and insert new /// rows, update existing rows, and delete existing rows pub async fn execute( - mut self, + self, source: SendableRecordBatchStream, ) -> Result<(Arc, MergeStats)> { - let start = Instant::now(); - let mut source_iter = - super::new_source_iter(source, self.params.conflict_retries > 0).await?; - - let mut dataset_ref = self.dataset.clone(); - let max_retries = self.params.conflict_retries; - let mut backoff = SlotBackoff::default(); - - fn timeout_error(retry_timeout: Duration, attempts: u32) -> Error { - Error::TooMuchWriteContention { - message: format!( - "Attempted {} times, but failed on retry_timeout of {:.3} seconds.", - attempts, - retry_timeout.as_secs_f32() - ), - location: location!(), - } - } - - fn maybe_timeout( - backoff: &SlotBackoff, - start: Instant, - retry_timeout: Duration, - future: impl Future, - ) -> impl Future> { - let attempt = backoff.attempt(); - if attempt == 0 { - // No timeout on first attempt - Either::Left(future.map(|res| Ok(res))) - } else { - let remaining = retry_timeout.saturating_sub(start.elapsed()); - Either::Right( - tokio::time::timeout(remaining, future) - .map_err(move |_| timeout_error(retry_timeout, attempt + 1)), - ) - } - } - - while backoff.attempt() <= max_retries { - let ds = dataset_ref.clone(); - let execute_fut = self - .clone() - .execute_uncommitted_impl(source_iter.next().unwrap()); - let execute_fut = - maybe_timeout(&backoff, start, self.params.retry_timeout, execute_fut); - let (transaction, mut stats) = execute_fut.await??; - stats.num_attempts = backoff.attempt() + 1; - - let commit_future = CommitBuilder::new(ds.clone()).execute(transaction); - let commit_future = - maybe_timeout(&backoff, start, self.params.retry_timeout, commit_future); - match commit_future.await? { - Ok(ds) => return Ok((Arc::new(ds), stats)), - Err(Error::RetryableCommitConflict { .. 
}) => { - // Check whether we have exhausted our retries *before* - // we sleep. - if backoff.attempt() >= max_retries { - break; - } - if start.elapsed() > self.params.retry_timeout { - return Err(timeout_error( - self.params.retry_timeout, - backoff.attempt() + 1, - )); - } - if backoff.attempt() == 0 { - // We add 10% buffer here, to allow concurrent writes to complete. - // We pass the first attempt's time to the backoff so it's used - // as the unit for backoff time slots. - // See SlotBackoff implementation for more details on how this works. - backoff = backoff.with_unit((start.elapsed().as_millis() * 11 / 10) as u32); - } - - let sleep_fut = tokio::time::sleep(backoff.next_backoff()); - let sleep_fut = - maybe_timeout(&backoff, start, self.params.retry_timeout, sleep_fut); - sleep_fut.await?; - - let mut ds = dataset_ref.as_ref().clone(); - ds.checkout_latest().await?; - - dataset_ref = Arc::new(ds); - self.dataset = dataset_ref.clone(); - continue; - } - Err(e) => return Err(e), - }; - } - Err(Error::TooMuchWriteContention { - message: format!("Attempted {} retries.", max_retries), - location: location!(), - }) + let ds = self.dataset.clone(); + let (transaction, stats) = self.execute_uncommitted_impl(source).await?; + let dataset = CommitBuilder::new(ds).execute(transaction).await?; + Ok((Arc::new(dataset), stats)) } /// Execute the merge insert job without committing the changes. @@ -2228,253 +2141,4 @@ mod tests { } } } - - // For some reason, Windows isn't able to handle the timeout test. Possibly - // a performance bug in their timer implementation? 
- #[cfg(not(windows))] - #[rstest::rstest] - #[case::all_success(Duration::from_secs(100_000))] - #[case::timeout(Duration::from_millis(200))] - #[tokio::test] - async fn test_merge_insert_concurrency(#[case] timeout: Duration) { - let schema = Arc::new(Schema::new(vec![ - Field::new("id", DataType::UInt32, false), - Field::new("value", DataType::UInt32, false), - ])); - let concurrency = 10; - let initial_data = RecordBatch::try_new( - schema.clone(), - vec![ - Arc::new(UInt32Array::from_iter_values(0..concurrency)), - Arc::new(UInt32Array::from_iter_values(std::iter::repeat_n( - 0, - concurrency as usize, - ))), - ], - ) - .unwrap(); - - // Increase likelihood of contention by throttling the store - let throttled = Arc::new(ThrottledStoreWrapper { - config: ThrottleConfig { - wait_list_per_call: Duration::from_millis(1), - wait_get_per_call: Duration::from_millis(1), - ..Default::default() - }, - }); - let session = Arc::new(Session::default()); - - let mut dataset = InsertBuilder::new("memory://") - .with_params(&WriteParams { - store_params: Some(ObjectStoreParams { - object_store_wrapper: Some(throttled.clone()), - ..Default::default() - }), - session: Some(session.clone()), - ..Default::default() - }) - .execute(vec![initial_data]) - .await - .unwrap(); - - // do 10 merge inserts in parallel. Each will open the dataset, signal - // they have opened, and then wait for a signal to proceed. Once the signal - // is received, they will do a merge insert and close the dataset. 
- - let barrier = Arc::new(Barrier::new(concurrency as usize)); - let mut handles = Vec::new(); - for i in 0..concurrency { - let session_ref = session.clone(); - let schema_ref = schema.clone(); - let barrier_ref = barrier.clone(); - let throttled_ref = throttled.clone(); - let handle = tokio::task::spawn(async move { - let dataset = DatasetBuilder::from_uri("memory://") - .with_read_params(ReadParams { - store_options: Some(ObjectStoreParams { - object_store_wrapper: Some(throttled_ref.clone()), - ..Default::default() - }), - session: Some(session_ref.clone()), - ..Default::default() - }) - .load() - .await - .unwrap(); - let dataset = Arc::new(dataset); - - let new_data = RecordBatch::try_new( - schema_ref.clone(), - vec![ - Arc::new(UInt32Array::from(vec![i])), - Arc::new(UInt32Array::from(vec![1])), - ], - ) - .unwrap(); - let source = Box::new(RecordBatchIterator::new([Ok(new_data)], schema_ref.clone())); - - let job = MergeInsertBuilder::try_new(dataset, vec!["id".to_string()]) - .unwrap() - .when_matched(WhenMatched::UpdateAll) - .when_not_matched(WhenNotMatched::InsertAll) - .conflict_retries(100) - .retry_timeout(timeout) - .try_build() - .unwrap(); - barrier_ref.wait().await; - - let start = Instant::now(); - let res = job - .execute_reader(source) - .await - .map(|(_ds, stats)| stats.num_attempts); - let elapsed = start.elapsed(); - (res, elapsed) - }); - handles.push(handle); - } - - let results = try_join_all(handles).await.unwrap(); - - for (attempts, elapsed) in results.iter() { - let buffer = Duration::from_millis(100); - assert!( - *elapsed < timeout + buffer, - "Elapsed time should be less than {} ms, was {} ms", - (timeout + buffer).as_millis(), - elapsed.as_millis() - ); - - match attempts { - Ok(attempts) => { - assert!(*attempts <= 10, "Attempt count should be <= 10"); - } - Err(err) => { - // If we get an error, it means the task was cancelled - // due to timeout. This is expected if the timeout is - // set to a low value. 
- assert!( - matches!(err, Error::TooMuchWriteContention { message, .. } if message.contains("failed on retry_timeout")), - "Expected TooMuchWriteContention error, got: {:?}", - err - ); - } - } - } - - if timeout.as_secs() > 10 { - dataset.checkout_latest().await.unwrap(); - let batches = dataset.scan().try_into_batch().await.unwrap(); - - let values = batches["value"].as_primitive::(); - assert!( - values.values().iter().all(|&v| v == 1), - "All values should be 1 after merge insert. Got: {:?}", - values - ); - } - } - - #[tokio::test] - async fn test_merge_insert_large_concurrent() { - let schema = Arc::new(Schema::new(vec![ - Field::new("id", DataType::UInt32, false), - Field::new("value", DataType::UInt32, false), - ])); - let num_rows = 10; - let initial_data = RecordBatch::try_new( - schema.clone(), - vec![ - Arc::new(UInt32Array::from_iter_values(0..num_rows)), - Arc::new(UInt32Array::from_iter_values(std::iter::repeat_n( - 0, - num_rows as usize, - ))), - ], - ) - .unwrap(); - - let test_dir = tempdir().unwrap(); - let test_uri = test_dir.path().to_str().unwrap(); - let dataset = InsertBuilder::new(test_uri) - .execute(vec![initial_data]) - .await - .unwrap(); - let dataset = Arc::new(dataset); - - // Start one merge insert, but don't commit it yet. 
- let new_data1 = RecordBatch::try_new( - schema.clone(), - vec![ - Arc::new(UInt32Array::from(vec![1])), - Arc::new(UInt32Array::from(vec![1])), - ], - ) - .unwrap(); - let (transaction1, _stats) = - MergeInsertBuilder::try_new(dataset.clone(), vec!["id".to_string()]) - .unwrap() - .when_matched(WhenMatched::UpdateAll) - .when_not_matched(WhenNotMatched::InsertAll) - .try_build() - .unwrap() - .execute_uncommitted(RecordBatchIterator::new( - vec![Ok(new_data1)], - schema.clone(), - )) - .await - .unwrap(); - - // Setup a "large" merge insert, with many batches - let new_data2 = RecordBatch::try_new( - schema.clone(), - vec![ - Arc::new(UInt32Array::from_iter_values(0..1000)), - Arc::new(UInt32Array::from_iter_values(std::iter::repeat_n(2, 1000))), - ], - ) - .unwrap(); - let notify = Arc::new(Notify::new()); - let source = RecordBatchIterator::new( - (0..10) - .map(|i| { - let batch = new_data2.slice(i * 100, 100); - if i == 9 { - notify.notify_one(); - } - Ok(batch) - }) - .collect::>(), - schema.clone(), - ); - let dataset2 = DatasetBuilder::from_uri(test_uri).load().await.unwrap(); - let job = MergeInsertBuilder::try_new(Arc::new(dataset2), vec!["id".to_string()]) - .unwrap() - .when_matched(WhenMatched::UpdateAll) - .when_not_matched(WhenNotMatched::InsertAll) - .try_build() - .unwrap() - .execute_reader(source); - let task = tokio::task::spawn(job); - - // Right as the large merge insert has finished reading the last batch, - // we will commit the first merge insert. This should trigger a conflict, - // but we should resolve it automatically. - notify.notified().await; - let mut dataset = CommitBuilder::new(dataset) - .execute(transaction1) - .await - .unwrap(); - - task.await.unwrap().unwrap(); - dataset.checkout_latest().await.unwrap(); - - let batches = dataset.scan().try_into_batch().await.unwrap(); - let values = batches["value"].as_primitive::(); - assert!( - values.values().iter().all(|&v| v == 2), - "All values should be 1 after merge insert. 
Got: {:?}", - values - ); - } }