Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove client body callbacks #2065

Merged
merged 3 commits into from
Dec 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions CHANGELOG.next.toml
Original file line number Diff line number Diff line change
Expand Up @@ -625,3 +625,15 @@ message = "Implementation of the Debug trait for container shapes now redacts wh
references = ["smithy-rs#1983", "smithy-rs#2029"]
meta = { "breaking" = true, "tada" = false, "bug" = false, "target" = "all" }
author = "ysaito1001"

[[aws-sdk-rust]]
message = "`SdkBody` callbacks have been removed. If you were using these, please [file an issue](https://github.com/awslabs/aws-sdk-rust/issues/new) so that we can better understand your use-case and provide the support you need."
references = ["smithy-rs#2065"]
meta = { "breaking" = true, "tada" = false, "bug" = false }
author = "jdisanti"

[[smithy-rs]]
message = "`SdkBody` callbacks have been removed. If you were using these, please [file an issue](https://github.com/awslabs/smithy-rs/issues/new) so that we can better understand your use-case and provide the support you need."
references = ["smithy-rs#2065"]
meta = { "breaking" = true, "tada" = false, "bug" = false, "target" = "client"}
author = "jdisanti"
71 changes: 3 additions & 68 deletions rust-runtime/aws-smithy-http/src/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use crate::callback::BodyCallback;
use crate::header::append_merge_header_maps;

pub type Error = Box<dyn StdError + Send + Sync>;

pin_project! {
Expand All @@ -35,9 +32,6 @@ pin_project! {
// In the event of retry, this function will be called to generate a new body. See
// [`try_clone()`](SdkBody::try_clone)
rebuild: Option<Arc<dyn (Fn() -> Inner) + Send + Sync>>,
// A list of callbacks that will be called at various points of this `SdkBody`'s lifecycle
#[pin]
callbacks: Vec<Box<dyn BodyCallback>>,
}
}

Expand Down Expand Up @@ -96,7 +90,6 @@ impl SdkBody {
Self {
inner: Inner::Dyn { inner: body },
rebuild: None,
callbacks: Vec::new(),
}
}

Expand All @@ -113,32 +106,29 @@ impl SdkBody {
SdkBody {
inner: initial.inner,
rebuild: Some(Arc::new(move || f().inner)),
callbacks: Vec::new(),
}
}

pub fn taken() -> Self {
Self {
inner: Inner::Taken,
rebuild: None,
callbacks: Vec::new(),
}
}

pub fn empty() -> Self {
Self {
inner: Inner::Once { inner: None },
rebuild: Some(Arc::new(|| Inner::Once { inner: None })),
callbacks: Vec::new(),
}
}

