Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP feat: implement metrics-gathering support for Pool #1908

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion sqlx-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ all-types = [
]
bigdecimal = ["bigdecimal_", "num-bigint"]
decimal = ["rust_decimal", "num-bigint"]
json = ["serde", "serde_json"]
json = ["serde", "serde_json", "enum-map/serde"]

# runtimes
runtime-actix-native-tls = [
Expand Down Expand Up @@ -185,6 +185,7 @@ hashlink = "0.8.0"
indexmap = "1.6.0"
hkdf = { version = "0.12.0", optional = true }
event-listener = "2.5.2"
enum-map = "2.4.0"

[dev-dependencies]
sqlx = { version = "0.6.0", path = "..", features = ["postgres", "sqlite", "mysql"] }
Expand Down
70 changes: 58 additions & 12 deletions sqlx-core/src/pool/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use std::future::Future;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering};
use std::sync::Arc;

use crate::pool::metrics::{AcquirePhase, PoolMetricsCollector};
use crate::pool::options::PoolConnectionMetadata;
use std::time::{Duration, Instant};

Expand Down Expand Up @@ -178,13 +179,22 @@ impl<DB: Database> PoolInner<DB> {
return Err(Error::PoolClosed);
}

let deadline = Instant::now() + self.options.acquire_timeout;
let metrics = &self.options.metrics;

