Skip to content

Commit

Permalink
feat: metric observer & pool wait timing
Browse files Browse the repository at this point in the history
This commit adds the public PoolMetricsObserver trait for end users to
optionally implement, in order to receive metric / event data from the
underlying connection Pool.

PoolMetricsObserver currently exposes a single metric: the duration of
time spent waiting for a permit from the connection pool semaphore. This
allows users to quantify the saturation of the connection pool
(USE-style metrics) and identify when the workload exceeds the available
pool capacity.

A default no-op method implementation is provided so end-users can
choose which metrics they care about, and not have to implement the full
set of methods.
  • Loading branch information
domodwyer committed Jun 9, 2022
1 parent edaf7d0 commit b7e797e
Show file tree
Hide file tree
Showing 7 changed files with 306 additions and 7 deletions.
9 changes: 5 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions sqlx-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ memchr = { version = "2.4.1", default-features = false }
num-bigint = { version = "0.4.0", default-features = false, optional = true, features = ["std"] }
once_cell = "1.9.0"
percent-encoding = "2.1.0"
pin-project = "1.0.10"
rand = { version = "0.8.4", default-features = false, optional = true, features = ["std", "std_rng"] }
regex = { version = "1.5.5", optional = true }
rsa = { version = "0.6.0", optional = true }
Expand Down
105 changes: 104 additions & 1 deletion sqlx-core/src/pool/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use std::mem;
use std::ptr;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;
use std::{future::Future, pin::Pin, task::Poll};

use std::time::{Duration, Instant};

