Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions native-engine/datafusion-ext-plans/src/shuffle/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@

use std::{
fmt,
fs::{File, OpenOptions},
os::unix::fs::PermissionsExt,
path::Path,
sync::{
Arc,
atomic::{AtomicUsize, Ordering::SeqCst},
Expand Down Expand Up @@ -273,3 +276,18 @@ fn binary_search(rows: &Arc<Rows>, target: Row, from_index: isize, to_index: isi
}
return low as usize; // key not found.
}

/// Opens `path` for writing as a shuffle output file and forces its
/// permission bits to `0644`.
///
/// The file is created if it does not exist and truncated if it does.
/// The returned handle is write-only.
///
/// # Errors
///
/// Returns the underlying [`std::io::Error`] if the file cannot be opened
/// or if its permissions cannot be changed.
pub fn open_shuffle_file<P: AsRef<Path>>(path: P) -> std::io::Result<File> {
    let file = OpenOptions::new()
        .write(true)
        .create(true)
        .truncate(true)
        .open(path.as_ref())?;

    // Set the shuffle file permissions to 0644 to keep it consistent with the
    // permissions of the built-in shuffle manager in Spark.
    //
    // `OpenOptionsExt::mode(0o644)` is deliberately not used: a mode passed at
    // open time is filtered through the process umask, so the resulting bits
    // could be narrower than 0644. An explicit permission change is not
    // affected by the umask.
    //
    // The change is applied through the open handle rather than by path, so
    // it always targets the file that was just opened, even if the path is
    // renamed or replaced concurrently.
    file.set_permissions(std::fs::Permissions::from_mode(0o644))?;

    Ok(file)
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use std::{
fs::{File, OpenOptions},
fs::File,
io::{Seek, Write},
sync::Arc,
};
Expand All @@ -28,7 +28,7 @@ use crate::{
ipc_compression::IpcCompressionWriter,
timer_helper::{TimedWriter, TimerHelper},
},
shuffle::ShuffleRepartitioner,
shuffle::{ShuffleRepartitioner, open_shuffle_file},
};

pub struct SingleShuffleRepartitioner {
Expand All @@ -54,13 +54,8 @@ impl SingleShuffleRepartitioner {
) -> Result<&'a mut IpcCompressionWriter<TimedWriter<File>>> {
if output_data.is_none() {
*output_data = Some(IpcCompressionWriter::new(
self.output_io_time.wrap_writer(
OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(&self.output_data_file)?,
),
self.output_io_time
.wrap_writer(open_shuffle_file(&self.output_data_file)?),
));
}
Ok(output_data.as_mut().unwrap())
Expand All @@ -81,33 +76,21 @@ impl ShuffleRepartitioner for SingleShuffleRepartitioner {

// write index file
if let Some(output_writer) = output_data.as_mut() {
let mut output_index = self.output_io_time.wrap_writer(
OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(&self.output_index_file)?,
);
let mut output_index = self
.output_io_time
.wrap_writer(open_shuffle_file(&self.output_index_file)?);
output_writer.finish_current_buf()?;
let offset = output_writer.inner_mut().0.stream_position()?;
output_index.write_all(&[0u8; 8])?;
output_index.write_all(&(offset as i64).to_le_bytes()[..])?;
} else {
// write empty data file and index file
let _output_data = self.output_io_time.wrap_writer(
OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(&self.output_data_file)?,
);
let mut output_index = self.output_io_time.wrap_writer(
OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(&self.output_index_file)?,
);
let _output_data = self
.output_io_time
.wrap_writer(open_shuffle_file(&self.output_data_file)?);
let mut output_index = self
.output_io_time
.wrap_writer(open_shuffle_file(&self.output_index_file)?);
output_index.write_all(&[0u8; 16])?;
}
Ok(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// limitations under the License.

use std::{
fs::OpenOptions,
io::{Read, Write},
sync::{Arc, Weak},
};
Expand All @@ -38,7 +37,7 @@ use crate::{
MemConsumer, MemConsumerInfo, MemManager,
spill::{OwnedSpillBufReader, Spill, try_new_spill},
},
shuffle::{Partitioning, ShuffleRepartitioner, buffered_data::BufferedData},
shuffle::{Partitioning, ShuffleRepartitioner, buffered_data::BufferedData, open_shuffle_file},
};

pub struct SortShuffleRepartitioner {
Expand Down Expand Up @@ -170,16 +169,8 @@ impl ShuffleRepartitioner for SortShuffleRepartitioner {
let output_io_time_cloned = output_io_time.clone();
let _output_io_timer = output_io_time_cloned.timer();

let mut output_data = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(&data_file)?;
let mut output_index = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(&index_file)?;
let mut output_data = open_shuffle_file(&data_file)?;
let mut output_index = open_shuffle_file(&index_file)?;

// write data file
// exclude io timer because it is already included buffered_data.write()
Expand Down Expand Up @@ -227,16 +218,8 @@ impl ShuffleRepartitioner for SortShuffleRepartitioner {
let output_io_time = self.output_io_time.clone();
tokio::task::spawn_blocking(move || {
let _output_io_timer = output_io_time.timer();
let mut output_data = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(&data_file)?;
let mut output_index = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(&index_file)?;
let mut output_data = open_shuffle_file(&data_file)?;
let mut output_index = open_shuffle_file(&index_file)?;

let mut merge_iter = OffsettedMergeIterator::new(
num_output_partitions,
Expand Down
Loading