Skip to content

Commit

Permalink
Allow to specify a read buffer initial capacity when creating ByteStr…
Browse files Browse the repository at this point in the history
…eam from a file

The behaviour of the existing ByteStream::from_file / ByteStream::from_path is unchanged (using a default buffer capacity of 4k, which corresponds to Tokio's ReaderStream default buffer capacity). Using higher buffer sizes can result in a large reduction in CPU during S3 uploads, at the cost of memory increase.
  • Loading branch information
tanguylebarzic committed Mar 4, 2022
1 parent d823f61 commit 1c40f14
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 6 deletions.
54 changes: 52 additions & 2 deletions rust-runtime/aws-smithy-http/src/byte_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ use std::io::IoSlice;
use std::pin::Pin;
use std::task::{Context, Poll};

const DEFAULT_BUFFER_SIZE: usize = 4096;

#[cfg(feature = "rt-tokio")]
mod bytestream_util;

Expand Down Expand Up @@ -262,6 +264,38 @@ impl ByteStream {
#[cfg(feature = "rt-tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))]
pub async fn from_path(path: impl AsRef<std::path::Path>) -> Result<Self, Error> {
Self::from_path_with_buffer_size(path, DEFAULT_BUFFER_SIZE).await
}

/// Create a ByteStream that streams data from the filesystem, with a specific read buffer initial capacity.
///
/// This function creates a retryable ByteStream for a given `path`. The returned ByteStream
/// will provide a size hint when used as an HTTP body. If the request fails, the read will
/// begin again by reloading the file handle.
///
/// Increasing the read buffer capacity to higher values than the default (4_096) can result in a large reduction
/// in CPU usage, at the cost of memory increase.
///
/// ## Warning
/// The contents of the file MUST not change during retries. The length & checksum of the file
/// will be cached. If the contents of the file change, the operation will almost certainly fail.
///
/// Furthermore, a partial write MAY seek in the file and resume from the previous location.
///
/// # Examples
/// ```no_run
/// use aws_smithy_http::byte_stream::ByteStream;
/// use std::path::Path;
/// async fn make_bytestream() -> ByteStream {
/// ByteStream::from_path_with_buffer_size("docs/rows.csv", 32_768).await.expect("file should be readable")
/// }
/// ```
#[cfg(feature = "rt-tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))]
pub async fn from_path_with_buffer_size(
path: impl AsRef<std::path::Path>,
buffer_size: usize,
) -> Result<Self, Error> {
let path = path.as_ref();
let path_buf = path.to_path_buf();
let sz = tokio::fs::metadata(path)
Expand All @@ -270,7 +304,7 @@ impl ByteStream {
.len();
let body_loader = move || {
SdkBody::from_dyn(http_body::combinators::BoxBody::new(
bytestream_util::PathBody::from_path(path_buf.as_path(), sz),
bytestream_util::PathBody::from_path(path_buf.as_path(), sz, buffer_size),
))
};
Ok(ByteStream::new(SdkBody::retryable(body_loader)))
Expand All @@ -283,13 +317,29 @@ impl ByteStream {
#[cfg(feature = "rt-tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))]
pub async fn from_file(file: tokio::fs::File) -> Result<Self, Error> {
Self::from_file_with_buffer_size(file, DEFAULT_BUFFER_SIZE).await
}

/// Create a ByteStream from a file, with a specific read buffer initial capacity.
///
/// Increasing the read buffer capacity to higher values than the default (4_096) can result in a large reduction
/// in CPU usage, at the cost of memory increase.
///
/// NOTE: This will NOT result in a retryable ByteStream. For a ByteStream that can be retried in the case of
/// upstream failures, use [`ByteStream::from_path`](ByteStream::from_path)
#[cfg(feature = "rt-tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))]
pub async fn from_file_with_buffer_size(
file: tokio::fs::File,
buffer_size: usize,
) -> Result<Self, Error> {
let sz = file
.metadata()
.await
.map_err(|err| Error(err.into()))?
.len();
let body = SdkBody::from_dyn(http_body::combinators::BoxBody::new(
bytestream_util::PathBody::from_file(file, sz),
bytestream_util::PathBody::from_file(file, sz, buffer_size),
));
Ok(ByteStream::new(body))
}
Expand Down
12 changes: 8 additions & 4 deletions rust-runtime/aws-smithy-http/src/byte_stream/bytestream_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,22 @@ use tokio_util::io::ReaderStream;
pub struct PathBody {
state: State,
len: u64,
buffer_size: usize,
}

impl PathBody {
pub fn from_path(path: &Path, len: u64) -> Self {
pub fn from_path(path: &Path, len: u64, buffer_size: usize) -> Self {
PathBody {
state: State::Unloaded(path.to_path_buf()),
len,
buffer_size,
}
}
pub fn from_file(file: File, len: u64) -> Self {
pub fn from_file(file: File, len: u64, buffer_size: usize) -> Self {
PathBody {
state: State::Loaded(ReaderStream::new(file)),
state: State::Loaded(ReaderStream::with_capacity(file, buffer_size)),
len,
buffer_size,
}
}
}
Expand Down Expand Up @@ -67,7 +70,8 @@ impl Body for PathBody {
State::Loading(ref mut future) => {
match ready!(Pin::new(future).poll(cx)) {
Ok(file) => {
self.state = State::Loaded(ReaderStream::new(file));
self.state =
State::Loaded(ReaderStream::with_capacity(file, self.buffer_size));
}
Err(e) => return Poll::Ready(Some(Err(e.into()))),
};
Expand Down

0 comments on commit 1c40f14

Please sign in to comment.