Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 5 additions & 8 deletions core/src/services/fs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,21 +227,18 @@ impl Access for FsBackend {
}

async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> {
let (target_path, tmp_path) = self.core.prepare_write(path, &op).await?;
let file = self
.core
.fs_write(&target_path, tmp_path.as_ref(), &op)
.await?;
let is_append = op.append();
let concurrent = op.concurrent();

let writer = FsWriter::new(target_path, tmp_path, file);
let writer = FsWriter::create(self.core.clone(), path, op).await?;

let writer = if op.append() {
let writer = if is_append {
FsWriters::One(writer)
} else {
FsWriters::Two(oio::PositionWriter::new(
self.info().clone(),
writer,
op.concurrent(),
concurrent,
))
};

Expand Down
64 changes: 29 additions & 35 deletions core/src/services/fs/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,37 +112,7 @@ impl FsCore {
Ok(f)
}

pub async fn prepare_write(
&self,
path: &str,
op: &OpWrite,
) -> Result<(PathBuf, Option<PathBuf>)> {
if let Some(atomic_write_dir) = &self.atomic_write_dir {
let target_path = self.ensure_write_abs_path(&self.root, path).await?;
let tmp_path = self
.ensure_write_abs_path(atomic_write_dir, &build_tmp_path_of(path))
.await?;

// If the target file exists, we should append to the end of it directly.
let should_append = op.append()
&& tokio::fs::try_exists(&target_path)
.await
.map_err(new_std_io_error)?;
let tmp_path = (!should_append).then_some(tmp_path);

Ok((target_path, tmp_path))
} else {
let p = self.ensure_write_abs_path(&self.root, path).await?;
Ok((p, None))
}
}

pub async fn fs_write(
&self,
target_path: &PathBuf,
tmp_path: Option<&PathBuf>,
op: &OpWrite,
) -> Result<tokio::fs::File> {
pub async fn fs_write(&self, path: &PathBuf, op: &OpWrite) -> Result<tokio::fs::File> {
let mut open_options = tokio::fs::OpenOptions::new();
if op.if_not_exists() {
open_options.create_new(true);
Expand All @@ -158,14 +128,38 @@ impl FsCore {
open_options.truncate(true);
}

let f = open_options
.open(tmp_path.unwrap_or(target_path))
.await
.map_err(parse_error)?;
let f = open_options.open(path).await.map_err(parse_error)?;

Ok(f)
}

/// This function is used to build a tempfile for writing.
///
/// We don't care about the OpWrite since every check should be performed on target path directly.
pub async fn fs_tempfile_write(
&self,
path: &str,
) -> Result<(tokio::fs::File, Option<PathBuf>)> {
let Some(atomic_write_dir) = self.atomic_write_dir.as_ref() else {
return Err(Error::new(ErrorKind::Unexpected, "fs didn't configure atomic_write_dir, but we're still entering the tempfile logic. This might be a bug."));
};

let tmp_path = self
.ensure_write_abs_path(atomic_write_dir, &build_tmp_path_of(path))
.await?;

let mut open_options = tokio::fs::OpenOptions::new();

// tempfile should always be new file.
open_options.create_new(true);
open_options.write(true);
open_options.truncate(true);

let f = open_options.open(&tmp_path).await.map_err(parse_error)?;

Ok((f, Some(tmp_path)))
}

pub async fn fs_list(&self, path: &str) -> Result<Option<tokio::fs::ReadDir>> {
let p = self.root.join(path.trim_end_matches('/'));

Expand Down
99 changes: 54 additions & 45 deletions core/src/services/fs/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,79 +18,90 @@
use std::fs::File;
use std::io::Write;
use std::path::PathBuf;
use std::sync::Arc;

use bytes::Buf;
use tokio::io::AsyncWriteExt;

use crate::raw::*;
use crate::services::fs::core::FsCore;
use crate::*;

pub type FsWriters =
TwoWays<FsWriter<tokio::fs::File>, oio::PositionWriter<FsWriter<tokio::fs::File>>>;
pub type FsWriters = TwoWays<FsWriter, oio::PositionWriter<FsWriter>>;

pub struct FsWriter<F> {
pub struct FsWriter {
target_path: PathBuf,
tmp_path: Option<PathBuf>,

f: Option<F>,
/// The temp_path is used to specify whether we should move to target_path after the file has been closed.
temp_path: Option<PathBuf>,
f: tokio::fs::File,
}

impl<F> FsWriter<F> {
pub fn new(target_path: PathBuf, tmp_path: Option<PathBuf>, f: F) -> Self {
Self {
target_path,
tmp_path,

f: Some(f),
impl FsWriter {
pub async fn create(core: Arc<FsCore>, path: &str, op: OpWrite) -> Result<Self> {
let target_path = core.ensure_write_abs_path(&core.root, path).await?;

// Create a target file using our OpWrite to check for permissions and existence.
//
// If target check passed, we can go decide which path we should go for writing.
let target_file = core.fs_write(&target_path, &op).await?;

// file is created success with append.
let is_append = op.append();
// file is created success with if_not_exists.
let is_exist = !op.if_not_exists();

let (mut f, mut temp_path) = (target_file, None);
if core.atomic_write_dir.is_some() {
// The only case we allow write in place is the file
// exists and users request for append writing.
if !(is_append && is_exist) {
(f, temp_path) = core.fs_tempfile_write(path).await?;
}
}

Ok(Self {
target_path,
temp_path,
f,
})
}
}

/// # Safety
///
/// We will only take `&mut Self` reference for FsWriter.
unsafe impl<F> Sync for FsWriter<F> {}
unsafe impl Sync for FsWriter {}

impl oio::Write for FsWriter<tokio::fs::File> {
impl oio::Write for FsWriter {
async fn write(&mut self, mut bs: Buffer) -> Result<()> {
let f = self.f.as_mut().expect("FsWriter must be initialized");

while bs.has_remaining() {
let n = f.write(bs.chunk()).await.map_err(new_std_io_error)?;
let n = self.f.write(bs.chunk()).await.map_err(new_std_io_error)?;
bs.advance(n);
}

Ok(())
}

async fn close(&mut self) -> Result<Metadata> {
let f = self.f.as_mut().expect("FsWriter must be initialized");
f.flush().await.map_err(new_std_io_error)?;
f.sync_all().await.map_err(new_std_io_error)?;
self.f.flush().await.map_err(new_std_io_error)?;
self.f.sync_all().await.map_err(new_std_io_error)?;

if let Some(tmp_path) = &self.tmp_path {
tokio::fs::rename(tmp_path, &self.target_path)
if let Some(temp_path) = &self.temp_path {
tokio::fs::rename(temp_path, &self.target_path)
.await
.map_err(new_std_io_error)?;
}

let file_meta = f.metadata().await.map_err(new_std_io_error)?;
let mode = if file_meta.is_file() {
EntryMode::FILE
} else if file_meta.is_dir() {
EntryMode::DIR
} else {
EntryMode::Unknown
};
let meta = Metadata::new(mode)
let file_meta = self.f.metadata().await.map_err(new_std_io_error)?;
let meta = Metadata::new(EntryMode::FILE)
.with_content_length(file_meta.len())
.with_last_modified(file_meta.modified().map_err(new_std_io_error)?.into());
Ok(meta)
}

async fn abort(&mut self) -> Result<()> {
if let Some(tmp_path) = &self.tmp_path {
tokio::fs::remove_file(tmp_path)
if let Some(temp_path) = &self.temp_path {
tokio::fs::remove_file(temp_path)
.await
.map_err(new_std_io_error)
} else {
Expand All @@ -102,11 +113,10 @@ impl oio::Write for FsWriter<tokio::fs::File> {
}
}

impl oio::PositionWrite for FsWriter<tokio::fs::File> {
impl oio::PositionWrite for FsWriter {
async fn write_all_at(&self, offset: u64, buf: Buffer) -> Result<()> {
let f = self.f.as_ref().expect("FsWriter must be initialized");

let f = f
let f = self
.f
.try_clone()
.await
.map_err(new_std_io_error)?
Expand All @@ -132,9 +142,8 @@ impl oio::PositionWrite for FsWriter<tokio::fs::File> {
}

async fn close(&self) -> Result<Metadata> {
let f = self.f.as_ref().expect("FsWriter must be initialized");

let mut f = f
let mut f = self
.f
.try_clone()
.await
.map_err(new_std_io_error)?
Expand All @@ -144,8 +153,8 @@ impl oio::PositionWrite for FsWriter<tokio::fs::File> {
f.flush().map_err(new_std_io_error)?;
f.sync_all().map_err(new_std_io_error)?;

if let Some(tmp_path) = &self.tmp_path {
tokio::fs::rename(tmp_path, &self.target_path)
if let Some(temp_path) = &self.temp_path {
tokio::fs::rename(temp_path, &self.target_path)
.await
.map_err(new_std_io_error)?;
}
Expand All @@ -165,8 +174,8 @@ impl oio::PositionWrite for FsWriter<tokio::fs::File> {
}

async fn abort(&self) -> Result<()> {
if let Some(tmp_path) = &self.tmp_path {
tokio::fs::remove_file(tmp_path)
if let Some(temp_path) = &self.temp_path {
tokio::fs::remove_file(temp_path)
.await
.map_err(new_std_io_error)
} else {
Expand Down
Loading