Skip to content

Commit

Permalink
distribute: Add a backend cache (#2122)
Browse files Browse the repository at this point in the history
`NewDistribute` will be used to implement per-route traffic distribution
policies so that, for instance, each request route may use a different
traffic split. In this setup, we want each instance--reconstructed whenever
the configuration changes--to reuse a common set of concrete services so
that load-balancer state need not be lost when a policy changes.

This change adds a `distribute::BackendCache` module that produces a
`NewDistribute` for an updated, cached set of backends.
  • Loading branch information
olix0r authored Jan 5, 2023
1 parent b5e1b85 commit 47678e6
Show file tree
Hide file tree
Showing 7 changed files with 155 additions and 9 deletions.
15 changes: 15 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,18 @@ version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aae1277d39aeec15cb388266ecc24b11c80469deae6067e17a1a7aa9e5c1f234"

[[package]]
name = "ahash"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf6ccdb167abbf410dcb915cabd428929d7f6a04980b54a11f26a39f1c7f7107"
dependencies = [
"cfg-if",
"getrandom",
"once_cell",
"version_check",
]

[[package]]
name = "aho-corasick"
version = "0.7.20"
Expand Down Expand Up @@ -989,9 +1001,12 @@ dependencies = [
name = "linkerd-distribute"
version = "0.1.0"
dependencies = [
"ahash",
"indexmap",
"linkerd-stack",
"parking_lot",
"rand",
"tokio",
"tokio-test",
"tower-test",
"tracing",
Expand Down
3 changes: 3 additions & 0 deletions linkerd/distribute/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@ license = "Apache-2.0"
publish = false

[dependencies]
ahash = "0.8"
indexmap = "1"
linkerd-stack = { path = "../stack" }
parking_lot = "0.12"
rand = { version = "0.8", features = ["small_rng"] }
tokio = { version = "1", features = ["macros"] }
tracing = "0.1"

[dev-dependencies]
Expand Down
87 changes: 87 additions & 0 deletions linkerd/distribute/src/cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
use super::{params, NewDistribute};
use linkerd_stack::{layer, NewService, Param};
use parking_lot::Mutex;
use std::{fmt::Debug, hash::Hash, sync::Arc};

/// A [`NewService`] that produces [`NewDistribute`]s using a shared cache of
/// backends.
///
/// On each call to [`NewService::new_service`], the cache extracts a new set of
/// [`params::Backends`] from the target to determine which
/// services should be added/removed from the cache.
#[derive(Debug)]
pub struct BackendCache<K, N, S>(Arc<Inner<K, N, S>>);

/// State shared (via `Arc`) by all clones of a [`BackendCache`], so that
/// cloned caches reuse the same set of backend services.
#[derive(Debug)]
struct Inner<K, N, S> {
    /// Constructs a new backend service for a key not yet in the cache.
    new_backend: N,
    /// Cached backend services, keyed by backend key. Guarded by a mutex so
    /// concurrent `new_service` calls update the cache consistently.
    backends: Mutex<ahash::AHashMap<K, S>>,
}

// === impl BackendCache ===

impl<K, N, S> BackendCache<K, N, S> {
pub fn new(new_backend: N) -> Self {
Self(Arc::new(Inner {
new_backend,
backends: Default::default(),
}))
}

pub fn layer() -> impl layer::Layer<N, Service = Self> + Clone {
layer::mk(Self::new)
}
}

impl<T, K, N> NewService<T> for BackendCache<K, N, N::Service>
where
    T: Param<params::Backends<K>>,
    K: Eq + Hash + Clone + Debug,
    N: NewService<K>,
    N::Service: Clone,
{
    type Service = NewDistribute<K, N::Service>;

    /// Builds a `NewDistribute` over the target's current backend set,
    /// reusing cached services for keys that remain in the set so balancer
    /// state is preserved across configuration updates.
    fn new_service(&self, target: T) -> Self::Service {
        let params::Backends(backends) = target.param();

        let mut cache = self.0.backends.lock();

        // Remove all backends that aren't in the updated set of addrs.
        cache.retain(|backend, _| {
            if backends.contains(backend) {
                true
            } else {
                tracing::debug!(?backend, "Removing");
                false
            }
        });

        // If there are additional addrs, cache a new service for each.
        debug_assert!(backends.len() >= cache.len());
        if backends.len() > cache.len() {
            // `reserve` takes *additional* capacity, so reserve only for the
            // backends that are actually missing from the cache rather than
            // for the full target set.
            cache.reserve(backends.len() - cache.len());
            for backend in &*backends {
                // Skip rebuilding targets we already have a stack for.
                if cache.contains_key(backend) {
                    tracing::trace!(?backend, "Retaining");
                    continue;
                }

                tracing::debug!(?backend, "Adding");
                cache.insert(
                    backend.clone(),
                    self.0.new_backend.new_service(backend.clone()),
                );
            }
        }

        // Snapshot the cache into a fresh shared map so the returned
        // `NewDistribute` is unaffected by later cache mutations.
        NewDistribute::from(cache.clone())
    }
}

impl<K, N, S> Clone for BackendCache<K, N, S> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
4 changes: 3 additions & 1 deletion linkerd/distribute/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
#![deny(rust_2018_idioms, clippy::disallowed_methods, clippy::disallowed_types)]
#![forbid(unsafe_code)]

mod cache;
mod params;
mod service;
mod stack;

pub use self::{
params::{Distribution, WeightedKeys},
cache::BackendCache,
params::{Backends, Distribution, WeightedKeys},
service::Distribute,
stack::NewDistribute,
};
28 changes: 27 additions & 1 deletion linkerd/distribute/src/params.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
use ahash::AHashSet;
use rand::distributions::{WeightedError, WeightedIndex};
use std::sync::Arc;
use std::{fmt::Debug, hash::Hash, sync::Arc};

/// A parameter type carrying the set of backend keys that should be available
/// to a distributor. The set is shared via `Arc`, so cloning a `Backends` is
/// cheap and does not copy the keys.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Backends<K>(pub(crate) Arc<AHashSet<K>>)
where
    K: Eq + Hash + Clone + Debug;

/// A parameter type that configures how a [`Distribute`] should behave.
///
Expand All @@ -23,6 +29,26 @@ pub struct WeightedKeys<K> {
index: WeightedIndex<u32>,
}

// === impl Backends ===

impl<K> From<Arc<AHashSet<K>>> for Backends<K>
where
    // Only the bounds declared on `Backends<K>` itself are needed here;
    // the previous `Send + Sync + 'static` bounds were unnecessary and
    // needlessly restricted the conversion.
    K: Eq + Hash + Clone + Debug,
{
    /// Wraps an already-shared set of backend keys without copying it.
    fn from(inner: Arc<AHashSet<K>>) -> Self {
        Self(inner)
    }
}

impl<K> FromIterator<K> for Backends<K>
where
    // Only the bounds declared on `Backends<K>` itself are needed here;
    // the previous `Send + Sync + 'static` bounds were unnecessary and
    // needlessly restricted the conversion.
    K: Eq + Hash + Clone + Debug,
{
    /// Collects backend keys into a newly-allocated shared set.
    fn from_iter<T: IntoIterator<Item = K>>(iter: T) -> Self {
        Self(Arc::new(iter.into_iter().collect()))
    }
}

// === impl Distribution ===

impl<K> From<K> for Distribution<K> {
Expand Down
12 changes: 12 additions & 0 deletions linkerd/distribute/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,18 @@ impl<K: Clone, S: Clone> Clone for Distribute<K, S> {
}
}

impl<K, S> Default for Distribute<K, S> {
    /// Returns an empty distribution. This distribution will never become
    /// ready.
    fn default() -> Self {
        Self {
            // No backends are configured...
            backends: Default::default(),
            // ...selection is over the empty set...
            selection: Selection::Empty,
            // ...so no backend index can ever be chosen.
            ready_idx: None,
        }
    }
}

// === impl Selection ===

impl<K> From<Distribution<K>> for Selection<K> {
Expand Down
15 changes: 8 additions & 7 deletions linkerd/distribute/src/stack.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,32 @@
use crate::{Distribute, Distribution};
use ahash::AHashMap;
use linkerd_stack::{NewService, Param};
use std::{collections::HashMap, hash::Hash, sync::Arc};
use std::{hash::Hash, sync::Arc};

/// Builds `Distribute` services for a specific `Distribution`.
#[derive(Clone, Debug)]
pub struct NewDistribute<K, S> {
/// All potential backends.
all_backends: Arc<HashMap<K, S>>,
all_backends: Arc<AHashMap<K, S>>,
}

// === impl NewDistribute ===

impl<K, S> From<Arc<HashMap<K, S>>> for NewDistribute<K, S> {
fn from(all_backends: Arc<HashMap<K, S>>) -> Self {
impl<K, S> From<Arc<AHashMap<K, S>>> for NewDistribute<K, S> {
fn from(all_backends: Arc<AHashMap<K, S>>) -> Self {
Self { all_backends }
}
}

impl<K, S> From<HashMap<K, S>> for NewDistribute<K, S> {
fn from(backends: HashMap<K, S>) -> Self {
impl<K, S> From<AHashMap<K, S>> for NewDistribute<K, S> {
fn from(backends: AHashMap<K, S>) -> Self {
Arc::new(backends).into()
}
}

impl<K: Hash + Eq, S> FromIterator<(K, S)> for NewDistribute<K, S> {
    /// Collects `(key, service)` pairs into a new shared backend map.
    // Diff residue removed: the stale pre-change `HashMap` collect line was
    // duplicated alongside the updated `AHashMap` one; only the post-change
    // line is kept.
    fn from_iter<T: IntoIterator<Item = (K, S)>>(iter: T) -> Self {
        iter.into_iter().collect::<AHashMap<_, _>>().into()
    }
}

Expand Down

0 comments on commit 47678e6

Please sign in to comment.