Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions core/src/raw/futures_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,20 @@ use futures::stream::FuturesOrdered;
use futures::FutureExt;
use futures::StreamExt;

/// MaybeSendFuture is a wrapper trait allow us to use `Send` or not depends on the target.
#[cfg(not(target_arch = "wasm32"))]
pub trait MaybeSendFuture<T>: Future<Output = T> + Send {}

#[cfg(not(target_arch = "wasm32"))]
impl<T, F> MaybeSendFuture<T> for F where F: Future<Output = T> + Send {}

/// MaybeSendFuture is a wrapper trait allow us to use `Send` or not depends on the target.
#[cfg(target_arch = "wasm32")]
pub trait MaybeSendFuture<T>: Future<Output = T> {}

#[cfg(target_arch = "wasm32")]
impl<T, F> MaybeSendFuture<T> for F where F: Future<Output = T> {}

/// BoxedFuture is the type alias of [`futures::future::BoxFuture`].
///
/// We will switch to [`futures::future::LocalBoxFuture`] on wasm32 target.
Expand Down
1 change: 1 addition & 0 deletions core/src/raw/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ pub use std_io_util::*;
mod futures_util;
pub use futures_util::BoxedFuture;
pub use futures_util::ConcurrentFutures;
pub use futures_util::MaybeSendFuture;

mod enum_utils;
pub use enum_utils::*;
Expand Down
19 changes: 10 additions & 9 deletions core/src/raw/oio/write/multipart_upload_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use std::task::ready;
use std::task::Context;
use std::task::Poll;

use async_trait::async_trait;
use futures::Future;
use futures::FutureExt;
use futures::StreamExt;
Expand Down Expand Up @@ -51,15 +50,13 @@ use crate::*;
/// ```
///
/// We will use `write_once` instead of starting a new multipart upload.
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
pub trait MultipartUploadWrite: Send + Sync + Unpin + 'static {
/// write_once is used to write the data to underlying storage at once.
///
/// MultipartUploadWriter will call this API when:
///
/// - All the data has been written to the buffer and we can perform the upload at once.
async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()>;
fn write_once(&self, size: u64, body: AsyncBody) -> impl MaybeSendFuture<Result<()>>;

/// initiate_part will call start a multipart upload and return the upload id.
///
Expand All @@ -68,7 +65,7 @@ pub trait MultipartUploadWrite: Send + Sync + Unpin + 'static {
/// - the total size of data is unknown.
/// - the total size of data is known, but the size of current write
/// is less then the total size.
async fn initiate_part(&self) -> Result<String>;
fn initiate_part(&self) -> impl MaybeSendFuture<Result<String>>;

/// write_part will write a part of the data and returns the result
/// [`MultipartUploadPart`].
Expand All @@ -77,20 +74,24 @@ pub trait MultipartUploadWrite: Send + Sync + Unpin + 'static {
/// order.
///
/// - part_number is the index of the part, starting from 0.
async fn write_part(
fn write_part(
&self,
upload_id: &str,
part_number: usize,
size: u64,
body: AsyncBody,
) -> Result<MultipartUploadPart>;
) -> impl MaybeSendFuture<Result<MultipartUploadPart>>;

/// complete_part will complete the multipart upload to build the final
/// file.
async fn complete_part(&self, upload_id: &str, parts: &[MultipartUploadPart]) -> Result<()>;
fn complete_part(
&self,
upload_id: &str,
parts: &[MultipartUploadPart],
) -> impl MaybeSendFuture<Result<()>>;

/// abort_part will cancel the multipart upload and purge all data.
async fn abort_part(&self, upload_id: &str) -> Result<()>;
fn abort_part(&self, upload_id: &str) -> impl MaybeSendFuture<Result<()>>;
}

/// The result of [`MultipartUploadWrite::write_part`].
Expand Down
2 changes: 0 additions & 2 deletions core/src/services/b2/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

use std::sync::Arc;

use async_trait::async_trait;
use http::StatusCode;

use super::core::B2Core;
Expand Down Expand Up @@ -46,7 +45,6 @@ impl B2Writer {
}
}

#[async_trait]
impl oio::MultipartUploadWrite for B2Writer {
async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> {
let resp = self
Expand Down
1 change: 0 additions & 1 deletion core/src/services/cos/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ impl CosWriter {
}
}

#[async_trait]
impl oio::MultipartUploadWrite for CosWriter {
async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> {
let mut req = self
Expand Down
1 change: 0 additions & 1 deletion core/src/services/obs/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ impl ObsWriter {
}
}

#[async_trait]
impl oio::MultipartUploadWrite for ObsWriter {
async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> {
let mut req = self
Expand Down
1 change: 0 additions & 1 deletion core/src/services/oss/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ impl OssWriter {
}
}

#[async_trait]
impl oio::MultipartUploadWrite for OssWriter {
async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> {
let mut req =
Expand Down
3 changes: 0 additions & 3 deletions core/src/services/s3/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

use std::sync::Arc;

use async_trait::async_trait;
use http::StatusCode;

use super::core::*;
Expand All @@ -44,8 +43,6 @@ impl S3Writer {
}
}

#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl oio::MultipartUploadWrite for S3Writer {
async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> {
let mut req = self
Expand Down
2 changes: 0 additions & 2 deletions core/src/services/upyun/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

use std::sync::Arc;

use async_trait::async_trait;
use http::StatusCode;

use super::core::constants::X_UPYUN_MULTI_UUID;
Expand All @@ -40,7 +39,6 @@ impl UpyunWriter {
}
}

#[async_trait]
impl oio::MultipartUploadWrite for UpyunWriter {
async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> {
let req = self
Expand Down