Skip to content
This repository has been archived by the owner on Sep 24, 2022. It is now read-only.

add postgres notify listener #5

Merged
merged 4 commits into from
Feb 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pg-client.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ library
, Database.PG.Query.Connection
, Database.PG.Query.Transaction
, Database.PG.Query.Pool
, Database.PG.Query.Listen

build-depends: base >= 4.7 && < 5
, attoparsec >= 0.13
Expand All @@ -46,6 +47,7 @@ library
, scientific >= 0.3

, postgresql-libpq
, select

default-language: Haskell2010
ghc-options: -Wall
Expand Down
2 changes: 2 additions & 0 deletions src/Database/PG/Query.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ module Database.PG.Query
, module Database.PG.Query.Pool
, module Database.PG.Query.Class
, module Database.PG.Query.Connection
, module Database.PG.Query.Listen
) where

import Database.PG.Query.Class
import Database.PG.Query.Connection (ConnInfo (..), PGConn (..),
PGConnErr (..), PrepArg,
ResultOk (..), Template)
import Database.PG.Query.Listen
import Database.PG.Query.Pool
import Database.PG.Query.Transaction
97 changes: 97 additions & 0 deletions src/Database/PG/Query/Listen.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}

-- Reference:- https://github.com/hasura/skor/blob/master/src/skor.c

module Database.PG.Query.Listen
( PGChannel(..)
, NotifyHandler
, listen
)
where

import Database.PG.Query.Connection
import Database.PG.Query.Pool
import Database.PG.Query.Transaction

import Control.Monad.Except
import Control.Monad.Trans.Control
import Data.Pool (withResource)
import Data.String

import qualified Data.Text as T
import qualified Database.PostgreSQL.LibPQ as PQ
import qualified System.Posix.IO.Select as PS
import qualified System.Posix.IO.Select.FdSet as PS
import qualified System.Posix.IO.Select.Types as PS

newtype PGChannel
= PGChannel {getChannelTxt :: T.Text}
deriving(Show, Eq, IsString)

type NotifyHandler = PQ.Notify -> IO ()

-- | listen on given channel
listen
:: ( FromPGConnErr e
, FromPGTxErr e
, MonadError e m
, MonadIO m
, MonadBaseControl IO m
)
=> PGPool -> PGChannel -> NotifyHandler -> m ()
listen pool channel handler = catchConnErr $
withResource pool $ \pgConn -> do
-- Issue listen command
eRes <- liftIO $ runExceptT $
execMulti pgConn (mkTemplate listenCmd) $ const $ return ()
either throwTxErr return eRes
forever $ do
let conn = pgPQConn pgConn
-- Make postgres connection ready for reading
r <- liftIO $ runExceptT $ waitForReadReadiness conn
either (throwError . fromPGConnErr) return r
-- Check for input
success <- liftIO $ PQ.consumeInput conn
unless success throwConsumeFailed
liftIO $ processNotifs conn
where
listenCmd = "LISTEN " <> getChannelTxt channel <> ";"
throwTxErr =
throwError . fromPGTxErr . PGTxErr listenCmd [] False
throwConsumeFailed = throwError $ fromPGConnErr $
PGConnErr "consuming input failed from postgres connection"

processNotifs conn = do
-- Collect notification
mNotify <- PQ.notifies conn
onJust mNotify $ \n -> do
-- Apply handler on arrived notification
handler n
-- Process remaining notifications if any
processNotifs conn

waitForReadReadiness :: PQ.Connection -> ExceptT PGConnErr IO ()
waitForReadReadiness conn = do
-- Get file descriptor of underlying socket of a connection
mFd <- lift $ PQ.socket conn
onJust mFd withFd
where
-- Perform select(2) operation on file descriptor
-- to check whether it is ready for reading
withFd fd = do
selRes <- lift $ do
-- Make file descriptors set
r <- PS.fromList [fd] --read
w <- PS.empty -- write
e <- PS.empty -- exception
PS.select r w e PS.Never
case selRes of
PS.Error -> throwError $ PGConnErr "select() failed"
_ -> return ()

onJust :: Monad m => Maybe a -> (a -> m ()) -> m ()
onJust Nothing _ = return ()
onJust (Just v) act = act v
2 changes: 2 additions & 0 deletions stack.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ extra-deps:
- text-builder-0.6.3
- deferred-folds-0.9.7
- primitive-0.6.4.0
# for POSIX select(2)
- select-0.4.0.1
# Override default flag values for local packages and extra-deps
flags: {}

Expand Down