Skip to content

Commit

Permalink
Remove client body callbacks (#2065)
Browse files Browse the repository at this point in the history
  • Loading branch information
jdisanti authored Dec 6, 2022
1 parent 3ecefff commit 18fc692
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 257 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.next.toml
Original file line number Diff line number Diff line change
Expand Up @@ -625,3 +625,15 @@ message = "Implementation of the Debug trait for container shapes now redacts wh
references = ["smithy-rs#1983", "smithy-rs#2029"]
meta = { "breaking" = true, "tada" = false, "bug" = false, "target" = "all" }
author = "ysaito1001"

[[aws-sdk-rust]]
message = "`SdkBody` callbacks have been removed. If you were using these, please [file an issue](https://github.com/awslabs/aws-sdk-rust/issues/new) so that we can better understand your use-case and provide the support you need."
references = ["smithy-rs#2065"]
meta = { "breaking" = true, "tada" = false, "bug" = false }
author = "jdisanti"

[[smithy-rs]]
message = "`SdkBody` callbacks have been removed. If you were using these, please [file an issue](https://github.com/awslabs/smithy-rs/issues/new) so that we can better understand your use-case and provide the support you need."
references = ["smithy-rs#2065"]
meta = { "breaking" = true, "tada" = false, "bug" = false, "target" = "client"}
author = "jdisanti"
71 changes: 3 additions & 68 deletions rust-runtime/aws-smithy-http/src/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use crate::callback::BodyCallback;
use crate::header::append_merge_header_maps;

pub type Error = Box<dyn StdError + Send + Sync>;

pin_project! {
Expand All @@ -35,9 +32,6 @@ pin_project! {
// In the event of retry, this function will be called to generate a new body. See
// [`try_clone()`](SdkBody::try_clone)
rebuild: Option<Arc<dyn (Fn() -> Inner) + Send + Sync>>,
// A list of callbacks that will be called at various points of this `SdkBody`'s lifecycle
#[pin]
callbacks: Vec<Box<dyn BodyCallback>>,
}
}

Expand Down Expand Up @@ -96,7 +90,6 @@ impl SdkBody {
Self {
inner: Inner::Dyn { inner: body },
rebuild: None,
callbacks: Vec::new(),
}
}

Expand All @@ -113,32 +106,29 @@ impl SdkBody {
SdkBody {
inner: initial.inner,
rebuild: Some(Arc::new(move || f().inner)),
callbacks: Vec::new(),
}
}

pub fn taken() -> Self {
Self {
inner: Inner::Taken,
rebuild: None,
callbacks: Vec::new(),
}
}

pub fn empty() -> Self {
Self {
inner: Inner::Once { inner: None },
rebuild: Some(Arc::new(|| Inner::Once { inner: None })),
callbacks: Vec::new(),
}
}

fn poll_inner(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, Error>>> {
let mut this = self.project();
let polling_result = match this.inner.project() {
let this = self.project();
match this.inner.project() {
InnerProj::Once { ref mut inner } => {
let data = inner.take();
match data {
Expand All @@ -152,29 +142,7 @@ impl SdkBody {
InnerProj::Taken => {
Poll::Ready(Some(Err("A `Taken` body should never be polled".into())))
}
};

match &polling_result {
// When we get some bytes back from polling, pass those bytes to each callback in turn
Poll::Ready(Some(Ok(bytes))) => {
for callback in this.callbacks.iter_mut() {
// Callbacks can run into errors when reading bytes. They'll be surfaced here
callback.update(bytes)?;
}
}
// When we're done polling for bytes, run each callback's `trailers()` method. If any calls to
// `trailers()` return an error, propagate that error up. Otherwise, continue.
Poll::Ready(None) => {
for callback_result in this.callbacks.iter().map(BodyCallback::trailers) {
if let Err(e) = callback_result {
return Poll::Ready(Some(Err(e)));
}
}
}
_ => (),
}

polling_result
}

/// If possible, return a reference to this body as `&[u8]`
Expand All @@ -192,12 +160,9 @@ impl SdkBody {
pub fn try_clone(&self) -> Option<Self> {
self.rebuild.as_ref().map(|rebuild| {
let next = rebuild();
let callbacks = self.callbacks.iter().map(BodyCallback::make_new).collect();

Self {
inner: next,
rebuild: self.rebuild.clone(),
callbacks,
}
})
}
Expand All @@ -206,11 +171,6 @@ impl SdkBody {
http_body::Body::size_hint(self).exact()
}

pub fn with_callback(&mut self, callback: Box<dyn BodyCallback>) -> &mut Self {
self.callbacks.push(callback);
self
}

pub fn map(self, f: impl Fn(SdkBody) -> SdkBody + Sync + Send + 'static) -> SdkBody {
if self.rebuild.is_some() {
SdkBody::retryable(move || f(self.try_clone().unwrap()))
Expand All @@ -235,7 +195,6 @@ impl From<Bytes> for SdkBody {
rebuild: Some(Arc::new(move || Inner::Once {
inner: Some(bytes.clone()),
})),
callbacks: Vec::new(),
}
}
}
Expand All @@ -245,7 +204,6 @@ impl From<hyper::Body> for SdkBody {
SdkBody {
inner: Inner::Streaming { inner: body },
rebuild: None,
callbacks: Vec::new(),
}
}
}
Expand Down Expand Up @@ -283,30 +241,7 @@ impl http_body::Body for SdkBody {
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Result<Option<HeaderMap<HeaderValue>>, Self::Error>> {
let mut header_map = None;
// Iterate over all callbacks, checking each for any `HeaderMap`s
for callback in &self.callbacks {
match callback.trailers() {
// If this is the first `HeaderMap` we've encountered, save it
Ok(Some(right_header_map)) if header_map.is_none() => {
header_map = Some(right_header_map);
}
// If this is **not** the first `HeaderMap` we've encountered, merge it
Ok(Some(right_header_map)) if header_map.is_some() => {
header_map = Some(append_merge_header_maps(
header_map.unwrap(),
right_header_map,
));
}
// Early return if a callback encountered an error.
Err(e) => {
return Poll::Ready(Err(e));
}
// Otherwise, continue on to the next iteration of the loop.
_ => continue,
}
}
Poll::Ready(Ok(header_map))
Poll::Ready(Ok(None))
}

fn is_end_stream(&self) -> bool {
Expand Down
16 changes: 0 additions & 16 deletions rust-runtime/aws-smithy-http/src/byte_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@
use crate::body::SdkBody;
use crate::byte_stream::error::Error;
use crate::callback::BodyCallback;
use bytes::Buf;
use bytes::Bytes;
use bytes_utils::SegmentedBuf;
Expand Down Expand Up @@ -374,14 +373,6 @@ impl ByteStream {
FsBuilder::new().file(file).build().await
}

/// Set a callback on this `ByteStream`. The callback's methods will be called at various points
/// throughout this `ByteStream`'s life cycle. See the [`BodyCallback`](BodyCallback) trait for
/// more information.
pub fn with_body_callback(&mut self, body_callback: Box<dyn BodyCallback>) -> &mut Self {
self.inner.with_body_callback(body_callback);
self
}

#[cfg(feature = "rt-tokio")]
/// Convert this `ByteStream` into a struct that implements [`AsyncRead`](tokio::io::AsyncRead).
///
Expand Down Expand Up @@ -542,13 +533,6 @@ impl<B> Inner<B> {
}
}

impl Inner<SdkBody> {
fn with_body_callback(&mut self, body_callback: Box<dyn BodyCallback>) -> &mut Self {
self.body.with_callback(body_callback);
self
}
}

const SIZE_HINT_32_BIT_PANIC_MESSAGE: &str = r#"
You're running a 32-bit system and this stream's length is too large to be represented with a usize.
Please limit stream length to less than 4.294Gb or run this program on a 64-bit computer architecture.
Expand Down
172 changes: 0 additions & 172 deletions rust-runtime/aws-smithy-http/src/callback.rs

This file was deleted.

1 change: 0 additions & 1 deletion rust-runtime/aws-smithy-http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
#![cfg_attr(docsrs, feature(doc_cfg))]

pub mod body;
pub mod callback;
pub mod endpoint;
pub mod header;
pub mod http_versions;
Expand Down

0 comments on commit 18fc692

Please sign in to comment.