Skip to content
This repository has been archived by the owner on Sep 24, 2022. It is now read-only.

Commit

Permalink
Merge pull request #5 from rakeshkky/pg-notify
Browse files Browse the repository at this point in the history
allow subscribing to postgres notifications.
  • Loading branch information
0x777 authored Feb 6, 2019
2 parents f3d1e9e + 9d8a014 commit 2d0e9f4
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 0 deletions.
2 changes: 2 additions & 0 deletions pg-client.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ library
, Database.PG.Query.Connection
, Database.PG.Query.Transaction
, Database.PG.Query.Pool
, Database.PG.Query.Listen

build-depends: base >= 4.7 && < 5
, attoparsec >= 0.13
Expand All @@ -46,6 +47,7 @@ library
, scientific >= 0.3

, postgresql-libpq
, select

default-language: Haskell2010
ghc-options: -Wall
Expand Down
2 changes: 2 additions & 0 deletions src/Database/PG/Query.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ module Database.PG.Query
, module Database.PG.Query.Pool
, module Database.PG.Query.Class
, module Database.PG.Query.Connection
, module Database.PG.Query.Listen
) where

import Database.PG.Query.Class
import Database.PG.Query.Connection (ConnInfo (..), PGConn (..),
PGConnErr (..), PrepArg,
ResultOk (..), Template)
import Database.PG.Query.Listen
import Database.PG.Query.Pool
import Database.PG.Query.Transaction
97 changes: 97 additions & 0 deletions src/Database/PG/Query/Listen.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}

-- Reference:- https://github.com/hasura/skor/blob/master/src/skor.c

module Database.PG.Query.Listen
( PGChannel(..)
, NotifyHandler
, listen
)
where

import Database.PG.Query.Connection
import Database.PG.Query.Pool
import Database.PG.Query.Transaction

import Control.Monad.Except
import Control.Monad.Trans.Control
import Data.Pool (withResource)
import Data.String

import qualified Data.Text as T
import qualified Database.PostgreSQL.LibPQ as PQ
import qualified System.Posix.IO.Select as PS
import qualified System.Posix.IO.Select.FdSet as PS
import qualified System.Posix.IO.Select.Types as PS

newtype PGChannel
= PGChannel {getChannelTxt :: T.Text}
deriving(Show, Eq, IsString)

type NotifyHandler = PQ.Notify -> IO ()

-- | listen on given channel
listen
:: ( FromPGConnErr e
, FromPGTxErr e
, MonadError e m
, MonadIO m
, MonadBaseControl IO m
)
=> PGPool -> PGChannel -> NotifyHandler -> m ()
listen pool channel handler = catchConnErr $
withResource pool $ \pgConn -> do
-- Issue listen command
eRes <- liftIO $ runExceptT $
execMulti pgConn (mkTemplate listenCmd) $ const $ return ()
either throwTxErr return eRes
forever $ do
let conn = pgPQConn pgConn
-- Make postgres connection ready for reading
r <- liftIO $ runExceptT $ waitForReadReadiness conn
either (throwError . fromPGConnErr) return r
-- Check for input
success <- liftIO $ PQ.consumeInput conn
unless success throwConsumeFailed
liftIO $ processNotifs conn
where
listenCmd = "LISTEN " <> getChannelTxt channel <> ";"
throwTxErr =
throwError . fromPGTxErr . PGTxErr listenCmd [] False
throwConsumeFailed = throwError $ fromPGConnErr $
PGConnErr "consuming input failed from postgres connection"

processNotifs conn = do
-- Collect notification
mNotify <- PQ.notifies conn
onJust mNotify $ \n -> do
-- Apply handler on arrived notification
handler n
-- Process remaining notifications if any
processNotifs conn

waitForReadReadiness :: PQ.Connection -> ExceptT PGConnErr IO ()
waitForReadReadiness conn = do
-- Get file descriptor of underlying socket of a connection
mFd <- lift $ PQ.socket conn
onJust mFd withFd
where
-- Perform select(2) operation on file descriptor
-- to check whether it is ready for reading
withFd fd = do
selRes <- lift $ do
-- Make file descriptors set
r <- PS.fromList [fd] --read
w <- PS.empty -- write
e <- PS.empty -- exception
PS.select r w e PS.Never
case selRes of
PS.Error -> throwError $ PGConnErr "select() failed"
_ -> return ()

onJust :: Monad m => Maybe a -> (a -> m ()) -> m ()
onJust Nothing _ = return ()
onJust (Just v) act = act v
2 changes: 2 additions & 0 deletions stack.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ extra-deps:
- text-builder-0.6.3
- deferred-folds-0.9.7
- primitive-0.6.4.0
# for POSIX select(2)
- select-0.4.0.1
# Override default flag values for local packages and extra-deps
flags: {}

Expand Down

0 comments on commit 2d0e9f4

Please sign in to comment.