Skip to content

Commit

Permalink
refactor(app): update deprecated hyper body calls (#3411)
Browse files Browse the repository at this point in the history
* chore(app/admin): add `http-body` dependency

before we address deprecated hyper interfaces related to `http_bodies`,
we'll want to add this dependency so that we can call `Body::collect()`.

Signed-off-by: katelyn martin <[email protected]>

* refactor(app): update deprecated hyper body calls

hyper 0.14.x provided a collection of interfaces related to collecting
and aggregating request and response bodies, which were deprecated and
removed in the 1.x major release.

this commit updates calls to `hyper::body::to_bytes(..)` and
`hyper::body::aggregate(..)`. for now, `http_body::Body` is used, but we
can use `http_body_util::BodyExt` once we've bumped our hyper dependency
to the 1.x major release.

for more information, see:

* linkerd/linkerd2#8733
* hyperium/hyper#2840
* hyperium/hyper#3020

Signed-off-by: katelyn martin <[email protected]>

---------

Signed-off-by: katelyn martin <[email protected]>
  • Loading branch information
cratelyn authored Dec 4, 2024
1 parent a4113a0 commit 36474b3
Show file tree
Hide file tree
Showing 12 changed files with 51 additions and 36 deletions.
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1326,6 +1326,7 @@ dependencies = [
"deflate",
"futures",
"http",
"http-body",
"hyper",
"linkerd-app-core",
"linkerd-app-inbound",
Expand Down
1 change: 1 addition & 0 deletions linkerd/app/admin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ log-streaming = ["linkerd-tracing/stream"]
[dependencies]
deflate = { version = "1", optional = true, features = ["gzip"] }
http = "0.2"
http-body = "0.4"
hyper = { version = "0.14", features = ["deprecated", "http1", "http2"] }
futures = { version = "0.3", default-features = false }
pprof = { version = "0.14", optional = true, features = ["prost-codec"] }
Expand Down
8 changes: 5 additions & 3 deletions linkerd/app/admin/src/server/log/level.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ where
}

http::Method::PUT => {
#[allow(deprecated)] // linkerd/linkerd2#8733
let body = hyper::body::aggregate(req.into_body())
let body = req
.into_body()
.collect()
.await
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?
.aggregate();
match level.set_from(body.chunk()) {
Ok(_) => mk_rsp(StatusCode::NO_CONTENT, Body::empty()),
Err(error) => {
Expand Down
6 changes: 3 additions & 3 deletions linkerd/app/admin/src/server/log/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,11 @@ where
// If the request is a QUERY, use the request body
method if method.as_str() == "QUERY" => {
// TODO(eliza): validate that the request has a content-length...
#[allow(deprecated)] // linkerd/linkerd2#8733
let body = recover!(
hyper::body::aggregate(req.into_body())
http_body::Body::collect(req.into_body())
.await
.map_err(Into::into),
.map_err(Into::into)
.map(http_body::Collected::aggregate),
"Reading log stream request body",
StatusCode::BAD_REQUEST
);
Expand Down
3 changes: 1 addition & 2 deletions linkerd/app/integration/src/tap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,7 @@ where
// just can't prove it.
let req = futures::executor::block_on(async move {
let (parts, body) = req.into_parts();
#[allow(deprecated)] // linkerd/linkerd2#8733
let body = match hyper::body::to_bytes(body).await {
let body = match body.collect().await.map(http_body::Collected::to_bytes) {
Ok(body) => body,
Err(_) => unreachable!("body should not fail"),
};
Expand Down
13 changes: 7 additions & 6 deletions linkerd/app/integration/src/tests/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -482,14 +482,15 @@ mod http2 {
let res = fut.await.expect("beta response");
assert_eq!(res.status(), http::StatusCode::OK);

#[allow(deprecated)] // linkerd/linkerd2#8733
let body = String::from_utf8(
hyper::body::to_bytes(res.into_body())
let body = {
let body = res.into_body();
let body = http_body::Body::collect(body)
.await
.unwrap()
.to_vec(),
)
.unwrap();
.to_bytes()
.to_vec();
String::from_utf8(body).unwrap()
};
assert_eq!(body, "beta");
}
}
Expand Down
8 changes: 5 additions & 3 deletions linkerd/app/integration/src/tests/profiles.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,11 @@ impl TestBuilder {
async move {
// Read the entire body before responding, so that the
// client doesn't fail when writing it out.
#[allow(deprecated)] // linkerd/linkerd2#8733
let _body = hyper::body::to_bytes(req.into_body()).await;
tracing::debug!(body = ?_body.as_ref().map(|body| body.len()), "recieved body");
let body = http_body::Body::collect(req.into_body())
.await
.map(http_body::Collected::to_bytes);
let bytes = body.as_ref().map(Bytes::len);
tracing::debug!(?bytes, "recieved body");
Ok::<_, Error>(if fail {
Response::builder().status(533).body("nope".into()).unwrap()
} else {
Expand Down
15 changes: 7 additions & 8 deletions linkerd/app/integration/src/tests/shutdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,13 @@ async fn h2_exercise_goaways_connections() {

let bodies = resps
.into_iter()
.map(
#[allow(deprecated)] // linkerd/linkerd2#8733
|resp| {
hyper::body::aggregate(resp.into_body())
// Make sure the bodies weren't cut off
.map_ok(|buf| assert_eq!(buf.remaining(), RESPONSE_SIZE))
},
)
.map(Response::into_body)
.map(|body| {
http_body::Body::collect(body)
.map_ok(http_body::Collected::aggregate)
// Make sure the bodies weren't cut off
.map_ok(|buf| assert_eq!(buf.remaining(), RESPONSE_SIZE))
})
.collect::<Vec<_>>();

// See that the proxy gives us all the bodies.
Expand Down
8 changes: 6 additions & 2 deletions linkerd/app/integration/src/tests/tap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,12 @@ async fn grpc_headers_end() {
.unwrap();
assert_eq!(res.status(), 200);
assert_eq!(res.headers()["grpc-status"], "1");
#[allow(deprecated)] // linkerd/linkerd2#8733
let bytes = hyper::body::to_bytes(res.into_body()).await.unwrap().len();
let body = res.into_body();
let bytes = http_body::Body::collect(body)
.await
.unwrap()
.to_bytes()
.len();
assert_eq!(bytes, 0);

let event = events.skip(2).next().await.expect("2nd").expect("stream");
Expand Down
11 changes: 7 additions & 4 deletions linkerd/app/integration/src/tests/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1304,10 +1304,13 @@ async fn metrics_compression() {
);
}

#[allow(deprecated)] // linkerd/linkerd2#8733
let mut body = hyper::body::aggregate(resp.into_body())
.await
.expect("response body concat");
let mut body = {
let body = resp.into_body();
http_body::Body::collect(body)
.await
.expect("response body concat")
.aggregate()
};
let mut decoder = flate2::read::GzDecoder::new(std::io::Cursor::new(
body.copy_to_bytes(body.remaining()),
));
Expand Down
8 changes: 5 additions & 3 deletions linkerd/app/outbound/src/http/logical/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,12 @@ async fn assert_rsp<T: std::fmt::Debug>(
{
let rsp = rsp.await.expect("response must not fail");
assert_eq!(rsp.status(), status, "expected status code to be {status}");
#[allow(deprecated)] // linkerd/linkerd2#8733
let body = hyper::body::to_bytes(rsp.into_body())
let body = rsp
.into_body()
.collect()
.await
.expect("body must not fail");
.expect("body must not fail")
.to_bytes();
assert_eq!(body, expected_body, "expected body to be {expected_body:?}");
}

Expand Down
5 changes: 3 additions & 2 deletions linkerd/app/test/src/http_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,10 @@ where
T: HttpBody,
T::Error: Into<Error>,
{
#[allow(deprecated)] // linkerd/linkerd2#8733
let body = hyper::body::to_bytes(body)
let body = body
.collect()
.await
.map(http_body::Collected::to_bytes)
.map_err(ContextError::ctx("HTTP response body stream failed"))?;
let body = std::str::from_utf8(&body[..])
.map_err(ContextError::ctx("converting body to string failed"))?
Expand Down

0 comments on commit 36474b3

Please sign in to comment.