Skip to content

Commit

Permalink
Parameterize the load balancer stack (#2142)
Browse files Browse the repository at this point in the history
The load balancer/discovery modules in the concrete stacks use
hardcoded ewma parameters.

In preparation for making these parameters discoverable via the
API, this change modifies the balancer to accept this configuration
as a target param.

Furthermore, this change cleans up the balancer API to simplify the
way that we convert Resolve resolutions to Discover streams for
the balancer: the discovery buffer is spawned immediately instead
of using a MakeService. In practice, we flatten the 'make' future
into the discovery stream in the readycache, anyway; so there's no
functional difference.

This change removes the discovery watchdog timeout. We've never
seen it triggered in practice.
  • Loading branch information
olix0r authored Jan 10, 2023
1 parent 8dad0af commit 6d2abbc
Show file tree
Hide file tree
Showing 34 changed files with 666 additions and 991 deletions.
37 changes: 26 additions & 11 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,17 @@ version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00f5fb52a06bdcadeb54e8d3671f8888a39697dcb0b81b23b55174030427f4eb"

[[package]]
name = "futures-macro"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bdfb8ce053d86b91919aad980c220b1fb8401a9394410e1c289ed7e66b61835d"
dependencies = [
"proc-macro2",
"quote",
"syn",
]

[[package]]
name = "futures-sink"
version = "0.3.25"
Expand All @@ -410,6 +421,7 @@ dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
Expand Down Expand Up @@ -813,7 +825,6 @@ dependencies = [
"linkerd-opencensus",
"linkerd-proxy-api-resolve",
"linkerd-proxy-core",
"linkerd-proxy-discover",
"linkerd-proxy-dns-resolve",
"linkerd-proxy-http",
"linkerd-proxy-identity-client",
Expand Down Expand Up @@ -1322,31 +1333,33 @@ dependencies = [
]

[[package]]
name = "linkerd-proxy-core"
version = "0.1.0"
dependencies = [
"futures",
"linkerd-error",
"tower",
]

[[package]]
name = "linkerd-proxy-discover"
name = "linkerd-proxy-balance"
version = "0.1.0"
dependencies = [
"futures",
"futures-util",
"indexmap",
"linkerd-error",
"linkerd-proxy-core",
"linkerd-stack",
"pin-project",
"rand",
"tokio",
"tokio-stream",
"tokio-util",
"tower",
"tracing",
]

[[package]]
name = "linkerd-proxy-core"
version = "0.1.0"
dependencies = [
"futures",
"linkerd-error",
"tower",
]

[[package]]
name = "linkerd-proxy-dns-resolve"
version = "0.1.0"
Expand Down Expand Up @@ -1382,6 +1395,7 @@ dependencies = [
"linkerd-error",
"linkerd-http-box",
"linkerd-io",
"linkerd-proxy-balance",
"linkerd-stack",
"linkerd-tracing",
"pin-project",
Expand Down Expand Up @@ -1461,6 +1475,7 @@ dependencies = [
"futures",
"linkerd-duplex",
"linkerd-error",
"linkerd-proxy-balance",
"linkerd-stack",
"pin-project",
"rand",
Expand Down
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ members = [
"linkerd/metrics",
"linkerd/opencensus",
"linkerd/proxy/api-resolve",
"linkerd/proxy/dns-resolve",
"linkerd/proxy/balance",
"linkerd/proxy/core",
"linkerd/proxy/discover",
"linkerd/proxy/dns-resolve",
"linkerd/proxy/http",
"linkerd/proxy/identity-client",
"linkerd/proxy/resolve",
Expand Down
1 change: 0 additions & 1 deletion linkerd/app/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ linkerd-metrics = { path = "../../metrics", features = ["linkerd-stack"] }
linkerd-opencensus = { path = "../../opencensus" }
linkerd-proxy-core = { path = "../../proxy/core" }
linkerd-proxy-api-resolve = { path = "../../proxy/api-resolve" }
linkerd-proxy-discover = { path = "../../proxy/discover" }
linkerd-proxy-identity-client = { path = "../../proxy/identity-client" }
linkerd-proxy-http = { path = "../../proxy/http" }
linkerd-proxy-resolve = { path = "../../proxy/resolve" }
Expand Down
125 changes: 65 additions & 60 deletions linkerd/app/core/src/control.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use crate::{
classify, config, control, dns, identity, metrics, proxy::http, svc, tls,
transport::ConnectTcp, Addr, Error,
classify, config, dns, identity, metrics, proxy::http, svc, tls, transport::ConnectTcp, Addr,
Error,
};
use futures::future::Either;
use std::fmt;
use tokio::time;
use tokio_stream::{wrappers::IntervalStream, StreamExt};
use tracing::warn;
use tracing::{info_span, warn};

#[derive(Clone, Debug)]
pub struct Config {
Expand All @@ -27,16 +27,25 @@ impl svc::Param<Addr> for ControlAddr {
}
}

impl svc::Param<http::balance::EwmaConfig> for ControlAddr {
fn param(&self) -> http::balance::EwmaConfig {
EWMA_CONFIG
}
}

impl fmt::Display for ControlAddr {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Display::fmt(&self.addr, f)
}
}

type BalanceBody =
http::balance::PendingUntilFirstDataBody<tower::load::peak_ewma::Handle, hyper::Body>;
pub type RspBody =
linkerd_http_metrics::requests::ResponseBody<http::balance::Body<hyper::Body>, classify::Eos>;

pub type RspBody = linkerd_http_metrics::requests::ResponseBody<BalanceBody, classify::Eos>;
const EWMA_CONFIG: http::balance::EwmaConfig = http::balance::EwmaConfig {
default_rtt: time::Duration::from_millis(30),
decay: time::Duration::from_secs(10),
};

impl Config {
pub fn build(
Expand Down Expand Up @@ -82,21 +91,24 @@ impl Config {
.push(self::client::layer())
.push_on_service(svc::MapErr::layer(Into::into))
.into_new_service()
// Ensure that connection is driven independently of the load balancer; but don't drive
// reconnection independently of the balancer. This ensures that new connections are
// only initiated when the balancer tries to move pending endpoints to ready (i.e. after
// checking for discovery updates); but we don't want to continually reconnect without
// checking for discovery updates.
// Ensure that connection is driven independently of the load
// balancer; but don't drive reconnection independently of the
// balancer. This ensures that new connections are only initiated
// when the balancer tries to move pending endpoints to ready (i.e.
// after checking for discovery updates); but we don't want to
// continually reconnect without checking for discovery updates.
.push_on_service(svc::layer::mk(svc::SpawnReady::new))
.push_new_reconnect(self.connect.backoff)
.instrument(|t: &self::client::Target| tracing::info_span!("endpoint", addr = %t.addr))
.push(self::resolve::layer(dns, resolve_backoff))
.push_on_service(self::control::balance::layer())
.into_new_service()
.instrument(|t: &self::client::Target| info_span!("endpoint", addr = %t.addr))
.push_new_clone()
.push(self::balance::layer(dns, resolve_backoff))
.push(metrics.to_layer::<classify::Response, _, _>())
.push(self::add_origin::layer())
// This buffer allows a resolver client to be shared across stacks.
// No load shed is applied here, however, so backpressure may leak
// into the caller task.
.push_buffer_on_service("Controller client", &self.buffer)
.instrument(|c: &ControlAddr| tracing::info_span!("controller", addr = %c.addr))
.instrument(|c: &ControlAddr| info_span!("controller", addr = %c.addr))
.push_map_target(move |()| addr.clone())
.push(svc::ArcNewService::layer())
.into_inner()
Expand Down Expand Up @@ -162,68 +174,61 @@ mod add_origin {
}
}

mod resolve {
use super::client::Target;
mod balance {
use super::{client::Target, ControlAddr};
use crate::{
dns,
proxy::{
discover,
dns_resolve::DnsResolve,
resolve::{map_endpoint, recover},
},
svc,
proxy::{dns_resolve::DnsResolve, http, resolve::recover},
svc, tls,
};
use linkerd_error::Recover;
use std::net::SocketAddr;

pub fn layer<M, R>(
pub fn layer<B, R: Clone, N>(
dns: dns::Resolver,
recover: R,
) -> impl svc::Layer<M, Service = Discover<M, R>>
where
R: Recover + Clone,
R::Backoff: Unpin,
{
svc::layer::mk(move |endpoint| {
discover::resolve(
endpoint,
map_endpoint::Resolve::new(
IntoTarget(()),
recover::Resolve::new(recover.clone(), DnsResolve::new(dns.clone())),
),
)
) -> impl svc::Layer<
N,
Service = http::NewBalancePeakEwma<B, recover::Resolve<R, DnsResolve>, NewIntoTarget<N>>,
> {
let resolve = recover::Resolve::new(recover, DnsResolve::new(dns));
svc::layer::mk(move |inner| {
http::NewBalancePeakEwma::new(NewIntoTarget { inner }, resolve.clone())
})
}

type Discover<M, R> = discover::MakeEndpoint<
discover::FromResolve<
map_endpoint::Resolve<IntoTarget, recover::Resolve<R, DnsResolve>>,
Target,
>,
M,
>;
#[derive(Clone, Debug)]
pub struct NewIntoTarget<N> {
inner: N,
}

#[derive(Clone, Debug)]
pub struct IntoTarget<N> {
inner: N,
server_id: tls::ConditionalClientTls,
}

#[derive(Copy, Clone, Debug)]
pub struct IntoTarget(());
// === impl NewIntoTarget ===

impl map_endpoint::MapEndpoint<super::ControlAddr, ()> for IntoTarget {
type Out = Target;
impl<N: svc::NewService<ControlAddr>> svc::NewService<ControlAddr> for NewIntoTarget<N> {
type Service = IntoTarget<N::Service>;

fn map_endpoint(&self, control: &super::ControlAddr, addr: SocketAddr, _: ()) -> Self::Out {
Target::new(addr, control.identity.clone())
fn new_service(&self, control: ControlAddr) -> Self::Service {
IntoTarget {
server_id: control.identity.clone(),
inner: self.inner.new_service(control),
}
}
}
}

mod balance {
use crate::proxy::http;
use std::time::Duration;
// === impl IntoTarget ===

const EWMA_DEFAULT_RTT: Duration = Duration::from_millis(30);
const EWMA_DECAY: Duration = Duration::from_secs(10);
impl<N: svc::NewService<Target>> svc::NewService<(SocketAddr, ())> for IntoTarget<N> {
type Service = N::Service;

pub fn layer<A, B>() -> http::balance::Layer<A, B> {
http::balance::layer(EWMA_DEFAULT_RTT, EWMA_DECAY)
fn new_service(&self, (addr, ()): (SocketAddr, ())) -> Self::Service {
self.inner
.new_service(Target::new(addr, self.server_id.clone()))
}
}
}

Expand Down
1 change: 0 additions & 1 deletion linkerd/app/core/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

pub use linkerd_proxy_api_resolve as api_resolve;
pub use linkerd_proxy_core as core;
pub use linkerd_proxy_discover as discover;
pub use linkerd_proxy_dns_resolve as dns_resolve;
pub use linkerd_proxy_http as http;
pub use linkerd_proxy_resolve as resolve;
Expand Down
9 changes: 9 additions & 0 deletions linkerd/app/core/src/svc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,15 @@ impl<S> Stack<S> {
self
}

pub fn check_new_new_service<T, U, Req>(self) -> Self
where
S: NewService<T>,
S::Service: NewService<U>,
<S::Service as NewService<U>>::Service: Service<Req>,
{
self
}

pub fn check_new_clone<T>(self) -> Self
where
S: NewService<T>,
Expand Down
15 changes: 7 additions & 8 deletions linkerd/app/gateway/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,7 @@ use self::gateway::NewGateway;
use linkerd_app_core::{
identity, io, metrics,
profiles::{self, DiscoveryRejected},
proxy::{
api_resolve::{ConcreteAddr, Metadata},
core::Resolve,
http,
},
proxy::{api_resolve::Metadata, core::Resolve, http},
svc::{self, Param},
tls,
transport::{ClientAddr, Local, OrigDstAddr, Remote},
Expand Down Expand Up @@ -76,9 +72,12 @@ where
P::Future: Send + 'static,
P::Error: Send,
R: Clone + Send + Sync + Unpin + 'static,
R: Resolve<ConcreteAddr, Endpoint = Metadata, Error = Error>,
R::Resolution: Send,
R::Future: Send + Unpin,
R: Resolve<outbound::tcp::Concrete, Endpoint = Metadata, Error = Error>,
<R as Resolve<outbound::tcp::Concrete>>::Resolution: Send,
<R as Resolve<outbound::tcp::Concrete>>::Future: Send + Unpin,
R: Resolve<outbound::http::Concrete, Endpoint = Metadata, Error = Error>,
<R as Resolve<outbound::http::Concrete>>::Resolution: Send,
<R as Resolve<outbound::http::Concrete>>::Future: Send + Unpin,
{
let inbound_config = inbound.config().clone();
let local_id = identity::LocalId(inbound.identity().name().clone());
Expand Down
Loading

0 comments on commit 6d2abbc

Please sign in to comment.