Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update linkerd2-cache and linkerd2-lock to std::future #490

Merged
merged 15 commits into from
May 4, 2020
120 changes: 58 additions & 62 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,5 @@ debug = false
webpki = { git = "https://github.com/seanmonstar/webpki", branch = "cert-dns-names-0.21" }
# backport danburkert/prost#268 to `prost` 0.5 temporarily.
prost = { git = "https://github.com/linkerd/prost", branch = "v0.5.x" }
tower = { version = "0.3", git = "https://github.com/tower-rs/tower", rev = "8752a3811788e94670c62dc0acbc9613207931b1"}
tower = { version = "0.3", git = "https://github.com/tower-rs/tower", rev = "8752a3811788e94670c62dc0acbc9613207931b1"}
tokio = { version = "0.2", git = "https://github.com/tokio-rs/tokio", rev = "45773c56413267cbcf9d5e7877e8dc4afc1e5b07"}
19 changes: 10 additions & 9 deletions linkerd/app/core/src/svc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,15 +251,16 @@ impl<S> Stack<S> {
// self.push(http::insert::target::layer())
// }

// pub fn cache<T, L, U>(self, track: L) -> Stack<cache::Cache<T, cache::layer::NewTrack<L, S>>>
// where
// T: Eq + std::hash::Hash,
// S: NewService<T> + Clone,
// L: tower::layer::Layer<cache::layer::Track<S>> + Clone,
// L::Service: NewService<T, Service = U>,
// {
// self.push(cache::CacheLayer::new(track))
// }
pub fn cache<T, L, U>(self, track: L) -> Stack<cache::Cache<T, cache::layer::NewTrack<L, S>>>
where
T: Eq + std::hash::Hash + 'static,
S: NewService<T> + Clone,
S::Service: 'static,
L: tower::layer::Layer<cache::layer::Track<S>> + Clone,
L::Service: NewService<T, Service = U>,
{
self.push(cache::CacheLayer::new(track))
}

pub fn push_fallback<F: Clone>(self, fallback: F) -> Stack<stack::Fallback<S, F>> {
self.push(stack::FallbackLayer::new(fallback))
Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/integration/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ tower = "0.1"
tower-grpc = { version = "0.1", default-features = false, features = ["protobuf"] }
tracing = "0.1.9"
tracing-futures = "0.1"
webpki = "0.21"
webpki = "=0.21.0"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, i published an updated version of our fork a while ago https://github.com/linkerd/webpki/tree/cert-dns-names-0.21

futures-03 = { package = "futures", version = "0.3", features = ["compat"]}

[dev-dependencies]
Expand Down
6 changes: 3 additions & 3 deletions linkerd/cache/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ edition = "2018"
publish = false

[dependencies]
futures = "0.1"
futures = "0.3"
linkerd2-error = { path = "../error" }
linkerd2-lock = { path = "../lock" }
linkerd2-stack = { path = "../stack" }
tokio = "0.1"
tower = "0.1"
tokio = "0.2"
tower = { version = "0.3", default-features = false, features = ["util"] }
tracing = "0.1"
tracing-futures = "0.1"
34 changes: 19 additions & 15 deletions linkerd/cache/benches/cache.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
#![feature(test)]

extern crate linkerd2_cache;
use futures::{future, Async, Future, Poll};
use futures::future;
use linkerd2_cache::{Cache, Handle};
use linkerd2_error::Never;
use linkerd2_stack::NewService;
use std::future::Future;
use std::task::{Context, Poll};
use tower::util::ServiceExt;
use tower::Service;

extern crate test;
Expand Down Expand Up @@ -37,10 +38,10 @@ struct NewSometimesEvict;
impl Service<usize> for NeverEvict {
type Response = usize;
type Error = Never;
type Future = futures::future::FutureResult<Self::Response, Self::Error>;
type Future = future::Ready<Result<Self::Response, Self::Error>>;

fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn call(&mut self, rhs: usize) -> Self::Future {
Expand All @@ -62,10 +63,10 @@ impl NewService<(usize, Handle)> for NewNeverEvict {
impl Service<usize> for AlwaysEvict {
type Response = usize;
type Error = Never;
type Future = futures::future::FutureResult<Self::Response, Self::Error>;
type Future = future::Ready<Result<Self::Response, Self::Error>>;

fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn call(&mut self, rhs: usize) -> Self::Future {
Expand All @@ -84,10 +85,10 @@ impl NewService<(usize, Handle)> for NewAlwaysEvict {
impl Service<usize> for SometimesEvict {
type Response = usize;
type Error = Never;
type Future = futures::future::FutureResult<Self::Response, Self::Error>;
type Future = future::Ready<Result<Self::Response, Self::Error>>;

fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn call(&mut self, rhs: usize) -> Self::Future {
Expand Down Expand Up @@ -117,13 +118,16 @@ fn run_bench<N>(num_svc: usize, new_svc: N, b: &mut Bencher)
where
N: NewService<(usize, Handle)>,
N::Service: Service<usize>,
N::Service: Clone,
N::Service: Clone + Send + 'static,
<N::Service as Service<usize>>::Error: std::fmt::Debug,
{
let mut cache = Cache::new(new_svc);
b.iter(|| {
for n in 0..num_svc {
cache.poll_ready().unwrap();
let _r = cache.call(n).wait().unwrap().call(n);
futures::executor::block_on(async {
let mut svc = cache.ready_and().await.unwrap();
svc.call(n).await.unwrap().call(n).await.unwrap();
})
}
});
}
Expand Down
6 changes: 3 additions & 3 deletions linkerd/cache/src/layer.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::Cache;
use crate::Handle;
use futures::Poll;
use linkerd2_stack::NewService;
use std::task::{Context, Poll};

pub struct CacheLayer<T, L> {
track_layer: L,
Expand Down Expand Up @@ -85,8 +85,8 @@ impl<T, S: tower::Service<T>> tower::Service<T> for Track<S> {
type Error = S::Error;
type Future = S::Future;

fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.inner.poll_ready()
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}

fn call(&mut self, req: T) -> Self::Future {
Expand Down
43 changes: 19 additions & 24 deletions linkerd/cache/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
#![deny(warnings, rust_2018_idioms)]

use futures::{future, Async, Poll};
use futures::future;
use linkerd2_error::Never;
use linkerd2_lock::{Guard, Lock};
use linkerd2_stack::NewService;
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::{Arc, Weak};
use std::task::{Context, Poll};
use tracing::{debug, trace};

pub mod layer;
Expand Down Expand Up @@ -63,38 +63,33 @@ where

impl<T, N> tower::Service<T> for Cache<T, N>
where
T: Clone + Eq + Hash,
T: Clone + Eq + Hash + Send + 'static,
N: NewService<(T, Handle)>,
N::Service: Clone,
N::Service: Clone + Send + 'static,
{
type Response = N::Service;
type Error = Never;
type Future = future::FutureResult<Self::Response, Self::Error>;
type Future = future::Ready<Result<Self::Response, Self::Error>>;

fn poll_ready(&mut self) -> Poll<(), Self::Error> {
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.guard.is_none() {
match self.lock.poll_acquire() {
Async::NotReady => return Ok(Async::NotReady),
Async::Ready(mut services) => {
// Drop defunct services before interacting with the cache.
let n = services.len();
services.retain(|_, (_, weak)| {
if weak.strong_count() > 0 {
true
} else {
debug!("Dropping defunct service");
false
}
});
trace!(services = services.len(), dropped = n - services.len());

self.guard = Some(services);
let mut services = futures::ready!(self.lock.poll_acquire(cx));
// Drop defunct services before interacting with the cache.
let n = services.len();
services.retain(|_, (_, weak)| {
if weak.strong_count() > 0 {
true
} else {
trace!("Dropping defunct service");
false
}
}
});
debug!(services = services.len(), dropped = n - services.len());
self.guard = Some(services);
}

debug_assert!(self.guard.is_some(), "guard must be acquired");
Ok(Async::Ready(()))
Poll::Ready(Ok(()))
}

fn call(&mut self, target: T) -> Self::Future {
Expand Down
2 changes: 1 addition & 1 deletion linkerd/dns/name/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ edition = "2018"
publish = false

[dependencies]
webpki = "0.21"
webpki = "=0.21.0"
untrusted = "0.7"
13 changes: 8 additions & 5 deletions linkerd/lock/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,17 @@ A middleware that provides mutual exclusion.
"""

[dependencies]
futures = "0.1"
futures = "0.3"
linkerd2-error = { path = "../error" }
tower = "0.1"
tower = { version = "0.3", default-features = false }
tracing = "0.1"
tokio = { version = "0.2.19", features = ["sync", "macros", "rt-core"] }

[dev-dependencies]
rand = "0.7"
tokio = "0.1"
tracing-futures = "0.1"
tracing-futures = { version = "0.1", features = ["std-future"] }
tracing-log = "0.1"
tracing-subscriber = "0.2.3"
tracing-subscriber = "0.2.5"
tower = { version = "0.3", default-features = false, features = ["util"] }
tokio-test = "0.2"
tower-test = "0.3"
1 change: 0 additions & 1 deletion linkerd/lock/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ pub mod error;
mod layer;
mod lock;
mod service;
mod shared;
#[cfg(test)]
mod test;

Expand Down
Loading