From aad05dfb11993b63512d923ac06857de92e71ca0 Mon Sep 17 00:00:00 2001 From: Marcelo Zabani Date: Tue, 16 Mar 2021 19:40:16 -0300 Subject: [PATCH 01/13] Fixes #69 --- src/Database/PostgreSQL/Simple/Internal.hs | 93 ++++++++++++++-------- test/Main.hs | 13 +++ 2 files changed, 75 insertions(+), 31 deletions(-) diff --git a/src/Database/PostgreSQL/Simple/Internal.hs b/src/Database/PostgreSQL/Simple/Internal.hs index 63365ea..f6655c5 100644 --- a/src/Database/PostgreSQL/Simple/Internal.hs +++ b/src/Database/PostgreSQL/Simple/Internal.hs @@ -1,6 +1,7 @@ {-# LANGUAGE CPP, BangPatterns, DoAndIfThenElse, RecordWildCards #-} {-# LANGUAGE DeriveDataTypeable, DeriveGeneric #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE ScopedTypeVariables #-} ------------------------------------------------------------------------------ -- | @@ -24,7 +25,7 @@ module Database.PostgreSQL.Simple.Internal where import Control.Applicative import Control.Exception import Control.Concurrent.MVar -import Control.Monad(MonadPlus(..)) +import Control.Monad(MonadPlus(..), when) import Data.ByteString(ByteString) import qualified Data.ByteString as B import qualified Data.ByteString.Char8 as B8 @@ -329,40 +330,70 @@ exec conn sql = withConnection conn $ \h -> do success <- PQ.sendQuery h sql if success - then awaitResult h Nothing + then do + mfd <- PQ.socket h + case mfd of + Nothing -> throwIO $! fdError "Database.PostgreSQL.Simple.Internal.exec" + Just socket -> + -- Here we assume any exceptions are asynchronous, or that + -- they are not from libpq, or that if they come from libpq, + -- they don't do any of the internal libpq state "cleaning" + -- that is necessary. + (consumeUntilNotBusy h socket >> getResult h Nothing) + `catch` \(e :: SomeException) -> do + cancelAndClear h socket + throw e else throwLibPQError h "PQsendQuery failed" where - awaitResult h mres = do - mfd <- PQ.socket h - case mfd of - Nothing -> throwIO $! fdError "Database.PostgreSQL.Simple.Internal.exec" - Just fd -> do - threadWaitRead fd - _ <- PQ.consumeInput h -- FIXME? - getResult h mres + cancelAndClear h socket = do + mcncl <- PQ.getCancel h + case mcncl of + Nothing -> pure () + Just cncl -> do + cancelStatus <- PQ.cancel cncl + case cancelStatus of + Left _ -> PQ.errorMessage h >>= \mmsg -> throwLibPQError h ("Database.PostgreSQL.Simple.Internal.cancelAndClear: " <> fromMaybe "Unknown error" mmsg) + Right () -> do + consumeUntilNotBusy h socket + waitForNullResult h + + waitForNullResult h = do + mres <- PQ.getResult h + case mres of + Nothing -> pure () + Just _ -> waitForNullResult h + + -- | Waits until results are ready to be fetched. + consumeUntilNotBusy h socket = do + -- According to https://www.postgresql.org/docs/9.6/libpq-async.html: + -- 1. The isBusy status only changes by calling PQConsumeInput + -- 2. In case of errors, "PQgetResult should be called until it returns a null pointer, to allow libpq to process the error information completely" + -- 3. Also, "A typical application using these functions will have a main loop that uses select() or poll() ... When the main loop detects input ready, it should call PQconsumeInput to read the input. It can then call PQisBusy, followed by PQgetResult if PQisBusy returns false (0)" + busy <- PQ.isBusy h + when busy $ do + threadWaitRead socket + someError <- not <$> PQ.consumeInput h + when someError $ PQ.errorMessage h >>= \mmsg -> throwLibPQError h ("Database.PostgreSQL.Simple.Internal.consumeUntilNotBusy: " <> fromMaybe "Unknown error" mmsg) + consumeUntilNotBusy h socket getResult h mres = do - isBusy <- PQ.isBusy h - if isBusy - then awaitResult h mres - else do - mres' <- PQ.getResult h - case mres' of - Nothing -> case mres of - Nothing -> throwLibPQError h "PQgetResult returned no results" - Just res -> return res - Just res -> do - status <- PQ.resultStatus res - case status of - -- FIXME: handle PQ.CopyBoth and PQ.SingleTuple - PQ.EmptyQuery -> getResult h mres' - PQ.CommandOk -> getResult h mres' - PQ.TuplesOk -> getResult h mres' - PQ.CopyOut -> return res - PQ.CopyIn -> return res - PQ.BadResponse -> getResult h mres' - PQ.NonfatalError -> getResult h mres' - PQ.FatalError -> getResult h mres' + mres' <- PQ.getResult h + case mres' of + Nothing -> case mres of + Nothing -> throwLibPQError h "PQgetResult returned no results" + Just res -> return res + Just res -> do + status <- PQ.resultStatus res + case status of + -- FIXME: handle PQ.CopyBoth and PQ.SingleTuple + PQ.EmptyQuery -> getResult h mres' + PQ.CommandOk -> getResult h mres' + PQ.TuplesOk -> getResult h mres' + PQ.CopyOut -> return res + PQ.CopyIn -> return res + PQ.BadResponse -> getResult h mres' + PQ.NonfatalError -> getResult h mres' + PQ.FatalError -> getResult h mres' #endif -- | A version of 'execute' that does not perform query substitution. diff --git a/test/Main.hs b/test/Main.hs index ac59f99..0810188 100644 --- a/test/Main.hs +++ b/test/Main.hs @@ -84,6 +84,7 @@ tests env = testGroup "tests" , testCase "2-ary generic" . testGeneric2 , testCase "3-ary generic" . testGeneric3 , testCase "Timeout" . testTimeout + , testCase "Async exceptions" . testAsyncExceptionFailure ] testBytea :: TestEnv -> TestTree @@ -533,6 +534,18 @@ testDouble TestEnv{..} = do [Only (x :: Double)] <- query_ conn "SELECT '-Infinity'::float8" x @?= (-1 / 0) +-- | Ensures that asynchronous excepions thrown while queries are executing +-- are handled properly. +testAsyncExceptionFailure :: TestEnv -> Assertion +testAsyncExceptionFailure TestEnv{..} = withConn $ \c -> do + -- We need to give it enough time to start executing the query + -- before timing out. One second should be more than enough + execute_ c "SET my.setting TO '42'" + tmt <- timeout (1000 * 1000) (execute_ c "SELECT pg_sleep(60)") + tmt @?= Nothing + -- Any other query should work now without errors. + number42 <- query_ c "SELECT current_setting('my.setting')" + number42 @?= [ Only ("42" :: String) ] testGeneric1 :: TestEnv -> Assertion testGeneric1 TestEnv{..} = do From 53700d709d8e1733e771f4cd236a9d76ebf3ba63 Mon Sep 17 00:00:00 2001 From: Marcelo Zabani Date: Sun, 21 Mar 2021 18:01:36 -0300 Subject: [PATCH 02/13] onException instead of catch --- src/Database/PostgreSQL/Simple/Internal.hs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/Database/PostgreSQL/Simple/Internal.hs b/src/Database/PostgreSQL/Simple/Internal.hs index f6655c5..ca5fc5b 100644 --- a/src/Database/PostgreSQL/Simple/Internal.hs +++ b/src/Database/PostgreSQL/Simple/Internal.hs @@ -340,9 +340,7 @@ exec conn sql = -- they don't do any of the internal libpq state "cleaning" -- that is necessary. (consumeUntilNotBusy h socket >> getResult h Nothing) - `catch` \(e :: SomeException) -> do - cancelAndClear h socket - throw e + `onException` cancelAndClear h socket else throwLibPQError h "PQsendQuery failed" where cancelAndClear h socket = do @@ -365,7 +363,7 @@ exec conn sql = -- | Waits until results are ready to be fetched. consumeUntilNotBusy h socket = do - -- According to https://www.postgresql.org/docs/9.6/libpq-async.html: + -- According to https://www.postgresql.org/docs/current/libpq-async.html : -- 1. The isBusy status only changes by calling PQConsumeInput -- 2. In case of errors, "PQgetResult should be called until it returns a null pointer, to allow libpq to process the error information completely" -- 3. Also, "A typical application using these functions will have a main loop that uses select() or poll() ... When the main loop detects input ready, it should call PQconsumeInput to read the input. It can then call PQisBusy, followed by PQgetResult if PQisBusy returns false (0)" From 1e8363721ba92c8969bb8f7b0d84019a77b679fd Mon Sep 17 00:00:00 2001 From: Marcelo Zabani Date: Thu, 1 Apr 2021 18:38:36 -0300 Subject: [PATCH 03/13] Adding test for synchronous exceptions --- postgresql-simple.cabal | 1 + test/Main.hs | 28 +++++++++++++++++++++++++++- 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/postgresql-simple.cabal b/postgresql-simple.cabal index 5b22862..829f983 100644 --- a/postgresql-simple.cabal +++ b/postgresql-simple.cabal @@ -166,6 +166,7 @@ test-suite test build-depends: aeson + , async , base , base16-bytestring , bytestring diff --git a/test/Main.hs b/test/Main.hs index 0810188..68312dc 100644 --- a/test/Main.hs +++ b/test/Main.hs @@ -7,6 +7,7 @@ {-# LANGUAGE DerivingStrategies #-} {-# LANGUAGE DerivingVia #-} {-# LANGUAGE DeriveAnyClass #-} +{-# LANGUAGE TypeApplications #-} #endif module Main (main) where @@ -21,6 +22,8 @@ import Database.PostgreSQL.Simple.Types(Query(..),Values(..), PGArray(..)) import qualified Database.PostgreSQL.Simple.Transaction as ST import Control.Applicative +import Control.Concurrent (threadDelay) +import Control.Concurrent.Async (withAsync, wait) import Control.Exception as E import Control.Monad import Data.Char @@ -85,6 +88,7 @@ tests env = testGroup "tests" , testCase "3-ary generic" . testGeneric3 , testCase "Timeout" . testTimeout , testCase "Async exceptions" . testAsyncExceptionFailure + , testCase "Sync exceptions" . testSyncExceptionFailure ] testBytea :: TestEnv -> TestTree @@ -534,7 +538,7 @@ testDouble TestEnv{..} = do [Only (x :: Double)] <- query_ conn "SELECT '-Infinity'::float8" x @?= (-1 / 0) --- | Ensures that asynchronous excepions thrown while queries are executing +-- | Ensures that asynchronous exceptions thrown while queries are executing -- are handled properly. testAsyncExceptionFailure :: TestEnv -> Assertion testAsyncExceptionFailure TestEnv{..} = withConn $ \c -> do @@ -547,6 +551,28 @@ testAsyncExceptionFailure TestEnv{..} = withConn $ \c -> do number42 <- query_ c "SELECT current_setting('my.setting')" number42 @?= [ Only ("42" :: String) ] +-- | Ensures that synchronous exceptions thrown while queries are executing +-- are handled properly. +testSyncExceptionFailure :: TestEnv -> Assertion +testSyncExceptionFailure TestEnv{..} = do + withConn $ \c1 -> withConn $ \c2 -> do + [ Only (c1Pid :: Int) ] <- query_ c1 "SELECT pg_backend_pid()" + execute_ c1 "SET my.setting TO '42'" + withAsync (execute_ c1 "SELECT pg_sleep(60)") $ \pgSleep -> do + -- We need to give it enough time to start executing the query + -- before canceling it. One second should be more than enough + threadDelay (1000 * 1000) + cancelResult <- query c2 "SELECT pg_cancel_backend(?)" (Only c1Pid) + cancelResult @?= [ Only True ] + killedQuery <- try @SqlError $ wait pgSleep + assertBool "Query was canceled" $ case killedQuery of + Right _ -> False + Left ex -> sqlState ex == "57014" + + -- Any other query should work now without errors. + number42 <- query_ c1 "SELECT current_setting('my.setting')" + number42 @?= [ Only ("42" :: String) ] + testGeneric1 :: TestEnv -> Assertion testGeneric1 TestEnv{..} = do roundTrip conn (Gen1 123) From 377e5df9c2d6b7ff5dfb4a8c73d25d4b73d17e20 Mon Sep 17 00:00:00 2001 From: Marcelo Zabani Date: Thu, 1 Apr 2021 18:43:34 -0300 Subject: [PATCH 04/13] Running exception handler in uninterruptible mask --- src/Database/PostgreSQL/Simple/Internal.hs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/Database/PostgreSQL/Simple/Internal.hs b/src/Database/PostgreSQL/Simple/Internal.hs index ca5fc5b..4cada0a 100644 --- a/src/Database/PostgreSQL/Simple/Internal.hs +++ b/src/Database/PostgreSQL/Simple/Internal.hs @@ -339,8 +339,9 @@ exec conn sql = -- they are not from libpq, or that if they come from libpq, -- they don't do any of the internal libpq state "cleaning" -- that is necessary. - (consumeUntilNotBusy h socket >> getResult h Nothing) - `onException` cancelAndClear h socket + uninterruptibleMask $ \restore -> + restore (consumeUntilNotBusy h socket >> getResult h Nothing) + `onException` cancelAndClear h socket else throwLibPQError h "PQsendQuery failed" where cancelAndClear h socket = do From f224a71756e6217a0b47d2238314de26d7514287 Mon Sep 17 00:00:00 2001 From: Marcelo Zabani Date: Thu, 1 Apr 2021 18:58:55 -0300 Subject: [PATCH 05/13] Not using TypeApplications --- test/Main.hs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/test/Main.hs b/test/Main.hs index 68312dc..abab609 100644 --- a/test/Main.hs +++ b/test/Main.hs @@ -7,7 +7,6 @@ {-# LANGUAGE DerivingStrategies #-} {-# LANGUAGE DerivingVia #-} {-# LANGUAGE DeriveAnyClass #-} -{-# LANGUAGE TypeApplications #-} #endif module Main (main) where @@ -564,10 +563,10 @@ testSyncExceptionFailure TestEnv{..} = do threadDelay (1000 * 1000) cancelResult <- query c2 "SELECT pg_cancel_backend(?)" (Only c1Pid) cancelResult @?= [ Only True ] - killedQuery <- try @SqlError $ wait pgSleep + killedQuery <- try $ wait pgSleep assertBool "Query was canceled" $ case killedQuery of Right _ -> False - Left ex -> sqlState ex == "57014" + Left (ex :: SqlError) -> sqlState ex == "57014" -- Any other query should work now without errors. number42 <- query_ c1 "SELECT current_setting('my.setting')" From 936991cded9131940a1df7538fa30624f0e8580d Mon Sep 17 00:00:00 2001 From: Marcelo Zabani Date: Sat, 3 Apr 2021 09:47:29 -0300 Subject: [PATCH 06/13] More Exception related tests --- test/Main.hs | 101 +++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 82 insertions(+), 19 deletions(-) diff --git a/test/Main.hs b/test/Main.hs index abab609..27c497c 100644 --- a/test/Main.hs +++ b/test/Main.hs @@ -27,7 +27,7 @@ import Control.Exception as E import Control.Monad import Data.Char import Data.Foldable (toList) -import Data.List (concat, sort) +import Data.List (concat, sort, isInfixOf) import Data.IORef import Data.Monoid ((<>)) import Data.String (fromString) @@ -50,6 +50,7 @@ import System.FilePath import System.Timeout(timeout) import Data.Time.Compat (getCurrentTime, diffUTCTime) import System.Environment (getEnvironment) +import qualified System.IO as IO import Test.Tasty import Test.Tasty.Golden @@ -86,8 +87,10 @@ tests env = testGroup "tests" , testCase "2-ary generic" . testGeneric2 , testCase "3-ary generic" . testGeneric3 , testCase "Timeout" . testTimeout + , testCase "Expected user exceptions" . testExpectedExceptions , testCase "Async exceptions" . testAsyncExceptionFailure - , testCase "Sync exceptions" . testSyncExceptionFailure + , testCase "Query canceled" . testCanceledQueryExceptions + , testCase "Connection terminated" . testConnectionTerminated ] testBytea :: TestEnv -> TestTree @@ -537,6 +540,21 @@ testDouble TestEnv{..} = do [Only (x :: Double)] <- query_ conn "SELECT '-Infinity'::float8" x @?= (-1 / 0) +-- | Specifies exceptions thrown by postgresql-simple for certain user errors. +testExpectedExceptions :: TestEnv -> Assertion +testExpectedExceptions TestEnv{..} = do + withConn $ \c -> do + execute_ c "SELECT 1,2" `shouldThrow` (\(e :: QueryError) -> "2-column result" `isInfixOf` show e) + execute_ c "SELECT 1/0" `shouldThrow` (\(e :: SqlError) -> sqlState e == "22012") + (query_ c "SELECT 1, 2, 3" :: IO [(String, Int)]) `shouldThrow` (\(e :: ResultError) -> errSQLType e == "int4" && errHaskellType e == "Text") + +shouldThrow :: forall e a. Exception e => IO a -> (e -> Bool) -> IO () +shouldThrow f pred = do + ea <- try f + assertBool "Exception is as expected" $ case ea of + Right _ -> False + Left (ex :: e) -> pred ex + -- | Ensures that asynchronous exceptions thrown while queries are executing -- are handled properly. testAsyncExceptionFailure :: TestEnv -> Assertion @@ -544,33 +562,76 @@ testAsyncExceptionFailure TestEnv{..} = withConn $ \c -> do -- We need to give it enough time to start executing the query -- before timing out. One second should be more than enough execute_ c "SET my.setting TO '42'" - tmt <- timeout (1000 * 1000) (execute_ c "SELECT pg_sleep(60)") - tmt @?= Nothing - -- Any other query should work now without errors. - number42 <- query_ c "SELECT current_setting('my.setting')" - number42 @?= [ Only ("42" :: String) ] + testAsyncException c (1000 * 1000) (execute_ c "SELECT pg_sleep(60)") + testAsyncException c (1000 * 1000) $ + bracket_ (execute_ c "CREATE TABLE IF NOT EXISTS copy_cancel (v INT)") (execute_ c "DROP TABLE IF EXISTS copy_cancel") $ + bracket_ (copy_ c "COPY copy_cancel FROM STDIN (FORMAT CSV)") (putCopyEnd c) $ do + putCopyData c "1\n" + threadDelay (1000 * 1000 * 60) --- | Ensures that synchronous exceptions thrown while queries are executing --- are handled properly. -testSyncExceptionFailure :: TestEnv -> Assertion -testSyncExceptionFailure TestEnv{..} = do + where + testAsyncException c timeLimit f = do + tmt <- timeout timeLimit f + tmt @?= Nothing + -- Any other query should work now without errors. + number42 <- query_ c "SELECT current_setting('my.setting')" + number42 @?= [ Only ("42" :: String) ] + +-- | Ensures that canceled queries don't invalidate the Connection and specifies how +-- they can be detected. +testCanceledQueryExceptions :: TestEnv -> Assertion +testCanceledQueryExceptions TestEnv{..} = do withConn $ \c1 -> withConn $ \c2 -> do [ Only (c1Pid :: Int) ] <- query_ c1 "SELECT pg_backend_pid()" execute_ c1 "SET my.setting TO '42'" - withAsync (execute_ c1 "SELECT pg_sleep(60)") $ \pgSleep -> do + + testCancelation c1 c2 c1Pid execPgSleep $ \(ex :: SqlError) -> sqlState ex == "57014" + + -- What should we expect when COPY is canceled and putCopyEnd runs? The same SqlError as above, perhaps? Right now, + -- detecting if a query was canceled involves detecting two distinct types of exception. + testCancelation c1 c2 c1Pid execCopy $ \(ex :: IOException) -> "Database.PostgreSQL.Simple.Copy.putCopyEnd: failed to parse command status" `isInfixOf` show ex + && "ERROR: canceling statement due to user request" `isInfixOf` show ex + + -- Any other query should work now without errors. + number42 <- query_ c1 "SELECT current_setting('my.setting')" + number42 @?= [ Only ("42" :: String) ] + + where + execPgSleep c = execute_ c "SELECT pg_sleep(60)" + execCopy c = + bracket_ (execute_ c "CREATE TABLE IF NOT EXISTS copy_cancel (v INT)") (execute_ c "DROP TABLE IF EXISTS copy_cancel") $ + bracket_ (copy_ c "COPY copy_cancel FROM STDIN (FORMAT CSV)") (putCopyEnd c) $ do + putCopyData c "1\n" + threadDelay (1000 * 1000 * 2) + -- putCopyEnd will run after pg_cancel_backend due to threadDelays + testCancelation c1 c2 cPid f exPred = withAsync (f c1) $ \longRunningAction -> do -- We need to give it enough time to start executing the query -- before canceling it. One second should be more than enough threadDelay (1000 * 1000) - cancelResult <- query c2 "SELECT pg_cancel_backend(?)" (Only c1Pid) + cancelResult <- query c2 "SELECT pg_cancel_backend(?)" (Only cPid) + cancelResult @?= [ Only True ] + wait longRunningAction `shouldThrow` exPred + -- Connection is still usable after query canceled + [ Only (cPidAgain :: Int) ] <- query_ c1 "SELECT pg_backend_pid()" + cPid @?= cPidAgain + +-- | Ensures that a specific type of exception is thrown when +-- the connection is terminated abruptly. +testConnectionTerminated :: TestEnv -> Assertion +testConnectionTerminated TestEnv{..} = do + withConn $ \c1 -> withConn $ \c2 -> do + [ Only (c1Pid :: Int) ] <- query_ c1 "SELECT pg_backend_pid()" + withAsync (execute_ c1 "SELECT pg_sleep(60)") $ \pgSleep -> do + -- We need to give it enough time to start executing the query + -- before terminating it. One second should be more than enough + threadDelay (1000 * 1000) + cancelResult <- query c2 "SELECT pg_terminate_backend(?)" (Only c1Pid) cancelResult @?= [ Only True ] killedQuery <- try $ wait pgSleep - assertBool "Query was canceled" $ case killedQuery of + assertBool "Connection was terminated" $ case killedQuery of Right _ -> False - Left (ex :: SqlError) -> sqlState ex == "57014" - - -- Any other query should work now without errors. - number42 <- query_ c1 "SELECT current_setting('my.setting')" - number42 @?= [ Only ("42" :: String) ] + Left (ex :: SqlError) -> ("server closed the connection unexpectedly" `isInfixOf` show (sqlErrorMsg ex)) + && sqlExecStatus ex == FatalError testGeneric1 :: TestEnv -> Assertion testGeneric1 TestEnv{..} = do @@ -658,6 +719,8 @@ withTestEnv connstr cb = main :: IO () main = withConnstring $ \connstring -> do + IO.hSetBuffering IO.stdout IO.NoBuffering + IO.hSetBuffering IO.stderr IO.NoBuffering withTestEnv connstring (defaultMain . tests) withConnstring :: (BS8.ByteString -> IO ()) -> IO () From 0aab4083d4eb4026774d1497c8928a4662e244d6 Mon Sep 17 00:00:00 2001 From: Marcelo Zabani Date: Sat, 3 Apr 2021 10:59:08 -0300 Subject: [PATCH 07/13] More descriptive exception in case of pg_terminate_backend --- src/Database/PostgreSQL/Simple/Copy.hs | 6 ++--- src/Database/PostgreSQL/Simple/Cursor.hs | 2 +- src/Database/PostgreSQL/Simple/Internal.hs | 23 +++++++++++-------- .../Simple/Internal/PQResultUtils.hs | 16 ++++++------- 4 files changed, 25 insertions(+), 22 deletions(-) diff --git a/src/Database/PostgreSQL/Simple/Copy.hs b/src/Database/PostgreSQL/Simple/Copy.hs index 2c36c6c..b874d87 100644 --- a/src/Database/PostgreSQL/Simple/Copy.hs +++ b/src/Database/PostgreSQL/Simple/Copy.hs @@ -92,9 +92,9 @@ doCopy funcName conn template q = do #if MIN_VERSION_postgresql_libpq(0,9,2) PQ.SingleTuple -> errMsg "single-row mode is not supported" #endif - PQ.BadResponse -> throwResultError funcName result status - PQ.NonfatalError -> throwResultError funcName result status - PQ.FatalError -> throwResultError funcName result status + PQ.BadResponse -> throwResultError funcName conn result status + PQ.NonfatalError -> throwResultError funcName conn result status + PQ.FatalError -> throwResultError funcName conn result status data CopyOutResult = CopyOutRow !B.ByteString -- ^ Data representing either exactly diff --git a/src/Database/PostgreSQL/Simple/Cursor.hs b/src/Database/PostgreSQL/Simple/Cursor.hs index 85babd9..1f8ae21 100644 --- a/src/Database/PostgreSQL/Simple/Cursor.hs +++ b/src/Database/PostgreSQL/Simple/Cursor.hs @@ -78,7 +78,7 @@ foldForwardWithParser (Cursor name conn) parser chunkSize f a0 = do Right <$> foldM' inner a0 0 (nrows - 1) else return $ Left a0 - _ -> throwResultError "foldForwardWithParser" result status + _ -> throwResultError "foldForwardWithParser" conn result status -- | Fold over a chunk of rows, calling the supplied fold-like function -- on each row as it is received. In case the cursor is exhausted, diff --git a/src/Database/PostgreSQL/Simple/Internal.hs b/src/Database/PostgreSQL/Simple/Internal.hs index 4cada0a..65a0222 100644 --- a/src/Database/PostgreSQL/Simple/Internal.hs +++ b/src/Database/PostgreSQL/Simple/Internal.hs @@ -336,9 +336,10 @@ exec conn sql = Nothing -> throwIO $! fdError "Database.PostgreSQL.Simple.Internal.exec" Just socket -> -- Here we assume any exceptions are asynchronous, or that - -- they are not from libpq, or that if they come from libpq, - -- they don't do any of the internal libpq state "cleaning" - -- that is necessary. + -- they are not from libpq. If an error happens in libpq + -- (e.g. the query being canceled or session terminated), + -- libpq will not throw, but will instead return a Result + -- indicating an error uninterruptibleMask $ \restore -> restore (consumeUntilNotBusy h socket >> getResult h Nothing) `onException` cancelAndClear h socket @@ -402,7 +403,7 @@ execute_ conn q@(Query stmt) = do finishExecute conn q result finishExecute :: Connection -> Query -> PQ.Result -> IO Int64 -finishExecute _conn q result = do +finishExecute conn q result = do status <- PQ.resultStatus result case status of -- FIXME: handle PQ.CopyBoth and PQ.SingleTuple @@ -425,9 +426,9 @@ finishExecute _conn q result = do throwIO $ QueryError "execute: COPY TO is not supported" q PQ.CopyIn -> throwIO $ QueryError "execute: COPY FROM is not supported" q - PQ.BadResponse -> throwResultError "execute" result status - PQ.NonfatalError -> throwResultError "execute" result status - PQ.FatalError -> throwResultError "execute" result status + PQ.BadResponse -> throwResultError "execute" conn result status + PQ.NonfatalError -> throwResultError "execute" conn result status + PQ.FatalError -> throwResultError "execute" conn result status where mkInteger str = B8.foldl' delta 0 str where @@ -436,9 +437,11 @@ finishExecute _conn q result = do then 10 * acc + fromIntegral (ord c - ord '0') else error ("finishExecute: not an int: " ++ B8.unpack str) -throwResultError :: ByteString -> PQ.Result -> PQ.ExecStatus -> IO a -throwResultError _ result status = do - errormsg <- fromMaybe "" <$> +throwResultError :: ByteString -> Connection -> PQ.Result -> PQ.ExecStatus -> IO a +throwResultError _ conn result status = do + -- Some errors only exist in "errorMessage" + mConnectionError <- withConnection conn PQ.errorMessage + errormsg <- fromMaybe "" . (mConnectionError <|>) <$> PQ.resultErrorField result PQ.DiagMessagePrimary detail <- fromMaybe "" <$> PQ.resultErrorField result PQ.DiagMessageDetail diff --git a/src/Database/PostgreSQL/Simple/Internal/PQResultUtils.hs b/src/Database/PostgreSQL/Simple/Internal/PQResultUtils.hs index d1dac27..c458c5e 100644 --- a/src/Database/PostgreSQL/Simple/Internal/PQResultUtils.hs +++ b/src/Database/PostgreSQL/Simple/Internal/PQResultUtils.hs @@ -37,14 +37,14 @@ import Control.Monad.Trans.Reader import Control.Monad.Trans.State.Strict finishQueryWith :: RowParser r -> Connection -> Query -> PQ.Result -> IO [r] -finishQueryWith parser conn q result = finishQueryWith' q result $ do +finishQueryWith parser conn q result = finishQueryWith' conn q result $ do nrows <- PQ.ntuples result ncols <- PQ.nfields result forM' 0 (nrows-1) $ \row -> getRowWith parser row ncols conn result finishQueryWithV :: RowParser r -> Connection -> Query -> PQ.Result -> IO (V.Vector r) -finishQueryWithV parser conn q result = finishQueryWith' q result $ do +finishQueryWithV parser conn q result = finishQueryWith' conn q result $ do nrows <- PQ.ntuples result let PQ.Row nrows' = nrows ncols <- PQ.nfields result @@ -56,7 +56,7 @@ finishQueryWithV parser conn q result = finishQueryWith' q result $ do V.unsafeFreeze mv finishQueryWithVU :: VU.Unbox r => RowParser r -> Connection -> Query -> PQ.Result -> IO (VU.Vector r) -finishQueryWithVU parser conn q result = finishQueryWith' q result $ do +finishQueryWithVU parser conn q result = finishQueryWith' conn q result $ do nrows <- PQ.ntuples result let PQ.Row nrows' = nrows ncols <- PQ.nfields result @@ -67,8 +67,8 @@ finishQueryWithVU parser conn q result = finishQueryWith' q result $ do MVU.unsafeWrite mv (fromIntegral row') value VU.unsafeFreeze mv -finishQueryWith' :: Query -> PQ.Result -> IO a -> IO a -finishQueryWith' q result k = do +finishQueryWith' :: Connection -> Query -> PQ.Result -> IO a -> IO a +finishQueryWith' conn q result k = do status <- PQ.resultStatus result case status of PQ.TuplesOk -> k @@ -82,9 +82,9 @@ finishQueryWith' q result k = do #if MIN_VERSION_postgresql_libpq(0,9,2) PQ.SingleTuple -> queryErr "query: single-row mode is not supported" #endif - PQ.BadResponse -> throwResultError "query" result status - PQ.NonfatalError -> throwResultError "query" result status - PQ.FatalError -> throwResultError "query" result status + PQ.BadResponse -> throwResultError "query" conn result status + PQ.NonfatalError -> throwResultError "query" conn result status + PQ.FatalError -> throwResultError "query" conn result status where queryErr msg = throwIO $ QueryError msg q From 5be9661b00f7e809427e3c688f6a9b002a53f4c7 Mon Sep 17 00:00:00 2001 From: Marcelo Zabani Date: Mon, 5 Apr 2021 12:14:50 -0300 Subject: [PATCH 08/13] Masking a larger section of code --- src/Database/PostgreSQL/Simple/Internal.hs | 38 +++++++++++++--------- 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/src/Database/PostgreSQL/Simple/Internal.hs b/src/Database/PostgreSQL/Simple/Internal.hs index 65a0222..91d3d3c 100644 --- a/src/Database/PostgreSQL/Simple/Internal.hs +++ b/src/Database/PostgreSQL/Simple/Internal.hs @@ -327,24 +327,30 @@ exec conn sql = Just res -> return res #else exec conn sql = - withConnection conn $ \h -> do - success <- PQ.sendQuery h sql - if success - then do - mfd <- PQ.socket h - case mfd of - Nothing -> throwIO $! fdError "Database.PostgreSQL.Simple.Internal.exec" - Just socket -> - -- Here we assume any exceptions are asynchronous, or that - -- they are not from libpq. If an error happens in libpq - -- (e.g. the query being canceled or session terminated), - -- libpq will not throw, but will instead return a Result - -- indicating an error - uninterruptibleMask $ \restore -> - restore (consumeUntilNotBusy h socket >> getResult h Nothing) + withConnection conn $ \h -> withSocket h $ \socket -> uninterruptibleMask $ \restore -> + -- If an error happens in libpq + -- (e.g. the query being canceled or session terminated), + -- libpq will not throw, but will instead return a Result + -- indicating an error. But if an asynchronous exception + -- is thrown, it might be necessary to reset the libpq's + -- connection state so the connection can still be used. + restore (sendQueryAndWaitForResults h socket) `onException` cancelAndClear h socket - else throwLibPQError h "PQsendQuery failed" + where + withSocket h f = do + mfd <- PQ.socket h + case mfd of + Nothing -> throwIO $! fdError "Database.PostgreSQL.Simple.Internal.exec" + Just socket -> f socket + + sendQueryAndWaitForResults h socket = do + success <- PQ.sendQuery h sql + if success then do + consumeUntilNotBusy h socket + getResult h Nothing + else throwLibPQError h "PQsendQuery failed" + cancelAndClear h socket = do mcncl <- PQ.getCancel h case mcncl of From d0f6c6efbb2d13bba58185b4c889562f2d29b525 Mon Sep 17 00:00:00 2001 From: Marcelo Zabani Date: Fri, 9 Apr 2021 19:11:19 -0300 Subject: [PATCH 09/13] Using interruptible masking --- src/Database/PostgreSQL/Simple/Internal.hs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Database/PostgreSQL/Simple/Internal.hs b/src/Database/PostgreSQL/Simple/Internal.hs index 91d3d3c..b93ec1e 100644 --- a/src/Database/PostgreSQL/Simple/Internal.hs +++ b/src/Database/PostgreSQL/Simple/Internal.hs @@ -327,14 +327,14 @@ exec conn sql = Just res -> return res #else exec conn sql = - withConnection conn $ \h -> withSocket h $ \socket -> uninterruptibleMask $ \restore -> + withConnection conn $ \h -> withSocket h $ \socket-> -- If an error happens in libpq -- (e.g. the query being canceled or session terminated), -- libpq will not throw, but will instead return a Result -- indicating an error. But if an asynchronous exception - -- is thrown, it might be necessary to reset the libpq's + -- is thrown, it might be necessary to reset libpq's -- connection state so the connection can still be used. - restore (sendQueryAndWaitForResults h socket) + sendQueryAndWaitForResults h socket `onException` cancelAndClear h socket where From 5bb660b68aff5dfc2b5dd187f2992a7a086cddc4 Mon Sep 17 00:00:00 2001 From: Marcelo Zabani Date: Fri, 9 Apr 2021 19:14:04 -0300 Subject: [PATCH 10/13] Shorter pg_sleep in tests --- test/Main.hs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/Main.hs b/test/Main.hs index 27c497c..cc7e8b1 100644 --- a/test/Main.hs +++ b/test/Main.hs @@ -562,7 +562,7 @@ testAsyncExceptionFailure TestEnv{..} = withConn $ \c -> do -- We need to give it enough time to start executing the query -- before timing out. One second should be more than enough execute_ c "SET my.setting TO '42'" - testAsyncException c (1000 * 1000) (execute_ c "SELECT pg_sleep(60)") + testAsyncException c (1000 * 1000) (execute_ c "SELECT pg_sleep(5)") testAsyncException c (1000 * 1000) $ bracket_ (execute_ c "CREATE TABLE IF NOT EXISTS copy_cancel (v INT)") (execute_ c "DROP TABLE IF EXISTS copy_cancel") $ bracket_ (copy_ c "COPY copy_cancel FROM STDIN (FORMAT CSV)") (putCopyEnd c) $ do @@ -597,7 +597,7 @@ testCanceledQueryExceptions TestEnv{..} = do number42 @?= [ Only ("42" :: String) ] where - execPgSleep c = execute_ c "SELECT pg_sleep(60)" + execPgSleep c = execute_ c "SELECT pg_sleep(5)" execCopy c = bracket_ (execute_ c "CREATE TABLE IF NOT EXISTS copy_cancel (v INT)") (execute_ c "DROP TABLE IF EXISTS copy_cancel") $ bracket_ (copy_ c "COPY copy_cancel FROM STDIN (FORMAT CSV)") (putCopyEnd c) $ do @@ -621,7 +621,7 @@ testConnectionTerminated :: TestEnv -> Assertion testConnectionTerminated TestEnv{..} = do withConn $ \c1 -> withConn $ \c2 -> do [ Only (c1Pid :: Int) ] <- query_ c1 "SELECT pg_backend_pid()" - withAsync (execute_ c1 "SELECT pg_sleep(60)") $ \pgSleep -> do + withAsync (execute_ c1 "SELECT pg_sleep(5)") $ \pgSleep -> do -- We need to give it enough time to start executing the query -- before terminating it. One second should be more than enough threadDelay (1000 * 1000) From 1ba62a522b16bad03949a01c743015b6c67406d0 Mon Sep 17 00:00:00 2001 From: Marcelo Zabani Date: Sun, 23 Oct 2022 08:46:14 -0300 Subject: [PATCH 11/13] Changing approach to clearing state in next query This allows us to avoid running queries inside exception handlers by running query canceling code before the next query is issued. --- src/Database/PostgreSQL/Simple/Internal.hs | 21 ++++++++-- test/Main.hs | 47 +++++++++++++++++++++- 2 files changed, 63 insertions(+), 5 deletions(-) diff --git a/src/Database/PostgreSQL/Simple/Internal.hs b/src/Database/PostgreSQL/Simple/Internal.hs index b93ec1e..0ed0ba8 100644 --- a/src/Database/PostgreSQL/Simple/Internal.hs +++ b/src/Database/PostgreSQL/Simple/Internal.hs @@ -55,7 +55,7 @@ import Control.Monad.Trans.Class import GHC.Generics import GHC.IO.Exception #if !defined(mingw32_HOST_OS) -import Control.Concurrent(threadWaitRead, threadWaitWrite) +import Control.Concurrent(threadWaitRead, threadWaitWrite, threadDelay) #endif -- | A Field represents metadata about a particular field @@ -78,6 +78,10 @@ data Connection = Connection { connectionHandle :: {-# UNPACK #-} !(MVar PQ.Connection) , connectionObjects :: {-# UNPACK #-} !(MVar TypeInfoCache) , connectionTempNameCounter :: {-# UNPACK #-} !(IORef Int64) + , connectionMayHaveOrphanedStatement :: {-# UNPACK #-} !(IORef Bool) + -- ^ True if there could be a statement running in postgres in this connection, but + -- postgresql-simple is not waiting for results from it. This can happen when + -- postgresql-simple is interrupted by asynchronous exceptions. } deriving (Typeable) instance Eq Connection where @@ -235,6 +239,7 @@ connectPostgreSQL connstr = do connectionHandle <- newMVar conn connectionObjects <- newMVar (IntMap.empty) connectionTempNameCounter <- newIORef 0 + connectionMayHaveOrphanedStatement <- newIORef False let wconn = Connection{..} version <- PQ.serverVersion conn let settings @@ -327,15 +332,22 @@ exec conn sql = Just res -> return res #else exec conn sql = - withConnection conn $ \h -> withSocket h $ \socket-> + withConnection conn $ \h -> withSocket h $ \socket-> uninterruptibleMask $ \restore -> -- If an error happens in libpq -- (e.g. the query being canceled or session terminated), -- libpq will not throw, but will instead return a Result -- indicating an error. But if an asynchronous exception -- is thrown, it might be necessary to reset libpq's -- connection state so the connection can still be used. - sendQueryAndWaitForResults h socket - `onException` cancelAndClear h socket + -- We do that in a future statement because it is not safe + -- to run more queries inside exception handlers. + restore (do + needsToCancel <- readIORef (connectionMayHaveOrphanedStatement conn) + when needsToCancel $ do + cancelAndClear h socket + writeIORef (connectionMayHaveOrphanedStatement conn) False + sendQueryAndWaitForResults h socket) + `onException` writeIORef (connectionMayHaveOrphanedStatement conn) True where withSocket h f = do @@ -485,6 +497,7 @@ newNullConnection = do connectionHandle <- newMVar =<< PQ.newNullConnection connectionObjects <- newMVar IntMap.empty connectionTempNameCounter <- newIORef 0 + connectionMayHaveOrphanedStatement <- newIORef False return Connection{..} data Row = Row { diff --git a/test/Main.hs b/test/Main.hs index cc7e8b1..9b604bd 100644 --- a/test/Main.hs +++ b/test/Main.hs @@ -16,7 +16,7 @@ import Database.PostgreSQL.Simple.ToField (ToField) import Database.PostgreSQL.Simple.FromField (FromField) import Database.PostgreSQL.Simple.HStore import Database.PostgreSQL.Simple.Newtypes -import Database.PostgreSQL.Simple.Internal (breakOnSingleQuestionMark) +import Database.PostgreSQL.Simple.Internal (breakOnSingleQuestionMark, connectionMayHaveOrphanedStatement) import Database.PostgreSQL.Simple.Types(Query(..),Values(..), PGArray(..)) import qualified Database.PostgreSQL.Simple.Transaction as ST @@ -88,6 +88,7 @@ tests env = testGroup "tests" , testCase "3-ary generic" . testGeneric3 , testCase "Timeout" . testTimeout , testCase "Expected user exceptions" . testExpectedExceptions + , testCase "Orphaned running query state mgmt" . testOrphanedRunningQueryStateMgmt , testCase "Async exceptions" . testAsyncExceptionFailure , testCase "Query canceled" . testCanceledQueryExceptions , testCase "Connection terminated" . testConnectionTerminated @@ -555,6 +556,50 @@ shouldThrow f pred = do Right _ -> False Left (ex :: e) -> pred ex +-- | Ensures that the state associated with there being an orphaned +-- running statement in a connection is updated accordingly. +testOrphanedRunningQueryStateMgmt :: TestEnv -> Assertion +testOrphanedRunningQueryStateMgmt TestEnv{..} = withConn $ \c -> do + -- 1. Connections are created with no orphaned running queries, naturally. + runState c `shouldReturn` False + + -- 2. Interrupting a query that is still running should set the state + -- to True. + -- We need to give it enough time to start executing the query + -- before timing out. One second should be more than enough + void $ timeout (1000 * 1000) (execute_ c "SELECT pg_sleep(100)") + runState c `shouldReturn` True + + -- 3. Running a new query should clear the state again + [ Only (num13 :: Int) ] <- query c "SELECT 13" () + num13 @?= 13 + runState c `shouldReturn` False + + -- 4. Interrupting a query but letting it run until completion shouldn't + -- matter (postgresql-simple has no way of knowing that), but no errors + -- should come out of it + void $ timeout (1000 * 1000) (execute_ c "SELECT pg_sleep(2)") + runState c `shouldReturn` True + + -- One second has passed, wait 2 more to ensure the query finished. + -- The state is still True. + threadDelay (1000 * 1000 * 2) + runState c `shouldReturn` True + + -- 5. Check that nothing wrong happens if we try to cancel a query + -- that is no longer running (this happens automatically by running another query) + [ Only (num17 :: Int) ] <- query c "SELECT 17" () + num17 @?= 17 + runState c `shouldReturn` False + + where + runState = readIORef . connectionMayHaveOrphanedStatement + shouldReturn :: (Eq a, Show a, HasCallStack) => IO a -> a -> IO () + shouldReturn f expected = do + actual <- f + actual @?= expected + + -- | Ensures that asynchronous exceptions thrown while queries are executing -- are handled properly. testAsyncExceptionFailure :: TestEnv -> Assertion From 726cc01a4749fc7ceb943163061ae5e13e004c60 Mon Sep 17 00:00:00 2001 From: Marcelo Zabani Date: Sun, 23 Oct 2022 09:19:10 -0300 Subject: [PATCH 12/13] Finer grained exception handling block and better documentation --- src/Database/PostgreSQL/Simple/Internal.hs | 38 ++++++++++++---------- test/Main.hs | 4 +++ 2 files changed, 25 insertions(+), 17 deletions(-) diff --git a/src/Database/PostgreSQL/Simple/Internal.hs b/src/Database/PostgreSQL/Simple/Internal.hs index 0ed0ba8..4890f91 100644 --- a/src/Database/PostgreSQL/Simple/Internal.hs +++ b/src/Database/PostgreSQL/Simple/Internal.hs @@ -332,21 +332,23 @@ exec conn sql = Just res -> return res #else exec conn sql = - withConnection conn $ \h -> withSocket h $ \socket-> uninterruptibleMask $ \restore -> - -- If an error happens in libpq - -- (e.g. the query being canceled or session terminated), - -- libpq will not throw, but will instead return a Result - -- indicating an error. But if an asynchronous exception - -- is thrown, it might be necessary to reset libpq's - -- connection state so the connection can still be used. - -- We do that in a future statement because it is not safe - -- to run more queries inside exception handlers. - restore (do - needsToCancel <- readIORef (connectionMayHaveOrphanedStatement conn) - when needsToCancel $ do - cancelAndClear h socket - writeIORef (connectionMayHaveOrphanedStatement conn) False - sendQueryAndWaitForResults h socket) + withConnection conn $ \h -> withSocket h $ \socket-> uninterruptibleMask $ \restore -> do + -- 1. If postgresql-simple was interrupted when waiting for query results + -- before, cancel that query (it may even have completed by now, but that's fine) + -- before issuing a new one. + restore $ do + needsToCancel <- readIORef (connectionMayHaveOrphanedStatement conn) + when needsToCancel $ do + cancelRunningQuery h socket + writeIORef (connectionMayHaveOrphanedStatement conn) False + + -- 2. Ideally, the code that issues the query and waits for results + -- should not throw exceptions. That way we know an exception means + -- postgresql-simple was interrupted and the query might still be running. + -- Still, even if the code throws exceptions for other reasons, it means + -- we'll try to cancel a running query later once, which is fairly inocuous + -- as long as such exceptions are rare (which they should be). + restore (sendQueryAndWaitForResults h socket) `onException` writeIORef (connectionMayHaveOrphanedStatement conn) True where @@ -363,14 +365,16 @@ exec conn sql = getResult h Nothing else throwLibPQError h "PQsendQuery failed" - cancelAndClear h socket = do + cancelRunningQuery h socket = do mcncl <- PQ.getCancel h case mcncl of Nothing -> pure () Just cncl -> do cancelStatus <- PQ.cancel cncl case cancelStatus of - Left _ -> PQ.errorMessage h >>= \mmsg -> throwLibPQError h ("Database.PostgreSQL.Simple.Internal.cancelAndClear: " <> fromMaybe "Unknown error" mmsg) + Left _ -> PQ.errorMessage h >>= \mmsg -> throwLibPQError h ("Database.PostgreSQL.Simple.Internal.cancelRunningQuery: " <> fromMaybe "Unknown error" mmsg + <> "\nIt looks like postgresql-simple was previously interrupted by an exception while waiting for query results." + <> " Because of that, before issuing a new query, we tried to cancel that previous query that was interrupted, but failed to do so.") Right () -> do consumeUntilNotBusy h socket waitForNullResult h diff --git a/test/Main.hs b/test/Main.hs index 9b604bd..8c06ec1 100644 --- a/test/Main.hs +++ b/test/Main.hs @@ -592,6 +592,10 @@ testOrphanedRunningQueryStateMgmt TestEnv{..} = withConn $ \c -> do num17 @?= 17 runState c `shouldReturn` False + -- 6. Other errors that are not interruptions don't change the connection's state + execute_ c "SELECT 1/0" `shouldThrow` (\(_ :: SqlError) -> True) + runState c `shouldReturn` False + where runState = readIORef . connectionMayHaveOrphanedStatement shouldReturn :: (Eq a, Show a, HasCallStack) => IO a -> a -> IO () From 952a7e9c9c3f588759a8bb1030205a9865c5dbd8 Mon Sep 17 00:00:00 2001 From: Marcelo Zabani Date: Thu, 27 Oct 2022 17:07:52 -0300 Subject: [PATCH 13/13] Preserving internal functions' signatures and tidying up a bit --- src/Database/PostgreSQL/Simple/Copy.hs | 6 +++--- src/Database/PostgreSQL/Simple/Cursor.hs | 2 +- src/Database/PostgreSQL/Simple/Internal.hs | 19 ++++++++----------- .../Simple/Internal/PQResultUtils.hs | 16 ++++++++-------- test/Main.hs | 19 ------------------- 5 files changed, 20 insertions(+), 42 deletions(-) diff --git a/src/Database/PostgreSQL/Simple/Copy.hs b/src/Database/PostgreSQL/Simple/Copy.hs index b874d87..2c36c6c 100644 --- a/src/Database/PostgreSQL/Simple/Copy.hs +++ b/src/Database/PostgreSQL/Simple/Copy.hs @@ -92,9 +92,9 @@ doCopy funcName conn template q = do #if MIN_VERSION_postgresql_libpq(0,9,2) PQ.SingleTuple -> errMsg "single-row mode is not supported" #endif - PQ.BadResponse -> throwResultError funcName conn result status - PQ.NonfatalError -> throwResultError funcName conn result status - PQ.FatalError -> throwResultError funcName conn result status + PQ.BadResponse -> throwResultError funcName result status + PQ.NonfatalError -> throwResultError funcName result status + PQ.FatalError -> throwResultError funcName result status data CopyOutResult = CopyOutRow !B.ByteString -- ^ Data representing either exactly diff --git a/src/Database/PostgreSQL/Simple/Cursor.hs b/src/Database/PostgreSQL/Simple/Cursor.hs index 1f8ae21..85babd9 100644 --- a/src/Database/PostgreSQL/Simple/Cursor.hs +++ b/src/Database/PostgreSQL/Simple/Cursor.hs @@ -78,7 +78,7 @@ foldForwardWithParser (Cursor name conn) parser chunkSize f a0 = do Right <$> foldM' inner a0 0 (nrows - 1) else return $ Left a0 - _ -> throwResultError "foldForwardWithParser" conn result status + _ -> throwResultError "foldForwardWithParser" result status -- | Fold over a chunk of rows, calling the supplied fold-like function -- on each row as it is received. In case the cursor is exhausted, diff --git a/src/Database/PostgreSQL/Simple/Internal.hs b/src/Database/PostgreSQL/Simple/Internal.hs index 4890f91..184184a 100644 --- a/src/Database/PostgreSQL/Simple/Internal.hs +++ b/src/Database/PostgreSQL/Simple/Internal.hs @@ -1,7 +1,6 @@ {-# LANGUAGE CPP, BangPatterns, DoAndIfThenElse, RecordWildCards #-} {-# LANGUAGE DeriveDataTypeable, DeriveGeneric #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-} -{-# LANGUAGE ScopedTypeVariables #-} ------------------------------------------------------------------------------ -- | @@ -55,7 +54,7 @@ import Control.Monad.Trans.Class import GHC.Generics import GHC.IO.Exception #if !defined(mingw32_HOST_OS) -import Control.Concurrent(threadWaitRead, threadWaitWrite, threadDelay) +import Control.Concurrent(threadWaitRead, threadWaitWrite) #endif -- | A Field represents metadata about a particular field @@ -425,7 +424,7 @@ execute_ conn q@(Query stmt) = do finishExecute conn q result finishExecute :: Connection -> Query -> PQ.Result -> IO Int64 -finishExecute conn q result = do +finishExecute _conn q result = do status <- PQ.resultStatus result case status of -- FIXME: handle PQ.CopyBoth and PQ.SingleTuple @@ -448,9 +447,9 @@ finishExecute conn q result = do throwIO $ QueryError "execute: COPY TO is not supported" q PQ.CopyIn -> throwIO $ QueryError "execute: COPY FROM is not supported" q - PQ.BadResponse -> throwResultError "execute" conn result status - PQ.NonfatalError -> throwResultError "execute" conn result status - PQ.FatalError -> throwResultError "execute" conn result status + PQ.BadResponse -> throwResultError "execute" result status + PQ.NonfatalError -> throwResultError "execute" result status + PQ.FatalError -> throwResultError "execute" result status where mkInteger str = B8.foldl' delta 0 str where @@ -459,11 +458,9 @@ finishExecute conn q result = do then 10 * acc + fromIntegral (ord c - ord '0') else error ("finishExecute: not an int: " ++ B8.unpack str) -throwResultError :: ByteString -> Connection -> PQ.Result -> PQ.ExecStatus -> IO a -throwResultError _ conn result status = do - -- Some errors only exist in "errorMessage" - mConnectionError <- withConnection conn PQ.errorMessage - errormsg <- fromMaybe "" . (mConnectionError <|>) <$> +throwResultError :: ByteString -> PQ.Result -> PQ.ExecStatus -> IO a +throwResultError _ result status = do + errormsg <- fromMaybe "" <$> PQ.resultErrorField result PQ.DiagMessagePrimary detail <- fromMaybe "" <$> PQ.resultErrorField result PQ.DiagMessageDetail diff --git a/src/Database/PostgreSQL/Simple/Internal/PQResultUtils.hs b/src/Database/PostgreSQL/Simple/Internal/PQResultUtils.hs index c458c5e..d1dac27 100644 --- a/src/Database/PostgreSQL/Simple/Internal/PQResultUtils.hs +++ b/src/Database/PostgreSQL/Simple/Internal/PQResultUtils.hs @@ -37,14 +37,14 @@ import Control.Monad.Trans.Reader import Control.Monad.Trans.State.Strict finishQueryWith :: RowParser r -> Connection -> Query -> PQ.Result -> IO [r] -finishQueryWith parser conn q result = finishQueryWith' conn q result $ do +finishQueryWith parser conn q result = finishQueryWith' q result $ do nrows <- PQ.ntuples result ncols <- PQ.nfields result forM' 0 (nrows-1) $ \row -> getRowWith parser row ncols conn result finishQueryWithV :: RowParser r -> Connection -> Query -> PQ.Result -> IO (V.Vector r) -finishQueryWithV parser conn q result = finishQueryWith' conn q result $ do +finishQueryWithV parser conn q result = finishQueryWith' q result $ do nrows <- PQ.ntuples result let PQ.Row nrows' = nrows ncols <- PQ.nfields result @@ -56,7 +56,7 @@ finishQueryWithV parser conn q result = finishQueryWith' conn q result $ do V.unsafeFreeze mv finishQueryWithVU :: VU.Unbox r => RowParser r -> Connection -> Query -> PQ.Result -> IO (VU.Vector r) -finishQueryWithVU parser conn q result = finishQueryWith' conn q result $ do +finishQueryWithVU parser conn q result = finishQueryWith' q result $ do nrows <- PQ.ntuples result let PQ.Row nrows' = nrows ncols <- PQ.nfields result @@ -67,8 +67,8 @@ finishQueryWithVU parser conn q result = finishQueryWith' conn q result $ do MVU.unsafeWrite mv (fromIntegral row') value VU.unsafeFreeze mv -finishQueryWith' :: Connection -> Query -> PQ.Result -> IO a -> IO a -finishQueryWith' conn q result k = do +finishQueryWith' :: Query -> PQ.Result -> IO a -> IO a +finishQueryWith' q result k = do status <- PQ.resultStatus result case status of PQ.TuplesOk -> k @@ -82,9 +82,9 @@ finishQueryWith' conn q result k = do #if MIN_VERSION_postgresql_libpq(0,9,2) PQ.SingleTuple -> queryErr "query: single-row mode is not supported" #endif - PQ.BadResponse -> throwResultError "query" conn result status - PQ.NonfatalError -> throwResultError "query" conn result status - PQ.FatalError -> throwResultError "query" conn result status + PQ.BadResponse -> throwResultError "query" result status + PQ.NonfatalError -> throwResultError "query" result status + PQ.FatalError -> throwResultError "query" result status where queryErr msg = throwIO $ QueryError msg q diff --git a/test/Main.hs b/test/Main.hs index 8c06ec1..529a201 100644 --- a/test/Main.hs +++ b/test/Main.hs @@ -91,7 +91,6 @@ tests env = testGroup "tests" , testCase "Orphaned running query state mgmt" . testOrphanedRunningQueryStateMgmt , testCase "Async exceptions" . testAsyncExceptionFailure , testCase "Query canceled" . testCanceledQueryExceptions - , testCase "Connection terminated" . testConnectionTerminated ] testBytea :: TestEnv -> TestTree @@ -664,24 +663,6 @@ testCanceledQueryExceptions TestEnv{..} = do [ Only (cPidAgain :: Int) ] <- query_ c1 "SELECT pg_backend_pid()" cPid @?= cPidAgain --- | Ensures that a specific type of exception is thrown when --- the connection is terminated abruptly. -testConnectionTerminated :: TestEnv -> Assertion -testConnectionTerminated TestEnv{..} = do - withConn $ \c1 -> withConn $ \c2 -> do - [ Only (c1Pid :: Int) ] <- query_ c1 "SELECT pg_backend_pid()" - withAsync (execute_ c1 "SELECT pg_sleep(5)") $ \pgSleep -> do - -- We need to give it enough time to start executing the query - -- before terminating it. One second should be more than enough - threadDelay (1000 * 1000) - cancelResult <- query c2 "SELECT pg_terminate_backend(?)" (Only c1Pid) - cancelResult @?= [ Only True ] - killedQuery <- try $ wait pgSleep - assertBool "Connection was terminated" $ case killedQuery of - Right _ -> False - Left (ex :: SqlError) -> ("server closed the connection unexpectedly" `isInfixOf` show (sqlErrorMsg ex)) - && sqlExecStatus ex == FatalError - testGeneric1 :: TestEnv -> Assertion testGeneric1 TestEnv{..} = do roundTrip conn (Gen1 123)