Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update routers to support per-request backend distributions #2095

Merged
merged 111 commits into from
Jan 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
111 commits
Select commit Hold shift + click to select a range
8d0d573
wip: buffers
olix0r Dec 16, 2022
c1bc339
make each buffer configurable
olix0r Dec 16, 2022
9fe1f22
split concrete into a separate stack
olix0r Dec 16, 2022
f972b2b
concrete cache
olix0r Dec 20, 2022
1ad89e1
-checks
olix0r Dec 20, 2022
018331b
per-route distributor
olix0r Dec 20, 2022
f2f7e0e
Move buffer & stack metrics into concrete stack
olix0r Dec 20, 2022
6f18cb3
wip
olix0r Dec 22, 2022
b734788
wip
olix0r Dec 22, 2022
9f01cde
simplify distributor
olix0r Dec 22, 2022
3615bd1
wip
olix0r Dec 22, 2022
0465195
Move dynamic routers into outbound
olix0r Dec 22, 2022
b1972b7
Merge branch 'main' into ver/buffmas
olix0r Dec 23, 2022
cbc7758
Use BufferConfig directly
olix0r Dec 23, 2022
a211d3e
Restore concurrency limit
olix0r Dec 24, 2022
c7922a4
Merge branch 'main' into ver/buffmas
olix0r Dec 24, 2022
6a93036
touchup route stuff; buffer configs
olix0r Dec 27, 2022
9ebb0a8
restore endpoint trace contexts
olix0r Dec 27, 2022
9afd8fb
TODOs
olix0r Dec 27, 2022
9e8e0bf
typo
olix0r Dec 27, 2022
671ca3e
config: Decouple HTTP and TCP buffering config
olix0r Dec 27, 2022
ab86158
fixup stack + tests
olix0r Dec 27, 2022
a72313f
Merge branch 'main' into ver/concretemas
olix0r Dec 27, 2022
35314a1
Merge branch 'ver/buffmas' into ver/concretemas
olix0r Dec 27, 2022
76180d1
rewrite `FailFast` to drain buffers without becoming ready
hawkw Dec 23, 2022
dea82cc
Update linkerd/stack/src/failfast.rs
hawkw Dec 23, 2022
2928409
Update linkerd/stack/src/failfast.rs
hawkw Dec 23, 2022
3af02bf
empty -> invalid
hawkw Dec 23, 2022
6eaa093
360 noscope
hawkw Dec 23, 2022
259bda1
lol apparently there's a test that expects `scope`
hawkw Dec 23, 2022
f5f2634
add todo
hawkw Dec 23, 2022
ae67263
oh this one also expects the error message
hawkw Dec 23, 2022
33e4b02
s/advertise/Gate
hawkw Dec 23, 2022
aeeddde
propagate inner service's error more eagerly
hawkw Dec 23, 2022
890b757
Merge branch 'main' into ver/concretemas
olix0r Dec 27, 2022
f2ee2b8
separate load shedding from failfast
hawkw Dec 27, 2022
508fe26
Apply suggestions from code review
hawkw Dec 27, 2022
7ad21df
put back missing clone for loadshed
hawkw Dec 27, 2022
6d3eb56
ensure failfast bg task is canceled on drop
hawkw Dec 27, 2022
3728bec
allow constructing ungated FailFasts
hawkw Dec 27, 2022
e74e955
whoops forgot to rename stuff to `layer_gated`
hawkw Dec 27, 2022
190c39d
Update linkerd/stack/src/failfast.rs
hawkw Dec 28, 2022
3b08665
Split out failfast into smaller modules (#2094)
olix0r Dec 28, 2022
304afcf
docs touchups
olix0r Dec 28, 2022
914aed0
Remove redundant spawnready
olix0r Dec 28, 2022
01bda98
imports
olix0r Dec 28, 2022
c20a0e8
Reset open on clone; add some commentary
olix0r Dec 28, 2022
f7a48af
Merge branch 'eliza/failmas' into ver/concretemas
olix0r Dec 28, 2022
2fd5e5d
Merge branch 'main' into ver/concretemas
olix0r Dec 28, 2022
7598421
Merge branch 'main' into ver/concretemas
olix0r Dec 28, 2022
8457392
Remove outdated comment
olix0r Dec 28, 2022
019c2e6
deny warnings
olix0r Dec 28, 2022
17cc43f
Add a Lazy stack middleware
olix0r Dec 28, 2022
07fd66e
Merge branch 'ver/lazymas' into ver/concretemas
olix0r Dec 28, 2022
7db1379
Restore logical caches
olix0r Dec 28, 2022
3355497
TODOs
olix0r Dec 28, 2022
269f496
lazily initialize route stacks
olix0r Dec 28, 2022
82d9c5b
update logical router in the background task (#2086)
hawkw Dec 29, 2022
2243b02
Watch stack module (#2100)
olix0r Dec 29, 2022
a5f953b
feedback
olix0r Dec 29, 2022
97da3e5
fixup comment headers
olix0r Dec 29, 2022
4cb9ac0
touchup
olix0r Dec 29, 2022
e4f0cf4
Merge branch 'main' into ver/concretemas
olix0r Dec 29, 2022
eadb2f1
wip
olix0r Dec 30, 2022
e122dd0
generate distributions per-route (#2107)
hawkw Dec 31, 2022
21cbf82
unify HTTP and TCP logical routers (#2108)
hawkw Dec 31, 2022
e6ee9b4
ws
olix0r Dec 31, 2022
b4a28d9
Fail outbound requests that match no routes
hawkw Dec 31, 2022
db90da9
remove stray `dbg!` in `watch` (#2111)
hawkw Jan 2, 2023
554f567
refactor router (#2115)
olix0r Jan 4, 2023
dc7e82d
fix use of default profile in tests (#2113)
hawkw Jan 5, 2023
f2d39f2
Merge branch 'main' into ver/concretemas
olix0r Jan 5, 2023
279b952
Merge branch 'main' into ver/concretemas
olix0r Jan 5, 2023
e46e3c7
Back-out unnecessary changes to profiles
olix0r Jan 5, 2023
2e33e96
Revert "fix use of default profile in tests (#2113)"
olix0r Jan 5, 2023
866d610
add a default route
olix0r Jan 5, 2023
1215bb5
Simplify router type constraints
olix0r Jan 5, 2023
b3206e2
Merge branch 'main' into ver/concretemas
olix0r Jan 5, 2023
22b8af8
Merge branch 'main' into ver/concretemas
olix0r Jan 5, 2023
12c5777
-unused file
olix0r Jan 5, 2023
ddff3e7
distribute: Add a backend cache
olix0r Jan 5, 2023
e7a96c0
Merge branch 'ver/distributemas' into ver/concretemas
olix0r Jan 5, 2023
e38bbb6
distribute: Add a backend cache
olix0r Jan 5, 2023
1d152c4
Merge branch 'ver/distributemas' into ver/concretemas
olix0r Jan 5, 2023
8fd5e3a
comment
olix0r Jan 5, 2023
0865ae9
fix mergo
olix0r Jan 5, 2023
d840fb7
fix empty distributions for routes without backends
hawkw Jan 5, 2023
fb23e0b
slightly simpler to use first available
hawkw Jan 5, 2023
fe74387
add missing load-shed layers to lb buffers
hawkw Jan 5, 2023
13ee98a
Merge branch 'main' into ver/concretemas
olix0r Jan 5, 2023
62287f5
back out unneeded changes
olix0r Jan 5, 2023
8b4cf4a
undo sorting
olix0r Jan 5, 2023
d79917f
undo profile proto handling changes
olix0r Jan 5, 2023
bcc054e
-deadcode
olix0r Jan 5, 2023
0205b92
Merge branch 'main' into ver/concretemas
olix0r Jan 6, 2023
15aec0c
Merge branch 'main' into ver/concretemas
hawkw Jan 6, 2023
c34b1c9
Merge branch 'main' into ver/concretemas
olix0r Jan 6, 2023
69e526f
profile deps
olix0r Jan 6, 2023
a422568
Merge branch 'main' into ver/concretemas
olix0r Jan 6, 2023
7b09f69
-needless dep
olix0r Jan 6, 2023
2f80777
Merge branch 'main' into ver/concretemas
olix0r Jan 7, 2023
6bb0568
simplify deps
olix0r Jan 7, 2023
d54b705
remove verbose checks
olix0r Jan 7, 2023
075e9fe
imports
olix0r Jan 7, 2023
a17c6b2
revisit buffer/cache/loadshed
olix0r Jan 7, 2023
d076539
comments
olix0r Jan 8, 2023
3c062ea
fixup! comments
olix0r Jan 8, 2023
749b042
Update linkerd/app/outbound/src/tcp/concrete.rs
olix0r Jan 8, 2023
f4fe942
Update routers to support per-request backend distributions
olix0r Jan 8, 2023
d4fb42a
typo
olix0r Jan 9, 2023
42dc550
ws
olix0r Jan 9, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -939,14 +939,14 @@ dependencies = [
"hyper",
"linkerd-app-core",
"linkerd-app-test",
"linkerd-distribute",
"linkerd-http-classify",
"linkerd-http-retry",
"linkerd-identity",
"linkerd-io",
"linkerd-meshtls",
"linkerd-meshtls-rustls",
"linkerd-retry",
"linkerd-router",
"linkerd-tracing",
"parking_lot",
"pin-project",
Expand Down Expand Up @@ -1547,7 +1547,6 @@ dependencies = [
"futures",
"http",
"http-body",
"indexmap",
"linkerd-addr",
"linkerd-dns-name",
"linkerd-error",
Expand All @@ -1557,10 +1556,8 @@ dependencies = [
"linkerd-tonic-watch",
"linkerd2-proxy-api",
"once_cell",
"pin-project",
"prost-types",
"quickcheck",
"rand",
"regex",
"thiserror",
"tokio",
Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/outbound/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ bytes = "1"
http = "0.2"
futures = { version = "0.3", default-features = false }
linkerd-app-core = { path = "../core" }
linkerd-distribute = { path = "../../distribute" }
linkerd-http-classify = { path = "../../http-classify" }
linkerd-http-retry = { path = "../../http-retry" }
linkerd-identity = { path = "../../identity" }
linkerd-retry = { path = "../../retry" }
linkerd-router = { path = "../../router" }
parking_lot = "0.12"
thiserror = "1"
tokio = { version = "1", features = ["sync"] }
Expand Down
43 changes: 35 additions & 8 deletions linkerd/app/outbound/src/endpoint.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{http, logical::Concrete, tcp, Outbound};
use crate::{http, logical::Concrete, stack_labels, tcp, Outbound};
use linkerd_app_core::{
io, metrics,
profiles::LogicalAddr,
Expand All @@ -13,6 +13,7 @@ use std::{
net::{IpAddr, SocketAddr},
sync::Arc,
};
use tracing::info_span;

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Endpoint<P> {
Expand Down Expand Up @@ -201,7 +202,14 @@ impl<P: Copy + std::fmt::Debug> MapEndpoint<Concrete<P>, Metadata> for FromMetad
// === Outbound ===

impl<S> Outbound<S> {
pub fn push_endpoint<I>(self) -> Outbound<svc::ArcNewTcp<tcp::Endpoint, I>>
/// Builds a stack that handles forwarding a connection to a single endpoint
/// (i.e. without routing and load balancing).
///
/// HTTP protocol detection may still be performed on the connection.
///
/// A service produced by this stack is used for a single connection (i.e.
/// without any form of caching for reuse across connections).
pub fn push_forward<I>(self) -> Outbound<svc::ArcNewTcp<tcp::Endpoint, I>>
where
Self: Clone + 'static,
S: svc::MakeConnection<tcp::Connect, Metadata = Local<ClientAddr>, Error = io::Error>,
Expand All @@ -211,19 +219,38 @@ impl<S> Outbound<S> {
I: io::AsyncRead + io::AsyncWrite + io::PeerAddr,
I: fmt::Debug + Send + Sync + Unpin + 'static,
{
// The forwarding stacks are **NOT** cached, since they don't do any
// external discovery.
let http = self
.clone()
.push_tcp_endpoint::<http::Connect>()
.push_http_endpoint()
.map_stack(|config, _, stk| {
stk.push_buffer_on_service("HTTP Server", &config.http_request_buffer)
.map_stack(|config, rt, stk| {
stk.push_on_service(
svc::layers()
.push(
rt.metrics
.proxy
.stack
.layer(stack_labels("http", "forward")),
)
// TODO(ver): This buffer config should be distinct from
// that in the concrete stack. It should probably be
// derived from the target so that we can configure it
// via the API.
.push_buffer("HTTP Forward", &config.http_request_buffer),
)
})
.push_http_server()
.into_inner();

self.push_tcp_endpoint()
.push_tcp_forward()
.push_detect_http(http)
let opaque = self.push_tcp_endpoint().push_tcp_forward();

opaque.push_detect_http(http).map_stack(|_, _, stk| {
stk.instrument(|e: &tcp::Endpoint| info_span!("forward", endpoint = %e.addr))
.push_on_service(svc::BoxService::layer())
.push(svc::ArcNewService::layer())
})
}
}

Expand Down Expand Up @@ -256,7 +283,7 @@ pub mod tests {
"i don't like you, go away",
))
}))
.push_endpoint()
.push_forward()
.into_inner()
.new_service(tcp::Endpoint::forward(
OrigDstAddr(addr),
Expand Down
77 changes: 36 additions & 41 deletions linkerd/app/outbound/src/http/concrete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,30 @@ use linkerd_app_core::{
},
svc, Error, Infallible,
};
use tracing::info_span;

impl<N> Outbound<N> {
/// Builds a [`svc::NewService`] stack that builds buffered HTTP load
/// balancer services for [`Concrete`] targets.
///
/// When a balancer has no available inner services, it goes into
/// 'failfast'. While in failfast, buffered requests are failed and the
/// service becomes unavailable so callers may choose alternate concrete
/// services.
//
// TODO(ver) make the outer target type generic/parameterized.
pub fn push_http_concrete<NSvc, R>(
self,
resolve: R,
) -> Outbound<
svc::ArcNewService<
Concrete,
impl svc::Service<
http::Request<http::BoxBody>,
Response = http::Response<http::BoxBody>,
Error = Error,
Future = impl Send,
>,
http::Request<http::BoxBody>,
Response = http::Response<http::BoxBody>,
Error = Error,
Future = impl Send,
> + Clone,
Comment on lines +31 to +35
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tiny nit: is this indentation rustfmt-approved? it seems off to me

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that's all rustfmt's doing.

>,
>
where
Expand All @@ -38,14 +48,7 @@ impl<N> Outbound<N> {
R::Future: Send + Unpin,
{
self.map_stack(|config, rt, endpoint| {
let crate::Config {
discovery_idle_timeout,
..
} = config;
let watchdog = *discovery_idle_timeout * 2;

let resolve = svc::stack(resolve.into_service())
.check_service::<ConcreteAddr>()
.push_request_filter(|c: Concrete| Ok::<_, Infallible>(c.resolve))
.push(svc::layer::mk(move |inner| {
map_endpoint::Resolve::new(
Expand All @@ -59,51 +62,43 @@ impl<N> Outbound<N> {
.into_inner();

endpoint
.check_new_service::<Endpoint, http::Request<http::BoxBody>>()
.push_on_service(
svc::layers().push(http::BoxRequest::layer()).push(
rt.metrics
.proxy
.stack
.layer(stack_labels("http", "balance.endpoint")),
),
rt.metrics
.proxy
.stack
.layer(stack_labels("http", "endpoint")),
)
.instrument(|t: &Endpoint| tracing::debug_span!("endpoint", addr = %t.addr))
.check_new_service::<Endpoint, http::Request<_>>()
.instrument(|e: &Endpoint| info_span!("endpoint", addr = %e.addr))
// Resolve the service to its endpoints and balance requests over them.
//
// If the balancer has been empty/unavailable, eagerly fail requests.
// When the balancer is in failfast, spawn the service in a background
// task so it becomes ready without new requests.
//
// We *don't* ensure that the endpoint is driven to readiness here, because this
// might cause us to continually attempt to reestablish connections without
// consulting discovery to see whether the endpoint has been removed. Instead, the
// endpoint layer spawns each _connection_ attempt on a background task, but the
// endpoint stack spawns each _connection_ attempt on a background task, but the
// decision to attempt the connection must be driven by the balancer.
.push(resolve::layer(resolve, watchdog))
//
// TODO(ver) remove the watchdog timeout.
.push(resolve::layer(resolve, config.discovery_idle_timeout * 2))
.push_on_service(http::balance::layer(
crate::EWMA_DEFAULT_RTT,
crate::EWMA_DECAY,
))
.check_make_service::<Concrete, http::Request<http::BoxBody>>()
.push(svc::MapErr::layer(Into::into))
// Drives the initial resolution via the service's readiness.
.into_new_service()
.push_on_service(
svc::layers()
.push(http::balance::layer(
crate::EWMA_DEFAULT_RTT,
crate::EWMA_DECAY,
))
.push(http::BoxResponse::layer())
.push(
rt.metrics
.proxy
.stack
.layer(stack_labels("http", "balancer")),
.layer(stack_labels("http", "concrete")),
)
.push(http::BoxResponse::layer()),
.push_buffer("HTTP Concrete", &config.http_request_buffer),
)
.check_make_service::<Concrete, http::Request<_>>()
.push(svc::MapErr::layer(Into::into))
// Drives the initial resolution via the service's readiness.
.into_new_service()
// The concrete address is only set when the profile could be
// resolved. Endpoint resolution is skipped when there is no
// concrete address.
.instrument(|c: &Concrete| tracing::debug_span!("concrete", service = %c.resolve))
.instrument(|c: &Concrete| info_span!("concrete", svc = %c.resolve))
.push(svc::ArcNewService::layer())
})
}
Expand Down
23 changes: 22 additions & 1 deletion linkerd/app/outbound/src/http/detect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,18 @@ use tracing::debug_span;
pub struct Skip;

impl<N> Outbound<N> {
/// Builds a `NewService` that produces services that optionally (depending
/// on the target) perform HTTP protocol detection on sockets.
///
/// When HTTP is detected, an HTTP service is build from the provided HTTP
/// stack. In either case, the inner service is built for each connection so
/// inner services must implement caching as needed.
//
// TODO(ver) We can be smarter about reusing inner services across
// connections by moving caching into this stack...
//
// TODO(ver) Let discovery influence whether we assume an HTTP protocol
// without deteciton.
pub fn push_detect_http<T, U, NSvc, H, HSvc, I>(self, http: H) -> Outbound<svc::ArcNewTcp<T, I>>
where
I: io::AsyncRead + io::AsyncWrite + io::PeerAddr,
Expand All @@ -31,6 +43,8 @@ impl<N> Outbound<N> {
self.map_stack(|config, rt, tcp| {
let ServerConfig { h2_settings, .. } = config.proxy.server;

let tcp = tcp.instrument(|_: &_| debug_span!("opaque"));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: looks like the opaque span is being added to the TCP stack that's used if detection is disabled, or if HTTP is not detected...i feel like it might be clearer if there were separate "opaque" and "tcp" spans to differentiate between "no HTTP detected" and "HTTP detection skipped" cases?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'd rather not vary that at the span level. we have events on the parent that indicate any detection timeouts.


svc::stack(http)
.push_on_service(
svc::layers()
Expand All @@ -46,7 +60,14 @@ impl<N> Outbound<N> {
.push_on_service(svc::MapTargetLayer::new(io::EitherIo::Right))
.into_inner(),
))
.push_on_service(svc::BoxService::layer())
.push_on_service(
svc::layers()
// `DetectService` oneshots the inner service, so we add
// a loadshed to prevent leaking tasks if (for some
// unexpected reason) the inner service is not ready.
.push(svc::LoadShed::layer())
.push(svc::BoxService::layer()),
)
.check_new_service::<(Option<http::Version>, T), _>()
.push_map_target(detect::allow_timeout)
.push(svc::ArcNewService::layer())
Expand Down
Loading