Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #69 #71

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Open
93 changes: 62 additions & 31 deletions src/Database/PostgreSQL/Simple/Internal.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{-# LANGUAGE CPP, BangPatterns, DoAndIfThenElse, RecordWildCards #-}
{-# LANGUAGE DeriveDataTypeable, DeriveGeneric #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE ScopedTypeVariables #-}

------------------------------------------------------------------------------
-- |
Expand All @@ -24,7 +25,7 @@ module Database.PostgreSQL.Simple.Internal where
import Control.Applicative
import Control.Exception
import Control.Concurrent.MVar
import Control.Monad(MonadPlus(..))
import Control.Monad(MonadPlus(..), when)
import Data.ByteString(ByteString)
import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as B8
Expand Down Expand Up @@ -329,40 +330,70 @@ exec conn sql =
withConnection conn $ \h -> do
success <- PQ.sendQuery h sql
if success
then awaitResult h Nothing
then do
mfd <- PQ.socket h
case mfd of
Nothing -> throwIO $! fdError "Database.PostgreSQL.Simple.Internal.exec"
Just socket ->
-- Here we assume any exceptions are asynchronous, or that
-- they are not from libpq, or that if they come from libpq,
-- they don't do any of the internal libpq state "cleaning"
-- that is necessary.
(consumeUntilNotBusy h socket >> getResult h Nothing)
`catch` \(e :: SomeException) -> do
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

onException

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, thanks! Should be fixed now.

cancelAndClear h socket
throw e
else throwLibPQError h "PQsendQuery failed"
where
awaitResult h mres = do
mfd <- PQ.socket h
case mfd of
Nothing -> throwIO $! fdError "Database.PostgreSQL.Simple.Internal.exec"
Just fd -> do
threadWaitRead fd
_ <- PQ.consumeInput h -- FIXME?
getResult h mres
cancelAndClear h socket = do
mcncl <- PQ.getCancel h
case mcncl of
Nothing -> pure ()
Just cncl -> do
cancelStatus <- PQ.cancel cncl
case cancelStatus of
Left _ -> PQ.errorMessage h >>= \mmsg -> throwLibPQError h ("Database.PostgreSQL.Simple.Internal.cancelAndClear: " <> fromMaybe "Unknown error" mmsg)
Right () -> do
consumeUntilNotBusy h socket
waitForNullResult h

waitForNullResult h = do
mres <- PQ.getResult h
case mres of
Nothing -> pure ()
Just _ -> waitForNullResult h

-- | Waits until results are ready to be fetched.
consumeUntilNotBusy h socket = do
-- According to https://www.postgresql.org/docs/9.6/libpq-async.html:
-- 1. The isBusy status only changes by calling PQConsumeInput
-- 2. In case of errors, "PQgetResult should be called until it returns a null pointer, to allow libpq to process the error information completely"
-- 3. Also, "A typical application using these functions will have a main loop that uses select() or poll() ... When the main loop detects input ready, it should call PQconsumeInput to read the input. It can then call PQisBusy, followed by PQgetResult if PQisBusy returns false (0)"
busy <- PQ.isBusy h
when busy $ do
threadWaitRead socket
someError <- not <$> PQ.consumeInput h
when someError $ PQ.errorMessage h >>= \mmsg -> throwLibPQError h ("Database.PostgreSQL.Simple.Internal.consumeUntilNotBusy: " <> fromMaybe "Unknown error" mmsg)
consumeUntilNotBusy h socket

getResult h mres = do
isBusy <- PQ.isBusy h
if isBusy
then awaitResult h mres
else do
mres' <- PQ.getResult h
case mres' of
Nothing -> case mres of
Nothing -> throwLibPQError h "PQgetResult returned no results"
Just res -> return res
Just res -> do
status <- PQ.resultStatus res
case status of
-- FIXME: handle PQ.CopyBoth and PQ.SingleTuple
PQ.EmptyQuery -> getResult h mres'
PQ.CommandOk -> getResult h mres'
PQ.TuplesOk -> getResult h mres'
PQ.CopyOut -> return res
PQ.CopyIn -> return res
PQ.BadResponse -> getResult h mres'
PQ.NonfatalError -> getResult h mres'
PQ.FatalError -> getResult h mres'
mres' <- PQ.getResult h
case mres' of
Nothing -> case mres of
Nothing -> throwLibPQError h "PQgetResult returned no results"
Just res -> return res
Just res -> do
status <- PQ.resultStatus res
case status of
-- FIXME: handle PQ.CopyBoth and PQ.SingleTuple
PQ.EmptyQuery -> getResult h mres'
PQ.CommandOk -> getResult h mres'
PQ.TuplesOk -> getResult h mres'
PQ.CopyOut -> return res
PQ.CopyIn -> return res
PQ.BadResponse -> getResult h mres'
PQ.NonfatalError -> getResult h mres'
PQ.FatalError -> getResult h mres'
#endif

-- | A version of 'execute' that does not perform query substitution.
Expand Down
13 changes: 13 additions & 0 deletions test/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ tests env = testGroup "tests"
, testCase "2-ary generic" . testGeneric2
, testCase "3-ary generic" . testGeneric3
, testCase "Timeout" . testTimeout
, testCase "Async exceptions" . testAsyncExceptionFailure
]

testBytea :: TestEnv -> TestTree
Expand Down Expand Up @@ -533,6 +534,18 @@ testDouble TestEnv{..} = do
[Only (x :: Double)] <- query_ conn "SELECT '-Infinity'::float8"
x @?= (-1 / 0)

-- | Ensures that asynchronous excepions thrown while queries are executing
-- are handled properly.
testAsyncExceptionFailure :: TestEnv -> Assertion
testAsyncExceptionFailure TestEnv{..} = withConn $ \c -> do
-- We need to give it enough time to start executing the query
-- before timing out. One second should be more than enough
execute_ c "SET my.setting TO '42'"
tmt <- timeout (1000 * 1000) (execute_ c "SELECT pg_sleep(60)")
tmt @?= Nothing
-- Any other query should work now without errors.
number42 <- query_ c "SELECT current_setting('my.setting')"
number42 @?= [ Only ("42" :: String) ]

testGeneric1 :: TestEnv -> Assertion
testGeneric1 TestEnv{..} = do
Expand Down