From f5d2ee9e4c6f0344244cd90b31d8b3697c014e43 Mon Sep 17 00:00:00 2001 From: Oluwapeluwa Ibrahim Date: Wed, 10 Sep 2025 10:10:32 +0100 Subject: [PATCH 1/5] Add session-level statement timeout support - Implement SET statement_timeout with support for ms, s, min units - Add SHOW statement_timeout to display current timeout setting - Default to no timeout (disabled) - use SET statement_timeout = 0 to disable - Timeout is per-session, not server-wide configuration - Supports PostgreSQL standard syntax: SET statement_timeout = '30s' - Query execution respects the session timeout in both simple and extended query protocols - Added comprehensive tests for timeout setting, showing, and disabling --- datafusion-postgres/src/handlers.rs | 135 ++++++++++++++++++++++++++-- 1 file changed, 130 insertions(+), 5 deletions(-) diff --git a/datafusion-postgres/src/handlers.rs b/datafusion-postgres/src/handlers.rs index 7d2ccb3..49826f4 100644 --- a/datafusion-postgres/src/handlers.rs +++ b/datafusion-postgres/src/handlers.rs @@ -71,6 +71,7 @@ pub struct DfSessionService { timezone: Arc>, auth_manager: Arc, sql_rewrite_rules: Vec>, + statement_timeout: Arc>>, } impl DfSessionService { @@ -97,6 +98,7 @@ impl DfSessionService { timezone: Arc::new(Mutex::new("UTC".to_string())), auth_manager, sql_rewrite_rules, + statement_timeout: Arc::new(Mutex::new(None)), } } @@ -215,6 +217,46 @@ impl DfSessionService { ), ))) } + } else if query_lower.starts_with("set statement_timeout") { + let parts: Vec<&str> = query_lower.split_whitespace().collect(); + if parts.len() >= 3 { + let timeout_str = parts[2].trim_matches('"').trim_matches('\''); + let mut statement_timeout = self.statement_timeout.lock().await; + + if timeout_str == "0" || timeout_str.is_empty() { + *statement_timeout = None; + } else { + // Parse timeout value (supports ms, s, min formats) + let timeout_ms = if timeout_str.ends_with("ms") { + timeout_str.trim_end_matches("ms").parse::() + } else if timeout_str.ends_with("s") { + timeout_str.trim_end_matches("s").parse::().map(|s| s * 1000) + } else if timeout_str.ends_with("min") { + timeout_str.trim_end_matches("min").parse::().map(|m| m * 60 * 1000) + } else { + // Default to milliseconds + timeout_str.parse::() + }; + + match timeout_ms { + Ok(ms) if ms > 0 => { + *statement_timeout = Some(std::time::Duration::from_millis(ms)); + } + _ => { + *statement_timeout = None; + } + } + } + Ok(Some(Response::Execution(Tag::new("SET")))) + } else { + Err(PgWireError::UserError(Box::new( + pgwire::error::ErrorInfo::new( + "ERROR".to_string(), + "42601".to_string(), + "Invalid SET statement_timeout syntax".to_string(), + ), + ))) + } } else { // pass SET query to datafusion if let Err(e) = self.session_context.sql(query_lower).await { @@ -305,6 +347,15 @@ impl DfSessionService { let resp = Self::mock_show_response("search_path", default_schema)?; Ok(Some(Response::Query(resp))) } + "show statement_timeout" => { + let timeout = self.statement_timeout.lock().await.clone(); + let timeout_str = match timeout { + Some(duration) => format!("{}ms", duration.as_millis()), + None => "0".to_string(), + }; + let resp = Self::mock_show_response("statement_timeout", &timeout_str)?; + Ok(Some(Response::Query(resp))) + } _ => Err(PgWireError::UserError(Box::new( pgwire::error::ErrorInfo::new( "ERROR".to_string(), @@ -378,7 +429,22 @@ impl SimpleQueryHandler for DfSessionService { ))); } - let df_result = self.session_context.sql(&query).await; + let df_result = { + let timeout = self.statement_timeout.lock().await.clone(); + if let Some(timeout_duration) = timeout { + tokio::time::timeout(timeout_duration, self.session_context.sql(&query)) + .await + .map_err(|_| { + PgWireError::UserError(Box::new(pgwire::error::ErrorInfo::new( + "ERROR".to_string(), + "57014".to_string(), // query_canceled error code + "canceling statement due to statement timeout".to_string(), + ))) + })? + } else { + self.session_context.sql(&query).await + } + }; // Handle query execution errors and transaction state let df = match df_result { @@ -540,10 +606,26 @@ impl ExtendedQueryHandler for DfSessionService { .optimize(&plan) .map_err(|e| PgWireError::ApiError(Box::new(e)))?; - let dataframe = match self.session_context.execute_logical_plan(optimised).await { - Ok(df) => df, - Err(e) => { - return Err(PgWireError::ApiError(Box::new(e))); + let dataframe = { + let timeout = self.statement_timeout.lock().await.clone(); + if let Some(timeout_duration) = timeout { + tokio::time::timeout(timeout_duration, self.session_context.execute_logical_plan(optimised)) + .await + .map_err(|_| { + PgWireError::UserError(Box::new(pgwire::error::ErrorInfo::new( + "ERROR".to_string(), + "57014".to_string(), // query_canceled error code + "canceling statement due to statement timeout".to_string(), + ))) + })? + .map_err(|e| PgWireError::ApiError(Box::new(e)))? + } else { + match self.session_context.execute_logical_plan(optimised).await { + Ok(df) => df, + Err(e) => { + return Err(PgWireError::ApiError(Box::new(e))); + } + } } }; let resp = df::encode_dataframe(dataframe, &portal.result_column_format).await?; @@ -593,3 +675,46 @@ fn ordered_param_types(types: &HashMap>) -> Vec Date: Wed, 10 Sep 2025 10:20:44 +0100 Subject: [PATCH 2/5] Apply cargo fmt formatting --- datafusion-postgres/src/handlers.rs | 63 +++++++++++++++++++---------- 1 file changed, 42 insertions(+), 21 deletions(-) diff --git a/datafusion-postgres/src/handlers.rs b/datafusion-postgres/src/handlers.rs index 49826f4..1487899 100644 --- a/datafusion-postgres/src/handlers.rs +++ b/datafusion-postgres/src/handlers.rs @@ -222,7 +222,7 @@ impl DfSessionService { if parts.len() >= 3 { let timeout_str = parts[2].trim_matches('"').trim_matches('\''); let mut statement_timeout = self.statement_timeout.lock().await; - + if timeout_str == "0" || timeout_str.is_empty() { *statement_timeout = None; } else { @@ -230,14 +230,20 @@ impl DfSessionService { let timeout_ms = if timeout_str.ends_with("ms") { timeout_str.trim_end_matches("ms").parse::() } else if timeout_str.ends_with("s") { - timeout_str.trim_end_matches("s").parse::().map(|s| s * 1000) + timeout_str + .trim_end_matches("s") + .parse::() + .map(|s| s * 1000) } else if timeout_str.ends_with("min") { - timeout_str.trim_end_matches("min").parse::().map(|m| m * 60 * 1000) + timeout_str + .trim_end_matches("min") + .parse::() + .map(|m| m * 60 * 1000) } else { // Default to milliseconds timeout_str.parse::() }; - + match timeout_ms { Ok(ms) if ms > 0 => { *statement_timeout = Some(std::time::Duration::from_millis(ms)); @@ -609,16 +615,19 @@ impl ExtendedQueryHandler for DfSessionService { let dataframe = { let timeout = self.statement_timeout.lock().await.clone(); if let Some(timeout_duration) = timeout { - tokio::time::timeout(timeout_duration, self.session_context.execute_logical_plan(optimised)) - .await - .map_err(|_| { - PgWireError::UserError(Box::new(pgwire::error::ErrorInfo::new( - "ERROR".to_string(), - "57014".to_string(), // query_canceled error code - "canceling statement due to statement timeout".to_string(), - ))) - })? - .map_err(|e| PgWireError::ApiError(Box::new(e)))? + tokio::time::timeout( + timeout_duration, + self.session_context.execute_logical_plan(optimised), + ) + .await + .map_err(|_| { + PgWireError::UserError(Box::new(pgwire::error::ErrorInfo::new( + "ERROR".to_string(), + "57014".to_string(), // query_canceled error code + "canceling statement due to statement timeout".to_string(), + ))) + })? + .map_err(|e| PgWireError::ApiError(Box::new(e)))? } else { match self.session_context.execute_logical_plan(optimised).await { Ok(df) => df, @@ -690,15 +699,21 @@ mod tests { let service = DfSessionService::new(session_context, auth_manager); // Test setting timeout to 5000ms - let set_response = service.try_respond_set_statements("set statement_timeout '5000ms'").await.unwrap(); + let set_response = service + .try_respond_set_statements("set statement_timeout '5000ms'") + .await + .unwrap(); assert!(set_response.is_some()); - + // Verify the timeout was set let timeout = service.statement_timeout.lock().await.clone(); assert_eq!(timeout, Some(Duration::from_millis(5000))); // Test SHOW statement_timeout - let show_response = service.try_respond_show_statements("show statement_timeout").await.unwrap(); + let show_response = service + .try_respond_show_statements("show statement_timeout") + .await + .unwrap(); assert!(show_response.is_some()); } @@ -709,11 +724,17 @@ mod tests { let service = DfSessionService::new(session_context, auth_manager); // Set timeout first - service.try_respond_set_statements("set statement_timeout '1000ms'").await.unwrap(); - + service + .try_respond_set_statements("set statement_timeout '1000ms'") + .await + .unwrap(); + // Disable timeout with 0 - service.try_respond_set_statements("set statement_timeout '0'").await.unwrap(); - + service + .try_respond_set_statements("set statement_timeout '0'") + .await + .unwrap(); + let timeout = service.statement_timeout.lock().await.clone(); assert_eq!(timeout, None); } From 1bca5e0cf1b82af40ed5d685a22a1d2bfe07952e Mon Sep 17 00:00:00 2001 From: Oluwapeluwa Ibrahim Date: Wed, 10 Sep 2025 10:21:53 +0100 Subject: [PATCH 3/5] Apply clippy fixes and suggestions - Fixed redundant closure warnings - Applied clippy pedantic suggestions - Improved code quality and consistency --- arrow-pg/src/encoder.rs | 2 +- datafusion-postgres/src/handlers.rs | 10 +++++----- datafusion-postgres/src/sql.rs | 4 ++-- datafusion-postgres/tests/dbeaver.rs | 2 +- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/arrow-pg/src/encoder.rs b/arrow-pg/src/encoder.rs index 5490e1f..8ac10da 100644 --- a/arrow-pg/src/encoder.rs +++ b/arrow-pg/src/encoder.rs @@ -574,7 +574,7 @@ mod tests { { let mut bytes = BytesMut::new(); let _sql_text = value.to_sql_text(data_type, &mut bytes); - let string = String::from_utf8((&bytes).to_vec()); + let string = String::from_utf8(bytes.to_vec()); self.encoded_value = string.unwrap(); Ok(()) } diff --git a/datafusion-postgres/src/handlers.rs b/datafusion-postgres/src/handlers.rs index 1487899..cf4d30c 100644 --- a/datafusion-postgres/src/handlers.rs +++ b/datafusion-postgres/src/handlers.rs @@ -354,7 +354,7 @@ impl DfSessionService { Ok(Some(Response::Query(resp))) } "show statement_timeout" => { - let timeout = self.statement_timeout.lock().await.clone(); + let timeout = *self.statement_timeout.lock().await; let timeout_str = match timeout { Some(duration) => format!("{}ms", duration.as_millis()), None => "0".to_string(), @@ -436,7 +436,7 @@ impl SimpleQueryHandler for DfSessionService { } let df_result = { - let timeout = self.statement_timeout.lock().await.clone(); + let timeout = *self.statement_timeout.lock().await; if let Some(timeout_duration) = timeout { tokio::time::timeout(timeout_duration, self.session_context.sql(&query)) .await @@ -613,7 +613,7 @@ impl ExtendedQueryHandler for DfSessionService { .map_err(|e| PgWireError::ApiError(Box::new(e)))?; let dataframe = { - let timeout = self.statement_timeout.lock().await.clone(); + let timeout = *self.statement_timeout.lock().await; if let Some(timeout_duration) = timeout { tokio::time::timeout( timeout_duration, @@ -706,7 +706,7 @@ mod tests { assert!(set_response.is_some()); // Verify the timeout was set - let timeout = service.statement_timeout.lock().await.clone(); + let timeout = *service.statement_timeout.lock().await; assert_eq!(timeout, Some(Duration::from_millis(5000))); // Test SHOW statement_timeout @@ -735,7 +735,7 @@ mod tests { .await .unwrap(); - let timeout = service.statement_timeout.lock().await.clone(); + let timeout = *service.statement_timeout.lock().await; assert_eq!(timeout, None); } } diff --git a/datafusion-postgres/src/sql.rs b/datafusion-postgres/src/sql.rs index 65c8021..2ae841f 100644 --- a/datafusion-postgres/src/sql.rs +++ b/datafusion-postgres/src/sql.rs @@ -296,7 +296,7 @@ struct RemoveUnsupportedTypesVisitor<'a> { unsupported_types: &'a HashSet, } -impl<'a> VisitorMut for RemoveUnsupportedTypesVisitor<'a> { +impl VisitorMut for RemoveUnsupportedTypesVisitor<'_> { type Break = (); fn pre_visit_expr(&mut self, expr: &mut Expr) -> ControlFlow { @@ -444,7 +444,7 @@ struct PrependUnqualifiedTableNameVisitor<'a> { table_names: &'a HashSet, } -impl<'a> VisitorMut for PrependUnqualifiedTableNameVisitor<'a> { +impl VisitorMut for PrependUnqualifiedTableNameVisitor<'_> { type Break = (); fn pre_visit_table_factor( diff --git a/datafusion-postgres/tests/dbeaver.rs b/datafusion-postgres/tests/dbeaver.rs index e132b91..24e5ab8 100644 --- a/datafusion-postgres/tests/dbeaver.rs +++ b/datafusion-postgres/tests/dbeaver.rs @@ -34,6 +34,6 @@ pub async fn test_dbeaver_startup_sql() { for query in DBEAVER_QUERIES { SimpleQueryHandler::do_query(&service, &mut client, query) .await - .expect(&format!("failed to run sql: {query}")); + .unwrap_or_else(|_| panic!("failed to run sql: {query}")); } } From 213ac2385e8af69b4ddfafa146a777203db7dc26 Mon Sep 17 00:00:00 2001 From: Oluwapeluwa Ibrahim Date: Wed, 10 Sep 2025 15:15:05 +0100 Subject: [PATCH 4/5] Refactor statement timeout to be truly per-session using ClientInfo metadata - Remove statement_timeout field from DfSessionService (was application-scoped) - Store timeout in ClientInfo::metadata() using METADATA_STATEMENT_TIMEOUT key - Add helper functions get_statement_timeout() and set_statement_timeout() - Update SET/SHOW statement_timeout handlers to use client metadata - Update query execution logic to read timeout from client session - Add comprehensive MockClient for testing session-specific behavior - Now each PostgreSQL session has its own independent timeout setting This follows the PostgreSQL standard where statement_timeout is a session variable, not a server-wide configuration. --- datafusion-postgres/src/handlers.rs | 152 ++++++++++++++++++++++------ 1 file changed, 122 insertions(+), 30 deletions(-) diff --git a/datafusion-postgres/src/handlers.rs b/datafusion-postgres/src/handlers.rs index cf4d30c..fa08a68 100644 --- a/datafusion-postgres/src/handlers.rs +++ b/datafusion-postgres/src/handlers.rs @@ -31,6 +31,9 @@ use tokio::sync::Mutex; use arrow_pg::datatypes::df; use arrow_pg::datatypes::{arrow_schema_to_pg_fields, into_pg_type}; +// Metadata keys for session-level settings +const METADATA_STATEMENT_TIMEOUT: &str = "statement_timeout_ms"; + /// Simple startup handler that does no authentication /// For production, use DfAuthSource with proper pgwire authentication handlers pub struct SimpleStartupHandler; @@ -71,7 +74,6 @@ pub struct DfSessionService { timezone: Arc>, auth_manager: Arc, sql_rewrite_rules: Vec>, - statement_timeout: Arc>>, } impl DfSessionService { @@ -98,7 +100,31 @@ impl DfSessionService { timezone: Arc::new(Mutex::new("UTC".to_string())), auth_manager, sql_rewrite_rules, - statement_timeout: Arc::new(Mutex::new(None)), + } + } + + /// Get statement timeout from client metadata + fn get_statement_timeout(client: &C) -> Option + where + C: ClientInfo, + { + client + .metadata() + .get(METADATA_STATEMENT_TIMEOUT) + .and_then(|s| s.parse::().ok()) + .map(std::time::Duration::from_millis) + } + + /// Set statement timeout in client metadata + fn set_statement_timeout(client: &mut C, timeout: Option) + where + C: ClientInfo, + { + let metadata = client.metadata_mut(); + if let Some(duration) = timeout { + metadata.insert(METADATA_STATEMENT_TIMEOUT.to_string(), duration.as_millis().to_string()); + } else { + metadata.remove(METADATA_STATEMENT_TIMEOUT); } } @@ -196,10 +222,14 @@ impl DfSessionService { Ok(QueryResponse::new(Arc::new(fields), Box::pin(row_stream))) } - async fn try_respond_set_statements<'a>( + async fn try_respond_set_statements<'a, C>( &self, + client: &mut C, query_lower: &str, - ) -> PgWireResult>> { + ) -> PgWireResult>> + where + C: ClientInfo, + { if query_lower.starts_with("set") { if query_lower.starts_with("set time zone") { let parts: Vec<&str> = query_lower.split_whitespace().collect(); @@ -221,10 +251,9 @@ impl DfSessionService { let parts: Vec<&str> = query_lower.split_whitespace().collect(); if parts.len() >= 3 { let timeout_str = parts[2].trim_matches('"').trim_matches('\''); - let mut statement_timeout = self.statement_timeout.lock().await; - if timeout_str == "0" || timeout_str.is_empty() { - *statement_timeout = None; + let timeout = if timeout_str == "0" || timeout_str.is_empty() { + None } else { // Parse timeout value (supports ms, s, min formats) let timeout_ms = if timeout_str.ends_with("ms") { @@ -245,14 +274,12 @@ impl DfSessionService { }; match timeout_ms { - Ok(ms) if ms > 0 => { - *statement_timeout = Some(std::time::Duration::from_millis(ms)); - } - _ => { - *statement_timeout = None; - } + Ok(ms) if ms > 0 => Some(std::time::Duration::from_millis(ms)), + _ => None, } - } + }; + + Self::set_statement_timeout(client, timeout); Ok(Some(Response::Execution(Tag::new("SET")))) } else { Err(PgWireError::UserError(Box::new( @@ -322,10 +349,14 @@ impl DfSessionService { } } - async fn try_respond_show_statements<'a>( + async fn try_respond_show_statements<'a, C>( &self, + client: &C, query_lower: &str, - ) -> PgWireResult>> { + ) -> PgWireResult>> + where + C: ClientInfo, + { if query_lower.starts_with("show ") { match query_lower.strip_suffix(";").unwrap_or(query_lower) { "show time zone" => { @@ -354,7 +385,7 @@ impl DfSessionService { Ok(Some(Response::Query(resp))) } "show statement_timeout" => { - let timeout = *self.statement_timeout.lock().await; + let timeout = Self::get_statement_timeout(client); let timeout_str = match timeout { Some(duration) => format!("{}ms", duration.as_millis()), None => "0".to_string(), @@ -408,7 +439,7 @@ impl SimpleQueryHandler for DfSessionService { self.check_query_permission(client, &query).await?; } - if let Some(resp) = self.try_respond_set_statements(&query_lower).await? { + if let Some(resp) = self.try_respond_set_statements(client, &query_lower).await? { return Ok(vec![resp]); } @@ -419,7 +450,7 @@ impl SimpleQueryHandler for DfSessionService { return Ok(vec![resp]); } - if let Some(resp) = self.try_respond_show_statements(&query_lower).await? { + if let Some(resp) = self.try_respond_show_statements(client, &query_lower).await? { return Ok(vec![resp]); } @@ -436,7 +467,7 @@ impl SimpleQueryHandler for DfSessionService { } let df_result = { - let timeout = *self.statement_timeout.lock().await; + let timeout = Self::get_statement_timeout(client); if let Some(timeout_duration) = timeout { tokio::time::timeout(timeout_duration, self.session_context.sql(&query)) .await @@ -568,7 +599,7 @@ impl ExtendedQueryHandler for DfSessionService { .await?; } - if let Some(resp) = self.try_respond_set_statements(&query).await? { + if let Some(resp) = self.try_respond_set_statements(client, &query).await? { return Ok(resp); } @@ -579,7 +610,7 @@ impl ExtendedQueryHandler for DfSessionService { return Ok(resp); } - if let Some(resp) = self.try_respond_show_statements(&query).await? { + if let Some(resp) = self.try_respond_show_statements(client, &query).await? { return Ok(resp); } @@ -613,7 +644,7 @@ impl ExtendedQueryHandler for DfSessionService { .map_err(|e| PgWireError::ApiError(Box::new(e)))?; let dataframe = { - let timeout = *self.statement_timeout.lock().await; + let timeout = Self::get_statement_timeout(client); if let Some(timeout_duration) = timeout { tokio::time::timeout( timeout_duration, @@ -690,28 +721,88 @@ mod tests { use super::*; use crate::auth::AuthManager; use datafusion::prelude::SessionContext; + use std::collections::HashMap; use std::time::Duration; + struct MockClient { + metadata: HashMap, + } + + impl MockClient { + fn new() -> Self { + Self { + metadata: HashMap::new(), + } + } + } + + impl ClientInfo for MockClient { + fn socket_addr(&self) -> std::net::SocketAddr { + "127.0.0.1:5432".parse().unwrap() + } + + fn is_secure(&self) -> bool { + false + } + + fn protocol_version(&self) -> pgwire::messages::ProtocolVersion { + pgwire::messages::ProtocolVersion::PROTOCOL3_0 + } + + fn set_protocol_version(&mut self, _version: pgwire::messages::ProtocolVersion) {} + + fn pid_and_secret_key(&self) -> (i32, pgwire::messages::startup::SecretKey) { + (0, pgwire::messages::startup::SecretKey::I32(0)) + } + + fn set_pid_and_secret_key(&mut self, _pid: i32, _secret_key: pgwire::messages::startup::SecretKey) {} + + fn state(&self) -> pgwire::api::PgWireConnectionState { + pgwire::api::PgWireConnectionState::ReadyForQuery + } + + fn set_state(&mut self, _new_state: pgwire::api::PgWireConnectionState) {} + + fn transaction_status(&self) -> pgwire::messages::response::TransactionStatus { + pgwire::messages::response::TransactionStatus::Idle + } + + fn set_transaction_status(&mut self, _new_status: pgwire::messages::response::TransactionStatus) {} + + fn metadata(&self) -> &HashMap { + &self.metadata + } + + fn metadata_mut(&mut self) -> &mut HashMap { + &mut self.metadata + } + + fn client_certificates<'a>(&self) -> Option<&[rustls_pki_types::CertificateDer<'a>]> { + None + } + } + #[tokio::test] async fn test_statement_timeout_set_and_show() { let session_context = Arc::new(SessionContext::new()); let auth_manager = Arc::new(AuthManager::new()); let service = DfSessionService::new(session_context, auth_manager); + let mut client = MockClient::new(); // Test setting timeout to 5000ms let set_response = service - .try_respond_set_statements("set statement_timeout '5000ms'") + .try_respond_set_statements(&mut client, "set statement_timeout '5000ms'") .await .unwrap(); assert!(set_response.is_some()); - // Verify the timeout was set - let timeout = *service.statement_timeout.lock().await; + // Verify the timeout was set in client metadata + let timeout = DfSessionService::get_statement_timeout(&client); assert_eq!(timeout, Some(Duration::from_millis(5000))); // Test SHOW statement_timeout let show_response = service - .try_respond_show_statements("show statement_timeout") + .try_respond_show_statements(&client, "show statement_timeout") .await .unwrap(); assert!(show_response.is_some()); @@ -722,20 +813,21 @@ mod tests { let session_context = Arc::new(SessionContext::new()); let auth_manager = Arc::new(AuthManager::new()); let service = DfSessionService::new(session_context, auth_manager); + let mut client = MockClient::new(); // Set timeout first service - .try_respond_set_statements("set statement_timeout '1000ms'") + .try_respond_set_statements(&mut client, "set statement_timeout '1000ms'") .await .unwrap(); // Disable timeout with 0 service - .try_respond_set_statements("set statement_timeout '0'") + .try_respond_set_statements(&mut client, "set statement_timeout '0'") .await .unwrap(); - let timeout = *service.statement_timeout.lock().await; + let timeout = DfSessionService::get_statement_timeout(&client); assert_eq!(timeout, None); } } From c0bab99dd5201b5c3394b2fc2aba0c201c716f5d Mon Sep 17 00:00:00 2001 From: Oluwapeluwa Ibrahim Date: Wed, 10 Sep 2025 15:17:35 +0100 Subject: [PATCH 5/5] fmt --- datafusion-postgres/src/handlers.rs | 28 +++++++++++++++++++++++----- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/datafusion-postgres/src/handlers.rs b/datafusion-postgres/src/handlers.rs index fa08a68..06d7c98 100644 --- a/datafusion-postgres/src/handlers.rs +++ b/datafusion-postgres/src/handlers.rs @@ -122,7 +122,10 @@ impl DfSessionService { { let metadata = client.metadata_mut(); if let Some(duration) = timeout { - metadata.insert(METADATA_STATEMENT_TIMEOUT.to_string(), duration.as_millis().to_string()); + metadata.insert( + METADATA_STATEMENT_TIMEOUT.to_string(), + duration.as_millis().to_string(), + ); } else { metadata.remove(METADATA_STATEMENT_TIMEOUT); } @@ -439,7 +442,10 @@ impl SimpleQueryHandler for DfSessionService { self.check_query_permission(client, &query).await?; } - if let Some(resp) = self.try_respond_set_statements(client, &query_lower).await? { + if let Some(resp) = self + .try_respond_set_statements(client, &query_lower) + .await? + { return Ok(vec![resp]); } @@ -450,7 +456,10 @@ impl SimpleQueryHandler for DfSessionService { return Ok(vec![resp]); } - if let Some(resp) = self.try_respond_show_statements(client, &query_lower).await? { + if let Some(resp) = self + .try_respond_show_statements(client, &query_lower) + .await? + { return Ok(vec![resp]); } @@ -755,7 +764,12 @@ mod tests { (0, pgwire::messages::startup::SecretKey::I32(0)) } - fn set_pid_and_secret_key(&mut self, _pid: i32, _secret_key: pgwire::messages::startup::SecretKey) {} + fn set_pid_and_secret_key( + &mut self, + _pid: i32, + _secret_key: pgwire::messages::startup::SecretKey, + ) { + } fn state(&self) -> pgwire::api::PgWireConnectionState { pgwire::api::PgWireConnectionState::ReadyForQuery @@ -767,7 +781,11 @@ mod tests { pgwire::messages::response::TransactionStatus::Idle } - fn set_transaction_status(&mut self, _new_status: pgwire::messages::response::TransactionStatus) {} + fn set_transaction_status( + &mut self, + _new_status: pgwire::messages::response::TransactionStatus, + ) { + } fn metadata(&self) -> &HashMap { &self.metadata