From 996ee16df2c9020156b20adc765e19374ea67452 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Thu, 4 Jan 2024 16:45:01 +0000 Subject: [PATCH 1/6] feat: expose the concurrent field --- core/src/raw/ops.rs | 12 ++++++++++++ core/src/types/operator/operator_futures.rs | 12 ++++++++++++ 2 files changed, 24 insertions(+) diff --git a/core/src/raw/ops.rs b/core/src/raw/ops.rs index dbf43bfaa79c..955a5884210e 100644 --- a/core/src/raw/ops.rs +++ b/core/src/raw/ops.rs @@ -516,6 +516,7 @@ impl OpStat { pub struct OpWrite { append: bool, buffer: Option, + concurrent: usize, content_type: Option, content_disposition: Option, @@ -601,6 +602,17 @@ impl OpWrite { self.cache_control = Some(cache_control.to_string()); self } + + /// Get the concurrent. + pub fn concurrent(&self) -> usize { + self.concurrent + } + + /// Set the maximum concurrent write task amount. + pub fn with_concurrent(mut self, concurrent: usize) -> Self { + self.concurrent = concurrent; + self + } } /// Args for `copy` operation. diff --git a/core/src/types/operator/operator_futures.rs b/core/src/types/operator/operator_futures.rs index 8c75f56b3cf3..4faf5ea52301 100644 --- a/core/src/types/operator/operator_futures.rs +++ b/core/src/types/operator/operator_futures.rs @@ -478,6 +478,12 @@ impl FutureWrite { .map_args(|(args, bs)| (args.with_cache_control(v), bs)); self } + + /// Set the maximum concurrent write task amount. + pub fn concurrent(mut self, v: usize) -> Self { + self.0 = self.0.map_args(|(args, bs)| (args.with_buffer(v), bs)); + self + } } impl Future for FutureWrite { @@ -543,6 +549,12 @@ impl FutureWriter { self.0 = self.0.map_args(|args| args.with_cache_control(v)); self } + + /// Set the maximum concurrent write task amount. + pub fn concurrent(mut self, v: usize) -> Self { + self.0 = self.0.map_args(|args| args.with_concurrent(v)); + self + } } impl Future for FutureWriter { From 70042759feec956e95c3f6091ecaad3a158b8541 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Thu, 4 Jan 2024 16:45:13 +0000 Subject: [PATCH 2/6] feat: implement concurrent `MultipartUploadWriter` --- .../raw/oio/write/multipart_upload_write.rs | 276 +++++++++++------- core/src/services/b2/backend.rs | 3 +- core/src/services/cos/backend.rs | 2 +- core/src/services/obs/backend.rs | 2 +- core/src/services/oss/backend.rs | 2 +- core/src/services/s3/backend.rs | 3 +- core/src/services/upyun/backend.rs | 3 +- 7 files changed, 183 insertions(+), 108 deletions(-) diff --git a/core/src/raw/oio/write/multipart_upload_write.rs b/core/src/raw/oio/write/multipart_upload_write.rs index a0f92aa98ec6..86dd98cd1576 100644 --- a/core/src/raw/oio/write/multipart_upload_write.rs +++ b/core/src/raw/oio/write/multipart_upload_write.rs @@ -15,12 +15,18 @@ // specific language governing permissions and limitations // under the License. +use std::cmp::min; +use std::collections::VecDeque; +use std::pin::Pin; use std::sync::Arc; use std::task::ready; use std::task::Context; use std::task::Poll; use async_trait::async_trait; +use futures::Future; +use futures::FutureExt; +use futures::StreamExt; use crate::raw::*; use crate::*; @@ -103,44 +109,119 @@ pub struct MultipartUploadPart { pub etag: String, } +struct UploadFuture(BoxedFuture>); + +/// # Safety +/// +/// wasm32 is a special target that we only have one event-loop for this UploadFuture. +unsafe impl Send for UploadFuture {} + +/// # Safety +/// +/// We will only take `&mut Self` reference for UploadFuture. +unsafe impl Sync for UploadFuture {} + +impl Future for UploadFuture { + type Output = Result; + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.get_mut().0.poll_unpin(cx) + } +} + +#[derive(Clone)] +struct WriteTask { + part_number: usize, + bs: oio::ChunkedBytes, +} + +/// # Safety +/// +/// wasm32 is a special target that we only have one event-loop for this WriteTask. +unsafe impl Send for WriteTask {} +/// # Safety +/// +/// We will only take `&mut Self` reference for WriteTask. +unsafe impl Sync for WriteTask {} + /// MultipartUploadWriter will implements [`Write`] based on multipart /// uploads. pub struct MultipartUploadWriter { - state: State, + state: State, + w: Arc, - cache: Option, upload_id: Option>, parts: Vec, + processing_tasks: VecDeque, + pending_tasks: VecDeque, + futures: ConcurrentFutures, + part_number: usize, } -enum State { - Idle(Option), - Init(BoxedFuture<(W, Result)>), - Write(BoxedFuture<(W, Result)>), - Close(BoxedFuture<(W, Result<()>)>), - Abort(BoxedFuture<(W, Result<()>)>), +enum State { + Idle, + Init(BoxedFuture>), + Busy, + Close(BoxedFuture>), + Abort(BoxedFuture>), } /// # Safety /// /// wasm32 is a special target that we only have one event-loop for this state. -unsafe impl Send for State {} +unsafe impl Send for State {} /// # Safety /// /// We will only take `&mut Self` reference for State. -unsafe impl Sync for State {} +unsafe impl Sync for State {} impl MultipartUploadWriter { /// Create a new MultipartUploadWriter. - pub fn new(inner: W) -> Self { + pub fn new(inner: W, concurrent: usize) -> Self { Self { - state: State::Idle(Some(inner)), + state: State::Idle, - cache: None, + w: Arc::new(inner), upload_id: None, parts: Vec::new(), + processing_tasks: VecDeque::new(), + pending_tasks: VecDeque::new(), + futures: ConcurrentFutures::new(1.max(concurrent)), + part_number: 0, } } + + /// Increases part number and return the previous part number. + fn inc_part_number(&mut self) -> usize { + let part_number = self.part_number; + self.part_number += 1; + part_number + } + + fn process_write_task(&mut self, upload_id: Arc, task: WriteTask) { + let size = task.bs.len(); + let part_number = task.part_number; + let bs = task.bs.clone(); + let w = self.w.clone(); + + self.futures.push(UploadFuture(Box::pin(async move { + w.write_part( + &upload_id, + part_number, + size as u64, + AsyncBody::ChunkedBytes(bs), + ) + .await + }))); + self.processing_tasks.push_back(task); + } + + fn add_write_task(&mut self, bs: &dyn oio::WriteBuf) -> usize { + let size = bs.remaining(); + let bs = oio::ChunkedBytes::from_vec(bs.vectored_bytes(size)); + let part_number = self.inc_part_number(); + self.pending_tasks.push_back(WriteTask { bs, part_number }); + size + } } impl oio::Write for MultipartUploadWriter @@ -150,60 +231,54 @@ where fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll> { loop { match &mut self.state { - State::Idle(w) => { + State::Idle => { match self.upload_id.as_ref() { Some(upload_id) => { let upload_id = upload_id.clone(); - let part_number = self.parts.len(); - - let bs = self.cache.clone().expect("cache must be valid").clone(); - let w = w.take().expect("writer must be valid"); - self.state = State::Write(Box::pin(async move { - let size = bs.len(); - let part = w - .write_part( - &upload_id, - part_number, - size as u64, - AsyncBody::ChunkedBytes(bs), - ) - .await; + if self.futures.has_remaining() { + let task = self + .pending_tasks + .pop_front() + .expect("pending task must exist"); + self.process_write_task(upload_id, task); - (w, part) - })); + let size = self.add_write_task(bs); + return Poll::Ready(Ok(size)); + } else { + self.state = State::Busy; + } } None => { // Fill cache with the first write. - if self.cache.is_none() { - let size = bs.remaining(); - let cb = oio::ChunkedBytes::from_vec(bs.vectored_bytes(size)); - self.cache = Some(cb); + if self.pending_tasks.is_empty() { + let size = self.add_write_task(bs); return Poll::Ready(Ok(size)); } - let w = w.take().expect("writer must be valid"); - self.state = State::Init(Box::pin(async move { - let upload_id = w.initiate_part().await; - (w, upload_id) - })); + let w = self.w.clone(); + self.state = + State::Init(Box::pin(async move { w.initiate_part().await })); } } } State::Init(fut) => { - let (w, upload_id) = ready!(fut.as_mut().poll(cx)); - self.state = State::Idle(Some(w)); + let upload_id = ready!(fut.as_mut().poll(cx)); + self.state = State::Idle; self.upload_id = Some(Arc::new(upload_id?)); } - State::Write(fut) => { - let (w, part) = ready!(fut.as_mut().poll(cx)); - self.state = State::Idle(Some(w)); - self.parts.push(part?); - - // Replace the cache when last write succeeded - let size = bs.remaining(); - let cb = oio::ChunkedBytes::from_vec(bs.vectored_bytes(size)); - self.cache = Some(cb); - return Poll::Ready(Ok(size)); + State::Busy => { + if let Some(part) = ready!(self.futures.poll_next_unpin(cx)) { + // Safety: must exist. + let task = self.processing_tasks.pop_front().unwrap(); + match part { + Ok(part) => self.parts.push(part), + Err(err) => { + self.pending_tasks.push_front(task); + return Poll::Ready(Err(err)); + } + } + } + self.state = State::Idle; } State::Close(_) => { unreachable!( @@ -222,72 +297,70 @@ where fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll> { loop { match &mut self.state { - State::Idle(w) => { - let w = w.take().expect("writer must be valid"); + State::Idle => { match self.upload_id.clone() { Some(upload_id) => { - let parts = self.parts.clone(); - match self.cache.clone() { - Some(bs) => { - let upload_id = upload_id.clone(); - self.state = State::Write(Box::pin(async move { - let size = bs.len(); - let part = w - .write_part( - &upload_id, - parts.len(), - size as u64, - AsyncBody::ChunkedBytes(bs), - ) - .await; - (w, part) - })); - } - None => { - self.state = State::Close(Box::pin(async move { - let res = w.complete_part(&upload_id, &parts).await; - (w, res) - })); + let w = self.w.clone(); + if self.futures.is_empty() && self.pending_tasks.is_empty() { + let upload_id = upload_id.clone(); + let parts = self.parts.clone(); + self.state = State::Close(Box::pin(async move { + w.complete_part(&upload_id, &parts).await + })); + } else { + let rem = min(self.futures.remaining(), self.pending_tasks.len()); + for _ in 0..rem { + if let Some(task) = self.pending_tasks.pop_front() { + let upload_id = upload_id.clone(); + self.process_write_task(upload_id, task); + } } + self.state = State::Busy; } } - None => match self.cache.clone() { - Some(bs) => { + None => match self.pending_tasks.pop_front() { + Some(task) => { + let w = self.w.clone(); + let bs = task.bs.clone(); self.state = State::Close(Box::pin(async move { let size = bs.len(); - let res = w - .write_once(size as u64, AsyncBody::ChunkedBytes(bs)) - .await; - (w, res) + w.write_once(size as u64, AsyncBody::ChunkedBytes(bs)).await })); } None => { + let w = self.w.clone(); // Call write_once if there is no data in cache and no upload_id. self.state = State::Close(Box::pin(async move { - let res = w.write_once(0, AsyncBody::Empty).await; - (w, res) + w.write_once(0, AsyncBody::Empty).await })); } }, } } State::Close(fut) => { - let (w, res) = futures::ready!(fut.as_mut().poll(cx)); - self.state = State::Idle(Some(w)); + let res = futures::ready!(fut.as_mut().poll(cx)); + self.state = State::Idle; // We should check res first before clean up cache. res?; - self.cache = None; return Poll::Ready(Ok(())); } State::Init(_) => unreachable!( "MultipartUploadWriter must not go into State::Init during poll_close" ), - State::Write(fut) => { - let (w, part) = ready!(fut.as_mut().poll(cx)); - self.state = State::Idle(Some(w)); - self.parts.push(part?); - self.cache = None; + State::Busy => { + while let Some(part) = ready!(self.futures.poll_next_unpin(cx)) { + // Safety: must exist. + let task = self.processing_tasks.pop_front().unwrap(); + match part { + Ok(part) => self.parts.push(part), + Err(err) => { + self.pending_tasks.push_front(task); + return Poll::Ready(Err(err)); + } + } + } + self.state = State::Idle; } State::Abort(_) => unreachable!( "MultipartUploadWriter must not go into State::Abort during poll_close" @@ -299,31 +372,30 @@ where fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll> { loop { match &mut self.state { - State::Idle(w) => { - let w = w.take().expect("writer must be valid"); + State::Idle => { + let w = self.w.clone(); match self.upload_id.clone() { Some(upload_id) => { - self.state = State::Abort(Box::pin(async move { - let res = w.abort_part(&upload_id).await; - (w, res) - })); + self.state = + State::Abort(Box::pin( + async move { w.abort_part(&upload_id).await }, + )); } None => { - self.cache = None; return Poll::Ready(Ok(())); } } } State::Abort(fut) => { - let (w, res) = futures::ready!(fut.as_mut().poll(cx)); - self.state = State::Idle(Some(w)); + let res = futures::ready!(fut.as_mut().poll(cx)); + self.state = State::Idle; return Poll::Ready(res); } State::Init(_) => unreachable!( "MultipartUploadWriter must not go into State::Init during poll_abort" ), - State::Write(_) => unreachable!( - "MultipartUploadWriter must not go into State::Write during poll_abort" + State::Busy => unreachable!( + "MultipartUploadWriter must not go into State::Busy during poll_abort" ), State::Close(_) => unreachable!( "MultipartUploadWriter must not go into State::Close during poll_abort" diff --git a/core/src/services/b2/backend.rs b/core/src/services/b2/backend.rs index 2c5128bad659..a6f29055fddb 100644 --- a/core/src/services/b2/backend.rs +++ b/core/src/services/b2/backend.rs @@ -376,9 +376,10 @@ impl Accessor for B2Backend { } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { + let concurrent = args.concurrent(); let writer = B2Writer::new(self.core.clone(), path, args); - let w = oio::MultipartUploadWriter::new(writer); + let w = oio::MultipartUploadWriter::new(writer, concurrent); Ok((RpWrite::default(), w)) } diff --git a/core/src/services/cos/backend.rs b/core/src/services/cos/backend.rs index 5c6d45885860..e0e58701fd68 100644 --- a/core/src/services/cos/backend.rs +++ b/core/src/services/cos/backend.rs @@ -340,7 +340,7 @@ impl Accessor for CosBackend { let w = if args.append() { CosWriters::Two(oio::AppendObjectWriter::new(writer)) } else { - CosWriters::One(oio::MultipartUploadWriter::new(writer)) + CosWriters::One(oio::MultipartUploadWriter::new(writer, args.concurrent())) }; Ok((RpWrite::default(), w)) diff --git a/core/src/services/obs/backend.rs b/core/src/services/obs/backend.rs index d00fbe423a47..ad3b9c98fc89 100644 --- a/core/src/services/obs/backend.rs +++ b/core/src/services/obs/backend.rs @@ -350,7 +350,7 @@ impl Accessor for ObsBackend { let w = if args.append() { ObsWriters::Two(oio::AppendObjectWriter::new(writer)) } else { - ObsWriters::One(oio::MultipartUploadWriter::new(writer)) + ObsWriters::One(oio::MultipartUploadWriter::new(writer, args.concurrent())) }; Ok((RpWrite::default(), w)) diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs index 6e109e132175..79624a7dded8 100644 --- a/core/src/services/oss/backend.rs +++ b/core/src/services/oss/backend.rs @@ -492,7 +492,7 @@ impl Accessor for OssBackend { let w = if args.append() { OssWriters::Two(oio::AppendObjectWriter::new(writer)) } else { - OssWriters::One(oio::MultipartUploadWriter::new(writer)) + OssWriters::One(oio::MultipartUploadWriter::new(writer, args.concurrent())) }; Ok((RpWrite::default(), w)) diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs index 94acf49c37de..2094fde0d93c 100644 --- a/core/src/services/s3/backend.rs +++ b/core/src/services/s3/backend.rs @@ -1085,9 +1085,10 @@ impl Accessor for S3Backend { } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { + let concurrent = args.concurrent(); let writer = S3Writer::new(self.core.clone(), path, args); - let w = oio::MultipartUploadWriter::new(writer); + let w = oio::MultipartUploadWriter::new(writer, concurrent); Ok((RpWrite::default(), w)) } diff --git a/core/src/services/upyun/backend.rs b/core/src/services/upyun/backend.rs index 091977f6f59a..a13764b945cc 100644 --- a/core/src/services/upyun/backend.rs +++ b/core/src/services/upyun/backend.rs @@ -316,9 +316,10 @@ impl Accessor for UpyunBackend { } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { + let concurrent = args.concurrent(); let writer = UpyunWriter::new(self.core.clone(), args, path.to_string()); - let w = oio::MultipartUploadWriter::new(writer); + let w = oio::MultipartUploadWriter::new(writer, concurrent); Ok((RpWrite::default(), w)) } From 59106c77ca5ac3cf073a4beeb1a4f5faa466f445 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Fri, 5 Jan 2024 10:18:11 +0000 Subject: [PATCH 3/6] chore: apply suggestions from CR --- .../raw/oio/write/multipart_upload_write.rs | 77 +++++++------------ 1 file changed, 26 insertions(+), 51 deletions(-) diff --git a/core/src/raw/oio/write/multipart_upload_write.rs b/core/src/raw/oio/write/multipart_upload_write.rs index 86dd98cd1576..9ef746d6c29d 100644 --- a/core/src/raw/oio/write/multipart_upload_write.rs +++ b/core/src/raw/oio/write/multipart_upload_write.rs @@ -15,8 +15,6 @@ // specific language governing permissions and limitations // under the License. -use std::cmp::min; -use std::collections::VecDeque; use std::pin::Pin; use std::sync::Arc; use std::task::ready; @@ -151,10 +149,9 @@ pub struct MultipartUploadWriter { upload_id: Option>, parts: Vec, - processing_tasks: VecDeque, - pending_tasks: VecDeque, + pending: Option, futures: ConcurrentFutures, - part_number: usize, + next_part_number: usize, } enum State { @@ -183,24 +180,31 @@ impl MultipartUploadWriter { w: Arc::new(inner), upload_id: None, parts: Vec::new(), - processing_tasks: VecDeque::new(), - pending_tasks: VecDeque::new(), + pending: None, futures: ConcurrentFutures::new(1.max(concurrent)), - part_number: 0, + next_part_number: 0, } } /// Increases part number and return the previous part number. fn inc_part_number(&mut self) -> usize { - let part_number = self.part_number; - self.part_number += 1; + let part_number = self.next_part_number; + self.next_part_number += 1; part_number } + fn add_write_task(&mut self, bs: &dyn oio::WriteBuf) -> usize { + let size = bs.remaining(); + let bs = oio::ChunkedBytes::from_vec(bs.vectored_bytes(size)); + let part_number = self.inc_part_number(); + self.pending = Some(WriteTask { bs, part_number }); + size + } + fn process_write_task(&mut self, upload_id: Arc, task: WriteTask) { let size = task.bs.len(); let part_number = task.part_number; - let bs = task.bs.clone(); + let bs = task.bs; let w = self.w.clone(); self.futures.push(UploadFuture(Box::pin(async move { @@ -212,15 +216,6 @@ impl MultipartUploadWriter { ) .await }))); - self.processing_tasks.push_back(task); - } - - fn add_write_task(&mut self, bs: &dyn oio::WriteBuf) -> usize { - let size = bs.remaining(); - let bs = oio::ChunkedBytes::from_vec(bs.vectored_bytes(size)); - let part_number = self.inc_part_number(); - self.pending_tasks.push_back(WriteTask { bs, part_number }); - size } } @@ -236,12 +231,8 @@ where Some(upload_id) => { let upload_id = upload_id.clone(); if self.futures.has_remaining() { - let task = self - .pending_tasks - .pop_front() - .expect("pending task must exist"); + let task = self.pending.take().expect("pending write must exist"); self.process_write_task(upload_id, task); - let size = self.add_write_task(bs); return Poll::Ready(Ok(size)); } else { @@ -250,7 +241,7 @@ where } None => { // Fill cache with the first write. - if self.pending_tasks.is_empty() { + if self.pending.is_none() { let size = self.add_write_task(bs); return Poll::Ready(Ok(size)); } @@ -268,15 +259,7 @@ where } State::Busy => { if let Some(part) = ready!(self.futures.poll_next_unpin(cx)) { - // Safety: must exist. - let task = self.processing_tasks.pop_front().unwrap(); - match part { - Ok(part) => self.parts.push(part), - Err(err) => { - self.pending_tasks.push_front(task); - return Poll::Ready(Err(err)); - } - } + self.parts.push(part?); } self.state = State::Idle; } @@ -301,16 +284,15 @@ where match self.upload_id.clone() { Some(upload_id) => { let w = self.w.clone(); - if self.futures.is_empty() && self.pending_tasks.is_empty() { + if self.futures.is_empty() && self.pending.is_none() { let upload_id = upload_id.clone(); let parts = self.parts.clone(); self.state = State::Close(Box::pin(async move { w.complete_part(&upload_id, &parts).await })); } else { - let rem = min(self.futures.remaining(), self.pending_tasks.len()); - for _ in 0..rem { - if let Some(task) = self.pending_tasks.pop_front() { + if self.futures.has_remaining() { + if let Some(task) = self.pending.take() { let upload_id = upload_id.clone(); self.process_write_task(upload_id, task); } @@ -318,7 +300,7 @@ where self.state = State::Busy; } } - None => match self.pending_tasks.pop_front() { + None => match &self.pending { Some(task) => { let w = self.w.clone(); let bs = task.bs.clone(); @@ -329,7 +311,7 @@ where } None => { let w = self.w.clone(); - // Call write_once if there is no data in cache and no upload_id. + // Call write_once if there is no data in `pending` and no upload_id. self.state = State::Close(Box::pin(async move { w.write_once(0, AsyncBody::Empty).await })); @@ -340,8 +322,9 @@ where State::Close(fut) => { let res = futures::ready!(fut.as_mut().poll(cx)); self.state = State::Idle; - // We should check res first before clean up cache. + // We should check res first before clean up `pending`. res?; + self.pending = None; return Poll::Ready(Ok(())); } @@ -350,15 +333,7 @@ where ), State::Busy => { while let Some(part) = ready!(self.futures.poll_next_unpin(cx)) { - // Safety: must exist. - let task = self.processing_tasks.pop_front().unwrap(); - match part { - Ok(part) => self.parts.push(part), - Err(err) => { - self.pending_tasks.push_front(task); - return Poll::Ready(Err(err)); - } - } + self.parts.push(part?); } self.state = State::Idle; } From aacd003898fedcb1f93d1755d7b42b1b902f50b0 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Fri, 5 Jan 2024 11:05:25 +0000 Subject: [PATCH 4/6] chore: apply suggestions from CR --- .../raw/oio/write/multipart_upload_write.rs | 135 +++++++----------- 1 file changed, 53 insertions(+), 82 deletions(-) diff --git a/core/src/raw/oio/write/multipart_upload_write.rs b/core/src/raw/oio/write/multipart_upload_write.rs index 9ef746d6c29d..b3d157b7baa9 100644 --- a/core/src/raw/oio/write/multipart_upload_write.rs +++ b/core/src/raw/oio/write/multipart_upload_write.rs @@ -107,40 +107,25 @@ pub struct MultipartUploadPart { pub etag: String, } -struct UploadFuture(BoxedFuture>); +struct WritePartFuture(BoxedFuture>); /// # Safety /// -/// wasm32 is a special target that we only have one event-loop for this UploadFuture. -unsafe impl Send for UploadFuture {} +/// wasm32 is a special target that we only have one event-loop for this WritePartFuture. +unsafe impl Send for WritePartFuture {} /// # Safety /// -/// We will only take `&mut Self` reference for UploadFuture. -unsafe impl Sync for UploadFuture {} +/// We will only take `&mut Self` reference for WritePartFuture. +unsafe impl Sync for WritePartFuture {} -impl Future for UploadFuture { +impl Future for WritePartFuture { type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { self.get_mut().0.poll_unpin(cx) } } -#[derive(Clone)] -struct WriteTask { - part_number: usize, - bs: oio::ChunkedBytes, -} - -/// # Safety -/// -/// wasm32 is a special target that we only have one event-loop for this WriteTask. -unsafe impl Send for WriteTask {} -/// # Safety -/// -/// We will only take `&mut Self` reference for WriteTask. -unsafe impl Sync for WriteTask {} - /// MultipartUploadWriter will implements [`Write`] based on multipart /// uploads. pub struct MultipartUploadWriter { @@ -149,15 +134,14 @@ pub struct MultipartUploadWriter { upload_id: Option>, parts: Vec, - pending: Option, - futures: ConcurrentFutures, + cache: Option, + futures: ConcurrentFutures, next_part_number: usize, } enum State { Idle, Init(BoxedFuture>), - Busy, Close(BoxedFuture>), Abort(BoxedFuture>), } @@ -180,43 +164,19 @@ impl MultipartUploadWriter { w: Arc::new(inner), upload_id: None, parts: Vec::new(), - pending: None, + cache: None, futures: ConcurrentFutures::new(1.max(concurrent)), next_part_number: 0, } } - /// Increases part number and return the previous part number. - fn inc_part_number(&mut self) -> usize { - let part_number = self.next_part_number; - self.next_part_number += 1; - part_number - } - - fn add_write_task(&mut self, bs: &dyn oio::WriteBuf) -> usize { + fn fill_cache(&mut self, bs: &dyn oio::WriteBuf) -> usize { let size = bs.remaining(); let bs = oio::ChunkedBytes::from_vec(bs.vectored_bytes(size)); - let part_number = self.inc_part_number(); - self.pending = Some(WriteTask { bs, part_number }); + assert!(self.cache.is_none()); + self.cache = Some(bs); size } - - fn process_write_task(&mut self, upload_id: Arc, task: WriteTask) { - let size = task.bs.len(); - let part_number = task.part_number; - let bs = task.bs; - let w = self.w.clone(); - - self.futures.push(UploadFuture(Box::pin(async move { - w.write_part( - &upload_id, - part_number, - size as u64, - AsyncBody::ChunkedBytes(bs), - ) - .await - }))); - } } impl oio::Write for MultipartUploadWriter @@ -231,18 +191,30 @@ where Some(upload_id) => { let upload_id = upload_id.clone(); if self.futures.has_remaining() { - let task = self.pending.take().expect("pending write must exist"); - self.process_write_task(upload_id, task); - let size = self.add_write_task(bs); + let cache = self.cache.take().expect("pending write must exist"); + let part_number = self.next_part_number; + self.next_part_number += 1; + let w = self.w.clone(); + let size = cache.len(); + self.futures.push(WritePartFuture(Box::pin(async move { + w.write_part( + &upload_id, + part_number, + size as u64, + AsyncBody::ChunkedBytes(cache), + ) + .await + }))); + let size = self.fill_cache(bs); return Poll::Ready(Ok(size)); - } else { - self.state = State::Busy; + } else if let Some(part) = ready!(self.futures.poll_next_unpin(cx)) { + self.parts.push(part?); } } None => { // Fill cache with the first write. - if self.pending.is_none() { - let size = self.add_write_task(bs); + if self.cache.is_none() { + let size = self.fill_cache(bs); return Poll::Ready(Ok(size)); } @@ -257,12 +229,6 @@ where self.state = State::Idle; self.upload_id = Some(Arc::new(upload_id?)); } - State::Busy => { - if let Some(part) = ready!(self.futures.poll_next_unpin(cx)) { - self.parts.push(part?); - } - self.state = State::Idle; - } State::Close(_) => { unreachable!( "MultipartUploadWriter must not go into State::Close during poll_write" @@ -284,7 +250,7 @@ where match self.upload_id.clone() { Some(upload_id) => { let w = self.w.clone(); - if self.futures.is_empty() && self.pending.is_none() { + if self.futures.is_empty() && self.cache.is_none() { let upload_id = upload_id.clone(); let parts = self.parts.clone(); self.state = State::Close(Box::pin(async move { @@ -292,18 +258,32 @@ where })); } else { if self.futures.has_remaining() { - if let Some(task) = self.pending.take() { + if let Some(cache) = self.cache.take() { let upload_id = upload_id.clone(); - self.process_write_task(upload_id, task); + let part_number = self.next_part_number; + self.next_part_number += 1; + let size = cache.len(); + let w = self.w.clone(); + self.futures.push(WritePartFuture(Box::pin(async move { + w.write_part( + &upload_id, + part_number, + size as u64, + AsyncBody::ChunkedBytes(cache), + ) + .await + }))); } } - self.state = State::Busy; + while let Some(part) = ready!(self.futures.poll_next_unpin(cx)) { + self.parts.push(part?); + } } } - None => match &self.pending { - Some(task) => { + None => match &self.cache { + Some(cache) => { let w = self.w.clone(); - let bs = task.bs.clone(); + let bs = cache.clone(); self.state = State::Close(Box::pin(async move { let size = bs.len(); w.write_once(size as u64, AsyncBody::ChunkedBytes(bs)).await @@ -324,19 +304,13 @@ where self.state = State::Idle; // We should check res first before clean up `pending`. res?; - self.pending = None; + self.cache = None; return Poll::Ready(Ok(())); } State::Init(_) => unreachable!( "MultipartUploadWriter must not go into State::Init during poll_close" ), - State::Busy => { - while let Some(part) = ready!(self.futures.poll_next_unpin(cx)) { - self.parts.push(part?); - } - self.state = State::Idle; - } State::Abort(_) => unreachable!( "MultipartUploadWriter must not go into State::Abort during poll_close" ), @@ -369,9 +343,6 @@ where State::Init(_) => unreachable!( "MultipartUploadWriter must not go into State::Init during poll_abort" ), - State::Busy => unreachable!( - "MultipartUploadWriter must not go into State::Busy during poll_abort" - ), State::Close(_) => unreachable!( "MultipartUploadWriter must not go into State::Close during poll_abort" ), From 5d57d4a16a44fdd9ec1ed911b5727dce63ef5550 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Fri, 5 Jan 2024 11:09:50 +0000 Subject: [PATCH 5/6] chore: correct comments --- core/src/raw/oio/write/multipart_upload_write.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/raw/oio/write/multipart_upload_write.rs b/core/src/raw/oio/write/multipart_upload_write.rs index b3d157b7baa9..06a747c7823b 100644 --- a/core/src/raw/oio/write/multipart_upload_write.rs +++ b/core/src/raw/oio/write/multipart_upload_write.rs @@ -291,7 +291,7 @@ where } None => { let w = self.w.clone(); - // Call write_once if there is no data in `pending` and no upload_id. + // Call write_once if there is no data in cache and no upload_id. self.state = State::Close(Box::pin(async move { w.write_once(0, AsyncBody::Empty).await })); @@ -302,7 +302,7 @@ where State::Close(fut) => { let res = futures::ready!(fut.as_mut().poll(cx)); self.state = State::Idle; - // We should check res first before clean up `pending`. + // We should check res first before clean up cache. res?; self.cache = None; From 394eb74da66b4e28cab5f08b4df769f99c90c29e Mon Sep 17 00:00:00 2001 From: WenyXu Date: Fri, 5 Jan 2024 12:25:03 +0000 Subject: [PATCH 6/6] fix: clear future queue while aborting --- core/src/raw/futures_util.rs | 9 +++++++++ core/src/raw/oio/write/multipart_upload_write.rs | 1 + 2 files changed, 10 insertions(+) diff --git a/core/src/raw/futures_util.rs b/core/src/raw/futures_util.rs index 08583fe6d8e4..d7a1168b6340 100644 --- a/core/src/raw/futures_util.rs +++ b/core/src/raw/futures_util.rs @@ -101,6 +101,15 @@ where } } + /// Drop all tasks. + pub fn clear(&mut self) { + match &mut self.tasks { + Tasks::Once(fut) => *fut = None, + Tasks::Small(tasks) => tasks.clear(), + Tasks::Large(tasks) => *tasks = FuturesOrdered::new(), + } + } + /// Return the length of current concurrent futures (both ongoing and ready). pub fn len(&self) -> usize { match &self.tasks { diff --git a/core/src/raw/oio/write/multipart_upload_write.rs b/core/src/raw/oio/write/multipart_upload_write.rs index 06a747c7823b..66105b4d4a4f 100644 --- a/core/src/raw/oio/write/multipart_upload_write.rs +++ b/core/src/raw/oio/write/multipart_upload_write.rs @@ -325,6 +325,7 @@ where let w = self.w.clone(); match self.upload_id.clone() { Some(upload_id) => { + self.futures.clear(); self.state = State::Abort(Box::pin( async move { w.abort_part(&upload_id).await },