diff --git a/core/src/raw/oio/write/block_write.rs b/core/src/raw/oio/write/block_write.rs index 60e01571b47a..8665d50c00e8 100644 --- a/core/src/raw/oio/write/block_write.rs +++ b/core/src/raw/oio/write/block_write.rs @@ -154,6 +154,11 @@ where match &mut self.state { State::Idle => { if self.futures.has_remaining() { + // Fill cache with the first write. + if self.cache.is_none() { + let size = self.fill_cache(bs); + return Poll::Ready(Ok(size)); + } let cache = self.cache.take().expect("pending write must exist"); let block_id = uuid::Uuid::new_v4().to_string(); self.block_ids.push(block_id.clone()); @@ -185,6 +190,26 @@ where State::Idle => { let w = self.w.clone(); let block_ids = self.block_ids.clone(); + + if self.block_ids.is_empty() { + match &self.cache { + Some(cache) => { + let w = self.w.clone(); + let bs = cache.clone(); + self.state = State::Close(Box::pin(async move { + let size = bs.len(); + w.write_once(size as u64, AsyncBody::ChunkedBytes(bs)).await + })); + } + None => { + let w = self.w.clone(); + // Call write_once if there is no data in cache. + self.state = State::Close(Box::pin(async move { + w.write_once(0, AsyncBody::Empty).await + })); + } + } + } if self.futures.is_empty() && self.cache.is_none() { self.state = State::Close(Box::pin(