diff --git a/Cargo.lock b/Cargo.lock index aa4ad7ab4f..c7aeea98e8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -16,14 +16,15 @@ checksum = "aae1277d39aeec15cb388266ecc24b11c80469deae6067e17a1a7aa9e5c1f234" [[package]] name = "ahash" -version = "0.8.3" +version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c99f64d1e06488f620f932677e24bc6e2897582980441ae90a671415bd7ec2f" +checksum = "cd7d5a2cecb58716e47d67d5703a249964b14c7be1ec3cad3affc295b2d1c35d" dependencies = [ "cfg-if", "getrandom", "once_cell", "version_check", + "zerocopy", ] [[package]] @@ -1010,6 +1011,7 @@ dependencies = [ "linkerd-app-core", "linkerd-app-test", "linkerd-http-access-log", + "linkerd-http-metrics", "linkerd-idle-cache", "linkerd-io", "linkerd-meshtls", @@ -3341,3 +3343,23 @@ checksum = "0120db82e8a1e0b9fb3345a539c478767c0048d842860994d96113d5b667bd69" dependencies = [ "winapi", ] + +[[package]] +name = "zerocopy" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a7af71d8643341260a65f89fa60c0eeaa907f34544d8f6d9b0df72f069b5e74" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9731702e2f0617ad526794ae28fbc6f6ca8849b5ba729666c2a5bc4b6ddee2cd" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.12", +] diff --git a/deny.toml b/deny.toml index 7fc5921892..82a7a4b867 100644 --- a/deny.toml +++ b/deny.toml @@ -17,6 +17,7 @@ ignore = [] unlicensed = "deny" allow = [ "Apache-2.0", + "BSD-2-Clause", "BSD-3-Clause", "ISC", "MIT", @@ -27,13 +28,21 @@ allow-osi-fsf-free = "neither" default = "deny" confidence-threshold = 0.8 exceptions = [ - { allow = ["Zlib"], name = "adler32", version = "*" }, - { allow = ["ISC", "MIT", "OpenSSL"], name = "ring", version = "*" }, + { allow = [ + "Zlib", + ], name = "adler32", version = "*" }, + { allow = [ + "ISC", + "MIT", + "OpenSSL", + ], name = "ring", version = "*" }, # The Unicode-DFS-2016 license is necessary for unicode-ident because they # use data from the unicode tables to generate the tables which are # included in the application. We do not distribute those data files so # this is not a problem for us. See https://github.com/dtolnay/unicode-ident/pull/9/files - { allow = ["Unicode-DFS-2016"], name = "unicode-ident", version = "*"}, + { allow = [ + "Unicode-DFS-2016", + ], name = "unicode-ident", version = "*" }, ] [[licenses.clarify]] diff --git a/linkerd/app/admin/src/stack.rs b/linkerd/app/admin/src/stack.rs index 571f8915e0..f703155e63 100644 --- a/linkerd/app/admin/src/stack.rs +++ b/linkerd/app/admin/src/stack.rs @@ -7,7 +7,9 @@ use linkerd_app_core::{ serve, svc::{self, ExtractParam, InsertParam, Param}, tls, trace, - transport::{self, listen::Bind, ClientAddr, Local, OrigDstAddr, Remote, ServerAddr}, + transport::{ + self, addrs::AddrPair, listen::Bind, ClientAddr, Local, OrigDstAddr, Remote, ServerAddr, + }, Error, Result, }; use linkerd_app_inbound as inbound; @@ -84,7 +86,9 @@ impl Config { where R: FmtMetrics + Clone + Send + Sync + Unpin + 'static, B: Bind, - B::Addrs: svc::Param> + svc::Param>, + B::Addrs: svc::Param>, + B::Addrs: svc::Param>, + B::Addrs: svc::Param, { let (listen_addr, listen) = bind.bind(&self.server)?; @@ -95,6 +99,7 @@ impl Config { let admin = crate::server::Admin::new(report, ready, shutdown, trace); let admin = svc::stack(move |_| admin.clone()) .push(metrics.proxy.http_endpoint.to_layer::()) + .push(classify::NewClassify::layer_default()) .push_map_target(|(permit, http)| Permitted { permit, http }) .push(inbound::policy::NewHttpPolicy::layer(metrics.http_authz.clone())) .push(Rescue::layer()) @@ -201,6 +206,14 @@ impl Param> for Http { } } +impl Param for Http { + fn param(&self) -> AddrPair { + let Remote(client) = self.tcp.client; + let Local(server) = self.tcp.addr; + AddrPair(client, server) + } +} + impl Param for Http { fn param(&self) -> tls::ConditionalServerTls { self.tcp.tls.clone() diff --git a/linkerd/app/core/src/control.rs b/linkerd/app/core/src/control.rs index 9591661071..45b472d221 100644 --- a/linkerd/app/core/src/control.rs +++ b/linkerd/app/core/src/control.rs @@ -124,6 +124,7 @@ impl Config { .lift_new() .push(self::balance::layer(dns, resolve_backoff)) .push(metrics.to_layer::()) + .push(classify::NewClassify::layer_default()) // This buffer allows a resolver client to be shared across stacks. // No load shed is applied here, however, so backpressure may leak // into the caller task. diff --git a/linkerd/app/core/src/serve.rs b/linkerd/app/core/src/serve.rs index 0c8a593ade..32d4547952 100644 --- a/linkerd/app/core/src/serve.rs +++ b/linkerd/app/core/src/serve.rs @@ -1,11 +1,11 @@ use crate::{ io, is_caused_by, svc::{self, Param}, - transport::{ClientAddr, Remote}, Result, }; use futures::prelude::*; use linkerd_error::Error; +use linkerd_proxy_transport::AddrPair; use tower::util::ServiceExt; use tracing::{debug, debug_span, info, instrument::Instrument, warn}; @@ -18,7 +18,7 @@ pub async fn serve( shutdown: impl Future, ) where I: Send + 'static, - A: Param>, + A: Param, M: svc::NewService, S: tower::Service, Response = ()> + Send + 'static, S::Error: Into, @@ -40,8 +40,8 @@ pub async fn serve( }; // The local addr should be instrumented from the listener's context. - let Remote(ClientAddr(client_addr)) = addrs.param(); - let span = debug_span!("accept", client.addr = %client_addr).entered(); + let AddrPair(client_addr, server_addr) = addrs.param(); + let span = debug_span!("accept", client.addr = %client_addr, server.addr = %server_addr).entered(); let accept = new_accept.new_service(addrs); // Dispatch all of the work for a given connection onto a @@ -57,10 +57,20 @@ pub async fn serve( { Ok(()) => debug!("Connection closed"), Err(reason) if is_caused_by::(&*reason) => { - debug!(%reason, "Connection closed") + debug!( + reason, + client.addr = %client_addr, + server.addr = %server_addr, + "Connection closed" + ); } Err(error) => { - info!(error, client.addr = %client_addr, "Connection closed") + info!( + error, + client.addr = %client_addr, + server.addr = %server_addr, + "Connection closed" + ); } } // Hold the service until the connection is complete. This diff --git a/linkerd/app/inbound/Cargo.toml b/linkerd/app/inbound/Cargo.toml index 8f74e9635f..c76c1b229d 100644 --- a/linkerd/app/inbound/Cargo.toml +++ b/linkerd/app/inbound/Cargo.toml @@ -52,6 +52,7 @@ libfuzzer-sys = { version = "0.4", features = ["arbitrary-derive"] } [dev-dependencies] hyper = { version = "0.14", features = ["http1", "http2"] } linkerd-app-test = { path = "../test" } +linkerd-http-metrics = { path = "../../http-metrics", features = ["test-util"] } linkerd-idle-cache = { path = "../../idle-cache", features = ["test-util"] } linkerd-io = { path = "../../io", features = ["tokio-test"] } linkerd-meshtls = { path = "../../meshtls", features = ["rustls"] } diff --git a/linkerd/app/inbound/src/http/router.rs b/linkerd/app/inbound/src/http/router.rs index 9e895e04f9..a234b86bac 100644 --- a/linkerd/app/inbound/src/http/router.rs +++ b/linkerd/app/inbound/src/http/router.rs @@ -148,7 +148,8 @@ impl Inbound { // Attempts to discover a service profile for each logical target (as // informed by the request's headers). The stack is cached until a // request has not been received for `cache_max_idle_age`. - let router = http.clone() + let router = http + .clone() .check_new_service::>() .push_map_target(|p: Profile| p.logical) .push(profiles::http::NewProxyRouter::layer( @@ -164,6 +165,7 @@ impl Inbound { .to_layer::(), ) .push_on_service(http::BoxResponse::layer()) + // Configure a per-route response classifier based on the profile. .push(classify::NewClassify::layer()) .push_http_insert_target::() .push_map_target(|(route, profile)| ProfileRoute { route, profile }) @@ -186,10 +188,7 @@ impl Inbound { } Ok(svc::Either::B(logical)) }, - http.clone() - .push_on_service(svc::MapErr::layer(Error::from)) - .check_new_service::>() - .into_inner(), + http.clone().into_inner(), ) .check_new_service::<(Option, Logical), http::Request<_>>(); @@ -229,8 +228,7 @@ impl Inbound { // Skip the profile stack if it takes too long to become ready. .push_when_unready(config.profile_skip_timeout, http.into_inner()) .push_on_service( - svc::layers() - .push(rt.metrics.proxy.stack.layer(stack_labels("http", "logical"))) + rt.metrics.proxy.stack.layer(stack_labels("http", "logical")), ) .push(svc::NewQueue::layer_via(config.http_request_queue)) .push_new_idle_cached(config.discovery_idle_timeout) @@ -239,6 +237,9 @@ impl Inbound { .push(http::Retain::layer()) .push(http::BoxResponse::layer()), ) + // Configure default response classification early. It may be + // overridden by profile routes above. + .push(classify::NewClassify::layer_default()) .check_new_service::>() .instrument(|t: &Logical| { let name = t.logical.as_ref().map(tracing::field::display); @@ -414,12 +415,6 @@ impl Param for Logical { } } -impl Param for Logical { - fn param(&self) -> classify::Request { - classify::Request::default() - } -} - impl tap::Inspect for Logical { fn src_addr(&self, req: &http::Request) -> Option { req.extensions().get::>().map(|a| **a) diff --git a/linkerd/app/inbound/src/http/tests.rs b/linkerd/app/inbound/src/http/tests.rs index cf55c435bc..d21f005af5 100644 --- a/linkerd/app/inbound/src/http/tests.rs +++ b/linkerd/app/inbound/src/http/tests.rs @@ -6,10 +6,11 @@ use crate::{ }, Config, Inbound, }; -use hyper::{client::conn::Builder as ClientBuilder, Body, Request, Response}; +use hyper::{body::HttpBody, client::conn::Builder as ClientBuilder, Body, Request, Response}; use linkerd_app_core::{ + classify, errors::respond::L5D_PROXY_ERROR, - identity, io, + identity, io, metrics, proxy::http, svc::{self, NewService, Param}, tls, @@ -19,6 +20,7 @@ use linkerd_app_core::{ use linkerd_app_test::connect::ConnectFuture; use linkerd_tracing::test::trace_init; use std::{net::SocketAddr, sync::Arc}; +use tokio::time; use tracing::Instrument; fn build_server( @@ -469,6 +471,84 @@ async fn grpc_unmeshed_response_error_header() { let _ = bg.await; } +#[tokio::test(flavor = "current_thread")] +async fn grpc_response_class() { + let _trace = trace_init(); + + // Build a mock connector serves a gRPC server that returns errors. + let connect = { + let mut server = hyper::server::conn::Http::new(); + server.http2_only(true); + support::connect().endpoint_fn_boxed( + Target::addr(), + grpc_status_server(server, tonic::Code::Unknown), + ) + }; + + // Build a client using the connect that always errors. + let mut client = ClientBuilder::new(); + client.http2_only(true); + let profiles = profile::resolver(); + let profile_tx = + profiles.profile_tx(NameAddr::from_str_and_port("foo.svc.cluster.local", 5550).unwrap()); + profile_tx.send(profile::Profile::default()).unwrap(); + let cfg = default_config(); + let (rt, _shutdown) = runtime(); + let metrics = rt + .metrics + .clone() + .http_endpoint + .into_report(time::Duration::from_secs(3600)); + let server = build_server(cfg, rt, profiles, connect).new_service(Target::meshed_h2()); + let (mut client, bg) = http_util::connect_and_accept(&mut client, server).await; + + // Send a request and assert that it is OK with the expected header + // message. + let req = Request::builder() + .method(http::Method::POST) + .uri("http://foo.svc.cluster.local:5550") + .header(http::header::CONTENT_TYPE, "application/grpc") + .body(Body::default()) + .unwrap(); + + let mut response = http_util::http_request(&mut client, req).await.unwrap(); + assert_eq!(response.status(), http::StatusCode::OK); + + response.body_mut().data().await; + let trls = response.body_mut().trailers().await.unwrap().unwrap(); + assert_eq!(trls.get("grpc-status").unwrap().to_str().unwrap(), "2"); + + let response_total = metrics + .get_response_total( + &metrics::EndpointLabels::Inbound(metrics::InboundEndpointLabels { + tls: Target::meshed_h2().1, + authority: Some("foo.svc.cluster.local:5550".parse().unwrap()), + target_addr: "127.0.0.1:80".parse().unwrap(), + policy: metrics::RouteAuthzLabels { + route: metrics::RouteLabels { + server: metrics::ServerLabel(Arc::new(policy::Meta::Resource { + group: "policy.linkerd.io".into(), + kind: "server".into(), + name: "testsrv".into(), + })), + route: policy::Meta::new_default("default"), + }, + authz: Arc::new(policy::Meta::Resource { + group: "policy.linkerd.io".into(), + kind: "serverauthorization".into(), + name: "testsaz".into(), + }), + }, + }), + Some(http::StatusCode::OK), + &classify::Class::Grpc(Err(tonic::Code::Unknown)), + ) + .expect("response_total not found"); + assert_eq!(response_total, 1.0); + + drop((client, bg)); +} + #[tracing::instrument] fn hello_server( http: hyper::server::conn::Http, @@ -490,6 +570,42 @@ fn hello_server( } } +#[tracing::instrument] +fn grpc_status_server( + http: hyper::server::conn::Http, + status: tonic::Code, +) -> impl Fn(Remote) -> io::Result { + move |endpoint| { + let span = tracing::info_span!("grpc_status_server", ?endpoint); + let _e = span.enter(); + tracing::info!("mock connecting"); + let (client_io, server_io) = support::io::duplex(4096); + tokio::spawn( + http.serve_connection( + server_io, + hyper::service::service_fn(move |request: Request| async move { + tracing::info!(?request); + let (mut tx, rx) = Body::channel(); + tokio::spawn(async move { + let mut trls = ::http::HeaderMap::new(); + trls.insert("grpc-status", (status as u32).to_string().parse().unwrap()); + tx.send_trailers(trls).await + }); + Ok::<_, io::Error>( + http::Response::builder() + .version(::http::Version::HTTP_2) + .header("content-type", "application/grpc") + .body(rx) + .unwrap(), + ) + }), + ) + .in_current_span(), + ); + Ok(io::BoxedIo::new(client_io)) + } +} + #[tracing::instrument] fn connect_error() -> impl Fn(Remote) -> io::Result { move |_| { @@ -523,7 +639,7 @@ fn connect_timeout( struct Target(http::Version, tls::ConditionalServerTls); #[track_caller] -fn check_error_header(hdrs: &hyper::header::HeaderMap, expected: &str) { +fn check_error_header(hdrs: &::http::HeaderMap, expected: &str) { let message = hdrs .get(L5D_PROXY_ERROR) .expect("response did not contain l5d-proxy-error header") diff --git a/linkerd/app/inbound/src/server.rs b/linkerd/app/inbound/src/server.rs index 3920e71cb6..d9b18c3fac 100644 --- a/linkerd/app/inbound/src/server.rs +++ b/linkerd/app/inbound/src/server.rs @@ -5,7 +5,7 @@ use linkerd_app_core::{ io, profiles, proxy::http, serve, svc, - transport::{self, ClientAddr, Local, OrigDstAddr, Remote, ServerAddr}, + transport::{self, addrs::*}, Error, Result, }; use std::{fmt::Debug, sync::Arc}; @@ -43,7 +43,10 @@ impl Inbound<()> { profiles: P, gateway: G, ) where - A: svc::Param> + svc::Param + Clone + Send + Sync + 'static, + A: svc::Param>, + A: svc::Param, + A: svc::Param, + A: Clone + Send + Sync + 'static, I: io::AsyncRead + io::AsyncWrite + io::Peek + io::PeerAddr, I: Debug + Unpin + Send + Sync + 'static, G: svc::NewService, diff --git a/linkerd/app/outbound/src/http/endpoint/tests.rs b/linkerd/app/outbound/src/http/endpoint/tests.rs index 461ea2e913..863e4a1147 100644 --- a/linkerd/app/outbound/src/http/endpoint/tests.rs +++ b/linkerd/app/outbound/src/http/endpoint/tests.rs @@ -27,6 +27,8 @@ async fn http11_forward() { .with_stack(connect) .push_http_tcp_client() .push_http_endpoint::<_, http::BoxBody, _>() + .into_stack() + .push(classify::NewClassify::layer_default()) .into_inner(); let svc = stack.new_service(Endpoint { @@ -61,6 +63,8 @@ async fn http2_forward() { .with_stack(connect) .push_http_tcp_client() .push_http_endpoint::<_, http::BoxBody, _>() + .into_stack() + .push(classify::NewClassify::layer_default()) .into_inner(); let svc = stack.new_service(Endpoint { @@ -97,6 +101,8 @@ async fn orig_proto_upgrade() { .with_stack(connect) .push_http_tcp_client() .push_http_endpoint::<_, http::BoxBody, _>() + .into_stack() + .push(classify::NewClassify::layer_default()) .into_inner(); let svc = stack.new_service(Endpoint { @@ -146,6 +152,7 @@ async fn orig_proto_skipped_on_http_upgrade() { .push_http_tcp_client() .push_http_endpoint::<_, http::BoxBody, _>() .into_stack() + .push(classify::NewClassify::layer_default()) .push_on_service(http::BoxRequest::layer()) // We need the server-side upgrade layer to annotate the request so that the client // knows that an HTTP upgrade is in progress. @@ -192,6 +199,8 @@ async fn orig_proto_http2_noop() { .with_stack(connect) .push_http_tcp_client() .push_http_endpoint::<_, http::BoxBody, _>() + .into_stack() + .push(classify::NewClassify::layer_default()) .into_inner(); let svc = stack.new_service(Endpoint { diff --git a/linkerd/app/outbound/src/http/retry.rs b/linkerd/app/outbound/src/http/retry.rs index b61ddf3722..625ff19648 100644 --- a/linkerd/app/outbound/src/http/retry.rs +++ b/linkerd/app/outbound/src/http/retry.rs @@ -131,6 +131,10 @@ where clone.extensions_mut().insert(client_handle); } + if let Some(classify) = req.extensions().get::().cloned() { + clone.extensions_mut().insert(classify); + } + Some(clone) } } diff --git a/linkerd/app/outbound/src/lib.rs b/linkerd/app/outbound/src/lib.rs index 8c9e7939e2..44046e0140 100644 --- a/linkerd/app/outbound/src/lib.rs +++ b/linkerd/app/outbound/src/lib.rs @@ -230,7 +230,7 @@ impl Outbound<()> { resolve: R, ) where // Target describing a server-side connection. - T: svc::Param>, + T: svc::Param, T: svc::Param, T: Clone + Send + Sync + 'static, // Server-side socket. diff --git a/linkerd/app/src/lib.rs b/linkerd/app/src/lib.rs index 22bff9a62e..70713ba0b7 100644 --- a/linkerd/app/src/lib.rs +++ b/linkerd/app/src/lib.rs @@ -22,7 +22,7 @@ use linkerd_app_core::{ metrics::FmtMetrics, svc::Param, telemetry, - transport::{listen::Bind, ClientAddr, Local, OrigDstAddr, Remote, ServerAddr}, + transport::{addrs::*, listen::Bind}, Error, ProxyRuntime, }; use linkerd_app_gateway as gateway; @@ -102,11 +102,17 @@ impl Config { ) -> Result where BIn: Bind + 'static, - BIn::Addrs: Param> + Param> + Param, + BIn::Addrs: Param> + + Param> + + Param + + Param, BOut: Bind + 'static, - BOut::Addrs: Param> + Param> + Param, + BOut::Addrs: Param> + + Param> + + Param + + Param, BAdmin: Bind + Clone + 'static, - BAdmin::Addrs: Param> + Param>, + BAdmin::Addrs: Param> + Param> + Param, { let Config { admin, diff --git a/linkerd/app/src/tap.rs b/linkerd/app/src/tap.rs index 81f4beeffb..880d0aebac 100644 --- a/linkerd/app/src/tap.rs +++ b/linkerd/app/src/tap.rs @@ -6,7 +6,7 @@ use linkerd_app_core::{ serve, svc::{self, ExtractParam, InsertParam, Param}, tls, - transport::{listen::Bind, ClientAddr, Local, Remote, ServerAddr}, + transport::{addrs::AddrPair, listen::Bind, ClientAddr, Local, Remote, ServerAddr}, Error, }; use std::{collections::HashSet, pin::Pin}; @@ -47,6 +47,7 @@ impl Config { where B: Bind, B::Addrs: Param>, + B::Addrs: Param, { let (registry, server) = tap::new(); match self { diff --git a/linkerd/http-metrics/Cargo.toml b/linkerd/http-metrics/Cargo.toml index 61c7eeda72..3c22bc329e 100644 --- a/linkerd/http-metrics/Cargo.toml +++ b/linkerd/http-metrics/Cargo.toml @@ -6,6 +6,9 @@ license = "Apache-2.0" edition = "2021" publish = false +[features] +test-util = [] + [dependencies] bytes = "1" futures = { version = "0.3", default-features = false } diff --git a/linkerd/http-metrics/src/lib.rs b/linkerd/http-metrics/src/lib.rs index c28376a0b8..c33922b845 100644 --- a/linkerd/http-metrics/src/lib.rs +++ b/linkerd/http-metrics/src/lib.rs @@ -25,6 +25,22 @@ where include_latencies: bool, } +#[cfg(feature = "test-util")] +impl Report> { + pub fn get_response_total( + &self, + labels: &T, + status: Option, + class: &C, + ) -> Option { + let registry = self.registry.lock(); + let requests = registry.get(labels)?.lock(); + let status = requests.by_status().get(&status)?; + let class = status.by_class().get(class)?; + Some(class.total()) + } +} + impl Clone for Report { fn clone(&self) -> Self { Self { diff --git a/linkerd/http-metrics/src/requests.rs b/linkerd/http-metrics/src/requests.rs index ca563e6cfa..7a0df94b87 100644 --- a/linkerd/http-metrics/src/requests.rs +++ b/linkerd/http-metrics/src/requests.rs @@ -28,7 +28,7 @@ where } #[derive(Debug)] -struct StatusMetrics +pub struct StatusMetrics where C: Hash + Eq, { @@ -87,6 +87,17 @@ impl Default for Metrics { } } +#[cfg(feature = "test-util")] +impl Metrics { + pub fn total(&self) -> &Counter { + &self.total + } + + pub fn by_status(&self) -> &HashMap, StatusMetrics> { + &self.by_status + } +} + impl LastUpdate for Metrics { fn last_update(&self) -> Instant { self.last_update @@ -105,6 +116,20 @@ where } } +#[cfg(feature = "test-util")] +impl StatusMetrics { + pub fn by_class(&self) -> &HashMap { + &self.by_class + } +} + +#[cfg(feature = "test-util")] +impl ClassMetrics { + pub fn total(&self) -> f64 { + self.total.value() + } +} + #[cfg(test)] mod tests { #[test] diff --git a/linkerd/http-metrics/src/requests/service.rs b/linkerd/http-metrics/src/requests/service.rs index 71a75c5860..279117f3b8 100644 --- a/linkerd/http-metrics/src/requests/service.rs +++ b/linkerd/http-metrics/src/requests/service.rs @@ -96,7 +96,7 @@ where impl Clone for HttpMetrics where S: Clone, - C: ClassifyResponse + Clone + Default + Send + Sync + 'static, + C: ClassifyResponse + Clone + Send + Sync + 'static, C::Class: Hash + Eq, { fn clone(&self) -> Self { @@ -108,6 +108,16 @@ where } } +#[inline] +fn classify_unwrap_if_debug_else_default(req: &http::Request) -> C +where + C: Clone + Default + Send + Sync + 'static, +{ + let c = req.extensions().get::().cloned(); + debug_assert!(c.is_some(), "request must have response classifier"); + c.unwrap_or_default() +} + impl Proxy, S> for HttpMetrics where P: Proxy>, S, Response = http::Response>, @@ -143,10 +153,8 @@ where http::Request::from_parts(head, body) }; - let classify = req.extensions().get::().cloned().unwrap_or_default(); - ResponseFuture { - classify: Some(classify), + classify: Some(classify_unwrap_if_debug_else_default(&req)), metrics: self.metrics.clone(), stream_open_at: Instant::now(), inner: self.inner.proxy(svc, req), @@ -167,6 +175,7 @@ where type Error = Error; type Future = ResponseFuture; + #[inline] fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.inner.poll_ready(cx).map_err(Into::into) } @@ -192,10 +201,8 @@ where http::Request::from_parts(head, body) }; - let classify = req.extensions().get::().cloned().unwrap_or_default(); - ResponseFuture { - classify: Some(classify), + classify: Some(classify_unwrap_if_debug_else_default(&req)), metrics: self.metrics.clone(), stream_open_at: Instant::now(), inner: self.inner.call(req), diff --git a/linkerd/proxy/http/src/classify/insert.rs b/linkerd/proxy/http/src/classify/insert.rs index ba67096dea..958f507863 100644 --- a/linkerd/proxy/http/src/classify/insert.rs +++ b/linkerd/proxy/http/src/classify/insert.rs @@ -1,4 +1,4 @@ -use linkerd_stack::{layer, ExtractParam, NewService, Proxy, Service}; +use linkerd_stack::{layer, CloneParam, ExtractParam, NewService, Proxy, Service}; use std::{ marker::PhantomData, task::{Context, Poll}, @@ -33,6 +33,12 @@ impl NewInsertClassifyResponse { } } +impl NewInsertClassifyResponse, N> { + pub fn layer_default() -> impl layer::Layer + Clone { + Self::layer_via(CloneParam::from(C::default())) + } +} + impl NewService for NewInsertClassifyResponse where C: super::Classify, @@ -61,8 +67,9 @@ where fn proxy(&self, svc: &mut S, mut req: http::Request) -> Self::Future { let classify_rsp = self.classify.classify(&req); - let prior = req.extensions_mut().insert(classify_rsp); - debug_assert!(prior.is_none(), "classification extension already existed"); + if req.extensions_mut().insert(classify_rsp).is_some() { + tracing::debug!("Overrode response classifier"); + } self.inner.proxy(svc, req) } } @@ -83,8 +90,9 @@ where fn call(&mut self, mut req: http::Request) -> Self::Future { let classify_rsp = self.classify.classify(&req); - let prior = req.extensions_mut().insert(classify_rsp); - debug_assert!(prior.is_none(), "classification extension already existed"); + if req.extensions_mut().insert(classify_rsp).is_some() { + tracing::debug!("Overrode response classifier"); + } self.inner.call(req) } } diff --git a/linkerd/proxy/transport/src/addrs.rs b/linkerd/proxy/transport/src/addrs.rs index 19bab4801e..c9eca2f0ba 100644 --- a/linkerd/proxy/transport/src/addrs.rs +++ b/linkerd/proxy/transport/src/addrs.rs @@ -28,6 +28,10 @@ pub struct Local(pub T); #[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)] pub struct Remote(pub T); +/// Describes a connection from a client to a server. +#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)] +pub struct AddrPair(pub ClientAddr, pub ServerAddr); + // === impl ClientAddr === impl std::ops::Deref for ClientAddr { diff --git a/linkerd/proxy/transport/src/lib.rs b/linkerd/proxy/transport/src/lib.rs index 0713942a6b..c463ad84c6 100644 --- a/linkerd/proxy/transport/src/lib.rs +++ b/linkerd/proxy/transport/src/lib.rs @@ -15,7 +15,7 @@ pub mod listen; pub mod orig_dst; pub use self::{ - addrs::{ClientAddr, ListenAddr, Local, OrigDstAddr, Remote, ServerAddr}, + addrs::{AddrPair, ClientAddr, ListenAddr, Local, OrigDstAddr, Remote, ServerAddr}, connect::ConnectTcp, listen::{Bind, BindTcp}, orig_dst::BindWithOrigDst, diff --git a/linkerd/proxy/transport/src/listen.rs b/linkerd/proxy/transport/src/listen.rs index fd70ec57cc..4addd907d6 100644 --- a/linkerd/proxy/transport/src/listen.rs +++ b/linkerd/proxy/transport/src/listen.rs @@ -104,3 +104,12 @@ impl Param> for Addrs { self.server } } + +impl Param for Addrs { + #[inline] + fn param(&self) -> AddrPair { + let Remote(client) = self.client; + let Local(server) = self.server; + AddrPair(client, server) + } +} diff --git a/linkerd/proxy/transport/src/orig_dst.rs b/linkerd/proxy/transport/src/orig_dst.rs index bc2232c56f..6b1a4c480f 100644 --- a/linkerd/proxy/transport/src/orig_dst.rs +++ b/linkerd/proxy/transport/src/orig_dst.rs @@ -23,6 +23,7 @@ pub struct Addrs { // === impl Addrs === impl Param for Addrs { + #[inline] fn param(&self) -> OrigDstAddr { self.orig_dst } @@ -32,11 +33,23 @@ impl Param> for Addrs where A: Param>, { + #[inline] fn param(&self) -> Remote { self.inner.param() } } +impl Param for Addrs +where + A: Param>, +{ + #[inline] + fn param(&self) -> AddrPair { + let Remote(client) = self.inner.param(); + AddrPair(client, ServerAddr(self.orig_dst.into())) + } +} + impl Param> for Addrs where A: Param>,