Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: pfreixes/bb8
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: add_get_stats
Choose a base ref
...
head repository: pfreixes/bb8
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: more_statistics
Choose a head ref
Can’t automatically merge. Don’t worry, you can still create the pull request.
  • 11 commits
  • 4 files changed
  • 1 contributor

Commits on Apr 18, 2024

  1. Provide statistics of gets and contention

    Two new attributes are added to the State object for providing to the
    user information about the number of gets and which ones suffered from
    contention.
    
    By doing the following operation `gets_with_contention/gets` the user can understand
    if the connection pool is underprovisioned or if the min idle connections is not properly
    configured.
    pfreixes committed Apr 18, 2024
    Copy the full SHA
    4f1caff View commit details
  2. Address lint issues

    pfreixes committed Apr 18, 2024
    Copy the full SHA
    238d9b8 View commit details
  3. Fix more lint issues

    pfreixes committed Apr 18, 2024
    Copy the full SHA
    a928d9f View commit details

Commits on May 2, 2024

  1. Apply review feedback

    pfreixes committed May 2, 2024
    Copy the full SHA
    ea8f436 View commit details
  2. Rename type name

    pfreixes committed May 2, 2024
    Copy the full SHA
    750bc7d View commit details
  3. Rename test name

    pfreixes committed May 2, 2024
    Copy the full SHA
    a88d9a3 View commit details
  4. Copy the full SHA
    f49432c View commit details
  5. Copy the full SHA
    81686ae View commit details
  6. Copy the full SHA
    57c54cd View commit details
  7. Copy the full SHA
    d298729 View commit details
  8. Copy the full SHA
    b9fce9e View commit details
Showing with 280 additions and 7 deletions.
  1. +6 −0 bb8/src/api.rs
  2. +125 −5 bb8/src/inner.rs
  3. +17 −2 bb8/src/internals.rs
  4. +132 −0 bb8/tests/test.rs
6 changes: 6 additions & 0 deletions bb8/src/api.rs
Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@ use std::time::Duration;
use async_trait::async_trait;

use crate::inner::PoolInner;
pub use crate::inner::Statistics;
use crate::internals::Conn;
pub use crate::internals::State;

@@ -45,6 +46,11 @@ impl<M: ManageConnection> Pool<M> {
Builder::new()
}

/// Returns statistics about the historical usage of the pool.
pub fn statistics(&self) -> Statistics {
self.inner.statistics()
}

/// Returns information about the current state of the pool.
pub fn state(&self) -> State {
self.inner.state()
130 changes: 125 additions & 5 deletions bb8/src/inner.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::cmp::{max, min};
use std::fmt;
use std::future::Future;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Weak};
use std::time::{Duration, Instant};

@@ -9,14 +10,15 @@ use futures_util::TryFutureExt;
use tokio::spawn;
use tokio::time::{interval_at, sleep, timeout, Interval};

use crate::api::{Builder, ConnectionState, ManageConnection, PooledConnection, RunError};
use crate::internals::{Approval, ApprovalIter, Conn, SharedPool, State};
use crate::api::{Builder, ConnectionState, ManageConnection, PooledConnection, RunError, State};
use crate::internals::{Approval, ApprovalIter, Conn, SharedPool};

pub(crate) struct PoolInner<M>
where
M: ManageConnection + Send,
{
inner: Arc<SharedPool<M>>,
pool_inner_stats: Arc<SharedPoolInnerStatistics>,
}

impl<M> PoolInner<M>
@@ -25,6 +27,7 @@ where
{
pub(crate) fn new(builder: Builder<M>, manager: M) -> Self {
let inner = Arc::new(SharedPool::new(builder, manager));
let pool_inner_stats = Arc::new(SharedPoolInnerStatistics::default());

if inner.statics.max_lifetime.is_some() || inner.statics.idle_timeout.is_some() {
let start = Instant::now() + inner.statics.reaper_rate;
@@ -33,12 +36,16 @@ where
Reaper {
interval,
pool: Arc::downgrade(&inner),
pool_inner_stats: Arc::downgrade(&pool_inner_stats),
}
.run(),
);
}

Self { inner }
Self {
inner,
pool_inner_stats,
}
}

