From 577ad5285a55c344beba066b040c11bd1d828f49 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Fri, 6 Jan 2023 12:28:15 -0800 Subject: [PATCH] Support router-scoped caches (#2128) `stack::NewRouter` reuses its inner stack for all `Router` instances. This makes it difficult to implement a cache that is scoped just to the router (and not shared across all routers). This change: * Removes the `linkerd_stack::router` module; * Adds the `linkerd-router` crate; * Adds an `ahash` cache implementation to the router crate with a convenience layer; * The `RecognizeRoute` is now called `SelectRoute`; * `Router` is now called `OneshotRoute`; * `NewOneshotRoute` now can use an `ExtractParam` to acquire a `SelectRoute`; and * A `NewCloneService` module is added to support lifting a stack to be shared across routes (i.e. to preserve the prior behavior). --- Cargo.lock | 15 ++ Cargo.toml | 1 + linkerd/app/core/Cargo.toml | 3 +- linkerd/app/core/src/svc.rs | 22 ++- linkerd/app/gateway/src/lib.rs | 11 +- linkerd/app/inbound/src/http/router.rs | 26 ++- linkerd/app/outbound/Cargo.toml | 5 +- linkerd/app/outbound/src/ingress.rs | 59 ++++--- linkerd/router/Cargo.toml | 15 ++ linkerd/router/src/cache.rs | 94 +++++++++++ linkerd/router/src/lib.rs | 163 +++++++++++++++++++ linkerd/service-profiles/src/http/proxy.rs | 4 +- linkerd/service-profiles/src/http/service.rs | 6 +- linkerd/stack/src/lib.rs | 4 +- linkerd/stack/src/new_service.rs | 19 +++ linkerd/stack/src/router.rs | 101 ------------ 16 files changed, 395 insertions(+), 153 deletions(-) create mode 100644 linkerd/router/Cargo.toml create mode 100644 linkerd/router/src/cache.rs create mode 100644 linkerd/router/src/lib.rs delete mode 100644 linkerd/stack/src/router.rs diff --git a/Cargo.lock b/Cargo.lock index 6289a24cb0..b925dac1b2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -822,6 +822,7 @@ dependencies = [ "linkerd-proxy-tcp", "linkerd-proxy-transport", "linkerd-reconnect", + "linkerd-router", "linkerd-server-policy", "linkerd-service-profiles", "linkerd-stack", @@ -945,6 +946,7 @@ dependencies = [ "linkerd-meshtls", "linkerd-meshtls-rustls", "linkerd-retry", + "linkerd-router", "linkerd-tracing", "parking_lot", "pin-project", @@ -1510,6 +1512,19 @@ dependencies = [ "tracing", ] +[[package]] +name = "linkerd-router" +version = "0.1.0" +dependencies = [ + "ahash", + "futures", + "linkerd-error", + "linkerd-stack", + "parking_lot", + "thiserror", + "tracing", +] + [[package]] name = "linkerd-server-policy" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 620c9322c6..315407349c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,6 +50,7 @@ members = [ "linkerd/proxy/transport", "linkerd/reconnect", "linkerd/retry", + "linkerd/router", "linkerd/server-policy", "linkerd/service-profiles", "linkerd/signal", diff --git a/linkerd/app/core/Cargo.toml b/linkerd/app/core/Cargo.toml index 438c2ec0ed..0adc5a364c 100644 --- a/linkerd/app/core/Cargo.toml +++ b/linkerd/app/core/Cargo.toml @@ -13,7 +13,6 @@ independently of the inbound and outbound proxy logic. """ [dependencies] -linkerd-conditional = { path = "../../conditional" } bytes = "1" drain = { version = "0.1", features = ["retain"] } http = "0.2" @@ -22,6 +21,7 @@ hyper = { version = "0.14", features = ["http1", "http2"] } futures = { version = "0.3", default-features = false } ipnet = "2.7" linkerd-addr = { path = "../../addr" } +linkerd-conditional = { path = "../../conditional" } linkerd-dns = { path = "../../dns" } linkerd-detect = { path = "../../detect" } linkerd-duplex = { path = "../../duplex" } @@ -48,6 +48,7 @@ linkerd-proxy-tap = { path = "../../proxy/tap" } linkerd-proxy-tcp = { path = "../../proxy/tcp" } linkerd-proxy-transport = { path = "../../proxy/transport" } linkerd-reconnect = { path = "../../reconnect" } +linkerd-router = { path = "../../router" } linkerd-server-policy = { path = "../../server-policy" } linkerd-service-profiles = { path = "../../service-profiles" } linkerd-stack = { path = "../../stack" } diff --git a/linkerd/app/core/src/svc.rs b/linkerd/app/core/src/svc.rs index f444b176c6..bfad621159 100644 --- a/linkerd/app/core/src/svc.rs +++ b/linkerd/app/core/src/svc.rs @@ -5,11 +5,8 @@ use crate::{config::BufferConfig, idle_cache, Error}; use linkerd_error::Recover; use linkerd_exp_backoff::{ExponentialBackoff, ExponentialBackoffStream}; pub use linkerd_reconnect::NewReconnect; -pub use linkerd_stack::{ - self as stack, layer, ArcNewService, BoxCloneService, BoxService, BoxServiceLayer, Either, - ExtractParam, Fail, FailFast, Filter, InsertParam, LoadShed, MakeConnection, MapErr, - MapTargetLayer, NewRouter, NewService, Param, Predicate, UnwrapOr, -}; +pub use linkerd_router::{self as router, NewOneshotRoute}; +pub use linkerd_stack::{self as stack, *}; use linkerd_stack::{failfast, OnService}; pub use linkerd_stack_tracing::{GetSpan, NewInstrument, NewInstrumentLayer}; use std::{ @@ -175,6 +172,13 @@ impl Stack { self.push(stack::OnServiceLayer::new(layer)) } + /// Wraps the inner `S` with `NewCloneService` so that the stack holds a + /// `NewService` that always returns a clone of `S` regardless of the target + /// value. + pub fn push_new_clone(self) -> Stack> { + self.push(layer::mk(NewCloneService::from)) + } + /// Wraps the inner service with a response timeout such that timeout errors are surfaced as a /// `ConnectTimeout` error. /// @@ -265,6 +269,14 @@ impl Stack { self } + pub fn check_new_new(self) -> Self + where + S: NewService, + S::Service: NewService, + { + self + } + pub fn check_new_clone(self) -> Self where S: NewService, diff --git a/linkerd/app/gateway/src/lib.rs b/linkerd/app/gateway/src/lib.rs index 9d75cb5210..47988e729e 100644 --- a/linkerd/app/gateway/src/lib.rs +++ b/linkerd/app/gateway/src/lib.rs @@ -203,7 +203,11 @@ where .with_stack( // A router is needed so that we use each request's HTTP version // (i.e. after server-side orig-proto downgrading). - http.push(svc::NewRouter::layer(|(_, target)| RouteHttp(target))) + http.push_on_service(svc::LoadShed::layer()) + .push_new_clone() + .push(svc::NewOneshotRoute::layer_via( + |(_, target): &(_, HttpTransportHeader)| RouteHttp(target.clone()), + )) .push(inbound.authorize_http()) .push_http_insert_target::(), ) @@ -297,10 +301,11 @@ impl Param for HttpTransportHeader { // === impl RouteHttp === -impl svc::stack::RecognizeRoute> for RouteHttp { +impl svc::router::SelectRoute> for RouteHttp { type Key = HttpTarget; + type Error = Error; - fn recognize(&self, req: &http::Request) -> Result { + fn select(&self, req: &http::Request) -> Result { let target = self.0.target.clone(); let version = req.version().try_into()?; Ok(HttpTarget { target, version }) diff --git a/linkerd/app/inbound/src/http/router.rs b/linkerd/app/inbound/src/http/router.rs index c363796b89..7327970456 100644 --- a/linkerd/app/inbound/src/http/router.rs +++ b/linkerd/app/inbound/src/http/router.rs @@ -71,7 +71,9 @@ impl Inbound { Response = http::Response, Error = Error, Future = impl Send, - > + Clone, + > + Clone + + Send + + Unpin, >, > where @@ -80,8 +82,8 @@ impl Inbound { + Param> + Param + Param, - T: Clone + Send + 'static, - P: profiles::GetProfile + Clone + Send + Sync + 'static, + T: Clone + Send + Unpin + 'static, + P: profiles::GetProfile + Clone + Send + Sync + Unpin + 'static, P::Future: Send, P::Error: Send, C: svc::MakeConnection + Clone + Send + Sync + Unpin + 'static, @@ -222,10 +224,15 @@ impl Inbound { } }) // Routes each request to a target, obtains a service for that target, and - // dispatches the request. NewRouter moves the NewService into the service type, so - // minimize it's type footprint with a Box. - .push(svc::ArcNewService::layer()) - .push(svc::NewRouter::layer(LogicalPerRequest::from)) + // dispatches the request. + .check_new_service::>() + .push_on_service(svc::LoadShed::layer()) + .push_new_clone() + .check_new_new::<(policy::HttpRoutePermit, T), Logical>() + .push(svc::NewOneshotRoute::layer_via(|(permit, t): &(policy::HttpRoutePermit, T)| { + LogicalPerRequest::from((permit.clone(), t.clone())) + })) + .check_new_service::<(policy::HttpRoutePermit, T), http::Request>() .push(policy::NewHttpPolicy::layer(rt.metrics.http_authz.clone())) // Used by tap. .push_http_insert_target::() @@ -267,10 +274,11 @@ where } } -impl svc::stack::RecognizeRoute> for LogicalPerRequest { +impl svc::router::SelectRoute> for LogicalPerRequest { type Key = Logical; + type Error = Infallible; - fn recognize(&self, req: &http::Request) -> Result { + fn select(&self, req: &http::Request) -> Result { use linkerd_app_core::{ http_request_authority_addr, http_request_host_addr, CANONICAL_DST_HEADER, }; diff --git a/linkerd/app/outbound/Cargo.toml b/linkerd/app/outbound/Cargo.toml index 8ed979980b..9a34742015 100644 --- a/linkerd/app/outbound/Cargo.toml +++ b/linkerd/app/outbound/Cargo.toml @@ -23,6 +23,7 @@ linkerd-http-classify = { path = "../../http-classify" } linkerd-http-retry = { path = "../../http-retry" } linkerd-identity = { path = "../../identity" } linkerd-retry = { path = "../../retry" } +linkerd-router = { path = "../../router" } parking_lot = "0.12" thiserror = "1" tokio = { version = "1", features = ["sync"] } @@ -35,7 +36,9 @@ hyper = { version = "0.14", features = ["http1", "http2"] } linkerd-app-test = { path = "../test" } linkerd-io = { path = "../../io", features = ["tokio-test"] } linkerd-meshtls = { path = "../../meshtls", features = ["rustls"] } -linkerd-meshtls-rustls = { path = "../../meshtls/rustls", features = ["test-util"] } +linkerd-meshtls-rustls = { path = "../../meshtls/rustls", features = [ + "test-util", +] } linkerd-tracing = { path = "../../tracing", features = ["ansi"] } parking_lot = "0.12" tokio = { version = "1", features = ["macros", "sync", "time"] } diff --git a/linkerd/app/outbound/src/ingress.rs b/linkerd/app/outbound/src/ingress.rs index 7523a3f9b6..9a359bb295 100644 --- a/linkerd/app/outbound/src/ingress.rs +++ b/linkerd/app/outbound/src/ingress.rs @@ -11,6 +11,7 @@ use linkerd_app_core::{ transport::{OrigDstAddr, Remote, ServerAddr}, AddrMatch, Error, Infallible, NameAddr, }; +use linkerd_router as router; use thiserror::Error; use tracing::{debug, debug_span, info_span}; @@ -194,36 +195,20 @@ impl Outbound> { // many times. Forwarding stacks are not cached explicitly, as there // are no real resources we need to share across connections. This // allows us to avoid buffering requests to these endpoints. - .push(svc::NewRouter::layer( - |http::Accept { orig_dst, protocol }| { - move |req: &http::Request<_>| { - // Use either the override header or the original destination address. - let target = - match http::authority_from_header(req, DST_OVERRIDE_HEADER) { - None => Target::Forward(orig_dst), - Some(a) => { - let dst = - NameAddr::from_authority_with_default_port(&a, 80) - .map_err(|_| InvalidOverrideHeader)?; - Target::Override(dst) - } - }; - Ok(Http { - target, - version: protocol, - }) - } - }, - )) + .check_new_service::, http::Request<_>>() + .push_on_service(svc::LoadShed::layer()) + .push_new_clone() + .check_new_new::>() + .push(router::NewOneshotRoute::layer_via(|a: &http::Accept| { + SelectTarget(*a) + })) + .check_new_service::>() .push(http::NewNormalizeUri::layer()) .push_on_service( svc::layers() .push(http::MarkAbsoluteForm::layer()) - // The concurrency-limit can force the service into - // fail-fast, but it need not be driven to readiness on a - // background task (i.e., by `SpawnReady`). Otherwise, the - // inner service is always ready (because it's a router). .push(svc::ConcurrencyLimitLayer::new(*max_in_flight_requests)) + .push(svc::LoadShed::layer()) .push(rt.metrics.http_errors.to_layer()), ) .push(http::ServerRescue::layer(config.emit_headers)) @@ -258,3 +243,27 @@ impl Outbound> { }) } } + +#[derive(Clone, Debug)] +struct SelectTarget(http::Accept); + +impl svc::router::SelectRoute> for SelectTarget { + type Key = Http; + type Error = InvalidOverrideHeader; + + fn select(&self, req: &http::Request) -> Result { + Ok(Http { + version: self.0.protocol, + // Use either the override header or the original destination + // address. + target: http::authority_from_header(req, DST_OVERRIDE_HEADER) + .map(|a| { + NameAddr::from_authority_with_default_port(&a, 80) + .map_err(|_| InvalidOverrideHeader) + .map(Target::Override) + }) + .transpose()? + .unwrap_or(Target::Forward(self.0.orig_dst)), + }) + } +} diff --git a/linkerd/router/Cargo.toml b/linkerd/router/Cargo.toml new file mode 100644 index 0000000000..2208766780 --- /dev/null +++ b/linkerd/router/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "linkerd-router" +version = "0.1.0" +edition = "2021" +license = "Apache-2.0" +publish = false + +[dependencies] +ahash = "0.8" +futures = { version = "0.3", default-features = false } +parking_lot = "0.12" +thiserror = "1" +tracing = "0.1" +linkerd-error = { path = "../error" } +linkerd-stack = { path = "../stack" } diff --git a/linkerd/router/src/cache.rs b/linkerd/router/src/cache.rs new file mode 100644 index 0000000000..4c83ba3fc6 --- /dev/null +++ b/linkerd/router/src/cache.rs @@ -0,0 +1,94 @@ +use crate::NewService; +use ahash::AHashMap; +use parking_lot::Mutex; +use std::{fmt::Debug, hash::Hash, marker::PhantomData, sync::Arc}; + +/// A [`NewService`] that produces [`Cache`]s. +#[derive(Debug)] +pub struct NewCache { + inner: N, + _marker: PhantomData, +} + +/// A [`NewService`] that lazily builds an inner `S`-typed service for each +/// `K`-typed target. +/// +/// An inner service is built once for each `K`-typed target. The inner service +/// is then cloned for each `K` value. It is not dropped until all clones of the +/// cache are dropped. +#[derive(Debug)] +pub struct Cache { + new_inner: N, + services: Arc>>, +} + +// === impl NewCache === + +impl NewCache { + pub fn new(inner: N) -> Self { + Self { + inner, + _marker: PhantomData, + } + } +} + +impl NewService for NewCache +where + T: Clone, + N: NewService, + M: NewService, +{ + type Service = Cache; + + #[inline] + fn new_service(&self, target: T) -> Self::Service { + Cache::new(self.inner.new_service(target)) + } +} + +impl Clone for NewCache { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + _marker: self._marker, + } + } +} + +// === impl Cache === + +impl Cache { + pub(super) fn new(new_inner: N) -> Self { + Self { + new_inner, + services: Default::default(), + } + } +} + +impl NewService for Cache +where + K: Eq + Hash + Clone, + N: NewService, + N::Service: Clone, +{ + type Service = N::Service; + + fn new_service(&self, key: K) -> Self::Service { + self.services + .lock() + .entry(key.clone()) + .or_insert_with(|| self.new_inner.new_service(key)) + .clone() + } +} + +impl Clone for Cache { + fn clone(&self) -> Self { + Self { + new_inner: self.new_inner.clone(), + services: self.services.clone(), + } + } +} diff --git a/linkerd/router/src/lib.rs b/linkerd/router/src/lib.rs new file mode 100644 index 0000000000..324539628b --- /dev/null +++ b/linkerd/router/src/lib.rs @@ -0,0 +1,163 @@ +#![deny(rust_2018_idioms, clippy::disallowed_methods, clippy::disallowed_types)] +#![forbid(unsafe_code)] + +use futures::{future, prelude::*}; +use linkerd_error::Error; +use linkerd_stack::{layer, ExtractParam, NewService, Oneshot, Service, ServiceExt}; +use std::{ + fmt::Debug, + marker::PhantomData, + task::{Context, Poll}, +}; +use tracing::debug; + +mod cache; + +pub use self::cache::{Cache, NewCache}; + +pub trait SelectRoute { + type Key; + type Error: Into; + + /// Given a a request, returns the key matching this request. + /// + /// If no route matches the request, this method returns an error. + fn select(&self, req: &Req) -> Result; +} + +/// A [`NewService`] that builds `Route` services for targets that provide a +/// [`SelectRoute`]. +/// +/// The selector is built with an `X`-typed [`ExtractParam`] implementation. +#[derive(Debug)] +pub struct NewOneshotRoute { + extract: X, + inner: N, + _marker: PhantomData Sel>, +} + +/// Dispatches requests to a new `S`-typed inner service. +/// +/// Each request is matched against the route table and routed to a new inner +/// service via a [`Oneshot`]. +#[derive(Clone, Debug)] +pub struct OneshotRoute { + select: Sel, + new_route: N, +} + +// === impl NewOneshotRoute === + +impl NewOneshotRoute { + pub fn new(extract: X, inner: N) -> Self { + Self { + extract, + inner, + _marker: PhantomData, + } + } +} + +impl NewOneshotRoute { + /// Builds a [`layer::Layer`] that produces `NewOneshotRoute`s. + /// + /// Targets must produce a `Sel` via the provided `X`-typed + /// [`ExtractParam`]. + pub fn layer_via(extract: X) -> impl layer::Layer + Clone { + layer::mk(move |inner| Self::new(extract.clone(), inner)) + } +} + +impl NewOneshotRoute { + /// Builds a [`layer::Layer`] that produces `NewOneshotRoute`s. + /// + /// Target types must implement `Param`. + pub fn layer() -> impl layer::Layer + Clone { + Self::layer_via(()) + } +} + +impl NewOneshotRoute> { + /// Builds a [`layer::Layer`] that produces `NewOneshotRoute`s using a cache + /// of inner services. + /// + /// Target types must implement `Param`. + pub fn layer_cached() -> impl layer::Layer + Clone { + layer::mk(move |inner: N| Self::new((), NewCache::new(inner))) + } +} + +impl NewService for NewOneshotRoute +where + X: ExtractParam, + N: NewService, +{ + type Service = OneshotRoute; + + fn new_service(&self, target: T) -> Self::Service { + let select = self.extract.extract_param(&target); + let new_route = self.inner.new_service(target); + OneshotRoute { select, new_route } + } +} + +impl Clone for NewOneshotRoute { + fn clone(&self) -> Self { + Self { + extract: self.extract.clone(), + inner: self.inner.clone(), + _marker: PhantomData, + } + } +} + +// === impl OneshotRoute === + +impl Service for OneshotRoute +where + Sel: SelectRoute, + N: NewService, + S: Service, + S::Error: Into, +{ + type Response = S::Response; + type Error = Error; + type Future = future::Either< + future::MapErr, fn(S::Error) -> Error>, + future::Ready>, + >; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: Req) -> Self::Future { + match self.select.select(&req) { + Ok(key) => future::Either::Left({ + let route = self.new_route.new_service(key); + route.oneshot(req).map_err(Into::into) + }), + Err(e) => future::Either::Right({ + let error = e.into(); + debug!(%error, "Failed to route request"); + future::err(error) + }), + } + } +} + +// === impl SelectRoute === + +impl SelectRoute for F +where + K: Clone, + E: std::error::Error + Send + Sync + 'static, + F: Fn(&T) -> Result, +{ + type Key = K; + type Error = E; + + fn select(&self, t: &T) -> Result { + (self)(t) + } +} diff --git a/linkerd/service-profiles/src/http/proxy.rs b/linkerd/service-profiles/src/http/proxy.rs index bd77735712..a4607d28fd 100644 --- a/linkerd/service-profiles/src/http/proxy.rs +++ b/linkerd/service-profiles/src/http/proxy.rs @@ -13,8 +13,8 @@ use tracing::{debug, trace}; /// A router that uses a per-route `Proxy` to wrap a common underlying /// `Service`. /// -/// This router is similar to `linkerd_stack::NewRouter` and -/// `linkerd_cache::Cache` with a few differences: +/// This router is similar to `linkerd_router::NewOneshotRoute` and +/// `linkerd_router::Cache` with a few differences: /// /// * It's `Proxy`-specific; /// * Routes are constructed eagerly as the profile updates; diff --git a/linkerd/service-profiles/src/http/service.rs b/linkerd/service-profiles/src/http/service.rs index 3b19b04008..320cc63b81 100644 --- a/linkerd/service-profiles/src/http/service.rs +++ b/linkerd/service-profiles/src/http/service.rs @@ -9,11 +9,11 @@ use std::{ }; use tracing::{debug, trace}; -/// A router that uses a per-route `Service` (with a fallback service when no +/// A router that uses a per-route [`Service`] (with a fallback service when no /// route is matched). /// -/// This router is similar to `linkerd_stack::NewRouter` and -/// `linkerd_cache::Cache` with a few differences: +/// This router is similar to `linkerd_router::NewOneshotRoute` and +/// `linkerd_router::Cache` with a few differences: /// /// * Routes are constructed eagerly as the profile updates; /// * Routes are removed eagerly as the profile updates (i.e. there's no diff --git a/linkerd/stack/src/lib.rs b/linkerd/stack/src/lib.rs index 5c192938a5..62fe3e04fd 100644 --- a/linkerd/stack/src/lib.rs +++ b/linkerd/stack/src/lib.rs @@ -23,7 +23,6 @@ pub mod new_service; mod on_service; pub mod proxy; mod result; -mod router; mod switch_ready; mod timeout; mod unwrap_or; @@ -45,11 +44,10 @@ pub use self::{ map_err::MapErr, map_target::{MapTarget, MapTargetLayer, MapTargetService}, monitor::{Monitor, MonitorError, MonitorNewService, MonitorService, NewMonitor}, - new_service::NewService, + new_service::{NewCloneService, NewService}, on_service::{OnService, OnServiceLayer}, proxy::Proxy, result::ResultService, - router::{NewRouter, RecognizeRoute}, switch_ready::{NewSwitchReady, SwitchReady}, timeout::{Timeout, TimeoutError}, unwrap_or::UnwrapOr, diff --git a/linkerd/stack/src/new_service.rs b/linkerd/stack/src/new_service.rs index 58e2a24fad..eb7f0ba0b0 100644 --- a/linkerd/stack/src/new_service.rs +++ b/linkerd/stack/src/new_service.rs @@ -18,6 +18,9 @@ pub struct FromMakeService { make_service: S, } +#[derive(Clone, Debug)] +pub struct NewCloneService(S); + // === impl NewService === impl NewService for F @@ -49,3 +52,19 @@ where FutureService::new(self.make_service.clone().oneshot(target)) } } + +// === impl NewCloneService === + +impl From for NewCloneService { + fn from(inner: S) -> Self { + Self(inner) + } +} + +impl NewService for NewCloneService { + type Service = S; + + fn new_service(&self, _: T) -> Self::Service { + self.0.clone() + } +} diff --git a/linkerd/stack/src/router.rs b/linkerd/stack/src/router.rs deleted file mode 100644 index 473e837459..0000000000 --- a/linkerd/stack/src/router.rs +++ /dev/null @@ -1,101 +0,0 @@ -use crate::{layer, NewService}; -use futures::{future, TryFutureExt}; -use linkerd_error::Error; -use std::task::{Context, Poll}; -use tower::util::{Oneshot, ServiceExt}; - -/// Determines the router key for each `T`-typed target. -pub trait RecognizeRoute { - type Key: Clone; - - fn recognize(&self, t: &T) -> Result; -} - -#[derive(Clone, Debug)] -pub struct NewRouter { - new_recognize: T, - inner: N, -} - -#[derive(Clone, Debug)] -pub struct Router { - recognize: T, - inner: N, -} - -impl NewRouter { - fn new(new_recognize: K, inner: N) -> Self { - Self { - new_recognize, - inner, - } - } - - /// Creates a layer that produces Routers. - /// - /// The provided `new_recognize` is expected to implement a `NewService` that - /// produces `Recognize` implementations. - pub fn layer(new_recognize: K) -> impl layer::Layer + Clone - where - K: Clone, - { - layer::mk(move |inner| Self::new(new_recognize.clone(), inner)) - } -} - -impl NewService for NewRouter -where - K: NewService, - N: Clone, -{ - type Service = Router; - - fn new_service(&self, t: T) -> Self::Service { - Router { - recognize: self.new_recognize.new_service(t), - inner: self.inner.clone(), - } - } -} - -impl tower::Service for Router -where - K: RecognizeRoute, - N: NewService, - S: tower::Service, - S::Error: Into, -{ - type Response = S::Response; - type Error = Error; - type Future = future::Either< - future::ErrInto, Error>, - future::Ready>, - >; - - #[inline] - fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, req: Req) -> Self::Future { - match self.recognize.recognize(&req) { - Ok(key) => { - let svc = self.inner.new_service(key); - future::Either::Left(svc.oneshot(req).err_into::()) - } - Err(e) => future::Either::Right(future::err(e)), - } - } -} - -impl RecognizeRoute for F -where - K: Clone, - F: Fn(&T) -> Result, -{ - type Key = K; - - fn recognize(&self, t: &T) -> Result { - (self)(t) - } -}