feat: metric observer & pool wait timing #1900

Closed
wants to merge 4 commits
9 changes: 5 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions sqlx-core/Cargo.toml
@@ -158,6 +158,7 @@ memchr = { version = "2.4.1", default-features = false }
num-bigint = { version = "0.4.0", default-features = false, optional = true, features = ["std"] }
once_cell = "1.9.0"
percent-encoding = "2.1.0"
pin-project = "1.0.10"
rand = { version = "0.8.4", default-features = false, optional = true, features = ["std", "std_rng"] }
regex = { version = "1.5.5", optional = true }
rsa = { version = "0.6.0", optional = true }
103 changes: 101 additions & 2 deletions sqlx-core/src/pool/inner.rs
@@ -1,4 +1,7 @@
use super::connection::{Floating, Idle, Live};
use super::{
connection::{Floating, Idle, Live},
PoolMetricsObserver,
};
use crate::connection::ConnectOptions;
use crate::connection::Connection;
use crate::database::Database;
@@ -13,6 +16,7 @@ use std::mem;
use std::ptr;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;
use std::{future::Future, pin::Pin, task::Poll};

use std::time::{Duration, Instant};

@@ -169,7 +173,12 @@ impl<DB: Database> SharedPool<DB> {
self.options.connect_timeout,
async {
loop {
let permit = self.semaphore.acquire(1).await;
// Decorate the semaphore permit acquisition to record the
// duration it takes to be granted (or timed out).
let permit = PollWallClockRecorder::new(
|d| self.options.metric_observer.permit_wait_time(d),
self.semaphore.acquire(1),
).await;

if self.is_closed() {
return Err(Error::PoolClosed);
@@ -409,3 +418,93 @@ impl<DB: Database> Drop for DecrementSizeGuard<DB> {
self.pool.semaphore.release(1);
}
}

use pin_project::{pin_project, pinned_drop};

/// A [`PollWallClockRecorder`] measures the wall-clock duration (in
/// milliseconds) from the first poll of `F` to the drop of this type.
///
/// The typical lifecycle of a future is `create -> poll -> ready -> drop`, with
/// the drop happening an insignificant duration of time after the future
/// returns [`Poll::Ready`].
///
/// Another common lifecycle path that should be instrumented is when the
/// created future's caller times out before returning [`Poll::Ready`], with a
/// lifecycle similar to `create -> poll -> pending -> timeout -> drop`. This
/// results in the instrumented future being dropped an insignificant duration
/// of time after the timeout occurs.
///
/// This instrumentation therefore (approximately) measures the time between
/// first poll and yielding a value, or the future timing out and being dropped.
#[pin_project(PinnedDrop)]
struct PollWallClockRecorder<F, R>
Collaborator:
This is honestly pretty overengineered. I think for most cases something like this would suffice:

async fn time_execution<F: Future>(fut: F) -> (F::Output, Duration) {
    // Won't be collected until the future is first polled
    let started_at = Instant::now();
    let ret = fut.await;
    (ret, started_at.elapsed())
}

In other cases, we wouldn't be timing the execution of a single Future and so would need a different approach anyway.

Author:
I started with this, but it's flawed in the case of timeouts being hit. A simplified example of the usage for measuring the pool wait is:

        sqlx_rt::timeout(timeout, async {
            let (permit, duration) = time_execution(semaphore.acquire(1)).await;

            // do stuff with permit
        }).await;

When this is called, time_execution() is called, and the instant is read. Let's say there are no permits available: fut.await in time_execution() returns Poll::Pending. Eventually the timeout may fire before the future is woken, and fut, the time_execution() future, and their respective state (including the instant) are dropped without ever being polled again.

This causes us to incorrectly miss recording the duration of time spent waiting on a pool permit when the connection timeout is hit. This case is the most critical example of connection pool starvation, so capturing it is important.

I attempted to describe this situation in the docs, but feel free to suggest improvements 👍
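
A minimal sketch of the dropped-measurement problem described above (tokio::time::timeout stands in for sqlx_rt::timeout purely for illustration):

    use std::time::{Duration, Instant};

    async fn time_execution<F: std::future::Future>(fut: F) -> (F::Output, Duration) {
        // Not collected until the future is first polled.
        let started_at = Instant::now();
        let ret = fut.await;
        (ret, started_at.elapsed())
    }

    #[tokio::main]
    async fn main() {
        // A future that never resolves stands in for an exhausted semaphore.
        let wait_forever = std::future::pending::<()>();

        // The timeout wins the race and drops the inner future, so the
        // `(ret, duration)` tuple is never produced and the wait is never
        // recorded - exactly the starvation case we most want to observe.
        let result = tokio::time::timeout(
            Duration::from_millis(100),
            time_execution(wait_forever),
        )
        .await;
        assert!(result.is_err());
    }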

Collaborator:
For timeouts, I think that should be a completely different method of the observer that's invoked. It doesn't make sense to record the wait time when it times out because that's going to be basically just recording the timeout duration itself, within a margin of error.

On that note, while it's out of scope of this PR, I've been meaning to revise the definition of PoolOptions::connect_timeout() because I think it's misleading. connect_timeout should probably be specifically for timing out on connection attempts to the database, with a separate acquire_timeout capping the maximum wait time for a permit (or maybe just the whole Pool::acquire() call itself).

Author:
+1 for separating out the connect_timeout() and acquire_timeout() (and maybe even an exec_timeout() that covers only the query exec portion, or an "overall" timeout that includes all three phases) - I think this would be much more intuitive.

I agree emitting a "timeout has occurred" event would be useful in order to track the number of queries that fail due to timeouts (regardless of which phase it was in) - perhaps this can be added in a follow-up PR.

It doesn't make sense to record the wait time when it times out

I'm not sure I agree, as the pool wait == timeout value case is both valid and useful information, but perhaps we're trying to answer different questions with this metric.

I hope to answer "how saturated is my connection pool?", which doesn't place any importance on whether or not a timeout occurs. If we capture the pool wait time irrespective of a timeout being hit, and we know the configured timeout value, we can divide the actual wait time by the configured timeout value and easily compute a relative "percentage of time spent waiting for a connection", which may be ~100% in the case of hitting a timeout during the semaphore wait. Should the timeout occur almost immediately after the permit has been granted, we'd get a 99% pool wait. If no timeout occurs, we still produce a relative measurement of pool saturation - i.e. waiting for half the configured timeout would give us a 50% pool wait measurement. It would be easy to create an alert for "pool wait > 50% of timeout", for example.

What would the benefit of emitting a different metric for a timeout be? If a timeout triggers a different method as you suggest, what does answering the saturation question look like if the timeout occurs during/immediately after/significantly after the permit has been granted? Would always emitting the pool wait duration + an "a timeout occurred" event be sufficient?
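
As a sketch of the ratio described above (the names are illustrative, not part of this PR):

    use std::time::Duration;

    /// Relative pool saturation: the fraction of the configured acquire
    /// timeout that was spent waiting for a permit.
    fn pool_saturation(wait: Duration, configured_timeout: Duration) -> f64 {
        wait.as_secs_f64() / configured_timeout.as_secs_f64()
    }

    // Waiting 15s against a 30s timeout reads as 0.5 (50% saturated), and
    // hitting the timeout reads as ~1.0, so "saturation > 0.5" makes a
    // simple alert threshold.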

Collaborator:
Counterargument: if you do want to record timeouts as part of the wait time, you can simply forward to that method from the one that's called on a timeout, e.g.:

fn acquire_timed_out(&self, total_wait: Duration) {
    self.permit_wait_time(total_wait);
}

However, as you want to implement it, if a user for some reason does not want timeouts to factor into the permit wait time metric then they have to manually filter those out:

fn permit_wait_time(&self, time: Duration) {
    // Even more complicated if they have to get this value from configuration.
    if time == ACQUIRE_TIMEOUT {
        return;
    }

    ...
}

The former is simpler.

A separate callback for acquires timing out makes it easier for an application to flag itself as "unhealthy". If the pool is at 95% saturation, that is probably a good reason to alert someone or signal to a monitoring daemon to spin up extra capacity, but 95% doesn't mean that requests are timing out yet. Having to check that the wait time equals the timeout to signal an unhealthy status unnecessarily complicates the logic.
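
A sketch of how the split callbacks could be consumed (acquire_timed_out is the proposed method, not part of this PR's trait):

    use std::sync::atomic::{AtomicBool, Ordering};
    use std::time::Duration;

    #[derive(Default)]
    struct HealthObserver {
        unhealthy: AtomicBool,
    }

    impl HealthObserver {
        fn permit_wait_time(&self, time: Duration) {
            // Record the wait sample (histogram, log line, etc.).
            println!("waited {} ms for a permit", time.as_millis());
        }

        fn acquire_timed_out(&self, total_wait: Duration) {
            // Optionally fold the timed-out wait into the same metric...
            self.permit_wait_time(total_wait);
            // ...and flag the process unhealthy without comparing durations
            // against the configured timeout.
            self.unhealthy.store(true, Ordering::Relaxed);
        }
    }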

Author:
Simply forwarding a general timeout event to permit_wait_time doesn't work - it doesn't account for timeouts occurring after a permit has been acquired, such as "timed out during query I/O". It's not a permit wait timeout, so it doesn't make sense to then record it as a pool wait timeout.

Why would we accept the duration in fn acquire_timed_out(&self, total_wait: Duration)? As you said:

It doesn't make sense to record the wait time when it times out because that's going to be basically just recording the timeout duration itself, within a margin of error.

I'm not really sure what use case we're trying to cover with the proposed changes. Is this:

A separate callback for acquires timing out makes it easier for an application to flag itself as "unhealthy".

the use case you're trying to satisfy? If so, we could always emit a pool_wait_time() metric once the pool wait is complete/timed out, and any time a timeout occurs (including during the pool wait) emit a timeout_occurred() event:

  • metrics.pool_wait_time(duration)
  • metrics.timeout_occurred()

Or, how about we instrument each phase of the query, and add a "did time out" flag to the observer fn? So for a timeout during the pool wait, we would see the following calls:

  • metrics.pool_wait_time(duration, timeout=true)

and for timeouts occurring after the pool wait, during query I/O:

  • metrics.pool_wait_time(duration, timeout=false)
  • metrics.query_exec_time(duration, timeout=true)
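
A sketch of that per-phase shape (a hypothetical API, not what this PR implements):

    use std::time::Duration;

    trait PhaseMetricsObserver: Send + Sync + 'static {
        /// Time spent waiting for a pool permit; `timed_out` is true if the
        /// timeout fired during this phase.
        fn pool_wait_time(&self, duration: Duration, timed_out: bool) {
            let _ = (duration, timed_out);
        }

        /// Time spent executing the query; `timed_out` is true if the
        /// timeout fired mid-execution.
        fn query_exec_time(&self, duration: Duration, timed_out: bool) {
            let _ = (duration, timed_out);
        }
    }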

Reviewer:

In terms of "should timeouts be included in the wait metric?" I'm with @domodwyer – I consider the metric to be "how long did we wait in a queue before either succeeding or failing (for whatever reason) to acquire a connection". If it fails because of a timeout, then that's OK, that still counts.

If the pool is completely saturated and no connections are getting released, I would expect to see the metrics telling me that the wait times are always the timeout period. I would also like to see metrics telling me about connection errors (including failing to acquire a connection), as the E (errors) in the USE method. These two together should give me a pretty good idea of what's going on.

where
R: Fn(Duration),
{
// The closure that is called with the measurement.
recorder: R,

// The instant of the first poll.
//
// None at construction time, Some after first poll.
started_at: Option<Instant>,

// The inner future that requires pinning to call poll().
#[pin]
fut: F,
}

impl<F, R> PollWallClockRecorder<F, R>
where
R: Fn(Duration),
{
/// Record the poll duration of `F`, calling `recorder` with the duration of
/// time between the first poll and the drop of `Self`, truncated to whole
/// milliseconds.
pub(crate) fn new(recorder: R, fut: F) -> Self {
Self {
recorder,
fut,
started_at: None,
}
}
}

impl<F, R> Future for PollWallClockRecorder<F, R>
where
F: Future,
R: Fn(Duration),
{
type Output = F::Output;

fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
let this = self.project();

// Initialise the `started_at` instant on the first poll.
//
// This is more accurate than initialising the Instant in the
// constructor of this type, which would incorrectly include the time
// from construction to first poll in the measurement (which also
// includes the time a task waits to be scheduled onto a runtime worker
// thread).
let _started_at = this.started_at.get_or_insert_with(Instant::now);

// Pass through the poll to the inner future.
this.fut.poll(cx)
}
}

#[pinned_drop]
impl<F, R> PinnedDrop for PollWallClockRecorder<F, R>
where
R: Fn(Duration),
{
fn drop(self: Pin<&mut Self>) {
let this = self.project();

// The future could be dropped without ever being polled, so started_at
// may not be initialised.
if let Some(started_at) = this.started_at {
(this.recorder)(Duration::from_millis(started_at.elapsed().as_millis() as _));
}
}
}
116 changes: 116 additions & 0 deletions sqlx-core/src/pool/metrics.rs
@@ -0,0 +1,116 @@
//! Pool metric / event instrumentation.

use std::{ops::Deref, sync::Arc, time::Duration};

/// An observer of [`Pool`] metrics.
///
/// ```rust
/// use std::sync::Arc;
/// use sqlx::pool::PoolMetricsObserver;
///
/// #[derive(Default)]
/// struct Observer;
///
/// impl PoolMetricsObserver for Observer {
/// fn permit_wait_time(&self, time: std::time::Duration) {
/// println!(
/// "waited {} milliseconds to get a slot in the connection pool",
/// time.as_millis()
/// );
/// }
/// }
///
/// # #[cfg(feature = "any")]
/// # async fn _example() -> Result<(), sqlx::Error> {
/// # let database_url = "";
/// // Initialise the observer as a dyn PoolMetricsObserver
/// let metrics: Arc<dyn PoolMetricsObserver> = Arc::new(Observer::default());
///
/// // Configure the pool to push metrics to the observer
/// # use sqlx_core::any::AnyPoolOptions;
/// # use sqlx::Executor;
/// let pool = AnyPoolOptions::new()
/// .max_connections(1)
/// .metrics_observer(Arc::clone(&metrics))
/// .connect(&database_url)
/// .await?;
///
/// // Use the pool and see the wait times!
/// pool.execute("SELECT 1;").await?;
/// # Ok(())
/// # }
/// ```
///
/// [`Pool`]: crate::pool::Pool
pub trait PoolMetricsObserver: Send + Sync + 'static {
/// Called with the [`Duration`] spent waiting on a permit for a connection
/// to be granted from the underlying connection pool, each time a permit
/// acquisition attempt completes (successfully or not).
///
/// # Blocking
///
/// The [`acquire()`] call blocks while this method is called by the
/// connection pool. Implementations should aim to return relatively
/// quickly.
///
/// # Semantics
///
/// This value is recorded once a connection permit is granted, and does
/// NOT include the time taken to perform any liveness checks on connections
/// or time taken to establish a connection, if needed.
///
/// If a [connection timeout][1] expires while waiting for a connection from
/// the pool, the duration of time waiting for the permit is included in
/// this measurement.
///
/// NOTE: this may report a small wait duration even if connection permits
/// are immediately available when calling [`acquire()`], as acquiring one
/// is not instantaneous.
///
/// [1]: crate::pool::PoolOptions::connect_timeout()
/// [`acquire()`]: crate::pool::Pool::acquire()
/// [`Pool`]: crate::pool::Pool
fn permit_wait_time(&self, time: Duration) {
let _ = time;
}

#[doc(hidden)]
fn __into_dyn_arc(self) -> Arc<dyn PoolMetricsObserver>
where
Self: Sized + 'static,
{
// There's no need for end users to implement this.
Arc::new(self)
}
}

impl<T> PoolMetricsObserver for Arc<T>
where
T: PoolMetricsObserver,
{
fn permit_wait_time(&self, time: Duration) {
self.deref().permit_wait_time(time)
}

fn __into_dyn_arc(self) -> Arc<dyn PoolMetricsObserver>
where
Self: Sized,
{
self
}
}

impl PoolMetricsObserver for Option<Arc<dyn PoolMetricsObserver>> {
fn permit_wait_time(&self, time: Duration) {
if let Some(v) = self {
v.permit_wait_time(time)
}
}

fn __into_dyn_arc(self) -> Arc<dyn PoolMetricsObserver>
where
Self: Sized,
{
panic!("Option<Arc<_>> should not be wrapped in another Arc");
}
}
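
Given the note above that acquire() blocks while the observer runs, a minimal sketch of an implementation that keeps its critical section short (the struct name is illustrative):

    use std::sync::Mutex;
    use std::time::Duration;
    use sqlx::pool::PoolMetricsObserver;

    /// Buffers raw wait samples; the lock is held only for a push, since
    /// `acquire()` blocks while `permit_wait_time` runs.
    #[derive(Default)]
    struct BufferingObserver {
        samples: Mutex<Vec<Duration>>,
    }

    impl PoolMetricsObserver for BufferingObserver {
        fn permit_wait_time(&self, time: Duration) {
            self.samples.lock().unwrap().push(time);
        }
    }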
2 changes: 2 additions & 0 deletions sqlx-core/src/pool/mod.rs
@@ -87,10 +87,12 @@ mod maybe;

mod connection;
mod inner;
mod metrics;
mod options;

pub use self::connection::PoolConnection;
pub(crate) use self::maybe::MaybePoolConnection;
pub use self::metrics::PoolMetricsObserver;
pub use self::options::PoolOptions;

/// An asynchronous pool of SQLx database connections.
10 changes: 10 additions & 0 deletions sqlx-core/src/pool/options.rs
@@ -3,6 +3,7 @@ use crate::database::Database;
use crate::error::Error;
use crate::pool::inner::SharedPool;
use crate::pool::Pool;
use crate::pool::PoolMetricsObserver;
use futures_core::future::BoxFuture;
use sqlx_rt::spawn;
use std::cmp;
@@ -33,6 +34,7 @@ pub struct PoolOptions<DB: Database> {
pub(crate) max_lifetime: Option<Duration>,
pub(crate) idle_timeout: Option<Duration>,
pub(crate) fair: bool,
pub(crate) metric_observer: Option<Arc<dyn PoolMetricsObserver>>,
}

impl<DB: Database> Default for PoolOptions<DB> {
@@ -54,6 +56,7 @@ impl<DB: Database> PoolOptions<DB> {
idle_timeout: Some(Duration::from_secs(10 * 60)),
max_lifetime: Some(Duration::from_secs(30 * 60)),
fair: true,
metric_observer: None,
}
}

@@ -226,6 +229,13 @@ impl<DB: Database> PoolOptions<DB> {

Pool(shared)
}

/// Push [`Pool`] metrics / events to the specified [`PoolMetricsObserver`]
/// implementation.
pub fn metrics_observer(mut self, observer: impl PoolMetricsObserver) -> Self {
self.metric_observer = Some(observer.__into_dyn_arc());
self
}
}

async fn init_min_connections<DB: Database>(pool: &Arc<SharedPool<DB>>) -> Result<(), Error> {