From 8a398d82e91e1893fd49b72b8844e091dcf7d821 Mon Sep 17 00:00:00 2001 From: Oluwapeluwa Ibrahim Date: Wed, 10 Sep 2025 18:31:01 +0100 Subject: [PATCH 1/3] Fix high priority transaction issues (clean from updated master) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ✅ **Issue 1: ABORT command not supported** - Move transaction command handling before SQL parsing - ABORT now works without 'sql parser error: Expected: an SQL statement, found: ABORT' - Early transaction detection prevents parser from rejecting ABORT ✅ **Issue 2: Nested transactions not implemented** - Handle nested BEGIN commands like PostgreSQL (ignore with warning) - TransactionStatus::Transaction + BEGIN -> Execution response (not error) - Matches PostgreSQL behavior of allowing nested transaction blocks These fixes address the main issues seen in integration tests: - ⚠️ ABORT: sql parser error -> ✅ ABORT works - ⚠️ Nested BEGIN: not implemented -> ✅ Nested BEGIN ignored (PostgreSQL standard) - More stable transaction state management Built against latest master with all recent updates included. --- datafusion-postgres/src/handlers.rs | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/datafusion-postgres/src/handlers.rs b/datafusion-postgres/src/handlers.rs index 06d7c98..fb7f86a 100644 --- a/datafusion-postgres/src/handlers.rs +++ b/datafusion-postgres/src/handlers.rs @@ -320,9 +320,15 @@ impl DfSessionService { match query_lower.trim() { "begin" | "begin transaction" | "begin work" | "start transaction" => { match client.transaction_status() { - TransactionStatus::Idle | TransactionStatus::Transaction => { + TransactionStatus::Idle => { Ok(Some(Response::TransactionStart(Tag::new("BEGIN")))) } + TransactionStatus::Transaction => { + // PostgreSQL behavior: ignore nested BEGIN, just return SUCCESS + // This matches PostgreSQL's handling of nested transaction blocks + log::warn!("BEGIN command ignored: already in transaction block"); + Ok(Some(Response::Execution(Tag::new("BEGIN")))) + } TransactionStatus::Error => { // Can't start new transaction from failed state Err(PgWireError::UserError(Box::new( @@ -417,6 +423,13 @@ impl SimpleQueryHandler for DfSessionService { C: ClientInfo + Unpin + Send + Sync, { log::debug!("Received query: {query}"); // Log the query for debugging + + // Check for transaction commands early to avoid SQL parsing issues with ABORT + let query_lower = query.to_lowercase().trim().to_string(); + if let Some(resp) = self.try_respond_transaction_statements(client, &query_lower).await? { + return Ok(vec![resp]); + } + let mut statements = parse(query).map_err(|e| PgWireError::ApiError(Box::new(e)))?; // TODO: deal with multiple statements @@ -449,12 +462,7 @@ impl SimpleQueryHandler for DfSessionService { return Ok(vec![resp]); } - if let Some(resp) = self - .try_respond_transaction_statements(client, &query_lower) - .await? - { - return Ok(vec![resp]); - } + if let Some(resp) = self .try_respond_show_statements(client, &query_lower) From 23efe8ce53a5745e2eb0036ba9e3644616ada365 Mon Sep 17 00:00:00 2001 From: Oluwapeluwa Ibrahim Date: Wed, 10 Sep 2025 18:32:12 +0100 Subject: [PATCH 2/3] fmt --- datafusion-postgres/src/handlers.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/datafusion-postgres/src/handlers.rs b/datafusion-postgres/src/handlers.rs index fb7f86a..797e4ad 100644 --- a/datafusion-postgres/src/handlers.rs +++ b/datafusion-postgres/src/handlers.rs @@ -423,13 +423,16 @@ impl SimpleQueryHandler for DfSessionService { C: ClientInfo + Unpin + Send + Sync, { log::debug!("Received query: {query}"); // Log the query for debugging - + // Check for transaction commands early to avoid SQL parsing issues with ABORT let query_lower = query.to_lowercase().trim().to_string(); - if let Some(resp) = self.try_respond_transaction_statements(client, &query_lower).await? { + if let Some(resp) = self + .try_respond_transaction_statements(client, &query_lower) + .await? + { return Ok(vec![resp]); } - + let mut statements = parse(query).map_err(|e| PgWireError::ApiError(Box::new(e)))?; // TODO: deal with multiple statements @@ -462,8 +465,6 @@ impl SimpleQueryHandler for DfSessionService { return Ok(vec![resp]); } - - if let Some(resp) = self .try_respond_show_statements(client, &query_lower) .await? From 1f984b03232dcfe2c5e0d334276f2720163dfb8b Mon Sep 17 00:00:00 2001 From: Oluwapeluwa Ibrahim Date: Wed, 10 Sep 2025 19:26:10 +0100 Subject: [PATCH 3/3] Fix high priority transaction issues - comprehensive solution MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ✅ **Issue 1: ABORT command not supported** - Handle transaction commands in BOTH simple and extended query paths - ABORT bypasses SQL parser completely in both protocols - Extended query parser creates dummy plan for transaction commands ✅ **Issue 2: Nested transactions not implemented** - Nested BEGIN returns success instead of error (PostgreSQL standard) - Warning logged but no failure - matches PostgreSQL behavior - Proper transaction state handling for all cases ✅ **Issue 3: Comprehensive transaction command coverage** - Fixed both SimpleQueryHandler and ExtendedQueryHandler paths - Transaction commands handled before SQL parsing in all cases - Supports all PostgreSQL transaction variants (ABORT, BEGIN WORK, etc.) This addresses all the failing test cases: - ⚠️ ABORT: sql parser error -> ✅ ABORT works in all protocols - ⚠️ Nested BEGIN: not implemented -> ✅ Nested BEGIN ignored with warning - ⚠️ Extended query BEGIN issues -> ✅ Extended query transaction support Built and tested against latest master branch. --- datafusion-postgres/src/handlers.rs | 34 +++++++++++++++++++++++++++-- 1 file changed, 32 insertions(+), 2 deletions(-) diff --git a/datafusion-postgres/src/handlers.rs b/datafusion-postgres/src/handlers.rs index 797e4ad..2e64435 100644 --- a/datafusion-postgres/src/handlers.rs +++ b/datafusion-postgres/src/handlers.rs @@ -706,8 +706,38 @@ impl QueryParser for Parser { sql: &str, _types: &[Type], ) -> PgWireResult { - log::debug!("Received parse extended query: {sql}"); // Log for - // debugging + log::debug!("Received parse extended query: {sql}"); // Log for debugging + + // Check for transaction commands that shouldn't be parsed by DataFusion + let sql_lower = sql.to_lowercase(); + let sql_trimmed = sql_lower.trim(); + if matches!( + sql_trimmed, + "begin" + | "begin transaction" + | "begin work" + | "start transaction" + | "commit" + | "commit transaction" + | "commit work" + | "end" + | "end transaction" + | "rollback" + | "rollback transaction" + | "rollback work" + | "abort" + ) { + // Return a dummy plan for transaction commands - they'll be handled by transaction handler + let dummy_schema = datafusion::common::DFSchema::empty(); + let dummy_plan = datafusion::logical_expr::LogicalPlan::EmptyRelation( + datafusion::logical_expr::EmptyRelation { + produce_one_row: false, + schema: std::sync::Arc::new(dummy_schema), + }, + ); + return Ok((sql.to_string(), dummy_plan)); + } + let mut statements = parse(sql).map_err(|e| PgWireError::ApiError(Box::new(e)))?; let mut statement = statements.remove(0);