Skip to content

Commit

Permalink
chore(deps): address hyper deprecations in policy controller
Browse files Browse the repository at this point in the history
NB: this branch is based upon #13492. see #8733 for more information
about migrating to hyper 1.0.

this enables the `backports` and `deprecated` feature flags in the hyper
dependencies in this project, and addresses warnings. see
<https://hyper.rs/guides/1/upgrading/> for more information about these
feature flags.

largely, the control plane is unaffected by this upgrade, besides the
following changes:

* one usage of a deprecated `hyper::body::aggregate` function is
  updated.

* a `hyper::rt::Executor<E>` implementation, which spawns tasks onto the
  tokio runtime, is provided. once we upgrade to hyper 1.0, we can
  replace this with the executor provided in
  [`hyper-util`](https://docs.rs/hyper-util/latest/hyper_util/rt/tokio/struct.TokioExecutor.html#impl-Executor%3CFut%3E-for-TokioExecutor).

* the `hyper::service::Service<hyper::Request<tonic::body::BoxBody>>`
  implementation for `GrpcHttp` now boxes its returned future, on
  account of `SendRequest` returning an anonymous
  `impl Future<Output = ...>`.

* the `policy-test` additionally depends on the `runtime` feature of
  hyper. this is an artifact of an internal config structure shared by
  the legacy connection builder and the backported connection builder
  containing two keep-alive fields that were feature gated prior to 1.0.

Signed-off-by: katelyn martin <[email protected]>
  • Loading branch information
cratelyn committed Dec 18, 2024
1 parent f5b4d45 commit f6468a9
Show file tree
Hide file tree
Showing 8 changed files with 38 additions and 10 deletions.
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1415,6 +1415,7 @@ dependencies = [
"clap",
"drain",
"futures",
"http-body",
"hyper",
"ipnet",
"k8s-openapi",
Expand Down
2 changes: 1 addition & 1 deletion policy-controller/grpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ async-trait = "0.1"
http = "0.2"
drain = "0.1"
futures = { version = "0.3", default-features = false }
hyper = { version = "0.14", features = ["http2", "server", "tcp"] }
hyper = { version = "0.14", features = ["backports", "deprecated", "http2", "server", "tcp"] }
linkerd-policy-controller-core = { path = "../core" }
maplit = "1"
prost-types = "0.12.6"
Expand Down
3 changes: 2 additions & 1 deletion policy-controller/runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ async-trait = "0.1"
drain = "0.1"
futures = { version = "0.3", default-features = false }
k8s-openapi = { workspace = true }
hyper = { version = "0.14", features = ["http1", "http2", "runtime", "server"] }
http-body = "0.4"
hyper = { version = "0.14", features = ["backports", "deprecated", "http1", "http2", "runtime", "server"] }
ipnet = { version = "2", default-features = false }
openssl = { version = "0.10.68", optional = true }
parking_lot = "0.12"
Expand Down
3 changes: 2 additions & 1 deletion policy-controller/runtime/src/admission.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ impl hyper::service::Service<Request<Body>> for Admission {

let admission = self.clone();
Box::pin(async move {
let bytes = hyper::body::aggregate(req.into_body()).await?;
use http_body::Body as _;
let bytes = req.into_body().collect().await?.aggregate();
let review: Review = match serde_json::from_reader(bytes.reader()) {
Ok(review) => review,
Err(error) => {
Expand Down
2 changes: 1 addition & 1 deletion policy-test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ publish = false

[dependencies]
anyhow = "1"
hyper = { version = "0.14", features = ["client", "http2"] }
hyper = { version = "0.14", features = ["backports", "deprecated", "client", "http2", "runtime"] }
futures = { version = "0.3", default-features = false }
ipnet = "2"
k8s-gateway-api = "0.16"
Expand Down
17 changes: 11 additions & 6 deletions policy-test/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,17 @@
//! forwarding to connect to a running instance.
use anyhow::Result;
pub use linkerd2_proxy_api::*;
use linkerd2_proxy_api::{
inbound::inbound_server_policies_client::InboundServerPoliciesClient,
outbound::outbound_policies_client::OutboundPoliciesClient,
};
use linkerd_policy_controller_grpc::workload;
use linkerd_policy_controller_k8s_api::{self as k8s, ResourceExt};
use std::{future::Future, pin::Pin};
use tokio::io;

pub use linkerd2_proxy_api::*;

#[macro_export]
macro_rules! assert_is_default_all_unauthenticated {
($config:expr) => {
Expand Down Expand Up @@ -105,7 +107,7 @@ pub struct OutboundPolicyClient {

#[derive(Debug)]
struct GrpcHttp {
tx: hyper::client::conn::SendRequest<tonic::body::BoxBody>,
tx: hyper::client::conn::http2::SendRequest<tonic::body::BoxBody>,
}

async fn get_policy_controller_pod(client: &kube::Client) -> Result<String> {
Expand Down Expand Up @@ -338,8 +340,7 @@ impl GrpcHttp {
where
I: io::AsyncRead + io::AsyncWrite + Unpin + Send + 'static,
{
let (tx, conn) = hyper::client::conn::Builder::new()
.http2_only(true)
let (tx, conn) = hyper::client::conn::http2::Builder::new(crate::rt::TokioExecutor)
.handshake(io)
.await?;
tokio::spawn(conn);
Expand All @@ -350,7 +351,7 @@ impl GrpcHttp {
impl hyper::service::Service<hyper::Request<tonic::body::BoxBody>> for GrpcHttp {
type Response = hyper::Response<hyper::Body>;
type Error = hyper::Error;
type Future = hyper::client::conn::ResponseFuture;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;

fn poll_ready(
&mut self,
Expand All @@ -360,6 +361,8 @@ impl hyper::service::Service<hyper::Request<tonic::body::BoxBody>> for GrpcHttp
}

fn call(&mut self, req: hyper::Request<tonic::body::BoxBody>) -> Self::Future {
use futures::FutureExt;

let (mut parts, body) = req.into_parts();

let mut uri = parts.uri.into_parts();
Expand All @@ -371,7 +374,9 @@ impl hyper::service::Service<hyper::Request<tonic::body::BoxBody>> for GrpcHttp
);
parts.uri = hyper::Uri::from_parts(uri).unwrap();

self.tx.call(hyper::Request::from_parts(parts, body))
self.tx
.send_request(hyper::Request::from_parts(parts, body))
.boxed()
}
}

Expand Down
2 changes: 2 additions & 0 deletions policy-test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ pub mod grpc;
pub mod outbound_api;
pub mod web;

mod rt;

use kube::runtime::wait::Condition;
use linkerd_policy_controller_k8s_api::{
self as k8s,
Expand Down
18 changes: 18 additions & 0 deletions policy-test/src/rt.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
//! HTTP runtime components for Linkerd.
use hyper::rt::Executor;
use std::future::Future;

#[derive(Clone, Debug, Default)]
pub struct TokioExecutor;

impl<F> Executor<F> for TokioExecutor
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
#[inline]
fn execute(&self, f: F) {
tokio::spawn(f);
}
}

0 comments on commit f6468a9

Please sign in to comment.