fn poll_inner(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, Error>>> {
let mut this = self.project();
let polling_result = match this.inner.project() {
let this = self.project();
match this.inner.project() {
InnerProj::Once { ref mut inner } => {
let data = inner.take();
match data {
Expand All @@ -152,29 +142,7 @@ impl SdkBody {
InnerProj::Taken => {
Poll::Ready(Some(Err("A `Taken` body should never be polled".into())))
}
};

match &polling_result {
// When we get some bytes back from polling, pass those bytes to each callback in turn
Poll::Ready(Some(Ok(bytes))) => {
for callback in this.callbacks.iter_mut() {
// Callbacks can run into errors when reading bytes. They'll be surfaced here
callback.update(bytes)?;
}
}
// When we're done polling for bytes, run each callback's `trailers()` method. If any calls to
// `trailers()` return an error, propagate that error up. Otherwise, continue.
Poll::Ready(None) => {
for callback_result in this.callbacks.iter().map(BodyCallback::trailers) {
if let Err(e) = callback_result {
return Poll::Ready(Some(Err(e)));
}
}
}
_ => (),
}

polling_result
}

/// If possible, return a reference to this body as `&[u8]`
Expand All @@ -192,12 +160,9 @@ impl SdkBody {
pub fn try_clone(&self) -> Option<Self> {
self.rebuild.as_ref().map(|rebuild| {
let next = rebuild();
let callbacks = self.callbacks.iter().map(BodyCallback::make_new).collect();

Self {
inner: next,
rebuild: self.rebuild.clone(),
callbacks,
}
})
}
Expand All @@ -206,11 +171,6 @@ impl SdkBody {
http_body::Body::size_hint(self).exact()
}

pub fn with_callback(&mut self, callback: Box<dyn BodyCallback>) -> &mut Self {
self.callbacks.push(callback);
self
}

pub fn map(self, f: impl Fn(SdkBody) -> SdkBody + Sync + Send + 'static) -> SdkBody {
if self.rebuild.is_some() {
SdkBody::retryable(move || f(self.try_clone().unwrap()))
Expand All @@ -235,7 +195,6 @@ impl From<Bytes> for SdkBody {
rebuild: Some(Arc::new(move || Inner::Once {
inner: Some(bytes.clone()),
})),
callbacks: Vec::new(),
}
}
}
Expand All @@ -245,7 +204,6 @@ impl From<hyper::Body> for SdkBody {
SdkBody {
inner: Inner::Streaming { inner: body },
rebuild: None,
callbacks: Vec::new(),
}
}
}
Expand Down Expand Up @@ -283,30 +241,7 @@ impl http_body::Body for SdkBody {
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Result<Option<HeaderMap<HeaderValue>>, Self::Error>> {
let mut header_map = None;
// Iterate over all callbacks, checking each for any `HeaderMap`s
for callback in &self.callbacks {
match callback.trailers() {
// If this is the first `HeaderMap` we've encountered, save it
Ok(Some(right_header_map)) if header_map.is_none() => {
header_map = Some(right_header_map);
}
// If this is **not** the first `HeaderMap` we've encountered, merge it
Ok(Some(right_header_map)) if header_map.is_some() => {
header_map = Some(append_merge_header_maps(
header_map.unwrap(),
right_header_map,
));
}
// Early return if a callback encountered an error.
Err(e) => {
return Poll::Ready(Err(e));
}
// Otherwise, continue on to the next iteration of the loop.
_ => continue,
}
}
Poll::Ready(Ok(header_map))
Poll::Ready(Ok(None))
}

fn is_end_stream(&self) -> bool {
Expand Down
16 changes: 0 additions & 16 deletions rust-runtime/aws-smithy-http/src/byte_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@

use crate::body::SdkBody;
use crate::byte_stream::error::Error;
use crate::callback::BodyCallback;
use bytes::Buf;
use bytes::Bytes;
use bytes_utils::SegmentedBuf;
Expand Down Expand Up @@ -374,14 +373,6 @@ impl ByteStream {
FsBuilder::new().file(file).build().await
}

/// Set a callback on this `ByteStream`. The callback's methods will be called at various points
/// throughout this `ByteStream`'s life cycle. See the [`BodyCallback`](BodyCallback) trait for
/// more information.
pub fn with_body_callback(&mut self, body_callback: Box<dyn BodyCallback>) -> &mut Self {
self.inner.with_body_callback(body_callback);
self
}

#[cfg(feature = "rt-tokio")]
/// Convert this `ByteStream` into a struct that implements [`AsyncRead`](tokio::io::AsyncRead).
///
Expand Down Expand Up @@ -542,13 +533,6 @@ impl<B> Inner<B> {
}
}

impl Inner<SdkBody> {
fn with_body_callback(&mut self, body_callback: Box<dyn BodyCallback>) -> &mut Self {
self.body.with_callback(body_callback);
self
}
}

const SIZE_HINT_32_BIT_PANIC_MESSAGE: &str = r#"
You're running a 32-bit system and this stream's length is too large to be represented with a usize.
Please limit stream length to less than 4.294Gb or run this program on a 64-bit computer architecture.
Expand Down
172 changes: 0 additions & 172 deletions rust-runtime/aws-smithy-http/src/callback.rs

This file was deleted.

1 change: 0 additions & 1 deletion rust-runtime/aws-smithy-http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
#![cfg_attr(docsrs, feature(doc_cfg))]

pub mod body;
pub mod callback;
pub mod endpoint;
pub mod header;
pub mod http_versions;
Expand Down