Skip to content

Commit

Permalink
Avoid processing incoming bodies unnecessarily
Browse files Browse the repository at this point in the history
  • Loading branch information
IvanUkhov committed Oct 10, 2024
1 parent 276be42 commit 5e728f7
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 22 deletions.
23 changes: 15 additions & 8 deletions google-apis-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use tokio::time::sleep;
const LINE_ENDING: &str = "\r\n";

/// A body.
pub type Body = http_body_util::Full<hyper::body::Bytes>;
pub type Body = http_body_util::combinators::BoxBody<hyper::body::Bytes, hyper::Error>;

/// A response.
pub type Response = hyper::Response<Body>;
Expand Down Expand Up @@ -618,7 +618,7 @@ where
.header_value(),
)
.header(AUTHORIZATION, self.auth_header.clone())
.body(Default::default())
.body(to_body::<String>(None))
.unwrap(),
)
.await
Expand All @@ -632,7 +632,7 @@ where
}
None | Some(_) => {
let (parts, body) = r.into_parts();
let body = Body::new(to_bytes(body).await.unwrap_or_default());
let body = to_body(to_bytes(body).await);
let response = Response::from_parts(parts, body);
if let Retry::After(d) = self.delegate.http_failure(&response, None) {
sleep(d).await;
Expand Down Expand Up @@ -704,7 +704,7 @@ where
.header("Content-Range", range_header.header_value())
.header(CONTENT_TYPE, format!("{}", self.media_type))
.header(USER_AGENT, self.user_agent.to_string())
.body(to_body(bytes))
.body(to_body(bytes.into()))
.unwrap(),
)
.await
Expand All @@ -724,7 +724,7 @@ where
} else {
None
};
let response = to_response(parts, bytes);
let response = to_response(parts, bytes.into());

if !success {
if let Retry::After(d) =
Expand Down Expand Up @@ -764,11 +764,18 @@ pub fn remove_json_null_values(value: &mut serde_json::value::Value) {
}

#[doc(hidden)]
pub fn to_body<T>(bytes: T) -> Body
pub fn to_body<T>(bytes: Option<T>) -> Body
where
T: Into<hyper::body::Bytes>,
{
Body::new(bytes.into())
use http_body_util::BodyExt;

fn falliable(_: std::convert::Infallible) -> hyper::Error {
unreachable!()
}

let bytes = bytes.map(Into::into).unwrap_or_default();
Body::new(http_body_util::Full::from(bytes).map_err(falliable))
}

#[doc(hidden)]
Expand All @@ -786,7 +793,7 @@ pub fn to_string(bytes: &hyper::body::Bytes) -> std::borrow::Cow<'_, str> {
}

#[doc(hidden)]
pub fn to_response<T>(parts: http::response::Parts, body: T) -> Response
pub fn to_response<T>(parts: http::response::Parts, body: Option<T>) -> Response
where
T: Into<hyper::body::Bytes>,
{
Expand Down
28 changes: 14 additions & 14 deletions src/generator/templates/api/lib/mbuild.mako
Original file line number Diff line number Diff line change
Expand Up @@ -739,13 +739,13 @@ else {
let request = req_builder
.header(CONTENT_TYPE, json_mime_type.to_string())
.header(CONTENT_LENGTH, request_size as u64)
.body(common::to_body(request_value_reader.get_ref().clone()))\
.body(common::to_body(request_value_reader.get_ref().clone().into()))\
% else:
let mut body_reader_bytes = vec![];
body_reader.read_to_end(&mut body_reader_bytes).unwrap();
let request = req_builder
.header(CONTENT_TYPE, content_type.to_string())
.body(common::to_body(body_reader_bytes))\
.body(common::to_body(body_reader_bytes.into()))\
% endif ## not simple_media_param
% else:
% if simple_media_param:
Expand All @@ -755,14 +755,14 @@ else {
reader.read_to_end(&mut bytes)?;
req_builder.header(CONTENT_TYPE, reader_mime_type.to_string())
.header(CONTENT_LENGTH, size)
.body(common::to_body(bytes))
.body(common::to_body(bytes.into()))
} else {
req_builder.body(Default::default())
req_builder.body(common::to_body::<String>(None))
}\
% else:
let request = req_builder
.header(CONTENT_LENGTH, 0_u64)
.body(Default::default())\
.body(common::to_body::<String>(None))\
% endif
% endif
;
Expand All @@ -783,10 +783,11 @@ else {
}
Ok(res) => {
let (mut parts, body) = res.into_parts();
let mut bytes = common::to_bytes(body).await.unwrap_or_default();
let mut body = common::Body::new(body);
if !parts.status.is_success() {
let bytes = common::to_bytes(body).await.unwrap_or_default();
let error = serde_json::from_str(&common::to_string(&bytes));
let response = common::to_response(parts, bytes);
let response = common::to_response(parts, bytes.into());
if let common::Retry::After(d) = dlg.http_failure(&response, error.as_ref().ok()) {
sleep(d).await;
Expand Down Expand Up @@ -836,14 +837,12 @@ else {
## Now the result contains the actual resource, if any ... it will be
## decoded next
Some(Ok(response)) => {
let parts_body = response.into_parts();
parts = parts_body.0;
bytes = common::to_bytes(parts_body.1).await.unwrap_or_default();
(parts, body) = response.into_parts();
if !parts.status.is_success() {
## delegate was called in upload() already - don't tell him again
dlg.store_upload_url(None);
${delegate_finish}(false);
return Err(common::Error::Failure(common::to_response(parts, bytes)));
return Err(common::Error::Failure(common::Response::from_parts(parts, body)));
}
}
}
Expand All @@ -856,9 +855,10 @@ else {
if enable_resource_parsing \
% endif
{
let bytes = common::to_bytes(body).await.unwrap_or_default();
let encoded = common::to_string(&bytes);
match serde_json::from_str(&encoded) {
Ok(decoded) => (common::to_response(parts, bytes), decoded),
Ok(decoded) => (common::to_response(parts, bytes.into()), decoded),
Err(error) => {
dlg.response_json_decode_error(&encoded, &error);
return Err(common::Error::JsonDecodeError(encoded.to_string(), error));
Expand All @@ -867,12 +867,12 @@ if enable_resource_parsing \
}\
% if supports_download:
else {
(common::to_response(parts, bytes), Default::default())
(common::Response::from_parts(parts, body), Default::default())
}\
% endif
;
% else:
let response = common::to_response(parts, bytes);
let response = common::Response::from_parts(parts, body);
% endif
${delegate_finish}(true);
Expand Down

0 comments on commit 5e728f7

Please sign in to comment.