Skip to content

Commit

Permalink
Support router-scoped caches (#2128)
Browse files Browse the repository at this point in the history
`stack::NewRouter` reuses its inner stack for all `Router` instances.
This makes it difficult to implement a cache that is scoped just to the
router (and not shared across all routers).

This change:

* Removes the `linkerd_stack::router` module;
* Adds the `linkerd-router` crate;
* Adds an `ahash` cache implementation to the router crate with a
  convenience layer;
* The `RecognizeRoute` is now called `SelectRoute`;
* `Router` is now called `OneshotRoute`;
* `NewOneshotRoute` now can use an `ExtractParam` to acquire a
   `SelectRoute`; and
* A `NewCloneService` module is added to support lifting a stack
  to be shared across routes (i.e. to preserve the prior behavior).
  • Loading branch information
olix0r authored Jan 6, 2023
1 parent c153d4b commit 577ad52
Show file tree
Hide file tree
Showing 16 changed files with 395 additions and 153 deletions.
15 changes: 15 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -822,6 +822,7 @@ dependencies = [
"linkerd-proxy-tcp",
"linkerd-proxy-transport",
"linkerd-reconnect",
"linkerd-router",
"linkerd-server-policy",
"linkerd-service-profiles",
"linkerd-stack",
Expand Down Expand Up @@ -945,6 +946,7 @@ dependencies = [
"linkerd-meshtls",
"linkerd-meshtls-rustls",
"linkerd-retry",
"linkerd-router",
"linkerd-tracing",
"parking_lot",
"pin-project",
Expand Down Expand Up @@ -1510,6 +1512,19 @@ dependencies = [
"tracing",
]

[[package]]
name = "linkerd-router"
version = "0.1.0"
dependencies = [
"ahash",
"futures",
"linkerd-error",
"linkerd-stack",
"parking_lot",
"thiserror",
"tracing",
]

[[package]]
name = "linkerd-server-policy"
version = "0.1.0"
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ members = [
"linkerd/proxy/transport",
"linkerd/reconnect",
"linkerd/retry",
"linkerd/router",
"linkerd/server-policy",
"linkerd/service-profiles",
"linkerd/signal",
Expand Down
3 changes: 2 additions & 1 deletion linkerd/app/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ independently of the inbound and outbound proxy logic.
"""

[dependencies]
linkerd-conditional = { path = "../../conditional" }
bytes = "1"
drain = { version = "0.1", features = ["retain"] }
http = "0.2"
Expand All @@ -22,6 +21,7 @@ hyper = { version = "0.14", features = ["http1", "http2"] }
futures = { version = "0.3", default-features = false }
ipnet = "2.7"
linkerd-addr = { path = "../../addr" }
linkerd-conditional = { path = "../../conditional" }
linkerd-dns = { path = "../../dns" }
linkerd-detect = { path = "../../detect" }
linkerd-duplex = { path = "../../duplex" }
Expand All @@ -48,6 +48,7 @@ linkerd-proxy-tap = { path = "../../proxy/tap" }
linkerd-proxy-tcp = { path = "../../proxy/tcp" }
linkerd-proxy-transport = { path = "../../proxy/transport" }
linkerd-reconnect = { path = "../../reconnect" }
linkerd-router = { path = "../../router" }
linkerd-server-policy = { path = "../../server-policy" }
linkerd-service-profiles = { path = "../../service-profiles" }
linkerd-stack = { path = "../../stack" }
Expand Down
22 changes: 17 additions & 5 deletions linkerd/app/core/src/svc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,8 @@ use crate::{config::BufferConfig, idle_cache, Error};
use linkerd_error::Recover;
use linkerd_exp_backoff::{ExponentialBackoff, ExponentialBackoffStream};
pub use linkerd_reconnect::NewReconnect;
pub use linkerd_stack::{
self as stack, layer, ArcNewService, BoxCloneService, BoxService, BoxServiceLayer, Either,
ExtractParam, Fail, FailFast, Filter, InsertParam, LoadShed, MakeConnection, MapErr,
MapTargetLayer, NewRouter, NewService, Param, Predicate, UnwrapOr,
};
pub use linkerd_router::{self as router, NewOneshotRoute};
pub use linkerd_stack::{self as stack, *};
use linkerd_stack::{failfast, OnService};
pub use linkerd_stack_tracing::{GetSpan, NewInstrument, NewInstrumentLayer};
use std::{
Expand Down Expand Up @@ -175,6 +172,13 @@ impl<S> Stack<S> {
self.push(stack::OnServiceLayer::new(layer))
}

/// Wraps the inner `S` with `NewCloneService` so that the stack holds a
/// `NewService` that always returns a clone of `S` regardless of the target
/// value.
pub fn push_new_clone(self) -> Stack<NewCloneService<S>> {
self.push(layer::mk(NewCloneService::from))
}

/// Wraps the inner service with a response timeout such that timeout errors are surfaced as a
/// `ConnectTimeout` error.
///
Expand Down Expand Up @@ -265,6 +269,14 @@ impl<S> Stack<S> {
self
}

pub fn check_new_new<T, U>(self) -> Self
where
S: NewService<T>,
S::Service: NewService<U>,
{
self
}

pub fn check_new_clone<T>(self) -> Self
where
S: NewService<T>,
Expand Down
11 changes: 8 additions & 3 deletions linkerd/app/gateway/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,11 @@ where
.with_stack(
// A router is needed so that we use each request's HTTP version
// (i.e. after server-side orig-proto downgrading).
http.push(svc::NewRouter::layer(|(_, target)| RouteHttp(target)))
http.push_on_service(svc::LoadShed::layer())
.push_new_clone()
.push(svc::NewOneshotRoute::layer_via(
|(_, target): &(_, HttpTransportHeader)| RouteHttp(target.clone()),
))
.push(inbound.authorize_http())
.push_http_insert_target::<tls::ClientId>(),
)
Expand Down Expand Up @@ -297,10 +301,11 @@ impl Param<policy::ServerLabel> for HttpTransportHeader {

// === impl RouteHttp ===

impl<B> svc::stack::RecognizeRoute<http::Request<B>> for RouteHttp<HttpTransportHeader> {
impl<B> svc::router::SelectRoute<http::Request<B>> for RouteHttp<HttpTransportHeader> {
type Key = HttpTarget;
type Error = Error;

fn recognize(&self, req: &http::Request<B>) -> Result<Self::Key, Error> {
fn select(&self, req: &http::Request<B>) -> Result<Self::Key, Error> {
let target = self.0.target.clone();
let version = req.version().try_into()?;
Ok(HttpTarget { target, version })
Expand Down
26 changes: 17 additions & 9 deletions linkerd/app/inbound/src/http/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ impl<C> Inbound<C> {
Response = http::Response<http::BoxBody>,
Error = Error,
Future = impl Send,
> + Clone,
> + Clone
+ Send
+ Unpin,
>,
>
where
Expand All @@ -80,8 +82,8 @@ impl<C> Inbound<C> {
+ Param<Remote<ClientAddr>>
+ Param<tls::ConditionalServerTls>
+ Param<policy::AllowPolicy>,
T: Clone + Send + 'static,
P: profiles::GetProfile<profiles::LookupAddr> + Clone + Send + Sync + 'static,
T: Clone + Send + Unpin + 'static,
P: profiles::GetProfile<profiles::LookupAddr> + Clone + Send + Sync + Unpin + 'static,
P::Future: Send,
P::Error: Send,
C: svc::MakeConnection<Http> + Clone + Send + Sync + Unpin + 'static,
Expand Down Expand Up @@ -222,10 +224,15 @@ impl<C> Inbound<C> {
}
})
// Routes each request to a target, obtains a service for that target, and
// dispatches the request. NewRouter moves the NewService into the service type, so
// minimize it's type footprint with a Box.
.push(svc::ArcNewService::layer())
.push(svc::NewRouter::layer(LogicalPerRequest::from))
// dispatches the request.
.check_new_service::<Logical, http::Request<http::BoxBody>>()
.push_on_service(svc::LoadShed::layer())
.push_new_clone()
.check_new_new::<(policy::HttpRoutePermit, T), Logical>()
.push(svc::NewOneshotRoute::layer_via(|(permit, t): &(policy::HttpRoutePermit, T)| {
LogicalPerRequest::from((permit.clone(), t.clone()))
}))
.check_new_service::<(policy::HttpRoutePermit, T), http::Request<http::BoxBody>>()
.push(policy::NewHttpPolicy::layer(rt.metrics.http_authz.clone()))
// Used by tap.
.push_http_insert_target::<tls::ConditionalServerTls>()
Expand Down Expand Up @@ -267,10 +274,11 @@ where
}
}

impl<A> svc::stack::RecognizeRoute<http::Request<A>> for LogicalPerRequest {
impl<B> svc::router::SelectRoute<http::Request<B>> for LogicalPerRequest {
type Key = Logical;
type Error = Infallible;

fn recognize(&self, req: &http::Request<A>) -> Result<Self::Key> {
fn select(&self, req: &http::Request<B>) -> Result<Self::Key, Infallible> {
use linkerd_app_core::{
http_request_authority_addr, http_request_host_addr, CANONICAL_DST_HEADER,
};
Expand Down
5 changes: 4 additions & 1 deletion linkerd/app/outbound/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ linkerd-http-classify = { path = "../../http-classify" }
linkerd-http-retry = { path = "../../http-retry" }
linkerd-identity = { path = "../../identity" }
linkerd-retry = { path = "../../retry" }
linkerd-router = { path = "../../router" }
parking_lot = "0.12"
thiserror = "1"
tokio = { version = "1", features = ["sync"] }
Expand All @@ -35,7 +36,9 @@ hyper = { version = "0.14", features = ["http1", "http2"] }
linkerd-app-test = { path = "../test" }
linkerd-io = { path = "../../io", features = ["tokio-test"] }
linkerd-meshtls = { path = "../../meshtls", features = ["rustls"] }
linkerd-meshtls-rustls = { path = "../../meshtls/rustls", features = ["test-util"] }
linkerd-meshtls-rustls = { path = "../../meshtls/rustls", features = [
"test-util",
] }
linkerd-tracing = { path = "../../tracing", features = ["ansi"] }
parking_lot = "0.12"
tokio = { version = "1", features = ["macros", "sync", "time"] }
Expand Down
59 changes: 34 additions & 25 deletions linkerd/app/outbound/src/ingress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use linkerd_app_core::{
transport::{OrigDstAddr, Remote, ServerAddr},
AddrMatch, Error, Infallible, NameAddr,
};
use linkerd_router as router;
use thiserror::Error;
use tracing::{debug, debug_span, info_span};

Expand Down Expand Up @@ -194,36 +195,20 @@ impl Outbound<svc::ArcNewHttp<http::Endpoint>> {
// many times. Forwarding stacks are not cached explicitly, as there
// are no real resources we need to share across connections. This
// allows us to avoid buffering requests to these endpoints.
.push(svc::NewRouter::layer(
|http::Accept { orig_dst, protocol }| {
move |req: &http::Request<_>| {
// Use either the override header or the original destination address.
let target =
match http::authority_from_header(req, DST_OVERRIDE_HEADER) {
None => Target::Forward(orig_dst),
Some(a) => {
let dst =
NameAddr::from_authority_with_default_port(&a, 80)
.map_err(|_| InvalidOverrideHeader)?;
Target::Override(dst)
}
};
Ok(Http {
target,
version: protocol,
})
}
},
))
.check_new_service::<Http<Target>, http::Request<_>>()
.push_on_service(svc::LoadShed::layer())
.push_new_clone()
.check_new_new::<http::Accept, Http<Target>>()
.push(router::NewOneshotRoute::layer_via(|a: &http::Accept| {
SelectTarget(*a)
}))
.check_new_service::<http::Accept, http::Request<_>>()
.push(http::NewNormalizeUri::layer())
.push_on_service(
svc::layers()
.push(http::MarkAbsoluteForm::layer())
// The concurrency-limit can force the service into
// fail-fast, but it need not be driven to readiness on a
// background task (i.e., by `SpawnReady`). Otherwise, the
// inner service is always ready (because it's a router).
.push(svc::ConcurrencyLimitLayer::new(*max_in_flight_requests))
.push(svc::LoadShed::layer())
.push(rt.metrics.http_errors.to_layer()),
)
.push(http::ServerRescue::layer(config.emit_headers))
Expand Down Expand Up @@ -258,3 +243,27 @@ impl Outbound<svc::ArcNewHttp<http::Endpoint>> {
})
}
}

#[derive(Clone, Debug)]
struct SelectTarget(http::Accept);

impl<B> svc::router::SelectRoute<http::Request<B>> for SelectTarget {
type Key = Http<Target>;
type Error = InvalidOverrideHeader;

fn select(&self, req: &http::Request<B>) -> Result<Self::Key, Self::Error> {
Ok(Http {
version: self.0.protocol,
// Use either the override header or the original destination
// address.
target: http::authority_from_header(req, DST_OVERRIDE_HEADER)
.map(|a| {
NameAddr::from_authority_with_default_port(&a, 80)
.map_err(|_| InvalidOverrideHeader)
.map(Target::Override)
})
.transpose()?
.unwrap_or(Target::Forward(self.0.orig_dst)),
})
}
}
15 changes: 15 additions & 0 deletions linkerd/router/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[package]
name = "linkerd-router"
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"
publish = false

[dependencies]
ahash = "0.8"
futures = { version = "0.3", default-features = false }
parking_lot = "0.12"
thiserror = "1"
tracing = "0.1"
linkerd-error = { path = "../error" }
linkerd-stack = { path = "../stack" }
Loading

0 comments on commit 577ad52

Please sign in to comment.