diff --git a/native-engine/datafusion-ext-plans/src/shuffle/mod.rs b/native-engine/datafusion-ext-plans/src/shuffle/mod.rs index 9d945db86..2d0a2f369 100644 --- a/native-engine/datafusion-ext-plans/src/shuffle/mod.rs +++ b/native-engine/datafusion-ext-plans/src/shuffle/mod.rs @@ -14,6 +14,9 @@ use std::{ fmt, + fs::{File, OpenOptions}, + os::unix::fs::PermissionsExt, + path::Path, sync::{ Arc, atomic::{AtomicUsize, Ordering::SeqCst}, @@ -273,3 +276,18 @@ fn binary_search(rows: &Arc, target: Row, from_index: isize, to_index: isi } return low as usize; // key not found. } + +pub fn open_shuffle_file>(path: P) -> std::io::Result { + let path_ref = path.as_ref(); + let file = OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(path_ref)?; + + // Set the shuffle file permissions to 0644 to keep it consistent with the + // permissions of the built-in shuffler manager in Spark. + std::fs::set_permissions(path_ref, std::fs::Permissions::from_mode(0o644))?; + + Ok(file) +} diff --git a/native-engine/datafusion-ext-plans/src/shuffle/single_repartitioner.rs b/native-engine/datafusion-ext-plans/src/shuffle/single_repartitioner.rs index 7e15b1e0e..2b5059bd8 100644 --- a/native-engine/datafusion-ext-plans/src/shuffle/single_repartitioner.rs +++ b/native-engine/datafusion-ext-plans/src/shuffle/single_repartitioner.rs @@ -13,7 +13,7 @@ // limitations under the License. use std::{ - fs::{File, OpenOptions}, + fs::File, io::{Seek, Write}, sync::Arc, }; @@ -28,7 +28,7 @@ use crate::{ ipc_compression::IpcCompressionWriter, timer_helper::{TimedWriter, TimerHelper}, }, - shuffle::ShuffleRepartitioner, + shuffle::{ShuffleRepartitioner, open_shuffle_file}, }; pub struct SingleShuffleRepartitioner { @@ -54,13 +54,8 @@ impl SingleShuffleRepartitioner { ) -> Result<&'a mut IpcCompressionWriter>> { if output_data.is_none() { *output_data = Some(IpcCompressionWriter::new( - self.output_io_time.wrap_writer( - OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .open(&self.output_data_file)?, - ), + self.output_io_time + .wrap_writer(open_shuffle_file(&self.output_data_file)?), )); } Ok(output_data.as_mut().unwrap()) @@ -81,33 +76,21 @@ impl ShuffleRepartitioner for SingleShuffleRepartitioner { // write index file if let Some(output_writer) = output_data.as_mut() { - let mut output_index = self.output_io_time.wrap_writer( - OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .open(&self.output_index_file)?, - ); + let mut output_index = self + .output_io_time + .wrap_writer(open_shuffle_file(&self.output_index_file)?); output_writer.finish_current_buf()?; let offset = output_writer.inner_mut().0.stream_position()?; output_index.write_all(&[0u8; 8])?; output_index.write_all(&(offset as i64).to_le_bytes()[..])?; } else { // write empty data file and index file - let _output_data = self.output_io_time.wrap_writer( - OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .open(&self.output_data_file)?, - ); - let mut output_index = self.output_io_time.wrap_writer( - OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .open(&self.output_index_file)?, - ); + let _output_data = self + .output_io_time + .wrap_writer(open_shuffle_file(&self.output_data_file)?); + let mut output_index = self + .output_io_time + .wrap_writer(open_shuffle_file(&self.output_index_file)?); output_index.write_all(&[0u8; 16])?; } Ok(()) diff --git a/native-engine/datafusion-ext-plans/src/shuffle/sort_repartitioner.rs b/native-engine/datafusion-ext-plans/src/shuffle/sort_repartitioner.rs index a6ed361dc..6ac3d9da2 100644 --- a/native-engine/datafusion-ext-plans/src/shuffle/sort_repartitioner.rs +++ b/native-engine/datafusion-ext-plans/src/shuffle/sort_repartitioner.rs @@ -13,7 +13,6 @@ // limitations under the License. use std::{ - fs::OpenOptions, io::{Read, Write}, sync::{Arc, Weak}, }; @@ -38,7 +37,7 @@ use crate::{ MemConsumer, MemConsumerInfo, MemManager, spill::{OwnedSpillBufReader, Spill, try_new_spill}, }, - shuffle::{Partitioning, ShuffleRepartitioner, buffered_data::BufferedData}, + shuffle::{Partitioning, ShuffleRepartitioner, buffered_data::BufferedData, open_shuffle_file}, }; pub struct SortShuffleRepartitioner { @@ -170,16 +169,8 @@ impl ShuffleRepartitioner for SortShuffleRepartitioner { let output_io_time_cloned = output_io_time.clone(); let _output_io_timer = output_io_time_cloned.timer(); - let mut output_data = OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .open(&data_file)?; - let mut output_index = OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .open(&index_file)?; + let mut output_data = open_shuffle_file(&data_file)?; + let mut output_index = open_shuffle_file(&index_file)?; // write data file // exclude io timer because it is already included buffered_data.write() @@ -227,16 +218,8 @@ impl ShuffleRepartitioner for SortShuffleRepartitioner { let output_io_time = self.output_io_time.clone(); tokio::task::spawn_blocking(move || { let _output_io_timer = output_io_time.timer(); - let mut output_data = OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .open(&data_file)?; - let mut output_index = OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .open(&index_file)?; + let mut output_data = open_shuffle_file(&data_file)?; + let mut output_index = open_shuffle_file(&index_file)?; let mut merge_iter = OffsettedMergeIterator::new( num_output_partitions,