From b7e797e65c109f6da895e8897b7d04a8c2cdfaa0 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Sat, 7 May 2022 02:55:47 +0100 Subject: [PATCH 1/4] feat: metric observer & pool wait timing This commit adds the public PoolMetricsObserver trait for end users to optionally implement, in order to receive metric / event data from the underlying connection Pool. PoolMetricsObserver currently exposes a single metric: the duration of time spent waiting for a permit from the connection pool semaphore. This allows users to quantify the saturation of the connection pool (USE-style metrics) and identify when the workload exceeds the available pool capacity. A default no-op method implementation is provided so end-users can choose which metrics they care about, and not have to implement the full set of methods. --- Cargo.lock | 9 +-- sqlx-core/Cargo.toml | 1 + sqlx-core/src/pool/inner.rs | 105 +++++++++++++++++++++++++++++++++- sqlx-core/src/pool/metrics.rs | 85 +++++++++++++++++++++++++++ sqlx-core/src/pool/mod.rs | 2 + sqlx-core/src/pool/options.rs | 13 +++++ tests/any/pool.rs | 98 ++++++++++++++++++++++++++++++- 7 files changed, 306 insertions(+), 7 deletions(-) create mode 100644 sqlx-core/src/pool/metrics.rs diff --git a/Cargo.lock b/Cargo.lock index 9c97ad8262..8a1cf1a066 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1659,18 +1659,18 @@ checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" [[package]] name = "pin-project" -version = "1.0.9" +version = "1.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1622113ce508488160cff04e6abc60960e676d330e1ca0f77c0b8df17c81438f" +checksum = "58ad3879ad3baf4e44784bc6a718a8698867bb991f8ce24d1bcbe2cfb4c3a75e" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "1.0.9" +version = "1.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b95af56fee93df76d721d356ac1ca41fccf168bc448eb14049234df764ba3e76" +checksum 
= "744b6f092ba29c3650faf274db506afd39944f48420f6c86b17cfe0ee1cb36bb" dependencies = [ "proc-macro2", "quote", @@ -2393,6 +2393,7 @@ dependencies = [ "once_cell", "paste", "percent-encoding", + "pin-project", "rand", "regex", "rsa", diff --git a/sqlx-core/Cargo.toml b/sqlx-core/Cargo.toml index 96cca286fa..a9b17c73a6 100644 --- a/sqlx-core/Cargo.toml +++ b/sqlx-core/Cargo.toml @@ -158,6 +158,7 @@ memchr = { version = "2.4.1", default-features = false } num-bigint = { version = "0.4.0", default-features = false, optional = true, features = ["std"] } once_cell = "1.9.0" percent-encoding = "2.1.0" +pin-project = "1.0.10" rand = { version = "0.8.4", default-features = false, optional = true, features = ["std", "std_rng"] } regex = { version = "1.5.5", optional = true } rsa = { version = "0.6.0", optional = true } diff --git a/sqlx-core/src/pool/inner.rs b/sqlx-core/src/pool/inner.rs index ffbd69a89b..c40f23359b 100644 --- a/sqlx-core/src/pool/inner.rs +++ b/sqlx-core/src/pool/inner.rs @@ -13,6 +13,7 @@ use std::mem; use std::ptr; use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; use std::sync::Arc; +use std::{future::Future, pin::Pin, task::Poll}; use std::time::{Duration, Instant}; @@ -169,7 +170,19 @@ impl SharedPool { self.options.connect_timeout, async { loop { - let permit = self.semaphore.acquire(1).await; + // Decorate the semaphore permit acquisition to record the + // duration it takes to be granted (or timed out). + let fut = self.semaphore.acquire(1); + let permit = if let Some(obs) = &self.options.metric_observer { + // A metric observer is registered, so instrument the + // semaphore wait. 
+ PollWallClockRecorder::new( + |d| obs.permit_wait_time(d), + fut, + ).await + } else { + fut.await + }; if self.is_closed() { return Err(Error::PoolClosed); @@ -409,3 +422,93 @@ impl Drop for DecrementSizeGuard { self.pool.semaphore.release(1); } } + +use pin_project::{pin_project, pinned_drop}; + +/// A [`PollWallClockRecorder`] measures the duration of time (in milliseconds) +/// from first poll of `F`, to drop of this type. +/// +/// The typical lifecycle of a future is `create -> poll -> ready -> drop`, with +/// the drop happening an insignificant duration of time after the future +/// returns [`Poll::Ready`]. +/// +/// Another common lifecycle path that should be instrumented is when the +/// created future's caller times out before returning [`Poll::Ready`], with a +/// lifecycle similar to `create -> poll -> pending -> timeout -> drop`. This +/// results in the instrumented future being dropped an insignificant duration +/// of time after the timeout occurs. +/// +/// This instrumentation therefore (approximately) measures the time between +/// first poll and yielding a value, or the future timing out and being dropped. +#[pin_project(PinnedDrop)] +struct PollWallClockRecorder +where + R: Fn(Duration), +{ + // A closure the measurement will be called with. + recorder: R, + + // The instant of the first poll. + // + // None at construction time, Some after first poll. + started_at: Option, + + // The inner future that requires pinning to call poll(). + #[pin] + fut: F, +} + +impl PollWallClockRecorder +where + R: Fn(Duration), +{ + /// Record the poll duration of `F`, calling `recorder` with the duration of + /// time between first poll and drop of `Self` in milliseconds. 
+ pub(crate) fn new(recorder: R, fut: F) -> Self { + Self { + recorder, + fut, + started_at: None, + } + } +} + +impl Future for PollWallClockRecorder +where + F: Future, + R: Fn(Duration), +{ + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { + let this = self.project(); + + // Initialise the `started_at` instant on the first poll. + // + // This is more accurate than initialising the Instant in the + // constructor of this type, which would incorrectly include the time + // from construction to first poll in the measurement (which also + // includes the time a task waits to be scheduled onto a runtime worker + // thread). + let _started_at = this.started_at.get_or_insert_with(Instant::now); + + // Pass through the poll to the inner future. + this.fut.poll(cx) + } +} + +#[pinned_drop] +impl PinnedDrop for PollWallClockRecorder +where + R: Fn(Duration), +{ + fn drop(self: Pin<&mut Self>) { + let this = self.project(); + + // The future could be dropped without ever being polled, so started_at + // may not be initialised. + if let Some(started_at) = this.started_at { + (this.recorder)(Duration::from_millis(started_at.elapsed().as_millis() as _)); + } + } +} diff --git a/sqlx-core/src/pool/metrics.rs b/sqlx-core/src/pool/metrics.rs new file mode 100644 index 0000000000..8302f7e816 --- /dev/null +++ b/sqlx-core/src/pool/metrics.rs @@ -0,0 +1,85 @@ +//! Pool metric / event instrumentation. + +use std::{ops::Deref, sync::Arc, time::Duration}; + +/// An observer of [`Pool`] metrics. 
+/// +/// ```rust +/// use std::sync::Arc; +/// use sqlx::pool::PoolMetricsObserver; +/// +/// #[derive(Default)] +/// struct Observer; +/// +/// impl PoolMetricsObserver for Observer { +/// fn permit_wait_time(&self, time: std::time::Duration) { +/// println!( +/// "waited {} milliseconds to get a slot in the connection pool", +/// time.as_millis() +/// ); +/// } +/// } +/// +/// # #[cfg(feature = "any")] +/// # async fn _example() -> Result<(), sqlx::Error> { +/// # let database_url = ""; +/// // Initialise the observer as a dyn PoolMetricsObserver +/// let metrics: Arc = Arc::new(Observer::default()); +/// +/// // Configure the pool to push metrics to the observer +/// # use sqlx_core::any::AnyPoolOptions; +/// # use sqlx::Executor; +/// let pool = AnyPoolOptions::new() +/// .max_connections(1) +/// .metrics_observer(Arc::clone(&metrics)) +/// .connect(&database_url) +/// .await?; +/// +/// // Use the pool and see the wait times! +/// pool.execute("SELECT 1;").await?; +/// # Ok(()) +/// # } +/// ``` +/// +/// [`Pool`]: crate::pool::Pool +pub trait PoolMetricsObserver: Send + Sync { + /// Called with the [`Duration`] spent waiting on a permit for a connection + /// to be granted from the underlying connection pool, each time a permit + /// acquisition attempt completes (successfully or not). + /// + /// # Blocking + /// + /// The [`acquire()`] call blocks while this method is called by the + /// connection pool. Implementations should aim to return relatively + /// quickly. + /// + /// # Semantics + /// + /// This value is reported once a connection permit is granted, and does + /// NOT include the time taken to perform any liveness checks on connections + /// or time taken to establish a connection, if needed. + /// + /// If a [connection timeout][1] expires while waiting for a connection from + /// the pool, the duration of time waiting for the permit is included in + /// this measurement. 
+ /// + /// NOTE: this may report a small wait duration even if connection permits + /// are immediately available when calling [`acquire()`], as acquiring one + /// is not instantaneous. + /// + /// [1]: crate::pool::PoolOptions::connect_timeout() + /// [`acquire()`]: crate::pool::Pool::acquire() + /// [`Pool`]: crate::pool::Pool + fn permit_wait_time(&self, time: Duration) { + let _ = time; + } +} + +impl PoolMetricsObserver for Arc +where + T: PoolMetricsObserver, +{ + fn permit_wait_time(&self, time: Duration) { + self.deref().permit_wait_time(time) + } +} diff --git a/sqlx-core/src/pool/mod.rs b/sqlx-core/src/pool/mod.rs index 6ca6739144..c5c4773dfe 100644 --- a/sqlx-core/src/pool/mod.rs +++ b/sqlx-core/src/pool/mod.rs @@ -87,10 +87,12 @@ mod maybe; mod connection; mod inner; +mod metrics; mod options; pub use self::connection::PoolConnection; pub(crate) use self::maybe::MaybePoolConnection; +pub use self::metrics::PoolMetricsObserver; pub use self::options::PoolOptions; /// An asynchronous pool of SQLx database connections. 
diff --git a/sqlx-core/src/pool/options.rs b/sqlx-core/src/pool/options.rs index 3be566d793..31145afd3f 100644 --- a/sqlx-core/src/pool/options.rs +++ b/sqlx-core/src/pool/options.rs @@ -3,6 +3,7 @@ use crate::database::Database; use crate::error::Error; use crate::pool::inner::SharedPool; use crate::pool::Pool; +use crate::pool::PoolMetricsObserver; use futures_core::future::BoxFuture; use sqlx_rt::spawn; use std::cmp; @@ -33,6 +34,7 @@ pub struct PoolOptions { pub(crate) max_lifetime: Option, pub(crate) idle_timeout: Option, pub(crate) fair: bool, + pub(crate) metric_observer: Option>, } impl Default for PoolOptions { @@ -54,6 +56,7 @@ impl PoolOptions { idle_timeout: Some(Duration::from_secs(10 * 60)), max_lifetime: Some(Duration::from_secs(30 * 60)), fair: true, + metric_observer: None, } } @@ -226,6 +229,16 @@ impl PoolOptions { Pool(shared) } + + /// Push [`Pool`] metric / events to the specified [`PoolMetricsObserver`] + /// implementation. + pub fn metrics_observer( + mut self, + observer: impl Into>>, + ) -> Self { + self.metric_observer = observer.into(); + self + } } async fn init_min_connections(pool: &Arc>) -> Result<(), Error> { diff --git a/tests/any/pool.rs b/tests/any/pool.rs index 91b97978bf..abeb417764 100644 --- a/tests/any/pool.rs +++ b/tests/any/pool.rs @@ -1,7 +1,7 @@ -use sqlx::any::AnyPoolOptions; +use sqlx::{any::AnyPoolOptions, pool::PoolMetricsObserver}; use std::sync::{ atomic::{AtomicUsize, Ordering}, - Arc, + Arc, Mutex, }; use std::time::Duration; @@ -64,3 +64,97 @@ async fn pool_should_be_returned_failed_transactions() -> anyhow::Result<()> { Ok(()) } + +#[sqlx_macros::test] +async fn pool_wait_duration_counter_increases() -> anyhow::Result<()> { + const DELAY_MS: u64 = 10; + + let recorder_state = Arc::new(Mutex::new(Vec::with_capacity(2))); + let metrics: Arc = Arc::new(MetricRecorder { + wait: Arc::clone(&recorder_state), + }); + + let pool = Arc::new( + AnyPoolOptions::new() + .max_connections(1) + 
+ .metrics_observer(Arc::clone(&metrics)) + .connect(&dotenv::var("DATABASE_URL")?) + .await?, + ); + + let conn_1 = pool.acquire().await?; + + // Grab a timestamp before conn_2 starts waiting, and compute the duration + // once the task handle is joined to derive an upper bound on the pool wait + // time. + let started_at = std::time::Instant::now(); + + // A signal to indicate the second acquisition task spawned below has been + // scheduled and is executing. + let (tx, spawned) = futures::channel::oneshot::channel(); + + // This acquire blocks until conn_1 is returned to the pool + let handle = sqlx_rt::spawn({ + let pool = Arc::clone(&pool); + async move { + tx.send(()).expect("test not listening"); + let _conn_2 = pool.acquire().await?; + Result::<(), anyhow::Error>::Ok(()) + } + }); + + // Wait for the second acquisition attempt to spawn and begin executing. + spawned.await.expect("task panic"); + + // Wait a known duration of time and then drop conn_1, unblocking conn_2. + sqlx_rt::sleep(Duration::from_millis(DELAY_MS)).await; + drop(conn_1); + + // Allow conn_2 to be acquired and then immediately returned by joining the + // task handle. + let _ = handle.await.expect("acquire() task failed"); + + // At this point, conn_2 would have been acquired and immediately dropped. + // + // Now conn_2 has definitely stopped waiting (as acquire() returned and the + // task was joined), the upper bound on pool wait time can be derived. + let upper_bound = started_at.elapsed(); + + // Inspecting the wait times should show 2 permit acquisitions. + let waits = recorder_state.lock().unwrap().clone(); + assert_eq!(waits.len(), 2); + + // We can derive an upper and lower bound for the permit acquisition duration + // of conn_2, and use them to verify it is correctly recorded. + // + // The permit wait time MUST be at least DELAY_MS and no more + // than upper_bound.
+ let wait = waits[1]; + assert!( + wait.as_millis() as u64 >= DELAY_MS, + "expected at least {}, got {} when validating {:?}", + DELAY_MS, + wait.as_millis(), + waits, + ); + assert!( + wait < upper_bound, + "expected at most {:?}, got {:?} when validating {:?}", + upper_bound, + wait, + waits + ); + + Ok(()) +} + +#[derive(Debug, Default)] +struct MetricRecorder { + wait: Arc>>, +} + +impl PoolMetricsObserver for MetricRecorder { + fn permit_wait_time(&self, time: Duration) { + self.wait.lock().unwrap().push(time) + } +} From 1d25e95a97f68d8fbd1c9ee8c00fddeea0f615f8 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Fri, 10 Jun 2022 13:52:56 +0100 Subject: [PATCH 2/4] refactor: impl PoolMetricsObserver for Option Reduces call-site boilerplate, allowing the caller to "always" call the metric wrapper and let the impl decide what to do with the results, turning a None into a no-op and passing the call into the child if Some. --- sqlx-core/src/pool/inner.rs | 17 +++++------------ sqlx-core/src/pool/metrics.rs | 8 ++++++++ 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/sqlx-core/src/pool/inner.rs b/sqlx-core/src/pool/inner.rs index c40f23359b..b62a8f6ade 100644 --- a/sqlx-core/src/pool/inner.rs +++ b/sqlx-core/src/pool/inner.rs @@ -1,4 +1,4 @@ -use super::connection::{Floating, Idle, Live}; +use super::{connection::{Floating, Idle, Live}, PoolMetricsObserver}; use crate::connection::ConnectOptions; use crate::connection::Connection; use crate::database::Database; @@ -172,17 +172,10 @@ impl SharedPool { loop { // Decorate the semaphore permit acquisition to record the // duration it takes to be granted (or timed out). - let fut = self.semaphore.acquire(1); - let permit = if let Some(obs) = &self.options.metric_observer { - // A metric observer is registered, so instrument the - // semaphore wait. 
- PollWallClockRecorder::new( - |d| obs.permit_wait_time(d), - fut, - ).await - } else { - fut.await - }; + let permit = PollWallClockRecorder::new( + |d| self.options.metric_observer.permit_wait_time(d), + self.semaphore.acquire(1), + ).await; if self.is_closed() { return Err(Error::PoolClosed); diff --git a/sqlx-core/src/pool/metrics.rs b/sqlx-core/src/pool/metrics.rs index 8302f7e816..0ccdb1033c 100644 --- a/sqlx-core/src/pool/metrics.rs +++ b/sqlx-core/src/pool/metrics.rs @@ -83,3 +83,11 @@ where self.deref().permit_wait_time(time) } } + +impl PoolMetricsObserver for Option> { + fn permit_wait_time(&self, time: Duration) { + if let Some(v) = self { + v.permit_wait_time(time) + } + } +} From 1b5b2e4f94b6ea9a10b76a68d9b5ea2015d3e548 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Fri, 10 Jun 2022 14:28:41 +0100 Subject: [PATCH 3/4] refactor: simplify metrics builder option Changes the connection options "metrics_observer" method to accept an impl PoolMetricsObserver. --- sqlx-core/src/pool/metrics.rs | 25 ++++++++++++++++++++++++- sqlx-core/src/pool/options.rs | 7 ++----- tests/any/pool.rs | 16 ++++++++++++++-- 3 files changed, 40 insertions(+), 8 deletions(-) diff --git a/sqlx-core/src/pool/metrics.rs b/sqlx-core/src/pool/metrics.rs index 0ccdb1033c..e335f4a641 100644 --- a/sqlx-core/src/pool/metrics.rs +++ b/sqlx-core/src/pool/metrics.rs @@ -42,7 +42,7 @@ use std::{ops::Deref, sync::Arc, time::Duration}; /// ``` /// /// [`Pool`]: crate::pool::Pool -pub trait PoolMetricsObserver: Send + Sync { +pub trait PoolMetricsObserver: Send + Sync + 'static { /// Called with the [`Duration`] spent waiting on a permit for a connection /// to be granted from the underlying connection pool, each time a permit /// acquisition attempt completes (successfully or not). 
@@ -73,6 +73,15 @@ pub trait PoolMetricsObserver: Send + Sync { fn permit_wait_time(&self, time: Duration) { let _ = time; } + + #[doc(hidden)] + fn __into_dyn_arc(self) -> Arc + where + Self: Sized + 'static, + { + // There's no need for end users to implement this. + Arc::new(self) + } } impl PoolMetricsObserver for Arc @@ -82,6 +91,13 @@ where fn permit_wait_time(&self, time: Duration) { self.deref().permit_wait_time(time) } + + fn __into_dyn_arc(self) -> Arc + where + Self: Sized, + { + self + } } impl PoolMetricsObserver for Option> { @@ -90,4 +106,11 @@ impl PoolMetricsObserver for Option> { v.permit_wait_time(time) } } + + fn __into_dyn_arc(self) -> Arc + where + Self: Sized, + { + panic!("Option> should not be wrapped in another Arc"); + } } diff --git a/sqlx-core/src/pool/options.rs b/sqlx-core/src/pool/options.rs index 31145afd3f..77721da657 100644 --- a/sqlx-core/src/pool/options.rs +++ b/sqlx-core/src/pool/options.rs @@ -232,11 +232,8 @@ impl PoolOptions { /// Push [`Pool`] metric / events to the specified [`PoolMetricsObserver`] /// implementation. - pub fn metrics_observer( - mut self, - observer: impl Into>>, - ) -> Self { - self.metric_observer = observer.into(); + pub fn metrics_observer(mut self, observer: impl PoolMetricsObserver) -> Self { + self.metric_observer = Some(observer.__into_dyn_arc()); self } } diff --git a/tests/any/pool.rs b/tests/any/pool.rs index abeb417764..ef9102c685 100644 --- a/tests/any/pool.rs +++ b/tests/any/pool.rs @@ -70,7 +70,7 @@ async fn pool_wait_duration_counter_increases() -> anyhow::Result<()> { const DELAY_MS: u64 = 10; let recorder_state = Arc::new(Mutex::new(Vec::with_capacity(2))); - let metrics: Arc = Arc::new(MetricRecorder { + let metrics = Arc::new(MetricRecorder { wait: Arc::clone(&recorder_state), }); @@ -148,7 +148,19 @@ async fn pool_wait_duration_counter_increases() -> anyhow::Result<()> { Ok(()) } -#[derive(Debug, Default)] +// Static asserts that various types are accepted. 
+#[sqlx_macros::test] +fn assert_metric_types() { + let metrics = MetricRecorder { + wait: Default::default(), + }; + + AnyPoolOptions::new() + .metrics_observer(metrics.clone()) + .metrics_observer(Arc::new(metrics)); +} + +#[derive(Debug, Default, Clone)] struct MetricRecorder { wait: Arc>>, } From 2d3f26073fe585a15c3f3574b555c4c955fa2839 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Fri, 10 Jun 2022 14:33:56 +0100 Subject: [PATCH 4/4] style: work around cargo fmt bug Removes some whitespace that caused cargo fmt to bail. --- sqlx-core/src/pool/inner.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/sqlx-core/src/pool/inner.rs b/sqlx-core/src/pool/inner.rs index b62a8f6ade..6a3763ea3f 100644 --- a/sqlx-core/src/pool/inner.rs +++ b/sqlx-core/src/pool/inner.rs @@ -1,4 +1,7 @@ -use super::{connection::{Floating, Idle, Live}, PoolMetricsObserver}; +use super::{ + connection::{Floating, Idle, Live}, + PoolMetricsObserver, +}; use crate::connection::ConnectOptions; use crate::connection::Connection; use crate::database::Database; @@ -173,7 +176,7 @@ impl SharedPool { // Decorate the semaphore permit acquisition to record the // duration it takes to be granted (or timed out). let permit = PollWallClockRecorder::new( - |d| self.options.metric_observer.permit_wait_time(d), + |d| self.options.metric_observer.permit_wait_time(d), self.semaphore.acquire(1), ).await;