From 1c40f145dae95e6da2678ebf828f70cd889d6998 Mon Sep 17 00:00:00 2001
From: Tanguy Le Barzic <tanguy.lebarzic@datadoghq.com>
Date: Fri, 4 Mar 2022 16:27:12 -0500
Subject: [PATCH] Allow to specify a read buffer initial capacity when creating
 ByteStream from a file

The behaviour of the existing ByteStream::from_file / ByteStream::from_path is unchanged (using a default buffer capacity of 4k, which corresponds to Tokio's ReaderStream default buffer capacity). Using higher buffer sizes can result in a large reduction in CPU during S3 uploads, at the cost of memory increase.
---
 .../aws-smithy-http/src/byte_stream.rs        | 54 ++++++++++++++++++-
 .../src/byte_stream/bytestream_util.rs        | 12 +++--
 2 files changed, 60 insertions(+), 6 deletions(-)

diff --git a/rust-runtime/aws-smithy-http/src/byte_stream.rs b/rust-runtime/aws-smithy-http/src/byte_stream.rs
index fdd7ec9124..f63d7f82d3 100644
--- a/rust-runtime/aws-smithy-http/src/byte_stream.rs
+++ b/rust-runtime/aws-smithy-http/src/byte_stream.rs
@@ -108,6 +108,8 @@ use std::io::IoSlice;
 use std::pin::Pin;
 use std::task::{Context, Poll};
 
+const DEFAULT_BUFFER_SIZE: usize = 4096;
+
 #[cfg(feature = "rt-tokio")]
 mod bytestream_util;
 
@@ -262,6 +264,38 @@ impl ByteStream {
     #[cfg(feature = "rt-tokio")]
     #[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))]
     pub async fn from_path(path: impl AsRef<std::path::Path>) -> Result<Self, Error> {
+        Self::from_path_with_buffer_size(path, DEFAULT_BUFFER_SIZE).await
+    }
+
+    /// Create a ByteStream that streams data from the filesystem, with a specific read buffer initial capacity.
+    ///
+    /// This function creates a retryable ByteStream for a given `path`. The returned ByteStream
+    /// will provide a size hint when used as an HTTP body. If the request fails, the read will
+    /// begin again by reloading the file handle.
+    ///
+    /// Increasing the read buffer capacity to higher values than the default (4_096) can result in a large reduction
+    /// in CPU usage, at the cost of memory increase.
+    ///
+    /// ## Warning
+    /// The contents of the file MUST not change during retries. The length & checksum of the file
+    /// will be cached. If the contents of the file change, the operation will almost certainly fail.
+    ///
+    /// Furthermore, a partial write MAY seek in the file and resume from the previous location.
+    ///
+    /// # Examples
+    /// ```no_run
+    /// use aws_smithy_http::byte_stream::ByteStream;
+    /// use std::path::Path;
+    ///  async fn make_bytestream() -> ByteStream {
+    ///     ByteStream::from_path_with_buffer_size("docs/rows.csv", 32_768).await.expect("file should be readable")
+    /// }
+    /// ```
+    #[cfg(feature = "rt-tokio")]
+    #[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))]
+    pub async fn from_path_with_buffer_size(
+        path: impl AsRef<std::path::Path>,
+        buffer_size: usize,
+    ) -> Result<Self, Error> {
         let path = path.as_ref();
         let path_buf = path.to_path_buf();
         let sz = tokio::fs::metadata(path)
@@ -270,7 +304,7 @@ impl ByteStream {
             .len();
         let body_loader = move || {
             SdkBody::from_dyn(http_body::combinators::BoxBody::new(
-                bytestream_util::PathBody::from_path(path_buf.as_path(), sz),
+                bytestream_util::PathBody::from_path(path_buf.as_path(), sz, buffer_size),
             ))
         };
         Ok(ByteStream::new(SdkBody::retryable(body_loader)))
@@ -283,13 +317,29 @@ impl ByteStream {
     #[cfg(feature = "rt-tokio")]
     #[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))]
     pub async fn from_file(file: tokio::fs::File) -> Result<Self, Error> {
+        Self::from_file_with_buffer_size(file, DEFAULT_BUFFER_SIZE).await
+    }
+
+    /// Create a ByteStream from a file, with a specific read buffer initial capacity.
+    ///
+    /// Increasing the read buffer capacity to higher values than the default (4_096) can result in a large reduction
+    /// in CPU usage, at the cost of memory increase.
+    ///
+    /// NOTE: This will NOT result in a retryable ByteStream. For a ByteStream that can be retried in the case of
+    /// upstream failures, use [`ByteStream::from_path`](ByteStream::from_path)
+    #[cfg(feature = "rt-tokio")]
+    #[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))]
+    pub async fn from_file_with_buffer_size(
+        file: tokio::fs::File,
+        buffer_size: usize,
+    ) -> Result<Self, Error> {
         let sz = file
             .metadata()
             .await
             .map_err(|err| Error(err.into()))?
             .len();
         let body = SdkBody::from_dyn(http_body::combinators::BoxBody::new(
-            bytestream_util::PathBody::from_file(file, sz),
+            bytestream_util::PathBody::from_file(file, sz, buffer_size),
         ));
         Ok(ByteStream::new(body))
     }
diff --git a/rust-runtime/aws-smithy-http/src/byte_stream/bytestream_util.rs b/rust-runtime/aws-smithy-http/src/byte_stream/bytestream_util.rs
index a1b304d94f..ad38c1a800 100644
--- a/rust-runtime/aws-smithy-http/src/byte_stream/bytestream_util.rs
+++ b/rust-runtime/aws-smithy-http/src/byte_stream/bytestream_util.rs
@@ -24,19 +24,22 @@ use tokio_util::io::ReaderStream;
 pub struct PathBody {
     state: State,
     len: u64,
+    buffer_size: usize,
 }
 
 impl PathBody {
-    pub fn from_path(path: &Path, len: u64) -> Self {
+    pub fn from_path(path: &Path, len: u64, buffer_size: usize) -> Self {
         PathBody {
             state: State::Unloaded(path.to_path_buf()),
             len,
+            buffer_size,
         }
     }
-    pub fn from_file(file: File, len: u64) -> Self {
+    pub fn from_file(file: File, len: u64, buffer_size: usize) -> Self {
         PathBody {
-            state: State::Loaded(ReaderStream::new(file)),
+            state: State::Loaded(ReaderStream::with_capacity(file, buffer_size)),
             len,
+            buffer_size,
         }
     }
 }
@@ -67,7 +70,8 @@ impl Body for PathBody {
                 State::Loading(ref mut future) => {
                     match ready!(Pin::new(future).poll(cx)) {
                         Ok(file) => {
-                            self.state = State::Loaded(ReaderStream::new(file));
+                            self.state =
+                                State::Loaded(ReaderStream::with_capacity(file, self.buffer_size));
                         }
                         Err(e) => return Poll::Ready(Some(Err(e.into()))),
                     };