diff --git a/core/core/src/layers/complete.rs b/core/core/src/layers/complete.rs index 75fa9ee099de..d9cf1a403f14 100644 --- a/core/core/src/layers/complete.rs +++ b/core/core/src/layers/complete.rs @@ -153,10 +153,32 @@ impl oio::Read for CompleteReader { } } +/// Tracks the state of the Write operation. +/// A successful operation goes through states: Open -> Written -> Closed +/// A failed operation terminates in the Error state +#[derive(Debug, PartialEq, Eq)] +enum CompleteState { + Open, + Written, + Closed, + Error, +} + +impl CompleteState { + /// Attempt to transition to the destination state. Once CompleteState has + /// errored all further transitions are ignored. + fn transition(&mut self, destination: CompleteState) { + if *self != CompleteState::Error { + *self = destination + } + } +} + pub struct CompleteWriter { inner: Option, append: bool, size: u64, + state: CompleteState, } impl CompleteWriter { @@ -165,6 +187,7 @@ impl CompleteWriter { inner: Some(inner), append, size: 0, + state: CompleteState::Open, } } @@ -194,8 +217,10 @@ impl CompleteWriter { #[cfg(debug_assertions)] impl Drop for CompleteWriter { fn drop(&mut self) { - if self.inner.is_some() { - log::warn!("writer has not been closed or aborted, must be a bug") + if self.state == CompleteState::Written { + log::warn!( + "writer has not been closed or aborted after successful write operation, must be a bug" + ) } } } @@ -206,29 +231,47 @@ where { async fn write(&mut self, bs: Buffer) -> Result<()> { let w = self.inner.as_mut().ok_or_else(|| { + debug_assert_ne!( + self.state, + CompleteState::Open, + "bug: inner is empty, but state is Open" + ); Error::new(ErrorKind::Unexpected, "writer has been closed or aborted") })?; let len = bs.len(); - w.write(bs).await?; + w.write(bs) + .await + .inspect_err(|_| self.state.transition(CompleteState::Error))?; self.size += len as u64; + self.state.transition(CompleteState::Written); Ok(()) } async fn close(&mut self) -> Result { let w = self.inner.as_mut().ok_or_else(|| { + debug_assert_ne!( + self.state, + CompleteState::Open, + "bug: inner is empty, but state is Open" + ); Error::new(ErrorKind::Unexpected, "writer has been closed or aborted") })?; // we must return `Err` before setting inner to None; otherwise, // we won't be able to retry `close` in `RetryLayer`. - let mut ret = w.close().await?; - self.check(ret.content_length())?; + let mut ret = w + .close() + .await + .inspect_err(|_| self.state.transition(CompleteState::Error))?; + self.check(ret.content_length()) + .inspect_err(|_| self.state.transition(CompleteState::Error))?; if ret.content_length() == 0 { ret = ret.with_content_length(self.size); } self.inner = None; + self.state.transition(CompleteState::Closed); Ok(ret) } @@ -238,8 +281,11 @@ where Error::new(ErrorKind::Unexpected, "writer has been closed or aborted") })?; - w.abort().await?; + w.abort() + .await + .inspect_err(|_| self.state.transition(CompleteState::Error))?; self.inner = None; + self.state.transition(CompleteState::Closed); Ok(()) }