Skip to content
Draft
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions native/core/src/execution/shuffle/shuffle_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -780,8 +780,7 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner {
// if we wrote a spill file for this partition then copy the
// contents into the shuffle file
if let Some(spill_data) = self.partition_writers[i].spill_file.as_ref() {
let mut spill_file =
BufReader::new(File::open(spill_data.temp_file.path()).map_err(to_df_err)?);
let mut spill_file = BufReader::new(&spill_data.file);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

spill_data.file is opened in write-only mode.

let spill_data = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(spill_file.path())
.map_err(|e| {
DataFusionError::Execution(format!("Error occurred while spilling {e}"))
})?;
self.spill_file = Some(SpillFile {
temp_file: spill_file,
file: spill_data,
});

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch

let mut write_timer = self.metrics.write_time.timer();
std::io::copy(&mut spill_file, &mut output_data).map_err(to_df_err)?;
write_timer.stop();
Expand Down Expand Up @@ -1113,6 +1112,7 @@ struct PartitionWriter {
}

struct SpillFile {
#[allow(dead_code)]
temp_file: RefCountedTempFile,
file: File,
}
Expand Down
Loading