From 44091949d182e89bb13a44fb09cab426ec520f92 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Tue, 2 Jan 2024 15:11:55 +0000 Subject: [PATCH 1/3] feat: add concurrent writer rfc --- core/src/docs/rfcs/3898_concurrent_writer.md | 48 ++++++++++++++++++++ core/src/docs/rfcs/mod.rs | 3 ++ 2 files changed, 51 insertions(+) create mode 100644 core/src/docs/rfcs/3898_concurrent_writer.md diff --git a/core/src/docs/rfcs/3898_concurrent_writer.md b/core/src/docs/rfcs/3898_concurrent_writer.md new file mode 100644 index 000000000000..213e12a0ee8e --- /dev/null +++ b/core/src/docs/rfcs/3898_concurrent_writer.md @@ -0,0 +1,48 @@ +- Proposal Name: `concurrent_writer` +- Start Date: 2024-01-02 +- RFC PR: [apache/incubator-opendal#3898](https://github.com/apache/incubator-opendal/pull/3898) +- Tracking Issue: [apache/incubator-opendal#3899](https://github.com/apache/incubator-opendal/issues/3899) + +# Summary + +Add concurrent write in `MultipartUploadWriter`. + +# Motivation + +The [object_writer](./1420_object_writer.md) introduces the `ObjectWriter` multipart upload support. However, the multiple parts are currently uploaded serially without fully leveraging the potential for improved throughput through concurrent uploads. We should support the upload of multiple parts concurrently. + +# Guide-level explanation + +For users who want to concurrent writer, they will call the new API `concurrent`. And the default behavior remains unchanged, so users using `op.writer_with()` are not affected. The `concurrent` function will take a number as input, and this number will represent the maximum concurrent write task amount the writer can perform. + +```rust +op.writer_with(path).concurrent(8).await +``` + +# Reference-level explanation + +This feature will be implemented in the `MultipartUploadWriter`, which will utilize a `ConcurrentFutures` as a task queue to store concurrent write tasks. + +A `concurrent` field of type `usize` will be introduced to `OpWrite`. 
If `concurrent` is set to 0 or 1, it functions with default behavior. However, if concurrent is set to number larger than 1, it denotes the maximum concurrent write task amount that the `MultipartUploadWriter` can utilize. + +When the upper layer invokes `poll_write`, the `MultipartUploadWriter` pushes `concurrent` upload parts to the task queue (`ConcurrentFutures`) if there are available slots. If the task queue is full, the `MultipartUploadWriter` waits for the first task to yield results. + +# Drawbacks + +None + +# Rationale and alternatives + +None + +# Prior art + +None + +# Unresolved questions + +None + +# Future possibilities + +None diff --git a/core/src/docs/rfcs/mod.rs b/core/src/docs/rfcs/mod.rs index 4272b128b850..57bc6824d45d 100644 --- a/core/src/docs/rfcs/mod.rs +++ b/core/src/docs/rfcs/mod.rs @@ -219,3 +219,6 @@ pub mod rfc_3574_concurrent_stat_in_list {} #[doc = include_str!("3734_buffered_reader.md")] pub mod rfc_3734_buffered_reader {} + +#[doc = include_str!("3898_concurrent_writer.md")] +pub mod rfc_3898_concurrent_writer {} From 0be516f2a6e170a938a872d05914e17aa4c7c32d Mon Sep 17 00:00:00 2001 From: WenyXu Date: Wed, 3 Jan 2024 16:05:21 +0000 Subject: [PATCH 2/3] chore: apply suggestions from CR --- core/src/docs/rfcs/3898_concurrent_writer.md | 32 +++++++++++++++----- 1 file changed, 25 insertions(+), 7 deletions(-) diff --git a/core/src/docs/rfcs/3898_concurrent_writer.md b/core/src/docs/rfcs/3898_concurrent_writer.md index 213e12a0ee8e..c15f1d656a3b 100644 --- a/core/src/docs/rfcs/3898_concurrent_writer.md +++ b/core/src/docs/rfcs/3898_concurrent_writer.md @@ -5,31 +5,48 @@ # Summary -Add concurrent write in `MultipartUploadWriter`. +Enhance the `Writer` by adding concurrent write capabilities. # Motivation -The [object_writer](./1420_object_writer.md) introduces the `ObjectWriter` multipart upload support. 
However, the multiple parts are currently uploaded serially without fully leveraging the potential for improved throughput through concurrent uploads. We should support the upload of multiple parts concurrently. +Currently, When invoking `writer` or `writer_with` on an `Operator` that utilizes Object storage as its backend, such as S3, the `Operator` will return a `Writer` that supports multipart uploading. + +```rust + // The `op` is a S3 backend `Operator`. + let mut writer = op.writer("path/to").await?; + writer.write(part0).await?; + writer.write(part1).await?; // It starts to upload after the `part0` is finished. +``` +However, the multiple parts are currently uploaded serially without fully leveraging the potential for improved throughput through concurrent uploads. We should support the upload of multiple parts concurrently. + # Guide-level explanation For users who want to concurrent writer, they will call the new API `concurrent`. And the default behavior remains unchanged, so users using `op.writer_with()` are not affected. The `concurrent` function will take a number as input, and this number will represent the maximum concurrent write task amount the writer can perform. +- If `concurrent` is set to 0 or 1, it functions with default behavior(Uploads parts serially). +- However, if `concurrent` is set to number larger than 1. It enables concurrent uploading of up to `concurrent` write tasks and allows users to initiate additional write tasks without waiting to complete the previous part uploading, as long as the inner task queue still has available slots. + +It won't interact with other existing components, except the `buffer` inside a `Writer`. If the multipart upload isn't initialized, `Writer` puts the bytes into `buffer` first, then retrieves it back when uploading the part. 
+ ```rust op.writer_with(path).concurrent(8).await ``` # Reference-level explanation -This feature will be implemented in the `MultipartUploadWriter`, which will utilize a `ConcurrentFutures` as a task queue to store concurrent write tasks. +The concurrent write capability is only supported for services that have implemented the `MultipartUploadWriter` or `RangeWriter`. Otherwise, setting the `concurrent` parameter will have no effect (Same as default behavior). + +This feature will be implemented in the `MultipartUploadWriter` and `RangeWriter`, which will utilize a `ConcurrentFutures` as a task queue to store concurrent write tasks. A `concurrent` field of type `usize` will be introduced to `OpWrite` to allow the user setting the maximum concurrent write task amount. -A `concurrent` field of type `usize` will be introduced to `OpWrite`. If `concurrent` is set to 0 or 1, it functions with default behavior. However, if concurrent is set to number larger than 1, it denotes the maximum concurrent write task amount that the `MultipartUploadWriter` can utilize. +When the upper layer invokes `poll_write`, the `Writer` pushes write to the task queue (`ConcurrentFutures`) if there are available slots, and then relinquishes control back to the upper layer. This allows for up to `concurrent` write tasks to uploaded concurrently without waiting. If the task queue is full, the `Writer` waits for the first task to yield results. -When the upper layer invokes `poll_write`, the `MultipartUploadWriter` pushes `concurrent` upload parts to the task queue (`ConcurrentFutures`) if there are available slots. If the task queue is full, the `MultipartUploadWriter` waits for the first task to yield results. +In the future, we can introduce the `write_at` for `fs` and use `ConcurrentFutureUnordered` instead of `ConcurrentFutures.`. 
# Drawbacks -None +- More memory usage +- More concurrent connections # Rationale and alternatives @@ -45,4 +62,5 @@ None # Future possibilities -None +- Adding `write_at` for `fs`. +- Add `ConcurrentFutureUnordered` From 98c1db7305e399e3422276126c8f0d68c91681a2 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Thu, 4 Jan 2024 04:40:14 +0000 Subject: [PATCH 3/3] chore: apply suggestions from CR --- core/src/docs/rfcs/3898_concurrent_writer.md | 28 ++++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/core/src/docs/rfcs/3898_concurrent_writer.md b/core/src/docs/rfcs/3898_concurrent_writer.md index c15f1d656a3b..d95106efcfcb 100644 --- a/core/src/docs/rfcs/3898_concurrent_writer.md +++ b/core/src/docs/rfcs/3898_concurrent_writer.md @@ -9,40 +9,40 @@ Enhance the `Writer` by adding concurrent write capabilities. # Motivation -Currently, When invoking `writer` or `writer_with` on an `Operator` that utilizes Object storage as its backend, such as S3, the `Operator` will return a `Writer` that supports multipart uploading. - +Certain services, such as S3, GCS, and AzBlob, offer the `multi_write` functionality, allowing users to perform multiple write operations to upload large files. If a service supports `multi_write`, the [Capability::write_can_multi](https://opendal.apache.org/docs/rust/opendal/struct.Capability.html#structfield.write_can_multi) metadata should be set to `true`. ```rust - // The `op` is a S3 backend `Operator`. - let mut writer = op.writer("path/to").await?; + let mut writer = op.writer("path/to").await?; // A writer that supports `multi_write`. writer.write(part0).await?; writer.write(part1).await?; // It starts to upload after the `part0` is finished. + writer.close().await?; ``` -However, the multiple parts are currently uploaded serially without fully leveraging the potential for improved throughput through concurrent uploads. We should support the upload of multiple parts concurrently.
+Currently, when invoking a `Writer` that supports the `multi_write` functionality, multiple writes are processed serially, without fully leveraging the potential for improved throughput through concurrent uploads. We should enhance support to allow concurrent processing of multiple write operations. # Guide-level explanation For users who want to concurrent writer, they will call the new API `concurrent`. And the default behavior remains unchanged, so users using `op.writer_with()` are not affected. The `concurrent` function will take a number as input, and this number will represent the maximum concurrent write task amount the writer can perform. -- If `concurrent` is set to 0 or 1, it functions with default behavior(Uploads parts serially). -- However, if `concurrent` is set to number larger than 1. It enables concurrent uploading of up to `concurrent` write tasks and allows users to initiate additional write tasks without waiting to complete the previous part uploading, as long as the inner task queue still has available slots. +- If `concurrent` is set to 0 or 1, it functions with default behavior (writes serially). +- However, if `concurrent` is set to a number larger than 1, it enables concurrent uploading of up to `concurrent` write tasks and allows users to initiate additional write tasks without waiting for the previous write operation to complete, as long as the inner task queue still has available slots. -It won't interact with other existing components, except the `buffer` inside a `Writer`. If the multipart upload isn't initialized, `Writer` puts the bytes into `buffer` first, then retrieves it back when uploading the part. +The concurrent write feature operates independently of other features. ```rust -op.writer_with(path).concurrent(8).await +let mut w = op.writer_with(path).concurrent(8).await?; +w.write(part0).await?; +w.write(part1).await?; // `write` won't wait for part0. +w.close().await?; // `close` will make sure all parts are finished.
``` # Reference-level explanation -The concurrent write capability is only supported for services that have implemented the `MultipartUploadWriter` or `RangeWriter`. Otherwise, setting the `concurrent` parameter will have no effect (Same as default behavior). +S3 and similar services use `MultipartUploadWriter`, while GCS uses `RangeWriter`. We can enhance these services by adding concurrent write features to them. A `concurrent` field of type `usize` will be introduced to `OpWrite` to allow the user to set the maximum number of concurrent write tasks. For other services that don't support `multi_write`, setting the `concurrent` parameter will have no effect, maintaining the default behavior. -This feature will be implemented in the `MultipartUploadWriter` and `RangeWriter`, which will utilize a `ConcurrentFutures` as a task queue to store concurrent write tasks. A `concurrent` field of type `usize` will be introduced to `OpWrite` to allow the user setting the maximum concurrent write task amount. +This feature will be implemented in the `MultipartUploadWriter` and `RangeWriter`, which will utilize a `ConcurrentFutures` as a task queue to store concurrent write tasks. When the upper layer invokes `poll_write`, the `Writer` pushes write to the task queue (`ConcurrentFutures`) if there are available slots, and then relinquishes control back to the upper layer. This allows for up to `concurrent` write tasks to uploaded concurrently without waiting. If the task queue is full, the `Writer` waits for the first task to yield results. -In the future, we can introduce the `write_at` for `fs` and use `ConcurrentFutureUnordered` instead of `ConcurrentFutures.`. - # Drawbacks - More memory usage @@ -63,4 +63,4 @@ None # Future possibilities - Adding `write_at` for `fs`. -- Add `ConcurrentFutureUnordered` +- Use `ConcurrentFutureUnordered` instead of `ConcurrentFutures`.
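
Review note: the task-queue behavior described in the reference-level explanation (push a write while slots are free, otherwise wait for the first task to yield) can be sketched as follows. This is a minimal illustration under stated assumptions, not the OpenDAL implementation: `BoundedTasks` and `upload_parts` are hypothetical names, OS threads stand in for the futures held by `ConcurrentFutures`, and the "upload" only reports a part's index and size.

```rust
use std::collections::VecDeque;
use std::thread::{self, JoinHandle};

/// Hypothetical stand-in for the RFC's `ConcurrentFutures` task queue,
/// modeled with OS threads instead of futures.
pub struct BoundedTasks<T> {
    limit: usize,
    tasks: VecDeque<JoinHandle<T>>,
    results: Vec<T>,
}

impl<T: Send + 'static> BoundedTasks<T> {
    /// Assumption: `concurrent` values of 0 and 1 both allow a single
    /// in-flight task, approximating the RFC's serial default.
    pub fn new(concurrent: usize) -> Self {
        Self {
            limit: concurrent.max(1),
            tasks: VecDeque::new(),
            results: Vec::new(),
        }
    }

    /// Start a task if a slot is free; otherwise wait for the oldest
    /// task to yield its result first, then start the new one.
    pub fn push<F>(&mut self, task: F)
    where
        F: FnOnce() -> T + Send + 'static,
    {
        if self.tasks.len() >= self.limit {
            self.wait_first();
        }
        self.tasks.push_back(thread::spawn(task));
    }

    /// Block on the oldest task and record its result.
    fn wait_first(&mut self) {
        if let Some(handle) = self.tasks.pop_front() {
            self.results.push(handle.join().expect("write task panicked"));
        }
    }

    /// Number of tasks currently occupying a slot.
    pub fn in_flight(&self) -> usize {
        self.tasks.len()
    }

    /// Like `close` on the writer: wait until every task has finished.
    /// Results come back in the order the tasks were pushed.
    pub fn close(mut self) -> Vec<T> {
        while !self.tasks.is_empty() {
            self.wait_first();
        }
        self.results
    }
}

/// Pretend upload: each task returns `(part index, part size)`.
pub fn upload_parts(concurrent: usize, parts: Vec<Vec<u8>>) -> Vec<(usize, usize)> {
    let mut queue = BoundedTasks::new(concurrent);
    for (index, part) in parts.into_iter().enumerate() {
        queue.push(move || (index, part.len()));
        // The queue never holds more tasks than the configured limit.
        assert!(queue.in_flight() <= concurrent.max(1));
    }
    queue.close()
}
```

Calling `upload_parts(2, parts)` from a `main` function keeps at most two pretend uploads in flight. Because the sketch always waits on the oldest task, results arrive in push order, which mirrors the ordered queue the RFC proposes rather than the unordered variant listed under future possibilities.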