From f3812cf5195db694a1c2c1dd9c8c2f2ab9ddf85e Mon Sep 17 00:00:00 2001 From: katelyn martin Date: Sun, 1 Dec 2024 00:00:00 +0000 Subject: [PATCH] refactor(app): update deprecated hyper body calls hyper 0.14.x provided a collection of interfaces related to collecting and aggregating request and response bodies, which were deprecated and removed in the 1.x major release. this commit updates calls to `hyper::body::to_bytes(..)` and `hyper::body::aggregate(..)`. for now, `http_body::Body` is used, but we can use `http_body_util::BodyExt` once we've bumped our hyper dependency to the 1.x major release. for more information, see: * https://github.com/linkerd/linkerd2/issues/8733 * https://github.com/hyperium/hyper/issues/2840 * https://github.com/hyperium/hyper/pull/3020 Signed-off-by: katelyn martin --- linkerd/app/admin/src/server/log/level.rs | 8 +++++--- linkerd/app/admin/src/server/log/stream.rs | 6 +++--- linkerd/app/integration/src/tap.rs | 3 +-- linkerd/app/integration/src/tests/discovery.rs | 13 +++++++------ linkerd/app/integration/src/tests/profiles.rs | 8 +++++--- linkerd/app/integration/src/tests/shutdown.rs | 15 +++++++-------- linkerd/app/integration/src/tests/tap.rs | 8 ++++++-- linkerd/app/integration/src/tests/telemetry.rs | 11 +++++++---- linkerd/app/outbound/src/http/logical/tests.rs | 8 +++++--- linkerd/app/test/src/http_util.rs | 5 +++-- 10 files changed, 49 insertions(+), 36 deletions(-) diff --git a/linkerd/app/admin/src/server/log/level.rs b/linkerd/app/admin/src/server/log/level.rs index a916ba045c..2884ccca63 100644 --- a/linkerd/app/admin/src/server/log/level.rs +++ b/linkerd/app/admin/src/server/log/level.rs @@ -21,10 +21,12 @@ where } http::Method::PUT => { - #[allow(deprecated)] // linkerd/linkerd2#8733 - let body = hyper::body::aggregate(req.into_body()) + let body = req + .into_body() + .collect() .await - .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + .map_err(|e| io::Error::new(io::ErrorKind::Other, e))? + .aggregate(); match level.set_from(body.chunk()) { Ok(_) => mk_rsp(StatusCode::NO_CONTENT, Body::empty()), Err(error) => { diff --git a/linkerd/app/admin/src/server/log/stream.rs b/linkerd/app/admin/src/server/log/stream.rs index 953dfef91d..9f521f1dbe 100644 --- a/linkerd/app/admin/src/server/log/stream.rs +++ b/linkerd/app/admin/src/server/log/stream.rs @@ -52,11 +52,11 @@ where // If the request is a QUERY, use the request body method if method.as_str() == "QUERY" => { // TODO(eliza): validate that the request has a content-length... - #[allow(deprecated)] // linkerd/linkerd2#8733 let body = recover!( - hyper::body::aggregate(req.into_body()) + http_body::Body::collect(req.into_body()) .await - .map_err(Into::into), + .map_err(Into::into) + .map(http_body::Collected::aggregate), "Reading log stream request body", StatusCode::BAD_REQUEST ); diff --git a/linkerd/app/integration/src/tap.rs b/linkerd/app/integration/src/tap.rs index ef3989aa10..682bc33780 100644 --- a/linkerd/app/integration/src/tap.rs +++ b/linkerd/app/integration/src/tap.rs @@ -209,8 +209,7 @@ where // just can't prove it. let req = futures::executor::block_on(async move { let (parts, body) = req.into_parts(); - #[allow(deprecated)] // linkerd/linkerd2#8733 - let body = match hyper::body::to_bytes(body).await { + let body = match body.collect().await.map(http_body::Collected::to_bytes) { Ok(body) => body, Err(_) => unreachable!("body should not fail"), }; diff --git a/linkerd/app/integration/src/tests/discovery.rs b/linkerd/app/integration/src/tests/discovery.rs index d4b7ad5efe..1cb8d6b52b 100644 --- a/linkerd/app/integration/src/tests/discovery.rs +++ b/linkerd/app/integration/src/tests/discovery.rs @@ -482,14 +482,15 @@ mod http2 { let res = fut.await.expect("beta response"); assert_eq!(res.status(), http::StatusCode::OK); - #[allow(deprecated)] // linkerd/linkerd2#8733 - let body = String::from_utf8( - hyper::body::to_bytes(res.into_body()) + let body = { + let body = res.into_body(); + let body = http_body::Body::collect(body) .await .unwrap() - .to_vec(), - ) - .unwrap(); + .to_bytes() + .to_vec(); + String::from_utf8(body).unwrap() + }; assert_eq!(body, "beta"); } } diff --git a/linkerd/app/integration/src/tests/profiles.rs b/linkerd/app/integration/src/tests/profiles.rs index b1fa49eec0..2972930d28 100644 --- a/linkerd/app/integration/src/tests/profiles.rs +++ b/linkerd/app/integration/src/tests/profiles.rs @@ -129,9 +129,11 @@ impl TestBuilder { async move { // Read the entire body before responding, so that the // client doesn't fail when writing it out. - #[allow(deprecated)] // linkerd/linkerd2#8733 - let _body = hyper::body::to_bytes(req.into_body()).await; - tracing::debug!(body = ?_body.as_ref().map(|body| body.len()), "recieved body"); + let body = http_body::Body::collect(req.into_body()) + .await + .map(http_body::Collected::to_bytes); + let bytes = body.as_ref().map(Bytes::len); + tracing::debug!(?bytes, "recieved body"); Ok::<_, Error>(if fail { Response::builder().status(533).body("nope".into()).unwrap() } else { diff --git a/linkerd/app/integration/src/tests/shutdown.rs b/linkerd/app/integration/src/tests/shutdown.rs index 35b7ea1ead..e676abcef8 100644 --- a/linkerd/app/integration/src/tests/shutdown.rs +++ b/linkerd/app/integration/src/tests/shutdown.rs @@ -48,14 +48,13 @@ async fn h2_exercise_goaways_connections() { let bodies = resps .into_iter() - .map( - #[allow(deprecated)] // linkerd/linkerd2#8733 - |resp| { - hyper::body::aggregate(resp.into_body()) - // Make sure the bodies weren't cut off - .map_ok(|buf| assert_eq!(buf.remaining(), RESPONSE_SIZE)) - }, - ) + .map(Response::into_body) + .map(|body| { + http_body::Body::collect(body) + .map_ok(http_body::Collected::aggregate) + // Make sure the bodies weren't cut off + .map_ok(|buf| assert_eq!(buf.remaining(), RESPONSE_SIZE)) + }) .collect::>(); // See that the proxy gives us all the bodies. diff --git a/linkerd/app/integration/src/tests/tap.rs b/linkerd/app/integration/src/tests/tap.rs index 113ebae4ea..831f9fa834 100644 --- a/linkerd/app/integration/src/tests/tap.rs +++ b/linkerd/app/integration/src/tests/tap.rs @@ -253,8 +253,12 @@ async fn grpc_headers_end() { .unwrap(); assert_eq!(res.status(), 200); assert_eq!(res.headers()["grpc-status"], "1"); - #[allow(deprecated)] // linkerd/linkerd2#8733 - let bytes = hyper::body::to_bytes(res.into_body()).await.unwrap().len(); + let body = res.into_body(); + let bytes = http_body::Body::collect(body) + .await + .unwrap() + .to_bytes() + .len(); assert_eq!(bytes, 0); let event = events.skip(2).next().await.expect("2nd").expect("stream"); diff --git a/linkerd/app/integration/src/tests/telemetry.rs b/linkerd/app/integration/src/tests/telemetry.rs index a6360fd712..02a3adc059 100644 --- a/linkerd/app/integration/src/tests/telemetry.rs +++ b/linkerd/app/integration/src/tests/telemetry.rs @@ -1304,10 +1304,13 @@ async fn metrics_compression() { ); } - #[allow(deprecated)] // linkerd/linkerd2#8733 - let mut body = hyper::body::aggregate(resp.into_body()) - .await - .expect("response body concat"); + let mut body = { + let body = resp.into_body(); + http_body::Body::collect(body) + .await + .expect("response body concat") + .aggregate() + }; let mut decoder = flate2::read::GzDecoder::new(std::io::Cursor::new( body.copy_to_bytes(body.remaining()), )); diff --git a/linkerd/app/outbound/src/http/logical/tests.rs b/linkerd/app/outbound/src/http/logical/tests.rs index ea3ffd601d..58101b9b5d 100644 --- a/linkerd/app/outbound/src/http/logical/tests.rs +++ b/linkerd/app/outbound/src/http/logical/tests.rs @@ -144,10 +144,12 @@ async fn assert_rsp( { let rsp = rsp.await.expect("response must not fail"); assert_eq!(rsp.status(), status, "expected status code to be {status}"); - #[allow(deprecated)] // linkerd/linkerd2#8733 - let body = hyper::body::to_bytes(rsp.into_body()) + let body = rsp + .into_body() + .collect() .await - .expect("body must not fail"); + .expect("body must not fail") + .to_bytes(); assert_eq!(body, expected_body, "expected body to be {expected_body:?}"); } diff --git a/linkerd/app/test/src/http_util.rs b/linkerd/app/test/src/http_util.rs index 4205855de8..72abb3e97d 100644 --- a/linkerd/app/test/src/http_util.rs +++ b/linkerd/app/test/src/http_util.rs @@ -121,9 +121,10 @@ where T: HttpBody, T::Error: Into, { - #[allow(deprecated)] // linkerd/linkerd2#8733 - let body = hyper::body::to_bytes(body) + let body = body + .collect() .await + .map(http_body::Collected::to_bytes) .map_err(ContextError::ctx("HTTP response body stream failed"))?; let body = std::str::from_utf8(&body[..]) .map_err(ContextError::ctx("converting body to string failed"))?