Skip to content

Commit

Permalink
tracing: instrument mpsc channels
Browse files Browse the repository at this point in the history
This commit adds tracing to both bounded and unbounded mpsc channels.

Unbounded channels record the following state fields:

	tx: number of items sent through the channel
	rx: number of items received through the channel
	values: number of items currently held in the channel
	tx_handles: number of Sender handles for the channel
	rx_dropped: true if the Receiver has dropped

Bounded channels record the same fields as above, with the addition of
the static "capacity" field set to the channel buffer size.
  • Loading branch information
domodwyer committed Apr 29, 2022
1 parent fa665b9 commit 4526247
Show file tree
Hide file tree
Showing 3 changed files with 427 additions and 32 deletions.
192 changes: 177 additions & 15 deletions tokio/src/sync/mpsc/bounded.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
#[cfg(all(tokio_unstable, feature = "tracing"))]
use tracing::Span;

use crate::sync::batch_semaphore::{self as semaphore, TryAcquireError};
use crate::sync::mpsc::chan;
use crate::sync::mpsc::error::{SendError, TryRecvError, TrySendError};

#[cfg(all(tokio_unstable, feature = "tracing"))]
use crate::util::trace;

cfg_time! {
use crate::sync::mpsc::error::SendTimeoutError;
use crate::time::Duration;
Expand All @@ -20,6 +26,8 @@ use std::task::{Context, Poll};
/// [`PollSender`]: https://docs.rs/tokio-util/0.6/tokio_util/sync/struct.PollSender.html
pub struct Sender<T> {
chan: chan::Tx<T, Semaphore>,
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: Span,
}

/// Permits to send one value into the channel.
Expand Down Expand Up @@ -47,6 +55,8 @@ pub struct Permit<'a, T> {
/// [`Sender::try_reserve_owned()`]: Sender::try_reserve_owned
pub struct OwnedPermit<T> {
chan: Option<chan::Tx<T, Semaphore>>,
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: Span,
}

/// Receives values from the associated `Sender`.
Expand All @@ -59,6 +69,8 @@ pub struct OwnedPermit<T> {
pub struct Receiver<T> {
/// The channel receiver.
chan: chan::Rx<T, Semaphore>,
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: Span,
}

/// Creates a bounded mpsc channel for communicating between asynchronous tasks
Expand Down Expand Up @@ -105,13 +117,50 @@ pub struct Receiver<T> {
/// }
/// }
/// ```
#[track_caller]
pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
assert!(buffer > 0, "mpsc bounded channel requires buffer > 0");
let semaphore = (semaphore::Semaphore::new(buffer), buffer);
let (tx, rx) = chan::channel(semaphore);

let tx = Sender::new(tx);
let rx = Receiver::new(rx);
#[cfg(all(tokio_unstable, feature = "tracing"))]
let resource_span = {
let location = std::panic::Location::caller();
let resource_span = tracing::trace_span!(
"runtime.resource",
concrete_type = "Sender|Receiver",
kind = "Sync",
loc.file = location.file(),
loc.line = location.line(),
loc.col = location.column(),
);

resource_span.in_scope(|| {
tracing::trace!(
target: "runtime::resource::state_update",
capacity = buffer,
capacity.op = "override",
)
});

resource_span
};

let (tx, rx) = chan::channel(
semaphore,
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span.clone(),
);

let tx = Sender::new(
tx,
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span.clone(),
);
let rx = Receiver::new(
rx,
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span,
);

(tx, rx)
}
Expand All @@ -121,8 +170,15 @@ pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
type Semaphore = (semaphore::Semaphore, usize);

impl<T> Receiver<T> {
pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> Receiver<T> {
Receiver { chan }
pub(crate) fn new(
chan: chan::Rx<T, Semaphore>,
#[cfg(all(tokio_unstable, feature = "tracing"))] resource_span: Span,
) -> Receiver<T> {
Receiver {
chan,
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span,
}
}

/// Receives the next value for this receiver.
Expand Down Expand Up @@ -184,7 +240,22 @@ impl<T> Receiver<T> {
/// ```
pub async fn recv(&mut self) -> Option<T> {
use crate::future::poll_fn;
poll_fn(|cx| self.chan.recv(cx)).await

#[cfg(all(tokio_unstable, feature = "tracing"))]
let fut = {
let resource_span = self.resource_span.clone();
trace::async_op(
|| poll_fn(|cx| self.chan.recv(cx)),
resource_span,
"Receiver::recv",
"poll_recv",
true,
)
};
#[cfg(not(all(tokio_unstable, feature = "tracing")))]
let fut = poll_fn(|cx| self.chan.recv(cx));

fut.await
}

/// Tries to receive the next value for this receiver.
Expand Down Expand Up @@ -367,8 +438,15 @@ impl<T> fmt::Debug for Receiver<T> {
impl<T> Unpin for Receiver<T> {}

impl<T> Sender<T> {
pub(crate) fn new(chan: chan::Tx<T, Semaphore>) -> Sender<T> {
Sender { chan }
pub(crate) fn new(
chan: chan::Tx<T, Semaphore>,
#[cfg(all(tokio_unstable, feature = "tracing"))] resource_span: Span,
) -> Sender<T> {
Sender {
chan,
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span,
}
}

/// Sends a value, waiting until there is capacity.
Expand Down Expand Up @@ -427,7 +505,21 @@ impl<T> Sender<T> {
/// }
/// ```
pub async fn send(&self, value: T) -> Result<(), SendError<T>> {
match self.reserve().await {
#[cfg(all(tokio_unstable, feature = "tracing"))]
let fut = {
let resource_span = self.resource_span.clone();
trace::async_op(
|| self.reserve(),
resource_span,
"Sender::send",
"poll_send",
true,
)
};
#[cfg(not(all(tokio_unstable, feature = "tracing")))]
let fut = self.reserve();

match fut.await {
Ok(permit) => {
permit.send(value);
Ok(())
Expand Down Expand Up @@ -473,7 +565,21 @@ impl<T> Sender<T> {
/// }
/// ```
pub async fn closed(&self) {
self.chan.closed().await
#[cfg(all(tokio_unstable, feature = "tracing"))]
let fut = {
let resource_span = self.resource_span.clone();
trace::async_op(
|| self.chan.closed(),
resource_span,
"Sender::closed",
"poll_closed",
true,
)
};
#[cfg(not(all(tokio_unstable, feature = "tracing")))]
let fut = self.chan.closed();

fut.await
}

/// Attempts to immediately send a message on this `Sender`
Expand Down Expand Up @@ -603,7 +709,21 @@ impl<T> Sender<T> {
value: T,
timeout: Duration,
) -> Result<(), SendTimeoutError<T>> {
let permit = match crate::time::timeout(timeout, self.reserve()).await {
#[cfg(all(tokio_unstable, feature = "tracing"))]
let fut = {
let resource_span = self.resource_span.clone();
trace::async_op(
|| self.reserve(),
resource_span,
"Sender::send_timeout",
"poll_send_timeout",
true,
)
};
#[cfg(not(all(tokio_unstable, feature = "tracing")))]
let fut = self.reserve();

let permit = match crate::time::timeout(timeout, fut).await {
Err(_) => {
return Err(SendTimeoutError::Timeout(value));
}
Expand Down Expand Up @@ -722,7 +842,21 @@ impl<T> Sender<T> {
/// }
/// ```
pub async fn reserve(&self) -> Result<Permit<'_, T>, SendError<()>> {
self.reserve_inner().await?;
#[cfg(all(tokio_unstable, feature = "tracing"))]
let fut = {
let resource_span = self.resource_span.clone();
trace::async_op(
|| self.reserve_inner(),
resource_span,
"Sender::reserve",
"poll_reserve",
true,
)
};
#[cfg(not(all(tokio_unstable, feature = "tracing")))]
let fut = self.reserve_inner();

fut.await?;
Ok(Permit { chan: &self.chan })
}

Expand Down Expand Up @@ -807,9 +941,25 @@ impl<T> Sender<T> {
/// [`send`]: OwnedPermit::send
/// [`Arc::clone`]: std::sync::Arc::clone
pub async fn reserve_owned(self) -> Result<OwnedPermit<T>, SendError<()>> {
self.reserve_inner().await?;
#[cfg(all(tokio_unstable, feature = "tracing"))]
let fut = {
let resource_span = self.resource_span.clone();
trace::async_op(
|| self.reserve_inner(),
resource_span,
"Sender::reserve_owned",
"poll_reserve_owned",
true,
)
};
#[cfg(not(all(tokio_unstable, feature = "tracing")))]
let fut = self.reserve_inner();

fut.await?;
Ok(OwnedPermit {
chan: Some(self.chan),
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: self.resource_span,
})
}

Expand Down Expand Up @@ -937,6 +1087,8 @@ impl<T> Sender<T> {

Ok(OwnedPermit {
chan: Some(self.chan),
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: self.resource_span,
})
}

Expand Down Expand Up @@ -994,6 +1146,8 @@ impl<T> Clone for Sender<T> {
fn clone(&self) -> Self {
Sender {
chan: self.chan.clone(),
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: self.resource_span.clone(),
}
}
}
Expand Down Expand Up @@ -1119,7 +1273,11 @@ impl<T> OwnedPermit<T> {
});
chan.send(value);

Sender { chan }
Sender {
chan,
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: self.resource_span.clone(),
}
}

/// Releases the reserved capacity *without* sending a message, returning the
Expand Down Expand Up @@ -1161,7 +1319,11 @@ impl<T> OwnedPermit<T> {

// Add the permit back to the semaphore
chan.semaphore().add_permit();
Sender { chan }
Sender {
chan,
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: self.resource_span.clone(),
}
}
}

Expand Down
Loading

0 comments on commit 4526247

Please sign in to comment.