diff --git a/core/src/services/fs/backend.rs b/core/src/services/fs/backend.rs index 9f6762d6e8b2..d7d50107fb70 100644 --- a/core/src/services/fs/backend.rs +++ b/core/src/services/fs/backend.rs @@ -227,21 +227,18 @@ impl Access for FsBackend { } async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> { - let (target_path, tmp_path) = self.core.prepare_write(path, &op).await?; - let file = self - .core - .fs_write(&target_path, tmp_path.as_ref(), &op) - .await?; + let is_append = op.append(); + let concurrent = op.concurrent(); - let writer = FsWriter::new(target_path, tmp_path, file); + let writer = FsWriter::create(self.core.clone(), path, op).await?; - let writer = if op.append() { + let writer = if is_append { FsWriters::One(writer) } else { FsWriters::Two(oio::PositionWriter::new( self.info().clone(), writer, - op.concurrent(), + concurrent, )) }; diff --git a/core/src/services/fs/core.rs b/core/src/services/fs/core.rs index 9777f832e6f1..fa5a9d102c34 100644 --- a/core/src/services/fs/core.rs +++ b/core/src/services/fs/core.rs @@ -112,37 +112,7 @@ impl FsCore { Ok(f) } - pub async fn prepare_write( - &self, - path: &str, - op: &OpWrite, - ) -> Result<(PathBuf, Option)> { - if let Some(atomic_write_dir) = &self.atomic_write_dir { - let target_path = self.ensure_write_abs_path(&self.root, path).await?; - let tmp_path = self - .ensure_write_abs_path(atomic_write_dir, &build_tmp_path_of(path)) - .await?; - - // If the target file exists, we should append to the end of it directly. - let should_append = op.append() - && tokio::fs::try_exists(&target_path) - .await - .map_err(new_std_io_error)?; - let tmp_path = (!should_append).then_some(tmp_path); - - Ok((target_path, tmp_path)) - } else { - let p = self.ensure_write_abs_path(&self.root, path).await?; - Ok((p, None)) - } - } - - pub async fn fs_write( - &self, - target_path: &PathBuf, - tmp_path: Option<&PathBuf>, - op: &OpWrite, - ) -> Result { + pub async fn fs_write(&self, path: &PathBuf, op: &OpWrite) -> Result { let mut open_options = tokio::fs::OpenOptions::new(); if op.if_not_exists() { open_options.create_new(true); @@ -158,14 +128,38 @@ impl FsCore { open_options.truncate(true); } - let f = open_options - .open(tmp_path.unwrap_or(target_path)) - .await - .map_err(parse_error)?; + let f = open_options.open(path).await.map_err(parse_error)?; Ok(f) } + /// This function is used to build a tempfile for writing. + /// + /// We don't care about the OpWrite since every check should be performed on target path directly. + pub async fn fs_tempfile_write( + &self, + path: &str, + ) -> Result<(tokio::fs::File, Option)> { + let Some(atomic_write_dir) = self.atomic_write_dir.as_ref() else { + return Err(Error::new(ErrorKind::Unexpected, "fs didn't configure atomic_write_dir, but we're still entering the tempfile logic. This might be a bug.")); + }; + + let tmp_path = self + .ensure_write_abs_path(atomic_write_dir, &build_tmp_path_of(path)) + .await?; + + let mut open_options = tokio::fs::OpenOptions::new(); + + // tempfile should always be new file. + open_options.create_new(true); + open_options.write(true); + open_options.truncate(true); + + let f = open_options.open(&tmp_path).await.map_err(parse_error)?; + + Ok((f, Some(tmp_path))) + } + pub async fn fs_list(&self, path: &str) -> Result> { let p = self.root.join(path.trim_end_matches('/')); diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs index 2b02ced16b50..97d75f78565f 100644 --- a/core/src/services/fs/writer.rs +++ b/core/src/services/fs/writer.rs @@ -18,45 +18,64 @@ use std::fs::File; use std::io::Write; use std::path::PathBuf; +use std::sync::Arc; use bytes::Buf; use tokio::io::AsyncWriteExt; use crate::raw::*; +use crate::services::fs::core::FsCore; use crate::*; -pub type FsWriters = - TwoWays, oio::PositionWriter>>; +pub type FsWriters = TwoWays>; -pub struct FsWriter { +pub struct FsWriter { target_path: PathBuf, - tmp_path: Option, - - f: Option, + /// The temp_path is used to specify whether we should move to target_path after the file has been closed. + temp_path: Option, + f: tokio::fs::File, } -impl FsWriter { - pub fn new(target_path: PathBuf, tmp_path: Option, f: F) -> Self { - Self { - target_path, - tmp_path, - - f: Some(f), +impl FsWriter { + pub async fn create(core: Arc, path: &str, op: OpWrite) -> Result { + let target_path = core.ensure_write_abs_path(&core.root, path).await?; + + // Create a target file using our OpWrite to check for permissions and existence. + // + // If target check passed, we can go decide which path we should go for writing. + let target_file = core.fs_write(&target_path, &op).await?; + + // file is created success with append. + let is_append = op.append(); + // file is created success with if_not_exists. + let is_exist = !op.if_not_exists(); + + let (mut f, mut temp_path) = (target_file, None); + if core.atomic_write_dir.is_some() { + // The only case we allow write in place is the file + // exists and users request for append writing. + if !(is_append && is_exist) { + (f, temp_path) = core.fs_tempfile_write(path).await?; + } } + + Ok(Self { + target_path, + temp_path, + f, + }) } } /// # Safety /// /// We will only take `&mut Self` reference for FsWriter. -unsafe impl Sync for FsWriter {} +unsafe impl Sync for FsWriter {} -impl oio::Write for FsWriter { +impl oio::Write for FsWriter { async fn write(&mut self, mut bs: Buffer) -> Result<()> { - let f = self.f.as_mut().expect("FsWriter must be initialized"); - while bs.has_remaining() { - let n = f.write(bs.chunk()).await.map_err(new_std_io_error)?; + let n = self.f.write(bs.chunk()).await.map_err(new_std_io_error)?; bs.advance(n); } @@ -64,33 +83,25 @@ impl oio::Write for FsWriter { } async fn close(&mut self) -> Result { - let f = self.f.as_mut().expect("FsWriter must be initialized"); - f.flush().await.map_err(new_std_io_error)?; - f.sync_all().await.map_err(new_std_io_error)?; + self.f.flush().await.map_err(new_std_io_error)?; + self.f.sync_all().await.map_err(new_std_io_error)?; - if let Some(tmp_path) = &self.tmp_path { - tokio::fs::rename(tmp_path, &self.target_path) + if let Some(temp_path) = &self.temp_path { + tokio::fs::rename(temp_path, &self.target_path) .await .map_err(new_std_io_error)?; } - let file_meta = f.metadata().await.map_err(new_std_io_error)?; - let mode = if file_meta.is_file() { - EntryMode::FILE - } else if file_meta.is_dir() { - EntryMode::DIR - } else { - EntryMode::Unknown - }; - let meta = Metadata::new(mode) + let file_meta = self.f.metadata().await.map_err(new_std_io_error)?; + let meta = Metadata::new(EntryMode::FILE) .with_content_length(file_meta.len()) .with_last_modified(file_meta.modified().map_err(new_std_io_error)?.into()); Ok(meta) } async fn abort(&mut self) -> Result<()> { - if let Some(tmp_path) = &self.tmp_path { - tokio::fs::remove_file(tmp_path) + if let Some(temp_path) = &self.temp_path { + tokio::fs::remove_file(temp_path) .await .map_err(new_std_io_error) } else { @@ -102,11 +113,10 @@ impl oio::Write for FsWriter { } } -impl oio::PositionWrite for FsWriter { +impl oio::PositionWrite for FsWriter { async fn write_all_at(&self, offset: u64, buf: Buffer) -> Result<()> { - let f = self.f.as_ref().expect("FsWriter must be initialized"); - - let f = f + let f = self + .f .try_clone() .await .map_err(new_std_io_error)? @@ -132,9 +142,8 @@ impl oio::PositionWrite for FsWriter { } async fn close(&self) -> Result { - let f = self.f.as_ref().expect("FsWriter must be initialized"); - - let mut f = f + let mut f = self + .f .try_clone() .await .map_err(new_std_io_error)? @@ -144,8 +153,8 @@ impl oio::PositionWrite for FsWriter { f.flush().map_err(new_std_io_error)?; f.sync_all().map_err(new_std_io_error)?; - if let Some(tmp_path) = &self.tmp_path { - tokio::fs::rename(tmp_path, &self.target_path) + if let Some(temp_path) = &self.temp_path { + tokio::fs::rename(temp_path, &self.target_path) .await .map_err(new_std_io_error)?; } @@ -165,8 +174,8 @@ impl oio::PositionWrite for FsWriter { } async fn abort(&self) -> Result<()> { - if let Some(tmp_path) = &self.tmp_path { - tokio::fs::remove_file(tmp_path) + if let Some(temp_path) = &self.temp_path { + tokio::fs::remove_file(temp_path) .await .map_err(new_std_io_error) } else {