diff --git a/linkerd/app/core/src/config.rs b/linkerd/app/core/src/config.rs index 2ea8426f85..fbe7f7993a 100644 --- a/linkerd/app/core/src/config.rs +++ b/linkerd/app/core/src/config.rs @@ -13,6 +13,17 @@ pub struct ServerConfig { pub h2_settings: h2::Settings, } +#[derive(Clone, Debug)] +pub struct BufferConfig { + /// The number of requests (or connections, depending on the context) that + /// may be buffered. + pub capacity: usize, + + /// The maximum amount of time a request may be buffered before failfast + /// errors are emitted. + pub failfast_timeout: Duration, +} + #[derive(Clone, Debug)] pub struct ConnectConfig { pub backoff: ExponentialBackoff, @@ -26,9 +37,6 @@ pub struct ProxyConfig { pub server: ServerConfig, pub connect: ConnectConfig, - pub buffer_capacity: usize, - pub cache_max_idle_age: Duration, - pub dispatch_timeout: Duration, pub max_in_flight_requests: usize, pub detect_protocol_timeout: Duration, } diff --git a/linkerd/app/core/src/control.rs b/linkerd/app/core/src/control.rs index 3a1b3269dc..138935df90 100644 --- a/linkerd/app/core/src/control.rs +++ b/linkerd/app/core/src/control.rs @@ -12,7 +12,7 @@ use tracing::warn; pub struct Config { pub addr: ControlAddr, pub connect: config::ConnectConfig, - pub buffer_capacity: usize, + pub buffer: config::BufferConfig, } #[derive(Clone, Debug)] @@ -95,7 +95,7 @@ impl Config { .into_new_service() .push(metrics.to_layer::<classify::Response, _, _>()) .push(self::add_origin::layer()) - .push_on_service(svc::layers().push_spawn_buffer(self.buffer_capacity)) + .push_buffer_on_service("Controller client", &self.buffer) .instrument(|c: &ControlAddr| tracing::info_span!("controller", addr = %c.addr)) .push_map_target(move |()| addr.clone()) .push(svc::ArcNewService::layer()) diff --git a/linkerd/app/core/src/svc.rs b/linkerd/app/core/src/svc.rs index 8a86c3e3f5..7d27b78511 100644 --- a/linkerd/app/core/src/svc.rs +++ b/linkerd/app/core/src/svc.rs @@ -1,7 +1,7 @@ // Possibly unused, but useful during development. pub use crate::proxy::http; -use crate::{cache, Error}; +use crate::{cache, config::BufferConfig, Error}; use linkerd_error::Recover; use linkerd_exp_backoff::{ExponentialBackoff, ExponentialBackoffStream}; pub use linkerd_reconnect::NewReconnect; @@ -11,12 +11,14 @@ pub use linkerd_stack::{ NewRouter, NewService, Param, Predicate, UnwrapOr, }; pub use linkerd_stack_tracing::{GetSpan, NewInstrument, NewInstrumentLayer}; +use stack::OnService; use std::{ + marker::PhantomData, task::{Context, Poll}, time::Duration, }; use tower::{ - buffer::{Buffer as TowerBuffer, BufferLayer}, + buffer::Buffer as TowerBuffer, layer::util::{Identity, Stack as Pair}, make::MakeService, }; @@ -30,6 +32,13 @@ pub struct AlwaysReconnect(ExponentialBackoff); pub type Buffer<Req, Rsp> = TowerBuffer<BoxService<Req, Rsp, Error>, Req>; +pub struct BufferLayer<Req> { + name: &'static str, + capacity: usize, + failfast_timeout: Duration, + _marker: PhantomData<fn(Req)>, +} + pub type BoxHttp<B = http::BoxBody> = BoxService<http::Request<B>, http::Response<http::BoxBody>, Error>; @@ -80,15 +89,15 @@ impl<L> Layers<L> { } /// Buffers requests in an mpsc, spawning the inner service onto a dedicated task. 
- pub fn push_spawn_buffer( + pub fn push_buffer( self, - capacity: usize, - ) -> Layers>, BufferLayer>> + name: &'static str, + config: &BufferConfig, + ) -> Layers>> where Req: Send + 'static, { - self.push(BoxServiceLayer::new()) - .push(BufferLayer::new(capacity)) + self.push(buffer(name, config)) } pub fn push_on_service(self, layer: U) -> Layers>> { @@ -152,22 +161,6 @@ impl Stack { self.push(NewReconnect::layer(AlwaysReconnect(backoff))) } - /// Buffer requests when when the next layer is out of capacity. - pub fn spawn_buffer( - self, - capacity: usize, - ) -> Stack> - where - Req: Send + 'static, - S: Service + Send + 'static, - S::Response: Send + 'static, - S::Error: Into + Send + Sync + 'static, - S::Future: Send, - { - self.push(BoxServiceLayer::new()) - .push(BufferLayer::new(capacity)) - } - /// Assuming `S` implements `NewService` or `MakeService`, applies the given /// `L`-typed layer on each service produced by `S`. pub fn push_on_service(self, layer: L) -> Stack> { @@ -202,6 +195,18 @@ impl Stack { self.push(http::insert::NewResponseInsert::layer()) } + /// Buffers requests in an mpsc, spawning the inner service onto a dedicated task. + pub fn push_buffer_on_service( + self, + name: &'static str, + config: &BufferConfig, + ) -> Stack, S>> + where + Req: Send + 'static, + { + self.push_on_service(buffer(name, config)) + } + pub fn push_cache(self, idle: Duration) -> Stack> where T: Clone + Eq + std::fmt::Debug + std::hash::Hash + Send + Sync + 'static, @@ -381,3 +386,40 @@ impl> Recover for AlwaysReconnect { Ok(self.0.stream()) } } + +// === impl BufferLayer === + +fn buffer(name: &'static str, config: &BufferConfig) -> BufferLayer { + BufferLayer { + name, + capacity: config.capacity, + failfast_timeout: config.failfast_timeout, + _marker: PhantomData, + } +} + +impl Layer for BufferLayer +where + Req: Send + 'static, + S: Service + Send + 'static, + S::Future: Send, +{ + type Service = Buffer; + + fn layer(&self, inner: S) -> Self::Service { + let spawn_ready = SpawnReady::new(inner); + let fail_fast = FailFast::layer(self.name, self.failfast_timeout).layer(spawn_ready); + Buffer::new(BoxService::new(fail_fast), self.capacity) + } +} + +impl Clone for BufferLayer { + fn clone(&self) -> Self { + Self { + capacity: self.capacity, + name: self.name, + failfast_timeout: self.failfast_timeout, + _marker: self._marker, + } + } +} diff --git a/linkerd/app/gateway/src/lib.rs b/linkerd/app/gateway/src/lib.rs index ceabd09692..ac44d27dd3 100644 --- a/linkerd/app/gateway/src/lib.rs +++ b/linkerd/app/gateway/src/lib.rs @@ -7,7 +7,6 @@ mod tests; use self::gateway::NewGateway; use linkerd_app_core::{ - config::ProxyConfig, identity, io, metrics, profiles::{self, DiscoveryRejected}, proxy::{ @@ -81,12 +80,7 @@ where R::Resolution: Send, R::Future: Send + Unpin, { - let ProxyConfig { - buffer_capacity, - cache_max_idle_age, - dispatch_timeout, - .. 
- } = inbound.config().proxy.clone(); + let inbound_config = inbound.config().clone(); let local_id = identity::LocalId(inbound.identity().name().clone()); // For each gatewayed connection that is *not* HTTP, use the target from the @@ -158,11 +152,9 @@ where .stack .layer(metrics::StackLabels::inbound("tcp", "gateway")), ) - .push(svc::layer::mk(svc::SpawnReady::new)) - .push(svc::FailFast::layer("TCP Gateway", dispatch_timeout)) - .push_spawn_buffer(buffer_capacity), + .push_buffer("TCP Gateway", &outbound.config().tcp_connection_buffer), ) - .push_cache(cache_max_idle_age) + .push_cache(outbound.config().discovery_idle_timeout) .check_new_service::(); // Cache an HTTP gateway service for each destination and HTTP version. @@ -194,11 +186,9 @@ where .stack .layer(metrics::StackLabels::inbound("http", "gateway")), ) - .push(svc::layer::mk(svc::SpawnReady::new)) - .push(svc::FailFast::layer("Gateway", dispatch_timeout)) - .push_spawn_buffer(buffer_capacity), + .push_buffer("Gateway", &inbound_config.http_request_buffer), ) - .push_cache(cache_max_idle_age) + .push_cache(inbound_config.discovery_idle_timeout) .push_on_service( svc::layers() .push(http::Retain::layer()) diff --git a/linkerd/app/inbound/src/http/router.rs b/linkerd/app/inbound/src/http/router.rs index befe204820..f331e42431 100644 --- a/linkerd/app/inbound/src/http/router.rs +++ b/linkerd/app/inbound/src/http/router.rs @@ -200,23 +200,14 @@ impl Inbound { Ok(profiles::LookupAddr(addr.into())) })) .instrument(|_: &Logical| debug_span!("profile")) - .push_on_service(svc::layer::mk(svc::SpawnReady::new)) // Skip the profile stack if it takes too long to become ready. - .push_when_unready( - config.profile_idle_timeout, - http.push_on_service(svc::layer::mk(svc::SpawnReady::new)) - .into_inner(), - ) + .push_when_unready(config.profile_skip_timeout, http.into_inner()) .push_on_service( svc::layers() .push(rt.metrics.proxy.stack.layer(stack_labels("http", "logical"))) - .push(svc::FailFast::layer( - "HTTP Logical", - config.proxy.dispatch_timeout, - )) - .push_spawn_buffer(config.proxy.buffer_capacity), + .push_buffer("HTTP Logical", &config.http_request_buffer), ) - .push_cache(config.proxy.cache_max_idle_age) + .push_cache(config.discovery_idle_timeout) .push_on_service( svc::layers() .push(http::Retain::layer()) diff --git a/linkerd/app/inbound/src/http/server.rs b/linkerd/app/inbound/src/http/server.rs index e3105e7437..f6e113aef6 100644 --- a/linkerd/app/inbound/src/http/server.rs +++ b/linkerd/app/inbound/src/http/server.rs @@ -21,6 +21,7 @@ use tracing::debug_span; struct ServerRescue; impl Inbound { + /// Fails requests when the `HSvc`-typed inner service is not ready. pub fn push_http_server(self) -> Inbound> where T: Param @@ -43,7 +44,6 @@ impl Inbound { self.map_stack(|config, rt, http| { let ProxyConfig { server: ServerConfig { h2_settings, .. }, - dispatch_timeout, max_in_flight_requests, .. } = config.proxy; @@ -59,14 +59,18 @@ impl Inbound { .push(http::BoxRequest::layer()) // Downgrades the protocol if upgraded by an outbound proxy. .push(http::orig_proto::Downgrade::layer()) - // Limit the number of in-flight requests. When the proxy is - // at capacity, go into failfast after a dispatch timeout. - // Note that the inner service _always_ returns ready (due - // to `NewRouter`) and the concurrency limit need not be - // driven outside of the request path, so there's no need - // for SpawnReady + // Limit the number of in-flight inbound requests. 
+ // + // TODO(ver) This concurrency limit applies only to + // requests that do not yet have responses, but ignores + // streaming bodies. We should change this to an + // HTTP-specific implementation that tracks request and + // response bodies. .push(svc::ConcurrencyLimitLayer::new(max_in_flight_requests)) - .push(svc::FailFast::layer("HTTP Server", dispatch_timeout)), + // Shed load by failing requests when the concurrency + // limit is reached. No delay is used before failfast + // goes into effect, so it is expected that the inner + // service is always ready. + .push(svc::FailFast::layer("HTTP Server", Default::default())), ) .push(rt.metrics.http_errors.to_layer()) .push(ServerRescue::layer()) diff --git a/linkerd/app/inbound/src/lib.rs b/linkerd/app/inbound/src/lib.rs index 2dffc8846f..d7180c9672 100644 --- a/linkerd/app/inbound/src/lib.rs +++ b/linkerd/app/inbound/src/lib.rs @@ -18,7 +18,7 @@ pub(crate) mod test_util; pub use self::{metrics::Metrics, policy::DefaultPolicy}; use linkerd_app_core::{ - config::{ConnectConfig, ProxyConfig}, + config::{BufferConfig, ConnectConfig, ProxyConfig}, drain, http_tracing::OpenCensusSink, identity, io, @@ -39,8 +39,18 @@ pub struct Config { pub allow_discovery: NameMatch, pub proxy: ProxyConfig, pub policy: policy::Config, - pub profile_idle_timeout: Duration, pub allowed_ips: transport::AllowIps, + + /// Configures the timeout after which the proxy will revert to skipping + /// service profile routing instrumentation. + pub profile_skip_timeout: Duration, + + /// Configures how long the proxy will retain policy & profile resolutions + /// for idle/unused ports and services. + pub discovery_idle_timeout: Duration, + + /// Configures how HTTP requests are buffered *for each inbound port*. + pub http_request_buffer: BufferConfig, } #[derive(Clone)] diff --git a/linkerd/app/inbound/src/test_util.rs b/linkerd/app/inbound/src/test_util.rs index dbc6eff04d..583356ed29 100644 --- a/linkerd/app/inbound/src/test_util.rs +++ b/linkerd/app/inbound/src/test_util.rs @@ -50,6 +50,7 @@ pub fn default_config() -> Config { }; Config { + policy, allow_discovery: Some(cluster_local).into_iter().collect(), proxy: config::ProxyConfig { server: config::ServerConfig { @@ -72,15 +73,16 @@ pub fn default_config() -> Config { }, h2_settings: h2::Settings::default(), }, - buffer_capacity: 10_000, - cache_max_idle_age: Duration::from_secs(20), - dispatch_timeout: Duration::from_secs(1), max_in_flight_requests: 10_000, detect_protocol_timeout: Duration::from_secs(10), }, - policy, - profile_idle_timeout: Duration::from_millis(500), allowed_ips: Default::default(), + http_request_buffer: config::BufferConfig { + capacity: 10_000, + failfast_timeout: Duration::from_secs(1), + }, + discovery_idle_timeout: Duration::from_secs(20), + profile_skip_timeout: Duration::from_secs(1), } } diff --git a/linkerd/app/outbound/src/discover.rs b/linkerd/app/outbound/src/discover.rs index ee9937eecc..256fe5d45e 100644 --- a/linkerd/app/outbound/src/discover.rs +++ b/linkerd/app/outbound/src/discover.rs @@ -52,24 +52,15 @@ impl Outbound { })) .push_on_service( svc::layers() - // If the traffic split is empty/unavailable, eagerly fail - // requests. When the split is in failfast, spawn the - // service in a background task so it becomes ready without - // new requests. 
- .push(svc::layer::mk(svc::SpawnReady::new)) .push( rt.metrics .proxy .stack .layer(crate::stack_labels("tcp", "server")), ) - .push(svc::FailFast::layer( - "TCP Server", - config.proxy.dispatch_timeout, - )) - .push_spawn_buffer(config.proxy.buffer_capacity), + .push_buffer("TCP Server", &config.tcp_connection_buffer), ) - .push_cache(config.proxy.cache_max_idle_age) + .push_cache(config.discovery_idle_timeout) .push(svc::ArcNewService::layer()) .check_new_service::() }) @@ -189,7 +180,7 @@ mod tests { // service after `idle_timeout`. let cfg = { let mut cfg = default_config(); - cfg.proxy.cache_max_idle_age = idle_timeout; + cfg.discovery_idle_timeout = idle_timeout; cfg }; let (rt, _shutdown) = runtime(); diff --git a/linkerd/app/outbound/src/endpoint.rs b/linkerd/app/outbound/src/endpoint.rs index da7a699dcb..6f8660e9c4 100644 --- a/linkerd/app/outbound/src/endpoint.rs +++ b/linkerd/app/outbound/src/endpoint.rs @@ -215,6 +215,9 @@ impl Outbound { .clone() .push_tcp_endpoint::() .push_http_endpoint() + .map_stack(|config, _, stk| { + stk.push_buffer_on_service("HTTP Server", &config.http_request_buffer) + }) .push_http_server() .into_inner(); diff --git a/linkerd/app/outbound/src/http.rs b/linkerd/app/outbound/src/http.rs index bbba1dcea9..2bc91f1723 100644 --- a/linkerd/app/outbound/src/http.rs +++ b/linkerd/app/outbound/src/http.rs @@ -16,7 +16,6 @@ pub(crate) use self::{require_id_header::IdentityRequired, server::ServerRescue} use crate::tcp; pub use linkerd_app_core::proxy::http::*; use linkerd_app_core::{ - classify, metrics, profiles::{self, LogicalAddr}, proxy::{api_resolve::ProtocolHint, tap}, svc::Param, @@ -31,12 +30,6 @@ pub type Endpoint = crate::endpoint::Endpoint; pub type Connect = self::endpoint::Connect; -#[derive(Clone, Debug, PartialEq, Eq, Hash)] -struct ProfileRoute { - logical: Logical, - route: profiles::http::Route, -} - #[derive(Clone, Debug)] pub struct CanonicalDstHeader(pub Addr); @@ -182,31 +175,3 @@ impl tap::Inspect for Endpoint { true } } - -// === impl ProfileRoute === - -impl Param for ProfileRoute { - fn param(&self) -> profiles::http::Route { - self.route.clone() - } -} - -impl Param for ProfileRoute { - fn param(&self) -> metrics::ProfileRouteLabels { - metrics::ProfileRouteLabels::outbound(self.logical.logical_addr.clone(), &self.route) - } -} - -impl Param for ProfileRoute { - fn param(&self) -> ResponseTimeout { - ResponseTimeout(self.route.timeout()) - } -} - -impl classify::CanClassify for ProfileRoute { - type Classify = classify::Request; - - fn classify(&self) -> classify::Request { - self.route.response_classes().clone().into() - } -} diff --git a/linkerd/app/outbound/src/http/concrete.rs b/linkerd/app/outbound/src/http/concrete.rs index 23fd35318d..a7dbc6c5cc 100644 --- a/linkerd/app/outbound/src/http/concrete.rs +++ b/linkerd/app/outbound/src/http/concrete.rs @@ -1,7 +1,6 @@ use super::{Concrete, Endpoint}; use crate::{endpoint, resolve, stack_labels, Outbound}; use linkerd_app_core::{ - config::ProxyConfig, proxy::{ api_resolve::{ConcreteAddr, Metadata}, core::Resolve, @@ -10,7 +9,6 @@ use linkerd_app_core::{ }, svc, Error, Infallible, }; -use tracing::debug_span; impl Outbound { pub fn push_http_concrete( @@ -40,12 +38,11 @@ impl Outbound { R::Future: Send + Unpin, { self.map_stack(|config, rt, endpoint| { - let ProxyConfig { - cache_max_idle_age, - dispatch_timeout, + let crate::Config { + discovery_idle_timeout, .. 
- } = config.proxy; - let watchdog = cache_max_idle_age * 2; + } = config; + let watchdog = *discovery_idle_timeout * 2; let resolve = svc::stack(resolve.into_service()) .check_service::() @@ -62,7 +59,6 @@ impl Outbound { .into_inner(); endpoint - .instrument(|e: &Endpoint| debug_span!("endpoint", server.addr = %e.addr)) .check_new_service::>() .push_on_service( svc::layers().push(http::BoxRequest::layer()).push( @@ -72,6 +68,7 @@ impl Outbound { .layer(stack_labels("http", "balance.endpoint")), ), ) + .instrument(|t: &Endpoint| tracing::debug_span!("endpoint", addr = %t.addr)) .check_new_service::>() // Resolve the service to its endpoints and balance requests over them. // @@ -97,8 +94,6 @@ impl Outbound { .stack .layer(stack_labels("http", "balancer")), ) - .push(svc::layer::mk(svc::SpawnReady::new)) - .push(svc::FailFast::layer("HTTP Balancer", dispatch_timeout)) .push(http::BoxResponse::layer()), ) .check_make_service::>() @@ -108,7 +103,7 @@ impl Outbound { // The concrete address is only set when the profile could be // resolved. Endpoint resolution is skipped when there is no // concrete address. - .instrument(|c: &Concrete| debug_span!("concrete", addr = %c.resolve)) + .instrument(|c: &Concrete| tracing::debug_span!("concrete", service = %c.resolve)) .push(svc::ArcNewService::layer()) }) } diff --git a/linkerd/app/outbound/src/http/detect.rs b/linkerd/app/outbound/src/http/detect.rs index 9790a61c12..d3ad3e902a 100644 --- a/linkerd/app/outbound/src/http/detect.rs +++ b/linkerd/app/outbound/src/http/detect.rs @@ -31,11 +31,6 @@ impl Outbound { self.map_stack(|config, rt, tcp| { let ServerConfig { h2_settings, .. } = config.proxy.server; - let skipped = tcp - .clone() - .push_on_service(svc::MapTargetLayer::new(io::EitherIo::Left)) - .into_inner(); - svc::stack(http) .push_on_service( svc::layers() @@ -47,7 +42,8 @@ impl Outbound { .push_map_target(U::from) .instrument(|(v, _): &(http::Version, _)| debug_span!("http", %v)) .push(svc::UnwrapOr::layer( - tcp.push_on_service(svc::MapTargetLayer::new(io::EitherIo::Right)) + tcp.clone() + .push_on_service(svc::MapTargetLayer::new(io::EitherIo::Right)) .into_inner(), )) .push_on_service(svc::BoxService::layer()) @@ -66,7 +62,8 @@ impl Outbound { tracing::debug!("Attempting HTTP protocol detection"); Ok(svc::Either::A(target)) }, - skipped, + tcp.push_on_service(svc::MapTargetLayer::new(io::EitherIo::Left)) + .into_inner(), ) .check_new_service::() .push_on_service(svc::BoxService::layer()) diff --git a/linkerd/app/outbound/src/http/logical.rs b/linkerd/app/outbound/src/http/logical.rs index 7508a67450..5a42c12fbe 100644 --- a/linkerd/app/outbound/src/http/logical.rs +++ b/linkerd/app/outbound/src/http/logical.rs @@ -1,12 +1,18 @@ -use super::{retry, CanonicalDstHeader, Concrete, Logical, ProfileRoute}; +use super::{retry, CanonicalDstHeader, Concrete, Logical}; use crate::{stack_labels, Outbound}; use linkerd_app_core::{ - classify, config, profiles, + classify, metrics, profiles, proxy::{api_resolve::ConcreteAddr, http}, svc, Error, Infallible, }; use tracing::debug_span; +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +struct ProfileRoute { + logical: Logical, + route: profiles::http::Route, +} + impl Outbound { pub fn push_http_logical(self) -> Outbound> where @@ -18,82 +24,89 @@ impl Outbound { NSvc::Future: Send, { self.map_stack(|config, rt, concrete| { - let config::ProxyConfig { - buffer_capacity, - cache_max_idle_age, - dispatch_timeout, + let crate::Config { + discovery_idle_timeout, + http_request_buffer, .. 
- } = config.proxy; + } = config; - // Distribute requests over a distribution of balancers via a - // traffic split. - // - // If the traffic split is empty/unavailable, eagerly fail requests. - // When the split is in failfast, spawn the service in a background - // task so it becomes ready without new requests. - let logical = concrete + let split = concrete .push_map_target(Concrete::from) + // This failfast is needed to ensure that a single unavailable + // balancer doesn't block the entire split. + // + // TODO(ver) remove this when we replace `split` with + // `distribute`. + .push_on_service( + svc::layers() + .push(svc::layer::mk(svc::SpawnReady::new)) + .push(svc::FailFast::layer( + "HTTP Balancer", + http_request_buffer.failfast_timeout, + )) + ) .check_new_service::<(ConcreteAddr, Logical), _>() + // Distribute requests over a distribution of balancers via a + // traffic split. .push(profiles::split::layer()) .push_on_service( svc::layers() - .push(svc::layer::mk(svc::SpawnReady::new)) .push( rt.metrics .proxy .stack .layer(stack_labels("http", "logical")), ) - .push(svc::FailFast::layer("HTTP Logical", dispatch_timeout)) - .push_spawn_buffer(buffer_capacity), + .push_buffer("HTTP Logical", http_request_buffer), ) - .push_cache(cache_max_idle_age); + // TODO(ver) this should not be a generalized time-based evicting cache. + .push_cache(*discovery_idle_timeout); // If there's no route, use the logical service directly; otherwise // use the per-route stack. - logical - .clone() + let route = split.clone() + .check_new_service::>() + .push_map_target(|r: ProfileRoute| r.logical) + .push_on_service(http::BoxRequest::layer()) + .push( + rt.metrics + .proxy + .http_profile_route_actual + .to_layer::(), + ) + .check_new_service::>() + // Depending on whether or not the request can be + // retried, it may have one of two `Body` types. This + // layer unifies any `Body` type into `BoxBody`. + .push_on_service(http::BoxRequest::erased()) + .push_http_insert_target::() + // Sets an optional retry policy. + .push(retry::layer(rt.metrics.proxy.http_profile_route_retry.clone())) + .check_new_service::>() + // Sets an optional request timeout. + .push(http::NewTimeout::layer()) + // Records per-route metrics. + .push( + rt.metrics + .proxy + .http_profile_route + .to_layer::(), + ) + // Sets the per-route response classifier as a request + // extension. + .push(classify::NewClassify::layer()) + .push_on_service(http::BoxResponse::layer()) + .check_new_service::>(); + + split .push_switch( |(route, logical): (Option, Logical)| -> Result<_, Infallible> { - match route { - None => Ok(svc::Either::A(logical)), - Some(route) => Ok(svc::Either::B(ProfileRoute { route, logical })), - } + Ok(match route { + None => svc::Either::A(logical), + Some(route) => svc::Either::B(ProfileRoute { route, logical }), + }) }, - logical - .push_map_target(|r: ProfileRoute| r.logical) - .push_on_service(http::BoxRequest::layer()) - .push( - rt.metrics - .proxy - .http_profile_route_actual - .to_layer::(), - ) - // Depending on whether or not the request can be - // retried, it may have one of two `Body` types. This - // layer unifies any `Body` type into `BoxBody`. - .push_on_service(http::BoxRequest::erased()) - .push_http_insert_target::() - // Sets an optional retry policy. - .push(retry::layer(rt.metrics.proxy.http_profile_route_retry.clone())) - // Sets an optional request timeout. - .push(http::NewTimeout::layer()) - // Records per-route metrics. 
- .push( - rt.metrics - .proxy - .http_profile_route - .to_layer::(), - ) - // Sets the per-route response classifier as a request - // extension. - .push(classify::NewClassify::layer()) - .push_on_service( - svc::layers() - .push(http::BoxResponse::layer()) - .push(svc::BoxCloneService::layer()) - ) - .into_inner(), + route.into_inner(), ) .push(profiles::http::NewServiceRouter::layer()) // Strips headers that may be set by this proxy and add an @@ -101,9 +114,37 @@ impl Outbound { // unify the profile stack's response type with that of to // endpoint stack. .push(http::NewHeaderFromTarget::::layer()) - .instrument(|l: &Logical| debug_span!("logical", dst = %l.logical_addr)) + .instrument(|l: &Logical| debug_span!("logical", service = %l.logical_addr)) .push_on_service(svc::BoxService::layer()) .push(svc::ArcNewService::layer()) }) } } + +// === impl ProfileRoute === + +impl svc::Param for ProfileRoute { + fn param(&self) -> profiles::http::Route { + self.route.clone() + } +} + +impl svc::Param for ProfileRoute { + fn param(&self) -> metrics::ProfileRouteLabels { + metrics::ProfileRouteLabels::outbound(self.logical.logical_addr.clone(), &self.route) + } +} + +impl svc::Param for ProfileRoute { + fn param(&self) -> http::ResponseTimeout { + http::ResponseTimeout(self.route.timeout()) + } +} + +impl classify::CanClassify for ProfileRoute { + type Classify = classify::Request; + + fn classify(&self) -> classify::Request { + self.route.response_classes().clone().into() + } +} diff --git a/linkerd/app/outbound/src/http/retry.rs b/linkerd/app/outbound/src/http/retry.rs index f23aeafcb6..b61ddf3722 100644 --- a/linkerd/app/outbound/src/http/retry.rs +++ b/linkerd/app/outbound/src/http/retry.rs @@ -1,9 +1,9 @@ -use super::ProfileRoute; use futures::{future, FutureExt}; use linkerd_app_core::{ classify, http_metrics::retries::Handle, - metrics, profiles, + metrics::{self, ProfileRouteLabels}, + profiles::{self, http::Route}, proxy::http::{ClientHandle, EraseResponse, HttpBody}, svc::{layer, Either, Param}, Error, @@ -49,17 +49,19 @@ impl NewRetryPolicy { } } -impl retry::NewPolicy for NewRetryPolicy { +impl retry::NewPolicy for NewRetryPolicy +where + T: Param + Param, +{ type Policy = RetryPolicy; - fn new_policy(&self, route: &ProfileRoute) -> Option { - let retries = route.route.retries().cloned()?; - - let metrics = self.metrics.get_handle(route.param()); + fn new_policy(&self, target: &T) -> Option { + let route: Route = target.param(); + let labels: ProfileRouteLabels = target.param(); Some(RetryPolicy { - metrics, - budget: retries.budget().clone(), - response_classes: route.route.response_classes().clone(), + metrics: self.metrics.get_handle(labels), + budget: route.retries()?.budget().clone(), + response_classes: route.response_classes().clone(), }) } } diff --git a/linkerd/app/outbound/src/http/server.rs b/linkerd/app/outbound/src/http/server.rs index 685c379087..9173f0b40d 100644 --- a/linkerd/app/outbound/src/http/server.rs +++ b/linkerd/app/outbound/src/http/server.rs @@ -1,7 +1,7 @@ use super::{IdentityRequired, ProxyConnectionClose}; use crate::{http, trace_labels, Outbound}; use linkerd_app_core::{ - config, errors, http_tracing, + errors, http_tracing, svc::{self, ExtractParam}, Error, Result, }; @@ -28,32 +28,36 @@ impl Outbound { where T: svc::Param, N: svc::NewService + Clone + Send + Sync + 'static, - NSvc: svc::Service, Response = http::Response>, - NSvc: Send + 'static, + NSvc: svc::Service< + http::Request, + Response = http::Response, + Error = Error, + > + Clone + + Send 
+ + 'static, NSvc::Error: Into<Error>, NSvc::Future: Send, { self.map_stack(|config, rt, http| { - let config::ProxyConfig { - dispatch_timeout, - max_in_flight_requests, - buffer_capacity, - .. - } = config.proxy; - http.check_new_service::() .push_on_service( svc::layers() .push(http::BoxRequest::layer()) - // Limit the number of in-flight requests. When the proxy is - // at capacity, go into failfast after a dispatch timeout. If - // the router is unavailable, then spawn the service on a - // background task to ensure it becomes ready without new - // requests being processed. - .push(svc::layer::mk(svc::SpawnReady::new)) - .push(svc::ConcurrencyLimitLayer::new(max_in_flight_requests)) - .push(svc::FailFast::layer("HTTP Server", dispatch_timeout)) - .push_spawn_buffer(buffer_capacity) + // Limit the number of in-flight outbound requests + // (across all targets). + // + // TODO(ver) This concurrency limit applies only to + // requests that do not yet have responses, but ignores + // streaming bodies. We should change this to an + // HTTP-specific implementation that tracks request and + // response bodies. + .push(svc::ConcurrencyLimitLayer::new( + config.proxy.max_in_flight_requests, + )) + // Shed load by failing requests when the concurrency + // limit is reached. No delay is used before failfast + // goes into effect, so it is expected that the inner + // service is always ready. + .push(svc::FailFast::layer("HTTP Server", Default::default())) .push(rt.metrics.http_errors.to_layer()) // Tear down server connections when a peer proxy generates an error. .push(ProxyConnectionClose::layer()), diff --git a/linkerd/app/outbound/src/ingress.rs b/linkerd/app/outbound/src/ingress.rs index 2c5fba4be6..2b24760b63 100644 --- a/linkerd/app/outbound/src/ingress.rs +++ b/linkerd/app/outbound/src/ingress.rs @@ -83,13 +83,12 @@ impl Outbound> { let detect_http = config.proxy.detect_http(); let Config { allow_discovery, + http_request_buffer, + discovery_idle_timeout, proxy: ProxyConfig { server: ServerConfig { h2_settings, .. }, - dispatch_timeout, max_in_flight_requests, .. }, .. @@ -148,13 +147,11 @@ impl Outbound> { .stack .layer(stack_labels("http", "logical")), ) - .push(svc::layer::mk(svc::SpawnReady::new)) - .push(svc::FailFast::layer("HTTP Logical", *dispatch_timeout)) - .push_spawn_buffer(*buffer_capacity), + .push_buffer("HTTP Logical", http_request_buffer), ) // Caches the profile-based stack so that it can be reused across // multiple requests to the same canonical destination. - .push_cache(*cache_max_idle_age) + .push_cache(*discovery_idle_timeout) .push_on_service( svc::layers() .push(http::strip_header::request::layer(DST_OVERRIDE_HEADER)) @@ -188,14 +185,6 @@ impl Outbound> { } }, http_endpoint - .push_on_service( - svc::layers() - .push(svc::layer::mk(svc::SpawnReady::new)) - .push(svc::FailFast::layer( - "Ingress server", - *dispatch_timeout, - )), - ) .instrument(|_: &_| info_span!("forward")) .into_inner(), ) @@ -235,7 +224,6 @@ // background task (i.e., by SpawnReady). Otherwise, the // inner service is always ready (because it's a router). 
.push(svc::ConcurrencyLimitLayer::new(*max_in_flight_requests)) - .push(svc::FailFast::layer("Ingress server", *dispatch_timeout)) .push(rt.metrics.http_errors.to_layer()), ) .push(http::ServerRescue::layer(config.emit_headers)) diff --git a/linkerd/app/outbound/src/lib.rs b/linkerd/app/outbound/src/lib.rs index d44dc82742..bfb3234a03 100644 --- a/linkerd/app/outbound/src/lib.rs +++ b/linkerd/app/outbound/src/lib.rs @@ -20,7 +20,7 @@ pub(crate) mod test_util; pub use self::metrics::Metrics; use futures::Stream; use linkerd_app_core::{ - config::ProxyConfig, + config::{BufferConfig, ProxyConfig}, drain, http_tracing::OpenCensusSink, identity, io, profiles, @@ -49,9 +49,27 @@ const EWMA_DECAY: Duration = Duration::from_secs(10); #[derive(Clone, Debug)] pub struct Config { - pub proxy: ProxyConfig, pub allow_discovery: AddrMatch, + pub proxy: ProxyConfig, + + /// Configures the duration the proxy will retain idle stacks (with no + /// active connections) for an outbound address. When an idle stack is + /// dropped, all cached service discovery information is dropped. + pub discovery_idle_timeout: Duration, + + /// Configures how connections are buffered *for each outbound address*. + /// + /// A buffer capacity of 100 means that 100 connections may be buffered for + /// each IP:port to which an application attempts to connect. + pub tcp_connection_buffer: BufferConfig, + + /// Configures how HTTP requests are buffered *for each outbound address*. + /// + /// A buffer capacity of 100 means that 100 requests may be buffered for + /// each IP:port to which an application has opened an outbound TCP connection. + pub http_request_buffer: BufferConfig, + // In "ingress mode", we assume we are always routing HTTP requests and do // not perform per-target-address discovery. Non-HTTP connections are // forwarded without discovery/routing/mTLS. diff --git a/linkerd/app/outbound/src/logical.rs b/linkerd/app/outbound/src/logical.rs index 216089c077..495eda5f5b 100644 --- a/linkerd/app/outbound/src/logical.rs +++ b/linkerd/app/outbound/src/logical.rs @@ -136,6 +136,9 @@ impl Outbound { .push_http_endpoint() .push_http_concrete(resolve.clone()) .push_http_logical() + .map_stack(|config, _, stk| { + stk.push_buffer_on_service("HTTP Server", &config.http_request_buffer) + }) .push_http_server() .into_inner(); diff --git a/linkerd/app/outbound/src/tcp/logical.rs b/linkerd/app/outbound/src/tcp/logical.rs index 49abf946ad..76f8222080 100644 --- a/linkerd/app/outbound/src/tcp/logical.rs +++ b/linkerd/app/outbound/src/tcp/logical.rs @@ -1,7 +1,7 @@ use super::{Concrete, Endpoint, Logical}; use crate::{endpoint, resolve, Outbound}; use linkerd_app_core::{ - config, drain, io, profiles, + drain, io, profiles, proxy::{ api_resolve::{ConcreteAddr, Metadata}, core::Resolve, @@ -39,12 +39,11 @@ impl Outbound { R::Future: Send + Unpin, { self.map_stack(|config, rt, connect| { - let config::ProxyConfig { - buffer_capacity, - cache_max_idle_age, - dispatch_timeout, + let crate::Config { + discovery_idle_timeout, + tcp_connection_buffer, .. 
- } = config.proxy; + } = config; let resolve = svc::stack(resolve.into_service()) .check_service::() @@ -60,17 +59,11 @@ impl Outbound { .check_service::() .into_inner(); - connect + let concrete = connect .push(svc::stack::WithoutConnectionMetadata::layer()) .push_make_thunk() - .instrument(|t: &Endpoint| { - debug_span!( - "endpoint", - server.addr = %t.addr, - server.id = t.tls.value().map(|tls| tracing::field::display(&tls.server_id)), - ) - }) - .push(resolve::layer(resolve, config.proxy.cache_max_idle_age * 2)) + .instrument(|t: &Endpoint| debug_span!("endpoint", addr = %t.addr)) + .push(resolve::layer(resolve, *discovery_idle_timeout * 2)) .push_on_service( svc::layers() .push(tcp::balance::layer( @@ -87,8 +80,10 @@ impl Outbound { .push(drain::Retain::layer(rt.drain.clone())), ) .into_new_service() + .push(svc::ArcNewService::layer()); + + concrete .push_map_target(Concrete::from) - .push(svc::ArcNewService::layer()) .check_new_service::<(ConcreteAddr, Logical), I>() .push(profiles::split::layer()) .push_on_service( @@ -99,11 +94,13 @@ impl Outbound { .stack .layer(crate::stack_labels("tcp", "logical")), ) - .push(svc::layer::mk(svc::SpawnReady::new)) - .push(svc::FailFast::layer("TCP Logical", dispatch_timeout)) - .push_spawn_buffer(buffer_capacity), + // TODO(ver) We should instead buffer per concrete + // target. + .push_buffer("TCP Logical", tcp_connection_buffer), ) - .push_cache(cache_max_idle_age) + // TODO(ver) Can we replace this evicting cache? The detect + // stack would have to hold/reuse inner stacks. + .push_cache(*discovery_idle_timeout) .check_new_service::() .instrument(|_: &Logical| debug_span!("tcp")) .check_new_service::() diff --git a/linkerd/app/outbound/src/test_util.rs b/linkerd/app/outbound/src/test_util.rs index dff0466ff2..325e5ac0d2 100644 --- a/linkerd/app/outbound/src/test_util.rs +++ b/linkerd/app/outbound/src/test_util.rs @@ -1,7 +1,8 @@ use crate::Config; pub use futures::prelude::*; use linkerd_app_core::{ - config, drain, exp_backoff, metrics, + config::{self, BufferConfig}, + drain, exp_backoff, metrics, proxy::{ http::{h1, h2}, tap, @@ -13,6 +14,10 @@ pub use linkerd_app_test as support; use std::{str::FromStr, time::Duration}; pub(crate) fn default_config() -> Config { + let buffer = BufferConfig { + capacity: 10_000, + failfast_timeout: Duration::from_secs(3), + }; Config { ingress_mode: false, emit_headers: true, @@ -38,13 +43,13 @@ pub(crate) fn default_config() -> Config { }, h2_settings: h2::Settings::default(), }, - buffer_capacity: 10_000, - cache_max_idle_age: Duration::from_secs(60), - dispatch_timeout: Duration::from_secs(3), max_in_flight_requests: 10_000, detect_protocol_timeout: Duration::from_secs(3), }, inbound_ips: Default::default(), + discovery_idle_timeout: Duration::from_secs(60), + tcp_connection_buffer: buffer.clone(), + http_request_buffer: buffer, } } diff --git a/linkerd/app/src/env.rs b/linkerd/app/src/env.rs index 70e6effb24..61e662d379 100644 --- a/linkerd/app/src/env.rs +++ b/linkerd/app/src/env.rs @@ -96,8 +96,13 @@ pub const ENV_METRICS_RETAIN_IDLE: &str = "LINKERD2_PROXY_METRICS_RETAIN_IDLE"; const ENV_INGRESS_MODE: &str = "LINKERD2_PROXY_INGRESS_MODE"; -const ENV_INBOUND_DISPATCH_TIMEOUT: &str = "LINKERD2_PROXY_INBOUND_DISPATCH_TIMEOUT"; -const ENV_OUTBOUND_DISPATCH_TIMEOUT: &str = "LINKERD2_PROXY_OUTBOUND_DISPATCH_TIMEOUT"; +const ENV_INBOUND_HTTP_QUEUE_CAPACITY: &str = "LINKERD2_PROXY_INBOUND_HTTP_QUEUE_CAPACITY"; +const ENV_INBOUND_HTTP_FAILFAST_TIMEOUT: &str = "LINKERD2_PROXY_INBOUND_HTTP_FAILFAST_TIMEOUT"; 
+ +const ENV_OUTBOUND_TCP_QUEUE_CAPACITY: &str = "LINKERD2_PROXY_OUTBOUND_TCP_QUEUE_CAPACITY"; +const ENV_OUTBOUND_TCP_FAILFAST_TIMEOUT: &str = "LINKERD2_PROXY_OUTBOUND_TCP_FAILFAST_TIMEOUT"; +const ENV_OUTBOUND_HTTP_QUEUE_CAPACITY: &str = "LINKERD2_PROXY_OUTBOUND_HTTP_QUEUE_CAPACITY"; +const ENV_OUTBOUND_HTTP_FAILFAST_TIMEOUT: &str = "LINKERD2_PROXY_OUTBOUND_HTTP_FAILFAST_TIMEOUT"; pub const ENV_INBOUND_DETECT_TIMEOUT: &str = "LINKERD2_PROXY_INBOUND_DETECT_TIMEOUT"; const ENV_OUTBOUND_DETECT_TIMEOUT: &str = "LINKERD2_PROXY_OUTBOUND_DETECT_TIMEOUT"; @@ -111,11 +116,6 @@ const ENV_OUTBOUND_ACCEPT_KEEPALIVE: &str = "LINKERD2_PROXY_OUTBOUND_ACCEPT_KEEP const ENV_INBOUND_CONNECT_KEEPALIVE: &str = "LINKERD2_PROXY_INBOUND_CONNECT_KEEPALIVE"; const ENV_OUTBOUND_CONNECT_KEEPALIVE: &str = "LINKERD2_PROXY_OUTBOUND_CONNECT_KEEPALIVE"; -pub const ENV_BUFFER_CAPACITY: &str = "LINKERD2_PROXY_BUFFER_CAPACITY"; - -pub const ENV_INBOUND_ROUTER_MAX_IDLE_AGE: &str = "LINKERD2_PROXY_INBOUND_ROUTER_MAX_IDLE_AGE"; -pub const ENV_OUTBOUND_ROUTER_MAX_IDLE_AGE: &str = "LINKERD2_PROXY_OUTBOUND_ROUTER_MAX_IDLE_AGE"; - const ENV_INBOUND_MAX_IDLE_CONNS_PER_ENDPOINT: &str = "LINKERD2_PROXY_MAX_IDLE_CONNS_PER_ENDPOINT"; const ENV_OUTBOUND_MAX_IDLE_CONNS_PER_ENDPOINT: &str = "LINKERD2_PROXY_OUTBOUND_MAX_IDLE_CONNS_PER_ENDPOINT"; @@ -220,6 +220,8 @@ const ENV_INITIAL_CONNECTION_WINDOW_SIZE: &str = const ENV_INBOUND_HTTP1_CONNECTION_POOL_IDLE_TIMEOUT: &str = "LINKERD2_PROXY_INBOUND_HTTP1_CONNECTION_POOL_IDLE_TIMEOUT"; +const ENV_OUTBOUND_HTTP1_CONNECTION_POOL_IDLE_TIMEOUT: &str = + "LINKERD2_PROXY_OUTBOUND_HTTP1_CONNECTION_POOL_IDLE_TIMEOUT"; const ENV_SHUTDOWN_GRACE_PERIOD: &str = "LINKERD2_PROXY_SHUTDOWN_GRACE_PERIOD"; @@ -227,18 +229,29 @@ const ENV_SHUTDOWN_GRACE_PERIOD: &str = "LINKERD2_PROXY_SHUTDOWN_GRACE_PERIOD"; const DEFAULT_OUTBOUND_LISTEN_ADDR: &str = "127.0.0.1:4140"; pub const DEFAULT_INBOUND_LISTEN_ADDR: &str = "0.0.0.0:4143"; pub const DEFAULT_CONTROL_LISTEN_ADDR: &str = "0.0.0.0:4190"; + const DEFAULT_ADMIN_LISTEN_ADDR: &str = "127.0.0.1:4191"; const DEFAULT_METRICS_RETAIN_IDLE: Duration = Duration::from_secs(10 * 60); -const DEFAULT_INBOUND_DISPATCH_TIMEOUT: Duration = Duration::from_secs(1); + +const DEFAULT_INBOUND_HTTP_QUEUE_CAPACITY: usize = 100; +const DEFAULT_INBOUND_HTTP_FAILFAST_TIMEOUT: Duration = Duration::from_secs(1); const DEFAULT_INBOUND_DETECT_TIMEOUT: Duration = Duration::from_secs(10); const DEFAULT_INBOUND_CONNECT_TIMEOUT: Duration = Duration::from_millis(300); const DEFAULT_INBOUND_CONNECT_BACKOFF: ExponentialBackoff = ExponentialBackoff::new_unchecked(Duration::from_millis(100), Duration::from_millis(500), 0.1); -const DEFAULT_OUTBOUND_DISPATCH_TIMEOUT: Duration = Duration::from_secs(3); + +const DEFAULT_OUTBOUND_TCP_QUEUE_CAPACITY: usize = 10; +const DEFAULT_OUTBOUND_TCP_FAILFAST_TIMEOUT: Duration = Duration::from_secs(3); +const DEFAULT_OUTBOUND_HTTP_QUEUE_CAPACITY: usize = 100; +const DEFAULT_OUTBOUND_HTTP_FAILFAST_TIMEOUT: Duration = Duration::from_secs(3); const DEFAULT_OUTBOUND_DETECT_TIMEOUT: Duration = Duration::from_secs(10); const DEFAULT_OUTBOUND_CONNECT_TIMEOUT: Duration = Duration::from_secs(1); const DEFAULT_OUTBOUND_CONNECT_BACKOFF: ExponentialBackoff = ExponentialBackoff::new_unchecked(Duration::from_millis(100), Duration::from_millis(500), 0.1); + +const DEFAULT_CONTROL_QUEUE_CAPACITY: usize = 100; +const DEFAULT_CONTROL_FAILFAST_TIMEOUT: Duration = Duration::from_secs(10); + const DEFAULT_RESOLV_CONF: &str = "/etc/resolv.conf"; const 
DEFAULT_INITIAL_STREAM_WINDOW_SIZE: u32 = 65_535; // Protocol default @@ -248,26 +261,34 @@ const DEFAULT_INITIAL_CONNECTION_WINDOW_SIZE: u32 = 1048576; // 1MB ~ 16 streams const DEFAULT_SHUTDOWN_GRACE_PERIOD: Duration = Duration::from_secs(2 * 60); // This configuration limits the amount of time Linkerd retains cached clients & -// connections. +// connections for a given destination ip:port, as referenced by the application +// client. // // After this timeout expires, the proxy will need to re-resolve destination // metadata. The outbound default of 5s matches Kubernetes' default DNS TTL. On // the outbound side, especially, we want to use a limited idle timeout since // stale clients/connections can have a severe memory impact, especially when -// the application communicates with many endpoints or at high concurrency. -// -// On the inbound side, we want to be a bit more permissive so that periodic, as -// the number of endpoints should generally be pretty constrained. -const DEFAULT_INBOUND_ROUTER_MAX_IDLE_AGE: Duration = Duration::from_secs(20); -const DEFAULT_OUTBOUND_ROUTER_MAX_IDLE_AGE: Duration = Duration::from_secs(5); +// the application communicates with many destinations. +const ENV_OUTBOUND_DISCOVERY_IDLE_TIMEOUT: &str = "LINKERD2_PROXY_OUTBOUND_DISCOVERY_IDLE_TIMEOUT"; +const DEFAULT_OUTBOUND_DISCOVERY_IDLE_TIMEOUT: Duration = Duration::from_secs(5); + +// On the inbound side, we may lookup per-port policy or per-service profile +// configuration. We are more permissive in retaining inbound configuration, +// because we expect this to be a generally lower-cardinality set of +// configurations to discover. +const ENV_INBOUND_DISCOVERY_IDLE_TIMEOUT: &str = "LINKERD2_PROXY_INBOUND_DISCOVERY_IDLE_TIMEOUT"; +const DEFAULT_INBOUND_DISCOVERY_IDLE_TIMEOUT: Duration = Duration::from_secs(90); // XXX This default inbound connection idle timeout should be less than or equal // to the server's idle timeout so that we don't try to reuse a connection as it // is being timed out of the server. // -// In the future this should be made configurable per-server from the proxy API. +// TODO(ver) this should be made configurable per-server from the proxy API. const DEFAULT_INBOUND_HTTP1_CONNECTION_POOL_IDLE_TIMEOUT: Duration = Duration::from_secs(3); +// TODO(ver) This should be configurable at the load balancer level. +const DEFAULT_OUTBOUND_HTTP1_CONNECTION_POOL_IDLE_TIMEOUT: Duration = Duration::from_secs(3); + // By default, we don't limit the number of connections a connection pol may // use, as doing so can severely impact CPU utilization for applications with // many concurrent requests. It's generally preferable to use the MAX_IDLE_AGE @@ -281,18 +302,8 @@ const DEFAULT_OUTBOUND_MAX_IDLE_CONNS_PER_ENDPOINT: usize = usize::MAX; const DEFAULT_INBOUND_MAX_IN_FLIGHT: usize = 100_000; const DEFAULT_OUTBOUND_MAX_IN_FLIGHT: usize = 100_000; -// This value should be large enough to admit requests without exerting -// backpressure so that requests implicitly buffer in the executor; but it -// should be small enough that callers can't force the proxy to consume an -// extreme amount of memory. Also keep in mind that there may be several buffers -// used in a given proxy, each of which assumes this capacity. -// -// The value of 10K is chosen somewhat arbitrarily, but seems high enough to -// buffer requests for high-load services. 
-const DEFAULT_BUFFER_CAPACITY: usize = 10_000; - const DEFAULT_DESTINATION_PROFILE_SUFFIXES: &str = "svc.cluster.local."; -const DEFAULT_DESTINATION_PROFILE_IDLE_TIMEOUT: Duration = Duration::from_millis(500); +const DEFAULT_DESTINATION_PROFILE_SKIP_TIMEOUT: Duration = Duration::from_millis(500); const DEFAULT_IDENTITY_MIN_REFRESH: Duration = Duration::from_secs(10); const DEFAULT_IDENTITY_MAX_REFRESH: Duration = Duration::from_secs(60 * 60 * 24); @@ -309,11 +320,21 @@ pub fn parse_config(strings: &S) -> Result let admin_listener_addr = parse(strings, ENV_ADMIN_LISTEN_ADDR, parse_socket_addr); let inbound_detect_timeout = parse(strings, ENV_INBOUND_DETECT_TIMEOUT, parse_duration); - let inbound_dispatch_timeout = parse(strings, ENV_INBOUND_DISPATCH_TIMEOUT, parse_duration); let inbound_connect_timeout = parse(strings, ENV_INBOUND_CONNECT_TIMEOUT, parse_duration); + let inbound_http_buffer_capacity = + parse(strings, ENV_INBOUND_HTTP_QUEUE_CAPACITY, parse_number); + let inbound_http_failfast_timeout = + parse(strings, ENV_INBOUND_HTTP_FAILFAST_TIMEOUT, parse_duration); let outbound_detect_timeout = parse(strings, ENV_OUTBOUND_DETECT_TIMEOUT, parse_duration); - let outbound_dispatch_timeout = parse(strings, ENV_OUTBOUND_DISPATCH_TIMEOUT, parse_duration); + let outbound_tcp_buffer_capacity = + parse(strings, ENV_OUTBOUND_TCP_QUEUE_CAPACITY, parse_number); + let outbound_tcp_failfast_timeout = + parse(strings, ENV_OUTBOUND_TCP_FAILFAST_TIMEOUT, parse_duration); + let outbound_http_buffer_capacity = + parse(strings, ENV_OUTBOUND_HTTP_QUEUE_CAPACITY, parse_number); + let outbound_http_failfast_timeout = + parse(strings, ENV_OUTBOUND_HTTP_FAILFAST_TIMEOUT, parse_duration); let outbound_connect_timeout = parse(strings, ENV_OUTBOUND_CONNECT_TIMEOUT, parse_duration); let inbound_accept_keepalive = parse(strings, ENV_INBOUND_ACCEPT_KEEPALIVE, parse_duration); @@ -330,12 +351,10 @@ pub fn parse_config(strings: &S) -> Result parse_port_set, ); - let buffer_capacity = parse(strings, ENV_BUFFER_CAPACITY, parse_number); - - let inbound_cache_max_idle_age = - parse(strings, ENV_INBOUND_ROUTER_MAX_IDLE_AGE, parse_duration); - let outbound_cache_max_idle_age = - parse(strings, ENV_OUTBOUND_ROUTER_MAX_IDLE_AGE, parse_duration); + let inbound_discovery_idle_timeout = + parse(strings, ENV_INBOUND_DISCOVERY_IDLE_TIMEOUT, parse_duration); + let outbound_discovery_idle_timeout = + parse(strings, ENV_OUTBOUND_DISCOVERY_IDLE_TIMEOUT, parse_duration); let inbound_max_idle_per_endpoint = parse( strings, @@ -372,7 +391,7 @@ pub fn parse_config(strings: &S) -> Result let dst_addr = parse_control_addr(strings, ENV_DESTINATION_SVC_BASE); let dst_token = strings.get(ENV_DESTINATION_CONTEXT); - let dst_profile_idle_timeout = parse( + let dst_profile_skip_timeout = parse( strings, ENV_DESTINATION_PROFILE_INITIAL_TIMEOUT, parse_duration, @@ -400,8 +419,6 @@ pub fn parse_config(strings: &S) -> Result ..Default::default() }; - let buffer_capacity = buffer_capacity?.unwrap_or(DEFAULT_BUFFER_CAPACITY); - let dst_profile_suffixes = dst_profile_suffixes? 
.unwrap_or_else(|| parse_dns_suffixes(DEFAULT_DESTINATION_PROFILE_SUFFIXES).unwrap()); let dst_profile_networks = dst_profile_networks?.unwrap_or_default(); @@ -441,11 +458,17 @@ pub fn parse_config(strings: &S) -> Result keepalive, h2_settings, }; - let cache_max_idle_age = - outbound_cache_max_idle_age?.unwrap_or(DEFAULT_OUTBOUND_ROUTER_MAX_IDLE_AGE); + let discovery_idle_timeout = + outbound_discovery_idle_timeout?.unwrap_or(DEFAULT_OUTBOUND_DISCOVERY_IDLE_TIMEOUT); let max_idle = outbound_max_idle_per_endpoint?.unwrap_or(DEFAULT_OUTBOUND_MAX_IDLE_CONNS_PER_ENDPOINT); let keepalive = Keepalive(outbound_connect_keepalive?); + let connection_pool_timeout = parse( + strings, + ENV_OUTBOUND_HTTP1_CONNECTION_POOL_IDLE_TIMEOUT, + parse_duration, + )?; + let connect = ConnectConfig { keepalive, timeout: outbound_connect_timeout?.unwrap_or(DEFAULT_OUTBOUND_CONNECT_TIMEOUT), @@ -457,14 +480,22 @@ pub fn parse_config(strings: &S) -> Result h2_settings, h1_settings: h1::PoolSettings { max_idle, - idle_timeout: cache_max_idle_age, + idle_timeout: connection_pool_timeout + .unwrap_or(DEFAULT_OUTBOUND_HTTP1_CONNECTION_POOL_IDLE_TIMEOUT), }, }; let detect_protocol_timeout = outbound_detect_timeout?.unwrap_or(DEFAULT_OUTBOUND_DETECT_TIMEOUT); - let dispatch_timeout = - outbound_dispatch_timeout?.unwrap_or(DEFAULT_OUTBOUND_DISPATCH_TIMEOUT); + + let tcp_buffer_capacity = + outbound_tcp_buffer_capacity?.unwrap_or(DEFAULT_OUTBOUND_TCP_QUEUE_CAPACITY); + let tcp_failfast_timeout = + outbound_tcp_failfast_timeout?.unwrap_or(DEFAULT_OUTBOUND_TCP_FAILFAST_TIMEOUT); + let http_buffer_capacity = + outbound_http_buffer_capacity?.unwrap_or(DEFAULT_OUTBOUND_HTTP_QUEUE_CAPACITY); + let http_failfast_timeout = + outbound_http_failfast_timeout?.unwrap_or(DEFAULT_OUTBOUND_HTTP_FAILFAST_TIMEOUT); outbound::Config { ingress_mode, @@ -473,14 +504,20 @@ pub fn parse_config(strings: &S) -> Result proxy: ProxyConfig { server, connect, - cache_max_idle_age, - buffer_capacity, - dispatch_timeout, max_in_flight_requests: outbound_max_in_flight? .unwrap_or(DEFAULT_OUTBOUND_MAX_IN_FLIGHT), detect_protocol_timeout, }, inbound_ips: inbound_ips.clone(), + discovery_idle_timeout, + tcp_connection_buffer: BufferConfig { + capacity: tcp_buffer_capacity, + failfast_timeout: tcp_failfast_timeout, + }, + http_request_buffer: BufferConfig { + capacity: http_buffer_capacity, + failfast_timeout: http_failfast_timeout, + }, } }; @@ -502,8 +539,8 @@ pub fn parse_config(strings: &S) -> Result keepalive, h2_settings, }; - let cache_max_idle_age = - inbound_cache_max_idle_age?.unwrap_or(DEFAULT_INBOUND_ROUTER_MAX_IDLE_AGE); + let discovery_idle_timeout = + inbound_discovery_idle_timeout?.unwrap_or(DEFAULT_INBOUND_DISCOVERY_IDLE_TIMEOUT); let max_idle = inbound_max_idle_per_endpoint?.unwrap_or(DEFAULT_INBOUND_MAX_IDLE_CONNS_PER_ENDPOINT); let connection_pool_timeout = parse( @@ -530,8 +567,6 @@ pub fn parse_config(strings: &S) -> Result let detect_protocol_timeout = inbound_detect_timeout?.unwrap_or(DEFAULT_INBOUND_DETECT_TIMEOUT); - let dispatch_timeout = - inbound_dispatch_timeout?.unwrap_or(DEFAULT_INBOUND_DISPATCH_TIMEOUT); // Ensure that connections that directly target the inbound port are secured (unless // identity is disabled). 
@@ -601,7 +636,10 @@ pub fn parse_config(strings: &S) -> Result ControlConfig { addr, connect, - buffer_capacity, + buffer: BufferConfig { + capacity: DEFAULT_CONTROL_QUEUE_CAPACITY, + failfast_timeout: DEFAULT_CONTROL_FAILFAST_TIMEOUT, + }, } }; @@ -610,7 +648,7 @@ pub fn parse_config(strings: &S) -> Result ports, workload, control, - cache_max_idle_age, + cache_max_idle_age: discovery_idle_timeout, } } @@ -693,7 +731,7 @@ pub fn parse_config(strings: &S) -> Result inbound::policy::Config::Fixed { default, - cache_max_idle_age, + cache_max_idle_age: discovery_idle_timeout, ports: require_identity_ports .into_iter() .chain(require_tls_ports) @@ -709,17 +747,22 @@ pub fn parse_config(strings: &S) -> Result proxy: ProxyConfig { server, connect, - cache_max_idle_age, - buffer_capacity, - dispatch_timeout, max_in_flight_requests: inbound_max_in_flight? .unwrap_or(DEFAULT_INBOUND_MAX_IN_FLIGHT), detect_protocol_timeout, }, policy, - profile_idle_timeout: dst_profile_idle_timeout? - .unwrap_or(DEFAULT_DESTINATION_PROFILE_IDLE_TIMEOUT), + profile_skip_timeout: dst_profile_skip_timeout? + .unwrap_or(DEFAULT_DESTINATION_PROFILE_SKIP_TIMEOUT), allowed_ips: inbound_ips.into(), + + discovery_idle_timeout, + http_request_buffer: BufferConfig { + capacity: inbound_http_buffer_capacity? + .unwrap_or(DEFAULT_INBOUND_HTTP_QUEUE_CAPACITY), + failfast_timeout: inbound_http_failfast_timeout? + .unwrap_or(DEFAULT_INBOUND_HTTP_FAILFAST_TIMEOUT), + }, } }; @@ -730,12 +773,20 @@ pub fn parse_config(strings: &S) -> Result } else { outbound.proxy.connect.clone() }; + let failfast_timeout = if addr.addr.is_loopback() { + inbound.http_request_buffer.failfast_timeout + } else { + outbound.http_request_buffer.failfast_timeout + }; super::dst::Config { context: dst_token?.unwrap_or_default(), control: ControlConfig { addr, connect, - buffer_capacity, + buffer: BufferConfig { + capacity: DEFAULT_CONTROL_QUEUE_CAPACITY, + failfast_timeout, + }, }, } }; @@ -765,7 +816,11 @@ pub fn parse_config(strings: &S) -> Result } else { outbound.proxy.connect.clone() }; - + let failfast_timeout = if addr.addr.is_loopback() { + inbound.http_request_buffer.failfast_timeout + } else { + outbound.http_request_buffer.failfast_timeout + }; let attributes = oc_attributes_file_path .map(|path| match path { Some(path) => oc_trace_attributes(path), @@ -779,7 +834,10 @@ pub fn parse_config(strings: &S) -> Result control: ControlConfig { addr, connect, - buffer_capacity: 10, + buffer: BufferConfig { + capacity: DEFAULT_CONTROL_QUEUE_CAPACITY, + failfast_timeout, + }, }, })) } @@ -804,12 +862,20 @@ pub fn parse_config(strings: &S) -> Result } else { outbound.proxy.connect.clone() }; + let failfast_timeout = if addr.addr.is_loopback() { + inbound.http_request_buffer.failfast_timeout + } else { + outbound.http_request_buffer.failfast_timeout + }; identity::Config { certify, control: ControlConfig { addr, connect, - buffer_capacity: 1, + buffer: BufferConfig { + capacity: DEFAULT_CONTROL_QUEUE_CAPACITY, + failfast_timeout, + }, }, documents, } diff --git a/linkerd/proxy/http/src/server.rs b/linkerd/proxy/http/src/server.rs index 54857bed4d..5c26655589 100644 --- a/linkerd/proxy/http/src/server.rs +++ b/linkerd/proxy/http/src/server.rs @@ -118,7 +118,7 @@ where debug!(?version, "Handling as HTTP"); Box::pin(async move { - let (svc, closed) = SetClientHandle::new(io.peer_addr()?, inner.clone()); + let (svc, closed) = SetClientHandle::new(io.peer_addr()?, inner); match version { Version::Http1 => { diff --git 
a/linkerd/service-profiles/src/http/service.rs b/linkerd/service-profiles/src/http/service.rs index f95cc7f813..639b7bcf76 100644 --- a/linkerd/service-profiles/src/http/service.rs +++ b/linkerd/service-profiles/src/http/service.rs @@ -22,12 +22,40 @@ pub struct NewServiceRouter(N); #[derive(Debug)] pub struct ServiceRouter { - new_route: N, target: T, - rx: ReceiverStream, - http_routes: Vec<(RequestMatch, Route)>, - services: HashMap, + new_route: N, + routes: Routes, default: S, + profile: Receiver, + profile_rx: ReceiverStream, +} + +#[derive(Clone, Debug)] +struct Routes { + matches: Vec<(RequestMatch, Route)>, + services: HashMap, +} + +impl Default for Routes { + fn default() -> Self { + Self { + matches: Vec::new(), + services: HashMap::new(), + } + } +} + +impl Clone for ServiceRouter { + fn clone(&self) -> Self { + Self { + target: self.target.clone(), + new_route: self.new_route.clone(), + default: self.default.clone(), + routes: self.routes.clone(), + profile: self.profile.clone(), + profile_rx: self.profile.clone().into(), + } + } } // === impl NewServiceRouter === @@ -49,12 +77,12 @@ where let rx = target.param(); let default = self.0.new_service((None, target.clone())); ServiceRouter { - default, target, - rx: rx.into(), - http_routes: Vec::new(), - services: HashMap::new(), new_route: self.0.clone(), + routes: Default::default(), + default, + profile: rx.clone(), + profile_rx: rx.into(), } } } @@ -73,18 +101,19 @@ where fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { // If the routes have been updated, update the cache. - if let Poll::Ready(Some(Profile { http_routes, .. })) = self.rx.poll_next_unpin(cx) { + if let Poll::Ready(Some(Profile { http_routes, .. })) = self.profile_rx.poll_next_unpin(cx) + { debug!(routes = %http_routes.len(), "Updating HTTP routes"); let routes = http_routes .iter() .map(|(_, r)| r.clone()) .collect::>(); - self.http_routes = http_routes; + self.routes.matches = http_routes; // Clear out defunct routes before building any missing routes. - self.services.retain(|r, _| routes.contains(r)); + self.routes.services.retain(|r, _| routes.contains(r)); for route in routes.into_iter() { - if let hash_map::Entry::Vacant(ent) = self.services.entry(route) { + if let hash_map::Entry::Vacant(ent) = self.routes.services.entry(route) { let route = ent.key().clone(); let svc = self .new_route @@ -98,11 +127,15 @@ where } fn call(&mut self, req: http::Request) -> Self::Future { - let inner = match super::route_for_request(&self.http_routes, &req) { + let inner = match super::route_for_request(&self.routes.matches, &req) { Some(route) => { // If the request matches a route, use the route's service. trace!(?route, "Using route service"); - self.services.get(route).expect("route must exist").clone() + self.routes + .services + .get(route) + .expect("route must exist") + .clone() } None => { // Otherwise, use the default service. 
diff --git a/linkerd/stack/Cargo.toml b/linkerd/stack/Cargo.toml index b9eb0e8287..49afedbcdd 100644 --- a/linkerd/stack/Cargo.toml +++ b/linkerd/stack/Cargo.toml @@ -16,7 +16,7 @@ parking_lot = "0.12" pin-project = "1" thiserror = "1" tokio = { version = "1", features = ["time"] } -tower = { version = "0.4", features = ["filter", "util"] } +tower = { version = "0.4", features = ["filter", "spawn-ready", "util"] } tracing = "0.1" [dev-dependencies] diff --git a/linkerd/stack/src/switch_ready.rs b/linkerd/stack/src/switch_ready.rs index 075c70fb8a..959cfd479c 100644 --- a/linkerd/stack/src/switch_ready.rs +++ b/linkerd/stack/src/switch_ready.rs @@ -6,14 +6,14 @@ use std::{ task::{Context, Poll}, }; use tokio::time; -use tower::util::Either; +use tower::{spawn_ready::SpawnReady, util::Either}; use tracing::{debug, trace}; /// A service which falls back to a secondary service if the primary service /// takes too long to become ready. #[derive(Debug)] pub struct SwitchReady<A, B> { - primary: A, + primary: SpawnReady<A>, secondary: B, switch_after: time::Duration, sleep: Pin<Box<time::Sleep>>, @@ -78,7 +78,12 @@ impl<A, B> SwitchReady<A, B> { /// the `secondary` service is used until the primary service becomes ready again. pub fn new(primary: A, secondary: B, switch_after: time::Duration) -> Self { Self { - primary, + // The primary service is wrapped in a `SpawnReady` so that it can + // still become ready even when we've reverted to using the + // secondary service. + primary: SpawnReady::new(primary), + // The secondary service is not wrapped because we don't really care + // about driving it to readiness unless the primary has timed out. secondary, switch_after, // The sleep is reset whenever the service becomes unready; this @@ -90,16 +95,17 @@ impl<A, B> SwitchReady<A, B> { } } -impl<A, B, R> tower::Service<R> for SwitchReady<A, B> +impl<A, B, Req> tower::Service<Req> for SwitchReady<A, B> where - A: tower::Service<R>, + Req: 'static, + A: tower::Service<Req> + Send + 'static, A::Error: Into<Error>, - B: tower::Service<R, Response = A::Response>, + B: tower::Service<Req, Response = A::Response>, B::Error: Into<Error>, { type Response = A::Response; type Error = Error; - type Future = Either<A::Future, B::Future>; + type Future = Either<<SpawnReady<A> as tower::Service<Req>>::Future, B::Future>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { loop { @@ -156,7 +162,7 @@ where } } - fn call(&mut self, req: R) -> Self::Future { + fn call(&mut self, req: Req) -> Self::Future { trace!(state = ?self.state, "SwitchReady::call"); match self.state { State::Primary => Either::A(self.primary.call(req)), @@ -166,20 +172,6 @@ where } } -impl<A: Clone, B: Clone> Clone for SwitchReady<A, B> { - fn clone(&self) -> Self { - Self { - primary: self.primary.clone(), - secondary: self.secondary.clone(), - switch_after: self.switch_after, - // Reset the state and the sleep; each clone of the underlying services - // may become ready independently (e.g. semaphore). - sleep: Box::pin(time::sleep(time::Duration::default())), - state: State::Primary, - } - } -} - #[cfg(test)] mod tests { use super::*; @@ -226,6 +218,7 @@ mod tests { // The primary service becomes ready. a_handle.allow(1); + tokio::task::yield_now().await; assert_ready_ok!(switch.poll_ready()); let call = switch.call(()); @@ -257,6 +250,7 @@ mod tests { // The secondary service becomes ready. b_handle.allow(1); + tokio::task::yield_now().await; assert_ready_ok!(switch.poll_ready()); let call = switch.call(()); @@ -287,6 +281,7 @@ mod tests { // The secondary service becomes ready. b_handle.allow(1); + tokio::task::yield_now().await; assert_ready_ok!(switch.poll_ready()); let call = switch.call(()); @@ -297,6 +292,7 @@ mod tests { // The primary service becomes ready. 
a_handle.allow(1); + tokio::task::yield_now().await; assert_ready_ok!(switch.poll_ready()); let call = switch.call(()); @@ -312,6 +308,7 @@ mod tests { // The primary service becomes ready again. a_handle.allow(1); + tokio::task::yield_now().await; assert_ready_ok!(switch.poll_ready()); let call = switch.call(()); @@ -343,6 +340,7 @@ mod tests { // The primary service becomes ready. a_handle.allow(1); + tokio::task::yield_now().await; assert_ready_ok!(switch.poll_ready()); let call = switch.call(()); @@ -358,6 +356,7 @@ mod tests { // The primary service becomes ready. a_handle.allow(1); + tokio::task::yield_now().await; assert_ready_ok!(switch.poll_ready()); let call = switch.call(()); @@ -375,6 +374,7 @@ mod tests { // The primary service becomes ready. a_handle.allow(1); + tokio::task::yield_now().await; assert_ready_ok!(switch.poll_ready()); let call = switch.call(()); @@ -402,6 +402,7 @@ mod tests { // Error the primary a_handle.send_error("lol"); + tokio::task::yield_now().await; assert_ready_err!(switch.poll_ready()); }
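
The new `push_buffer`/`push_buffer_on_service` helpers above all bottom out in `BufferLayer::layer`, which composes `SpawnReady`, `FailFast`, and a bounded mpsc `Buffer` in that order. The following self-contained sketch approximates that composition with stock `tower` middleware so it can be run outside the proxy. It illustrates the shape only: `BufferConfig` is re-declared locally, and `tower`'s `Timeout` stands in for linkerd's internal `FailFast`, which trips after prolonged *unreadiness* rather than bounding each individual call.

```rust
// Assumed dependencies:
//   tokio = { version = "1", features = ["full"] }
//   tower = { version = "0.4", features = ["buffer", "spawn-ready", "timeout", "util"] }
use std::{convert::Infallible, time::Duration};
use tower::{buffer::Buffer, spawn_ready::SpawnReady, timeout::Timeout, util::BoxService};
use tower::{Service, ServiceExt};

/// Local stand-in for the `config::BufferConfig` introduced above.
struct BufferConfig {
    capacity: usize,
    failfast_timeout: Duration,
}

#[tokio::main]
async fn main() {
    let config = BufferConfig {
        capacity: 100,                            // cf. DEFAULT_OUTBOUND_HTTP_QUEUE_CAPACITY
        failfast_timeout: Duration::from_secs(3), // cf. DEFAULT_OUTBOUND_HTTP_FAILFAST_TIMEOUT
    };

    // An inner service that responds with the request's length.
    let inner = tower::service_fn(|req: String| async move { Ok::<_, Infallible>(req.len()) });

    // Mirror the order used by `BufferLayer::layer`: drive the inner service's
    // readiness on its own task, bound how long a request may wait, and put a
    // bounded mpsc queue in front so callers enqueue rather than block.
    let spawn_ready = SpawnReady::new(inner);
    let timed = Timeout::new(spawn_ready, config.failfast_timeout);
    let mut svc = Buffer::new(BoxService::new(timed), config.capacity);

    let rsp = svc.ready().await.unwrap().call("hello".into()).await.unwrap();
    assert_eq!(rsp, 5);
}
```

Note how this pairs with the defaults changed below: compared to the removed `DEFAULT_BUFFER_CAPACITY` of 10,000, the new per-target queues are far smaller (10-100 items), shifting the proxy from deep implicit buffering toward early load-shedding.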
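
For quick reference, these are the defaults that `env.rs` installs when the new `LINKERD2_PROXY_*` variables are unset, collected from the `DEFAULT_*` constants above (the struct is again re-declared locally so the snippet stands alone):

```rust
use std::time::Duration;

/// Local stand-in for `config::BufferConfig`.
#[derive(Clone, Debug)]
struct BufferConfig {
    capacity: usize,
    failfast_timeout: Duration,
}

fn main() {
    // LINKERD2_PROXY_INBOUND_HTTP_QUEUE_CAPACITY / _FAILFAST_TIMEOUT
    let inbound_http = BufferConfig {
        capacity: 100,
        failfast_timeout: Duration::from_secs(1),
    };

    // LINKERD2_PROXY_OUTBOUND_TCP_QUEUE_CAPACITY / _FAILFAST_TIMEOUT
    let outbound_tcp = BufferConfig {
        capacity: 10,
        failfast_timeout: Duration::from_secs(3),
    };

    // LINKERD2_PROXY_OUTBOUND_HTTP_QUEUE_CAPACITY / _FAILFAST_TIMEOUT
    let outbound_http = BufferConfig {
        capacity: 100,
        failfast_timeout: Duration::from_secs(3),
    };

    // Control-plane clients are not configurable from the environment: the
    // policy client uses DEFAULT_CONTROL_FAILFAST_TIMEOUT (10s), while the
    // destination, OpenCensus, and identity clients reuse the inbound or
    // outbound HTTP failfast timeout depending on whether the control
    // address is loopback.
    let control = BufferConfig {
        capacity: 100,
        failfast_timeout: Duration::from_secs(10),
    };

    println!("{inbound_http:?} {outbound_tcp:?} {outbound_http:?} {control:?}");
}
```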