Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 52 additions & 6 deletions core/core/src/layers/complete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,32 @@ impl<R: oio::Read> oio::Read for CompleteReader<R> {
}
}

/// Tracks the state of the Write operation.
/// A successful operation goes through states: Open -> Written -> Closed
/// A failed operation terminates in the Error state
#[derive(Debug, PartialEq, Eq)]
enum CompleteState {
Open,
Written,
Closed,
Error,
}

impl CompleteState {
/// Attempt to transition to the destination state. Once CompleteState has
/// errored all further transitions are ignored.
fn transition(&mut self, destination: CompleteState) {
if *self != CompleteState::Error {
*self = destination
}
}
}

pub struct CompleteWriter<W> {
inner: Option<W>,
append: bool,
size: u64,
state: CompleteState,
}

impl<W> CompleteWriter<W> {
Expand All @@ -165,6 +187,7 @@ impl<W> CompleteWriter<W> {
inner: Some(inner),
append,
size: 0,
state: CompleteState::Open,
}
}

Expand Down Expand Up @@ -194,8 +217,10 @@ impl<W> CompleteWriter<W> {
#[cfg(debug_assertions)]
impl<W> Drop for CompleteWriter<W> {
fn drop(&mut self) {
if self.inner.is_some() {
log::warn!("writer has not been closed or aborted, must be a bug")
if self.state == CompleteState::Written {
log::warn!(
"writer has not been closed or aborted after successful write operation, must be a bug"
)
}
}
}
Expand All @@ -206,29 +231,47 @@ where
{
async fn write(&mut self, bs: Buffer) -> Result<()> {
let w = self.inner.as_mut().ok_or_else(|| {
debug_assert_ne!(
self.state,
CompleteState::Open,
"bug: inner is empty, but state is Open"
);
Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
})?;

let len = bs.len();
w.write(bs).await?;
w.write(bs)
.await
.inspect_err(|_| self.state.transition(CompleteState::Error))?;
self.size += len as u64;
self.state.transition(CompleteState::Written);

Ok(())
}

async fn close(&mut self) -> Result<Metadata> {
let w = self.inner.as_mut().ok_or_else(|| {
debug_assert_ne!(
self.state,
CompleteState::Open,
"bug: inner is empty, but state is Open"
);
Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
})?;

// we must return `Err` before setting inner to None; otherwise,
// we won't be able to retry `close` in `RetryLayer`.
let mut ret = w.close().await?;
self.check(ret.content_length())?;
let mut ret = w
.close()
.await
.inspect_err(|_| self.state.transition(CompleteState::Error))?;
self.check(ret.content_length())
.inspect_err(|_| self.state.transition(CompleteState::Error))?;
if ret.content_length() == 0 {
ret = ret.with_content_length(self.size);
}
self.inner = None;
self.state.transition(CompleteState::Closed);

Ok(ret)
}
Expand All @@ -238,8 +281,11 @@ where
Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
})?;

w.abort().await?;
w.abort()
.await
.inspect_err(|_| self.state.transition(CompleteState::Error))?;
self.inner = None;
self.state.transition(CompleteState::Closed);

Ok(())
}
Expand Down
Loading