Skip to content

Commit 1d64a11

Browse files
authored
update buffers to use Tokio 0.3 MPSC channels (#759)
This branch updates `linkerd2-buffer` and `linkerd2-proxy-discover`'s `buffer` module to use Tokio 0.3's MPSC channel rather than Tokio 0.2's. The rest of the proxy still uses Tokio 0.2, including the 0.2 runtime. Most of the Tokio synchronization primitives lost their `poll`-based interfaces in 0.3 as part of the move to intrusive lists of wakers (see tokio-rs/tokio#2325, tokio-rs/tokio#2509, and tokio-rs/tokio#2861). That change in Tokio takes advantage of the inherently pinned nature of `async fn` and `async` blocks to avoid needing a separate heap allocation to store the waiter state for a task waiting on a synchronization primitive. However, it means that a synchronization primitive can _only_ be waited on when the future that waits on it is pinned --- otherwise, there is a potential dangling pointer. The `poll`-based APIs allowed waiting on synchronization primitives from unpinned contexts, so they were removed. To wait on the synchronization primitives from contexts that may not be pinned, such as `poll_ready`, it's necessary to add a `Pin<Box<...>>` around the future that's waiting on the synchronization primitive. This ensures that the future will not move while it's part of the wait list. It's important to note that this isn't an _additional_ allocation per waiter versus Tokio 0.2; instead, it's the same allocation that would have _always_ happened internally to the synchronization primitive in the 0.2 API. Now, it's moved outside of the `tokio::sync` type so that it can be avoided when the primitives are used with `async`/`await` syntax, and added by the user when polling the sync primitives. Because we need to poll channel senders in `tower::Service` implementations' `poll_ready` functions, it was necessary to introduce our own bounded MPSC channel type that exposes a polling-based API. 
When the buffer's channel is full, we want to exert backpressure in `poll_ready`, so that callers such as load balancers can choose to call another service rather than waiting for buffer capacity. This branch adds a new `linkerd2-channel` crate that implements a pollable bounded channel, wrapping `tokio::sync`'s unbounded MPSC and using a `tokio::sync::Semaphore` to implement bounding. It's worth noting that this is, essentially, how `tokio::sync::mpsc`'s bounded channel is implemented --- it also uses the semaphore. However, our implementation exposes a `poll_ready` method, which the Tokio channel does not, by boxing the future that waits to acquire a semaphore permit. Finally, I've added some tests for the `linkerd2-channel` crate, based on Tokio's tests for the MPSC channel, modified where the APIs differ. This should help ensure we get similar behavior to what we expect from Tokio's MPSCs. This was factored out of PR #732. Signed-off-by: Eliza Weisman <[email protected]>
1 parent ee3fa14 commit 1d64a11

File tree

11 files changed

+614
-68
lines changed

11 files changed

+614
-68
lines changed

Cargo.lock

+107-55
Large diffs are not rendered by default.

Cargo.toml

+2-1
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,9 @@ members = [
1010
"linkerd/app/profiling",
1111
"linkerd/app/test",
1212
"linkerd/app",
13-
"linkerd/cache",
1413
"linkerd/buffer",
14+
"linkerd/cache",
15+
"linkerd/channel",
1516
"linkerd/concurrency-limit",
1617
"linkerd/conditional",
1718
"linkerd/dns/name",

linkerd/buffer/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ publish = false
77

88
[dependencies]
99
futures = "0.3"
10+
linkerd2-channel = { path = "../channel" }
1011
linkerd2-error = { path = "../error" }
1112
tokio = { version = "0.2", features = ["sync", "stream", "time", "macros"] }
1213
tower = { version = "0.3", default_features = false, features = ["util"] }

linkerd/buffer/src/dispatch.rs

+7-6
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
use crate::error::{IdleError, ServiceError};
22
use crate::InFlight;
33
use futures::{prelude::*, select_biased};
4+
use linkerd2_channel as mpsc;
45
use linkerd2_error::Error;
56
use std::sync::Arc;
6-
use tokio::sync::mpsc;
77
use tower::util::ServiceExt;
88
use tracing::trace;
99

@@ -54,7 +54,7 @@ pub(crate) async fn run<S, Req, I>(
5454
e = idle().fuse() => {
5555
let error = ServiceError(Arc::new(e.into()));
5656
trace!(%error, "Idling out inner service");
57-
return;
57+
break;
5858
}
5959
}
6060
}
@@ -64,7 +64,7 @@ pub(crate) async fn run<S, Req, I>(
6464
mod test {
6565
use super::*;
6666
use std::time::Duration;
67-
use tokio::sync::{mpsc, oneshot};
67+
use tokio::sync::oneshot;
6868
use tokio::time::delay_for;
6969
use tokio_test::{assert_pending, assert_ready, task};
7070
use tower_test::mock;
@@ -101,12 +101,13 @@ mod test {
101101
delay_for(max_idle).await;
102102

103103
// Send a request after the deadline has fired but before the
104-
// dispatch future is polled. Ensure that the request is admitted, resetting idleness.
105-
tx.try_send({
104+
// dispatch future is polled. Ensure that the request is admitted,
105+
// resetting idleness.
106+
tx.send({
106107
let (tx, _rx) = oneshot::channel();
107108
super::InFlight { request: (), tx }
108109
})
109-
.ok()
110+
.await
110111
.expect("request not sent");
111112

112113
assert_pending!(dispatch.poll());

linkerd/buffer/src/lib.rs

+13-2
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
#![recursion_limit = "256"]
22

3+
use linkerd2_channel as mpsc;
34
use linkerd2_error::Error;
4-
use std::{future::Future, pin::Pin, time::Duration};
5-
use tokio::sync::{mpsc, oneshot};
5+
use std::{fmt, future::Future, pin::Pin, time::Duration};
6+
use tokio::sync::oneshot;
67

78
mod dispatch;
89
pub mod error;
@@ -43,3 +44,13 @@ where
4344
let dispatch = dispatch::run(inner, rx, idle);
4445
(Buffer::new(tx), dispatch)
4546
}
47+
48+
// Required so that `TrySendError`/`SendError` can be `expect`ed.
49+
impl<Req, Rsp> fmt::Debug for InFlight<Req, Rsp> {
50+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
51+
f.debug_struct("InFlight")
52+
.field("request_type", &std::any::type_name::<Req>())
53+
.field("response_type", &std::any::type_name::<Rsp>())
54+
.finish()
55+
}
56+
}

linkerd/buffer/src/service.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
use crate::error::Closed;
22
use crate::InFlight;
3+
use linkerd2_channel as mpsc;
34
use linkerd2_error::Error;
45
use std::task::{Context, Poll};
56
use std::{future::Future, pin::Pin};
6-
use tokio::sync::{mpsc, oneshot};
7+
use tokio::sync::oneshot;
78

89
pub struct Buffer<Req, Rsp> {
910
/// The queue on which in-flight requests are sent to the inner service.
@@ -27,14 +28,13 @@ where
2728
type Future = Pin<Box<dyn Future<Output = Result<Rsp, Error>> + Send + 'static>>;
2829

2930
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
30-
self.tx.poll_ready(cx).map_err(|_| Closed(()).into())
31+
self.tx.poll_ready(cx).map_err(Into::into)
3132
}
3233

3334
fn call(&mut self, request: Req) -> Self::Future {
3435
let (tx, rx) = oneshot::channel();
3536
self.tx
3637
.try_send(InFlight { request, tx })
37-
.ok()
3838
.expect("poll_ready must be called");
3939
Box::pin(async move { rx.await.map_err(|_| Closed(()))??.await })
4040
}

linkerd/channel/Cargo.toml

+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
[package]
2+
name = "linkerd2-channel"
3+
version = "0.1.0"
4+
authors = ["Linkerd Developers <[email protected]>"]
5+
edition = "2018"
6+
publish = false
7+
description = """
8+
A bounded MPSC channel where senders expose a `poll_ready` method.
9+
"""
10+
11+
[dependencies]
12+
tokio = { version = "0.3", features = ["sync", "stream"] }
13+
futures = "0.3"
14+
15+
[dev-dependencies]
16+
tokio = { version = "0.3", features = ["sync", "stream", "macros"] }
17+
tokio-test = "0.3"

linkerd/channel/src/lib.rs

+191
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
use futures::{future, ready, Stream};
2+
use std::sync::{Arc, Weak};
3+
use std::task::{Context, Poll};
4+
use std::{fmt, future::Future, mem, pin::Pin};
5+
use tokio::sync::{mpsc, OwnedSemaphorePermit as Permit, Semaphore};
6+
7+
use self::error::{SendError, TrySendError};
8+
pub use tokio::sync::mpsc::error;
9+
10+
/// Returns a new pollable, bounded MPSC channel.
11+
///
12+
/// Unlike `tokio::sync`'s `MPSC` channel, this channel exposes a `poll_ready`
13+
/// function, at the cost of an allocation when driving it to readiness.
14+
pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
15+
assert!(buffer > 0, "mpsc bounded channel requires buffer > 0");
16+
let semaphore = Arc::new(Semaphore::new(buffer));
17+
let (tx, rx) = mpsc::unbounded_channel();
18+
let rx = Receiver {
19+
rx,
20+
semaphore: Arc::downgrade(&semaphore),
21+
buffer,
22+
};
23+
let tx = Sender {
24+
tx,
25+
semaphore,
26+
state: State::Empty,
27+
};
28+
(tx, rx)
29+
}
30+
31+
/// A bounded, pollable MPSC sender.
32+
///
33+
/// This is similar to Tokio's bounded MPSC channel's `Sender` type, except that
34+
/// it exposes a `poll_ready` function, at the cost of an allocation when
35+
/// driving it to readiness.
36+
pub struct Sender<T> {
37+
tx: mpsc::UnboundedSender<(T, Permit)>,
38+
semaphore: Arc<Semaphore>,
39+
state: State,
40+
}
41+
42+
/// A bounded MPSC receiver.
43+
///
44+
/// This is similar to Tokio's bounded MPSC channel's `Receiver` type.
45+
pub struct Receiver<T> {
46+
rx: mpsc::UnboundedReceiver<(T, Permit)>,
47+
semaphore: Weak<Semaphore>,
48+
buffer: usize,
49+
}
50+
51+
enum State {
52+
Waiting(Pin<Box<dyn Future<Output = Permit> + Send + Sync>>),
53+
Acquired(Permit),
54+
Empty,
55+
}
56+
57+
impl<T> Sender<T> {
58+
pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError<()>>> {
59+
loop {
60+
self.state = match self.state {
61+
State::Empty => State::Waiting(Box::pin(self.semaphore.clone().acquire_owned())),
62+
State::Waiting(ref mut f) => State::Acquired(ready!(Pin::new(f).poll(cx))),
63+
State::Acquired(_) if self.tx.is_closed() => {
64+
return Poll::Ready(Err(SendError(())))
65+
}
66+
State::Acquired(_) => return Poll::Ready(Ok(())),
67+
}
68+
}
69+
}
70+
71+
pub async fn ready(&mut self) -> Result<(), SendError<()>> {
72+
future::poll_fn(|cx| self.poll_ready(cx)).await
73+
}
74+
75+
pub fn try_send(&mut self, value: T) -> Result<(), TrySendError<T>> {
76+
if self.tx.is_closed() {
77+
return Err(TrySendError::Closed(value));
78+
}
79+
self.state = match mem::replace(&mut self.state, State::Empty) {
80+
// Have we previously acquired a permit?
81+
State::Acquired(permit) => {
82+
self.send2(value, permit);
83+
return Ok(());
84+
}
85+
// Okay, can we acquire a permit now?
86+
State::Empty => {
87+
if let Ok(permit) = self.semaphore.clone().try_acquire_owned() {
88+
self.send2(value, permit);
89+
return Ok(());
90+
}
91+
State::Empty
92+
}
93+
state => state,
94+
};
95+
Err(TrySendError::Full(value))
96+
}
97+
98+
pub async fn send(&mut self, value: T) -> Result<(), SendError<T>> {
99+
if let Err(_) = self.ready().await {
100+
return Err(SendError(value));
101+
}
102+
match mem::replace(&mut self.state, State::Empty) {
103+
State::Acquired(permit) => {
104+
self.send2(value, permit);
105+
Ok(())
106+
}
107+
state => panic!("unexpected state after poll_ready: {:?}", state),
108+
}
109+
}
110+
111+
fn send2(&mut self, value: T, permit: Permit) {
112+
self.tx.send((value, permit)).ok().expect("was not closed");
113+
}
114+
}
115+
116+
impl<T> Clone for Sender<T> {
117+
fn clone(&self) -> Self {
118+
Self {
119+
tx: self.tx.clone(),
120+
semaphore: self.semaphore.clone(),
121+
state: State::Empty,
122+
}
123+
}
124+
}
125+
126+
impl<T> fmt::Debug for Sender<T> {
127+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
128+
f.debug_struct("Sender")
129+
.field("message_type", &std::any::type_name::<T>())
130+
.field("state", &self.state)
131+
.field("semaphore", &self.semaphore)
132+
.finish()
133+
}
134+
}
135+
136+
// === impl Receiver ===
137+
138+
impl<T> Receiver<T> {
139+
pub async fn recv(&mut self) -> Option<T> {
140+
self.rx.recv().await.map(|(t, _)| t)
141+
}
142+
143+
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
144+
let res = ready!(Pin::new(&mut self.rx).poll_next(cx));
145+
Poll::Ready(res.map(|(t, _)| t))
146+
}
147+
}
148+
149+
impl<T> Stream for Receiver<T> {
150+
type Item = T;
151+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
152+
let res = ready!(Pin::new(&mut self.as_mut().rx).poll_next(cx));
153+
Poll::Ready(res.map(|(t, _)| t))
154+
}
155+
}
156+
157+
impl<T> Drop for Receiver<T> {
158+
fn drop(&mut self) {
159+
if let Some(semaphore) = self.semaphore.upgrade() {
160+
// Close the buffer by releasing any senders waiting on channel capacity.
161+
// If more than `usize::MAX >> 3` permits are added to the semaphore, it
162+
// will panic.
163+
const MAX: usize = std::usize::MAX >> 4;
164+
semaphore.add_permits(MAX - self.buffer - semaphore.available_permits());
165+
}
166+
}
167+
}
168+
169+
impl<T> fmt::Debug for Receiver<T> {
170+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
171+
f.debug_struct("Receiver")
172+
.field("message_type", &std::any::type_name::<T>())
173+
.field("semaphore", &self.semaphore)
174+
.finish()
175+
}
176+
}
177+
178+
// === impl State ===
179+
180+
impl fmt::Debug for State {
181+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
182+
fmt::Display::fmt(
183+
match self {
184+
State::Acquired(_) => "State::Acquired(..)",
185+
State::Waiting(_) => "State::Waiting(..)",
186+
State::Empty => "State::Empty",
187+
},
188+
f,
189+
)
190+
}
191+
}

0 commit comments

Comments
 (0)