Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Connection acquire expand #1737

Merged
merged 12 commits into from
Jul 13, 2023
105 changes: 89 additions & 16 deletions src/driver/sqlx_mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,24 @@ impl SqlxMySqlPoolConnection {
debug_print!("{}", stmt);

let query = sqlx_query(&stmt);
if let Ok(conn) = &mut self.pool.acquire().await {
let result = &mut self.pool.acquire().await;
if let Ok(conn) = result {
crate::metric::metric!(self.metric_callback, &stmt, {
match query.execute(conn).await {
Ok(res) => Ok(res.into()),
Err(err) => Err(sqlx_error_to_exec_err(err)),
}
})
} else {
Err(DbErr::ConnectionAcquire)
match result {
Err(sqlx::Error::PoolTimedOut) => {
Err(DbErr::ConnectionAcquire(ConnAcquireErr::Timeout))
}
// Err(PoolClosed) => Err(DbErr::ConnectionAcquire(ConnAcquireErr::ConnectionClosed)),
_ => Err(DbErr::ConnectionAcquire(ConnAcquireErr::Unknown(
"connection failed".to_string(),
))),
}
}
}

Expand All @@ -99,13 +108,22 @@ impl SqlxMySqlPoolConnection {
pub async fn execute_unprepared(&self, sql: &str) -> Result<ExecResult, DbErr> {
debug_print!("{}", sql);

if let Ok(conn) = &mut self.pool.acquire().await {
let result = &mut self.pool.acquire().await;
if let Ok(conn) = result {
match conn.execute(sql).await {
Ok(res) => Ok(res.into()),
Err(err) => Err(sqlx_error_to_exec_err(err)),
}
} else {
Err(DbErr::ConnectionAcquire)
match result {
Err(sqlx::Error::PoolTimedOut) => {
Err(DbErr::ConnectionAcquire(ConnAcquireErr::Timeout))
}
// Err(PoolClosed) => Err(DbErr::ConnectionAcquire(ConnAcquireErr::ConnectionClosed)),
_ => Err(DbErr::ConnectionAcquire(ConnAcquireErr::Unknown(
"connection failed".to_string(),
))),
}
}
}

Expand All @@ -115,7 +133,8 @@ impl SqlxMySqlPoolConnection {
debug_print!("{}", stmt);

let query = sqlx_query(&stmt);
if let Ok(conn) = &mut self.pool.acquire().await {
let result = &mut self.pool.acquire().await;
if let Ok(conn) = result {
crate::metric::metric!(self.metric_callback, &stmt, {
match query.fetch_one(conn).await {
Ok(row) => Ok(Some(row.into())),
Expand All @@ -126,7 +145,15 @@ impl SqlxMySqlPoolConnection {
}
})
} else {
Err(DbErr::ConnectionAcquire)
match result {
Err(sqlx::Error::PoolTimedOut) => {
Err(DbErr::ConnectionAcquire(ConnAcquireErr::Timeout))
}
// Err(PoolClosed) => Err(DbErr::ConnectionAcquire(ConnAcquireErr::ConnectionClosed)),
_ => Err(DbErr::ConnectionAcquire(ConnAcquireErr::Unknown(
"connection failed".to_string(),
))),
}
}
}

Expand All @@ -136,15 +163,24 @@ impl SqlxMySqlPoolConnection {
debug_print!("{}", stmt);

let query = sqlx_query(&stmt);
if let Ok(conn) = &mut self.pool.acquire().await {
let result = &mut self.pool.acquire().await;
if let Ok(conn) = result {
crate::metric::metric!(self.metric_callback, &stmt, {
match query.fetch_all(conn).await {
Ok(rows) => Ok(rows.into_iter().map(|r| r.into()).collect()),
Err(err) => Err(sqlx_error_to_query_err(err)),
}
})
} else {
Err(DbErr::ConnectionAcquire)
match result {
Err(sqlx::Error::PoolTimedOut) => {
Err(DbErr::ConnectionAcquire(ConnAcquireErr::Timeout))
}
// Err(PoolClosed) => Err(DbErr::ConnectionAcquire(ConnAcquireErr::ConnectionClosed)),
_ => Err(DbErr::ConnectionAcquire(ConnAcquireErr::Unknown(
"connection failed".to_string(),
))),
}
}
}

Expand All @@ -153,14 +189,23 @@ impl SqlxMySqlPoolConnection {
pub async fn stream(&self, stmt: Statement) -> Result<QueryStream, DbErr> {
debug_print!("{}", stmt);

if let Ok(conn) = self.pool.acquire().await {
let result = self.pool.acquire().await;
if let Ok(conn) = result {
Ok(QueryStream::from((
conn,
stmt,
self.metric_callback.clone(),
)))
} else {
Err(DbErr::ConnectionAcquire)
match result {
Err(sqlx::Error::PoolTimedOut) => {
Err(DbErr::ConnectionAcquire(ConnAcquireErr::Timeout))
}
// Err(PoolClosed) => Err(DbErr::ConnectionAcquire(ConnAcquireErr::ConnectionClosed)),
_ => Err(DbErr::ConnectionAcquire(ConnAcquireErr::Unknown(
"connection failed".to_string(),
))),
}
}
}

Expand All @@ -171,7 +216,8 @@ impl SqlxMySqlPoolConnection {
isolation_level: Option<IsolationLevel>,
access_mode: Option<AccessMode>,
) -> Result<DatabaseTransaction, DbErr> {
if let Ok(conn) = self.pool.acquire().await {
let result = self.pool.acquire().await;
if let Ok(conn) = result {
DatabaseTransaction::new_mysql(
conn,
self.metric_callback.clone(),
Expand All @@ -180,7 +226,15 @@ impl SqlxMySqlPoolConnection {
)
.await
} else {
Err(DbErr::ConnectionAcquire)
match result {
Err(sqlx::Error::PoolTimedOut) => {
Err(DbErr::ConnectionAcquire(ConnAcquireErr::Timeout))
}
// Err(PoolClosed) => Err(DbErr::ConnectionAcquire(ConnAcquireErr::ConnectionClosed)),
_ => Err(DbErr::ConnectionAcquire(ConnAcquireErr::Unknown(
"connection failed".to_string(),
))),
}
}
}

Expand All @@ -200,7 +254,8 @@ impl SqlxMySqlPoolConnection {
T: Send,
E: std::error::Error + Send,
{
if let Ok(conn) = self.pool.acquire().await {
let result = self.pool.acquire().await;
if let Ok(conn) = result {
let transaction = DatabaseTransaction::new_mysql(
conn,
self.metric_callback.clone(),
Expand All @@ -211,7 +266,16 @@ impl SqlxMySqlPoolConnection {
.map_err(|e| TransactionError::Connection(e))?;
transaction.run(callback).await
} else {
Err(DbErr::ConnectionAcquire.into())
match result {
Err(sqlx::Error::PoolTimedOut) => {
Err(DbErr::ConnectionAcquire(ConnAcquireErr::Timeout).into())
}
// Err(PoolClosed) => Err(DbErr::ConnectionAcquire(ConnAcquireErr::ConnectionClosed)),
_ => Err(DbErr::ConnectionAcquire(ConnAcquireErr::Unknown(
"connection failed".to_string(),
))
.into()),
}
}
}

Expand All @@ -224,13 +288,22 @@ impl SqlxMySqlPoolConnection {

/// Checks if a connection to the database is still valid.
pub async fn ping(&self) -> Result<(), DbErr> {
if let Ok(conn) = &mut self.pool.acquire().await {
let result = &mut self.pool.acquire().await;
if let Ok(conn) = result {
match conn.ping().await {
Ok(_) => Ok(()),
Err(err) => Err(sqlx_error_to_conn_err(err)),
}
} else {
Err(DbErr::ConnectionAcquire)
match result {
Err(sqlx::Error::PoolTimedOut) => {
Err(DbErr::ConnectionAcquire(ConnAcquireErr::Timeout))
}
// Err(PoolClosed) => Err(DbErr::ConnectionAcquire(ConnAcquireErr::ConnectionClosed)),
_ => Err(DbErr::ConnectionAcquire(ConnAcquireErr::Unknown(
"connection failed".to_string(),
))),
}
}
}

Expand Down
Loading