From 6338341770af974c25354bb6ff45de7504d7094c Mon Sep 17 00:00:00 2001 From: rakeshkky Date: Fri, 8 Mar 2019 19:36:41 +0530 Subject: [PATCH] improve schema sync logic to support latest pg-client-hs lib --- server/src-exec/Main.hs | 30 +-- server/src-lib/Hasura/Server/App.hs | 4 +- server/src-lib/Hasura/Server/Init.hs | 9 +- server/src-lib/Hasura/Server/Query.hs | 19 +- server/src-lib/Hasura/Server/SchemaUpdate.hs | 182 ++++++++++++------- server/stack.yaml | 2 +- 6 files changed, 140 insertions(+), 106 deletions(-) diff --git a/server/src-exec/Main.hs b/server/src-exec/Main.hs index 7d2ab5a1448c6..ba8cb201da8f2 100644 --- a/server/src-exec/Main.hs +++ b/server/src-exec/Main.hs @@ -11,7 +11,6 @@ import System.Exit (exitFailure) import qualified Control.Concurrent as C -import qualified Control.Concurrent.STM as STM import qualified Data.Aeson as A import qualified Data.ByteString.Char8 as BC import qualified Data.ByteString.Lazy as BL @@ -100,7 +99,7 @@ printYaml :: (A.ToJSON a) => a -> IO () printYaml = BC.putStrLn . Y.encode mkPGLogger :: Logger -> Q.PGLogger -mkPGLogger (Logger logger) msg = +mkPGLogger (Logger logger) (Q.PLERetryMsg msg) = logger $ PGLog LevelWarn msg main :: IO () @@ -128,31 +127,21 @@ main = do -- log postgres connection info unLogger logger $ connInfoToLog ci - -- create empty cache update events queue - eventsQueue <- STM.newTQueueIO - pool <- Q.initPGPool ci cp pgLogger - -- start postgres cache update events listener thread in background - listenerTId <- C.forkIO $ schemaUpdateEventListener pool logger eventsQueue - unLogger logger $ mkThreadLog listenerTId instanceId TTListener - -- safe init catalog initRes <- initialise logger ci httpManager -- prepare event triggers data prepareEvents logger ci - (app, cacheRef, cacheBuiltTime) <- + (app, cacheRef, cacheInitTime) <- mkWaiApp isoL loggerCtx strfyNum pool httpManager am corsCfg enableConsole enableTelemetry instanceId enabledAPIs - let scRef = _scrCache cacheRef - - -- start cache update events processor thread in background - procTId <- C.forkIO $ schemaUpdateEventProcessor strfyNum pool logger httpManager - eventsQueue cacheRef instanceId cacheBuiltTime - unLogger logger $ mkThreadLog procTId instanceId TTProcessor + -- start a background thread for schema sync + void $ C.forkIO $ startSchemaSync strfyNum pool logger httpManager + cacheRef instanceId cacheInitTime let warpSettings = Warp.setPort port $ Warp.setHost host Warp.defaultSettings @@ -162,6 +151,7 @@ main = do eventEngineCtx <- atomically $ initEventEngineCtx maxEvThrds evFetchMilliSec + let scRef = _scrCache cacheRef unLogger logger $ mkGenericStrLog "event_triggers" "starting workers" void $ C.forkIO $ processEventQueue hloggerCtx logEnvHeaders httpManager pool scRef eventEngineCtx @@ -235,14 +225,6 @@ main = do res <- runTx pgLogger ci unlockAllEvents either printErrJExit return res - mkThreadLog threadId instanceId threadType = - let msg = T.pack (show threadType) <> " thread started" - in StartupLog LevelInfo "threads" $ - A.object [ "instance_id" A..= getInstanceId instanceId - , "thread_id" A..= show threadId - , "message" A..= msg - ] - getUniqIds pgLogger ci = do eDbId <- runTx pgLogger ci getDbId dbId <- either printErrJExit return eDbId diff --git a/server/src-lib/Hasura/Server/App.hs b/server/src-lib/Hasura/Server/App.hs index a34c2b6db608e..b93073a01ed9c 100644 --- a/server/src-lib/Hasura/Server/App.hs +++ b/server/src-lib/Hasura/Server/App.hs @@ -314,10 +314,10 @@ mkWaiApp isoLevel loggerCtx strfyNum pool httpManager mode corsCfg pgResp <- runExceptT $ peelRun emptySchemaCache adminUserInfo httpManager strfyNum pool Q.Serializable $ do buildSchemaCache - fetchLastUpdateTime + liftTx fetchLastUpdate (time, sc) <- either initErrExit return pgResp scRef <- newIORef sc - return (scRef, time) + return (scRef, snd <$> time) cacheLock <- newMVar () diff --git a/server/src-lib/Hasura/Server/Init.hs b/server/src-lib/Hasura/Server/Init.hs index eb77a950118d0..183478914a03c 100644 --- a/server/src-lib/Hasura/Server/Init.hs +++ b/server/src-lib/Hasura/Server/Init.hs @@ -23,14 +23,11 @@ import Hasura.Server.Utils import Network.Wai.Handler.Warp newtype InstanceId - = InstanceId {getInstanceId :: UUID.UUID} - deriving (Show, Eq) - -instanceIdToTxt :: InstanceId -> T.Text -instanceIdToTxt = UUID.toText . getInstanceId + = InstanceId {getInstanceId :: T.Text} + deriving (Show, Eq, J.ToJSON, J.FromJSON) mkInstanceId :: IO InstanceId -mkInstanceId = InstanceId <$> UUID.nextRandom +mkInstanceId = (InstanceId . UUID.toText) <$> UUID.nextRandom initErrExit :: (Show e) => e -> IO a initErrExit e = print e >> exitFailure diff --git a/server/src-lib/Hasura/Server/Query.hs b/server/src-lib/Hasura/Server/Query.hs index 5bd1f2959dcdb..66b22b7f12fb9 100644 --- a/server/src-lib/Hasura/Server/Query.hs +++ b/server/src-lib/Hasura/Server/Query.hs @@ -30,8 +30,7 @@ import Hasura.RQL.DML.Returning (encodeJSONVector) import Hasura.RQL.DML.Select import Hasura.RQL.DML.Update import Hasura.RQL.Types -import Hasura.Server.Init (InstanceId (..), - instanceIdToTxt) +import Hasura.Server.Init (InstanceId (..)) import Hasura.Server.Utils import qualified Database.PG.Query as Q @@ -119,16 +118,18 @@ instance HasHttpManager Run where instance HasSQLGenCtx Run where askSQLGenCtx = asks _3 -fetchLastUpdateTime :: Run (Maybe UTCTime) -fetchLastUpdateTime = do - l <- liftTx $ Q.listQE defaultTxErrorHandler +fetchLastUpdate :: Q.TxE QErr (Maybe (InstanceId, UTCTime)) +fetchLastUpdate = do + l <- Q.listQE defaultTxErrorHandler [Q.sql| - SELECT occurred_at FROM hdb_catalog.hdb_schema_update_event - ORDER BY id DESC LIMIT 1 + SELECT instance_id::text, occurred_at + FROM hdb_catalog.hdb_schema_update_event + ORDER BY occurred_at DESC LIMIT 1 |] () True case l of [] -> return Nothing - [Identity t] -> return $ Just t + [(instId, occurredAt)] -> + return $ Just (InstanceId instId, occurredAt) -- never happens _ -> throw500 "more than one row returned by query" @@ -139,7 +140,7 @@ recordSchemaUpdate instanceId = hdb_catalog.hdb_schema_update_event (instance_id, occurred_at) VALUES ($1::uuid, DEFAULT) - |] (Identity $ instanceIdToTxt instanceId) True + |] (Identity $ getInstanceId instanceId) True peelRun :: SchemaCache diff --git a/server/src-lib/Hasura/Server/SchemaUpdate.hs b/server/src-lib/Hasura/Server/SchemaUpdate.hs index dddd9120ac7ea..ca9eb37f7e5fa 100644 --- a/server/src-lib/Hasura/Server/SchemaUpdate.hs +++ b/server/src-lib/Hasura/Server/SchemaUpdate.hs @@ -1,8 +1,5 @@ module Hasura.Server.SchemaUpdate - ( ThreadType(..) - , schemaUpdateEventListener - , schemaUpdateEventProcessor - ) + (startSchemaSync) where import Hasura.Prelude @@ -12,13 +9,15 @@ import Hasura.RQL.DDL.Schema.Table (buildSchemaCache) import Hasura.RQL.Types import Hasura.Server.App (SchemaCacheRef (..), withSCUpdate) import Hasura.Server.Init (InstanceId (..)) +import Hasura.Server.Logging import Hasura.Server.Query import Data.Aeson import Data.Aeson.Casing import Data.Aeson.TH -import Data.UUID +import qualified Control.Concurrent as C +import qualified Control.Concurrent.Async as A import qualified Control.Concurrent.STM as STM import qualified Data.Text as T import qualified Data.Time as UTC @@ -58,20 +57,11 @@ instance ToEngineLog SchemaUpdateEventLog where data EventPayload = EventPayload - { _epInstanceId :: !UUID + { _epInstanceId :: !InstanceId , _epOccurredAt :: !UTC.UTCTime } deriving (Show, Eq) $(deriveJSON (aesonDrop 3 snakeCase) ''EventPayload) -data SchemaUpdateEvent - = SUESuccess !EventPayload - | SUEPGReConn - deriving (Show, Eq) - -instance ToJSON SchemaUpdateEvent where - toJSON (SUESuccess payload) = toJSON payload - toJSON SUEPGReConn = String "postgres reconnection" - data ThreadError = TEJsonParse !T.Text | TEQueryError !QErr @@ -81,83 +71,147 @@ $(deriveToJSON } ''ThreadError) +-- | An IO action that enables metadata syncing +startSchemaSync + :: Bool + -> PG.PGPool + -> Logger + -> HTTP.Manager + -> SchemaCacheRef + -> InstanceId + -> Maybe UTC.UTCTime -> IO () +startSchemaSync strfyNum pool logger httpMgr cacheRef instanceId cacheInitTime = do + -- Init events queue + eventsQueue <- STM.newTQueueIO + -- Start listener thread + lAsync <- A.async $ listener strfyNum pool + logger httpMgr eventsQueue cacheRef instanceId cacheInitTime + + -- Start processor thread + pAsync <- A.async $ processor strfyNum pool + logger httpMgr eventsQueue cacheRef instanceId + + void $ A.waitAny [lAsync, pAsync] + -- | An IO action that listens to postgres for events and pushes them to a Queue -schemaUpdateEventListener - :: PG.PGPool +listener + :: Bool + -> PG.PGPool -> Logger - -> STM.TQueue SchemaUpdateEvent - -> IO () -schemaUpdateEventListener pool logger eventsQueue = + -> HTTP.Manager + -> STM.TQueue EventPayload + -> SchemaCacheRef + -> InstanceId + -> Maybe UTC.UTCTime -> IO () +listener strfyNum pool logger httpMgr eventsQueue + cacheRef instanceId cacheInitTime = do + logThreadStartup logger instanceId threadType -- Never exits forever $ do - listenResE <- liftIO $ runExceptT $ PG.listen pool pgChannel notifyHandler + listenResE <- + liftIO $ runExceptT $ PG.listen pool pgChannel notifyHandler either onError return listenResE + logWarn + C.threadDelay $ 1 * 1000 * 1000 -- 1 second where - notifyHandler = PG.NotifyHandler onReconn onMessage threadType = TTListener + shouldRefresh dbInstId accrdAt = + case cacheInitTime of + Nothing -> True + Just time -> (dbInstId /= instanceId) && accrdAt > time + + refreshCache Nothing = return () + refreshCache (Just (dbInstId, accrdAt)) = + when (shouldRefresh dbInstId accrdAt) $ + refreshSchemaCache strfyNum pool logger httpMgr cacheRef + threadType "reloading schema cache on listen start" + + notifyHandler = \case + PG.PNEOnStart -> do + eRes <- runExceptT $ PG.runTx pool + (PG.Serializable, Nothing) fetchLastUpdate + case eRes of + Left e -> onError e + Right mLastUpd -> refreshCache mLastUpd + + PG.PNEPQNotify notif -> + case eitherDecodeStrict $ PQ.notifyExtra notif of + Left e -> logError logger threadType $ TEJsonParse $ T.pack e + Right payload -> do + logInfo logger threadType $ object ["received_event" .= payload] + -- Push a notify event to Queue + STM.atomically $ STM.writeTQueue eventsQueue payload + onError = logError logger threadType . TEQueryError + logWarn = unLogger logger $ + SchemaUpdateEventLog LevelWarn TTListener $ String + "error occured retrying pg listen after 1 second" - onReconn = do - -- emit postgres reconnection event - let event = SUEPGReConn - logInfo logger threadType $ object ["received_event" .= event] - STM.atomically $ STM.writeTQueue eventsQueue event - - -- Postgres notification handler - onMessage notif = - case eitherDecodeStrict $ PQ.notifyExtra notif of - Left e -> logError logger threadType $ TEJsonParse $ T.pack e - Right payload -> do - let newEvent = SUESuccess payload - logInfo logger threadType $ object ["received_event" .= newEvent] - -- Push a success event to Queue along with event payload - STM.atomically $ STM.writeTQueue eventsQueue newEvent -- | An IO action that processes events from Queue -schemaUpdateEventProcessor +processor :: Bool -> PG.PGPool -> Logger -> HTTP.Manager - -> STM.TQueue SchemaUpdateEvent + -> STM.TQueue EventPayload -> SchemaCacheRef - -> InstanceId - -> Maybe UTC.UTCTime - -> IO () -schemaUpdateEventProcessor strfyNum pool logger httpManager - eventsQueue cacheRef instanceId cacheInit = + -> InstanceId -> IO () +processor strfyNum pool logger httpMgr eventsQueue + cacheRef instanceId = do + logThreadStartup logger instanceId threadType -- Never exits forever $ do event <- STM.atomically $ STM.readTQueue eventsQueue logInfo logger threadType $ object ["processed_event" .= event] - when (shouldReload event) $ do - -- Reload schema cache from catalog - resE <- liftIO $ runExceptT $ withSCUpdate cacheRef $ - peelRun emptySchemaCache adminUserInfo - httpManager strfyNum pool PG.Serializable buildSchemaCache - case resE of - Left e -> logError logger threadType $ TEQueryError e - Right _ -> - logInfo logger threadType $ - object ["message" .= ("schema cache reloaded" :: T.Text)] + when (shouldReload event) $ + refreshSchemaCache strfyNum pool logger httpMgr cacheRef + threadType "schema cache reloaded" where threadType = TTProcessor - -- If postgres reconnect happens reload schema cache - shouldReload SUEPGReConn = True - -- If event is from another server and occurred after - -- init schema cache built then reload - shouldReload (SUESuccess payload) = - (_epInstanceId payload /= getInstanceId instanceId) - && maybe True (withCacheInit $ _epOccurredAt payload) cacheInit + -- If event is from another server + shouldReload payload = _epInstanceId payload /= instanceId - withCacheInit occurredAt initTime = occurredAt > initTime +logThreadStartup + :: Show a + => Logger + -> InstanceId + -> a -> IO () +logThreadStartup logger instanceId threadType = + unLogger logger threadLog + where + threadLog = + let msg = T.pack (show threadType) <> " thread started" + in StartupLog LevelInfo "threads" $ + object [ "instance_id" .= getInstanceId instanceId + , "message" .= msg + ] + +refreshSchemaCache + :: Bool + -> PG.PGPool + -> Logger + -> HTTP.Manager + -> SchemaCacheRef + -> ThreadType + -> T.Text -> IO () +refreshSchemaCache strfyNum pool logger httpManager cacheRef threadType msg = do + -- Reload schema cache from catalog + resE <- liftIO $ runExceptT $ withSCUpdate cacheRef $ + peelRun emptySchemaCache adminUserInfo + httpManager strfyNum pool PG.Serializable buildSchemaCache + case resE of + Left e -> logError logger threadType $ TEQueryError e + Right _ -> + logInfo logger threadType $ object ["message" .= msg] logInfo :: Logger -> ThreadType -> Value -> IO () logInfo logger threadType val = unLogger logger $ SchemaUpdateEventLog LevelInfo threadType val logError :: ToJSON a => Logger -> ThreadType -> a -> IO () -logError logger threadType err = unLogger logger $ - SchemaUpdateEventLog LevelError threadType $ object ["error" .= toJSON err] +logError logger threadType err = + unLogger logger $ SchemaUpdateEventLog LevelError threadType $ + object ["error" .= toJSON err] diff --git a/server/stack.yaml b/server/stack.yaml index d74b8c16d3668..d25e2d1db5532 100644 --- a/server/stack.yaml +++ b/server/stack.yaml @@ -17,7 +17,7 @@ packages: extra-deps: # use https URLs so that build systems can clone these repos - git: https://github.com/rakeshkky/pg-client-hs.git - commit: bdeb884279100d0417bb697e0e05017cc7c320df + commit: ed3dcfb864a2a23ac6c22ed947a5095b0d03170d - git: https://github.com/hasura/graphql-parser-hs.git commit: ff95d9a96aa5ef9e5390f8712958e4118e3831f6 - ginger-0.8.1.0