6 changes: 3 additions & 3 deletions datafusion/physical-plan/src/sorts/multi_level_merge.rs
@@ -259,9 +259,10 @@ impl MultiLevelMergeBuilder {
         // as we are not holding the memory for them
         let mut sorted_streams = mem::take(&mut self.sorted_streams);

-        let (sorted_spill_files, buffer_size) = self
+        let (sorted_spill_files, _) = self
             .get_sorted_spill_files_to_merge(
-                2,
+                // No read-ahead buffering needed, reserve memory for 1 batch per file
+                1,
                 // we must have at least 2 streams to merge
                 2_usize.saturating_sub(sorted_streams.len()),
                 &mut memory_reservation,
@@ -273,7 +274,6 @@ impl MultiLevelMergeBuilder {
             let stream = self
                 .spill_manager
                 .clone()
-                .with_batch_read_buffer_capacity(buffer_size)
                 .read_spill_as_stream(
                     spill.file,
                     Some(spill.max_record_batch_memory),
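For context on what the removed machinery did: the `spawn_buffered` call (removed in the next file) roughly decoupled disk reads from consumption by pre-fetching batches into a bounded channel, with `with_batch_read_buffer_capacity` controlling the channel size. Below is a standalone sketch of that general pattern, illustrative only and not DataFusion's implementation (`spawn_prefetch` is a hypothetical name). It also shows the failure mode the owner raises in the review comments below: if the background task exits early, the consumer sees the same `None` as a normal end-of-stream.

use tokio::sync::mpsc;
use tokio_stream::{Stream, StreamExt};

// Hypothetical pre-fetcher: polls `input` eagerly and buffers up to
// `capacity` items in a channel, decoupling producer and consumer.
fn spawn_prefetch<T, S>(mut input: S, capacity: usize) -> mpsc::Receiver<T>
where
    T: Send + 'static,
    S: Stream<Item = T> + Send + Unpin + 'static,
{
    let (tx, rx) = mpsc::channel(capacity);
    tokio::spawn(async move {
        while let Some(item) = input.next().await {
            // The receiver was dropped; stop pre-fetching.
            if tx.send(item).await.is_err() {
                break;
            }
        }
        // `tx` is dropped here, so the receiver's next `recv()` yields
        // `None`. If this task exits early for any other reason, the
        // consumer cannot distinguish that from a normal end-of-stream.
    });
    rx
}

#[tokio::main]
async fn main() {
    let mut rx = spawn_prefetch(tokio_stream::iter(0..5), 2);
    while let Some(v) = rx.recv().await {
        println!("got {v}");
    }
}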
14 changes: 2 additions & 12 deletions datafusion/physical-plan/src/spill/spill_manager.rs
@@ -28,7 +28,7 @@ use std::sync::Arc;

 use super::{SpillReaderStream, in_progress_spill_file::InProgressSpillFile};
 use crate::coop::cooperative;
-use crate::{common::spawn_buffered, metrics::SpillMetrics};
+use crate::metrics::SpillMetrics;

 /// The `SpillManager` is responsible for the following tasks:
 /// - Reading and writing `RecordBatch`es to raw files based on the provided configurations.
@@ -41,8 +41,6 @@ pub struct SpillManager {
     env: Arc<RuntimeEnv>,
     pub(crate) metrics: SpillMetrics,
     schema: SchemaRef,
-    /// Number of batches to buffer in memory during disk reads
-    batch_read_buffer_capacity: usize,
     /// general-purpose compression options
     pub(crate) compression: SpillCompression,
 }
@@ -53,18 +51,10 @@ impl SpillManager {
             env,
             metrics,
             schema,
-            batch_read_buffer_capacity: 2,
             compression: SpillCompression::default(),
         }
     }

-    pub fn with_batch_read_buffer_capacity(
-        mut self,
-        batch_read_buffer_capacity: usize,
-    ) -> Self {
-        self.batch_read_buffer_capacity = batch_read_buffer_capacity;
-        self
-    }

     pub fn with_compression_type(mut self, spill_compression: SpillCompression) -> Self {
         self.compression = spill_compression;
Expand Down Expand Up @@ -186,7 +176,7 @@ impl SpillManager {
max_record_batch_memory,
)));

Ok(spawn_buffered(stream, self.batch_read_buffer_capacity))
Ok(stream)
Gemini AI reviewer (medium):
Now that read_spill_as_stream is unbuffered, it has become functionally identical to read_spill_as_stream_unbuffered. To reduce code duplication and improve clarity, consider removing read_spill_as_stream_unbuffered and updating its call sites to use this function instead.

Owner Author:
value:useful; category:bug; feedback: The Gemini AI reviewer is correct! After removing the buffering from read_spill_as_stream() it is now exactly the same as read_spill_as_stream_unbuffered(). If the fix for the initial issue is to drop the pre-fetching, then one of these methods should be removed and all of its callers should use the other. But maybe this is the wrong way to fix it; a better approach may be to find out why a reader gets an invalid EOF/None message and does not wait for more messages.

     }

     /// Same as `read_spill_as_stream`, but without buffering.
Augment AI reviewer (low):
After removing spawn_buffered, read_spill_as_stream is also unbuffered, so the doc comment on read_spill_as_stream_unbuffered is now misleading (there’s no behavioral difference). This could confuse callers/tests that expect a distinction between the two APIs.


Owner Author:
value:useful; category:bug; feedback: The Augment AI reviewer is correct! After removing the buffering from read_spill_as_stream() it is now exactly the same as read_spill_as_stream_unbuffered(). If the fix for the initial issue is to drop the pre-fetching, then one of these methods should be removed and all of its callers should use the other. But maybe this is the wrong way to fix it; a better approach may be to find out why a reader gets an invalid EOF/None message and does not wait for more messages.
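If the de-duplication route is taken, one low-risk shape is to keep `read_spill_as_stream` as the single entry point and turn the `_unbuffered` variant into a deprecated forwarder until its call sites migrate. A minimal sketch; the parameter types (`RefCountedTempFile`, `Option<usize>`) are inferred from the call site in the diff above rather than copied from the actual source:

// Sketch only, not the actual DataFusion code.
impl SpillManager {
    /// Now behaviorally identical to `read_spill_as_stream`; kept as a
    /// deprecated forwarder so call sites can migrate incrementally.
    #[deprecated(note = "buffering was removed; use `read_spill_as_stream`")]
    pub fn read_spill_as_stream_unbuffered(
        &self,
        spill: RefCountedTempFile,
        max_record_batch_memory: Option<usize>,
    ) -> Result<SendableRecordBatchStream> {
        self.read_spill_as_stream(spill, max_record_batch_memory)
    }
}

Once all internal callers are updated, the forwarder (and its now-misleading doc comment) can be deleted outright.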

40 changes: 40 additions & 0 deletions datafusion/physical-plan/src/spill/spill_pool.rs
@@ -1441,4 +1441,44 @@ mod tests {

         Ok(())
     }
+
+    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
+    async fn test_concurrent_writer_reader_race_condition() -> Result<()> {
+        // stress testing the concurncy in the reader and the reader to make sure there is now race condtion
+        // going for 100 iterations with a 5 batches per iteration
Comment on lines +1447 to +1448
CodeRabbit AI reviewer (⚠️ Potential issue | 🟡 Minor):

Typos in test comment.

Line 1447: "concurncy" → "concurrency", "now" → "no", "condtion" → "condition".

Proposed fix
-        // stress testing the concurncy in the reader and the reader to make sure there is now race condtion
-        // going for 100 iterations with a 5 batches per iteration 
+        // Stress testing the concurrency between the writer and the reader to make sure there is no race condition.
+        // Going for 100 iterations with 5 batches per iteration.

Owner Author:
value:good-to-have; category:documentation; feedback: The CodeRabbit AI reviewer is correct! There are a few minor typos in the comments that need to be fixed.

Comment on lines +1447 to +1448
Gemini AI reviewer (medium):

The comment contains a few typos and could be clarified. It seems to be testing concurrency between a writer and a reader, not two readers.

Suggested change:

-        // stress testing the concurncy in the reader and the reader to make sure there is now race condtion
-        // going for 100 iterations with a 5 batches per iteration
+        // Stress test for concurrency between the writer and reader to ensure there is no race condition.
+        // It runs for 100 iterations, with 5 batches per iteration.

Owner Author:
value:good-to-have; category:documentation; feedback: The Gemini AI reviewer is correct! There are a few minor typos in the comments that need to be fixed.

+        const NUM_BATCHES: usize = 5;
+        const ITERATIONS: usize = 100;
+
+        for iteration in 0..ITERATIONS {
+            let (writer, mut reader) = create_spill_channel(1024 * 1024);
+
+            let writer_handle = SpawnedTask::spawn(async move {
+                for i in 0..NUM_BATCHES {
+                    let batch = create_test_batch(i as i32 * 10, 10);
+                    writer.push_batch(&batch).unwrap();
+                    tokio::task::yield_now().await;
+                }
+            });
+
+            let reader_handle = SpawnedTask::spawn(async move {
+                let mut batches_read = 0;
+                while let Some(result) = reader.next().await {
Augment AI reviewer (medium):
This while let Some(...) loop relies on the writer being dropped to terminate; if the stream fails to close for any reason, the test can hang indefinitely and stall CI. That makes failures show up as timeouts rather than a clear assertion failure.


Owner Author:
value:annoying; category:bug; feedback: The Augment AI reviewer is not correct! Relying on the writer being dropped is the standard way to decide when to stop waiting on a read for all kinds of channels in Rust: receiving a None message is the first/main termination reason, and the last writer being dropped is the second (a standalone sketch of this pattern follows the test code below).

+                    let _batch = result.unwrap();
+                    batches_read += 1;
+                    tokio::task::yield_now().await;
+                }
+                batches_read
+            });
+
+            writer_handle.join().await.unwrap();
+            let batches_read = reader_handle.join().await.unwrap();
+
+            assert_eq!(
+                batches_read, NUM_BATCHES,
+                "Iteration {iteration}: Expected {NUM_BATCHES} got {batches_read}."
+            );
+        }
+
+        Ok(())
+    }
}
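For reference on the termination question above: the pattern the owner describes is the idiomatic one for Rust channels. The receive loop ends when the channel yields `None`, and `None` arrives exactly when every sender has been dropped. A standalone sketch with `tokio::sync::mpsc` (hypothetical names, independent of the spill-pool code):

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<i32>(4);

    let writer = tokio::spawn(async move {
        for i in 0..5 {
            tx.send(i).await.unwrap();
        }
        // `tx` is dropped when this task ends, closing the channel.
    });

    let mut received = 0;
    // `recv()` returns `None` only after all senders are dropped, so the
    // loop terminates exactly when the writer finishes; no sentinel value
    // or explicit close signal is needed.
    while let Some(_v) = rx.recv().await {
        received += 1;
    }

    writer.await.unwrap();
    assert_eq!(received, 5);
}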