sqlx_rt::timeout(
let acquire_start = Instant::now();

let deadline = acquire_start + self.options.acquire_timeout;

let mut phase = AcquirePhase::Waiting;

let res = sqlx_rt::timeout(
self.options.acquire_timeout,
async {
loop {
phase = AcquirePhase::Waiting;
let waiting_start = Instant::now();
let permit = self.semaphore.acquire(1).await;
metrics.permit_wait_time(waiting_start.elapsed());

if self.is_closed() {
return Err(Error::PoolClosed);
Expand All @@ -194,7 +204,7 @@ impl<DB: Database> PoolInner<DB> {
let guard = match self.pop_idle(permit) {

// Then, check that we can use it...
Ok(conn) => match check_idle_conn(conn, &self.options).await {
Ok(conn) => match check_idle_conn(conn, &self.options, &mut phase).await {

// All good!
Ok(live) => return Ok(live),
Expand All @@ -207,24 +217,40 @@ impl<DB: Database> PoolInner<DB> {
// we can open a new connection
guard
} else {
// I can't imagine this occurring unless there's a race condition where
// the number of available permits can exceed the max size
// without the pool being closed.
//
// If this does happen, the safest thing to do is return to the top
// and wait for another permit.
log::debug!("woke but was unable to acquire idle connection or open new one; retrying");
continue;
}
};

// Attempt to connect...
return self.connect(deadline, guard).await;
return self.connect(deadline, guard, &mut phase).await;
}
}
)
.await
.map_err(|_| Error::PoolTimedOut)?
.map_err(|_| {
metrics.acquire_timed_out(phase);
Error::PoolTimedOut
})?;

if res.is_ok() {
metrics.connection_acquired(acquire_start.elapsed());
}

res
}

pub(super) async fn connect(
self: &Arc<Self>,
deadline: Instant,
guard: DecrementSizeGuard<DB>,
phase: &mut AcquirePhase,
) -> Result<Floating<DB, Live<DB>>, Error> {
if self.is_closed() {
return Err(Error::PoolClosed);
Expand All @@ -238,16 +264,20 @@ impl<DB: Database> PoolInner<DB> {

// result here is `Result<Result<C, Error>, TimeoutError>`
// if this block does not return, sleep for the backoff timeout and try again

*phase = AcquirePhase::Connecting;
match sqlx_rt::timeout(timeout, self.connect_options.connect()).await {
// successfully established connection
Ok(Ok(mut raw)) => {
// See comment on `PoolOptions::after_connect`
let meta = PoolConnectionMetadata {
age: Duration::ZERO,
idle_for: Duration::ZERO,
};

let res = if let Some(callback) = &self.options.after_connect {
*phase = AcquirePhase::AfterConnectCallback;

// See comment on `PoolOptions::after_connect`
let meta = PoolConnectionMetadata {
age: Duration::ZERO,
idle_for: Duration::ZERO,
};

callback(&mut raw, meta).await
} else {
Ok(())
Expand All @@ -258,6 +288,7 @@ impl<DB: Database> PoolInner<DB> {
Err(e) => {
log::error!("error returned from after_connect: {:?}", e);
// The connection is broken, don't try to close nicely.
*phase = AcquirePhase::ClosingInvalidConnection;
let _ = raw.close_hard().await;

// Fall through to the backoff.
Expand All @@ -279,6 +310,8 @@ impl<DB: Database> PoolInner<DB> {
Err(_) => return Err(Error::PoolTimedOut),
}

*phase = AcquirePhase::Backoff;

// If the connection is refused, wait in exponentially
// increasing steps for the server to come up,
// capped by a factor of the remaining time until the deadline
Expand Down Expand Up @@ -310,7 +343,10 @@ impl<DB: Database> PoolInner<DB> {

// We skip `after_release` since the connection was never provided to user code
// besides `after_connect`, if they set it.
self.release(self.connect(deadline, guard).await?);
self.release(
self.connect(deadline, guard, &mut AcquirePhase::Connecting)
.await?,
);
}

Ok(())
Expand Down Expand Up @@ -351,16 +387,22 @@ fn is_beyond_idle_timeout<DB: Database>(idle: &Idle<DB>, options: &PoolOptions<D
async fn check_idle_conn<DB: Database>(
mut conn: Floating<DB, Idle<DB>>,
options: &PoolOptions<DB>,
phase: &mut AcquirePhase,
) -> Result<Floating<DB, Live<DB>>, DecrementSizeGuard<DB>> {
// If the connection we pulled has expired, close the connection and
// immediately create a new connection
if is_beyond_max_lifetime(&conn, options) {
*phase = AcquirePhase::ClosingInvalidConnection;
return Err(conn.close().await);
}

if options.test_before_acquire {
*phase = AcquirePhase::TestBeforeAcquire;

// Check that the connection is still live
if let Err(e) = conn.ping().await {
*phase = AcquirePhase::ClosingInvalidConnection;

// an error here means the other end has hung up or we lost connectivity
// either way we're fine to just discard the connection
// the error itself here isn't necessarily unexpected so WARN is too strong
Expand All @@ -371,16 +413,20 @@ async fn check_idle_conn<DB: Database>(
}

if let Some(test) = &options.before_acquire {
*phase = AcquirePhase::BeforeAcquireCallback;

let meta = conn.metadata();
match test(&mut conn.live.raw, meta).await {
Ok(false) => {
// connection was rejected by user-defined hook, close nicely
*phase = AcquirePhase::ClosingInvalidConnection;
return Err(conn.close().await);
}

Err(error) => {
log::warn!("error from `before_acquire`: {}", error);
// connection is broken so don't try to close nicely
*phase = AcquirePhase::ClosingInvalidConnection;
return Err(conn.close_hard().await);
}

Expand Down
150 changes: 150 additions & 0 deletions sqlx-core/src/pool/metrics/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
//! Metrics collection utilities for [`Pool`][crate::pool::Pool].
//!
//!

use std::sync::Arc;
use std::time::Duration;

// Saves a bunch of redundant links in docs.
// Just `#[cfg(doc)]` doesn't work for some reason.
#[cfg_attr(not(doc), allow(unused_imports))]
use {
crate::connection::Connection,
crate::pool::{Pool, PoolOptions},
};

mod simple;

pub use simple::{
AcquireTimeoutsPerPhase, SimplePoolMetrics, SimplePoolMetricsSnapshot, SimpleTimingStats,
};

/// Describes a type that can collect metrics from [`Pool`].
///
/// You can set the metrics collector for a `Pool` instance using [`PoolOptions::metrics_collector`].
///
/// For an easy-start implementation, see [`SimplePoolMetrics`].
///
/// All methods on this trait have provided impls so you can override just the ones you care about.
pub trait PoolMetricsCollector: Send + Sync + 'static {
/// Record when [`Pool::acquire()`] is called.
fn acquire_called(&self) {}

/// Record how long a [`Pool::acquire()`] call waited for a semaphore permit.
///
/// This is the first stage of `acquire()` and gives the call the right-of-way to either
/// pop a connection from the idle queue or open a new one.
///
/// This time is likely to increase as the pool comes under higher and higher load,
/// and will asymptotically approach the [acquire timeout][PoolOptions::acquire_timeout].
///
/// If `acquire()` times out while waiting for a permit, this method will not be called.
/// You will get an <code>acquire_timed_out([AcquirePhase::Waiting])</code> call instead.
///
/// [acquire_timed_out]: Self::acquire_timed_out
fn permit_wait_time(&self, duration: Duration) {
drop(duration);
}

/// Record when [`Pool::acquire()`] times out as governed by [`PoolOptions::acquire_timeout`].
///
/// `acquire()` has several internal asynchronous operations that it may time out on.
/// The given [`AcquirePhase`] tells you which one timed out.
fn acquire_timed_out(&self, phase: AcquirePhase) {
drop(phase);
}

/// Record when a connection is successfully acquired.
fn connection_acquired(&self, total_wait: Duration) {
drop(total_wait);
}
}

macro_rules! opt_delegate {
($receiver:ident.$method:ident $( ( $($arg:expr),*) )?) => {
if let Some(this) = $receiver {
this.$method($( $($arg),* )?);
}
}
}

#[doc(hidden)]
impl PoolMetricsCollector for Option<Arc<dyn PoolMetricsCollector>> {
fn acquire_called(&self) {
opt_delegate!(self.acquire_called());
}

fn permit_wait_time(&self, duration: Duration) {
opt_delegate!(self.permit_wait_time(duration));
}

fn acquire_timed_out(&self, phase: AcquirePhase) {
opt_delegate!(self.acquire_timed_out(phase));
}

fn connection_acquired(&self, total_wait: Duration) {
opt_delegate!(self.connection_acquired(total_wait));
}
}

/// The phase that [`Pool::acquire()`] was in when it timed out.
///
/// [`Pool::acquire()`] has several internal asynchronous operations, any of which may lead
/// to it timing out. Which phases are executed depends on multiple things:
///
/// * The pool's configuration.
/// * If an idle connection was available or not.
/// * If there is room in the pool for a new connection.
///
/// ### Note: Some Trait impls are Unstable
/// The `enum_map` trait impls are *not* considered part of the stable API.
/// They would not be listed in documentation if it was possible to tell the derive to hide them.
///
/// We reserve the right to update `enum_map` to a non-compatible version if necessary.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, enum_map::Enum)]
#[cfg_attr(feature = "json", derive(serde::Serialize))]
#[non_exhaustive]
pub enum AcquirePhase {
/// Initial [`Pool::acquire()`] phase: waiting for a semaphore permit.
///
/// A permit represents the privilege to acquire a connection, either by popping one
/// from the idle queue or opening a new one.
Waiting,

/// `acquire()` found an idle connection. It then calls [`Connection::ping()`] on it.
///
/// Only done if [`PoolOptions::test_before_acquire`] is `true` (enabled by default).
TestBeforeAcquire,

/// `acquire()` found an idle connection and the `TestBeforeAcquire` phase succeeded
/// or was skipped.
///
/// It then invokes the user-defined [`before_acquire`][PoolOptions::before_acquire] callback, if set.
BeforeAcquireCallback,

/// `acquire()` found an idle connection but decided to close it.
///
/// This may have happened for any of the following reasons:
/// * The connection's age exceeded [`PoolOptions::max_lifetime`].
/// * The `TestBeforeAcquire` phase failed.
/// * The `BeforeAcquireCallback` errored or rejected the connection.
/// * A new connection was opened but the `AfterConnectCallback` phase errored.
ClosingInvalidConnection,

/// `acquire()` either did not find an idle connection or the connection it got failed
/// the `TestBeforeAcquire` or `BeforeAcquireCallback` phase and was closed.
///
/// It then attempted to open a new connection.
Connecting,

/// `acquire()` successfully opened a new connection.
///
/// It then invokes the user-defined [`after_connect`][PoolOptions::after_connect] callback, if set.
AfterConnectCallback,

/// `acquire()` failed to open a new connection or the connection failed the
/// `AfterConnectCallback` phase.
///
/// It then waits in a backoff loop before attempting to open another connection.
Backoff,
}
Loading