config: Decouple HTTP and TCP buffering config #2078

Merged · 16 commits · Dec 27, 2022
9 changes: 6 additions & 3 deletions linkerd/app/core/src/config.rs
@@ -13,6 +13,12 @@ pub struct ServerConfig {
pub h2_settings: h2::Settings,
}

#[derive(Clone, Debug)]
pub struct BufferConfig {
pub capacity: usize,
pub failfast_timeout: Duration,
}

#[derive(Clone, Debug)]
pub struct ConnectConfig {
pub backoff: ExponentialBackoff,
@@ -26,9 +32,6 @@ pub struct ConnectConfig {
pub struct ProxyConfig {
pub server: ServerConfig,
pub connect: ConnectConfig,
pub buffer_capacity: usize,
pub cache_max_idle_age: Duration,
pub dispatch_timeout: Duration,
pub max_in_flight_requests: usize,
pub detect_protocol_timeout: Duration,
}
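The `BufferConfig` introduced here replaces the single `buffer_capacity`/`dispatch_timeout` pair that every stack previously shared through `ProxyConfig`. A minimal, standalone sketch of the resulting shape, with illustrative per-stack values mirroring the `test_util.rs` defaults later in this diff (the values are examples, not requirements):

use std::time::Duration;

// Stand-in for linkerd_app_core::config::BufferConfig as added in this PR.
#[derive(Clone, Debug)]
pub struct BufferConfig {
    pub capacity: usize,
    pub failfast_timeout: Duration,
}

fn main() {
    // Each stack now carries its own buffer settings instead of sharing
    // ProxyConfig::buffer_capacity and ProxyConfig::dispatch_timeout.
    let tcp_server_buffer = BufferConfig {
        capacity: 10_000,
        failfast_timeout: Duration::from_secs(1),
    };
    let http_logical_buffer = BufferConfig {
        capacity: 10_000,
        failfast_timeout: Duration::from_secs(1),
    };
    println!("tcp: {tcp_server_buffer:?}, http: {http_logical_buffer:?}");
}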
6 changes: 5 additions & 1 deletion linkerd/app/core/src/control.rs
@@ -95,7 +95,11 @@ impl Config {
.into_new_service()
.push(metrics.to_layer::<classify::Response, _, _>())
.push(self::add_origin::layer())
.push_on_service(svc::layers().push_spawn_buffer(self.buffer_capacity))
.push_buffer_on_service(
"Controller client",
self.buffer_capacity,
time::Duration::from_secs(60),
)
.instrument(|c: &ControlAddr| tracing::info_span!("controller", addr = %c.addr))
.push_map_target(move |()| addr.clone())
.push(svc::ArcNewService::layer())
92 changes: 71 additions & 21 deletions linkerd/app/core/src/svc.rs
@@ -11,12 +11,14 @@ pub use linkerd_stack::{
NewRouter, NewService, Param, Predicate, UnwrapOr,
};
pub use linkerd_stack_tracing::{GetSpan, NewInstrument, NewInstrumentLayer};
use stack::OnService;
use std::{
marker::PhantomData,
task::{Context, Poll},
time::Duration,
};
use tower::{
buffer::{Buffer as TowerBuffer, BufferLayer},
buffer::Buffer as TowerBuffer,
layer::util::{Identity, Stack as Pair},
make::MakeService,
};
@@ -30,6 +32,13 @@ pub struct AlwaysReconnect(ExponentialBackoff);

pub type Buffer<Req, Rsp, E> = TowerBuffer<BoxService<Req, Rsp, E>, Req>;

pub struct BufferLayer<Req> {
name: &'static str,
capacity: usize,
failfast_timeout: Duration,
_marker: PhantomData<fn(Req)>,
}

pub type BoxHttp<B = http::BoxBody> =
BoxService<http::Request<B>, http::Response<http::BoxBody>, Error>;

@@ -80,15 +89,16 @@ impl<L> Layers<L> {
}

/// Buffers requests in an mpsc, spawning the inner service onto a dedicated task.
pub fn push_spawn_buffer<Req>(
pub fn push_buffer<Req>(
self,
name: &'static str,
capacity: usize,
) -> Layers<Pair<Pair<L, BoxServiceLayer<Req>>, BufferLayer<Req>>>
failfast_timeout: Duration,
) -> Layers<Pair<L, BufferLayer<Req>>>
where
Req: Send + 'static,
{
self.push(BoxServiceLayer::new())
.push(BufferLayer::new(capacity))
self.push(buffer(name, capacity, failfast_timeout))
}

pub fn push_on_service<U>(self, layer: U) -> Layers<Pair<L, stack::OnServiceLayer<U>>> {
@@ -152,22 +162,6 @@ impl<S> Stack<S> {
self.push(NewReconnect::layer(AlwaysReconnect(backoff)))
}

/// Buffer requests when the next layer is out of capacity.
pub fn spawn_buffer<Req, Rsp>(
self,
capacity: usize,
) -> Stack<Buffer<Req, S::Response, S::Error>>
where
Req: Send + 'static,
S: Service<Req> + Send + 'static,
S::Response: Send + 'static,
S::Error: Into<Error> + Send + Sync + 'static,
S::Future: Send,
{
self.push(BoxServiceLayer::new())
.push(BufferLayer::new(capacity))
}

/// Assuming `S` implements `NewService` or `MakeService`, applies the given
/// `L`-typed layer on each service produced by `S`.
pub fn push_on_service<L: Clone>(self, layer: L) -> Stack<stack::OnService<L, S>> {
@@ -202,6 +196,18 @@ impl<S> Stack<S> {
self.push(http::insert::NewResponseInsert::layer())
}

pub fn push_buffer_on_service<Req>(
self,
name: &'static str,
capacity: usize,
failfast_timeout: Duration,
) -> Stack<OnService<BufferLayer<Req>, S>>
where
Req: Send + 'static,
{
self.push_on_service(buffer(name, capacity, failfast_timeout))
}

pub fn push_cache<T>(self, idle: Duration) -> Stack<cache::NewCachedService<T, S>>
where
T: Clone + Eq + std::fmt::Debug + std::hash::Hash + Send + Sync + 'static,
@@ -381,3 +387,47 @@ impl<E: Into<Error>> Recover<E> for AlwaysReconnect {
Ok(self.0.stream())
}
}

// === impl BufferLayer ===

fn buffer<Req>(
name: &'static str,
capacity: usize,
failfast_timeout: Duration,
) -> BufferLayer<Req> {
BufferLayer {
name,
capacity,
failfast_timeout,
_marker: PhantomData,
}
}

impl<Req, S> Layer<S> for BufferLayer<Req>
where
Req: Send + 'static,
S: Service<Req, Error = Error> + Send + 'static,
S::Future: Send,
{
type Service = Buffer<Req, S::Response, Error>;

fn layer(&self, inner: S) -> Self::Service {
Buffer::new(
BoxService::new(
FailFast::layer(self.name, self.failfast_timeout).layer(SpawnReady::new(inner)),
),
self.capacity,
)
}
}

impl<Req> Clone for BufferLayer<Req> {
fn clone(&self) -> Self {
Self {
capacity: self.capacity,
name: self.name,
failfast_timeout: self.failfast_timeout,
_marker: self._marker,
}
}
}
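Taken together, the new `BufferLayer` packages what callers previously assembled by hand: the inner service is driven to readiness on its own task (`SpawnReady`), guarded by a fail-fast timeout (`FailFast`), and fronted by a bounded mpsc buffer (`Buffer`). A rough sketch of the equivalent free function, using the crate-internal types exactly as they appear in this file (a non-standalone illustration, not a drop-in implementation):

// Sketch only: FailFast, SpawnReady, BoxService, Buffer, and Error are the
// linkerd-internal types used in svc.rs above.
fn buffered<Req, S>(
    name: &'static str,
    capacity: usize,
    failfast_timeout: Duration,
    inner: S,
) -> Buffer<Req, S::Response, Error>
where
    Req: Send + 'static,
    S: Service<Req, Error = Error> + Send + 'static,
    S::Future: Send,
{
    // SpawnReady polls the inner service's readiness on a background task;
    // FailFast sheds load after `failfast_timeout` of sustained unreadiness;
    // Buffer queues up to `capacity` requests in front of it.
    Buffer::new(
        BoxService::new(FailFast::layer(name, failfast_timeout).layer(SpawnReady::new(inner))),
        capacity,
    )
}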
31 changes: 15 additions & 16 deletions linkerd/app/gateway/src/lib.rs
@@ -7,7 +7,6 @@ mod tests;

use self::gateway::NewGateway;
use linkerd_app_core::{
config::ProxyConfig,
identity, io, metrics,
profiles::{self, DiscoveryRejected},
proxy::{
@@ -81,12 +80,7 @@ where
R::Resolution: Send,
R::Future: Send + Unpin,
{
let ProxyConfig {
buffer_capacity,
cache_max_idle_age,
dispatch_timeout,
..
} = inbound.config().proxy.clone();
let inbound_config = inbound.config().clone();
let local_id = identity::LocalId(inbound.identity().name().clone());

// For each gatewayed connection that is *not* HTTP, use the target from the
@@ -158,11 +152,13 @@ where
.stack
.layer(metrics::StackLabels::inbound("tcp", "gateway")),
)
.push(svc::layer::mk(svc::SpawnReady::new))
.push(svc::FailFast::layer("TCP Gateway", dispatch_timeout))
.push_spawn_buffer(buffer_capacity),
.push_buffer(
"TCP Gatweay",
inbound_config.tcp_server_buffer.capacity,
inbound_config.tcp_server_buffer.failfast_timeout,
),
)
.push_cache(cache_max_idle_age)
.push_cache(inbound_config.profile_idle_timeout)
.check_new_service::<NameAddr, I>();

// Cache an HTTP gateway service for each destination and HTTP version.
@@ -173,7 +169,8 @@ where
let endpoint = outbound.push_tcp_endpoint().push_http_endpoint();
let http = endpoint
.clone()
.push_http_logical(resolve)
.push_http_concrete(resolve)
.push_http_logical()
.into_stack()
.push_switch(Ok::<_, Infallible>, endpoint.into_stack())
.push(NewGateway::layer(local_id))
@@ -193,11 +190,13 @@ where
.stack
.layer(metrics::StackLabels::inbound("http", "gateway")),
)
.push(svc::layer::mk(svc::SpawnReady::new))
.push(svc::FailFast::layer("Gateway", dispatch_timeout))
.push_spawn_buffer(buffer_capacity),
.push_buffer(
"Gateway",
inbound_config.http_logical_buffer.capacity,
inbound_config.http_logical_buffer.failfast_timeout,
),
)
.push_cache(cache_max_idle_age)
.push_cache(inbound_config.profile_idle_timeout)
.push_on_service(
svc::layers()
.push(http::Retain::layer())
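The gateway changes above illustrate the caller-side migration repeated across this PR: the hand-rolled `SpawnReady` + `FailFast` + `push_spawn_buffer` sequence collapses into a single `push_buffer` (or `push_buffer_on_service`) call that takes a name plus the capacity and failfast timeout from the relevant `BufferConfig`. Schematically (a before/after fragment, not compilable on its own):

// Before: three layers wired by hand, sharing ProxyConfig-wide settings.
.push(svc::layer::mk(svc::SpawnReady::new))
.push(svc::FailFast::layer("TCP Gateway", dispatch_timeout))
.push_spawn_buffer(buffer_capacity)

// After: one call, with per-stack settings from a BufferConfig.
.push_buffer(
    "TCP Gateway",
    inbound_config.tcp_server_buffer.capacity,
    inbound_config.tcp_server_buffer.failfast_timeout,
)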
10 changes: 3 additions & 7 deletions linkerd/app/inbound/src/http/router.rs
@@ -203,20 +203,16 @@ impl<C> Inbound<C> {
.push_on_service(svc::layer::mk(svc::SpawnReady::new))
// Skip the profile stack if it takes too long to become ready.
.push_when_unready(
config.profile_idle_timeout,
config.profile_skip_timeout,
http.push_on_service(svc::layer::mk(svc::SpawnReady::new))
.into_inner(),
)
.push_on_service(
svc::layers()
.push(rt.metrics.proxy.stack.layer(stack_labels("http", "logical")))
.push(svc::FailFast::layer(
"HTTP Logical",
config.proxy.dispatch_timeout,
))
.push_spawn_buffer(config.proxy.buffer_capacity),
.push_buffer("HTTP Logical", config.http_logical_buffer.capacity, config.http_logical_buffer.failfast_timeout),
)
.push_cache(config.proxy.cache_max_idle_age)
.push_cache(config.profile_idle_timeout)
.push_on_service(
svc::layers()
.push(http::Retain::layer())
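Note the timeout renames in this hunk: the deadline passed to `push_when_unready` changes from the old `config.profile_idle_timeout` to the new `config.profile_skip_timeout`, while the cache idle-out moves from the shared `config.proxy.cache_max_idle_age` to an inbound-level `config.profile_idle_timeout`. In outline (fragment only; `fallback` stands in for the SpawnReady-wrapped HTTP stack shown above):

// Skip the profile stack if it is not ready within profile_skip_timeout
// (previously named profile_idle_timeout).
.push_when_unready(config.profile_skip_timeout, fallback)
// Evict idle per-profile stacks after profile_idle_timeout
// (previously config.proxy.cache_max_idle_age).
.push_cache(config.profile_idle_timeout)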
20 changes: 9 additions & 11 deletions linkerd/app/inbound/src/http/server.rs
@@ -43,8 +43,6 @@ impl<H> Inbound<H> {
self.map_stack(|config, rt, http| {
let ProxyConfig {
server: ServerConfig { h2_settings, .. },
dispatch_timeout,
max_in_flight_requests,
..
} = config.proxy;

@@ -58,15 +56,15 @@
svc::layers()
.push(http::BoxRequest::layer())
// Downgrades the protocol if upgraded by an outbound proxy.
.push(http::orig_proto::Downgrade::layer())
// Limit the number of in-flight requests. When the proxy is
// at capacity, go into failfast after a dispatch timeout.
// Note that the inner service _always_ returns ready (due
// to `NewRouter`) and the concurrency limit need not be
// driven outside of the request path, so there's no need
// for SpawnReady
.push(svc::ConcurrencyLimitLayer::new(max_in_flight_requests))
.push(svc::FailFast::layer("HTTP Server", dispatch_timeout)),
.push(http::orig_proto::Downgrade::layer()),
// Limit the number of in-flight requests. When the proxy is
// at capacity, go into failfast after a dispatch timeout.
// Note that the inner service _always_ returns ready (due
// to `NewRouter`) and the concurrency limit need not be
// driven outside of the request path, so there's no need
// for SpawnReady
//.push(svc::ConcurrencyLimitLayer::new(max_in_flight_requests))
//.push(svc::FailFast::layer("HTTP Server", dispatch_timeout)),
)
.push(rt.metrics.http_errors.to_layer())
.push(ServerRescue::layer())
8 changes: 6 additions & 2 deletions linkerd/app/inbound/src/lib.rs
@@ -18,7 +18,7 @@ pub(crate) mod test_util;

pub use self::{metrics::Metrics, policy::DefaultPolicy};
use linkerd_app_core::{
config::{ConnectConfig, ProxyConfig},
config::{BufferConfig, ConnectConfig, ProxyConfig},
drain,
http_tracing::OpenCensusSink,
identity, io,
@@ -39,8 +39,12 @@ pub struct Config {
pub allow_discovery: NameMatch,
pub proxy: ProxyConfig,
pub policy: policy::Config,
pub profile_idle_timeout: Duration,
pub allowed_ips: transport::AllowIps,

pub profile_skip_timeout: Duration,
pub profile_idle_timeout: Duration,
pub tcp_server_buffer: BufferConfig,
pub http_logical_buffer: BufferConfig,
}

#[derive(Clone)]
16 changes: 11 additions & 5 deletions linkerd/app/inbound/src/test_util.rs
@@ -50,6 +50,7 @@ pub fn default_config() -> Config {
};

Config {
policy,
allow_discovery: Some(cluster_local).into_iter().collect(),
proxy: config::ProxyConfig {
server: config::ServerConfig {
@@ -72,15 +73,20 @@
},
h2_settings: h2::Settings::default(),
},
buffer_capacity: 10_000,
cache_max_idle_age: Duration::from_secs(20),
dispatch_timeout: Duration::from_secs(1),
max_in_flight_requests: 10_000,
detect_protocol_timeout: Duration::from_secs(10),
},
policy,
profile_idle_timeout: Duration::from_millis(500),
allowed_ips: Default::default(),
tcp_server_buffer: config::BufferConfig {
capacity: 10_000,
failfast_timeout: Duration::from_secs(1),
},
http_logical_buffer: config::BufferConfig {
capacity: 10_000,
failfast_timeout: Duration::from_secs(1),
},
profile_idle_timeout: Duration::from_secs(20),
profile_skip_timeout: Duration::from_secs(1),
}
}

17 changes: 6 additions & 11 deletions linkerd/app/outbound/src/discover.rs
@@ -52,24 +52,19 @@ impl<N> Outbound<N> {
}))
.push_on_service(
svc::layers()
// If the traffic split is empty/unavailable, eagerly fail
// requests. When the split is in failfast, spawn the
// service in a background task so it becomes ready without
// new requests.
.push(svc::layer::mk(svc::SpawnReady::new))
.push(
rt.metrics
.proxy
.stack
.layer(crate::stack_labels("tcp", "server")),
)
.push(svc::FailFast::layer(
.push_buffer(
"TCP Server",
config.proxy.dispatch_timeout,
))
.push_spawn_buffer(config.proxy.buffer_capacity),
config.tcp_connection_buffer.capacity,
config.tcp_connection_buffer.failfast_timeout,
),
)
.push_cache(config.proxy.cache_max_idle_age)
.push_cache(config.orig_dst_idle_timeout)
.push(svc::ArcNewService::layer())
.check_new_service::<T, I>()
})
@@ -189,7 +184,7 @@ mod tests {
// service after `idle_timeout`.
let cfg = {
let mut cfg = default_config();
cfg.proxy.cache_max_idle_age = idle_timeout;
cfg.orig_dst_idle_timeout = idle_timeout;
cfg
};
let (rt, _shutdown) = runtime();