From a79a6cc5ecf6c46b18857ca05dc0a26b5dbe5a47 Mon Sep 17 00:00:00 2001
From: rakeshkky
Date: Thu, 7 Feb 2019 21:50:32 +0530
Subject: [PATCH 01/20] manage schema cache when horizontally scaled, closes #1182

---
 server/graphql-engine.cabal                 |   2 +
 server/src-exec/Main.hs                     |  36 +++-
 server/src-exec/Ops.hs                      |  15 +-
 server/src-lib/Hasura/Server/App.hs         |  29 +++-
 server/src-lib/Hasura/Server/CacheUpdate.hs | 178 ++++++++++++++++++++
 server/src-lib/Hasura/Server/Query.hs       |  23 ++-
 server/src-rsr/initialise.sql               |  29 ++++
 server/src-rsr/migrate_from_9_to_10.sql     |  26 +++
 server/stack.yaml                           |   5 +-
 9 files changed, 325 insertions(+), 18 deletions(-)
 create mode 100644 server/src-lib/Hasura/Server/CacheUpdate.hs
 create mode 100644 server/src-rsr/migrate_from_9_to_10.sql

diff --git a/server/graphql-engine.cabal b/server/graphql-engine.cabal
index 77deaab336a73..da06876e38829 100644
--- a/server/graphql-engine.cabal
+++ b/server/graphql-engine.cabal
@@ -149,6 +149,7 @@ library
                      , Hasura.Server.Version
                      , Hasura.Server.CheckUpdates
                      , Hasura.Server.Telemetry
+                     , Hasura.Server.CacheUpdate
                      , Hasura.RQL.Types
                      , Hasura.RQL.Instances
                      , Hasura.RQL.Types.SchemaCache
@@ -316,6 +317,7 @@ executable graphql-engine
                      , wreq
                      , connection
                      , string-conversions
+                     , uuid
  other-modules:      Ops

diff --git a/server/src-exec/Main.hs b/server/src-exec/Main.hs
index b834b9bf83933..d4dd91beab566 100644
--- a/server/src-exec/Main.hs
+++ b/server/src-exec/Main.hs
@@ -8,28 +8,32 @@
 import           Options.Applicative
 import           System.Environment         (getEnvironment, lookupEnv)
 import           System.Exit                (exitFailure)
 
+
 import qualified Control.Concurrent         as C
+import qualified Control.Concurrent.STM     as STM
 import qualified Data.Aeson                 as A
 import qualified Data.ByteString.Char8      as BC
 import qualified Data.ByteString.Lazy       as BL
 import qualified Data.ByteString.Lazy.Char8 as BLC
 import qualified Data.Text                  as T
+import qualified Data.UUID.V4               as UUID
 import qualified Data.Yaml                  as Y
 import qualified Network.HTTP.Client        as HTTP
 import qualified Network.HTTP.Client.TLS    as HTTP
 import qualified Network.Wai.Handler.Warp   as Warp
 
 import           Hasura.Events.Lib
-import           Hasura.Logging             (Logger (..), defaultLoggerSettings,
-                                             mkLogger, mkLoggerCtx)
+import           Hasura.Logging
 import           Hasura.Prelude
 import           Hasura.RQL.DDL.Metadata    (fetchMetadata)
 import           Hasura.RQL.Types           (QErr, adminUserInfo,
                                              emptySchemaCache)
 import           Hasura.Server.App          (mkWaiApp)
 import           Hasura.Server.Auth
+import           Hasura.Server.CacheUpdate
 import           Hasura.Server.CheckUpdates (checkForUpdates)
 import           Hasura.Server.Init
+import           Hasura.Server.Logging
 import           Hasura.Server.Query        (peelRun)
 import           Hasura.Server.Telemetry
 import           Hasura.Server.Version      (currentVersion)
@@ -100,6 +104,7 @@ main = do
   -- global http manager
   httpManager <- HTTP.newManager HTTP.tlsManagerSettings
   loggerCtx <- mkLoggerCtx $ defaultLoggerSettings True
+  serverId <- UUID.nextRandom
   let logger = mkLogger loggerCtx
   case hgeCmd of
     HCServe so@(ServeOptions port host cp isoL mAccessKey mAuthHook mJwtSecret
@@ -117,6 +122,14 @@ main = do
       -- log postgres connection info
      unLogger logger $ connInfoToLog ci
 
+      -- create empty cache update events queue
+      eventsQueue <- STM.newTQueueIO
+
+      -- start postgres cache update events listener thread in background
+      listenerPool <- getMinimalPool ci
+      listenerTId <- C.forkIO $ cacheUpdateEventListener listenerPool logger eventsQueue
+      unLogger logger $ mkThreadLog listenerTId serverId TTListener
+
       -- safe init catalog
       initRes <- initialise logger ci httpManager
 
      prepareEvents logger ci
 
      pool <- Q.initPGPool ci cp
-      (app, cacheRef) <- mkWaiApp isoL loggerCtx pool httpManager
-                         am corsCfg enableConsole enableTelemetry
+
+      (app, cacheRef, cacheLk, cacheBuiltTime) <-
+        mkWaiApp isoL loggerCtx pool httpManager am corsCfg enableConsole
+        enableTelemetry serverId
+
+      -- start cache update events processor thread in background
+      procTId <- C.forkIO $ cacheUpdateEventProcessor pool logger httpManager
+                 eventsQueue cacheRef cacheLk serverId cacheBuiltTime
+      unLogger logger $ mkThreadLog procTId serverId TTProcessor
 
      let warpSettings = Warp.setPort port $ Warp.setHost host Warp.defaultSettings
@@ -209,6 +229,14 @@ main = do
      res <- runTx ci unlockAllEvents
      either printErrJExit return res
 
+    mkThreadLog threadId serverId threadType =
+      let msg = T.pack (show threadType) <> " thread started"
+      in StartupLog LevelInfo "threads" $
+           A.object [ "server_id" A..= serverId
+                    , "thread_id" A..= show threadId
+                    , "message" A..= msg
+                    ]
+
    getUniqIds ci = do
      eDbId <- runTx ci getDbId
      dbId <- either printErrJExit return eDbId

diff --git a/server/src-exec/Ops.hs b/server/src-exec/Ops.hs
index 273d5901a38ed..ae1fd46cd0ab7 100644
--- a/server/src-exec/Ops.hs
+++ b/server/src-exec/Ops.hs
@@ -23,7 +23,7 @@ import qualified Database.PG.Query as Q
 import qualified Database.PG.Query.Connection as Q
 
 curCatalogVer :: T.Text
-curCatalogVer = "9"
+curCatalogVer = "10"
 
 initCatalogSafe
   :: (QErrM m, UserInfoM m, CacheRWM m, MonadTx m, MonadIO m, HasHttpManager m)
@@ -318,6 +318,12 @@ from8To9 = do
    migrateMetadataFrom8 =
      $(unTypeQ (Y.decodeFile "src-rsr/migrate_metadata_from_8_to_9.yaml" :: Q (TExp RQLQuery)))
 
+from9To10 :: MonadTx m => m ()
+from9To10 = do
+  -- migrate database
+  Q.Discard () <- liftTx $ Q.multiQE defaultTxErrorHandler
+    $(Q.sqlFromFile "src-rsr/migrate_from_9_to_10.sql")
+  return ()
 
 migrateCatalog
   :: (MonadTx m, CacheRWM m, MonadIO m, UserInfoM m, HasHttpManager m)
 migrateCatalog migrationTime = do
      | preVer == "6" -> from6ToCurrent
      | preVer == "7" -> from7ToCurrent
      | preVer == "8" -> from8ToCurrent
+      | preVer == "9" -> from9ToCurrent
      | otherwise -> throw400 NotSupported $
                     "unsupported version : " <> preVer
  where
+    from9ToCurrent = do
+      from9To10
+      postMigrate
+
    from8ToCurrent = do
      from8To9
-      postMigrate
+      from9ToCurrent
 
    from7ToCurrent = do
      from7To8

diff --git a/server/src-lib/Hasura/Server/App.hs b/server/src-lib/Hasura/Server/App.hs
index bdc17ee636c25..9d12c74ab5485 100644
--- a/server/src-lib/Hasura/Server/App.hs
+++ b/server/src-lib/Hasura/Server/App.hs
@@ -10,6 +10,7 @@
 import           Data.Aeson hiding (json)
 import           Data.IORef
 import           Data.Time.Clock            (UTCTime, getCurrentTime)
+import           Data.UUID                  (UUID)
 import           Network.Wai                (requestHeaders, strictRequestBody)
 import           Web.Spock.Core
@@ -96,6 +97,7 @@ data ServerCtx
  , scCacheLock :: MVar ()
  , scAuthMode  :: AuthMode
  , scManager   :: HTTP.Manager
+  , scServerId  :: UUID
  }
 
 data HandlerCtx
@@ -214,7 +216,8 @@ v1QueryHandler query = do
      httpMgr <- scManager . hcServerCtx <$> ask
      pool <- scPGPool . hcServerCtx <$> ask
      isoL <- scIsolation . hcServerCtx <$> ask
-      runQuery pool isoL userInfo schemaCache httpMgr query
+      serverId <- scServerId . hcServerCtx <$> ask
+      runQuery pool isoL serverId userInfo schemaCache httpMgr query
 
    -- Also update the schema cache
    dbActionReload = do
@@ -289,18 +292,24 @@ mkWaiApp
  -> CorsConfig
  -> Bool
  -> Bool
-  -> IO (Wai.Application, IORef SchemaCache)
-mkWaiApp isoLevel loggerCtx pool httpManager mode corsCfg enableConsole enableTelemetry = do
-  cacheRef <- do
+  -> UUID
+  -> IO (Wai.Application, IORef SchemaCache, MVar (), UTCTime)
+mkWaiApp isoLevel loggerCtx pool httpManager mode corsCfg
+         enableConsole enableTelemetry serverId = do
+  (cacheRef, cacheBuiltTime) <- do
    pgResp <- runExceptT $ peelRun emptySchemaCache adminUserInfo
-              httpManager pool Q.Serializable buildSchemaCache
-    either initErrExit return pgResp >>= newIORef . snd
+              httpManager pool Q.Serializable $ do
+                buildSchemaCache
+                liftIO getCurrentTime
+    (time, sc) <- either initErrExit return pgResp
+    scRef <- newIORef sc
+    return (scRef, time)
 
  cacheLock <- newMVar ()
 
  let serverCtx = ServerCtx isoLevel pool (L.mkLogger loggerCtx) cacheRef
-                  cacheLock mode httpManager
+                  cacheLock mode httpManager serverId
 
  spockApp <- spockAsApp $ spockT id $
              httpApp corsCfg serverCtx enableConsole enableTelemetry
@@ -309,7 +318,11 @@ mkWaiApp isoLevel loggerCtx pool httpManager mode corsCfg enableConsole enableTe
  wsServerEnv <- WS.createWSServerEnv (scLogger serverCtx) httpManager cacheRef runTx
  let wsServerApp = WS.createWSServerApp mode wsServerEnv
-  return (WS.websocketsOr WS.defaultConnectionOptions wsServerApp spockApp, cacheRef)
+  return ( WS.websocketsOr WS.defaultConnectionOptions wsServerApp spockApp
+         , cacheRef
+         , cacheLock
+         , cacheBuiltTime
+         )
 
 httpApp :: CorsConfig -> ServerCtx -> Bool -> Bool -> SpockT IO ()
 httpApp corsCfg serverCtx enableConsole enableTelemetry = do

diff --git a/server/src-lib/Hasura/Server/CacheUpdate.hs b/server/src-lib/Hasura/Server/CacheUpdate.hs
new file mode 100644
index 0000000000000..70253a79e0df9
--- /dev/null
+++ b/server/src-lib/Hasura/Server/CacheUpdate.hs
@@ -0,0 +1,178 @@
+module Hasura.Server.CacheUpdate
+  ( ThreadType(..)
+  , cacheUpdateEventListener
+  , cacheUpdateEventProcessor
+  )
+where
+
+import           Hasura.Prelude
+
+import           Hasura.Logging
+import           Hasura.RQL.DDL.Schema.Table (buildSchemaCache)
+import           Hasura.RQL.Types
+import           Hasura.Server.App           (withLock)
+import           Hasura.Server.Query
+import           Hasura.Server.Utils         (bsToTxt)
+
+import           Control.Concurrent.MVar
+import           Data.Aeson
+import           Data.Aeson.Casing
+import           Data.Aeson.TH
+import           Data.UUID
+import           Database.PG.Query
+
+import qualified Control.Concurrent          as C
+import qualified Control.Concurrent.STM      as STM
+import qualified Data.IORef                  as IORef
+import qualified Data.Text                   as T
+import qualified Data.Time                   as UTC
+import qualified Database.PostgreSQL.LibPQ   as PQ
+import qualified Network.HTTP.Client         as HTTP
+
+pgChannel :: PGChannel
+pgChannel = "hasura_cache_update"
+
+data ThreadType
+  = TTListener
+  | TTProcessor
+  deriving (Eq)
+
+instance Show ThreadType where
+  show TTListener  = "listener"
+  show TTProcessor = "processor"
+
+
+data CacheUpdateEventLog
+  = CacheUpdateEventLog
+  { cuelLogLevel   :: !LogLevel
+  , cuelThreadType :: !ThreadType
+  , cuelInfo       :: !Value
+  } deriving (Show, Eq)
+
+instance ToJSON CacheUpdateEventLog where
+  toJSON (CacheUpdateEventLog _ t info) =
+    object [ "thread_type" .= show t
+           , "info" .= info
+           ]
+
+instance ToEngineLog CacheUpdateEventLog where
+  toEngineLog threadLog =
+    (cuelLogLevel threadLog, "cache_update_event", toJSON threadLog)
+
+data EventPayload
+  = EventPayload
+  { _epServerId   :: !UUID
+  , _epOccurredAt :: !UTC.UTCTime
+  } deriving (Show, Eq)
+$(deriveJSON (aesonDrop 3 snakeCase) ''EventPayload)
+
+data CacheUpdateEvent
+  = CUEListenSuccess !EventPayload
+  | CUEListenFail
+  deriving (Show, Eq)
+
+instance ToJSON CacheUpdateEvent where
+  toJSON (CUEListenSuccess payload) = toJSON payload
+  toJSON CUEListenFail              = String "event listening failed"
+
+data ThreadError
+  = TEJsonParse !T.Text
+  | TEQueryError !QErr
+$(deriveToJSON
+  defaultOptions { constructorTagModifier = snakeCase . drop 2
+                 , sumEncoding = TaggedObject "type" "info"
+                 }
+  ''ThreadError)
+
+-- | An IO action that listens to postgres for events and pushes them to a Queue
+cacheUpdateEventListener
+  :: PGPool
+  -> Logger
+  -> STM.TQueue CacheUpdateEvent
+  -> IO ()
+cacheUpdateEventListener pool logger eventsQueue =
+  -- Never exits
+  forever $ do
+    listenResE <- liftIO $ runExceptT $ listen pool pgChannel notifyHandler
+    either onError return listenResE
+  where
+    threadType = TTListener
+
+    onError e = do
+      logError logger threadType $ TEQueryError e
+      -- Push a listen failed event to queue
+      STM.atomically $ STM.writeTQueue eventsQueue CUEListenFail
+      logInfo logger threadType $
+        object ["message" .= ("retrying in 10 seconds" :: T.Text)]
+      C.threadDelay $ 10 * 1000 * 1000
+
+    -- Postgres notification handler
+    notifyHandler notif = do
+      let eventChannel = PGChannel $ bsToTxt $ PQ.notifyRelname notif
+      when (eventChannel == pgChannel) $
+        case eitherDecodeStrict $ PQ.notifyExtra notif of
+          Left e -> logError logger threadType $ TEJsonParse $ T.pack e
+          Right payload -> do
+            let newEvent = CUEListenSuccess payload
+            logInfo logger threadType $ object [ "received_event" .= newEvent]
+            -- Push a success event to Queue along with event payload
+            STM.atomically $ STM.writeTQueue eventsQueue newEvent
+
+-- | An IO action that processes events from Queue
+cacheUpdateEventProcessor
+  :: PGPool
+  -> Logger
+  -> HTTP.Manager
+  -> STM.TQueue CacheUpdateEvent
+  -> IORef.IORef SchemaCache
+  -> MVar ()
+  -> UUID
+  -> UTC.UTCTime
+  -> IO ()
+cacheUpdateEventProcessor pool logger httpManager eventsQueue
+                          cacheRef lk serverId cacheInit = do
+  -- Initialise the previous event IO reference
+  prevEventRef <- IORef.newIORef Nothing
+
+  -- Never exits
+  forever $ do
+    event <- STM.atomically $ STM.readTQueue eventsQueue
+    prevEvent <- IORef.readIORef prevEventRef
+    logInfo logger threadType $
+      object [ "previous_event" .= prevEvent
+             , "processed_event" .= event
+             ]
+    when (shouldReload prevEvent event) $ do
+      -- Build schema cache from catalog
+      scE <- liftIO $ runExceptT $ withLock lk $
+             snd <$> peelRun emptySchemaCache adminUserInfo
+             httpManager pool Serializable buildSchemaCache
+      case scE of
+        Left e -> logError logger threadType $ TEQueryError e
+        Right sc -> do
+          -- Update cache reference with newly built cache
+          IORef.writeIORef cacheRef sc
+          logInfo logger threadType $
+            object ["message" .= ("schema cache reloaded" :: T.Text)]
+
+    IORef.writeIORef prevEventRef $ Just event
+  where
+    threadType = TTProcessor
+
+    -- If previous event is failed and current event is success,
+    -- reload irrespective of the current event payload
+    shouldReload (Just CUEListenFail) (CUEListenSuccess _) = True
+    -- If current event is failed, do not reload
+    shouldReload _ CUEListenFail = False
+    shouldReload _ (CUEListenSuccess payload) =
+      -- If event is from another server and occurred after
+      -- the init schema cache was built then reload
+      (_epServerId payload /= serverId) && (_epOccurredAt payload > cacheInit)
+
+logInfo :: Logger -> ThreadType -> Value -> IO ()
+logInfo logger threadType val = unLogger logger $
+  CacheUpdateEventLog LevelInfo threadType val
+
+logError :: ToJSON a => Logger -> ThreadType -> a -> IO ()
+logError logger threadType err = unLogger logger $
+  CacheUpdateEventLog LevelError threadType $ object ["error" .= toJSON err]

diff --git a/server/src-lib/Hasura/Server/Query.hs b/server/src-lib/Hasura/Server/Query.hs
index 45e2cb0691e60..f0a1f470aecef 100644
--- a/server/src-lib/Hasura/Server/Query.hs
+++ b/server/src-lib/Hasura/Server/Query.hs
@@ -3,6 +3,8 @@ module Hasura.Server.Query where
 import           Data.Aeson
 import           Data.Aeson.Casing
 import           Data.Aeson.TH
+import           Data.Time                  (getCurrentTime)
+import           Data.UUID
 import           Language.Haskell.TH.Syntax (Lift)
 
 import qualified Data.ByteString.Builder    as BB
@@ -10,6 +12,7 @@ import qualified Data.ByteString.Lazy as BL
 import qualified Data.Vector                as V
 import qualified Network.HTTP.Client        as HTTP
 
+
 import           Hasura.Prelude
 import           Hasura.RQL.DDL.Metadata
 import           Hasura.RQL.DDL.Permission
@@ -109,6 +112,16 @@ instance UserInfoM Run where
 instance HasHttpManager Run where
  askHttpManager = asks snd
 
+recordCacheUpdate :: UUID -> Run ()
+recordCacheUpdate serverId = do
+  currTime <- liftIO getCurrentTime
+  liftTx $ Q.unitQE defaultTxErrorHandler [Q.sql|
+             INSERT INTO
+                    hdb_catalog.hdb_cache_update_event
+                    (server_id, occurred_at)
+             VALUES ($1::uuid, $2)
+           |] (toText serverId, currTime) True
+
 peelRun
  :: SchemaCache
  -> UserInfo
@@ -122,12 +135,16 @@ peelRun sc userInfo httMgr pgPool txIso (Run m) =
 
 runQuery
  :: (MonadIO m, MonadError QErr m)
-  => Q.PGPool -> Q.TxIsolation
+  => Q.PGPool -> Q.TxIsolation -> UUID
  -> UserInfo -> SchemaCache -> HTTP.Manager
  -> RQLQuery -> m (BL.ByteString, SchemaCache)
-runQuery pool isoL userInfo sc hMgr query = do
+runQuery pool isoL serverId userInfo sc hMgr query = do
  res <- liftIO $ runExceptT $
-    peelRun sc userInfo hMgr pool isoL $ runQueryM query
+    peelRun sc userInfo hMgr pool isoL $ do
+      r <- runQueryM query
+      when (queryNeedsReload query) $
+        recordCacheUpdate serverId
+      return r
  liftEither res
 
 queryNeedsReload :: RQLQuery -> Bool

diff --git a/server/src-rsr/initialise.sql b/server/src-rsr/initialise.sql
index 9bcfe13412467..744de0f219fab 100644
--- a/server/src-rsr/initialise.sql
+++ b/server/src-rsr/initialise.sql
@@ -403,3 +403,32 @@ CREATE TABLE hdb_catalog.remote_schemas (
   definition JSON,
   comment TEXT
 );
+
+CREATE TABLE hdb_catalog.hdb_cache_update_event (
+  id BIGSERIAL PRIMARY KEY,
+  server_id uuid NOT NULL,
+  occurred_at timestamptz NOT NULL
+);
+
+CREATE FUNCTION hdb_catalog.hdb_cache_update_event_notifier() RETURNS trigger AS
+$function$
+  DECLARE
+    server_id uuid;
+    occurred_at timestamptz;
+    curr_rec record;
+  BEGIN
+    server_id = NEW.server_id;
+    occurred_at = NEW.occurred_at;
+    PERFORM pg_notify('hasura_cache_update', json_build_object(
+      'server_id', server_id,
+      'occurred_at', occurred_at
+      )::text);
+    RETURN curr_rec;
+  END;
+$function$
+LANGUAGE plpgsql;
+
+CREATE TRIGGER hdb_cache_update_event_notifier AFTER INSERT ON hdb_catalog.hdb_cache_update_event
+  FOR EACH ROW EXECUTE PROCEDURE hdb_catalog.hdb_cache_update_event_notifier();
+
+

diff --git a/server/src-rsr/migrate_from_9_to_10.sql b/server/src-rsr/migrate_from_9_to_10.sql
new file mode 100644
index 0000000000000..ebc9ac8140e1e
--- /dev/null
+++ b/server/src-rsr/migrate_from_9_to_10.sql
@@ -0,0 +1,26 @@
+CREATE TABLE hdb_catalog.hdb_cache_update_event (
+  id BIGSERIAL PRIMARY KEY,
+  server_id uuid NOT NULL,
+  occurred_at timestamptz NOT NULL
+);
+
+CREATE FUNCTION hdb_catalog.hdb_cache_update_event_notifier() RETURNS trigger AS
+$function$
+  DECLARE
+    server_id uuid;
+    occurred_at timestamptz;
+    curr_rec record;
+  BEGIN
+    server_id = NEW.server_id;
+    occurred_at = NEW.occurred_at;
+    PERFORM pg_notify('hasura_cache_update', json_build_object(
+      'server_id', server_id,
+      'occurred_at', occurred_at
+      )::text);
+    RETURN curr_rec;
+  END;
+$function$
+LANGUAGE plpgsql;
+
+CREATE TRIGGER hdb_cache_update_event_notifier AFTER INSERT ON hdb_catalog.hdb_cache_update_event
+  FOR EACH ROW EXECUTE PROCEDURE hdb_catalog.hdb_cache_update_event_notifier();

diff --git a/server/stack.yaml b/server/stack.yaml
index 7e275c8a067e7..b10f7a73224dd 100644
--- a/server/stack.yaml
+++ b/server/stack.yaml
@@ -17,7 +17,7 @@ packages:
 extra-deps:
 # use https URLs so that build systems can clone these repos
 - git: https://github.com/hasura/pg-client-hs.git
-  commit: f3d1e9e67bdfbfa3de85b7cbdb4c557dce7fd84d
+  commit: 2d0e9f4c4ff0ba26bf4925db241d92b5f58621c3
 - git: https://github.com/hasura/graphql-parser-hs.git
   commit: ff95d9a96aa5ef9e5390f8712958e4118e3831f6
 - ginger-0.8.1.0
@@ -27,6 +27,9 @@ extra-deps:
 - deferred-folds-0.9.9
 - primitive-0.6.4.0
 
+# extra dep for pg-client-hs
+- select-0.4.0.1
+
 # jose for the x5t bugfix (0.8.0.0)
 - git: https://github.com/frasertweedale/hs-jose.git
   commit: d47572fb0650ac6cc2c9e00711c7f99132d897cb
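
Note on the mechanism patch 01 introduces: every RQL query for which queryNeedsReload holds records a row in hdb_catalog.hdb_cache_update_event; the hdb_cache_update_event_notifier trigger converts that insert into pg_notify('hasura_cache_update', ...) carrying a JSON body of the form {"server_id": ..., "occurred_at": ...}; each instance's listener thread decodes such payloads onto an STM queue, and its processor thread rebuilds the local schema cache when the event originates from a different server_id and is newer than the local cache build time. Below is a minimal, self-contained sketch of that listener/processor split, with the Postgres notification stream replaced by a timer thread; the Event type and the one-event-per-second producer are illustrative stand-ins, not code from this patch.

    -- Sketch only: models the queue decoupling used above, assuming nothing
    -- beyond the stm package.
    import Control.Concurrent (forkIO, threadDelay)
    import Control.Concurrent.STM
    import Control.Monad (forever)

    data Event = ListenSuccess String | ListenFail
      deriving (Show)

    main :: IO ()
    main = do
      queue <- newTQueueIO
      -- "listener": the patch blocks on LISTEN hasura_cache_update and
      -- enqueues decoded payloads; here a timer fakes one event per second
      _ <- forkIO $ forever $ do
        threadDelay 1000000
        atomically $ writeTQueue queue (ListenSuccess "payload from another server")
      -- "processor": drains the queue and decides whether to rebuild the cache
      forever $ do
        event <- atomically $ readTQueue queue
        case event of
          ListenSuccess payload -> putStrLn ("would rebuild schema cache: " ++ payload)
          ListenFail            -> putStrLn "listener reported failure; skip rebuild"

Decoupling the two loops through a queue means a slow cache rebuild never blocks the LISTEN connection, and a dropped connection degrades to a logged CUEListenFail event and a retry rather than a crash.
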
From 4539bdcce905d2df56ff65beaa97baa8111e8d85 Mon Sep 17 00:00:00 2001
From: rakeshkky
Date: Thu, 21 Feb 2019 13:20:00 +0530
Subject: [PATCH 02/20] support pg-client-hs library with retries & improve naming

---
 server/graphql-engine.cabal                  |   2 +-
 server/src-exec/Main.hs                      |  78 ++++----
 server/src-lib/Hasura/Server/App.hs          |  97 +++++-----
 server/src-lib/Hasura/Server/CacheUpdate.hs  | 178 -------------------
 server/src-lib/Hasura/Server/Init.hs         |  52 ++++--
 server/src-lib/Hasura/Server/Logging.hs      |  15 ++
 server/src-lib/Hasura/Server/Query.hs        |  52 ++++--
 server/src-lib/Hasura/Server/SchemaUpdate.hs | 163 +++++++++++++++++
 server/src-rsr/initialise.sql                |  20 +--
 server/src-rsr/migrate_from_9_to_10.sql      |  20 +--
 server/stack.yaml                            |   4 +-
 11 files changed, 366 insertions(+), 315 deletions(-)
 delete mode 100644 server/src-lib/Hasura/Server/CacheUpdate.hs
 create mode 100644 server/src-lib/Hasura/Server/SchemaUpdate.hs

diff --git a/server/graphql-engine.cabal b/server/graphql-engine.cabal
index da06876e38829..95289415b528c 100644
--- a/server/graphql-engine.cabal
+++ b/server/graphql-engine.cabal
@@ -149,7 +149,7 @@ library
                      , Hasura.Server.Version
                      , Hasura.Server.CheckUpdates
                      , Hasura.Server.Telemetry
-                     , Hasura.Server.CacheUpdate
+                     , Hasura.Server.SchemaUpdate
                      , Hasura.RQL.Types
                      , Hasura.RQL.Instances
                      , Hasura.RQL.Types.SchemaCache

diff --git a/server/src-exec/Main.hs b/server/src-exec/Main.hs
index d4dd91beab566..eca4c17e6fb8a 100644
--- a/server/src-exec/Main.hs
+++ b/server/src-exec/Main.hs
@@ -16,7 +16,6 @@ import qualified Data.ByteString.Char8 as BC
 import qualified Data.ByteString.Lazy       as BL
 import qualified Data.ByteString.Lazy.Char8 as BLC
 import qualified Data.Text                  as T
-import qualified Data.UUID.V4               as UUID
 import qualified Data.Yaml                  as Y
 import qualified Network.HTTP.Client        as HTTP
 import qualified Network.HTTP.Client.TLS    as HTTP
@@ -26,15 +25,14 @@ import Hasura.Events.Lib
 import           Hasura.Logging
 import           Hasura.Prelude
 import           Hasura.RQL.DDL.Metadata    (fetchMetadata)
-import           Hasura.RQL.Types           (QErr, adminUserInfo,
-                                             emptySchemaCache)
-import           Hasura.Server.App          (mkWaiApp)
+import           Hasura.RQL.Types           (adminUserInfo, emptySchemaCache)
+import           Hasura.Server.App          (SchemaCacheRef (..), mkWaiApp)
 import           Hasura.Server.Auth
-import           Hasura.Server.CacheUpdate
 import           Hasura.Server.CheckUpdates (checkForUpdates)
 import           Hasura.Server.Init
 import           Hasura.Server.Logging
 import           Hasura.Server.Query        (peelRun)
+import           Hasura.Server.SchemaUpdate
 import           Hasura.Server.Telemetry
 import           Hasura.Server.Version      (currentVersion)
@@ -98,14 +96,19 @@ printJSON = BLC.putStrLn . A.encode
 printYaml :: (A.ToJSON a) => a -> IO ()
 printYaml = BC.putStrLn . Y.encode
 
+mkPGLogger :: Logger -> Q.PGLogger
+mkPGLogger (Logger logger) msg =
+  logger $ PGLog LevelWarn msg
+
 main :: IO ()
 main = do
  (HGEOptionsG rci hgeCmd) <- parseArgs
  -- global http manager
  httpManager <- HTTP.newManager HTTP.tlsManagerSettings
  loggerCtx <- mkLoggerCtx $ defaultLoggerSettings True
-  serverId <- UUID.nextRandom
+  instanceId <- mkInstanceId
  let logger = mkLogger loggerCtx
+      pgLogger = mkPGLogger logger
  case hgeCmd of
    HCServe so@(ServeOptions port host cp isoL mAccessKey mAuthHook mJwtSecret
                mUnAuthRole corsCfg enableConsole enableTelemetry) -> do
@@ -125,10 +128,11 @@ main = do
      -- create empty cache update events queue
      eventsQueue <- STM.newTQueueIO
 
+      pool <- Q.initPGPool ci cp pgLogger
+
      -- start postgres cache update events listener thread in background
-      listenerPool <- getMinimalPool ci
-      listenerTId <- C.forkIO $ cacheUpdateEventListener listenerPool logger eventsQueue
-      unLogger logger $ mkThreadLog listenerTId serverId TTListener
+      listenerTId <- C.forkIO $ schemaUpdateEventListener pool logger eventsQueue
+      unLogger logger $ mkThreadLog listenerTId instanceId TTListener
 
      -- safe init catalog
      initRes <- initialise logger ci httpManager
@@ -136,16 +140,16 @@ main = do
      -- prepare event triggers data
      prepareEvents logger ci
 
-      pool <- Q.initPGPool ci cp
-
-      (app, cacheRef, cacheLk, cacheBuiltTime) <-
+      (app, cacheRef, cacheBuiltTime) <-
        mkWaiApp isoL loggerCtx pool httpManager am corsCfg enableConsole
-        enableTelemetry serverId
+        enableTelemetry instanceId
+
+      let scRef = _scrCache cacheRef
 
      -- start cache update events processor thread in background
-      procTId <- C.forkIO $ cacheUpdateEventProcessor pool logger httpManager
-                 eventsQueue cacheRef cacheLk serverId cacheBuiltTime
-      unLogger logger $ mkThreadLog procTId serverId TTProcessor
+      procTId <- C.forkIO $ schemaUpdateEventProcessor pool logger httpManager
+                 eventsQueue cacheRef instanceId cacheBuiltTime
+      unLogger logger $ mkThreadLog procTId instanceId TTProcessor
 
      let warpSettings = Warp.setPort port $ Warp.setHost host Warp.defaultSettings
@@ -158,7 +162,7 @@ main = do
 
      unLogger logger $
        mkGenericStrLog "event_triggers" "starting workers"
-      void $ C.forkIO $ processEventQueue hloggerCtx logEnvHeaders httpSession pool cacheRef eventEngineCtx
+      void $ C.forkIO $ processEventQueue hloggerCtx logEnvHeaders httpSession pool scRef eventEngineCtx
 
      -- start a background thread to check for updates
      void $ C.forkIO $ checkForUpdates loggerCtx httpManager
@@ -166,7 +170,7 @@ main = do
      -- start a background thread for telemetry
      when enableTelemetry $ do
        unLogger logger $ mkGenericStrLog "telemetry" telemetryNotice
-        void $ C.forkIO $ runTelemetry logger httpManager cacheRef initRes
+        void $ C.forkIO $ runTelemetry logger httpManager scRef initRes
 
      unLogger logger $
        mkGenericStrLog "server" "starting API server"
@@ -174,30 +178,29 @@ main = do
 
    HCExport -> do
      ci <- procConnInfo rci
-      res <- runTx ci fetchMetadata
+      res <- runTx pgLogger ci fetchMetadata
      either printErrJExit printJSON res
 
    HCClean -> do
      ci <- procConnInfo rci
-      res <- runTx ci cleanCatalog
+      res <- runTx pgLogger ci cleanCatalog
      either printErrJExit (const cleanSuccess) res
 
    HCExecute -> do
      queryBs <- BL.getContents
      ci <- procConnInfo rci
-      res <- runAsAdmin ci httpManager $ execQuery queryBs
+      res <- runAsAdmin pgLogger ci httpManager $ execQuery queryBs
      either printErrJExit BLC.putStrLn res
 
    HCVersion -> putStrLn $ "Hasura GraphQL Engine: " ++ T.unpack currentVersion
  where
-    runTx :: Q.ConnInfo -> Q.TxE QErr a -> IO (Either QErr a)
-    runTx ci tx = do
-      pool <- getMinimalPool ci
+    runTx pgLogger ci tx = do
+      pool <- getMinimalPool pgLogger ci
      runExceptT $ Q.runTx pool (Q.Serializable, Nothing) tx
 
-    runAsAdmin ci httpManager m = do
-      pool <- getMinimalPool ci
+    runAsAdmin pgLogger ci httpManager m = do
+      pool <- getMinimalPool pgLogger ci
      res <- runExceptT $ peelRun emptySchemaCache adminUserInfo
             httpManager pool Q.Serializable m
      return $ fmap fst res
@@ -206,39 +209,40 @@ main = do
      either (printErrExit . connInfoErrModifier) return $
        mkConnInfo rci
 
-    getMinimalPool ci = do
+    getMinimalPool pgLogger ci = do
      let connParams = Q.defaultConnParams { Q.cpConns = 1 }
-      Q.initPGPool ci connParams
+      Q.initPGPool ci connParams pgLogger
 
    initialise (Logger logger) ci httpMgr = do
      currentTime <- getCurrentTime
-
+      let pgLogger = mkPGLogger $ Logger logger
      -- initialise the catalog
-      initRes <- runAsAdmin ci httpMgr $ initCatalogSafe currentTime
+      initRes <- runAsAdmin pgLogger ci httpMgr $ initCatalogSafe currentTime
      either printErrJExit (logger . mkGenericStrLog "db_init") initRes
 
      -- migrate catalog if necessary
-      migRes <- runAsAdmin ci httpMgr $ migrateCatalog currentTime
+      migRes <- runAsAdmin pgLogger ci httpMgr $ migrateCatalog currentTime
      either printErrJExit (logger . mkGenericStrLog "db_migrate") migRes
 
      -- generate and retrieve uuids
-      getUniqIds ci
+      getUniqIds pgLogger ci
 
    prepareEvents (Logger logger) ci = do
+      let pgLogger = mkPGLogger $ Logger logger
      logger $ mkGenericStrLog "event_triggers" "preparing data"
-      res <- runTx ci unlockAllEvents
+      res <- runTx pgLogger ci unlockAllEvents
      either printErrJExit return res
 
-    mkThreadLog threadId serverId threadType =
+    mkThreadLog threadId instanceId threadType =
      let msg = T.pack (show threadType) <> " thread started"
      in StartupLog LevelInfo "threads" $
-           A.object [ "server_id" A..= serverId
+           A.object [ "instance_id" A..= getInstanceId instanceId
                    , "thread_id" A..= show threadId
                    , "message" A..= msg
                    ]
 
-    getUniqIds ci = do
-      eDbId <- runTx ci getDbId
+    getUniqIds pgLogger ci = do
+      eDbId <- runTx pgLogger ci getDbId
      dbId <- either printErrJExit return eDbId
      fp <- liftIO generateFingerprint
      return (dbId, fp)

diff --git a/server/src-lib/Hasura/Server/App.hs b/server/src-lib/Hasura/Server/App.hs
index 9d12c74ab5485..d712012161d40 100644
--- a/server/src-lib/Hasura/Server/App.hs
+++ b/server/src-lib/Hasura/Server/App.hs
@@ -10,7 +10,6 @@
 import           Data.Aeson hiding (json)
 import           Data.IORef
 import           Data.Time.Clock            (UTCTime, getCurrentTime)
-import           Data.UUID                  (UUID)
 import           Network.Wai                (requestHeaders, strictRequestBody)
 import           Web.Spock.Core
@@ -88,16 +87,37 @@ mkConsoleHTML path authMode enableTelemetry =
    errMsg = "console template rendering failed: " ++ show errs
 
+data SchemaCacheRef
+  = SchemaCacheRef
+  { _scrLock  :: MVar ()
+  , _scrCache :: IORef SchemaCache
+  }
+
+withSCUpdate
+  :: (MonadIO m, MonadError e m)
+  => SchemaCacheRef -> m (a, SchemaCache) -> m a
+withSCUpdate scr action = do
+  acquireLock
+  (res, newSC) <- action `catchError` onError
+  -- update schemacache in IO reference
+  liftIO $ writeIORef cacheRef newSC
+  releaseLock
+  return res
+  where
+    SchemaCacheRef lk cacheRef = scr
+    onError e   = releaseLock >> throwError e
+    acquireLock = liftIO $ takeMVar lk
+    releaseLock = liftIO $ putMVar lk ()
+
 data ServerCtx
  = ServerCtx
-  { scIsolation :: Q.TxIsolation
-  , scPGPool    :: Q.PGPool
-  , scLogger    :: L.Logger
-  , scCacheRef  :: IORef SchemaCache
-  , scCacheLock :: MVar ()
-  , scAuthMode  :: AuthMode
-  , scManager   :: HTTP.Manager
-  , scServerId  :: UUID
+  { scIsolation  :: Q.TxIsolation
+  , scPGPool     :: Q.PGPool
+  , scLogger     :: L.Logger
+  , scCacheRef   :: SchemaCacheRef
+  , scAuthMode   :: AuthMode
+  , scManager    :: HTTP.Manager
+  , scInstanceId :: InstanceId
  }
 
 data HandlerCtx
@@ -128,7 +148,7 @@ buildQCtx :: Handler QCtx
 buildQCtx = do
  scRef <- scCacheRef . hcServerCtx <$> ask
  userInfo <- asks hcUser
-  cache <- liftIO $ readIORef scRef
+  cache <- liftIO $ readIORef $ _scrCache scRef
  return $ QCtx userInfo cache
 
 logResult
@@ -190,39 +210,26 @@ mkSpockAction qErrEncoder serverCtx handler = do
    uncurry setHeader jsonHeader
    lazyBytes resp
 
-withLock :: (MonadIO m, MonadError e m)
-         => MVar () -> m a -> m a
-withLock lk action = do
-  acquireLock
-  res <- action `catchError` onError
-  releaseLock
-  return res
-  where
-    onError e   = releaseLock >> throwError e
-    acquireLock = liftIO $ takeMVar lk
-    releaseLock = liftIO $ putMVar lk ()
-
 v1QueryHandler :: RQLQuery -> Handler BL.ByteString
 v1QueryHandler query = do
-  lk <- scCacheLock . hcServerCtx <$> ask
-  bool (fst <$> dbAction) (withLock lk dbActionReload) $
+  scRef <- scCacheRef . hcServerCtx <$> ask
+  bool (fst <$> dbAction) (withSCUpdate scRef dbActionReload) $
    queryNeedsReload query
  where
    -- Hit postgres
    dbAction = do
      userInfo <- asks hcUser
      scRef <- scCacheRef . hcServerCtx <$> ask
-      schemaCache <- liftIO $ readIORef scRef
+      schemaCache <- liftIO $ readIORef $ _scrCache scRef
      httpMgr <- scManager . hcServerCtx <$> ask
      pool <- scPGPool . hcServerCtx <$> ask
      isoL <- scIsolation . hcServerCtx <$> ask
-      serverId <- scServerId . hcServerCtx <$> ask
-      runQuery pool isoL serverId userInfo schemaCache httpMgr query
+      instanceId <- scInstanceId . hcServerCtx <$> ask
+      runQuery pool isoL instanceId userInfo schemaCache httpMgr query
 
    -- Also update the schema cache
    dbActionReload = do
      (resp, newSc) <- dbAction
-      scRef <- scCacheRef . hcServerCtx <$> ask
      httpMgr <- scManager . hcServerCtx <$> ask
      --FIXME: should we be fetching the remote schema again? if not how do we get the remote schema?
      newGCtxMap <- GS.mkGCtxMap (scTables newSc) (scFunctions newSc)
@@ -230,8 +237,7 @@ v1QueryHandler query = do
        mergeSchemas (scRemoteResolvers newSc) newGCtxMap httpMgr
      let newSc' = newSc { scGCtxMap = mergedGCtxMap
                         , scDefaultRemoteGCtx = defGCtx }
-      liftIO $ writeIORef scRef newSc'
-      return resp
+      return (resp, newSc')
 
 v1Alpha1GQHandler :: GH.GraphQLRequest -> Handler BL.ByteString
 v1Alpha1GQHandler query = do
@@ -240,7 +246,7 @@ v1Alpha1GQHandler query = do
  reqHeaders <- asks hcReqHeaders
  manager <- scManager . hcServerCtx <$> ask
  scRef <- scCacheRef . hcServerCtx <$> ask
-  sc <- liftIO $ readIORef scRef
+  sc <- liftIO $ readIORef $ _scrCache scRef
  pool <- scPGPool . hcServerCtx <$> ask
  isoL <- scIsolation . hcServerCtx <$> ask
  GH.runGQ pool isoL userInfo sc manager reqHeaders query reqBody
@@ -249,7 +255,7 @@ gqlExplainHandler query = do
  onlyAdmin
  scRef <- scCacheRef . hcServerCtx <$> ask
-  sc <- liftIO $ readIORef scRef
+  sc <- liftIO $ readIORef $ _scrCache scRef
  pool <- scPGPool . hcServerCtx <$> ask
  isoL <- scIsolation . hcServerCtx <$> ask
  GE.explainGQLQuery pool isoL sc query
@@ -284,32 +290,26 @@ legacyQueryHandler tn queryType =
 
 mkWaiApp
-  :: Q.TxIsolation
-  -> L.LoggerCtx
-  -> Q.PGPool
-  -> HTTP.Manager
-  -> AuthMode
-  -> CorsConfig
-  -> Bool
-  -> Bool
-  -> UUID
-  -> IO (Wai.Application, IORef SchemaCache, MVar (), UTCTime)
+  :: Q.TxIsolation -> L.LoggerCtx -> Q.PGPool
+  -> HTTP.Manager -> AuthMode -> CorsConfig
+  -> Bool -> Bool -> InstanceId
+  -> IO (Wai.Application, SchemaCacheRef, Maybe UTCTime)
 mkWaiApp isoLevel loggerCtx pool httpManager mode corsCfg
-         enableConsole enableTelemetry serverId = do
+         enableConsole enableTelemetry instanceId = do
  (cacheRef, cacheBuiltTime) <- do
    pgResp <- runExceptT $ peelRun emptySchemaCache adminUserInfo
              httpManager pool Q.Serializable $ do
                buildSchemaCache
-                liftIO getCurrentTime
+                fetchLastUpdateTime
    (time, sc) <- either initErrExit return pgResp
    scRef <- newIORef sc
    return (scRef, time)
 
  cacheLock <- newMVar ()
 
-  let serverCtx =
-        ServerCtx isoLevel pool (L.mkLogger loggerCtx) cacheRef
-        cacheLock mode httpManager serverId
+  let schemaCacheRef = SchemaCacheRef cacheLock cacheRef
+      serverCtx = ServerCtx isoLevel pool (L.mkLogger loggerCtx)
+                  schemaCacheRef mode httpManager instanceId
 
  spockApp <- spockAsApp $ spockT id $
              httpApp corsCfg serverCtx enableConsole enableTelemetry
@@ -319,8 +319,7 @@ mkWaiApp isoLevel loggerCtx pool httpManager mode corsCfg
  wsServerEnv <- WS.createWSServerEnv (scLogger serverCtx) httpManager cacheRef runTx
  let wsServerApp = WS.createWSServerApp mode wsServerEnv
  return ( WS.websocketsOr WS.defaultConnectionOptions wsServerApp spockApp
-         , cacheRef
-         , cacheLock
+         , schemaCacheRef
         , cacheBuiltTime
         )

diff --git a/server/src-lib/Hasura/Server/CacheUpdate.hs b/server/src-lib/Hasura/Server/CacheUpdate.hs
deleted file mode 100644
index 70253a79e0df9..0000000000000
--- a/server/src-lib/Hasura/Server/CacheUpdate.hs
+++ /dev/null
@@ -1,178 +0,0 @@
-module Hasura.Server.CacheUpdate
-  ( ThreadType(..)
-  , cacheUpdateEventListener
-  , cacheUpdateEventProcessor
-  )
-where
-
-import           Hasura.Prelude
-
-import           Hasura.Logging
-import           Hasura.RQL.DDL.Schema.Table (buildSchemaCache)
-import           Hasura.RQL.Types
-import           Hasura.Server.App           (withLock)
-import           Hasura.Server.Query
-import           Hasura.Server.Utils         (bsToTxt)
-
-import           Control.Concurrent.MVar
-import           Data.Aeson
-import           Data.Aeson.Casing
-import           Data.Aeson.TH
-import           Data.UUID
-import           Database.PG.Query
-
-import qualified Control.Concurrent          as C
-import qualified Control.Concurrent.STM      as STM
-import qualified Data.IORef                  as IORef
-import qualified Data.Text                   as T
-import qualified Data.Time                   as UTC
-import qualified Database.PostgreSQL.LibPQ   as PQ
-import qualified Network.HTTP.Client         as HTTP
-
-pgChannel :: PGChannel
-pgChannel = "hasura_cache_update"
-
-data ThreadType
-  = TTListener
-  | TTProcessor
-  deriving (Eq)
-
-instance Show ThreadType where
-  show TTListener  = "listener"
-  show TTProcessor = "processor"
-
-
-data CacheUpdateEventLog
-  = CacheUpdateEventLog
-  { cuelLogLevel   :: !LogLevel
-  , cuelThreadType :: !ThreadType
-  , cuelInfo       :: !Value
-  } deriving (Show, Eq)
-
-instance ToJSON CacheUpdateEventLog where
-  toJSON (CacheUpdateEventLog _ t info) =
-    object [ "thread_type" .= show t
-           , "info" .= info
-           ]
-
-instance ToEngineLog CacheUpdateEventLog where
-  toEngineLog threadLog =
-    (cuelLogLevel threadLog, "cache_update_event", toJSON threadLog)
-
-data EventPayload
-  = EventPayload
-  { _epServerId   :: !UUID
-  , _epOccurredAt :: !UTC.UTCTime
-  } deriving (Show, Eq)
-$(deriveJSON (aesonDrop 3 snakeCase) ''EventPayload)
-
-data CacheUpdateEvent
-  = CUEListenSuccess !EventPayload
-  | CUEListenFail
-  deriving (Show, Eq)
-
-instance ToJSON CacheUpdateEvent where
-  toJSON (CUEListenSuccess payload) = toJSON payload
-  toJSON CUEListenFail              = String "event listening failed"
-
-data ThreadError
-  = TEJsonParse !T.Text
-  | TEQueryError !QErr
-$(deriveToJSON
-  defaultOptions { constructorTagModifier = snakeCase . drop 2
-                 , sumEncoding = TaggedObject "type" "info"
-                 }
-  ''ThreadError)
-
--- | An IO action that listens to postgres for events and pushes them to a Queue
-cacheUpdateEventListener
-  :: PGPool
-  -> Logger
-  -> STM.TQueue CacheUpdateEvent
-  -> IO ()
-cacheUpdateEventListener pool logger eventsQueue =
-  -- Never exits
-  forever $ do
-    listenResE <- liftIO $ runExceptT $ listen pool pgChannel notifyHandler
-    either onError return listenResE
-  where
-    threadType = TTListener
-
-    onError e = do
-      logError logger threadType $ TEQueryError e
-      -- Push a listen failed event to queue
-      STM.atomically $ STM.writeTQueue eventsQueue CUEListenFail
-      logInfo logger threadType $
-        object ["message" .= ("retrying in 10 seconds" :: T.Text)]
-      C.threadDelay $ 10 * 1000 * 1000
-
-    -- Postgres notification handler
-    notifyHandler notif = do
-      let eventChannel = PGChannel $ bsToTxt $ PQ.notifyRelname notif
-      when (eventChannel == pgChannel) $
-        case eitherDecodeStrict $ PQ.notifyExtra notif of
-          Left e -> logError logger threadType $ TEJsonParse $ T.pack e
-          Right payload -> do
-            let newEvent = CUEListenSuccess payload
-            logInfo logger threadType $ object [ "received_event" .= newEvent]
-            -- Push a success event to Queue along with event payload
-            STM.atomically $ STM.writeTQueue eventsQueue newEvent
-
--- | An IO action that processes events from Queue
-cacheUpdateEventProcessor
-  :: PGPool
-  -> Logger
-  -> HTTP.Manager
-  -> STM.TQueue CacheUpdateEvent
-  -> IORef.IORef SchemaCache
-  -> MVar ()
-  -> UUID
-  -> UTC.UTCTime
-  -> IO ()
-cacheUpdateEventProcessor pool logger httpManager eventsQueue
-                          cacheRef lk serverId cacheInit = do
-  -- Initialise the previous event IO reference
-  prevEventRef <- IORef.newIORef Nothing
-
-  -- Never exits
-  forever $ do
-    event <- STM.atomically $ STM.readTQueue eventsQueue
-    prevEvent <- IORef.readIORef prevEventRef
-    logInfo logger threadType $
-      object [ "previous_event" .= prevEvent
-             , "processed_event" .= event
-             ]
-    when (shouldReload prevEvent event) $ do
-      -- Build schema cache from catalog
-      scE <- liftIO $ runExceptT $ withLock lk $
-             snd <$> peelRun emptySchemaCache adminUserInfo
-             httpManager pool Serializable buildSchemaCache
-      case scE of
-        Left e -> logError logger threadType $ TEQueryError e
-        Right sc -> do
-          -- Update cache reference with newly built cache
-          IORef.writeIORef cacheRef sc
-          logInfo logger threadType $
-            object ["message" .= ("schema cache reloaded" :: T.Text)]
-
-    IORef.writeIORef prevEventRef $ Just event
-  where
-    threadType = TTProcessor
-
-    -- If previous event is failed and current event is success,
-    -- reload irrespective of the current event payload
-    shouldReload (Just CUEListenFail) (CUEListenSuccess _) = True
-    -- If current event is failed, do not reload
-    shouldReload _ CUEListenFail = False
-    shouldReload _ (CUEListenSuccess payload) =
-      -- If event is from another server and occurred after
-      -- the init schema cache was built then reload
-      (_epServerId payload /= serverId) && (_epOccurredAt payload > cacheInit)
-
-logInfo :: Logger -> ThreadType -> Value -> IO ()
-logInfo logger threadType val = unLogger logger $
-  CacheUpdateEventLog LevelInfo threadType val
-
-logError :: ToJSON a => Logger -> ThreadType -> a -> IO ()
-logError logger threadType err = unLogger logger $
-  CacheUpdateEventLog LevelError threadType $ object ["error" .= toJSON err]

diff --git a/server/src-lib/Hasura/Server/Init.hs b/server/src-lib/Hasura/Server/Init.hs
index 19a3b6a596ff2..a591dbfc189f5 100644
--- a/server/src-lib/Hasura/Server/Init.hs
+++ b/server/src-lib/Hasura/Server/Init.hs
@@ -8,7 +8,10 @@ import System.Exit (exitFailure)
 import qualified Data.Aeson                    as J
 import qualified Data.String                   as DataString
 import qualified Data.Text                     as T
+import qualified Data.UUID                     as UUID
+import qualified Data.UUID.V4                  as UUID
 import qualified Hasura.Logging                as L
+
 import           Hasura.Prelude
 import           Hasura.RQL.Types              (RoleName (..))
 import           Hasura.Server.Auth
@@ -17,6 +20,15 @@ import Hasura.Server.Utils
 import           Network.Wai.Handler.Warp
 import qualified Text.PrettyPrint.ANSI.Leijen  as PP
 
+newtype InstanceId
+  = InstanceId {getInstanceId :: UUID.UUID}
+  deriving (Show, Eq)
+
+instanceIdToTxt :: InstanceId -> T.Text
+instanceIdToTxt = UUID.toText . getInstanceId
+
+mkInstanceId :: IO InstanceId
+mkInstanceId = InstanceId <$> UUID.nextRandom
+
 initErrExit :: (Show e) => e -> IO a
 initErrExit e = print e >> exitFailure
@@ -79,6 +91,7 @@ data RawConnInfo =
  , connUrl      :: !(Maybe String)
  , connDatabase :: !(Maybe String)
  , connOptions  :: !(Maybe String)
+  , connRetries  :: !(Maybe Int)
  } deriving (Eq, Read, Show)
 
 data HGECommandG a
@@ -199,9 +212,13 @@ mkHGEOptions (HGEOptionsG rawConnInfo rawCmd) =
 mkRawConnInfo :: RawConnInfo -> WithEnv RawConnInfo
 mkRawConnInfo rawConnInfo = do
  withEnvUrl <- withEnv rawDBUrl $ fst databaseUrlEnv
-  return $ rawConnInfo {connUrl = withEnvUrl}
+  withEnvRetries <- withEnv retries $ fst retriesNumEnv
+  return $ rawConnInfo { connUrl = withEnvUrl
+                       , connRetries = withEnvRetries
+                       }
  where
    rawDBUrl = connUrl rawConnInfo
+    retries = connRetries rawConnInfo
 
 mkServeOptions :: RawServeOptions -> WithEnv ServeOptions
 mkServeOptions rso = do
@@ -278,7 +295,7 @@ mainCmdFooter =
      ]
    ]
 
-  envVarDoc = mkEnvVarDoc [databaseUrlEnv]
+  envVarDoc = mkEnvVarDoc [databaseUrlEnv, retriesNumEnv]
 
 databaseUrlEnv :: (String, String)
 databaseUrlEnv =
@@ -323,10 +340,10 @@ serveCmdFooter =
  envVarDoc = mkEnvVarDoc $ envVars <> eventEnvs
  envVars =
-    [ servePortEnv, serveHostEnv, pgStripesEnv, pgConnsEnv, pgTimeoutEnv
-    , txIsoEnv, accessKeyEnv, authHookEnv , authHookModeEnv
-    , jwtSecretEnv , unAuthRoleEnv, corsDomainEnv , enableConsoleEnv
-    , enableTelemetryEnv
+    [ databaseUrlEnv, retriesNumEnv, servePortEnv, serveHostEnv
+    , pgStripesEnv, pgConnsEnv, pgTimeoutEnv , txIsoEnv
+    , accessKeyEnv, authHookEnv , authHookModeEnv , jwtSecretEnv
+    , unAuthRoleEnv, corsDomainEnv , enableConsoleEnv , enableTelemetryEnv
    ]
 
  eventEnvs =
@@ -338,6 +355,12 @@ serveCmdFooter =
      )
    ]
 
+retriesNumEnv :: (String, String)
+retriesNumEnv =
+  ( "HASURA_GRAPHQL_NO_OF_RETRIES"
+  , "Number of retries if Postgres connection error occurs (default: 1)"
+  )
+
 servePortEnv :: (String, String)
 servePortEnv =
  ( "HASURA_GRAPHQL_SERVER_PORT"
@@ -433,6 +456,7 @@ parseRawConnInfo :: Parser RawConnInfo
 parseRawConnInfo =
  RawConnInfo <$> host <*> port <*> user <*> password
              <*> dbUrl <*> dbName <*> pure Nothing
+              <*> retries
  where
    host = optional $
      strOption ( long "host" <>
@@ -471,24 +495,31 @@ parseRawConnInfo =
                metavar "NAME" <>
                help "Database name to connect to"
              )
+    retries = optional $
+      option auto ( long "retries" <>
+                    metavar "NO OF RETRIES" <>
+                    help (snd retriesNumEnv)
+                  )
 
 connInfoErrModifier :: String -> String
 connInfoErrModifier s = "Fatal Error : " ++ s
 
 mkConnInfo :: RawConnInfo -> Either String Q.ConnInfo
-mkConnInfo (RawConnInfo mHost mPort mUser pass mURL mDB opts) =
+mkConnInfo (RawConnInfo mHost mPort mUser pass mURL mDB opts mRetries) =
  case (mHost, mPort, mUser, mDB, mURL) of
    (Just host, Just port, Just user, Just db, Nothing) ->
-      return $ Q.ConnInfo host port user pass db opts
+      return $ Q.ConnInfo host port user pass db opts retries
    (_, _, _, _, Just dbURL) ->
      maybe (throwError invalidUrlMsg)
-      return $ parseDatabaseUrl dbURL opts
+      withRetries $ parseDatabaseUrl dbURL opts
    _ -> throwError $ "Invalid options. "
                   ++ "Expecting all database connection params "
                   ++ "(host, port, user, dbname, password) or "
                   ++ "database-url (HASURA_GRAPHQL_DATABASE_URL)"
  where
+    retries = fromMaybe 1 mRetries
+    withRetries ci = return $ ci{Q.connRetries = retries}
    invalidUrlMsg = "Invalid database-url (HASURA_GRAPHQL_DATABASE_URL). "
                    ++ "Example postgres://foo:bar@example.com:2345/database"
@@ -631,13 +662,14 @@ parseEnableTelemetry = optional $
 
 -- Init logging related
 connInfoToLog :: Q.ConnInfo -> StartupLog
-connInfoToLog (Q.ConnInfo host port user _ db _) =
+connInfoToLog (Q.ConnInfo host port user _ db _ retries) =
  StartupLog L.LevelInfo "postgres_connection" infoVal
  where
    infoVal = J.object [ "host" J..= host
                       , "port" J..= port
                       , "user" J..= user
                       , "database" J..= db
+                       , "retries" J..= retries
                       ]
 
 serveOptsToLog :: ServeOptions -> StartupLog

diff --git a/server/src-lib/Hasura/Server/Logging.hs b/server/src-lib/Hasura/Server/Logging.hs
index 9e23b3b6abfdf..89947907d3827 100644
--- a/server/src-lib/Hasura/Server/Logging.hs
+++ b/server/src-lib/Hasura/Server/Logging.hs
@@ -2,6 +2,7 @@
 module Hasura.Server.Logging
  ( StartupLog(..)
+  , PGLog(..)
  , mkAccessLog
  , getRequestHeader
  , WebHookLog(..)
@@ -54,6 +55,20 @@ instance L.ToEngineLog StartupLog where
  toEngineLog startupLog =
    (slLogLevel startupLog, "startup", toJSON startupLog)
 
+data PGLog
+  = PGLog
+  { plLogLevel :: !L.LogLevel
+  , plMessage  :: !T.Text
+  } deriving (Show, Eq)
+
+instance ToJSON PGLog where
+  toJSON (PGLog _ msg) =
+    object ["message" .= msg]
+
+instance L.ToEngineLog PGLog where
+  toEngineLog pgLog =
+    (plLogLevel pgLog, "pg-client", toJSON pgLog)
+
 data WebHookLog
  = WebHookLog
  { whlLogLevel :: !L.LogLevel

diff --git a/server/src-lib/Hasura/Server/Query.hs b/server/src-lib/Hasura/Server/Query.hs
index f0a1f470aecef..7ebd323fa5cba 100644
--- a/server/src-lib/Hasura/Server/Query.hs
+++ b/server/src-lib/Hasura/Server/Query.hs
@@ -3,8 +3,7 @@ module Hasura.Server.Query where
 import           Data.Aeson
 import           Data.Aeson.Casing
 import           Data.Aeson.TH
-import           Data.Time                  (getCurrentTime)
-import           Data.UUID
+import           Data.Time                  (UTCTime)
 import           Language.Haskell.TH.Syntax (Lift)
 
 import qualified Data.ByteString.Builder    as BB
@@ -30,6 +29,8 @@ import Hasura.RQL.DML.Returning (encodeJSONVector)
 import           Hasura.RQL.DML.Select
 import           Hasura.RQL.DML.Update
 import           Hasura.RQL.Types
+import           Hasura.Server.Init         (InstanceId (..),
+                                             instanceIdToTxt)
 
 import qualified Database.PG.Query          as Q
@@ -112,15 +113,27 @@ instance UserInfoM Run where
 instance HasHttpManager Run where
  askHttpManager = asks snd
 
-recordCacheUpdate :: UUID -> Run ()
-recordCacheUpdate serverId = do
-  currTime <- liftIO getCurrentTime
+fetchLastUpdateTime :: Run (Maybe UTCTime)
+fetchLastUpdateTime = do
+  l <- liftTx $ Q.listQE defaultTxErrorHandler
+    [Q.sql|
+       SELECT occurred_at FROM hdb_catalog.hdb_schema_update_event
+       ORDER BY id DESC LIMIT 1
+       |] () True
+  case l of
+    []           -> return Nothing
+    [Identity t] -> return $ Just t
+    -- never happens
+    _            -> throw500 "more than one row returned by query"
+
+recordSchemaUpdate :: InstanceId -> Q.TxE QErr ()
+recordSchemaUpdate instanceId =
  liftTx $ Q.unitQE defaultTxErrorHandler [Q.sql|
             INSERT INTO
-                    hdb_catalog.hdb_cache_update_event
-                    (server_id, occurred_at)
-             VALUES ($1::uuid, $2)
-           |] (toText serverId, currTime) True
+                    hdb_catalog.hdb_schema_update_event
+                    (instance_id, occurred_at)
+             VALUES ($1::uuid, DEFAULT)
+           |] (Identity $ instanceIdToTxt instanceId) True
 
 peelRun
  :: SchemaCache
  -> UserInfo
@@ -122,12 +135,16 @@ peelRun sc userInfo httMgr pgPool txIso (Run m) =
 
 runQuery
  :: (MonadIO m, MonadError QErr m)
-  => Q.PGPool -> Q.TxIsolation -> UUID
+  => Q.PGPool -> Q.TxIsolation -> InstanceId
  -> UserInfo -> SchemaCache -> HTTP.Manager
  -> RQLQuery -> m (BL.ByteString, SchemaCache)
-runQuery pool isoL serverId userInfo sc hMgr query = do
-  res <- liftIO $ runExceptT $
-    peelRun sc userInfo hMgr pool isoL $ do
-      r <- runQueryM query
-      when (queryNeedsReload query) $
-        recordCacheUpdate serverId
-      return r
-  liftEither res
+runQuery pool isoL instanceId userInfo sc hMgr query = do
+  resE <- liftIO $ runExceptT $
+    peelRun sc userInfo hMgr pool isoL $ runQueryM query
+  either throwError withReload resE
+  where
+    withReload r = do
+      when (queryNeedsReload query) $ do
+        e <- liftIO $ runExceptT $ Q.runTx pool (isoL, Nothing)
+             $ recordSchemaUpdate instanceId
+        liftEither e
+      return r
 
 queryNeedsReload :: RQLQuery -> Bool
 queryNeedsReload qi = case qi of

diff --git a/server/src-lib/Hasura/Server/SchemaUpdate.hs b/server/src-lib/Hasura/Server/SchemaUpdate.hs
new file mode 100644
index 0000000000000..6522e1467cb32
--- /dev/null
+++ b/server/src-lib/Hasura/Server/SchemaUpdate.hs
@@ -0,0 +1,163 @@
+module Hasura.Server.SchemaUpdate
+  ( ThreadType(..)
+  , schemaUpdateEventListener
+  , schemaUpdateEventProcessor
+  )
+where
+
+import           Hasura.Prelude
+
+import           Hasura.Logging
+import           Hasura.RQL.DDL.Schema.Table (buildSchemaCache)
+import           Hasura.RQL.Types
+import           Hasura.Server.App           (SchemaCacheRef (..), withSCUpdate)
+import           Hasura.Server.Init          (InstanceId (..))
+import           Hasura.Server.Query
+
+import           Data.Aeson
+import           Data.Aeson.Casing
+import           Data.Aeson.TH
+import           Data.UUID
+
+import qualified Control.Concurrent.STM      as STM
+import qualified Data.Text                   as T
+import qualified Data.Time                   as UTC
+import qualified Database.PG.Query           as PG
+import qualified Database.PostgreSQL.LibPQ   as PQ
+import qualified Network.HTTP.Client         as HTTP
+
+pgChannel :: PG.PGChannel
+pgChannel = "hasura_schema_update"
+
+data ThreadType
+  = TTListener
+  | TTProcessor
+  deriving (Eq)
+
+instance Show ThreadType where
+  show TTListener  = "listener"
+  show TTProcessor = "processor"
+
+
+data SchemaUpdateEventLog
+  = SchemaUpdateEventLog
+  { suelLogLevel   :: !LogLevel
+  , suelThreadType :: !ThreadType
+  , suelInfo       :: !Value
+  } deriving (Show, Eq)
+
+instance ToJSON SchemaUpdateEventLog where
+  toJSON (SchemaUpdateEventLog _ t info) =
+    object [ "thread_type" .= show t
+           , "info" .= info
+           ]
+
+instance ToEngineLog SchemaUpdateEventLog where
+  toEngineLog threadLog =
+    (suelLogLevel threadLog, "schema_update_event", toJSON threadLog)
+
+data EventPayload
+  = EventPayload
+  { _epInstanceId :: !UUID
+  , _epOccurredAt :: !UTC.UTCTime
+  } deriving (Show, Eq)
+$(deriveJSON (aesonDrop 3 snakeCase) ''EventPayload)
+
+data SchemaUpdateEvent
+  = SUESuccess !EventPayload
+  | SUEPGReConn
+  deriving (Show, Eq)
+
+instance ToJSON SchemaUpdateEvent where
+  toJSON (SUESuccess payload) = toJSON payload
+  toJSON SUEPGReConn          = String "postgres reconnection"
+
+data ThreadError
+  = TEJsonParse !T.Text
+  | TEQueryError !QErr
+$(deriveToJSON
+  defaultOptions { constructorTagModifier = snakeCase . drop 2
+                 , sumEncoding = TaggedObject "type" "info"
+                 }
+  ''ThreadError)
+
+-- | An IO action that listens to postgres for events and pushes them to a Queue
+schemaUpdateEventListener
+  :: PG.PGPool
+  -> Logger
+  -> STM.TQueue SchemaUpdateEvent
+  -> IO ()
+schemaUpdateEventListener pool logger eventsQueue =
+  -- Never exits
+  forever $ do
+    listenResE <- liftIO $ runExceptT $ PG.listen pool pgChannel notifyHandler
+    either onError return listenResE
+  where
+    notifyHandler = PG.NotifyHandler onReconn onMessage
+    threadType = TTListener
+
+    onError = logError logger threadType . TEQueryError
+
+    onReconn = do
+      -- emit postgres reconnection event
+      let event = SUEPGReConn
+      logInfo logger threadType $ object ["received_event" .= event]
+      STM.atomically $ STM.writeTQueue eventsQueue event
+
+    -- Postgres notification handler
+    onMessage notif =
+      case eitherDecodeStrict $ PQ.notifyExtra notif of
+        Left e -> logError logger threadType $ TEJsonParse $ T.pack e
+        Right payload -> do
+          let newEvent = SUESuccess payload
+          logInfo logger threadType $ object ["received_event" .= newEvent]
+          -- Push a success event to Queue along with event payload
+          STM.atomically $ STM.writeTQueue eventsQueue newEvent
+
+-- | An IO action that processes events from Queue
+schemaUpdateEventProcessor
+  :: PG.PGPool
+  -> Logger
+  -> HTTP.Manager
+  -> STM.TQueue SchemaUpdateEvent
+  -> SchemaCacheRef
+  -> InstanceId
+  -> Maybe UTC.UTCTime
+  -> IO ()
+schemaUpdateEventProcessor pool logger httpManager eventsQueue
+                           cacheRef instanceId cacheInit =
+  -- Never exits
+  forever $ do
+    event <- STM.atomically $ STM.readTQueue eventsQueue
+    logInfo logger threadType $ object ["processed_event" .= event]
+    when (shouldReload event) $ do
+      -- Reload schema cache from catalog
+      resE <- liftIO $ runExceptT $ withSCUpdate cacheRef $
+              peelRun emptySchemaCache adminUserInfo
+              httpManager pool PG.Serializable buildSchemaCache
+      case resE of
+        Left e -> logError logger threadType $ TEQueryError e
+        Right _ ->
+          logInfo logger threadType $
+            object ["message" .= ("schema cache reloaded" :: T.Text)]
+  where
+    threadType = TTProcessor
+
+    -- If postgres reconnect happens reload schema cache
+    shouldReload SUEPGReConn = True
+    -- If event is from another server and occurred after
+    -- the init schema cache was built then reload
+    shouldReload (SUESuccess payload) =
+      (_epInstanceId payload /= getInstanceId instanceId)
+      && withCacheInit (_epOccurredAt payload) cacheInit
+
+    withCacheInit _ Nothing                  = False
+    withCacheInit occurredAt (Just initTime) = occurredAt > initTime
+
+logInfo :: Logger -> ThreadType -> Value -> IO ()
+logInfo logger threadType val = unLogger logger $
+  SchemaUpdateEventLog LevelInfo threadType val
+
+logError :: ToJSON a => Logger -> ThreadType -> a -> IO ()
+logError logger threadType err = unLogger logger $
+  SchemaUpdateEventLog LevelError threadType $ object ["error" .= toJSON err]

diff --git a/server/src-rsr/initialise.sql b/server/src-rsr/initialise.sql
index 744de0f219fab..3e891005767f6 100644
--- a/server/src-rsr/initialise.sql
+++ b/server/src-rsr/initialise.sql
@@ -404,23 +404,23 @@ CREATE TABLE hdb_catalog.remote_schemas (
   comment TEXT
 );
 
-CREATE TABLE hdb_catalog.hdb_cache_update_event (
+CREATE TABLE hdb_catalog.hdb_schema_update_event (
   id BIGSERIAL PRIMARY KEY,
-  server_id uuid NOT NULL,
-  occurred_at timestamptz NOT NULL
+  instance_id uuid NOT NULL,
+  occurred_at timestamptz NOT NULL DEFAULT NOW()
 );
 
-CREATE FUNCTION hdb_catalog.hdb_cache_update_event_notifier() RETURNS trigger AS
+CREATE FUNCTION hdb_catalog.hdb_schema_update_event_notifier() RETURNS trigger AS
 $function$
   DECLARE
-    server_id uuid;
+    instance_id uuid;
     occurred_at timestamptz;
     curr_rec record;
   BEGIN
-    server_id = NEW.server_id;
+    instance_id = NEW.instance_id;
     occurred_at = NEW.occurred_at;
-    PERFORM pg_notify('hasura_cache_update', json_build_object(
-      'server_id', server_id,
+    PERFORM pg_notify('hasura_schema_update', json_build_object(
+      'instance_id', instance_id,
       'occurred_at', occurred_at
       )::text);
     RETURN curr_rec;
@@ -428,7 +428,7 @@ $function$
 $function$
 LANGUAGE plpgsql;
 
-CREATE TRIGGER hdb_cache_update_event_notifier AFTER INSERT ON hdb_catalog.hdb_cache_update_event
-  FOR EACH ROW EXECUTE PROCEDURE hdb_catalog.hdb_cache_update_event_notifier();
+CREATE TRIGGER hdb_schema_update_event_notifier AFTER INSERT ON hdb_catalog.hdb_schema_update_event
+  FOR EACH ROW EXECUTE PROCEDURE hdb_catalog.hdb_schema_update_event_notifier();

diff --git a/server/src-rsr/migrate_from_9_to_10.sql b/server/src-rsr/migrate_from_9_to_10.sql
index ebc9ac8140e1e..714711b642b1b 100644
--- a/server/src-rsr/migrate_from_9_to_10.sql
+++ b/server/src-rsr/migrate_from_9_to_10.sql
@@ -1,20 +1,20 @@
-CREATE TABLE hdb_catalog.hdb_cache_update_event (
+CREATE TABLE hdb_catalog.hdb_schema_update_event (
   id BIGSERIAL PRIMARY KEY,
-  server_id uuid NOT NULL,
-  occurred_at timestamptz NOT NULL
+  instance_id uuid NOT NULL,
+  occurred_at timestamptz NOT NULL DEFAULT NOW()
 );
 
-CREATE FUNCTION hdb_catalog.hdb_cache_update_event_notifier() RETURNS trigger AS
+CREATE FUNCTION hdb_catalog.hdb_schema_update_event_notifier() RETURNS trigger AS
 $function$
   DECLARE
-    server_id uuid;
+    instance_id uuid;
     occurred_at timestamptz;
     curr_rec record;
   BEGIN
-    server_id = NEW.server_id;
+    instance_id = NEW.instance_id;
     occurred_at = NEW.occurred_at;
-    PERFORM pg_notify('hasura_cache_update', json_build_object(
-      'server_id', server_id,
+    PERFORM pg_notify('hasura_schema_update', json_build_object(
+      'instance_id', instance_id,
       'occurred_at', occurred_at
       )::text);
     RETURN curr_rec;
@@ -22,5 +22,5 @@ $function$
 $function$
 LANGUAGE plpgsql;
 
-CREATE TRIGGER hdb_cache_update_event_notifier AFTER INSERT ON hdb_catalog.hdb_cache_update_event
-  FOR EACH ROW EXECUTE PROCEDURE hdb_catalog.hdb_cache_update_event_notifier();
+CREATE TRIGGER hdb_schema_update_event_notifier AFTER INSERT ON hdb_catalog.hdb_schema_update_event
+  FOR EACH ROW EXECUTE PROCEDURE hdb_catalog.hdb_schema_update_event_notifier();

diff --git a/server/stack.yaml b/server/stack.yaml
index b10f7a73224dd..d74b8c16d3668 100644
--- a/server/stack.yaml
+++ b/server/stack.yaml
@@ -16,8 +16,8 @@ packages:
 # Packages to be pulled from upstream that are not in the resolver (e.g., acme-missiles-0.3)
 extra-deps:
 # use https URLs so that build systems can clone these repos
-- git: https://github.com/hasura/pg-client-hs.git
-  commit: 2d0e9f4c4ff0ba26bf4925db241d92b5f58621c3
+- git: https://github.com/rakeshkky/pg-client-hs.git
+  commit: bdeb884279100d0417bb697e0e05017cc7c320df
 - git: https://github.com/hasura/graphql-parser-hs.git
   commit: ff95d9a96aa5ef9e5390f8712958e4118e3831f6
 - ginger-0.8.1.0
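
The structural core of patch 02 is SchemaCacheRef: the MVar () write lock and the IORef SchemaCache now travel together, and every writer — the v1QueryHandler reload path and the schema-update processor alike — goes through withSCUpdate, so readers never take a lock and a writer publishes the fully built cache in one writeIORef. A minimal model of the pattern, simplified to plain IO (the patch's version is polymorphic over MonadError and releases the lock by catching and re-throwing; the String cache here is an illustrative stand-in for SchemaCache):

    import Control.Concurrent.MVar
    import Control.Exception (bracket_)
    import Data.IORef

    data SchemaCacheRef = SchemaCacheRef
      { scrLock  :: MVar ()
      , scrCache :: IORef String
      }

    -- Serialize writers with the lock; readers just readIORef without locking.
    withSCUpdate :: SchemaCacheRef -> IO (a, String) -> IO a
    withSCUpdate (SchemaCacheRef lock cache) action =
      bracket_ (takeMVar lock) (putMVar lock ()) $ do
        (res, newCache) <- action
        writeIORef cache newCache
        return res

    main :: IO ()
    main = do
      ref <- SchemaCacheRef <$> newMVar () <*> newIORef "cache-v1"
      r   <- withSCUpdate ref (return ("ok", "cache-v2"))
      putStrLn r
      readIORef (scrCache ref) >>= putStrLn

Two smaller changes are worth calling out. First, occurred_at is now filled by the database (DEFAULT NOW(), with recordSchemaUpdate inserting DEFAULT) and the initial cache timestamp comes from fetchLastUpdateTime on the same table, so the occurred_at-versus-cache-init comparison pits database clock against database clock rather than against each instance's local clock — presumably to avoid clock-skew artifacts between instances. Second, the listener's PG.NotifyHandler treats a reconnection (SUEPGReConn) as an unconditional reload, presumably because notifications sent while the connection was down can never be replayed.
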
From e9c7c2ba7699cfb046a2af5fd7bbd187a65238b5 Mon Sep 17 00:00:00 2001
From: Aravind Shankar
Date: Thu, 7 Mar 2019 11:50:45 +0530
Subject: [PATCH 03/20] added basic tests

---
 .circleci/test-server.sh                      | 32 +++++++++++
 server/tests-py/conftest.py                   | 11 +++-
 server/tests-py/context.py                    |  4 +-
 .../queries/horizontal_scale/basic/steps.yaml | 34 ++++++++++++
 server/tests-py/test_horizontal_scale.py      | 53 +++++++++++++++++++
 5 files changed, 132 insertions(+), 2 deletions(-)
 create mode 100644 server/tests-py/queries/horizontal_scale/basic/steps.yaml
 create mode 100644 server/tests-py/test_horizontal_scale.py

diff --git a/.circleci/test-server.sh b/.circleci/test-server.sh
index f711e4f571e0c..7e57980c718bc 100755
--- a/.circleci/test-server.sh
+++ b/.circleci/test-server.sh
@@ -356,4 +356,36 @@ if [ "$RUN_WEBHOOK_TESTS" == "true" ] ; then
 fi
 
+# horizontal scale test
+
+echo -e "\n<########## TEST GRAPHQL-ENGINE WITH HORIZONTAL SCALING ########>\n"
+
+HASURA_HS_TEST_DB='postgres://gql_test:@localhost:5432/hs_hge_test'
+echo "Installing psql"
+apt-get update && apt-get install -y postgresql-client
+psql "$HASURA_GRAPHQL_DATABASE_URL" -c "create database hs_hge_test;"
+
+# start 1st server
+"$GRAPHQL_ENGINE" --database-url "$HASURA_HS_TEST_DB" serve >> "$OUTPUT_FOLDER/graphql-engine.log" 2>&1 & PID=$!
+wait_for_port 8080
+
+# start 2nd server
+"$GRAPHQL_ENGINE" --database-url "$HASURA_HS_TEST_DB" serve \
+                  --server-port 8081 --server-host 0.0.0.0 \
+                  >> "$OUTPUT_FOLDER/hs-graphql-engine.log" 2>&1 & HS_PID=$!
+wait_for_port 8081
+
+# run test
+pytest -vv --hge-url="$HGE_URL" --pg-url="$HASURA_GRAPHQL_DATABASE_URL" --test-hge-scale-url="http://localhost:8081" test_horizontal_scale.py
+
+kill -INT $PID
+kill -INT $HS_PID
+psql "$HASURA_GRAPHQL_DATABASE_URL" -c "drop database hs_hge_test;"
+sleep 4
+combine_hpc_reports
+unset HASURA_HS_TEST_DB
+
+
+# end horizontal scale test
+
 mv graphql-engine-combined.tix "$OUTPUT_FOLDER/graphql-engine.tix" || true

diff --git a/server/tests-py/conftest.py b/server/tests-py/conftest.py
index 47a8ba97a6e84..8352475d5eb88 100644
--- a/server/tests-py/conftest.py
+++ b/server/tests-py/conftest.py
@@ -50,6 +50,13 @@ def pytest_addoption(parser):
        help="Run Test cases with GraphQL queries being disabled"
    )
 
+    parser.addoption(
+        "--test-hge-scale-url",
+        metavar="",
+        required=False,
+        help="Run testcases for horizontal scaling"
+    )
+
 
 @pytest.fixture(scope='session')
 def hge_ctx(request):
@@ -63,6 +70,7 @@ def hge_ctx(request):
    hge_jwt_conf = request.config.getoption('--hge-jwt-conf')
    ws_read_cookie = request.config.getoption('--test-ws-init-cookie')
    metadata_disabled = request.config.getoption('--test-metadata-disabled')
+    hge_scale_url = request.config.getoption('--test-hge-scale-url')
    try:
        hge_ctx = HGECtx(
            hge_url=hge_url,
@@ -73,7 +81,8 @@ def hge_ctx(request):
            hge_jwt_key_file=hge_jwt_key_file,
            hge_jwt_conf=hge_jwt_conf,
            ws_read_cookie=ws_read_cookie,
-            metadata_disabled=metadata_disabled
+            metadata_disabled=metadata_disabled,
+            hge_scale_url=hge_scale_url
        )
    except HGECtxError as e:
        pytest.exit(str(e))

diff --git a/server/tests-py/context.py b/server/tests-py/context.py
index deb0671ef6e6a..b8676fc27d33e 100644
--- a/server/tests-py/context.py
+++ b/server/tests-py/context.py
@@ -75,7 +75,7 @@ def server_bind(self):
 class HGECtx:
 
    def __init__(self, hge_url, pg_url, hge_key, hge_webhook, webhook_insecure,
-                 hge_jwt_key_file, hge_jwt_conf, metadata_disabled, ws_read_cookie):
+                 hge_jwt_key_file, hge_jwt_conf, metadata_disabled, ws_read_cookie, hge_scale_url):
        server_address = ('0.0.0.0', 5592)
 
        self.resp_queue = queue.Queue(maxsize=1)
@@ -118,6 +118,8 @@ def __init__(self, hge_url, pg_url, hge_key, hge_webhook, webhook_insecure,
 
        self.ws_read_cookie = ws_read_cookie
 
+        self.hge_scale_url = hge_scale_url
+
        result = subprocess.run(['../../scripts/get-version.sh'], shell=False, stdout=subprocess.PIPE, check=True)
        self.version = result.stdout.decode('utf-8').strip()
        if not self.metadata_disabled:
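
Each entry in the steps.yaml introduced below pairs an operation (a metadata-changing query, here a bulk of run_sql and track_table calls, executed against the first server) with a validate block (a GraphQL query plus the expected response, executed against the second server). A passing run therefore demonstrates end to end that a change made through one instance becomes queryable on the other without a restart. The 0.3-second sleep before validation exists because propagation is asynchronous — insert, NOTIFY, queue, cache rebuild — so the test waits out a pragmatic bound for CI rather than synchronizing on the reload itself.
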
a/server/tests-py/queries/horizontal_scale/basic/steps.yaml b/server/tests-py/queries/horizontal_scale/basic/steps.yaml new file mode 100644 index 0000000000000..35da6eff72c57 --- /dev/null +++ b/server/tests-py/queries/horizontal_scale/basic/steps.yaml @@ -0,0 +1,34 @@ +- + operation: + type: bulk + args: + - type: run_sql + args: + sql: | + create table test_t1( + c1 int, + c2 text, + PRIMARY KEY (c1) + ); + - type: track_table + args: + schema: public + name: test_t1 + - type: run_sql + args: + sql: | + insert into test_t1(c1, c2) VALUES(1, 'test'); + validate: + response: + data: + test_t1: + - c1: 1 + c2: test + query: + query: | + query { + test_t1 { + c1 + c2 + } + } \ No newline at end of file diff --git a/server/tests-py/test_horizontal_scale.py b/server/tests-py/test_horizontal_scale.py new file mode 100644 index 0000000000000..3ae7f23331774 --- /dev/null +++ b/server/tests-py/test_horizontal_scale.py @@ -0,0 +1,53 @@ +import pytest +import yaml +import time +import jsondiff + +from validate import json_ordered + + +if not pytest.config.getoption("--test-hge-scale-url"): + pytest.skip("--test-hge-scale-url flag is missing, skipping tests", allow_module_level=True) + + +class TestHorizantalScaleBasic(): + + @pytest.fixture(autouse=True, scope='class') + def transact(self, hge_ctx): + self.teardown = {"type": "clear_metadata", "args": {}} + yield + # teardown + st_code, resp = hge_ctx.v1q(self.teardown) + assert st_code == 200, resp + + def test_horizontal_scale_basic(self, hge_ctx): + with open(self.dir() + "/steps.yaml") as c: + conf = yaml.load(c) + + assert isinstance(conf, list) == True, 'Not an list' + for _, step in enumerate(conf): + # execute operation on 1st server + st_code, resp = hge_ctx.v1q(step['operation']) + assert st_code == 200, resp + + # wait for x sec + time.sleep(0.3) + # validate data on 2nd server + response = hge_ctx.http.post( + hge_ctx.hge_scale_url + "/v1alpha1/graphql", + json=step['validate']['query'] + ) + st_code = response.status_code + resp = response.json() + assert st_code == 200, resp + + if 'response' in step['validate']: + assert json_ordered(resp) == json_ordered(step['validate']['response']), yaml.dump({ + 'response': resp, + 'expected': step['validate']['response'], + 'diff': jsondiff.diff(step['validate']['response'], resp) + }) + + @classmethod + def dir(cls): + return 'queries/horizontal_scale/basic' From 9fb165f18b89652f7e1da0565ba5923132621056 Mon Sep 17 00:00:00 2001 From: Aravind Shankar Date: Thu, 7 Mar 2019 12:48:47 +0530 Subject: [PATCH 04/20] unset HASURA_GRAPHQL_AUTH_HOOK --- .circleci/test-server.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/.circleci/test-server.sh b/.circleci/test-server.sh index 7e57980c718bc..33c1036638cc0 100755 --- a/.circleci/test-server.sh +++ b/.circleci/test-server.sh @@ -357,6 +357,7 @@ if [ "$RUN_WEBHOOK_TESTS" == "true" ] ; then fi # horizontal scale test +unset HASURA_GRAPHQL_AUTH_HOOK echo -e "\n<########## TEST GRAPHQL-ENGINE WITH HORIZONTAL SCALING ########>\n" From 9504395714b0ca5500cc336510ffd3a162c4c792 Mon Sep 17 00:00:00 2001 From: Aravind Shankar Date: Thu, 7 Mar 2019 13:11:50 +0530 Subject: [PATCH 05/20] add server option in scaling tests --- .../queries/horizontal_scale/basic/steps.yaml | 39 ++++++++++--------- server/tests-py/test_horizontal_scale.py | 16 ++++++-- 2 files changed, 33 insertions(+), 22 deletions(-) diff --git a/server/tests-py/queries/horizontal_scale/basic/steps.yaml b/server/tests-py/queries/horizontal_scale/basic/steps.yaml index 35da6eff72c57..8134389e20b19 
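The harness introduced above is driven entirely by steps.yaml: each step applies an operation through one server's /v1/query endpoint, waits briefly for the schema sync event to propagate, then validates through the other server's /v1alpha1/graphql endpoint. A minimal standalone sketch of that check, assuming two engines on localhost ports 8080 and 8081 sharing one metadata database (the helper below is illustrative and not part of the test suite):

    import time

    import requests


    def check_sync(operation, graphql_query, expected):
        # apply a schema-changing operation on the first server
        resp = requests.post("http://localhost:8080/v1/query", json=operation)
        assert resp.status_code == 200, resp.text
        # give the schema sync event a moment to propagate
        time.sleep(0.3)
        # the second server should already serve the new schema
        resp = requests.post("http://localhost:8081/v1alpha1/graphql", json=graphql_query)
        assert resp.status_code == 200, resp.text
        assert resp.json() == expected, resp.json()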
From 9504395714b0ca5500cc336510ffd3a162c4c792 Mon Sep 17 00:00:00 2001
From: Aravind Shankar
Date: Thu, 7 Mar 2019 13:11:50 +0530
Subject: [PATCH 05/20] add server option in scaling tests

---
 .../queries/horizontal_scale/basic/steps.yaml | 39 ++++++++++---------
 server/tests-py/test_horizontal_scale.py      | 16 ++++++--
 2 files changed, 33 insertions(+), 22 deletions(-)

diff --git a/server/tests-py/queries/horizontal_scale/basic/steps.yaml b/server/tests-py/queries/horizontal_scale/basic/steps.yaml
index 35da6eff72c57..8134389e20b19 100644
--- a/server/tests-py/queries/horizontal_scale/basic/steps.yaml
+++ b/server/tests-py/queries/horizontal_scale/basic/steps.yaml
@@ -1,24 +1,27 @@
 -
   operation:
-    type: bulk
-    args:
-    - type: run_sql
-      args:
-        sql: |
-          create table test_t1(
-            c1 int,
-            c2 text,
-            PRIMARY KEY (c1)
-          );
-    - type: track_table
-      args:
-        schema: public
-        name: test_t1
-    - type: run_sql
-      args:
-        sql: |
-          insert into test_t1(c1, c2) VALUES(1, 'test');
+    server: '1'
+    query:
+      type: bulk
+      args:
+      - type: run_sql
+        args:
+          sql: |
+            create table test_t1(
+              c1 int,
+              c2 text,
+              PRIMARY KEY (c1)
+            );
+      - type: track_table
+        args:
+          schema: public
+          name: test_t1
+      - type: run_sql
+        args:
+          sql: |
+            insert into test_t1(c1, c2) VALUES(1, 'test');
   validate:
+    server: '2'
     response:
       data:
         test_t1:
diff --git a/server/tests-py/test_horizontal_scale.py b/server/tests-py/test_horizontal_scale.py
index 3ae7f23331774..d37ee29e35272 100644
--- a/server/tests-py/test_horizontal_scale.py
+++ b/server/tests-py/test_horizontal_scale.py
@@ -11,10 +11,13 @@
 
 class TestHorizontalScaleBasic():
+    servers = {}
 
     @pytest.fixture(autouse=True, scope='class')
     def transact(self, hge_ctx):
         self.teardown = {"type": "clear_metadata", "args": {}}
+        self.servers['1'] = hge_ctx.hge_url
+        self.servers['2'] = hge_ctx.hge_scale_url
         yield
         # teardown
         st_code, resp = hge_ctx.v1q(self.teardown)
         assert st_code == 200, resp
@@ -26,15 +29,20 @@ def test_horizontal_scale_basic(self, hge_ctx):
 
         assert isinstance(conf, list), 'Not a list'
         for _, step in enumerate(conf):
-            # execute operation on 1st server
-            st_code, resp = hge_ctx.v1q(step['operation'])
+            # execute operation
+            response = hge_ctx.http.post(
+                self.servers[step['operation']['server']] + "/v1/query",
+                json=step['operation']['query']
+            )
+            st_code = response.status_code
+            resp = response.json()
             assert st_code == 200, resp
 
             # wait for x sec
             time.sleep(0.3)
-            # validate data on 2nd server
+            # validate data
             response = hge_ctx.http.post(
-                hge_ctx.hge_scale_url + "/v1alpha1/graphql",
+                self.servers[step['validate']['server']] + "/v1alpha1/graphql",
                 json=step['validate']['query']
             )
             st_code = response.status_code

From 1632dd13a76f1259325a166fc432e64d0d702575 Mon Sep 17 00:00:00 2001
From: Aravind Shankar
Date: Thu, 7 Mar 2019 13:40:41 +0530
Subject: [PATCH 06/20] fix test script

---
 .circleci/test-server.sh | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/.circleci/test-server.sh b/.circleci/test-server.sh
index 33c1036638cc0..5c5f282303d93 100755
--- a/.circleci/test-server.sh
+++ b/.circleci/test-server.sh
@@ -358,6 +358,8 @@ fi
 
 # horizontal scale test
 unset HASURA_GRAPHQL_AUTH_HOOK
+unset HASURA_GRAPHQL_AUTH_HOOK_MODE
+unset HASURA_GRAPHQL_ADMIN_SECRET
 
 echo -e "\n<########## TEST GRAPHQL-ENGINE WITH HORIZONTAL SCALING ########>\n"

From bf30119b34f39971a4daaf68548dcb6a36a71bfe Mon Sep 17 00:00:00 2001
From: rakeshkky
Date: Thu, 7 Mar 2019 14:44:10 +0530
Subject: [PATCH 07/20] fix 'shouldReload' function

---
 server/src-lib/Hasura/Server/SchemaUpdate.hs | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/server/src-lib/Hasura/Server/SchemaUpdate.hs b/server/src-lib/Hasura/Server/SchemaUpdate.hs
index 3b404aa2c4643..dddd9120ac7ea 100644
--- a/server/src-lib/Hasura/Server/SchemaUpdate.hs
+++ b/server/src-lib/Hasura/Server/SchemaUpdate.hs
@@ -150,10 +150,9 @@ schemaUpdateEventProcessor strfyNum pool logger httpManager
     -- init schema cache built then reload
     shouldReload (SUESuccess payload) =
       (_epInstanceId payload /= getInstanceId instanceId)
-      && withCacheInit (_epOccurredAt payload) cacheInit
+      && maybe True (withCacheInit $ _epOccurredAt payload) cacheInit
 
-    withCacheInit _ Nothing = False
-    withCacheInit occurredAt (Just initTime) = occurredAt > initTime
+    withCacheInit occurredAt initTime = occurredAt > initTime
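The predicate PATCH 07 corrects is easier to see spelled out. Before the fix, a missing cache-initialisation time made withCacheInit return False, so such events were never processed; after it, a missing timestamp means "reload to be safe". A rough Python equivalent of the corrected check (names mirror the Haskell; None stands in for Nothing):

    def should_reload(event_instance_id, occurred_at, own_instance_id, cache_init_time):
        # events we emitted ourselves never trigger a reload
        if event_instance_id == own_instance_id:
            return False
        # no recorded cache build time: reload to be safe (the fix)
        if cache_init_time is None:
            return True
        # otherwise only reload for events newer than our cache
        return occurred_at > cache_init_time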
From 1d8c647a9eb937156b60a8a87db76a4ff24d162c Mon Sep 17 00:00:00 2001
From: Aravind Shankar
Date: Thu, 7 Mar 2019 16:05:52 +0530
Subject: [PATCH 08/20] added relationship test

---
 .../queries/horizontal_scale/basic/steps.yaml | 74 ++++++++++++++++---
 1 file changed, 65 insertions(+), 9 deletions(-)

diff --git a/server/tests-py/queries/horizontal_scale/basic/steps.yaml b/server/tests-py/queries/horizontal_scale/basic/steps.yaml
index 8134389e20b19..706ffecf2dc7e 100644
--- a/server/tests-py/queries/horizontal_scale/basic/steps.yaml
+++ b/server/tests-py/queries/horizontal_scale/basic/steps.yaml
@@ -8,9 +8,9 @@
         args:
           sql: |
             create table test_t1(
-              c1 int,
-              c2 text,
-              PRIMARY KEY (c1)
+              t1_c1 int,
+              t1_c2 text,
+              PRIMARY KEY (t1_c1)
             );
       - type: track_table
         args:
@@ -19,19 +19,75 @@
       - type: run_sql
         args:
           sql: |
-            insert into test_t1(c1, c2) VALUES(1, 'test');
+            insert into test_t1(t1_c1, t1_c2) VALUES(1, 'table1');
   validate:
     server: '2'
     response:
       data:
         test_t1:
-        - c1: 1
-          c2: test
+        - t1_c1: 1
+          t1_c2: table1
     query:
       query: |
         query {
           test_t1 {
-            c1
-            c2
+            t1_c1
+            t1_c2
           }
-        }
\ No newline at end of file
+        }
+-
+  operation:
+    server: '2'
+    query:
+      type: bulk
+      args:
+      - type: run_sql
+        args:
+          sql: |
+            create table test_t2(
+              t2_c1 int,
+              t2_c2 text,
+              PRIMARY KEY (t2_c1)
+            );
+      - type: run_sql
+        args:
+          sql: |
+            ALTER TABLE test_t2 ADD FOREIGN KEY (t2_c1) REFERENCES test_t1 (t1_c1);
+      - type: track_table
+        args:
+          schema: public
+          name: test_t2
+      - type: create_object_relationship
+        args:
+          name: testT1Byc1
+          table:
+            name: test_t2
+            schema: public
+          using:
+            foreign_key_constraint_on: t2_c1
+      - type: run_sql
+        args:
+          sql: |
+            insert into test_t2(t2_c1, t2_c2) VALUES(1, 'table2');
+  validate:
+    server: '1'
+    query:
+      query: |
+        query {
+          test_t2 {
+            t2_c1
+            t2_c2
+            testT1Byc1 {
+              t1_c1
+              t1_c2
+            }
+          }
+        }
+    response:
+      data:
+        test_t2:
+        - t2_c1: 1
+          t2_c2: table2
+          testT1Byc1:
+            t1_c1: 1
+            t1_c2: table1

From cf28c955ddb6e1799312bdff747bb4501f52ec6c Mon Sep 17 00:00:00 2001
From: Aravind Shankar
Date: Thu, 7 Mar 2019 19:22:48 +0530
Subject: [PATCH 09/20] added pgbouncer to restart postgres connection

---
 .circleci/config.yml                             |  7 +++++--
 .circleci/pgbouncer/pgbouncer.ini                | 11 +++++++++++
 .circleci/pgbouncer/users.txt                    |  1 +
 .circleci/test-server.sh                         | 16 +++++++++++++---
 .../queries/horizontal_scale/basic/teardown.yaml | 10 ++++++++++
 server/tests-py/test_horizontal_scale.py         |  3 +--
 6 files changed, 41 insertions(+), 7 deletions(-)
 create mode 100644 .circleci/pgbouncer/pgbouncer.ini
 create mode 100644 .circleci/pgbouncer/users.txt
 create mode 100644 server/tests-py/queries/horizontal_scale/basic/teardown.yaml

diff --git a/.circleci/config.yml b/.circleci/config.yml
index 36576786d70b2..bc9ee2e517c15 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -89,14 +89,17 @@ refs:
       - attach_workspace:
           at: /build
       - *wait_for_postgres
+      - run:
+          name: Install deps
+          command: |
+            apt-get update
+            apt install --yes pgbouncer jq curl postgresql-client
       - run:
           name: Run Python tests
           environment:
             HASURA_GRAPHQL_DATABASE_URL: 'postgres://gql_test:@localhost:5432/gql_test'
             GRAPHQL_ENGINE: '/build/_server_output/graphql-engine'
           command: |
-            apt-get update
-            apt install --yes jq curl
             OUTPUT_FOLDER=/build/_server_test_output/$PG_VERSION .circleci/test-server.sh
       - run:
           name: Generate coverage report
diff --git a/.circleci/pgbouncer/pgbouncer.ini b/.circleci/pgbouncer/pgbouncer.ini
new file mode 100644
index 0000000000000..1314f91c8090c
--- /dev/null
+++ b/.circleci/pgbouncer/pgbouncer.ini
@@ -0,0 +1,11 @@
+[databases]
+hs_hge_test = host=localhost port=5432 dbname=hs_hge_test user=gql_test
+
+[pgbouncer]
+listen_port = 6543
+listen_addr = 127.0.0.1
+logfile = pgbouncer.log
+pidfile = pgbouncer.pid
+auth_type = md5
+auth_file = pgbouncer/users.txt
+admin_users = postgres
\ No newline at end of file
diff --git a/.circleci/pgbouncer/users.txt b/.circleci/pgbouncer/users.txt
new file mode 100644
index 0000000000000..d67de99b740d3
--- /dev/null
+++ b/.circleci/pgbouncer/users.txt
@@ -0,0 +1 @@
+"postgres" "postgres"
\ No newline at end of file
diff --git a/.circleci/test-server.sh b/.circleci/test-server.sh
index 5c5f282303d93..0d0c26bc4f87f 100755
--- a/.circleci/test-server.sh
+++ b/.circleci/test-server.sh
@@ -363,11 +363,12 @@ unset HASURA_GRAPHQL_ADMIN_SECRET
 
 echo -e "\n<########## TEST GRAPHQL-ENGINE WITH HORIZONTAL SCALING ########>\n"
 
-HASURA_HS_TEST_DB='postgres://gql_test:@localhost:5432/hs_hge_test'
-echo "Installing psql"
-apt-get update && apt-get install -y postgresql-client
+HASURA_HS_TEST_DB='postgres://postgres:postgres@localhost:6543/hs_hge_test'
 psql "$HASURA_GRAPHQL_DATABASE_URL" -c "create database hs_hge_test;"
 
+# start pgbouncer
+pgbouncer -d pgbouncer/pgbouncer.ini
+
 # start 1st server
 "$GRAPHQL_ENGINE" --database-url "$HASURA_HS_TEST_DB" serve >> "$OUTPUT_FOLDER/graphql-engine.log" 2>&1 & PID=$!
 wait_for_port 8080
@@ -381,6 +382,15 @@ wait_for_port 8081
 # run test
 pytest -vv --hge-url="$HGE_URL" --pg-url="$HASURA_GRAPHQL_DATABASE_URL" --test-hge-scale-url="http://localhost:8081" test_horizontal_scale.py
 
+# Shutdown pgbouncer
+psql "postgres://postgres:postgres@localhost:6543/pgbouncer" -c "SHUTDOWN;"
+
+# start pgbouncer again
+pgbouncer -d pgbouncer/pgbouncer.ini
+
+# run test
+pytest -vv --hge-url="$HGE_URL" --pg-url="$HASURA_GRAPHQL_DATABASE_URL" --test-hge-scale-url="http://localhost:8081" test_horizontal_scale.py
+
 kill -INT $PID
 kill -INT $HS_PID
 psql "$HASURA_GRAPHQL_DATABASE_URL" -c "drop database hs_hge_test;"
diff --git a/server/tests-py/queries/horizontal_scale/basic/teardown.yaml b/server/tests-py/queries/horizontal_scale/basic/teardown.yaml
new file mode 100644
index 0000000000000..067a106f0e84a
--- /dev/null
+++ b/server/tests-py/queries/horizontal_scale/basic/teardown.yaml
@@ -0,0 +1,10 @@
+type: bulk
+args:
+- type: run_sql
+  args:
+    sql: |
+      drop table test_t1;
+- type: run_sql
+  args:
+    sql: |
+      drop table test_t2;
\ No newline at end of file
diff --git a/server/tests-py/test_horizontal_scale.py b/server/tests-py/test_horizontal_scale.py
index d37ee29e35272..3b60a4c5e8fc0 100644
--- a/server/tests-py/test_horizontal_scale.py
+++ b/server/tests-py/test_horizontal_scale.py
@@ -15,12 +15,11 @@ class TestHorizontalScaleBasic():
 
     @pytest.fixture(autouse=True, scope='class')
     def transact(self, hge_ctx):
-        self.teardown = {"type": "clear_metadata", "args": {}}
         self.servers['1'] = hge_ctx.hge_url
         self.servers['2'] = hge_ctx.hge_scale_url
         yield
         # teardown
-        st_code, resp = hge_ctx.v1q(self.teardown)
+        st_code, resp = hge_ctx.v1q_f(self.dir() + '/teardown.yaml')
         assert st_code == 200, resp
 
     def test_horizontal_scale_basic(self, hge_ctx):

From 4472a21406a702ca8268bbea052cc419591b5e35 Mon Sep 17 00:00:00 2001
From: Aravind Shankar
Date: Thu, 7 Mar 2019 19:53:44 +0530
Subject: [PATCH 10/20] fix pgbouncer config path

---
 .circleci/test-server.sh | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/.circleci/test-server.sh b/.circleci/test-server.sh
index 0d0c26bc4f87f..a8e854f54593e 100755
--- a/.circleci/test-server.sh
+++ b/.circleci/test-server.sh
@@ -367,7 +367,7 @@ HASURA_HS_TEST_DB='postgres://postgres:postgres@localhost:6543/hs_hge_test'
 psql "$HASURA_GRAPHQL_DATABASE_URL" -c "create database hs_hge_test;"
 
 # start pgbouncer
-pgbouncer -d pgbouncer/pgbouncer.ini
+pgbouncer -d /root/graphql-engine/.circleci/pgbouncer/pgbouncer.ini
 
 # start 1st server
 "$GRAPHQL_ENGINE" --database-url "$HASURA_HS_TEST_DB" serve >> "$OUTPUT_FOLDER/graphql-engine.log" 2>&1 & PID=$!
@@ -386,11 +386,13 @@ pytest -vv --hge-url="$HGE_URL" --pg-url="$HASURA_GRAPHQL_DATABASE_URL" --test-h
 psql "postgres://postgres:postgres@localhost:6543/pgbouncer" -c "SHUTDOWN;"
 
 # start pgbouncer again
-pgbouncer -d pgbouncer/pgbouncer.ini
+pgbouncer -d /root/graphql-engine/.circleci/pgbouncer/pgbouncer.ini
 
 # run test
 pytest -vv --hge-url="$HGE_URL" --pg-url="$HASURA_GRAPHQL_DATABASE_URL" --test-hge-scale-url="http://localhost:8081" test_horizontal_scale.py
 
+# Shutdown pgbouncer
+psql "postgres://postgres:postgres@localhost:6543/pgbouncer" -c "SHUTDOWN;"
 kill -INT $PID
 kill -INT $HS_PID
 psql "$HASURA_GRAPHQL_DATABASE_URL" -c "drop database hs_hge_test;"
From 6338341770af974c25354bb6ff45de7504d7094c Mon Sep 17 00:00:00 2001
From: rakeshkky
Date: Fri, 8 Mar 2019 19:36:41 +0530
Subject: [PATCH 11/20] improve schema sync logic to support latest pg-client-hs lib

---
 server/src-exec/Main.hs                      |  30 +--
 server/src-lib/Hasura/Server/App.hs          |   4 +-
 server/src-lib/Hasura/Server/Init.hs         |   9 +-
 server/src-lib/Hasura/Server/Query.hs        |  19 +-
 server/src-lib/Hasura/Server/SchemaUpdate.hs | 182 ++++++++++++-------
 server/stack.yaml                            |   2 +-
 6 files changed, 140 insertions(+), 106 deletions(-)

diff --git a/server/src-exec/Main.hs b/server/src-exec/Main.hs
index 7d2ab5a1448c6..ba8cb201da8f2 100644
--- a/server/src-exec/Main.hs
+++ b/server/src-exec/Main.hs
@@ -11,7 +11,6 @@ import System.Exit (exitFailure)
 
 import qualified Control.Concurrent as C
-import qualified Control.Concurrent.STM as STM
 import qualified Data.Aeson as A
 import qualified Data.ByteString.Char8 as BC
 import qualified Data.ByteString.Lazy as BL
@@ -100,7 +99,7 @@ printYaml :: (A.ToJSON a) => a -> IO ()
 printYaml = BC.putStrLn . Y.encode
 
 mkPGLogger :: Logger -> Q.PGLogger
-mkPGLogger (Logger logger) msg =
+mkPGLogger (Logger logger) (Q.PLERetryMsg msg) =
   logger $ PGLog LevelWarn msg
 
 main :: IO ()
@@ -128,31 +127,21 @@ main = do
       -- log postgres connection info
       unLogger logger $ connInfoToLog ci
 
-      -- create empty cache update events queue
-      eventsQueue <- STM.newTQueueIO
-
       pool <- Q.initPGPool ci cp pgLogger
 
-      -- start postgres cache update events listener thread in background
-      listenerTId <- C.forkIO $ schemaUpdateEventListener pool logger eventsQueue
-      unLogger logger $ mkThreadLog listenerTId instanceId TTListener
-
       -- safe init catalog
       initRes <- initialise logger ci httpManager
 
       -- prepare event triggers data
       prepareEvents logger ci
 
-      (app, cacheRef, cacheBuiltTime) <-
+      (app, cacheRef, cacheInitTime) <-
         mkWaiApp isoL loggerCtx strfyNum pool httpManager am corsCfg
                  enableConsole enableTelemetry instanceId enabledAPIs
 
-      let scRef = _scrCache cacheRef
-
-      -- start cache update events processor thread in background
-      procTId <- C.forkIO $ schemaUpdateEventProcessor strfyNum pool logger httpManager
-                 eventsQueue cacheRef instanceId cacheBuiltTime
-      unLogger logger $ mkThreadLog procTId instanceId TTProcessor
+      -- start a background thread for schema sync
+      void $ C.forkIO $ startSchemaSync strfyNum pool logger httpManager
+                        cacheRef instanceId cacheInitTime
 
       let warpSettings = Warp.setPort port $ Warp.setHost host Warp.defaultSettings
@@ -162,6 +151,7 @@ main = do
 
       eventEngineCtx <- atomically $ initEventEngineCtx maxEvThrds evFetchMilliSec
 
+      let scRef = _scrCache cacheRef
       unLogger logger $ mkGenericStrLog "event_triggers" "starting workers"
       void $ C.forkIO $ processEventQueue hloggerCtx logEnvHeaders
         httpManager pool scRef eventEngineCtx
@@ -235,14 +225,6 @@ main = do
       res <- runTx pgLogger ci unlockAllEvents
       either printErrJExit return res
 
-    mkThreadLog threadId instanceId threadType =
-      let msg = T.pack (show threadType) <> " thread started"
-      in StartupLog LevelInfo "threads" $
-           A.object [ "instance_id" A..= getInstanceId instanceId
-                    , "thread_id" A..= show threadId
-                    , "message" A..= msg
-                    ]
-
     getUniqIds pgLogger ci = do
       eDbId <- runTx pgLogger ci getDbId
       dbId <- either printErrJExit return eDbId
diff --git a/server/src-lib/Hasura/Server/App.hs b/server/src-lib/Hasura/Server/App.hs
index a34c2b6db608e..b93073a01ed9c 100644
--- a/server/src-lib/Hasura/Server/App.hs
+++ b/server/src-lib/Hasura/Server/App.hs
@@ -314,10 +314,10 @@ mkWaiApp isoLevel loggerCtx strfyNum pool httpManager mode corsCfg
     pgResp <- runExceptT $ peelRun emptySchemaCache adminUserInfo
               httpManager strfyNum pool Q.Serializable $ do
                 buildSchemaCache
-                fetchLastUpdateTime
+                liftTx fetchLastUpdate
     (time, sc) <- either initErrExit return pgResp
     scRef <- newIORef sc
-    return (scRef, time)
+    return (scRef, snd <$> time)
 
   cacheLock <- newMVar ()
diff --git a/server/src-lib/Hasura/Server/Init.hs b/server/src-lib/Hasura/Server/Init.hs
index eb77a950118d0..183478914a03c 100644
--- a/server/src-lib/Hasura/Server/Init.hs
+++ b/server/src-lib/Hasura/Server/Init.hs
@@ -23,14 +23,11 @@ import Hasura.Server.Utils
 import Network.Wai.Handler.Warp
 
 newtype InstanceId
-  = InstanceId {getInstanceId :: UUID.UUID}
-  deriving (Show, Eq)
-
-instanceIdToTxt :: InstanceId -> T.Text
-instanceIdToTxt = UUID.toText . getInstanceId
+  = InstanceId {getInstanceId :: T.Text}
+  deriving (Show, Eq, J.ToJSON, J.FromJSON)
 
 mkInstanceId :: IO InstanceId
-mkInstanceId = InstanceId <$> UUID.nextRandom
+mkInstanceId = (InstanceId . UUID.toText) <$> UUID.nextRandom
 
 initErrExit :: (Show e) => e -> IO a
 initErrExit e = print e >> exitFailure
diff --git a/server/src-lib/Hasura/Server/Query.hs b/server/src-lib/Hasura/Server/Query.hs
index 5bd1f2959dcdb..66b22b7f12fb9 100644
--- a/server/src-lib/Hasura/Server/Query.hs
+++ b/server/src-lib/Hasura/Server/Query.hs
@@ -30,8 +30,7 @@ import Hasura.RQL.DML.Returning (encodeJSONVector)
 import Hasura.RQL.DML.Select
 import Hasura.RQL.DML.Update
 import Hasura.RQL.Types
-import Hasura.Server.Init (InstanceId (..),
-                           instanceIdToTxt)
+import Hasura.Server.Init (InstanceId (..))
 import Hasura.Server.Utils
 
 import qualified Database.PG.Query as Q
@@ -119,16 +118,18 @@ instance HasHttpManager Run where
 instance HasSQLGenCtx Run where
   askSQLGenCtx = asks _3
 
-fetchLastUpdateTime :: Run (Maybe UTCTime)
-fetchLastUpdateTime = do
-  l <- liftTx $ Q.listQE defaultTxErrorHandler
+fetchLastUpdate :: Q.TxE QErr (Maybe (InstanceId, UTCTime))
+fetchLastUpdate = do
+  l <- Q.listQE defaultTxErrorHandler
     [Q.sql|
-       SELECT occurred_at FROM hdb_catalog.hdb_schema_update_event
-       ORDER BY id DESC LIMIT 1
+       SELECT instance_id::text, occurred_at
+       FROM hdb_catalog.hdb_schema_update_event
+       ORDER BY occurred_at DESC LIMIT 1
           |] () True
   case l of
     [] -> return Nothing
-    [Identity t] -> return $ Just t
+    [(instId, occurredAt)] ->
+      return $ Just (InstanceId instId, occurredAt)
     -- never happens
     _ -> throw500 "more than one row returned by query"
 
@@ -139,7 +140,7 @@ recordSchemaUpdate instanceId =
       hdb_catalog.hdb_schema_update_event
         (instance_id, occurred_at)
       VALUES ($1::uuid, DEFAULT)
-          |] (Identity $ instanceIdToTxt instanceId) True
+          |] (Identity $ getInstanceId instanceId) True
diff --git a/server/src-lib/Hasura/Server/SchemaUpdate.hs b/server/src-lib/Hasura/Server/SchemaUpdate.hs
index dddd9120ac7ea..ca9eb37f7e5fa 100644
--- a/server/src-lib/Hasura/Server/SchemaUpdate.hs
+++ b/server/src-lib/Hasura/Server/SchemaUpdate.hs
@@ -1,8 +1,5 @@
 module Hasura.Server.SchemaUpdate
-  ( ThreadType(..)
-  , schemaUpdateEventListener
-  , schemaUpdateEventProcessor
-  )
+  (startSchemaSync)
 where
 
 import Hasura.Prelude
@@ -12,13 +9,15 @@ import Hasura.RQL.DDL.Schema.Table (buildSchemaCache)
 import Hasura.RQL.Types
 import Hasura.Server.App (SchemaCacheRef (..), withSCUpdate)
 import Hasura.Server.Init (InstanceId (..))
+import Hasura.Server.Logging
 import Hasura.Server.Query
 
 import Data.Aeson
 import Data.Aeson.Casing
 import Data.Aeson.TH
-import Data.UUID
 
+import qualified Control.Concurrent as C
+import qualified Control.Concurrent.Async as A
 import qualified Control.Concurrent.STM as STM
 import qualified Data.Text as T
 import qualified Data.Time as UTC
@@ -58,20 +57,11 @@ instance ToEngineLog SchemaUpdateEventLog where
 
 data EventPayload
   = EventPayload
-  { _epInstanceId :: !UUID
+  { _epInstanceId :: !InstanceId
   , _epOccurredAt :: !UTC.UTCTime
   } deriving (Show, Eq)
 $(deriveJSON (aesonDrop 3 snakeCase) ''EventPayload)
 
-data SchemaUpdateEvent
-  = SUESuccess !EventPayload
-  | SUEPGReConn
-  deriving (Show, Eq)
-
-instance ToJSON SchemaUpdateEvent where
-  toJSON (SUESuccess payload) = toJSON payload
-  toJSON SUEPGReConn = String "postgres reconnection"
-
 data ThreadError
   = TEJsonParse !T.Text
   | TEQueryError !QErr
@@ -81,83 +71,147 @@ $(deriveToJSON
                  }
  ''ThreadError)
 
+-- | An IO action that enables metadata syncing
+startSchemaSync
+  :: Bool
+  -> PG.PGPool
+  -> Logger
+  -> HTTP.Manager
+  -> SchemaCacheRef
+  -> InstanceId
+  -> Maybe UTC.UTCTime -> IO ()
+startSchemaSync strfyNum pool logger httpMgr cacheRef instanceId cacheInitTime = do
+  -- Init events queue
+  eventsQueue <- STM.newTQueueIO
+  -- Start listener thread
+  lAsync <- A.async $ listener strfyNum pool
+    logger httpMgr eventsQueue cacheRef instanceId cacheInitTime
+
+  -- Start processor thread
+  pAsync <- A.async $ processor strfyNum pool
+    logger httpMgr eventsQueue cacheRef instanceId
+
+  void $ A.waitAny [lAsync, pAsync]
+
 -- | An IO action that listens to postgres for events and pushes them to a Queue
-schemaUpdateEventListener
-  :: PG.PGPool
+listener
+  :: Bool
+  -> PG.PGPool
   -> Logger
-  -> STM.TQueue SchemaUpdateEvent
-  -> IO ()
-schemaUpdateEventListener pool logger eventsQueue =
+  -> HTTP.Manager
+  -> STM.TQueue EventPayload
+  -> SchemaCacheRef
+  -> InstanceId
+  -> Maybe UTC.UTCTime -> IO ()
+listener strfyNum pool logger httpMgr eventsQueue
+  cacheRef instanceId cacheInitTime = do
+  logThreadStartup logger instanceId threadType
   -- Never exits
   forever $ do
-    listenResE <- liftIO $ runExceptT $ PG.listen pool pgChannel notifyHandler
+    listenResE <-
+      liftIO $ runExceptT $ PG.listen pool pgChannel notifyHandler
     either onError return listenResE
+    logWarn
+    C.threadDelay $ 1 * 1000 * 1000 -- 1 second
   where
-    notifyHandler = PG.NotifyHandler onReconn onMessage
     threadType = TTListener
 
+    shouldRefresh dbInstId accrdAt =
+      case cacheInitTime of
+        Nothing   -> True
+        Just time -> (dbInstId /= instanceId) && accrdAt > time
+
+    refreshCache Nothing = return ()
+    refreshCache (Just (dbInstId, accrdAt)) =
+      when (shouldRefresh dbInstId accrdAt) $
+        refreshSchemaCache strfyNum pool logger httpMgr cacheRef
+          threadType "reloading schema cache on listen start"
+
+    notifyHandler = \case
+      PG.PNEOnStart -> do
+        eRes <- runExceptT $ PG.runTx pool
+          (PG.Serializable, Nothing) fetchLastUpdate
+        case eRes of
+          Left e -> onError e
+          Right mLastUpd -> refreshCache mLastUpd
+
+      PG.PNEPQNotify notif ->
+        case eitherDecodeStrict $ PQ.notifyExtra notif of
+          Left e -> logError logger threadType $ TEJsonParse $ T.pack e
+          Right payload -> do
+            logInfo logger threadType $ object ["received_event" .= payload]
+            -- Push a notify event to Queue
+            STM.atomically $ STM.writeTQueue eventsQueue payload
+
     onError = logError logger threadType . TEQueryError
+    logWarn = unLogger logger $
+      SchemaUpdateEventLog LevelWarn TTListener $ String
+      "error occured retrying pg listen after 1 second"
 
-    onReconn = do
-      -- emit postgres reconnection event
-      let event = SUEPGReConn
-      logInfo logger threadType $ object ["received_event" .= event]
-      STM.atomically $ STM.writeTQueue eventsQueue event
-
-    -- Postgres notification handler
-    onMessage notif =
-      case eitherDecodeStrict $ PQ.notifyExtra notif of
-        Left e -> logError logger threadType $ TEJsonParse $ T.pack e
-        Right payload -> do
-          let newEvent = SUESuccess payload
-          logInfo logger threadType $ object ["received_event" .= newEvent]
-          -- Push a success event to Queue along with event payload
-          STM.atomically $ STM.writeTQueue eventsQueue newEvent
 
 -- | An IO action that processes events from Queue
-schemaUpdateEventProcessor
+processor
   :: Bool
   -> PG.PGPool
   -> Logger
   -> HTTP.Manager
-  -> STM.TQueue SchemaUpdateEvent
+  -> STM.TQueue EventPayload
   -> SchemaCacheRef
-  -> InstanceId
-  -> Maybe UTC.UTCTime
-  -> IO ()
-schemaUpdateEventProcessor strfyNum pool logger httpManager
-  eventsQueue cacheRef instanceId cacheInit =
+  -> InstanceId -> IO ()
+processor strfyNum pool logger httpMgr eventsQueue
+  cacheRef instanceId = do
+  logThreadStartup logger instanceId threadType
   -- Never exits
   forever $ do
    event <- STM.atomically $ STM.readTQueue eventsQueue
    logInfo logger threadType $ object ["processed_event" .= event]
-    when (shouldReload event) $ do
-      -- Reload schema cache from catalog
-      resE <- liftIO $ runExceptT $ withSCUpdate cacheRef $
-        peelRun emptySchemaCache adminUserInfo
-        httpManager strfyNum pool PG.Serializable buildSchemaCache
-      case resE of
-        Left e -> logError logger threadType $ TEQueryError e
-        Right _ ->
-          logInfo logger threadType $
-            object ["message" .= ("schema cache reloaded" :: T.Text)]
+    when (shouldReload event) $
+      refreshSchemaCache strfyNum pool logger httpMgr cacheRef
+        threadType "schema cache reloaded"
  where
    threadType = TTProcessor
 
-    -- If postgres reconnect happens reload schema cache
-    shouldReload SUEPGReConn = True
-    -- If event is from another server and occurred after
-    -- init schema cache built then reload
-    shouldReload (SUESuccess payload) =
-      (_epInstanceId payload /= getInstanceId instanceId)
-      && maybe True (withCacheInit $ _epOccurredAt payload) cacheInit
+    -- If event is from another server
+    shouldReload payload = _epInstanceId payload /= instanceId
 
-    withCacheInit occurredAt initTime = occurredAt > initTime
+logThreadStartup
+  :: Show a
+  => Logger
+  -> InstanceId
+  -> a -> IO ()
+logThreadStartup logger instanceId threadType =
+  unLogger logger threadLog
+  where
+    threadLog =
+      let msg = T.pack (show threadType) <> " thread started"
+      in StartupLog LevelInfo "threads" $
+           object [ "instance_id" .= getInstanceId instanceId
+                  , "message" .= msg
+                  ]
+
+refreshSchemaCache
+  :: Bool
+  -> PG.PGPool
+  -> Logger
+  -> HTTP.Manager
+  -> SchemaCacheRef
+  -> ThreadType
+  -> T.Text -> IO ()
+refreshSchemaCache strfyNum pool logger httpManager cacheRef threadType msg = do
+  -- Reload schema cache from catalog
+  resE <- liftIO $ runExceptT $ withSCUpdate cacheRef $
+    peelRun emptySchemaCache adminUserInfo
+    httpManager strfyNum pool PG.Serializable buildSchemaCache
+  case resE of
+    Left e -> logError logger threadType $ TEQueryError e
+    Right _ ->
+      logInfo logger threadType $ object ["message" .= msg]
 
 logInfo :: Logger -> ThreadType -> Value -> IO ()
 logInfo logger threadType val = unLogger logger $
   SchemaUpdateEventLog LevelInfo threadType val
 
 logError :: ToJSON a => Logger -> ThreadType -> a -> IO ()
-logError logger threadType err = unLogger logger $
-  SchemaUpdateEventLog LevelError threadType $ object ["error" .= toJSON err]
+logError logger threadType err =
+  unLogger logger $ SchemaUpdateEventLog LevelError threadType $
+  object ["error" .= toJSON err]
diff --git a/server/stack.yaml b/server/stack.yaml
index d74b8c16d3668..d25e2d1db5532 100644
--- a/server/stack.yaml
+++ b/server/stack.yaml
@@ -17,7 +17,7 @@ packages:
 extra-deps:
 # use https URLs so that build systems can clone these repos
 - git: https://github.com/rakeshkky/pg-client-hs.git
-  commit: bdeb884279100d0417bb697e0e05017cc7c320df
+  commit: ed3dcfb864a2a23ac6c22ed947a5095b0d03170d
 - git: https://github.com/hasura/graphql-parser-hs.git
   commit: ff95d9a96aa5ef9e5390f8712958e4118e3831f6
 - ginger-0.8.1.0
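PATCH 11 reshapes the sync machinery around the newer pg-client-hs notification API: a listener thread LISTENs on a Postgres channel and pushes decoded payloads onto an STM queue, a processor thread drains the queue and reloads the cache for events from other instances, and on (re)connect the listener consults hdb_catalog.hdb_schema_update_event to catch anything missed while disconnected. A minimal sketch of the same protocol in Python, assuming psycopg2; the channel name below is illustrative (the real pgChannel constant is defined elsewhere in the server sources):

    import json
    import queue
    import select
    import threading

    import psycopg2
    import psycopg2.extensions

    events = queue.Queue()

    def listener(dsn, channel="hasura_schema_update"):  # channel name assumed
        conn = psycopg2.connect(dsn)
        conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
        conn.cursor().execute("LISTEN {};".format(channel))
        while True:
            # block until the connection is readable, then drain notifications
            select.select([conn], [], [])
            conn.poll()
            while conn.notifies:
                notif = conn.notifies.pop(0)
                # payload is {"instance_id": ..., "occurred_at": ...},
                # matching the EventPayload JSON encoding above
                events.put(json.loads(notif.payload))

    def processor(own_instance_id):
        while True:
            payload = events.get()
            if payload["instance_id"] != own_instance_id:
                print("would reload the schema cache here")

    threading.Thread(target=listener, args=("dbname=hs_hge_test",), daemon=True).start()
    processor("this-instance-uuid")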
From 7f98d5fabd6e8bd3cd3f1352743a4f210245c166 Mon Sep 17 00:00:00 2001
From: rakeshkky
Date: Mon, 11 Mar 2019 11:34:45 +0530
Subject: [PATCH 12/20] improve invoking schema sync threads & improve thread logging

---
 server/src-exec/Main.hs                      |  4 +-
 server/src-lib/Hasura/Server/SchemaUpdate.hs | 61 +++++++++-----------
 2 files changed, 29 insertions(+), 36 deletions(-)

diff --git a/server/src-exec/Main.hs b/server/src-exec/Main.hs
index ba8cb201da8f2..188d5d3f848ff 100644
--- a/server/src-exec/Main.hs
+++ b/server/src-exec/Main.hs
@@ -140,8 +140,8 @@ main = do
                  corsCfg enableConsole enableTelemetry instanceId enabledAPIs
 
       -- start a background thread for schema sync
-      void $ C.forkIO $ startSchemaSync strfyNum pool logger httpManager
-                        cacheRef instanceId cacheInitTime
+      startSchemaSync strfyNum pool logger httpManager
+        cacheRef instanceId cacheInitTime
 
       let warpSettings = Warp.setPort port $ Warp.setHost host Warp.defaultSettings
diff --git a/server/src-lib/Hasura/Server/SchemaUpdate.hs b/server/src-lib/Hasura/Server/SchemaUpdate.hs
index ca9eb37f7e5fa..0c6993ae857a1 100644
--- a/server/src-lib/Hasura/Server/SchemaUpdate.hs
+++ b/server/src-lib/Hasura/Server/SchemaUpdate.hs
@@ -17,7 +17,6 @@ import Data.Aeson.Casing
 import Data.Aeson.TH
 
 import qualified Control.Concurrent as C
-import qualified Control.Concurrent.Async as A
 import qualified Control.Concurrent.STM as STM
 import qualified Data.Text as T
 import qualified Data.Time as UTC
@@ -38,22 +37,22 @@ instance Show ThreadType where
   show TTProcessor = "processor"
 
-data SchemaUpdateEventLog
-  = SchemaUpdateEventLog
+data SchemaSyncThreadLog
+  = SchemaSyncThreadLog
   { suelLogLevel   :: !LogLevel
   , suelThreadType :: !ThreadType
   , suelInfo       :: !Value
   } deriving (Show, Eq)
 
-instance ToJSON SchemaUpdateEventLog where
-  toJSON (SchemaUpdateEventLog _ t info) =
+instance ToJSON SchemaSyncThreadLog where
+  toJSON (SchemaSyncThreadLog _ t info) =
     object [ "thread_type" .= show t
           , "info" .= info
           ]
 
-instance ToEngineLog SchemaUpdateEventLog where
+instance ToEngineLog SchemaSyncThreadLog where
   toEngineLog threadLog =
-    (suelLogLevel threadLog, "schema_update_event", toJSON threadLog)
+    (suelLogLevel threadLog, "schema_sync_thread", toJSON threadLog)
 
 data EventPayload
   = EventPayload
@@ -83,15 +82,26 @@ startSchemaSync strfyNum pool logger httpMgr cacheRef instanceId cacheInitTime =
   -- Init events queue
   eventsQueue <- STM.newTQueueIO
+
   -- Start listener thread
-  lAsync <- A.async $ listener strfyNum pool
+  lTId <- C.forkIO $ listener strfyNum pool
     logger httpMgr eventsQueue cacheRef instanceId cacheInitTime
+  logThreadStarted TTListener lTId
 
   -- Start processor thread
-  pAsync <- A.async $ processor strfyNum pool
+  pTId <- C.forkIO $ processor strfyNum pool
     logger httpMgr eventsQueue cacheRef instanceId
+  logThreadStarted TTProcessor pTId
 
-  void $ A.waitAny [lAsync, pAsync]
+  where
+    logThreadStarted threadType threadId =
+      let msg = T.pack (show threadType) <> " thread started"
+      in unLogger logger $
+           StartupLog LevelInfo "threads" $
+             object [ "instance_id" .= getInstanceId instanceId
+                    , "thread_id" .= show threadId
+                    , "message" .= msg
+                    ]
 
 -- | An IO action that listens to postgres for events and pushes them to a Queue
 listener
@@ -104,8 +114,7 @@ listener
   -> InstanceId
   -> Maybe UTC.UTCTime -> IO ()
 listener strfyNum pool logger httpMgr eventsQueue
-  cacheRef instanceId cacheInitTime = do
-  logThreadStartup logger instanceId threadType
+  cacheRef instanceId cacheInitTime =
   -- Never exits
   forever $ do
     listenResE <-
@@ -125,7 +134,7 @@ listener strfyNum pool logger httpMgr eventsQueue
     refreshCache (Just (dbInstId, accrdAt)) =
       when (shouldRefresh dbInstId accrdAt) $
         refreshSchemaCache strfyNum pool logger httpMgr cacheRef
-          threadType "reloading schema cache on listen start"
+          threadType "schema cache reloaded after postgres listen init"
 
     notifyHandler = \case
       PG.PNEOnStart -> do
@@ -145,8 +154,8 @@ listener strfyNum pool logger httpMgr eventsQueue
 
     onError = logError logger threadType . TEQueryError
     logWarn = unLogger logger $
-      SchemaUpdateEventLog LevelWarn TTListener $ String
-      "error occured retrying pg listen after 1 second"
+      SchemaSyncThreadLog LevelWarn TTListener $ String
+      "error occurred, retrying postgres listen after 1 second"
 
 
 -- | An IO action that processes events from Queue
@@ -159,8 +168,7 @@ processor
   -> SchemaCacheRef
   -> InstanceId -> IO ()
 processor strfyNum pool logger httpMgr eventsQueue
-  cacheRef instanceId = do
-  logThreadStartup logger instanceId threadType
+  cacheRef instanceId =
   -- Never exits
   forever $ do
     event <- STM.atomically $ STM.readTQueue eventsQueue
@@ -174,21 +182,6 @@ processor strfyNum pool logger httpMgr eventsQueue
     -- If event is from another server
     shouldReload payload = _epInstanceId payload /= instanceId
 
-logThreadStartup
-  :: Show a
-  => Logger
-  -> InstanceId
-  -> a -> IO ()
-logThreadStartup logger instanceId threadType =
-  unLogger logger threadLog
-  where
-    threadLog =
-      let msg = T.pack (show threadType) <> " thread started"
-      in StartupLog LevelInfo "threads" $
-           object [ "instance_id" .= getInstanceId instanceId
-                  , "message" .= msg
-                  ]
-
 refreshSchemaCache
   :: Bool
   -> PG.PGPool
@@ -209,9 +202,9 @@ refreshSchemaCache strfyNum pool logger httpManager cacheRef threadType msg = do
 
 logInfo :: Logger -> ThreadType -> Value -> IO ()
 logInfo logger threadType val = unLogger logger $
-  SchemaUpdateEventLog LevelInfo threadType val
+  SchemaSyncThreadLog LevelInfo threadType val
 
 logError :: ToJSON a => Logger -> ThreadType -> a -> IO ()
 logError logger threadType err =
-  unLogger logger $ SchemaUpdateEventLog LevelError threadType $
+  unLogger logger $ SchemaSyncThreadLog LevelError threadType $
   object ["error" .= toJSON err]

From 4a74839aa5b4e18540effeefd731cc9c58687f35 Mon Sep 17 00:00:00 2001
From: Shahidh K Muhammed
Date: Mon, 11 Mar 2019 12:45:48 +0530
Subject: [PATCH 13/20] add pgbouncer user

---
 .circleci/test-server.sh | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/.circleci/test-server.sh b/.circleci/test-server.sh
index a8e854f54593e..403e0f483178d 100755
--- a/.circleci/test-server.sh
+++ b/.circleci/test-server.sh
@@ -366,8 +366,11 @@ echo -e "\n<########## TEST GRAPHQL-ENGINE WITH HORIZONTAL SCALING ########>\n"
 HASURA_HS_TEST_DB='postgres://postgres:postgres@localhost:6543/hs_hge_test'
 psql "$HASURA_GRAPHQL_DATABASE_URL" -c "create database hs_hge_test;"
 
+# create pgbouncer user
+useradd pgbouncer
+
 # start pgbouncer
-pgbouncer -d /root/graphql-engine/.circleci/pgbouncer/pgbouncer.ini
+su -c "pgbouncer -d /root/graphql-engine/.circleci/pgbouncer/pgbouncer.ini" pgbouncer
 
 # start 1st server
 "$GRAPHQL_ENGINE" --database-url "$HASURA_HS_TEST_DB" serve >> "$OUTPUT_FOLDER/graphql-engine.log" 2>&1 & PID=$!
@@ -386,7 +389,7 @@ pytest -vv --hge-url="$HGE_URL" --pg-url="$HASURA_GRAPHQL_DATABASE_URL" --test-h
 psql "postgres://postgres:postgres@localhost:6543/pgbouncer" -c "SHUTDOWN;"
 
 # start pgbouncer again
-pgbouncer -d /root/graphql-engine/.circleci/pgbouncer/pgbouncer.ini
+su -c "pgbouncer -d /root/graphql-engine/.circleci/pgbouncer/pgbouncer.ini" pgbouncer
 
 # run test
 pytest -vv --hge-url="$HGE_URL" --pg-url="$HASURA_GRAPHQL_DATABASE_URL" --test-hge-scale-url="http://localhost:8081" test_horizontal_scale.py

From e5b9b8540e96b9d121ec98f65f9a50a8260a7f6d Mon Sep 17 00:00:00 2001
From: Aravind Shankar
Date: Mon, 11 Mar 2019 15:15:05 +0530
Subject: [PATCH 14/20] add user flag to pgbouncer command

---
 .circleci/pgbouncer/pgbouncer.ini | 2 +-
 .circleci/test-server.sh          | 7 +++++--
 2 files changed, 6 insertions(+), 3 deletions(-)

diff --git a/.circleci/pgbouncer/pgbouncer.ini b/.circleci/pgbouncer/pgbouncer.ini
index 1314f91c8090c..cfdfc9e341d41 100644
--- a/.circleci/pgbouncer/pgbouncer.ini
+++ b/.circleci/pgbouncer/pgbouncer.ini
@@ -7,5 +7,5 @@ listen_addr = 127.0.0.1
 logfile = pgbouncer.log
 pidfile = pgbouncer.pid
 auth_type = md5
-auth_file = pgbouncer/users.txt
+auth_file = .circleci/pgbouncer/users.txt
 admin_users = postgres
\ No newline at end of file
diff --git a/.circleci/test-server.sh b/.circleci/test-server.sh
index 403e0f483178d..e42d748fa2f1c 100755
--- a/.circleci/test-server.sh
+++ b/.circleci/test-server.sh
@@ -370,7 +370,7 @@ psql "$HASURA_GRAPHQL_DATABASE_URL" -c "create database hs_hge_test;"
 useradd pgbouncer
 
 # start pgbouncer
-su -c "pgbouncer -d /root/graphql-engine/.circleci/pgbouncer/pgbouncer.ini" pgbouncer
+pgbouncer -u pgbouncer -d .circleci/pgbouncer/pgbouncer.ini
 
 # start 1st server
 "$GRAPHQL_ENGINE" --database-url "$HASURA_HS_TEST_DB" serve >> "$OUTPUT_FOLDER/graphql-engine.log" 2>&1 & PID=$!
@@ -389,7 +389,10 @@ pytest -vv --hge-url="$HGE_URL" --pg-url="$HASURA_GRAPHQL_DATABASE_URL" --test-h
 psql "postgres://postgres:postgres@localhost:6543/pgbouncer" -c "SHUTDOWN;"
 
 # start pgbouncer again
-su -c "pgbouncer -d /root/graphql-engine/.circleci/pgbouncer/pgbouncer.ini" pgbouncer
+pgbouncer -u pgbouncer .circleci/pgbouncer/pgbouncer.ini
+
+# sleep for 2 seconds
+sleep 2
 
 # run test
 pytest -vv --hge-url="$HGE_URL" --pg-url="$HASURA_GRAPHQL_DATABASE_URL" --test-hge-scale-url="http://localhost:8081" test_horizontal_scale.py

From d0df8ac763ef63d2fda6e0a982d36020d6c943c0 Mon Sep 17 00:00:00 2001
From: Aravind Shankar
Date: Mon, 11 Mar 2019 16:09:42 +0530
Subject: [PATCH 15/20] fix tests

---
 .circleci/pgbouncer/pgbouncer.ini |  6 +++---
 .circleci/test-server.sh          | 13 +++++++++++--
 2 files changed, 14 insertions(+), 5 deletions(-)

diff --git a/.circleci/pgbouncer/pgbouncer.ini b/.circleci/pgbouncer/pgbouncer.ini
index cfdfc9e341d41..7aeb5e9c495b8 100644
--- a/.circleci/pgbouncer/pgbouncer.ini
+++ b/.circleci/pgbouncer/pgbouncer.ini
@@ -4,8 +4,8 @@ hs_hge_test = host=localhost port=5432 dbname=hs_hge_test user=gql_test
 [pgbouncer]
 listen_port = 6543
 listen_addr = 127.0.0.1
-logfile = pgbouncer.log
-pidfile = pgbouncer.pid
+logfile = pgbouncer/pgbouncer.log
+pidfile = pgbouncer/pgbouncer.pid
 auth_type = md5
-auth_file = .circleci/pgbouncer/users.txt
+auth_file = pgbouncer/users.txt
 admin_users = postgres
\ No newline at end of file
diff --git a/.circleci/test-server.sh b/.circleci/test-server.sh
index e42d748fa2f1c..ee054f636c324 100755
--- a/.circleci/test-server.sh
+++ b/.circleci/test-server.sh
@@ -368,9 +368,13 @@ psql "$HASURA_GRAPHQL_DATABASE_URL" -c "create database hs_hge_test;"
 # create pgbouncer user
 useradd pgbouncer
 
+cd $CIRCLECI_FOLDER
+chown -R pgbouncer:pgbouncer pgbouncer
 
 # start pgbouncer
-pgbouncer -u pgbouncer -d .circleci/pgbouncer/pgbouncer.ini
+pgbouncer -u pgbouncer -d pgbouncer/pgbouncer.ini
+
+cd $PYTEST_ROOT
 
 # start 1st server
 "$GRAPHQL_ENGINE" --database-url "$HASURA_HS_TEST_DB" serve >> "$OUTPUT_FOLDER/graphql-engine.log" 2>&1 & PID=$!
@@ -388,8 +392,12 @@ pytest -vv --hge-url="$HGE_URL" --pg-url="$HASURA_GRAPHQL_DATABASE_URL" --test-h
 # Shutdown pgbouncer
 psql "postgres://postgres:postgres@localhost:6543/pgbouncer" -c "SHUTDOWN;"
 
+cd $CIRCLECI_FOLDER
+
 # start pgbouncer again
-pgbouncer -u pgbouncer .circleci/pgbouncer/pgbouncer.ini
+pgbouncer -u pgbouncer pgbouncer/pgbouncer.ini
+
+cd $PYTEST_ROOT
 
 # sleep for 2 seconds
 sleep 2
@@ -399,6 +407,7 @@ pytest -vv --hge-url="$HGE_URL" --pg-url="$HASURA_GRAPHQL_DATABASE_URL" --test-h
 
 # Shutdown pgbouncer
 psql "postgres://postgres:postgres@localhost:6543/pgbouncer" -c "SHUTDOWN;"
+
 kill -INT $PID
 kill -INT $HS_PID
 psql "$HASURA_GRAPHQL_DATABASE_URL" -c "drop database hs_hge_test;"
From a5cf3a3e383bee7fa1fc919618f9783acca17187 Mon Sep 17 00:00:00 2001
From: Aravind Shankar
Date: Mon, 11 Mar 2019 17:39:47 +0530
Subject: [PATCH 16/20] add sleep for 30s

---
 .circleci/test-server.sh                                  | 6 +++---
 .../tests-py/queries/horizontal_scale/basic/teardown.yaml | 6 ++++--
 2 files changed, 7 insertions(+), 5 deletions(-)

diff --git a/.circleci/test-server.sh b/.circleci/test-server.sh
index ee054f636c324..dd493a306988f 100755
--- a/.circleci/test-server.sh
+++ b/.circleci/test-server.sh
@@ -382,7 +382,7 @@ wait_for_port 8080
 
 # start 2nd server
 "$GRAPHQL_ENGINE" --database-url "$HASURA_HS_TEST_DB" serve \
-                  --server-port 8081 --server-host 0.0.0.0 \
+                  --server-port 8081 \
                   >> "$OUTPUT_FOLDER/hs-graphql-engine.log" 2>&1 & HS_PID=$!
 wait_for_port 8081
@@ -399,8 +399,8 @@ pgbouncer -u pgbouncer pgbouncer/pgbouncer.ini
 
 cd $PYTEST_ROOT
 
-# sleep for 2 seconds
-sleep 2
+# sleep for 30 seconds
+sleep 30
 
 # run test
 pytest -vv --hge-url="$HGE_URL" --pg-url="$HASURA_GRAPHQL_DATABASE_URL" --test-hge-scale-url="http://localhost:8081" test_horizontal_scale.py
diff --git a/server/tests-py/queries/horizontal_scale/basic/teardown.yaml b/server/tests-py/queries/horizontal_scale/basic/teardown.yaml
index 067a106f0e84a..e019e15231abb 100644
--- a/server/tests-py/queries/horizontal_scale/basic/teardown.yaml
+++ b/server/tests-py/queries/horizontal_scale/basic/teardown.yaml
@@ -1,10 +1,12 @@
 type: bulk
 args:
+- type: clear_metadata
+  args: {}
 - type: run_sql
   args:
     sql: |
-      drop table test_t1;
+      drop table test_t2;
 - type: run_sql
   args:
     sql: |
-      drop table test_t2;
\ No newline at end of file
+      drop table test_t1;
\ No newline at end of file

From db55aa70545e096c2bc86f79e8112b3fd709c1bb Mon Sep 17 00:00:00 2001
From: Aravind Shankar
Date: Mon, 11 Mar 2019 17:41:44 +0530
Subject: [PATCH 17/20] add 30s delay for test validation

---
 server/tests-py/test_horizontal_scale.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/server/tests-py/test_horizontal_scale.py b/server/tests-py/test_horizontal_scale.py
index 3b60a4c5e8fc0..bbfff0fe2c64c 100644
--- a/server/tests-py/test_horizontal_scale.py
+++ b/server/tests-py/test_horizontal_scale.py
@@ -37,8 +37,8 @@ def test_horizontal_scale_basic(self, hge_ctx):
             resp = response.json()
             assert st_code == 200, resp
 
-            # wait for x sec
-            time.sleep(0.3)
+            # wait for 30 sec
+            time.sleep(30)
             # validate data
             response = hge_ctx.http.post(
                 self.servers[step['validate']['server']] + "/v1alpha1/graphql",

From 6a8c4a133f1da00f3123121aaf3fe11760309167 Mon Sep 17 00:00:00 2001
From: Aravind Shankar
Date: Mon, 11 Mar 2019 18:12:25 +0530
Subject: [PATCH 18/20] fix tests

---
 .circleci/test-server.sh | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/.circleci/test-server.sh b/.circleci/test-server.sh
index dd493a306988f..1a7fcdb4dc35e 100755
--- a/.circleci/test-server.sh
+++ b/.circleci/test-server.sh
@@ -390,7 +390,7 @@ wait_for_port 8081
 pytest -vv --hge-url="$HGE_URL" --pg-url="$HASURA_GRAPHQL_DATABASE_URL" --test-hge-scale-url="http://localhost:8081" test_horizontal_scale.py
 
 # Shutdown pgbouncer
-psql "postgres://postgres:postgres@localhost:6543/pgbouncer" -c "SHUTDOWN;"
+psql "postgres://postgres:postgres@localhost:6543/pgbouncer" -c "SHUTDOWN;" || true
 
 cd $CIRCLECI_FOLDER
 
@@ -406,7 +406,7 @@ sleep 30
 pytest -vv --hge-url="$HGE_URL" --pg-url="$HASURA_GRAPHQL_DATABASE_URL" --test-hge-scale-url="http://localhost:8081" test_horizontal_scale.py
 
 # Shutdown pgbouncer
-psql "postgres://postgres:postgres@localhost:6543/pgbouncer" -c "SHUTDOWN;"
+psql "postgres://postgres:postgres@localhost:6543/pgbouncer" -c "SHUTDOWN;" || true

From 81edef26262824efa9332e4572f164eb091a81e1 Mon Sep 17 00:00:00 2001
From: Aravind Shankar
Date: Mon, 11 Mar 2019 18:16:05 +0530
Subject: [PATCH 19/20] fix tests

---
 .circleci/test-server.sh | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/.circleci/test-server.sh b/.circleci/test-server.sh
index 1a7fcdb4dc35e..60295b637f01c 100755
--- a/.circleci/test-server.sh
+++ b/.circleci/test-server.sh
@@ -128,6 +128,8 @@ export HASURA_GRAPHQL_STRINGIFY_NUMERIC_TYPES=true
 
 PID=""
 WH_PID=""
+HS_PID=""
+
 trap stop_services ERR
 trap stop_services INT
 
@@ -408,8 +410,8 @@ pytest -vv --hge-url="$HGE_URL" --pg-url="$HASURA_GRAPHQL_DATABASE_URL" --test-h
 # Shutdown pgbouncer
 psql "postgres://postgres:postgres@localhost:6543/pgbouncer" -c "SHUTDOWN;" || true
 
-kill -INT $PID
-kill -INT $HS_PID
+kill $PID
+kill $HS_PID
 psql "$HASURA_GRAPHQL_DATABASE_URL" -c "drop database hs_hge_test;"
 sleep 4
 combine_hpc_reports

From 524bf9148cf0f01a0dfff8a0e3569c90020ee896 Mon Sep 17 00:00:00 2001
From: Aravind Shankar
Date: Mon, 11 Mar 2019 18:44:19 +0530
Subject: [PATCH 20/20] run pgbouncer as daemon

---
 .circleci/test-server.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.circleci/test-server.sh b/.circleci/test-server.sh
index 60295b637f01c..ab80fcbdc97ac 100755
--- a/.circleci/test-server.sh
+++ b/.circleci/test-server.sh
@@ -397,7 +397,7 @@ psql "postgres://postgres:postgres@localhost:6543/pgbouncer" -c "SHUTDOWN;" || t
 cd $CIRCLECI_FOLDER
 
 # start pgbouncer again
-pgbouncer -u pgbouncer pgbouncer/pgbouncer.ini
+pgbouncer -u pgbouncer -d pgbouncer/pgbouncer.ini
 
 cd $PYTEST_ROOT
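The pgbouncer SHUTDOWN/restart sequence that PATCHes 09 through 20 converge on exists to prove one property: the schema sync listener survives dropped database connections. On the Haskell side (PATCHes 11 and 12) that resilience is a retry loop: if PG.listen returns an error, log a warning, sleep one second, and listen again; the PNEOnStart handler then re-reads hdb_catalog.hdb_schema_update_event so changes made while disconnected are not lost. A sketch of that loop, under the assumption that listen_once blocks inside LISTEN until its connection fails and resync re-checks the catalog table:

    import logging
    import time

    def run_listener_forever(listen_once, resync):
        while True:
            try:
                resync()        # on (re)connect, catch up on events missed offline
                listen_once()   # blocks until the connection drops
            except Exception as exc:
                logging.warning("error occurred, retrying postgres listen "
                                "after 1 second: %s", exc)
                time.sleep(1)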