From bc5207a80a80688df854a7ca55491d4ee6f06e00 Mon Sep 17 00:00:00 2001 From: Raminder Singh Date: Thu, 2 Nov 2023 12:57:23 +0530 Subject: [PATCH 1/2] only update wrappers_fdw_stats table in a read-write transaction --- wrappers/Cargo.lock | 2 +- wrappers/src/stats.rs | 67 +++++++++++++++++++++++++------------------ 2 files changed, 40 insertions(+), 29 deletions(-) diff --git a/wrappers/Cargo.lock b/wrappers/Cargo.lock index d8703d1e..dfbe28a6 100644 --- a/wrappers/Cargo.lock +++ b/wrappers/Cargo.lock @@ -4321,7 +4321,7 @@ dependencies = [ [[package]] name = "wrappers" -version = "0.1.17" +version = "0.1.18" dependencies = [ "arrow-array", "async-compression", diff --git a/wrappers/src/stats.rs b/wrappers/src/stats.rs index 57ad9152..6881738c 100644 --- a/wrappers/src/stats.rs +++ b/wrappers/src/stats.rs @@ -43,26 +43,35 @@ fn get_stats_table() -> String { // increase stats value #[allow(dead_code)] pub(crate) fn inc_stats(fdw_name: &str, metric: Metric, inc: i64) { - let sql = format!( - "insert into {} as s (fdw_name, {}) values($1, $2) + if !is_txn_read_only() { + let sql = format!( + "insert into {} as s (fdw_name, {}) values($1, $2) on conflict(fdw_name) do update set {} = coalesce(s.{}, 0) + excluded.{}, updated_at = timezone('utc'::text, now())", - get_stats_table(), - metric, - metric, - metric, - metric - ); - Spi::run_with_args( - &sql, - Some(vec![ - (PgBuiltInOids::TEXTOID.oid(), fdw_name.into_datum()), - (PgBuiltInOids::INT8OID.oid(), inc.into_datum()), - ]), - ) - .unwrap(); + get_stats_table(), + metric, + metric, + metric, + metric + ); + Spi::run_with_args( + &sql, + Some(vec![ + (PgBuiltInOids::TEXTOID.oid(), fdw_name.into_datum()), + (PgBuiltInOids::INT8OID.oid(), inc.into_datum()), + ]), + ) + .unwrap(); + } +} + +fn is_txn_read_only() -> bool { + let read_only_txn = Spi::get_one("show transaction_read_only") + .unwrap_or(Some("on")) + .unwrap_or("on"); + read_only_txn == "on" } // get metadata @@ -82,20 +91,22 @@ pub(crate) fn get_metadata(fdw_name: &str) -> Option { // set metadata #[allow(dead_code)] pub(crate) fn set_metadata(fdw_name: &str, metadata: Option) { - let sql = format!( - "insert into {} as s (fdw_name, metadata) values($1, $2) + if !is_txn_read_only() { + let sql = format!( + "insert into {} as s (fdw_name, metadata) values($1, $2) on conflict(fdw_name) do update set metadata = $2, updated_at = timezone('utc'::text, now())", - get_stats_table() - ); - Spi::run_with_args( - &sql, - Some(vec![ - (PgBuiltInOids::TEXTOID.oid(), fdw_name.into_datum()), - (PgBuiltInOids::JSONBOID.oid(), metadata.into_datum()), - ]), - ) - .unwrap(); + get_stats_table() + ); + Spi::run_with_args( + &sql, + Some(vec![ + (PgBuiltInOids::TEXTOID.oid(), fdw_name.into_datum()), + (PgBuiltInOids::JSONBOID.oid(), metadata.into_datum()), + ]), + ) + .unwrap(); + } } From abb7b70d3936c229c796ba2cf4c687e7bc36ce73 Mon Sep 17 00:00:00 2001 From: Raminder Singh Date: Thu, 2 Nov 2023 14:29:23 +0530 Subject: [PATCH 2/2] chore: review comments feedback --- wrappers/src/stats.rs | 79 ++++++++++++++++++++++--------------------- 1 file changed, 40 insertions(+), 39 deletions(-) diff --git a/wrappers/src/stats.rs b/wrappers/src/stats.rs index 6881738c..423bc643 100644 --- a/wrappers/src/stats.rs +++ b/wrappers/src/stats.rs @@ -40,38 +40,37 @@ fn get_stats_table() -> String { .unwrap_or_else(|| panic!("cannot find fdw stats table '{}'", FDW_STATS_TABLE)) } +fn is_txn_read_only() -> bool { + Spi::get_one("show transaction_read_only") == Ok(Some("on")) +} + // increase stats value #[allow(dead_code)] pub(crate) fn inc_stats(fdw_name: &str, metric: Metric, inc: i64) { - if !is_txn_read_only() { - let sql = format!( - "insert into {} as s (fdw_name, {}) values($1, $2) + if is_txn_read_only() { + return; + } + + let sql = format!( + "insert into {} as s (fdw_name, {}) values($1, $2) on conflict(fdw_name) do update set {} = coalesce(s.{}, 0) + excluded.{}, updated_at = timezone('utc'::text, now())", - get_stats_table(), - metric, - metric, - metric, - metric - ); - Spi::run_with_args( - &sql, - Some(vec![ - (PgBuiltInOids::TEXTOID.oid(), fdw_name.into_datum()), - (PgBuiltInOids::INT8OID.oid(), inc.into_datum()), - ]), - ) - .unwrap(); - } -} - -fn is_txn_read_only() -> bool { - let read_only_txn = Spi::get_one("show transaction_read_only") - .unwrap_or(Some("on")) - .unwrap_or("on"); - read_only_txn == "on" + get_stats_table(), + metric, + metric, + metric, + metric + ); + Spi::run_with_args( + &sql, + Some(vec![ + (PgBuiltInOids::TEXTOID.oid(), fdw_name.into_datum()), + (PgBuiltInOids::INT8OID.oid(), inc.into_datum()), + ]), + ) + .unwrap(); } // get metadata @@ -91,22 +90,24 @@ pub(crate) fn get_metadata(fdw_name: &str) -> Option { // set metadata #[allow(dead_code)] pub(crate) fn set_metadata(fdw_name: &str, metadata: Option) { - if !is_txn_read_only() { - let sql = format!( - "insert into {} as s (fdw_name, metadata) values($1, $2) + if is_txn_read_only() { + return; + } + + let sql = format!( + "insert into {} as s (fdw_name, metadata) values($1, $2) on conflict(fdw_name) do update set metadata = $2, updated_at = timezone('utc'::text, now())", - get_stats_table() - ); - Spi::run_with_args( - &sql, - Some(vec![ - (PgBuiltInOids::TEXTOID.oid(), fdw_name.into_datum()), - (PgBuiltInOids::JSONBOID.oid(), metadata.into_datum()), - ]), - ) - .unwrap(); - } + get_stats_table() + ); + Spi::run_with_args( + &sql, + Some(vec![ + (PgBuiltInOids::TEXTOID.oid(), fdw_name.into_datum()), + (PgBuiltInOids::JSONBOID.oid(), metadata.into_datum()), + ]), + ) + .unwrap(); }