Skip to content

Commit

Permalink
refactor(app/core): prepare rescue body for http-body upgrade
Browse files Browse the repository at this point in the history
XXX(kate): this commit is not yet signed off. write a proper commit
message for this.
  • Loading branch information
cratelyn committed Feb 13, 2025
1 parent b883089 commit 424fa9a
Showing 1 changed file with 26 additions and 31 deletions.
57 changes: 26 additions & 31 deletions linkerd/app/core/src/errors/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,18 @@ use tracing::{debug, warn};
#[pin_project(project = ResponseBodyProj)]
pub struct ResponseBody<R, B>(#[pin] Inner<R, B>);

#[pin_project(project = InnerProj)]
#[pin_project(project = InnerProj, project_replace = InnerProjReplaced)]
enum Inner<R, B> {
Passthru(#[pin] B),
GrpcRescue {
#[pin]
inner: B,
trailers: Option<http::HeaderMap>,
rescue: R,
emit_headers: bool,
},
Rescued {
trailers: Option<http::HeaderMap>,
},
}

// === impl ResponseBody ===
Expand All @@ -44,7 +46,6 @@ impl<R, B> ResponseBody<R, B> {
inner,
rescue,
emit_headers,
trailers: None,
})
}
}
Expand All @@ -64,34 +65,32 @@ where
type Error = B::Error;

fn poll_data(
self: Pin<&mut Self>,
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
let ResponseBodyProj(inner) = self.project();
let ResponseBodyProj(inner) = self.as_mut().project();
match inner.project() {
InnerProj::Passthru(inner) => inner.poll_data(cx),
InnerProj::Rescued { trailers: _ } => Poll::Ready(None),
InnerProj::GrpcRescue {
inner,
trailers,
rescue,
emit_headers,
} => {
// should not be calling poll_data if we have set trailers derived from an error
assert!(trailers.is_none());
match inner.poll_data(cx) {
Poll::Ready(Some(Err(error))) => {
let SyntheticHttpResponse {
grpc_status,
message,
..
} = rescue.rescue(error)?;
let t = Self::grpc_trailers(grpc_status, &message, *emit_headers);
*trailers = Some(t);
Poll::Ready(None)
}
data => data,
} => match inner.poll_data(cx) {
Poll::Ready(Some(Err(error))) => {
let SyntheticHttpResponse {
grpc_status,
message,
..
} = rescue.rescue(error)?;
let trailers = Self::grpc_trailers(grpc_status, &message, *emit_headers);
self.project().0.project_replace(Inner::Rescued {
trailers: Some(trailers),
});
Poll::Ready(None)
}
}
data => data,
},
}
}

Expand All @@ -103,12 +102,8 @@ where
let ResponseBodyProj(inner) = self.project();
match inner.project() {
InnerProj::Passthru(inner) => inner.poll_trailers(cx),
InnerProj::GrpcRescue {
inner, trailers, ..
} => match trailers.take() {
Some(t) => Poll::Ready(Ok(Some(t))),
None => inner.poll_trailers(cx),
},
InnerProj::GrpcRescue { inner, .. } => inner.poll_trailers(cx),
InnerProj::Rescued { trailers } => Poll::Ready(Ok(trailers.take())),
}
}

Expand All @@ -117,9 +112,8 @@ where
let Self(inner) = self;
match inner {
Inner::Passthru(inner) => inner.is_end_stream(),
Inner::GrpcRescue {
inner, trailers, ..
} => trailers.is_none() && inner.is_end_stream(),
Inner::GrpcRescue { inner, .. } => inner.is_end_stream(),
Inner::Rescued { trailers } => trailers.is_none(),
}
}

Expand All @@ -129,6 +123,7 @@ where
match inner {
Inner::Passthru(inner) => inner.size_hint(),
Inner::GrpcRescue { inner, .. } => inner.size_hint(),
Inner::Rescued { trailers: _ } => http_body::SizeHint::with_exact(0),
}
}
}
Expand Down

0 comments on commit 424fa9a

Please sign in to comment.