From c511c5d889a5312cbbfacf46fdc5a7f1d86b21f5 Mon Sep 17 00:00:00 2001 From: NikSne Date: Tue, 13 May 2025 18:23:15 +0300 Subject: [PATCH 1/2] chore: update metrics to 0.24.0 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index c21ed7c..03d9f28 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,7 +28,7 @@ async-trait = "0.1" futures-timer = "3.0.2" log = "0.4" thiserror = "1.0" -metrics = "0.23.0" +metrics = "0.24.0" tracing = { version = "0.1", features = ["attributes"] } tracing-subscriber = "0.3.11" From 2428ee45e3f8d8ffb4010d28c5dac27d062a373b Mon Sep 17 00:00:00 2001 From: NikSne Date: Tue, 13 May 2025 19:41:35 +0300 Subject: [PATCH 2/2] feat(mobc): add manager type to metrics This can be useful when application uses more than one mobc pool P.S. I think it shouldn't be implemented by changing C to the M: Manager in Conn structs, so it needs a refactor. Maybe an optional builder argument for the pool name? --- src/conn.rs | 68 +++++++++++++++++++++++++------------------- src/lib.rs | 41 +++++++++++--------------- src/metrics_utils.rs | 37 +++++++++++++++++------- 3 files changed, 81 insertions(+), 65 deletions(-) diff --git a/src/conn.rs b/src/conn.rs index 383e65b..5b144b5 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -10,19 +10,27 @@ use std::{ use metrics::counter; use tokio::sync::OwnedSemaphorePermit; -use crate::metrics_utils::{ - GaugeGuard, ACTIVE_CONNECTIONS, CLOSED_TOTAL, IDLE_CONNECTIONS, OPENED_TOTAL, OPEN_CONNECTIONS, +use crate::{ + metrics_utils::{ + GaugeGuard, ACTIVE_CONNECTIONS, CLOSED_TOTAL, IDLE_CONNECTIONS, OPENED_TOTAL, + OPEN_CONNECTIONS, + }, + Manager, }; -pub(crate) struct ActiveConn { - inner: C, - state: ConnState, +pub(crate) struct ActiveConn { + inner: M::Connection, + state: ConnState, _permit: OwnedSemaphorePermit, - _active_connections_gauge: GaugeGuard, + _active_connections_gauge: GaugeGuard, } -impl ActiveConn { - pub(crate) fn new(inner: C, permit: OwnedSemaphorePermit, state: ConnState) -> ActiveConn { +impl ActiveConn { + pub(crate) fn new( + inner: M::Connection, + permit: OwnedSemaphorePermit, + state: ConnState, + ) -> ActiveConn { Self { inner, state, @@ -31,7 +39,7 @@ impl ActiveConn { } } - pub(crate) fn into_idle(self) -> IdleConn { + pub(crate) fn into_idle(self) -> IdleConn { IdleConn { inner: self.inner, state: self.state, @@ -47,31 +55,31 @@ impl ActiveConn { self.state.brand_new = brand_new; } - pub(crate) fn into_raw(self) -> C { + pub(crate) fn into_raw(self) -> M::Connection { self.inner } - pub(crate) fn as_raw_ref(&self) -> &C { + pub(crate) fn as_raw_ref(&self) -> &M::Connection { &self.inner } - pub(crate) fn as_raw_mut(&mut self) -> &mut C { + pub(crate) fn as_raw_mut(&mut self) -> &mut M::Connection { &mut self.inner } } -pub(crate) struct IdleConn { - inner: C, - state: ConnState, - _idle_connections_gauge: GaugeGuard, +pub(crate) struct IdleConn { + inner: M::Connection, + state: ConnState, + _idle_connections_gauge: GaugeGuard, } -impl IdleConn { +impl IdleConn { pub(crate) fn is_brand_new(&self) -> bool { self.state.brand_new } - pub(crate) fn into_active(self, permit: OwnedSemaphorePermit) -> ActiveConn { + pub(crate) fn into_active(self, permit: OwnedSemaphorePermit) -> ActiveConn { ActiveConn::new(self.inner, permit, self.state) } @@ -113,7 +121,7 @@ impl IdleConn { self.state.last_checked_at = Instant::now() } - pub(crate) fn split_raw(self) -> (C, ConnSplit) { + pub(crate) fn split_raw(self) -> (M::Connection, ConnSplit) { ( self.inner, ConnSplit::new(self.state, self._idle_connections_gauge), @@ -121,17 +129,17 @@ impl IdleConn { } } -pub(crate) struct ConnState { +pub(crate) struct ConnState { pub(crate) created_at: Instant, pub(crate) last_used_at: Instant, pub(crate) last_checked_at: Instant, pub(crate) brand_new: bool, total_connections_open: Arc, total_connections_closed: Arc, - _open_connections_gauge: GaugeGuard, + _open_connections_gauge: GaugeGuard, } -impl ConnState { +impl ConnState { pub(crate) fn new( total_connections_open: Arc, total_connections_closed: Arc, @@ -149,7 +157,7 @@ impl ConnState { } } -impl Drop for ConnState { +impl Drop for ConnState { fn drop(&mut self) { self.total_connections_open.fetch_sub(1, Ordering::Relaxed); self.total_connections_closed @@ -158,14 +166,14 @@ impl Drop for ConnState { } } -pub(crate) struct ConnSplit { - state: ConnState, - gauge: GaugeGuard, - _phantom: PhantomData, +pub(crate) struct ConnSplit { + state: ConnState, + gauge: GaugeGuard, + _phantom: PhantomData, } -impl ConnSplit { - fn new(state: ConnState, gauge: GaugeGuard) -> Self { +impl ConnSplit { + fn new(state: ConnState, gauge: GaugeGuard) -> Self { Self { state, gauge, @@ -173,7 +181,7 @@ impl ConnSplit { } } - pub(crate) fn restore(self, raw: C) -> IdleConn { + pub(crate) fn restore(self, raw: M::Connection) -> IdleConn { IdleConn { inner: raw, state: self.state, diff --git a/src/lib.rs b/src/lib.rs index 1aaf136..3461257 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -103,7 +103,7 @@ use futures_util::FutureExt; use futures_util::SinkExt; use futures_util::StreamExt; use metrics::gauge; -use metrics_utils::DurationHistogramGuard; +use metrics_utils::{get_manager_type, DurationHistogramGuard}; pub use spawn::spawn; use std::fmt; use std::future::Future; @@ -157,14 +157,14 @@ pub trait Manager: Send + Sync + 'static { struct SharedPool { config: ShareConfig, manager: M, - internals: Mutex>, + internals: Mutex>, state: PoolState, semaphore: Arc, } -struct PoolInternals { +struct PoolInternals { config: InternalConfig, - free_conns: Vec>, + free_conns: Vec>, wait_duration: Duration, cleaner_ch: Option>, } @@ -176,7 +176,7 @@ struct PoolState { wait_count: AtomicU64, } -impl Drop for PoolInternals { +impl Drop for PoolInternals { fn drop(&mut self) { log::debug!("Pool internal drop"); } @@ -322,7 +322,7 @@ impl Pool { config.max_open as usize }; - gauge!(IDLE_CONNECTIONS).set(0.0); + gauge!(IDLE_CONNECTIONS, "manager" => get_manager_type::()).set(0.0); let (share_config, internal_config) = config.split(); let internals = Mutex::new(PoolInternals { @@ -387,7 +387,7 @@ impl Pool { } async fn get_connection(&self) -> Result, Error> { - let _guard = GaugeGuard::increment(WAIT_COUNT); + let _guard = GaugeGuard::::increment(WAIT_COUNT); let c = self.get_or_create_conn().await?; let conn = Connection { @@ -401,8 +401,8 @@ impl Pool { async fn validate_conn( &self, internal_config: InternalConfig, - conn: IdleConn, - ) -> Option> { + conn: IdleConn, + ) -> Option> { if conn.is_brand_new() { return Some(conn); } @@ -428,9 +428,9 @@ impl Pool { Some(conn) } - async fn get_or_create_conn(&self) -> Result, Error> { + async fn get_or_create_conn(&self) -> Result, Error> { self.0.state.wait_count.fetch_add(1, Ordering::Relaxed); - let wait_guard = DurationHistogramGuard::start(WAIT_DURATION); + let wait_guard = DurationHistogramGuard::::start(WAIT_DURATION); let semaphore = Arc::clone(&self.0.semaphore); let permit = semaphore @@ -460,7 +460,7 @@ impl Pool { async fn open_new_connection( &self, permit: OwnedSemaphorePermit, - ) -> Result, Error> { + ) -> Result, Error> { log::debug!("creating new connection from manager"); match self.0.manager.connect().await { Ok(c) => { @@ -499,10 +499,7 @@ impl Pool { } } -async fn recycle_conn( - shared: &Arc>, - mut conn: ActiveConn, -) { +async fn recycle_conn(shared: &Arc>, mut conn: ActiveConn) { if conn_still_valid(shared, &mut conn) { conn.set_brand_new(false); let internals = shared.internals.lock().await; @@ -510,10 +507,7 @@ async fn recycle_conn( } } -fn conn_still_valid( - shared: &Arc>, - conn: &mut ActiveConn, -) -> bool { +fn conn_still_valid(shared: &Arc>, conn: &mut ActiveConn) -> bool { if !shared.manager.validate(conn.as_raw_mut()) { log::debug!("bad conn when check in"); return false; @@ -522,10 +516,7 @@ fn conn_still_valid( true } -fn put_idle_conn( - mut internals: MutexGuard<'_, PoolInternals>, - conn: ActiveConn, -) { +fn put_idle_conn(mut internals: MutexGuard<'_, PoolInternals>, conn: ActiveConn) { let idle_conn = conn.into_idle(); // Treat max_idle == 0 as unlimited idle connections. if internals.config.max_idle == 0 @@ -608,7 +599,7 @@ async fn clean_connection(shared: &Weak>) -> bool { /// A smart pointer wrapping a connection. pub struct Connection { pool: Pool, - conn: Option>, + conn: Option>, } impl Connection { diff --git a/src/metrics_utils.rs b/src/metrics_utils.rs index a58af43..14b70b0 100644 --- a/src/metrics_utils.rs +++ b/src/metrics_utils.rs @@ -1,10 +1,14 @@ use std::{ + any::type_name, + marker::PhantomData, mem::ManuallyDrop, time::{Duration, Instant}, }; use metrics::{describe_counter, describe_gauge, describe_histogram, gauge, histogram}; +use crate::Manager; + // counters pub const OPENED_TOTAL: &str = "mobc_pool_connections_opened_total"; pub const CLOSED_TOTAL: &str = "mobc_pool_connections_closed_total"; @@ -48,45 +52,58 @@ pub fn describe_metrics() { ); } -pub(crate) struct GaugeGuard { +pub(crate) fn get_manager_type() -> &'static str { + type_name::() + .split("::") + .last() + .expect("we shouldn't get None here") +} + +pub(crate) struct GaugeGuard { key: &'static str, + _phantom: PhantomData, } -impl GaugeGuard { +impl GaugeGuard { pub fn increment(key: &'static str) -> Self { - gauge!(key).increment(1.0); - Self { key } + gauge!(key, "manager" => get_manager_type::()).increment(1.0); + Self { + key, + _phantom: PhantomData, + } } } -impl Drop for GaugeGuard { +impl Drop for GaugeGuard { fn drop(&mut self) { - gauge!(self.key).decrement(1.0); + gauge!(self.key, "manager" => get_manager_type::()).decrement(1.0); } } -pub(crate) struct DurationHistogramGuard { +pub(crate) struct DurationHistogramGuard { start: Instant, key: &'static str, + _phantom: PhantomData, } -impl DurationHistogramGuard { +impl DurationHistogramGuard { pub(crate) fn start(key: &'static str) -> Self { Self { start: Instant::now(), key, + _phantom: PhantomData, } } pub(crate) fn into_elapsed(self) -> Duration { let this = ManuallyDrop::new(self); let elapsed = this.start.elapsed(); - histogram!(this.key).record(elapsed); + histogram!(this.key, "manager" => get_manager_type::()).record(elapsed); elapsed } } -impl Drop for DurationHistogramGuard { +impl Drop for DurationHistogramGuard { fn drop(&mut self) { histogram!(self.key).record(self.start.elapsed()); }