Skip to content

Commit

Permalink
chore(http/upgrade): replace hyper::Body with BoxBody (#3479)
Browse files Browse the repository at this point in the history
* chore(http/upgrade): replace `hyper::Body` with `BoxBody`

`hyper::Body` is removed in the 1.0 version.

this commit removes it from our upgrade facilities, using a generic body
parameter that defaults to BoxBody.

see <linkerd/linkerd2#8733>.

Signed-off-by: katelyn martin <[email protected]>

* review(http/upgrade): remove frivolous `Unpin` bound

https://github.com/linkerd/linkerd2-proxy/pull/3479/files#r1894068885

in `main` this isn't currently pinned, so this was needed to add the `B`
parameter originally in development, but tweaking how we poll the body
(_see lines 70-80, below_) means this bound is indeed frivolous now.

this commit removes an extraneous `Unpin` bound.

Co-authored-by: Scott Fleener <[email protected]>
Signed-off-by: katelyn martin <[email protected]>

---------

Signed-off-by: katelyn martin <[email protected]>
Co-authored-by: Scott Fleener <[email protected]>
  • Loading branch information
cratelyn and sfleen authored Dec 20, 2024
1 parent 81b43e5 commit fcfde84
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 43 deletions.
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1859,6 +1859,7 @@ dependencies = [
"hyper",
"linkerd-duplex",
"linkerd-error",
"linkerd-http-box",
"linkerd-http-version",
"linkerd-io",
"linkerd-stack",
Expand Down
1 change: 1 addition & 0 deletions linkerd/http/upgrade/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ try-lock = "0.2"

linkerd-duplex = { path = "../../duplex" }
linkerd-error = { path = "../../error" }
linkerd-http-box = { path = "../box" }
linkerd-http-version = { path = "../version" }
linkerd-io = { path = "../../io" }
linkerd-stack = { path = "../../stack" }
80 changes: 42 additions & 38 deletions linkerd/http/upgrade/src/glue.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::upgrade::Http11Upgrade;
use bytes::Bytes;
use futures::TryFuture;
use futures::{ready, TryFuture};
use http_body::Body;
use hyper::client::connect as hyper_connect;
use linkerd_error::{Error, Result};
use linkerd_http_box::BoxBody;
use linkerd_io::{self as io, AsyncRead, AsyncWrite};
use linkerd_stack::{MakeConnection, Service};
use pin_project::{pin_project, pinned_drop};
Expand All @@ -17,10 +17,11 @@ use tracing::debug;
/// Provides optional HTTP/1.1 upgrade support on the body.
#[pin_project(PinnedDrop)]
#[derive(Debug)]
pub struct UpgradeBody {
pub struct UpgradeBody<B = BoxBody> {
/// In UpgradeBody::drop, if this was an HTTP upgrade, the body is taken
/// to be inserted into the Http11Upgrade half.
body: hyper::Body,
#[pin]
body: B,
pub(super) upgrade: Option<(Http11Upgrade, hyper::upgrade::OnUpgrade)>,
}

Expand Down Expand Up @@ -50,9 +51,13 @@ pub struct HyperConnectFuture<F> {

// === impl UpgradeBody ===

impl Body for UpgradeBody {
type Data = Bytes;
type Error = hyper::Error;
impl<B> Body for UpgradeBody<B>
where
B: Body,
B::Error: std::fmt::Display,
{
type Data = B::Data;
type Error = B::Error;

fn is_end_stream(&self) -> bool {
self.body.is_end_stream()
Expand All @@ -62,28 +67,34 @@ impl Body for UpgradeBody {
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
let body = self.project().body;
let poll = futures::ready!(Pin::new(body) // `hyper::Body` is Unpin
.poll_data(cx));
Poll::Ready(poll.map(|x| {
x.map_err(|e| {
debug!("http body error: {}", e);
e
})
}))
// Poll the next chunk from the body.
let this = self.project();
let body = this.body;
let data = ready!(body.poll_data(cx));

// Log errors.
if let Some(Err(e)) = &data {
debug!("http body error: {}", e);
}

Poll::Ready(data)
}

fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
let body = self.project().body;
Pin::new(body) // `hyper::Body` is Unpin
.poll_trailers(cx)
.map_err(|e| {
debug!("http trailers error: {}", e);
e
})
// Poll the trailers from the body.
let this = self.project();
let body = this.body;
let trailers = ready!(body.poll_trailers(cx));

// Log errors.
if let Err(e) = &trailers {
debug!("http trailers error: {}", e);
}

Poll::Ready(trailers)
}

#[inline]
Expand All @@ -92,32 +103,23 @@ impl Body for UpgradeBody {
}
}

impl Default for UpgradeBody {
impl<B: Default> Default for UpgradeBody<B> {
fn default() -> Self {
hyper::Body::empty().into()
}
}

impl From<hyper::Body> for UpgradeBody {
fn from(body: hyper::Body) -> Self {
Self {
body,
body: B::default(),
upgrade: None,
}
}
}

impl UpgradeBody {
pub fn new(
body: hyper::Body,
upgrade: Option<(Http11Upgrade, hyper::upgrade::OnUpgrade)>,
) -> Self {
impl<B> UpgradeBody<B> {
pub fn new(body: B, upgrade: Option<(Http11Upgrade, hyper::upgrade::OnUpgrade)>) -> Self {
Self { body, upgrade }
}
}

#[pinned_drop]
impl PinnedDrop for UpgradeBody {
impl<B> PinnedDrop for UpgradeBody<B> {
fn drop(self: Pin<&mut Self>) {
let this = self.project();
// If an HTTP/1 upgrade was wanted, send the upgrade future.
Expand Down Expand Up @@ -164,6 +166,8 @@ where
}
}

// === impl HyperConnectFuture ===

impl<F, I, M> Future for HyperConnectFuture<F>
where
F: TryFuture<Ok = (I, M)> + 'static,
Expand All @@ -181,7 +185,7 @@ where
}
}

// === impl Connected ===
// === impl Connection ===

impl<C> AsyncRead for Connection<C>
where
Expand Down
10 changes: 5 additions & 5 deletions linkerd/http/upgrade/src/upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,20 +174,20 @@ impl<S> Service<S> {

type ResponseFuture<F, B, E> = Either<F, future::Ready<Result<http::Response<B>, E>>>;

impl<S, B> tower::Service<http::Request<hyper::Body>> for Service<S>
impl<S, ReqB, RespB> tower::Service<http::Request<ReqB>> for Service<S>
where
S: tower::Service<http::Request<UpgradeBody>, Response = http::Response<B>>,
B: Default,
S: tower::Service<http::Request<UpgradeBody<ReqB>>, Response = http::Response<RespB>>,
RespB: Default,
{
type Response = S::Response;
type Error = S::Error;
type Future = ResponseFuture<S::Future, B, S::Error>;
type Future = ResponseFuture<S::Future, RespB, S::Error>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(cx)
}

fn call(&mut self, mut req: http::Request<hyper::Body>) -> Self::Future {
fn call(&mut self, mut req: http::Request<ReqB>) -> Self::Future {
// Should this rejection happen later in the Service stack?
//
// Rejecting here means telemetry doesn't record anything about it...
Expand Down

0 comments on commit fcfde84

Please sign in to comment.