Expand Down Expand Up @@ -169,7 +170,19 @@ impl<DB: Database> SharedPool<DB> {
self.options.connect_timeout,
async {
loop {
let permit = self.semaphore.acquire(1).await;
// Decorate the semaphore permit acquisition to record the
// duration it takes to be granted (or timed out).
let fut = self.semaphore.acquire(1);
let permit = if let Some(obs) = &self.options.metric_observer {
// A metric observer is registered, so instrument the
// semaphore wait.
PollWallClockRecorder::new(
|d| obs.permit_wait_time(d),
fut,
).await
} else {
fut.await
};

if self.is_closed() {
return Err(Error::PoolClosed);
Expand Down Expand Up @@ -409,3 +422,93 @@ impl<DB: Database> Drop for DecrementSizeGuard<DB> {
self.pool.semaphore.release(1);
}
}

use pin_project::{pin_project, pinned_drop};

/// A [`PollWallClockRecorder`] wraps a future `F` and measures the wall-clock
/// duration from the first poll of `F` to the drop of this wrapper.
///
/// The measurement is handed to the `recorder` closure as a [`Duration`]
/// truncated to whole milliseconds. If the wrapper is dropped without ever
/// being polled, the closure is not called.
///
/// The typical lifecycle of a future is `create -> poll -> ready -> drop`, with
/// the drop happening an insignificant duration of time after the future
/// returns [`Poll::Ready`].
///
/// Another common lifecycle path that should be instrumented is when the
/// created future's caller times out before returning [`Poll::Ready`], with a
/// lifecycle similar to `create -> poll -> pending -> timeout -> drop`. This
/// results in the instrumented future being dropped an insignificant duration
/// of time after the timeout occurs.
///
/// This instrumentation therefore (approximately) measures the time between
/// first poll and yielding a value, or the future timing out and being dropped.
#[pin_project(PinnedDrop)]
struct PollWallClockRecorder<F, R>
where
    R: Fn(Duration),
{
    // The closure invoked with the measured duration when this value is
    // dropped (only if the future was polled at least once).
    recorder: R,

    // The instant of the first poll.
    //
    // None at construction time, Some after first poll. Left as None if the
    // future is never polled, in which case nothing is recorded on drop.
    started_at: Option<Instant>,

    // The inner future being instrumented. It is structurally pinned so that
    // poll() can be forwarded to it through the pin projection.
    #[pin]
    fut: F,
}

impl<F, R> PollWallClockRecorder<F, R>
where
    R: Fn(Duration),
{
    /// Wrap `fut` so that `recorder` is called with the time elapsed between
    /// the first poll of `fut` and the drop of the returned wrapper.
    ///
    /// The duration passed to `recorder` has millisecond granularity.
    pub(crate) fn new(recorder: R, fut: F) -> Self {
        // The clock is intentionally not started here: it starts on the first
        // poll, so the gap between construction and first poll is excluded.
        let started_at = None;

        Self {
            recorder,
            started_at,
            fut,
        }
    }
}

impl<F, R> Future for PollWallClockRecorder<F, R>
where
    F: Future,
    R: Fn(Duration),
{
    type Output = F::Output;

    /// Start the clock on the first call, then delegate to the wrapped future.
    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
        let projected = self.project();

        // Only the very first poll stamps the start instant; later polls must
        // leave it untouched so the measurement spans the whole wait.
        //
        // Stamping here rather than in the constructor keeps the time between
        // construction and first poll (including any time the task spends
        // waiting to be scheduled onto a runtime worker thread) out of the
        // measurement.
        if projected.started_at.is_none() {
            *projected.started_at = Some(Instant::now());
        }

        // Forward the poll to the instrumented future unchanged.
        projected.fut.poll(cx)
    }
}

#[pinned_drop]
impl<F, R> PinnedDrop for PollWallClockRecorder<F, R>
where
    R: Fn(Duration),
{
    /// Report the time elapsed since the first poll, if there was one.
    fn drop(self: Pin<&mut Self>) {
        let projected = self.project();

        // A future that was created but never polled has no start instant and
        // therefore nothing to report.
        if let Some(first_poll) = projected.started_at.as_ref() {
            // Round-trip through whole milliseconds so the recorder always
            // receives a millisecond-granular duration. The u128 -> u64
            // narrowing cannot realistically overflow for a wall-clock wait.
            let whole_millis = first_poll.elapsed().as_millis() as _;
            (projected.recorder)(Duration::from_millis(whole_millis));
        }
    }
}
85 changes: 85 additions & 0 deletions sqlx-core/src/pool/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
//! Pool metric / event instrumentation.

use std::{ops::Deref, sync::Arc, time::Duration};

/// A sink for metrics and events emitted by a [`Pool`].
///
/// Every method has a no-op default, so implementors only need to override
/// the metrics they are interested in.
///
/// ```rust
/// use std::sync::Arc;
/// use sqlx::pool::PoolMetricsObserver;
///
/// #[derive(Default)]
/// struct Observer;
///
/// impl PoolMetricsObserver for Observer {
///     fn permit_wait_time(&self, time: std::time::Duration) {
///         println!(
///             "waited {} milliseconds to get a slot in the connection pool",
///             time.as_millis()
///         );
///     }
/// }
///
/// # #[cfg(feature = "any")]
/// # async fn _example() -> Result<(), sqlx::Error> {
/// # let database_url = "";
/// // Initialise the observer as a dyn PoolMetricsObserver
/// let metrics: Arc<dyn PoolMetricsObserver> = Arc::new(Observer::default());
///
/// // Configure the pool to push metrics to the observer
/// # use sqlx_core::any::AnyPoolOptions;
/// # use sqlx::Executor;
/// let pool = AnyPoolOptions::new()
///     .max_connections(1)
///     .metrics_observer(Arc::clone(&metrics))
///     .connect(&database_url)
///     .await?;
///
/// // Use the pool and see the wait times!
/// pool.execute("SELECT 1;").await?;
/// # Ok(())
/// # }
/// ```
///
/// [`Pool`]: crate::pool::Pool
pub trait PoolMetricsObserver: Send + Sync {
    /// Reports how long a caller waited for a connection permit from the
    /// underlying connection pool.
    ///
    /// The pool calls this once for every permit acquisition attempt that
    /// completes, whether the permit was granted or the attempt was abandoned.
    ///
    /// # Blocking
    ///
    /// This method runs inline with [`acquire()`]: the caller of
    /// [`acquire()`] does not make progress until it returns, so
    /// implementations should return quickly.
    ///
    /// # Semantics
    ///
    /// `time` covers only the wait for the permit. It does NOT include the
    /// time taken to perform any liveness checks on connections, nor the time
    /// taken to establish a new connection, if one is needed.
    ///
    /// If a [connection timeout][1] expires while waiting for a permit, the
    /// time spent waiting up to that point is still reported.
    ///
    /// NOTE: a small non-zero wait may be reported even when a permit is
    /// immediately available when calling [`acquire()`], as acquiring one is
    /// not instantaneous.
    ///
    /// [1]: crate::pool::PoolOptions::connect_timeout()
    /// [`acquire()`]: crate::pool::Pool::acquire()
    /// [`Pool`]: crate::pool::Pool
    fn permit_wait_time(&self, time: Duration) {
        // Default: observers that do not care about this metric ignore it.
        let _ = time;
    }
}

/// Forwarding implementation so a shared, reference-counted observer can be
/// used anywhere a [`PoolMetricsObserver`] is expected.
///
/// `T` is allowed to be unsized, which means `Arc<dyn PoolMetricsObserver>`
/// (the type-erased form the pool options store) implements the trait too,
/// not just `Arc<ConcreteObserver>`.
impl<T> PoolMetricsObserver for Arc<T>
where
    T: PoolMetricsObserver + ?Sized,
{
    fn permit_wait_time(&self, time: Duration) {
        // Call the inner observer's method explicitly so this can never
        // resolve back to the `Arc` impl itself.
        T::permit_wait_time(self.deref(), time)
    }
}
2 changes: 2 additions & 0 deletions sqlx-core/src/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,12 @@ mod maybe;

mod connection;
mod inner;
mod metrics;
mod options;

pub use self::connection::PoolConnection;
pub(crate) use self::maybe::MaybePoolConnection;
pub use self::metrics::PoolMetricsObserver;
pub use self::options::PoolOptions;

/// An asynchronous pool of SQLx database connections.
Expand Down
13 changes: 13 additions & 0 deletions sqlx-core/src/pool/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::database::Database;
use crate::error::Error;
use crate::pool::inner::SharedPool;
use crate::pool::Pool;
use crate::pool::PoolMetricsObserver;
use futures_core::future::BoxFuture;
use sqlx_rt::spawn;
use std::cmp;
Expand Down Expand Up @@ -33,6 +34,7 @@ pub struct PoolOptions<DB: Database> {
pub(crate) max_lifetime: Option<Duration>,
pub(crate) idle_timeout: Option<Duration>,
pub(crate) fair: bool,
pub(crate) metric_observer: Option<Arc<dyn PoolMetricsObserver>>,
}

impl<DB: Database> Default for PoolOptions<DB> {
Expand All @@ -54,6 +56,7 @@ impl<DB: Database> PoolOptions<DB> {
idle_timeout: Some(Duration::from_secs(10 * 60)),
max_lifetime: Some(Duration::from_secs(30 * 60)),
fair: true,
metric_observer: None,
}
}

Expand Down Expand Up @@ -226,6 +229,16 @@ impl<DB: Database> PoolOptions<DB> {

Pool(shared)
}

/// Register a [`PoolMetricsObserver`] that will receive [`Pool`] metrics and
/// events.
///
/// Accepts either an `Arc<dyn PoolMetricsObserver>` directly or an `Option`
/// of one; passing `None` removes any observer set earlier on this builder.
pub fn metrics_observer(
    self,
    observer: impl Into<Option<Arc<dyn PoolMetricsObserver>>>,
) -> Self {
    let mut options = self;
    options.metric_observer = observer.into();
    options
}
}

async fn init_min_connections<DB: Database>(pool: &Arc<SharedPool<DB>>) -> Result<(), Error> {
Expand Down
Loading

0 comments on commit b7e797e

Please sign in to comment.