Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 38 additions & 30 deletions src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,27 @@ use std::{
use metrics::counter;
use tokio::sync::OwnedSemaphorePermit;

use crate::metrics_utils::{
GaugeGuard, ACTIVE_CONNECTIONS, CLOSED_TOTAL, IDLE_CONNECTIONS, OPENED_TOTAL, OPEN_CONNECTIONS,
use crate::{
metrics_utils::{
GaugeGuard, ACTIVE_CONNECTIONS, CLOSED_TOTAL, IDLE_CONNECTIONS, OPENED_TOTAL,
OPEN_CONNECTIONS,
},
Manager,
};

pub(crate) struct ActiveConn<C> {
inner: C,
state: ConnState,
pub(crate) struct ActiveConn<M: Manager> {
inner: M::Connection,
state: ConnState<M>,
_permit: OwnedSemaphorePermit,
_active_connections_gauge: GaugeGuard,
_active_connections_gauge: GaugeGuard<M>,
}

impl<C> ActiveConn<C> {
pub(crate) fn new(inner: C, permit: OwnedSemaphorePermit, state: ConnState) -> ActiveConn<C> {
impl<M: Manager> ActiveConn<M> {
pub(crate) fn new(
inner: M::Connection,
permit: OwnedSemaphorePermit,
state: ConnState<M>,
) -> ActiveConn<M> {
Self {
inner,
state,
Expand All @@ -31,7 +39,7 @@ impl<C> ActiveConn<C> {
}
}

pub(crate) fn into_idle(self) -> IdleConn<C> {
pub(crate) fn into_idle(self) -> IdleConn<M> {
IdleConn {
inner: self.inner,
state: self.state,
Expand All @@ -47,31 +55,31 @@ impl<C> ActiveConn<C> {
self.state.brand_new = brand_new;
}

pub(crate) fn into_raw(self) -> C {
pub(crate) fn into_raw(self) -> M::Connection {
self.inner
}

pub(crate) fn as_raw_ref(&self) -> &C {
pub(crate) fn as_raw_ref(&self) -> &M::Connection {
&self.inner
}

pub(crate) fn as_raw_mut(&mut self) -> &mut C {
pub(crate) fn as_raw_mut(&mut self) -> &mut M::Connection {
&mut self.inner
}
}

pub(crate) struct IdleConn<C> {
inner: C,
state: ConnState,
_idle_connections_gauge: GaugeGuard,
pub(crate) struct IdleConn<M: Manager> {
inner: M::Connection,
state: ConnState<M>,
_idle_connections_gauge: GaugeGuard<M>,
}

impl<C> IdleConn<C> {
impl<M: Manager> IdleConn<M> {
pub(crate) fn is_brand_new(&self) -> bool {
self.state.brand_new
}

pub(crate) fn into_active(self, permit: OwnedSemaphorePermit) -> ActiveConn<C> {
pub(crate) fn into_active(self, permit: OwnedSemaphorePermit) -> ActiveConn<M> {
ActiveConn::new(self.inner, permit, self.state)
}

Expand Down Expand Up @@ -113,25 +121,25 @@ impl<C> IdleConn<C> {
self.state.last_checked_at = Instant::now()
}

pub(crate) fn split_raw(self) -> (C, ConnSplit<C>) {
pub(crate) fn split_raw(self) -> (M::Connection, ConnSplit<M>) {
(
self.inner,
ConnSplit::new(self.state, self._idle_connections_gauge),
)
}
}

pub(crate) struct ConnState {
pub(crate) struct ConnState<M: Manager> {
pub(crate) created_at: Instant,
pub(crate) last_used_at: Instant,
pub(crate) last_checked_at: Instant,
pub(crate) brand_new: bool,
total_connections_open: Arc<AtomicU64>,
total_connections_closed: Arc<AtomicU64>,
_open_connections_gauge: GaugeGuard,
_open_connections_gauge: GaugeGuard<M>,
}

impl ConnState {
impl<M: Manager> ConnState<M> {
pub(crate) fn new(
total_connections_open: Arc<AtomicU64>,
total_connections_closed: Arc<AtomicU64>,
Expand All @@ -149,7 +157,7 @@ impl ConnState {
}
}

impl Drop for ConnState {
impl<M: Manager> Drop for ConnState<M> {
fn drop(&mut self) {
self.total_connections_open.fetch_sub(1, Ordering::Relaxed);
self.total_connections_closed
Expand All @@ -158,22 +166,22 @@ impl Drop for ConnState {
}
}

pub(crate) struct ConnSplit<C> {
state: ConnState,
gauge: GaugeGuard,
_phantom: PhantomData<C>,
pub(crate) struct ConnSplit<M: Manager> {
state: ConnState<M>,
gauge: GaugeGuard<M>,
_phantom: PhantomData<M>,
}

impl<C> ConnSplit<C> {
fn new(state: ConnState, gauge: GaugeGuard) -> Self {
impl<M: Manager> ConnSplit<M> {
fn new(state: ConnState<M>, gauge: GaugeGuard<M>) -> Self {
Self {
state,
gauge,
_phantom: PhantomData,
}
}

pub(crate) fn restore(self, raw: C) -> IdleConn<C> {
pub(crate) fn restore(self, raw: M::Connection) -> IdleConn<M> {
IdleConn {
inner: raw,
state: self.state,
Expand Down
41 changes: 16 additions & 25 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ use futures_util::FutureExt;
use futures_util::SinkExt;
use futures_util::StreamExt;
use metrics::gauge;
use metrics_utils::DurationHistogramGuard;
use metrics_utils::{get_manager_type, DurationHistogramGuard};
pub use spawn::spawn;
use std::fmt;
use std::future::Future;
Expand Down Expand Up @@ -157,14 +157,14 @@ pub trait Manager: Send + Sync + 'static {
struct SharedPool<M: Manager> {
config: ShareConfig,
manager: M,
internals: Mutex<PoolInternals<M::Connection>>,
internals: Mutex<PoolInternals<M>>,
state: PoolState,
semaphore: Arc<Semaphore>,
}

struct PoolInternals<C> {
struct PoolInternals<M: Manager> {
config: InternalConfig,
free_conns: Vec<IdleConn<C>>,
free_conns: Vec<IdleConn<M>>,
wait_duration: Duration,
cleaner_ch: Option<Sender<()>>,
}
Expand All @@ -176,7 +176,7 @@ struct PoolState {
wait_count: AtomicU64,
}

impl<C> Drop for PoolInternals<C> {
impl<M: Manager> Drop for PoolInternals<M> {
fn drop(&mut self) {
log::debug!("Pool internal drop");
}
Expand Down Expand Up @@ -322,7 +322,7 @@ impl<M: Manager> Pool<M> {
config.max_open as usize
};

gauge!(IDLE_CONNECTIONS).set(0.0);
gauge!(IDLE_CONNECTIONS, "manager" => get_manager_type::<M>()).set(0.0);

let (share_config, internal_config) = config.split();
let internals = Mutex::new(PoolInternals {
Expand Down Expand Up @@ -387,7 +387,7 @@ impl<M: Manager> Pool<M> {
}

async fn get_connection(&self) -> Result<Connection<M>, Error<M::Error>> {
let _guard = GaugeGuard::increment(WAIT_COUNT);
let _guard = GaugeGuard::<M>::increment(WAIT_COUNT);
let c = self.get_or_create_conn().await?;

let conn = Connection {
Expand All @@ -401,8 +401,8 @@ impl<M: Manager> Pool<M> {
async fn validate_conn(
&self,
internal_config: InternalConfig,
conn: IdleConn<M::Connection>,
) -> Option<IdleConn<M::Connection>> {
conn: IdleConn<M>,
) -> Option<IdleConn<M>> {
if conn.is_brand_new() {
return Some(conn);
}
Expand All @@ -428,9 +428,9 @@ impl<M: Manager> Pool<M> {
Some(conn)
}

async fn get_or_create_conn(&self) -> Result<ActiveConn<M::Connection>, Error<M::Error>> {
async fn get_or_create_conn(&self) -> Result<ActiveConn<M>, Error<M::Error>> {
self.0.state.wait_count.fetch_add(1, Ordering::Relaxed);
let wait_guard = DurationHistogramGuard::start(WAIT_DURATION);
let wait_guard = DurationHistogramGuard::<M>::start(WAIT_DURATION);

let semaphore = Arc::clone(&self.0.semaphore);
let permit = semaphore
Expand Down Expand Up @@ -460,7 +460,7 @@ impl<M: Manager> Pool<M> {
async fn open_new_connection(
&self,
permit: OwnedSemaphorePermit,
) -> Result<ActiveConn<M::Connection>, Error<M::Error>> {
) -> Result<ActiveConn<M>, Error<M::Error>> {
log::debug!("creating new connection from manager");
match self.0.manager.connect().await {
Ok(c) => {
Expand Down Expand Up @@ -499,21 +499,15 @@ impl<M: Manager> Pool<M> {
}
}

async fn recycle_conn<M: Manager>(
shared: &Arc<SharedPool<M>>,
mut conn: ActiveConn<M::Connection>,
) {
async fn recycle_conn<M: Manager>(shared: &Arc<SharedPool<M>>, mut conn: ActiveConn<M>) {
if conn_still_valid(shared, &mut conn) {
conn.set_brand_new(false);
let internals = shared.internals.lock().await;
put_idle_conn::<M>(internals, conn);
}
}

fn conn_still_valid<M: Manager>(
shared: &Arc<SharedPool<M>>,
conn: &mut ActiveConn<M::Connection>,
) -> bool {
fn conn_still_valid<M: Manager>(shared: &Arc<SharedPool<M>>, conn: &mut ActiveConn<M>) -> bool {
if !shared.manager.validate(conn.as_raw_mut()) {
log::debug!("bad conn when check in");
return false;
Expand All @@ -522,10 +516,7 @@ fn conn_still_valid<M: Manager>(
true
}

fn put_idle_conn<M: Manager>(
mut internals: MutexGuard<'_, PoolInternals<M::Connection>>,
conn: ActiveConn<M::Connection>,
) {
fn put_idle_conn<M: Manager>(mut internals: MutexGuard<'_, PoolInternals<M>>, conn: ActiveConn<M>) {
let idle_conn = conn.into_idle();
// Treat max_idle == 0 as unlimited idle connections.
if internals.config.max_idle == 0
Expand Down Expand Up @@ -608,7 +599,7 @@ async fn clean_connection<M: Manager>(shared: &Weak<SharedPool<M>>) -> bool {
/// A smart pointer wrapping a connection.
pub struct Connection<M: Manager> {
pool: Pool<M>,
conn: Option<ActiveConn<M::Connection>>,
conn: Option<ActiveConn<M>>,
}

impl<M: Manager> Connection<M> {
Expand Down
37 changes: 27 additions & 10 deletions src/metrics_utils.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
use std::{
any::type_name,
marker::PhantomData,
mem::ManuallyDrop,
time::{Duration, Instant},
};

use metrics::{describe_counter, describe_gauge, describe_histogram, gauge, histogram};

use crate::Manager;

// counters
pub const OPENED_TOTAL: &str = "mobc_pool_connections_opened_total";
pub const CLOSED_TOTAL: &str = "mobc_pool_connections_closed_total";
Expand Down Expand Up @@ -48,45 +52,58 @@ pub fn describe_metrics() {
);
}

pub(crate) struct GaugeGuard {
pub(crate) fn get_manager_type<M: Manager>() -> &'static str {
type_name::<M>()
.split("::")
.last()
.expect("we shouldn't get None here")
}

pub(crate) struct GaugeGuard<M: Manager> {
key: &'static str,
_phantom: PhantomData<M>,
}

impl GaugeGuard {
impl<M: Manager> GaugeGuard<M> {
pub fn increment(key: &'static str) -> Self {
gauge!(key).increment(1.0);
Self { key }
gauge!(key, "manager" => get_manager_type::<M>()).increment(1.0);
Self {
key,
_phantom: PhantomData,
}
}
}

impl Drop for GaugeGuard {
impl<M: Manager> Drop for GaugeGuard<M> {
fn drop(&mut self) {
gauge!(self.key).decrement(1.0);
gauge!(self.key, "manager" => get_manager_type::<M>()).decrement(1.0);
}
}

pub(crate) struct DurationHistogramGuard {
pub(crate) struct DurationHistogramGuard<M: Manager> {
start: Instant,
key: &'static str,
_phantom: PhantomData<M>,
}

impl DurationHistogramGuard {
impl<M: Manager> DurationHistogramGuard<M> {
pub(crate) fn start(key: &'static str) -> Self {
Self {
start: Instant::now(),
key,
_phantom: PhantomData,
}
}

pub(crate) fn into_elapsed(self) -> Duration {
let this = ManuallyDrop::new(self);
let elapsed = this.start.elapsed();
histogram!(this.key).record(elapsed);
histogram!(this.key, "manager" => get_manager_type::<M>()).record(elapsed);
elapsed
}
}

impl Drop for DurationHistogramGuard {
impl<M: Manager> Drop for DurationHistogramGuard<M> {
fn drop(&mut self) {
histogram!(self.key).record(self.start.elapsed());
}
Expand Down