pub(crate) async fn start_connections(&self) -> Result<(), M::Error> {
@@ -85,6 +92,8 @@ where
}

pub(crate) async fn get(&self) -> Result<PooledConnection<'_, M>, RunError<M::Error>> {
let mut wait_time_start = None;

let future = async {
loop {
let (conn, approvals) = self.inner.pop();
@@ -96,6 +105,7 @@ where
let mut conn = match conn {
Some(conn) => PooledConnection::new(self, conn),
None => {
wait_time_start = Some(Instant::now());
self.inner.notify.notified().await;
continue;
}
@@ -108,6 +118,9 @@ where
match self.inner.manager.is_valid(&mut conn).await {
Ok(()) => return Ok(conn),
Err(e) => {
self.pool_inner_stats
.invalid_closed_connections
.fetch_add(1, Ordering::SeqCst);
self.inner.forward_error(e);
conn.state = ConnectionState::Invalid;
continue;
@@ -116,10 +129,22 @@ where
}
};

match timeout(self.inner.statics.connection_timeout, future).await {
let result = match timeout(self.inner.statics.connection_timeout, future).await {
Ok(result) => result,
_ => Err(RunError::TimedOut),
};

if let Some(wait_time_start) = wait_time_start {
let wait_time = Instant::now() - wait_time_start;
self.pool_inner_stats
.gets_waited_wait_time_micro
.fetch_add(wait_time.as_micros() as u64, Ordering::SeqCst);
self.pool_inner_stats
.gets_waited
.fetch_add(1, Ordering::SeqCst);
}
self.pool_inner_stats.gets.fetch_add(1, Ordering::SeqCst);
result
}

pub(crate) async fn connect(&self) -> Result<M::Connection, M::Error> {
@@ -139,13 +164,53 @@ where
match (state, self.inner.manager.has_broken(&mut conn.conn)) {
(ConnectionState::Present, false) => locked.put(conn, None, self.inner.clone()),
(_, _) => {
self.pool_inner_stats
.broken_closed_connections
.fetch_add(1, Ordering::SeqCst);
let approvals = locked.dropped(1, &self.inner.statics);
self.spawn_replenishing_approvals(approvals);
self.inner.notify.notify_waiters();
}
}
}

/// Returns statistics about the historical usage of the pool.
pub(crate) fn statistics(&self) -> Statistics {
let gets = self.pool_inner_stats.gets.load(Ordering::SeqCst);
let gets_waited = self.pool_inner_stats.gets_waited.load(Ordering::SeqCst);
let openned_connections = self
.pool_inner_stats
.openned_connections
.load(Ordering::SeqCst);
let broken_closed_connections = self
.pool_inner_stats
.broken_closed_connections
.load(Ordering::SeqCst);
let invalid_closed_connections = self
.pool_inner_stats
.invalid_closed_connections
.load(Ordering::SeqCst);
let gets_waited_wait_time_micro = self
.pool_inner_stats
.gets_waited_wait_time_micro
.load(Ordering::SeqCst);

let locked = self.inner.internals.lock();
let max_idle_time_closed_connections = locked.max_idle_time_closed_connections;
let max_life_time_closed_connections = locked.max_life_time_closed_connections;

Statistics {
gets,
gets_waited,
gets_waited_wait_time_micro,
max_idle_time_closed_connections,
max_life_time_closed_connections,
invalid_closed_connections,
broken_closed_connections,
openned_connections,
}
}

/// Returns information about the current state of the pool.
pub(crate) fn state(&self) -> State {
self.inner.internals.lock().state()
@@ -178,6 +243,9 @@ where
.internals
.lock()
.put(conn, Some(approval), self.inner.clone());
self.pool_inner_stats
.openned_connections
.fetch_add(1, Ordering::SeqCst);
return Ok(());
}
Err(e) => {
@@ -212,6 +280,7 @@ where
fn clone(&self) -> Self {
PoolInner {
inner: self.inner.clone(),
pool_inner_stats: self.pool_inner_stats.clone(),
}
}
}
@@ -228,14 +297,18 @@ where
struct Reaper<M: ManageConnection> {
interval: Interval,
pool: Weak<SharedPool<M>>,
pool_inner_stats: Weak<SharedPoolInnerStatistics>,
}

impl<M: ManageConnection> Reaper<M> {
async fn run(mut self) {
loop {
let _ = self.interval.tick().await;
let pool = match self.pool.upgrade() {
Some(inner) => PoolInner { inner },
Some(inner) => PoolInner {
inner,
pool_inner_stats: self.pool_inner_stats.upgrade().unwrap(),
},
None => break,
};

@@ -244,3 +317,50 @@ impl<M: ManageConnection> Reaper<M> {
}
}
}

#[derive(Default)]
struct SharedPoolInnerStatistics {
gets: AtomicU64,
gets_waited: AtomicU64,
gets_waited_wait_time_micro: AtomicU64,
invalid_closed_connections: AtomicU64,
broken_closed_connections: AtomicU64,
openned_connections: AtomicU64,
}

/// Statistics about the historical usage of the `Pool`.
#[derive(Debug)]
#[non_exhaustive]
pub struct Statistics {
/// Information about gets
/// Total gets performed, you should consider that the
/// value can overflow and start from 0 eventually.
pub gets: u64,
/// Total gets performed that had to wait for having a
/// connection available. The value can overflow and
/// start from 0 eventually.
pub gets_waited: u64,
/// Total time waited by gets that suffered from contention
/// in microseconds. The value can overflow and start
/// from 0 eventually.
pub gets_waited_wait_time_micro: u64,
/// Total connections closed because they reached the
/// max idle time configured. The value can
/// overflow and start from 0 eventually.
pub max_idle_time_closed_connections: u64,
/// Total connections closed because they reached the
/// max life time configured. The value can
/// overflow and start from 0 eventually.
pub max_life_time_closed_connections: u64,
/// Total connections not used from the pool because they
/// were considered invalid by the manager. The value can
/// overflow and start from 0 eventually.
pub invalid_closed_connections: u64,
/// Total connections not returned to the pool because they
/// were considered broken by the manager. The value can
/// overflow and start from 0 eventually.
pub broken_closed_connections: u64,
/// Total connections openned. The value can overflow and
/// start from 0 eventually.
pub openned_connections: u64,
}
19 changes: 17 additions & 2 deletions bb8/src/internals.rs
Original file line number Diff line number Diff line change
@@ -63,6 +63,8 @@ where
conns: VecDeque<IdleConn<M::Connection>>,
num_conns: u32,
pending_conns: u32,
pub max_idle_time_closed_connections: u64,
pub max_life_time_closed_connections: u64,
}

impl<M> PoolInternals<M>
@@ -137,20 +139,31 @@ where
}

pub(crate) fn reap(&mut self, config: &Builder<M>) -> ApprovalIter {
let mut max_life_time_closed = 0;
let mut max_idle_time_closed = 0;
let now = Instant::now();
let before = self.conns.len();

self.conns.retain(|conn| {
let mut keep = true;
if let Some(timeout) = config.idle_timeout {
keep &= now - conn.idle_start < timeout;
if now - conn.idle_start >= timeout {
max_idle_time_closed += 1;
keep &= false;
}
}
if let Some(lifetime) = config.max_lifetime {
keep &= now - conn.conn.birth < lifetime;
if now - conn.conn.birth >= lifetime {
max_life_time_closed += 1;
keep &= false;
}
}
keep
});

self.max_idle_time_closed_connections += max_idle_time_closed;
self.max_life_time_closed_connections += max_life_time_closed;

self.dropped((before - self.conns.len()) as u32, config)
}

@@ -171,6 +184,8 @@ where
conns: VecDeque::new(),
num_conns: 0,
pending_conns: 0,
max_idle_time_closed_connections: 0,
max_life_time_closed_connections: 0,
}
}
}
Loading