Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions quaint/src/connector/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,19 @@ use crate::ast::{Params, Value};
use crosstarget_utils::time::ElapsedTimeCounter;
use std::future::Future;

pub async fn query<'a, F, T, U>(tag: &'static str, query: &'a str, params: &'a [Value<'_>], f: F) -> crate::Result<T>
pub async fn query<'a, F, T, U>(
tag: &'static str,
db_system_name: &'static str,
query: &'a str,
params: &'a [Value<'_>],
f: F,
) -> crate::Result<T>
where
F: FnOnce() -> U + 'a,
U: Future<Output = crate::Result<T>>,
{
let span = info_span!("quaint:query", "db.statement" = %query);
let span =
info_span!("quaint:query", "db.system" = %db_system_name, "db.statement" = %query, "otel.kind" = "client");
do_query(tag, query, params, f).instrument(span).await
}

Expand Down
7 changes: 4 additions & 3 deletions quaint/src/connector/mssql/native/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use tokio_util::compat::{Compat, TokioAsyncWriteCompatExt};
pub use tiberius;

static SQL_SERVER_DEFAULT_ISOLATION: IsolationLevel = IsolationLevel::ReadCommitted;
const DB_SYSTEM_NAME: &str = "mssql";

#[async_trait]
impl TransactionCapable for Mssql {
Expand Down Expand Up @@ -130,7 +131,7 @@ impl Queryable for Mssql {
}

async fn query_raw(&self, sql: &str, params: &[Value<'_>]) -> crate::Result<ResultSet> {
metrics::query("mssql.query_raw", sql, params, move || async move {
metrics::query("mssql.query_raw", DB_SYSTEM_NAME, sql, params, move || async move {
let mut client = self.client.lock().await;

let mut query = tiberius::Query::new(sql);
Expand Down Expand Up @@ -193,7 +194,7 @@ impl Queryable for Mssql {
}

async fn execute_raw(&self, sql: &str, params: &[Value<'_>]) -> crate::Result<u64> {
metrics::query("mssql.execute_raw", sql, params, move || async move {
metrics::query("mssql.execute_raw", DB_SYSTEM_NAME, sql, params, move || async move {
let mut query = tiberius::Query::new(sql);

for param in params {
Expand All @@ -213,7 +214,7 @@ impl Queryable for Mssql {
}

async fn raw_cmd(&self, cmd: &str) -> crate::Result<()> {
metrics::query("mssql.raw_cmd", cmd, &[], move || async move {
metrics::query("mssql.raw_cmd", DB_SYSTEM_NAME, cmd, &[], move || async move {
let mut client = self.client.lock().await;
self.perform_io(client.simple_query(cmd)).await?.into_results().await?;
Ok(())
Expand Down
8 changes: 5 additions & 3 deletions quaint/src/connector/mysql/native/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ impl MysqlUrl {
}
}

const DB_SYSTEM_NAME: &str = "mysql";

/// A connector interface for the MySQL database.
#[derive(Debug)]
pub struct Mysql {
Expand Down Expand Up @@ -195,7 +197,7 @@ impl Queryable for Mysql {
}

async fn query_raw(&self, sql: &str, params: &[Value<'_>]) -> crate::Result<ResultSet> {
metrics::query("mysql.query_raw", sql, params, move || async move {
metrics::query("mysql.query_raw", DB_SYSTEM_NAME, sql, params, move || async move {
self.prepared(sql, |stmt| async move {
let mut conn = self.conn.lock().await;
let rows: Vec<my::Row> = conn.exec(&stmt, conversion::conv_params(params)?).await?;
Expand Down Expand Up @@ -280,7 +282,7 @@ impl Queryable for Mysql {
}

async fn execute_raw(&self, sql: &str, params: &[Value<'_>]) -> crate::Result<u64> {
metrics::query("mysql.execute_raw", sql, params, move || async move {
metrics::query("mysql.execute_raw", DB_SYSTEM_NAME, sql, params, move || async move {
self.prepared(sql, |stmt| async move {
let mut conn = self.conn.lock().await;
conn.exec_drop(stmt, conversion::conv_params(params)?).await?;
Expand All @@ -297,7 +299,7 @@ impl Queryable for Mysql {
}

async fn raw_cmd(&self, cmd: &str) -> crate::Result<()> {
metrics::query("mysql.raw_cmd", cmd, &[], move || async move {
metrics::query("mysql.raw_cmd", DB_SYSTEM_NAME, cmd, &[], move || async move {
self.perform_io(|| async move {
let mut conn = self.conn.lock().await;
let mut result = cmd.run(&mut *conn).await?;
Expand Down
206 changes: 121 additions & 85 deletions quaint/src/connector/postgres/native/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ impl Debug for PostgresClient {
}
}

const SYSTEM_NAME_POSTGRESQL: &str = "postgresql";
const SYSTEM_NAME_COCKROACHDB: &str = "cockroachdb";

/// A connector interface for the PostgreSQL database.
#[derive(Debug)]
pub struct PostgreSql {
Expand All @@ -65,6 +68,7 @@ pub struct PostgreSql {
is_healthy: AtomicBool,
is_cockroachdb: bool,
is_materialize: bool,
db_system_name: &'static str,
}

/// Key uniquely representing an SQL statement in the prepared statements cache.
Expand Down Expand Up @@ -285,6 +289,12 @@ impl PostgreSql {
}
}

let system_name = if is_cockroachdb {
SYSTEM_NAME_COCKROACHDB
} else {
SYSTEM_NAME_POSTGRESQL
};

Ok(Self {
client: PostgresClient(client),
socket_timeout: url.query_params.socket_timeout,
Expand All @@ -293,6 +303,7 @@ impl PostgreSql {
is_healthy: AtomicBool::new(true),
is_cockroachdb,
is_materialize,
db_system_name: system_name,
})
}

Expand All @@ -308,6 +319,7 @@ impl PostgreSql {
is_healthy: AtomicBool::new(true),
is_cockroachdb: false,
is_materialize: false,
db_system_name: SYSTEM_NAME_POSTGRESQL,
})
}

Expand Down Expand Up @@ -539,72 +551,84 @@ impl Queryable for PostgreSql {
async fn query_raw(&self, sql: &str, params: &[Value<'_>]) -> crate::Result<ResultSet> {
self.check_bind_variables_len(params)?;

metrics::query("postgres.query_raw", sql, params, move || async move {
let stmt = self.fetch_cached(sql, &[]).await?;

if stmt.params().len() != params.len() {
let kind = ErrorKind::IncorrectNumberOfParameters {
expected: stmt.params().len(),
actual: params.len(),
};

return Err(Error::builder(kind).build());
}
metrics::query(
"postgres.query_raw",
self.db_system_name,
sql,
params,
move || async move {
let stmt = self.fetch_cached(sql, &[]).await?;

if stmt.params().len() != params.len() {
let kind = ErrorKind::IncorrectNumberOfParameters {
expected: stmt.params().len(),
actual: params.len(),
};

return Err(Error::builder(kind).build());
}

let rows = self
.perform_io(self.client.0.query(&stmt, conversion::conv_params(params).as_slice()))
.await?;
let rows = self
.perform_io(self.client.0.query(&stmt, conversion::conv_params(params).as_slice()))
.await?;

let col_types = stmt
.columns()
.iter()
.map(|c| PGColumnType::from_pg_type(c.type_()))
.map(ColumnType::from)
.collect::<Vec<_>>();
let mut result = ResultSet::new(stmt.to_column_names(), col_types, Vec::new());
let col_types = stmt
.columns()
.iter()
.map(|c| PGColumnType::from_pg_type(c.type_()))
.map(ColumnType::from)
.collect::<Vec<_>>();
let mut result = ResultSet::new(stmt.to_column_names(), col_types, Vec::new());

for row in rows {
result.rows.push(row.get_result_row()?);
}
for row in rows {
result.rows.push(row.get_result_row()?);
}

Ok(result)
})
Ok(result)
},
)
.await
}

async fn query_raw_typed(&self, sql: &str, params: &[Value<'_>]) -> crate::Result<ResultSet> {
self.check_bind_variables_len(params)?;

metrics::query("postgres.query_raw", sql, params, move || async move {
let stmt = self.fetch_cached(sql, params).await?;

if stmt.params().len() != params.len() {
let kind = ErrorKind::IncorrectNumberOfParameters {
expected: stmt.params().len(),
actual: params.len(),
};

return Err(Error::builder(kind).build());
}
metrics::query(
"postgres.query_raw",
self.db_system_name,
sql,
params,
move || async move {
let stmt = self.fetch_cached(sql, params).await?;

if stmt.params().len() != params.len() {
let kind = ErrorKind::IncorrectNumberOfParameters {
expected: stmt.params().len(),
actual: params.len(),
};

return Err(Error::builder(kind).build());
}

let col_types = stmt
.columns()
.iter()
.map(|c| PGColumnType::from_pg_type(c.type_()))
.map(ColumnType::from)
.collect::<Vec<_>>();
let rows = self
.perform_io(self.client.0.query(&stmt, conversion::conv_params(params).as_slice()))
.await?;
let col_types = stmt
.columns()
.iter()
.map(|c| PGColumnType::from_pg_type(c.type_()))
.map(ColumnType::from)
.collect::<Vec<_>>();
let rows = self
.perform_io(self.client.0.query(&stmt, conversion::conv_params(params).as_slice()))
.await?;

let mut result = ResultSet::new(stmt.to_column_names(), col_types, Vec::new());
let mut result = ResultSet::new(stmt.to_column_names(), col_types, Vec::new());

for row in rows {
result.rows.push(row.get_result_row()?);
}
for row in rows {
result.rows.push(row.get_result_row()?);
}

Ok(result)
})
Ok(result)
},
)
.await
}

Expand Down Expand Up @@ -692,53 +716,65 @@ impl Queryable for PostgreSql {
async fn execute_raw(&self, sql: &str, params: &[Value<'_>]) -> crate::Result<u64> {
self.check_bind_variables_len(params)?;

metrics::query("postgres.execute_raw", sql, params, move || async move {
let stmt = self.fetch_cached(sql, &[]).await?;

if stmt.params().len() != params.len() {
let kind = ErrorKind::IncorrectNumberOfParameters {
expected: stmt.params().len(),
actual: params.len(),
};

return Err(Error::builder(kind).build());
}
metrics::query(
"postgres.execute_raw",
self.db_system_name,
sql,
params,
move || async move {
let stmt = self.fetch_cached(sql, &[]).await?;

if stmt.params().len() != params.len() {
let kind = ErrorKind::IncorrectNumberOfParameters {
expected: stmt.params().len(),
actual: params.len(),
};

return Err(Error::builder(kind).build());
}

let changes = self
.perform_io(self.client.0.execute(&stmt, conversion::conv_params(params).as_slice()))
.await?;
let changes = self
.perform_io(self.client.0.execute(&stmt, conversion::conv_params(params).as_slice()))
.await?;

Ok(changes)
})
Ok(changes)
},
)
.await
}

async fn execute_raw_typed(&self, sql: &str, params: &[Value<'_>]) -> crate::Result<u64> {
self.check_bind_variables_len(params)?;

metrics::query("postgres.execute_raw", sql, params, move || async move {
let stmt = self.fetch_cached(sql, params).await?;

if stmt.params().len() != params.len() {
let kind = ErrorKind::IncorrectNumberOfParameters {
expected: stmt.params().len(),
actual: params.len(),
};

return Err(Error::builder(kind).build());
}
metrics::query(
"postgres.execute_raw",
self.db_system_name,
sql,
params,
move || async move {
let stmt = self.fetch_cached(sql, params).await?;

if stmt.params().len() != params.len() {
let kind = ErrorKind::IncorrectNumberOfParameters {
expected: stmt.params().len(),
actual: params.len(),
};

return Err(Error::builder(kind).build());
}

let changes = self
.perform_io(self.client.0.execute(&stmt, conversion::conv_params(params).as_slice()))
.await?;
let changes = self
.perform_io(self.client.0.execute(&stmt, conversion::conv_params(params).as_slice()))
.await?;

Ok(changes)
})
Ok(changes)
},
)
.await
}

async fn raw_cmd(&self, cmd: &str) -> crate::Result<()> {
metrics::query("postgres.raw_cmd", cmd, &[], move || async move {
metrics::query("postgres.raw_cmd", self.db_system_name, cmd, &[], move || async move {
self.perform_io(self.client.0.simple_query(cmd)).await?;
Ok(())
})
Expand Down
Loading