Skip to content

Commit

Permalink
aws-sdk 0.35
Browse files Browse the repository at this point in the history
  • Loading branch information
Hakuyume committed Nov 8, 2023
1 parent 30636d2 commit a28d377
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 33 deletions.
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ exclude = [".github"]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
aws-sdk-s3 = "0.29"
aws-smithy-http = "0.56"
aws-sdk-s3 = "0.35"
aws-smithy-types = "0.57"
base64 = "0.13"
bytes = "1"
futures = "0.3"
Expand All @@ -25,7 +25,7 @@ pin-project = "1"

[dev-dependencies]
anyhow = "1"
aws-config = "0.56"
aws-config = "0.57"
rand = "0.8"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
uuid = { version = "1", features = ["v4"] }
6 changes: 3 additions & 3 deletions src/into_byte_stream.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use aws_sdk_s3::primitives::ByteStream;
use aws_smithy_http::body::{Error, SdkBody};
use aws_sdk_s3::primitives::{ByteStream, SdkBody};
use aws_smithy_types::body::Error;
use bytes::Bytes;
use http::header::HeaderMap;
use http_body::combinators::BoxBody;
Expand Down Expand Up @@ -35,7 +35,7 @@ pub fn into_byte_stream(body: Vec<Bytes>) -> ByteStream {

let body = Arc::<[_]>::from(Box::from(body));
ByteStream::new(SdkBody::retryable(move || {
SdkBody::from_dyn(BoxBody::new(B(body.clone(), 0)))
SdkBody::from_body_0_4(BoxBody::new(B(body.clone(), 0)))
}))
}

Expand Down
57 changes: 30 additions & 27 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ use aws_sdk_s3::operation::complete_multipart_upload::{
};
use aws_sdk_s3::operation::create_multipart_upload::CreateMultipartUploadError;
use aws_sdk_s3::operation::upload_part::UploadPartError;
use aws_sdk_s3::primitives::ByteStream;
use aws_sdk_s3::primitives::{ByteStream, SdkBody};
use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart};
use aws_sdk_s3::Client;
use aws_smithy_http::body::SdkBody;
use futures::{TryFutureExt, TryStreamExt};
use std::num::NonZeroUsize;
use std::ops::RangeInclusive;
use std::pin::Pin;

// https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
pub const PART_SIZE: RangeInclusive<usize> = 5 << 20..=5 << 30;
Expand Down Expand Up @@ -60,12 +60,12 @@ impl MultipartUpload {
}

pub async fn send<E>(
self,
mut self,
part_size: RangeInclusive<usize>,
concurrency_limit: Option<NonZeroUsize>,
) -> Result<MultipartUploadOutput, (E, Option<AbortMultipartUploadFluentBuilder>)>
where
E: From<aws_smithy_http::byte_stream::error::Error>
E: From<aws_smithy_types::byte_stream::error::Error>
+ From<SdkError<CreateMultipartUploadError, http::Response<SdkBody>>>
+ From<SdkError<UploadPartError, http::Response<SdkBody>>>
+ From<SdkError<CompleteMultipartUploadError, http::Response<SdkBody>>>,
Expand All @@ -88,29 +88,32 @@ impl MultipartUpload {
.set_upload_id(upload_id.clone())
};

let parts = split::split(self.body, part_size)
.map_ok(|part| {
self.client
.upload_part()
.body(into_byte_stream::into_byte_stream(part.body))
.set_bucket(self.bucket.clone())
.content_length(part.content_length as _)
.content_md5(base64::encode(part.content_md5))
.set_key(self.key.clone())
.part_number(part.part_number as _)
.set_upload_id(upload_id.clone())
.send()
.map_ok({
move |output| {
CompletedPart::builder()
.set_e_tag(output.e_tag)
.part_number(part.part_number as _)
.build()
}
})
.err_into()
})
.err_into();
let parts = split::split(
futures::stream::poll_fn(move |cx| Pin::new(&mut self.body).poll_next(cx)),
part_size,
)
.map_ok(|part| {
self.client
.upload_part()
.body(into_byte_stream::into_byte_stream(part.body))
.set_bucket(self.bucket.clone())
.content_length(part.content_length as _)
.content_md5(base64::encode(part.content_md5))
.set_key(self.key.clone())
.part_number(part.part_number as _)
.set_upload_id(upload_id.clone())
.send()
.map_ok({
move |output| {
CompletedPart::builder()
.set_e_tag(output.e_tag)
.part_number(part.part_number as _)
.build()
}
})
.err_into()
})
.err_into();

let mut completed_parts = parts
.try_buffer_unordered(concurrency_limit.map_or(usize::MAX, NonZeroUsize::get))
Expand Down

0 comments on commit a28d377

Please sign in to comment.