diff --git a/Cargo.lock b/Cargo.lock index 989023f251..46c5b9351c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -14,6 +14,18 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "aae1277d39aeec15cb388266ecc24b11c80469deae6067e17a1a7aa9e5c1f234" +[[package]] +name = "ahash" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf6ccdb167abbf410dcb915cabd428929d7f6a04980b54a11f26a39f1c7f7107" +dependencies = [ + "cfg-if", + "getrandom", + "once_cell", + "version_check", +] + [[package]] name = "aho-corasick" version = "0.7.20" @@ -989,9 +1001,12 @@ dependencies = [ name = "linkerd-distribute" version = "0.1.0" dependencies = [ + "ahash", "indexmap", "linkerd-stack", + "parking_lot", "rand", + "tokio", "tokio-test", "tower-test", "tracing", diff --git a/linkerd/distribute/Cargo.toml b/linkerd/distribute/Cargo.toml index 9b25a26b03..5c793030b9 100644 --- a/linkerd/distribute/Cargo.toml +++ b/linkerd/distribute/Cargo.toml @@ -6,9 +6,12 @@ license = "Apache-2.0" publish = false [dependencies] +ahash = "0.8" indexmap = "1" linkerd-stack = { path = "../stack" } +parking_lot = "0.12" rand = { version = "0.8", features = ["small_rng"] } +tokio = { version = "1", features = ["macros"] } tracing = "0.1" [dev-dependencies] diff --git a/linkerd/distribute/src/cache.rs b/linkerd/distribute/src/cache.rs new file mode 100644 index 0000000000..34359c5009 --- /dev/null +++ b/linkerd/distribute/src/cache.rs @@ -0,0 +1,87 @@ +use super::{params, NewDistribute}; +use linkerd_stack::{layer, NewService, Param}; +use parking_lot::Mutex; +use std::{fmt::Debug, hash::Hash, sync::Arc}; + +/// A [`NewService`] that produces [`NewDistribute`]s using a shared cache of +/// backends. +/// +/// On each call to [`NewService::new_service`], the cache extracts a new set of +/// [`params::Backends`] from the target to determine which +/// services should be added/removed from the cache. +#[derive(Debug)] +pub struct BackendCache(Arc>); + +#[derive(Debug)] +struct Inner { + new_backend: N, + backends: Mutex>, +} + +// === impl BackendCache === + +impl BackendCache { + pub fn new(new_backend: N) -> Self { + Self(Arc::new(Inner { + new_backend, + backends: Default::default(), + })) + } + + pub fn layer() -> impl layer::Layer + Clone { + layer::mk(Self::new) + } +} + +impl NewService for BackendCache +where + T: Param>, + K: Eq + Hash + Clone + Debug, + N: NewService, + N::Service: Clone, +{ + type Service = NewDistribute; + + fn new_service(&self, target: T) -> Self::Service { + let params::Backends(backends) = target.param(); + + let mut cache = self.0.backends.lock(); + + // Remove all backends that aren't in the updated set of addrs. + cache.retain(|backend, _| { + if backends.contains(backend) { + true + } else { + tracing::debug!(?backend, "Removing"); + false + } + }); + + // If there are additional addrs, cache a new service for each. + debug_assert!(backends.len() >= cache.len()); + if backends.len() > cache.len() { + cache.reserve(backends.len()); + for backend in &*backends { + // Skip rebuilding targets we already have a stack for. + if cache.contains_key(backend) { + tracing::trace!(?backend, "Retaining"); + continue; + } + + tracing::debug!(?backend, "Adding"); + cache.insert( + backend.clone(), + self.0.new_backend.new_service(backend.clone()), + ); + } + } + + NewDistribute::from(cache.clone()) + } +} + +impl Clone for BackendCache { + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} diff --git a/linkerd/distribute/src/lib.rs b/linkerd/distribute/src/lib.rs index c71452e9a3..bed753f1d5 100644 --- a/linkerd/distribute/src/lib.rs +++ b/linkerd/distribute/src/lib.rs @@ -3,12 +3,14 @@ #![deny(rust_2018_idioms, clippy::disallowed_methods, clippy::disallowed_types)] #![forbid(unsafe_code)] +mod cache; mod params; mod service; mod stack; pub use self::{ - params::{Distribution, WeightedKeys}, + cache::BackendCache, + params::{Backends, Distribution, WeightedKeys}, service::Distribute, stack::NewDistribute, }; diff --git a/linkerd/distribute/src/params.rs b/linkerd/distribute/src/params.rs index 762c92b92e..727427e72c 100644 --- a/linkerd/distribute/src/params.rs +++ b/linkerd/distribute/src/params.rs @@ -1,5 +1,11 @@ +use ahash::AHashSet; use rand::distributions::{WeightedError, WeightedIndex}; -use std::sync::Arc; +use std::{fmt::Debug, hash::Hash, sync::Arc}; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Backends(pub(crate) Arc>) +where + K: Eq + Hash + Clone + Debug; /// A parameter type that configures how a [`Distribute`] should behave. /// @@ -23,6 +29,26 @@ pub struct WeightedKeys { index: WeightedIndex, } +// === impl Backends === + +impl From>> for Backends +where + K: Eq + Hash + Clone + Debug + Send + Sync + 'static, +{ + fn from(inner: Arc>) -> Self { + Self(inner) + } +} + +impl FromIterator for Backends +where + K: Eq + Hash + Clone + Debug + Send + Sync + 'static, +{ + fn from_iter>(iter: T) -> Self { + Self(Arc::new(iter.into_iter().collect())) + } +} + // === impl Distribution === impl From for Distribution { diff --git a/linkerd/distribute/src/service.rs b/linkerd/distribute/src/service.rs index 706a0d8a0e..e0cd1ade87 100644 --- a/linkerd/distribute/src/service.rs +++ b/linkerd/distribute/src/service.rs @@ -150,6 +150,18 @@ impl Clone for Distribute { } } +impl Default for Distribute { + /// Returns an empty distribution. This distribution will never become + /// ready. + fn default() -> Self { + Self { + backends: Default::default(), + selection: Selection::Empty, + ready_idx: None, + } + } +} + // === impl Selection === impl From> for Selection { diff --git a/linkerd/distribute/src/stack.rs b/linkerd/distribute/src/stack.rs index e57a07c1c8..32351047fd 100644 --- a/linkerd/distribute/src/stack.rs +++ b/linkerd/distribute/src/stack.rs @@ -1,31 +1,32 @@ use crate::{Distribute, Distribution}; +use ahash::AHashMap; use linkerd_stack::{NewService, Param}; -use std::{collections::HashMap, hash::Hash, sync::Arc}; +use std::{hash::Hash, sync::Arc}; /// Builds `Distribute` services for a specific `Distribution`. #[derive(Clone, Debug)] pub struct NewDistribute { /// All potential backends. - all_backends: Arc>, + all_backends: Arc>, } // === impl NewDistribute === -impl From>> for NewDistribute { - fn from(all_backends: Arc>) -> Self { +impl From>> for NewDistribute { + fn from(all_backends: Arc>) -> Self { Self { all_backends } } } -impl From> for NewDistribute { - fn from(backends: HashMap) -> Self { +impl From> for NewDistribute { + fn from(backends: AHashMap) -> Self { Arc::new(backends).into() } } impl FromIterator<(K, S)> for NewDistribute { fn from_iter>(iter: T) -> Self { - iter.into_iter().collect::>().into() + iter.into_iter().collect::>().into() } }