Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(app/core): prepare rescue body for http-body upgrade #3616

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 61 additions & 37 deletions linkerd/app/core/src/errors/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::{
header::{GRPC_MESSAGE, GRPC_STATUS},
respond::{HttpRescue, SyntheticHttpResponse},
};
use http::header::HeaderValue;
use http::{header::HeaderValue, HeaderMap};
use linkerd_error::{Error, Result};
use pin_project::pin_project;
use std::{
Expand All @@ -20,14 +20,18 @@ pub struct ResponseBody<R, B>(#[pin] Inner<R, B>);

#[pin_project(project = InnerProj)]
enum Inner<R, B> {
/// An inert body that delegates directly down to the underlying body `B`.
Passthru(#[pin] B),
/// A body that will be rescued if it yields an error.
GrpcRescue {
#[pin]
inner: B,
trailers: Option<http::HeaderMap>,
/// An error response [strategy][HttpRescue].
rescue: R,
emit_headers: bool,
},
/// The underlying body `B` yielded an error and was "rescued".
Rescued { trailers: Option<http::HeaderMap> },
}

// === impl ResponseBody ===
Expand All @@ -44,7 +48,6 @@ impl<R, B> ResponseBody<R, B> {
inner,
rescue,
emit_headers,
trailers: None,
})
}
}
Expand All @@ -64,34 +67,27 @@ where
type Error = B::Error;

fn poll_data(
self: Pin<&mut Self>,
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
let ResponseBodyProj(inner) = self.project();
let ResponseBodyProj(inner) = self.as_mut().project();
match inner.project() {
InnerProj::Passthru(inner) => inner.poll_data(cx),
InnerProj::Rescued { trailers: _ } => Poll::Ready(None),
InnerProj::GrpcRescue {
inner,
trailers,
rescue,
emit_headers,
} => {
// should not be calling poll_data if we have set trailers derived from an error
assert!(trailers.is_none());
match inner.poll_data(cx) {
Poll::Ready(Some(Err(error))) => {
let SyntheticHttpResponse {
grpc_status,
message,
..
} = rescue.rescue(error)?;
let t = Self::grpc_trailers(grpc_status, &message, *emit_headers);
*trailers = Some(t);
Poll::Ready(None)
}
data => data,
} => match inner.poll_data(cx) {
Poll::Ready(Some(Err(error))) => {
// The inner body has yielded an error, which we will try to rescue. If so,
// store our synthetic trailers reporting the error.
let trailers = Self::rescue(error, rescue, *emit_headers)?;
self.set_rescued(trailers);
Poll::Ready(None)
Comment on lines -78 to +87
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the important part to look at ☝️

}
}
data => data,
},
}
}

Expand All @@ -103,12 +99,8 @@ where
let ResponseBodyProj(inner) = self.project();
match inner.project() {
InnerProj::Passthru(inner) => inner.poll_trailers(cx),
InnerProj::GrpcRescue {
inner, trailers, ..
} => match trailers.take() {
Some(t) => Poll::Ready(Ok(Some(t))),
None => inner.poll_trailers(cx),
},
InnerProj::GrpcRescue { inner, .. } => inner.poll_trailers(cx),
InnerProj::Rescued { trailers } => Poll::Ready(Ok(trailers.take())),
}
}

Expand All @@ -117,9 +109,8 @@ where
let Self(inner) = self;
match inner {
Inner::Passthru(inner) => inner.is_end_stream(),
Inner::GrpcRescue {
inner, trailers, ..
} => trailers.is_none() && inner.is_end_stream(),
Inner::GrpcRescue { inner, .. } => inner.is_end_stream(),
Inner::Rescued { trailers } => trailers.is_none(),
}
}

Expand All @@ -129,25 +120,58 @@ where
match inner {
Inner::Passthru(inner) => inner.size_hint(),
Inner::GrpcRescue { inner, .. } => inner.size_hint(),
Inner::Rescued { .. } => http_body::SizeHint::with_exact(0),
}
}
}

impl<R, B> ResponseBody<R, B> {
fn grpc_trailers(code: tonic::Code, message: &str, emit_headers: bool) -> http::HeaderMap {
debug!(grpc.status = ?code, "Synthesizing gRPC trailers");
impl<R, B> ResponseBody<R, B>
where
B: http_body::Body,
R: HttpRescue<B::Error>,
{
/// Maps an error yielded by the inner body to a collection of gRPC trailers.
///
/// This function returns `Ok(trailers)` if the given [`HttpRescue<E>`] strategy could identify
/// a cause for an error yielded by the inner `B`-typed body.
fn rescue(
error: B::Error,
rescue: &R,
emit_headers: bool,
) -> Result<http::HeaderMap, B::Error> {
let SyntheticHttpResponse {
grpc_status,
message,
..
} = rescue.rescue(error)?;

debug!(grpc.status = ?grpc_status, "Synthesizing gRPC trailers");
let mut t = http::HeaderMap::new();
t.insert(GRPC_STATUS, super::code_header(code));
t.insert(GRPC_STATUS, super::code_header(grpc_status));
if emit_headers {
// A gRPC message trailer is only included if instructed to emit additional headers.
t.insert(
GRPC_MESSAGE,
HeaderValue::from_str(message).unwrap_or_else(|error| {
HeaderValue::from_str(&message).unwrap_or_else(|error| {
warn!(%error, "Failed to encode error header");
HeaderValue::from_static("Unexpected error")
}),
);
}
t

Ok(t)
}
}

impl<R, B> ResponseBody<R, B> {
/// Marks this body as "rescued".
///
/// No more data frames will be yielded, and the given trailers will be returned when this
/// body is polled.
fn set_rescued(mut self: Pin<&mut Self>, trailers: HeaderMap) {
let trailers = Some(trailers);
let new = Self(Inner::Rescued { trailers });
self.set(new);
}
}

Expand Down
Loading