Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Connection acquire expand #1737

Merged
merged 12 commits into from
Jul 13, 2023
136 changes: 56 additions & 80 deletions src/driver/sqlx_mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,30 +82,24 @@ impl SqlxMySqlPoolConnection {
debug_print!("{}", stmt);

let query = sqlx_query(&stmt);
if let Ok(conn) = &mut self.pool.acquire().await {
crate::metric::metric!(self.metric_callback, &stmt, {
match query.execute(conn).await {
Ok(res) => Ok(res.into()),
Err(err) => Err(sqlx_error_to_exec_err(err)),
}
})
} else {
Err(DbErr::ConnectionAcquire)
}
let conn = &mut self.pool.acquire().await.map_err(conn_acquire_err)?;
crate::metric::metric!(self.metric_callback, &stmt, {
match query.execute(conn).await {
Ok(res) => Ok(res.into()),
Err(err) => Err(sqlx_error_to_exec_err(err)),
}
})
}

/// Execute an unprepared SQL statement on a MySQL backend
#[instrument(level = "trace")]
pub async fn execute_unprepared(&self, sql: &str) -> Result<ExecResult, DbErr> {
debug_print!("{}", sql);

if let Ok(conn) = &mut self.pool.acquire().await {
match conn.execute(sql).await {
Ok(res) => Ok(res.into()),
Err(err) => Err(sqlx_error_to_exec_err(err)),
}
} else {
Err(DbErr::ConnectionAcquire)
let conn = &mut self.pool.acquire().await.map_err(conn_acquire_err)?;
match conn.execute(sql).await {
Ok(res) => Ok(res.into()),
Err(err) => Err(sqlx_error_to_exec_err(err)),
}
}

Expand All @@ -115,19 +109,16 @@ impl SqlxMySqlPoolConnection {
debug_print!("{}", stmt);

let query = sqlx_query(&stmt);
if let Ok(conn) = &mut self.pool.acquire().await {
crate::metric::metric!(self.metric_callback, &stmt, {
match query.fetch_one(conn).await {
Ok(row) => Ok(Some(row.into())),
Err(err) => match err {
sqlx::Error::RowNotFound => Ok(None),
_ => Err(sqlx_error_to_query_err(err)),
},
}
})
} else {
Err(DbErr::ConnectionAcquire)
}
let conn = &mut self.pool.acquire().await.map_err(conn_acquire_err)?;
crate::metric::metric!(self.metric_callback, &stmt, {
match query.fetch_one(conn).await {
Ok(row) => Ok(Some(row.into())),
Err(err) => match err {
sqlx::Error::RowNotFound => Ok(None),
_ => Err(sqlx_error_to_query_err(err)),
},
}
})
}

/// Get the results of a query returning them as a Vec<[QueryResult]>
Expand All @@ -136,32 +127,26 @@ impl SqlxMySqlPoolConnection {
debug_print!("{}", stmt);

let query = sqlx_query(&stmt);
if let Ok(conn) = &mut self.pool.acquire().await {
crate::metric::metric!(self.metric_callback, &stmt, {
match query.fetch_all(conn).await {
Ok(rows) => Ok(rows.into_iter().map(|r| r.into()).collect()),
Err(err) => Err(sqlx_error_to_query_err(err)),
}
})
} else {
Err(DbErr::ConnectionAcquire)
}
let conn = &mut self.pool.acquire().await.map_err(conn_acquire_err)?;
crate::metric::metric!(self.metric_callback, &stmt, {
match query.fetch_all(conn).await {
Ok(rows) => Ok(rows.into_iter().map(|r| r.into()).collect()),
Err(err) => Err(sqlx_error_to_query_err(err)),
}
})
}

/// Stream the results of executing a SQL query
#[instrument(level = "trace")]
pub async fn stream(&self, stmt: Statement) -> Result<QueryStream, DbErr> {
debug_print!("{}", stmt);

if let Ok(conn) = self.pool.acquire().await {
Ok(QueryStream::from((
conn,
stmt,
self.metric_callback.clone(),
)))
} else {
Err(DbErr::ConnectionAcquire)
}
let conn = self.pool.acquire().await.map_err(conn_acquire_err)?;
Ok(QueryStream::from((
conn,
stmt,
self.metric_callback.clone(),
)))
}

/// Bundle a set of SQL statements that execute together.
Expand All @@ -171,17 +156,14 @@ impl SqlxMySqlPoolConnection {
isolation_level: Option<IsolationLevel>,
access_mode: Option<AccessMode>,
) -> Result<DatabaseTransaction, DbErr> {
if let Ok(conn) = self.pool.acquire().await {
DatabaseTransaction::new_mysql(
conn,
self.metric_callback.clone(),
isolation_level,
access_mode,
)
.await
} else {
Err(DbErr::ConnectionAcquire)
}
let conn = self.pool.acquire().await.map_err(conn_acquire_err)?;
DatabaseTransaction::new_mysql(
conn,
self.metric_callback.clone(),
isolation_level,
access_mode,
)
.await
}

/// Create a MySQL transaction
Expand All @@ -200,19 +182,16 @@ impl SqlxMySqlPoolConnection {
T: Send,
E: std::error::Error + Send,
{
if let Ok(conn) = self.pool.acquire().await {
let transaction = DatabaseTransaction::new_mysql(
conn,
self.metric_callback.clone(),
isolation_level,
access_mode,
)
.await
.map_err(|e| TransactionError::Connection(e))?;
transaction.run(callback).await
} else {
Err(DbErr::ConnectionAcquire.into())
}
let conn = self.pool.acquire().await.map_err(conn_acquire_err)?;
let transaction = DatabaseTransaction::new_mysql(
conn,
self.metric_callback.clone(),
isolation_level,
access_mode,
)
.await
.map_err(|e| TransactionError::Connection(e))?;
transaction.run(callback).await
}

pub(crate) fn set_metric_callback<F>(&mut self, callback: F)
Expand All @@ -224,13 +203,10 @@ impl SqlxMySqlPoolConnection {

/// Checks if a connection to the database is still valid.
pub async fn ping(&self) -> Result<(), DbErr> {
if let Ok(conn) = &mut self.pool.acquire().await {
match conn.ping().await {
Ok(_) => Ok(()),
Err(err) => Err(sqlx_error_to_conn_err(err)),
}
} else {
Err(DbErr::ConnectionAcquire)
let conn = &mut self.pool.acquire().await.map_err(conn_acquire_err)?;
match conn.ping().await {
Ok(_) => Ok(()),
Err(err) => Err(sqlx_error_to_conn_err(err)),
}
}

Expand Down
136 changes: 56 additions & 80 deletions src/driver/sqlx_postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,30 +97,24 @@ impl SqlxPostgresPoolConnection {
debug_print!("{}", stmt);

let query = sqlx_query(&stmt);
if let Ok(conn) = &mut self.pool.acquire().await {
crate::metric::metric!(self.metric_callback, &stmt, {
match query.execute(conn).await {
Ok(res) => Ok(res.into()),
Err(err) => Err(sqlx_error_to_exec_err(err)),
}
})
} else {
Err(DbErr::ConnectionAcquire)
}
let conn = &mut self.pool.acquire().await.map_err(conn_acquire_err)?;
crate::metric::metric!(self.metric_callback, &stmt, {
match query.execute(conn).await {
Ok(res) => Ok(res.into()),
Err(err) => Err(sqlx_error_to_exec_err(err)),
}
})
}

/// Execute an unprepared SQL statement on a PostgreSQL backend
#[instrument(level = "trace")]
pub async fn execute_unprepared(&self, sql: &str) -> Result<ExecResult, DbErr> {
debug_print!("{}", sql);

if let Ok(conn) = &mut self.pool.acquire().await {
match conn.execute(sql).await {
Ok(res) => Ok(res.into()),
Err(err) => Err(sqlx_error_to_exec_err(err)),
}
} else {
Err(DbErr::ConnectionAcquire)
let conn = &mut self.pool.acquire().await.map_err(conn_acquire_err)?;
match conn.execute(sql).await {
Ok(res) => Ok(res.into()),
Err(err) => Err(sqlx_error_to_exec_err(err)),
}
}

Expand All @@ -130,19 +124,16 @@ impl SqlxPostgresPoolConnection {
debug_print!("{}", stmt);

let query = sqlx_query(&stmt);
if let Ok(conn) = &mut self.pool.acquire().await {
crate::metric::metric!(self.metric_callback, &stmt, {
match query.fetch_one(conn).await {
Ok(row) => Ok(Some(row.into())),
Err(err) => match err {
sqlx::Error::RowNotFound => Ok(None),
_ => Err(sqlx_error_to_query_err(err)),
},
}
})
} else {
Err(DbErr::ConnectionAcquire)
}
let conn = &mut self.pool.acquire().await.map_err(conn_acquire_err)?;
crate::metric::metric!(self.metric_callback, &stmt, {
match query.fetch_one(conn).await {
Ok(row) => Ok(Some(row.into())),
Err(err) => match err {
sqlx::Error::RowNotFound => Ok(None),
_ => Err(sqlx_error_to_query_err(err)),
},
}
})
}

/// Get the results of a query returning them as a Vec<[QueryResult]>
Expand All @@ -151,32 +142,26 @@ impl SqlxPostgresPoolConnection {
debug_print!("{}", stmt);

let query = sqlx_query(&stmt);
if let Ok(conn) = &mut self.pool.acquire().await {
crate::metric::metric!(self.metric_callback, &stmt, {
match query.fetch_all(conn).await {
Ok(rows) => Ok(rows.into_iter().map(|r| r.into()).collect()),
Err(err) => Err(sqlx_error_to_query_err(err)),
}
})
} else {
Err(DbErr::ConnectionAcquire)
}
let conn = &mut self.pool.acquire().await.map_err(conn_acquire_err)?;
crate::metric::metric!(self.metric_callback, &stmt, {
match query.fetch_all(conn).await {
Ok(rows) => Ok(rows.into_iter().map(|r| r.into()).collect()),
Err(err) => Err(sqlx_error_to_query_err(err)),
}
})
}

/// Stream the results of executing a SQL query
#[instrument(level = "trace")]
pub async fn stream(&self, stmt: Statement) -> Result<QueryStream, DbErr> {
debug_print!("{}", stmt);

if let Ok(conn) = self.pool.acquire().await {
Ok(QueryStream::from((
conn,
stmt,
self.metric_callback.clone(),
)))
} else {
Err(DbErr::ConnectionAcquire)
}
let conn = self.pool.acquire().await.map_err(conn_acquire_err)?;
Ok(QueryStream::from((
conn,
stmt,
self.metric_callback.clone(),
)))
}

/// Bundle a set of SQL statements that execute together.
Expand All @@ -186,17 +171,14 @@ impl SqlxPostgresPoolConnection {
isolation_level: Option<IsolationLevel>,
access_mode: Option<AccessMode>,
) -> Result<DatabaseTransaction, DbErr> {
if let Ok(conn) = self.pool.acquire().await {
DatabaseTransaction::new_postgres(
conn,
self.metric_callback.clone(),
isolation_level,
access_mode,
)
.await
} else {
Err(DbErr::ConnectionAcquire)
}
let conn = self.pool.acquire().await.map_err(conn_acquire_err)?;
DatabaseTransaction::new_postgres(
conn,
self.metric_callback.clone(),
isolation_level,
access_mode,
)
.await
}

/// Create a PostgreSQL transaction
Expand All @@ -215,19 +197,16 @@ impl SqlxPostgresPoolConnection {
T: Send,
E: std::error::Error + Send,
{
if let Ok(conn) = self.pool.acquire().await {
let transaction = DatabaseTransaction::new_postgres(
conn,
self.metric_callback.clone(),
isolation_level,
access_mode,
)
.await
.map_err(|e| TransactionError::Connection(e))?;
transaction.run(callback).await
} else {
Err(DbErr::ConnectionAcquire.into())
}
let conn = self.pool.acquire().await.map_err(conn_acquire_err)?;
let transaction = DatabaseTransaction::new_postgres(
conn,
self.metric_callback.clone(),
isolation_level,
access_mode,
)
.await
.map_err(|e| TransactionError::Connection(e))?;
transaction.run(callback).await
}

pub(crate) fn set_metric_callback<F>(&mut self, callback: F)
Expand All @@ -239,13 +218,10 @@ impl SqlxPostgresPoolConnection {

/// Checks if a connection to the database is still valid.
pub async fn ping(&self) -> Result<(), DbErr> {
if let Ok(conn) = &mut self.pool.acquire().await {
match conn.ping().await {
Ok(_) => Ok(()),
Err(err) => Err(sqlx_error_to_conn_err(err)),
}
} else {
Err(DbErr::ConnectionAcquire)
let conn = &mut self.pool.acquire().await.map_err(conn_acquire_err)?;
match conn.ping().await {
Ok(_) => Ok(()),
Err(err) => Err(sqlx_error_to_conn_err(err)),
}
}

Expand Down
Loading