From 11ebd7ebf83406f01b60543e821a4dc5d5dd9143 Mon Sep 17 00:00:00 2001 From: Fei Wang Date: Sun, 10 Aug 2025 22:33:00 -0700 Subject: [PATCH 1/4] Fix shuffle file permission issue when using BlazeShuffleManager --- .../src/shuffle/fs_utils.rs | 31 +++++++++++++ .../datafusion-ext-plans/src/shuffle/mod.rs | 1 + .../src/shuffle/single_repartitioner.rs | 43 ++++++------------- .../src/shuffle/sort_repartitioner.rs | 30 ++++--------- 4 files changed, 53 insertions(+), 52 deletions(-) create mode 100644 native-engine/datafusion-ext-plans/src/shuffle/fs_utils.rs diff --git a/native-engine/datafusion-ext-plans/src/shuffle/fs_utils.rs b/native-engine/datafusion-ext-plans/src/shuffle/fs_utils.rs new file mode 100644 index 000000000..4284ccac0 --- /dev/null +++ b/native-engine/datafusion-ext-plans/src/shuffle/fs_utils.rs @@ -0,0 +1,31 @@ +// Copyright 2022 The Blaze Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::{ + fs::{File, OpenOptions}, + io::Result, + os::unix::fs::OpenOptionsExt, + path::Path, +}; + +pub fn open_shuffle_file>(path: P) -> Result { + OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + // Set the shuffle file permissions to 0644 to keep it consistent with the permissions of + // the built-in shuffler manager in Spark. + .mode(0o644) + .open(path) +} diff --git a/native-engine/datafusion-ext-plans/src/shuffle/mod.rs b/native-engine/datafusion-ext-plans/src/shuffle/mod.rs index 9d945db86..eed20f6cf 100644 --- a/native-engine/datafusion-ext-plans/src/shuffle/mod.rs +++ b/native-engine/datafusion-ext-plans/src/shuffle/mod.rs @@ -44,6 +44,7 @@ pub mod single_repartitioner; pub mod sort_repartitioner; pub mod buffered_data; +mod fs_utils; mod rss; pub mod rss_single_repartitioner; pub mod rss_sort_repartitioner; diff --git a/native-engine/datafusion-ext-plans/src/shuffle/single_repartitioner.rs b/native-engine/datafusion-ext-plans/src/shuffle/single_repartitioner.rs index 7e15b1e0e..c4dccfddc 100644 --- a/native-engine/datafusion-ext-plans/src/shuffle/single_repartitioner.rs +++ b/native-engine/datafusion-ext-plans/src/shuffle/single_repartitioner.rs @@ -13,7 +13,7 @@ // limitations under the License. use std::{ - fs::{File, OpenOptions}, + fs::File, io::{Seek, Write}, sync::Arc, }; @@ -28,7 +28,7 @@ use crate::{ ipc_compression::IpcCompressionWriter, timer_helper::{TimedWriter, TimerHelper}, }, - shuffle::ShuffleRepartitioner, + shuffle::{ShuffleRepartitioner, fs_utils::open_shuffle_file}, }; pub struct SingleShuffleRepartitioner { @@ -54,13 +54,8 @@ impl SingleShuffleRepartitioner { ) -> Result<&'a mut IpcCompressionWriter>> { if output_data.is_none() { *output_data = Some(IpcCompressionWriter::new( - self.output_io_time.wrap_writer( - OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .open(&self.output_data_file)?, - ), + self.output_io_time + .wrap_writer(open_shuffle_file(&self.output_data_file)?), )); } Ok(output_data.as_mut().unwrap()) @@ -81,33 +76,21 @@ impl ShuffleRepartitioner for SingleShuffleRepartitioner { // write index file if let Some(output_writer) = output_data.as_mut() { - let mut output_index = self.output_io_time.wrap_writer( - OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .open(&self.output_index_file)?, - ); + let mut output_index = self + .output_io_time + .wrap_writer(open_shuffle_file(&self.output_index_file)?); output_writer.finish_current_buf()?; let offset = output_writer.inner_mut().0.stream_position()?; output_index.write_all(&[0u8; 8])?; output_index.write_all(&(offset as i64).to_le_bytes()[..])?; } else { // write empty data file and index file - let _output_data = self.output_io_time.wrap_writer( - OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .open(&self.output_data_file)?, - ); - let mut output_index = self.output_io_time.wrap_writer( - OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .open(&self.output_index_file)?, - ); + let _output_data = self + .output_io_time + .wrap_writer(open_shuffle_file(&self.output_data_file)?); + let mut output_index = self + .output_io_time + .wrap_writer(open_shuffle_file(&self.output_index_file)?); output_index.write_all(&[0u8; 16])?; } Ok(()) diff --git a/native-engine/datafusion-ext-plans/src/shuffle/sort_repartitioner.rs b/native-engine/datafusion-ext-plans/src/shuffle/sort_repartitioner.rs index a6ed361dc..f2a66369a 100644 --- a/native-engine/datafusion-ext-plans/src/shuffle/sort_repartitioner.rs +++ b/native-engine/datafusion-ext-plans/src/shuffle/sort_repartitioner.rs @@ -13,7 +13,6 @@ // limitations under the License. use std::{ - fs::OpenOptions, io::{Read, Write}, sync::{Arc, Weak}, }; @@ -38,7 +37,10 @@ use crate::{ MemConsumer, MemConsumerInfo, MemManager, spill::{OwnedSpillBufReader, Spill, try_new_spill}, }, - shuffle::{Partitioning, ShuffleRepartitioner, buffered_data::BufferedData}, + shuffle::{ + Partitioning, ShuffleRepartitioner, buffered_data::BufferedData, + fs_utils::open_shuffle_file, + }, }; pub struct SortShuffleRepartitioner { @@ -170,16 +172,8 @@ impl ShuffleRepartitioner for SortShuffleRepartitioner { let output_io_time_cloned = output_io_time.clone(); let _output_io_timer = output_io_time_cloned.timer(); - let mut output_data = OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .open(&data_file)?; - let mut output_index = OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .open(&index_file)?; + let mut output_data = open_shuffle_file(&data_file)?; + let mut output_index = open_shuffle_file(&index_file)?; // write data file // exclude io timer because it is already included buffered_data.write() @@ -227,16 +221,8 @@ impl ShuffleRepartitioner for SortShuffleRepartitioner { let output_io_time = self.output_io_time.clone(); tokio::task::spawn_blocking(move || { let _output_io_timer = output_io_time.timer(); - let mut output_data = OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .open(&data_file)?; - let mut output_index = OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .open(&index_file)?; + let mut output_data = open_shuffle_file(&data_file)?; + let mut output_index = open_shuffle_file(&index_file)?; let mut merge_iter = OffsettedMergeIterator::new( num_output_partitions, From 2a584114d1c63154c4bf85e640fd54b543f58fc6 Mon Sep 17 00:00:00 2001 From: Fei Wang Date: Mon, 11 Aug 2025 00:01:36 -0700 Subject: [PATCH 2/4] refine --- .../src/shuffle/fs_utils.rs | 31 ------------------- .../datafusion-ext-plans/src/shuffle/mod.rs | 15 ++++++++- .../src/shuffle/single_repartitioner.rs | 2 +- .../src/shuffle/sort_repartitioner.rs | 5 +-- 4 files changed, 16 insertions(+), 37 deletions(-) delete mode 100644 native-engine/datafusion-ext-plans/src/shuffle/fs_utils.rs diff --git a/native-engine/datafusion-ext-plans/src/shuffle/fs_utils.rs b/native-engine/datafusion-ext-plans/src/shuffle/fs_utils.rs deleted file mode 100644 index 4284ccac0..000000000 --- a/native-engine/datafusion-ext-plans/src/shuffle/fs_utils.rs +++ /dev/null @@ -1,31 +0,0 @@ -// Copyright 2022 The Blaze Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::{ - fs::{File, OpenOptions}, - io::Result, - os::unix::fs::OpenOptionsExt, - path::Path, -}; - -pub fn open_shuffle_file>(path: P) -> Result { - OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - // Set the shuffle file permissions to 0644 to keep it consistent with the permissions of - // the built-in shuffler manager in Spark. - .mode(0o644) - .open(path) -} diff --git a/native-engine/datafusion-ext-plans/src/shuffle/mod.rs b/native-engine/datafusion-ext-plans/src/shuffle/mod.rs index eed20f6cf..a66595728 100644 --- a/native-engine/datafusion-ext-plans/src/shuffle/mod.rs +++ b/native-engine/datafusion-ext-plans/src/shuffle/mod.rs @@ -14,6 +14,9 @@ use std::{ fmt, + fs::{File, OpenOptions}, + os::unix::fs::OpenOptionsExt, + path::Path, sync::{ Arc, atomic::{AtomicUsize, Ordering::SeqCst}, @@ -44,7 +47,6 @@ pub mod single_repartitioner; pub mod sort_repartitioner; pub mod buffered_data; -mod fs_utils; mod rss; pub mod rss_single_repartitioner; pub mod rss_sort_repartitioner; @@ -274,3 +276,14 @@ fn binary_search(rows: &Arc, target: Row, from_index: isize, to_index: isi } return low as usize; // key not found. } + +pub fn open_shuffle_file>(path: P) -> std::io::Result { + OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + // Set the shuffle file permissions to 0644 to keep it consistent with the permissions of + // the built-in shuffler manager in Spark. + .mode(0o644) + .open(path) +} diff --git a/native-engine/datafusion-ext-plans/src/shuffle/single_repartitioner.rs b/native-engine/datafusion-ext-plans/src/shuffle/single_repartitioner.rs index c4dccfddc..2b5059bd8 100644 --- a/native-engine/datafusion-ext-plans/src/shuffle/single_repartitioner.rs +++ b/native-engine/datafusion-ext-plans/src/shuffle/single_repartitioner.rs @@ -28,7 +28,7 @@ use crate::{ ipc_compression::IpcCompressionWriter, timer_helper::{TimedWriter, TimerHelper}, }, - shuffle::{ShuffleRepartitioner, fs_utils::open_shuffle_file}, + shuffle::{ShuffleRepartitioner, open_shuffle_file}, }; pub struct SingleShuffleRepartitioner { diff --git a/native-engine/datafusion-ext-plans/src/shuffle/sort_repartitioner.rs b/native-engine/datafusion-ext-plans/src/shuffle/sort_repartitioner.rs index f2a66369a..6ac3d9da2 100644 --- a/native-engine/datafusion-ext-plans/src/shuffle/sort_repartitioner.rs +++ b/native-engine/datafusion-ext-plans/src/shuffle/sort_repartitioner.rs @@ -37,10 +37,7 @@ use crate::{ MemConsumer, MemConsumerInfo, MemManager, spill::{OwnedSpillBufReader, Spill, try_new_spill}, }, - shuffle::{ - Partitioning, ShuffleRepartitioner, buffered_data::BufferedData, - fs_utils::open_shuffle_file, - }, + shuffle::{Partitioning, ShuffleRepartitioner, buffered_data::BufferedData, open_shuffle_file}, }; pub struct SortShuffleRepartitioner { From e17818ed1fbf2596c47072608f1ce0fc91979df6 Mon Sep 17 00:00:00 2001 From: Fei Wang Date: Mon, 11 Aug 2025 09:34:11 -0700 Subject: [PATCH 3/4] debug --- .../datafusion-ext-plans/src/shuffle/mod.rs | 18 +++++++++++++----- .../blaze/shuffle/BlazeShuffleWriterBase.scala | 18 ++++++++++++++++++ 2 files changed, 31 insertions(+), 5 deletions(-) diff --git a/native-engine/datafusion-ext-plans/src/shuffle/mod.rs b/native-engine/datafusion-ext-plans/src/shuffle/mod.rs index a66595728..fe6e3a26a 100644 --- a/native-engine/datafusion-ext-plans/src/shuffle/mod.rs +++ b/native-engine/datafusion-ext-plans/src/shuffle/mod.rs @@ -15,7 +15,7 @@ use std::{ fmt, fs::{File, OpenOptions}, - os::unix::fs::OpenOptionsExt, + os::unix::fs::{MetadataExt, OpenOptionsExt}, path::Path, sync::{ Arc, @@ -278,12 +278,20 @@ fn binary_search(rows: &Arc, target: Row, from_index: isize, to_index: isi } pub fn open_shuffle_file>(path: P) -> std::io::Result { - OpenOptions::new() + let path_ref = path.as_ref(); + let file = OpenOptions::new() .write(true) .create(true) .truncate(true) - // Set the shuffle file permissions to 0644 to keep it consistent with the permissions of - // the built-in shuffler manager in Spark. .mode(0o644) - .open(path) + .open(path_ref)?; + + let metadata = file.metadata()?; + let permissions = metadata.mode(); + println!( + "Opened file at {:?} with permissions {:o}", + path_ref, permissions + ); + + Ok(file) } diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeShuffleWriterBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeShuffleWriterBase.scala index ea605dd59..b84cd257f 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeShuffleWriterBase.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeShuffleWriterBase.scala @@ -84,6 +84,9 @@ abstract class BlazeShuffleWriterBase[K, V](metrics: ShuffleWriteMetricsReporter val dataSize = Files.size(tempDataFilePath) metrics.incBytesWritten(dataSize) + printFileAndPermission(tempDataFilename) + printFileAndPermission(tempIndexFilename) + mapStatus = Some( Shims.get.commit( dep, @@ -93,9 +96,24 @@ abstract class BlazeShuffleWriterBase[K, V](metrics: ShuffleWriteMetricsReporter partitionLengths, dataSize, context)) + + printFileAndPermission(dataFile.getPath) + printFileAndPermission(dataFile.getPath.replace(".data", ".index")) } override def stop(success: Boolean): Option[MapStatus] = { mapStatus.filter(_ => success) } + + // scalafix:off + private def printFileAndPermission(file: String): Unit = { + try { + val path = Paths.get(file) + val posixPermissions = Files.getPosixFilePermissions(path) + println(s"Shuffle File: $file, Permissions: $posixPermissions") + } catch { + case e: Exception => + logWarning(s"Failed to get permissions for file $file: ${e.getMessage}") + } + } } From db728e534c8a7a8505fbbcbd5a2697047308928e Mon Sep 17 00:00:00 2001 From: Fei Wang Date: Mon, 11 Aug 2025 11:22:02 -0700 Subject: [PATCH 4/4] Using set_permissions instead of mode to prevent impact by umask --- .../datafusion-ext-plans/src/shuffle/mod.rs | 12 ++++-------- .../blaze/shuffle/BlazeShuffleWriterBase.scala | 18 ------------------ 2 files changed, 4 insertions(+), 26 deletions(-) diff --git a/native-engine/datafusion-ext-plans/src/shuffle/mod.rs b/native-engine/datafusion-ext-plans/src/shuffle/mod.rs index fe6e3a26a..2d0a2f369 100644 --- a/native-engine/datafusion-ext-plans/src/shuffle/mod.rs +++ b/native-engine/datafusion-ext-plans/src/shuffle/mod.rs @@ -15,7 +15,7 @@ use std::{ fmt, fs::{File, OpenOptions}, - os::unix::fs::{MetadataExt, OpenOptionsExt}, + os::unix::fs::PermissionsExt, path::Path, sync::{ Arc, @@ -283,15 +283,11 @@ pub fn open_shuffle_file>(path: P) -> std::io::Result { .write(true) .create(true) .truncate(true) - .mode(0o644) .open(path_ref)?; - let metadata = file.metadata()?; - let permissions = metadata.mode(); - println!( - "Opened file at {:?} with permissions {:o}", - path_ref, permissions - ); + // Set the shuffle file permissions to 0644 to keep it consistent with the + // permissions of the built-in shuffler manager in Spark. + std::fs::set_permissions(path_ref, std::fs::Permissions::from_mode(0o644))?; Ok(file) } diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeShuffleWriterBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeShuffleWriterBase.scala index b84cd257f..ea605dd59 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeShuffleWriterBase.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeShuffleWriterBase.scala @@ -84,9 +84,6 @@ abstract class BlazeShuffleWriterBase[K, V](metrics: ShuffleWriteMetricsReporter val dataSize = Files.size(tempDataFilePath) metrics.incBytesWritten(dataSize) - printFileAndPermission(tempDataFilename) - printFileAndPermission(tempIndexFilename) - mapStatus = Some( Shims.get.commit( dep, @@ -96,24 +93,9 @@ abstract class BlazeShuffleWriterBase[K, V](metrics: ShuffleWriteMetricsReporter partitionLengths, dataSize, context)) - - printFileAndPermission(dataFile.getPath) - printFileAndPermission(dataFile.getPath.replace(".data", ".index")) } override def stop(success: Boolean): Option[MapStatus] = { mapStatus.filter(_ => success) } - - // scalafix:off - private def printFileAndPermission(file: String): Unit = { - try { - val path = Paths.get(file) - val posixPermissions = Files.getPosixFilePermissions(path) - println(s"Shuffle File: $file, Permissions: $posixPermissions") - } catch { - case e: Exception => - logWarning(s"Failed to get permissions for file $file: ${e.getMessage}") - } - } }