From 1061bf19b9dbf2cc5fac33710389baf67c775d8f Mon Sep 17 00:00:00 2001
From: ugo
Date: Thu, 21 Apr 2022 16:54:52 +0800
Subject: [PATCH 1/4] Added ByteStream method to read from file chunk

---
 sdk/aws-smithy-http/src/byte_stream.rs | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/sdk/aws-smithy-http/src/byte_stream.rs b/sdk/aws-smithy-http/src/byte_stream.rs
index d92937ba1f84..587b45f3815a 100644
--- a/sdk/aws-smithy-http/src/byte_stream.rs
+++ b/sdk/aws-smithy-http/src/byte_stream.rs
@@ -293,6 +293,18 @@ impl ByteStream {
         ));
         Ok(ByteStream::new(body))
     }
+
+    /// Create a ByteStream from a file, specifying an offset and a length.
+    #[cfg(feature = "rt-tokio")]
+    pub async fn from_path_chunk(path: &std::path::Path, offset: u64, sz: u64) -> Result<ByteStream, Error> {
+        let mut file = tokio::fs::File::open(path).await.map_err(|err| Error(err.into()))?;
+        use tokio::io::AsyncSeekExt;
+        let _s = file.seek(std::io::SeekFrom::Start(offset)).await.map_err(|err| Error(err.into()))?;
+        let body = SdkBody::from_dyn(http_body::combinators::BoxBody::new(
+            bytestream_util::PathBody::from_file(file, sz),
+        ));
+        Ok(ByteStream::new(body))
+    }
 }
 
 impl Default for ByteStream {
From 4d994396b45ee1a2548cb16c6b72ccc8742bf527 Mon Sep 17 00:00:00 2001
From: ugo
Date: Thu, 21 Apr 2022 18:11:56 +0800
Subject: [PATCH 2/4] Changed signature of ByteStream::from_path_chunk() to
 match from_path()

---
 sdk/aws-smithy-http/src/byte_stream.rs | 41 ++++++++++++++++++++++++--
 1 file changed, 39 insertions(+), 2 deletions(-)

diff --git a/sdk/aws-smithy-http/src/byte_stream.rs b/sdk/aws-smithy-http/src/byte_stream.rs
index 587b45f3815a..4db110239ebd 100644
--- a/sdk/aws-smithy-http/src/byte_stream.rs
+++ b/sdk/aws-smithy-http/src/byte_stream.rs
@@ -294,9 +294,46 @@ impl ByteStream {
         Ok(ByteStream::new(body))
     }
 
-    /// Create a ByteStream from a file, specifying an offset and a length.
+    /// Create a ByteStream from a file path, specifying an offset and a length.
+    ///
+    /// # Examples
+    /// ```no_run
+    /// use aws_sdk_s3::types::ByteStream;
+    /// use aws_sdk_s3::{Client, Error};
+    /// use std::path::Path;
+    /// use std::time::Instant;
+    ///
+    /// pub async fn upload_chunk(
+    ///     client: &Client,
+    ///     bucket: &str,
+    ///     file_name: &str,
+    ///     key: &str,
+    ///     start_offset: u64,
+    ///     chunk_size: u64,
+    /// ) -> Result<(), Error> {
+    ///     let body = ByteStream::from_path_chunk(Path::new(file_name), start_offset, chunk_size)
+    ///         .await
+    ///         .unwrap_or_else(|_| panic!("Cannot read from {}", file_name));
+    ///     let start = Instant::now();
+    ///     client
+    ///         .put_object()
+    ///         .bucket(bucket)
+    ///         .key(key)
+    ///         .body(body)
+    ///         .send()
+    ///         .await?;
+    ///     let elapsed = start.elapsed();
+    ///     println!(
+    ///         "Uploaded chunk of size {} from file {} in {:.2} s",
+    ///         chunk_size,
+    ///         file_name,
+    ///         elapsed.as_secs_f32()
+    ///     );
+    ///     Ok(())
+    /// }
+    /// ```
     #[cfg(feature = "rt-tokio")]
-    pub async fn from_path_chunk(path: &std::path::Path, offset: u64, sz: u64) -> Result<ByteStream, Error> {
+    pub async fn from_path_chunk(path: impl AsRef<std::path::Path>, offset: u64, sz: u64) -> Result<ByteStream, Error> {
         let mut file = tokio::fs::File::open(path).await.map_err(|err| Error(err.into()))?;
         use tokio::io::AsyncSeekExt;
         let _s = file.seek(std::io::SeekFrom::Start(offset)).await.map_err(|err| Error(err.into()))?;

From 0a323f465c1c3b828f2bc529f0ad501d942f3ea1 Mon Sep 17 00:00:00 2001
From: ugo
Date: Thu, 21 Apr 2022 21:07:55 +0800
Subject: [PATCH 3/4] Added ByteStream::from_file_chunk() function to read a
 file chunk and added error handling to from_path_chunk()

---
 sdk/aws-smithy-http/src/byte_stream.rs | 40 ++++++++++++++++++++++----
 1 file changed, 36 insertions(+), 4 deletions(-)

diff --git a/sdk/aws-smithy-http/src/byte_stream.rs b/sdk/aws-smithy-http/src/byte_stream.rs
index 4db110239ebd..a5c98568f01c 100644
--- a/sdk/aws-smithy-http/src/byte_stream.rs
+++ b/sdk/aws-smithy-http/src/byte_stream.rs
@@ -110,6 +110,8 @@ use std::task::{Context, Poll};
 
 #[cfg(feature = "rt-tokio")]
 mod bytestream_util;
+#[cfg(feature = "rt-tokio")]
+use tokio::io::AsyncSeekExt; // to enable tokio::fs::File::seek
 
 /// Stream of binary data
 ///
@@ -294,6 +296,25 @@ impl ByteStream {
         Ok(ByteStream::new(body))
     }
 
+    /// Create a ByteStream from a file chunk.
+    ///
+    /// The file's current position cannot be validated here: checking it with
+    /// `stream_position()` would require a mutable file handle. Callers must
+    /// therefore `seek()` the file to the desired start offset before passing
+    /// the handle to this function.
+    ///
+    /// NOTE: This will NOT result in a retryable ByteStream. For a ByteStream
+    /// that can be retried in the case of upstream failures, use
+    /// [`ByteStream::from_path`](ByteStream::from_path).
+    #[cfg(feature = "rt-tokio")]
+    #[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))]
+    pub async fn from_file_chunk(file: tokio::fs::File, chunk_size: u64) -> Result<ByteStream, Error> {
+        let body = SdkBody::from_dyn(http_body::combinators::BoxBody::new(
+            bytestream_util::PathBody::from_file(file, chunk_size),
+        ));
+        Ok(ByteStream::new(body))
+    }
+
     /// Create a ByteStream from a file path, specifying an offset and a length.
     ///
     /// # Examples
@@ -332,13 +353,24 @@ impl ByteStream {
     ///     Ok(())
     /// }
     /// ```
     #[cfg(feature = "rt-tokio")]
-    pub async fn from_path_chunk(path: impl AsRef<std::path::Path>, offset: u64, sz: u64) -> Result<ByteStream, Error> {
+    #[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))]
+    pub async fn from_path_chunk(path: impl AsRef<std::path::Path>, start_offset: u64, chunk_size: u64) -> Result<ByteStream, Error> {
+        let path = path.as_ref();
+        let sz = tokio::fs::metadata(path)
+            .await
+            .map_err(|err| Error(err.into()))?
+            .len();
+        if start_offset >= sz {
+            return Err(Error(Box::new(std::io::Error::new(std::io::ErrorKind::InvalidData, "Offset exceeds file size"))));
+        }
+        if chunk_size > (sz - start_offset) || chunk_size == 0 {
+            return Err(Error(Box::new(std::io::Error::new(std::io::ErrorKind::InvalidData, "Chunk size out of range"))));
+        }
         let mut file = tokio::fs::File::open(path).await.map_err(|err| Error(err.into()))?;
-        use tokio::io::AsyncSeekExt;
-        let _s = file.seek(std::io::SeekFrom::Start(offset)).await.map_err(|err| Error(err.into()))?;
+        let _s = file.seek(std::io::SeekFrom::Start(start_offset)).await.map_err(|err| Error(err.into()))?;
         let body = SdkBody::from_dyn(http_body::combinators::BoxBody::new(
-            bytestream_util::PathBody::from_file(file, sz),
+            bytestream_util::PathBody::from_file(file, chunk_size),
         ));
         Ok(ByteStream::new(body))
     }
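Since `from_file_chunk()` (PATCH 3) deliberately leaves the seek to the caller, the call-site contract is worth spelling out. Below is a minimal sketch, assuming the `rt-tokio` feature and the API added above; the file name and sizes are hypothetical:

```rust
use aws_smithy_http::byte_stream::ByteStream;
use tokio::io::AsyncSeekExt;

// Stream the second mebibyte of a (hypothetical) file. The caller, not the
// constructor, positions the handle; `from_file_chunk` then reads
// `chunk_size` bytes starting from the current position.
async fn second_mebibyte() -> Result<ByteStream, Box<dyn std::error::Error>> {
    let mut file = tokio::fs::File::open("data.bin").await?;
    file.seek(std::io::SeekFrom::Start(1024 * 1024)).await?;
    Ok(ByteStream::from_file_chunk(file, 1024 * 1024).await?)
}
```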
From eceffe76e916cc2b81ef3a42cadf5acfbb50acdc Mon Sep 17 00:00:00 2001
From: ugo
Date: Fri, 22 Apr 2022 11:10:02 +0800
Subject: [PATCH 4/4] Added sample code for multipart and partial file uploads

---
 examples/s3/Cargo.toml                       |   3 +
 examples/s3/src/bin/upload-file-multipart.rs | 142 +++++++++++++++++++
 examples/s3/src/bin/upload-file.rs           |  81 ++++++++++++
 3 files changed, 226 insertions(+)
 create mode 100644 examples/s3/src/bin/upload-file-multipart.rs
 create mode 100644 examples/s3/src/bin/upload-file.rs

diff --git a/examples/s3/Cargo.toml b/examples/s3/Cargo.toml
index ec6cf9de1e3f..3c9f35b6c353 100644
--- a/examples/s3/Cargo.toml
+++ b/examples/s3/Cargo.toml
@@ -40,3 +40,6 @@ features = ["env-filter"]
 [dependencies.uuid]
 version = "0.8"
 features = ["serde", "v4"]
+
+[dependencies.http]
+version = "0.2"
diff --git a/examples/s3/src/bin/upload-file-multipart.rs b/examples/s3/src/bin/upload-file-multipart.rs
new file mode 100644
index 000000000000..67c2c335cdee
--- /dev/null
+++ b/examples/s3/src/bin/upload-file-multipart.rs
@@ -0,0 +1,142 @@
+use aws_sdk_s3::model::{CompletedMultipartUpload, CompletedPart};
+use aws_sdk_s3::types::ByteStream;
+use aws_sdk_s3::{Client, Endpoint, Error};
+use tokio::io::AsyncSeekExt;
+
+/// Upload a file to S3 using a multipart upload, reading the file in chunks.
+///
+/// ## Usage
+/// ```
+/// upload-file-multipart <profile> <url> <bucket> <key> <file name> <num parts>
+/// ```
+#[tokio::main]
+async fn main() -> Result<(), aws_sdk_s3::Error> {
+    const REGION: &str = "us-east-1";
+    let args = std::env::args().collect::<Vec<String>>();
+    let usage = format!(
+        "{} <profile> <url> <bucket> <key> <file name> <num parts>",
+        args[0]
+    );
+    let profile = args.get(1).expect(&usage);
+    let url = args.get(2).expect(&usage);
+    let bucket = args.get(3).expect(&usage);
+    let key = args.get(4).expect(&usage);
+    let file_name = args.get(5).expect(&usage);
+    let num_parts = args
+        .get(6)
+        .expect(&usage)
+        .parse::<usize>()
+        .expect("Error parsing num parts");
+    // Credentials are read from the .aws/credentials file.
+    let conf = aws_config::from_env()
+        .region(REGION)
+        .credentials_provider(
+            aws_config::profile::ProfileFileCredentialsProvider::builder()
+                .profile_name(profile)
+                .build(),
+        )
+        .load()
+        .await;
+    let uri = url.parse::<http::Uri>().expect("Invalid URL");
+    let ep = Endpoint::immutable(uri);
+    let s3_conf = aws_sdk_s3::config::Builder::from(&conf)
+        .endpoint_resolver(ep)
+        .build();
+    let client = Client::from_conf(s3_conf);
+    upload_multipart(&client, bucket, file_name, key, num_parts).await?;
+    Ok(())
+}
+
+/// Multipart upload: split the file into `num_parts` chunks and upload each
+/// as one part.
+pub async fn upload_multipart(
+    client: &Client,
+    bucket: &str,
+    file_name: &str,
+    key: &str,
+    num_parts: usize,
+) -> Result<(), Error> {
+    let len: u64 = std::fs::metadata(file_name)
+        .map_err(|err| Error::Unhandled(Box::new(err)))?
+        .len();
+    let num_parts = num_parts as u64;
+    // Guard against division by zero when computing the chunk size.
+    if num_parts == 0 {
+        return Err(Error::Unhandled("num parts must be > 0".into()));
+    }
+    let file = tokio::fs::File::open(file_name)
+        .await
+        .map_err(|err| Error::Unhandled(Box::new(err)))?;
+    let chunk_size = len / num_parts;
+    let last_chunk_size = chunk_size + len % num_parts;
+    // Initiate the multipart upload and store the upload id.
+    let u = client
+        .create_multipart_upload()
+        .bucket(bucket)
+        .key(key)
+        .send()
+        .await?;
+    let uid = u.upload_id().ok_or_else(|| {
+        Error::NoSuchUpload(
+            aws_sdk_s3::error::NoSuchUpload::builder()
+                .message("No upload ID")
+                .build(),
+        )
+    })?;
+    // Iterate over the file chunks, seeking a cloned file handle to the
+    // start of each chunk and recording each part number with its etag.
+    let mut completed_parts: Vec<CompletedPart> = Vec::new();
+    for i in 0..num_parts {
+        let mut file = file
+            .try_clone()
+            .await
+            .map_err(|err| Error::Unhandled(Box::new(err)))?;
+        // The last part absorbs the remainder of the integer division.
+        let size = if i != (num_parts - 1) {
+            chunk_size
+        } else {
+            last_chunk_size
+        };
+        file.seek(std::io::SeekFrom::Start(i * chunk_size))
+            .await
+            .map_err(|err| Error::Unhandled(Box::new(err)))?;
+        let body = ByteStream::from_file_chunk(file, size)
+            .await
+            .map_err(|err| Error::Unhandled(Box::new(err)))?;
+        let up = client
+            .upload_part()
+            .bucket(bucket)
+            .key(key)
+            .content_length(size as i64)
+            .upload_id(uid)
+            .part_number((i + 1) as i32)
+            .body(body)
+            .send()
+            .await?;
+        let cp = CompletedPart::builder()
+            .set_e_tag(up.e_tag)
+            .part_number((i + 1) as i32)
+            .build();
+        completed_parts.push(cp);
+    }
+    // Complete the multipart upload, sending the (etag, part number) list
+    // along with the request.
+    let b = CompletedMultipartUpload::builder()
+        .set_parts(Some(completed_parts))
+        .build();
+    let completed = client
+        .complete_multipart_upload()
+        .multipart_upload(b)
+        .upload_id(uid)
+        .bucket(bucket)
+        .key(key)
+        .send()
+        .await?;
+    // Print the etag, stripping the surrounding quotes.
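One caveat the multipart example does not enforce: S3 requires every part except the last to be at least 5 MiB and rejects undersized parts when `CompleteMultipartUpload` runs. A pre-check along these lines could clamp the requested part count before splitting (the constant and helper are illustrative, not part of the patch):

```rust
// S3's documented minimum size for every part but the last.
const MIN_PART_SIZE: u64 = 5 * 1024 * 1024;

// Clamp a requested part count so that no part falls below the minimum;
// always returns at least one part.
fn clamp_num_parts(len: u64, requested: u64) -> u64 {
    (len / MIN_PART_SIZE).clamp(1, requested.max(1))
}
```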
+    if let Some(etag) = completed.e_tag {
+        println!("{}", etag.replace('"', ""));
+    } else {
+        eprintln!("Error receiving etag");
+    }
+    Ok(())
+}
diff --git a/examples/s3/src/bin/upload-file.rs b/examples/s3/src/bin/upload-file.rs
new file mode 100644
index 000000000000..c16e94bf839f
--- /dev/null
+++ b/examples/s3/src/bin/upload-file.rs
@@ -0,0 +1,81 @@
+use aws_sdk_s3::types::ByteStream;
+use aws_sdk_s3::{Client, Endpoint, Error};
+use std::path::Path;
+
+/// Upload a file chunk to an S3 object.
+///
+/// ## Usage
+/// ```
+/// upload-file <profile> <url> <bucket> <key> <file name> <start offset> <chunk size>
+/// ```
+#[tokio::main]
+async fn main() -> Result<(), aws_sdk_s3::Error> {
+    let args = std::env::args().collect::<Vec<String>>();
+    let usage = format!(
+        "{} <profile> <url> <bucket> <key> <file name> <start offset> <chunk size>",
+        args[0]
+    );
+    let profile = args.get(1).expect(&usage);
+    let url = args.get(2).expect(&usage);
+    let bucket = args.get(3).expect(&usage);
+    let key = args.get(4).expect(&usage);
+    let file_name = args.get(5).expect(&usage);
+    let start_offset = args
+        .get(6)
+        .expect(&usage)
+        .parse::<u64>()
+        .expect("Error parsing offset");
+    let chunk_size = args
+        .get(7)
+        .expect(&usage)
+        .parse::<u64>()
+        .expect("Error parsing chunk size");
+    let md = std::fs::metadata(file_name).map_err(|err| Error::Unhandled(Box::new(err)))?;
+    // A chunk size of zero means "upload the whole file".
+    let chunk_size = if chunk_size == 0 { md.len() } else { chunk_size };
+
+    // Credentials are read from the .aws/credentials file.
+    let conf = aws_config::from_env()
+        .region("us-east-1")
+        .credentials_provider(
+            aws_config::profile::ProfileFileCredentialsProvider::builder()
+                .profile_name(profile)
+                .build(),
+        )
+        .load()
+        .await;
+    let uri = url.parse::<http::Uri>().expect("Invalid URL");
+    let ep = Endpoint::immutable(uri);
+    let s3_conf = aws_sdk_s3::config::Builder::from(&conf)
+        .endpoint_resolver(ep)
+        .build();
+    let client = Client::from_conf(s3_conf);
+    upload_chunk(&client, bucket, file_name, key, start_offset, chunk_size).await?;
+    Ok(())
+}
+
+/// Upload a file chunk to bucket/key.
+pub async fn upload_chunk(
+    client: &Client,
+    bucket: &str,
+    file_name: &str,
+    key: &str,
+    start_offset: u64,
+    chunk_size: u64,
+) -> Result<(), Error> {
+    let body = ByteStream::from_path_chunk(Path::new(file_name), start_offset, chunk_size)
+        .await
+        .unwrap_or_else(|_| panic!("Cannot read from {}", file_name));
+    client
+        .put_object()
+        .bucket(bucket)
+        .key(key)
+        .body(body)
+        .send()
+        .await?;
+    println!(
+        "Uploaded chunk of size {} from file {}",
+        chunk_size, file_name
+    );
+    Ok(())
+}
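For reference, the offset arithmetic the multipart example relies on can be isolated into a small helper; the function below is ours, written only to make the math explicit. Every part `i` starts at `i * chunk_size` and the last part absorbs the remainder of the integer division, so `len = 10` with `num_parts = 4` yields the (offset, size) pairs (0, 2), (2, 2), (4, 2), (6, 4):

```rust
/// Split `len` bytes into `num_parts` (offset, size) chunks; the last chunk
/// absorbs the remainder. Assumes `num_parts >= 1`.
fn partition(len: u64, num_parts: u64) -> Vec<(u64, u64)> {
    let chunk_size = len / num_parts;
    (0..num_parts)
        .map(|i| {
            let size = if i == num_parts - 1 {
                chunk_size + len % num_parts
            } else {
                chunk_size
            };
            (i * chunk_size, size)
        })
        .collect()
}
```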