diff --git a/.circleci/config.yml b/.circleci/config.yml index 605f311fba3e7..a693f888b4a95 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -101,14 +101,17 @@ refs: keys: - server-deps-cache-{{ checksum "server/graphql-engine.cabal" }}-{{ checksum "server/stack.yaml" }} - *wait_for_postgres + - run: + name: Install deps + command: | + apt-get update + apt install --yes pgbouncer jq curl postgresql-client - run: name: Run Python tests environment: HASURA_GRAPHQL_DATABASE_URL: 'postgres://gql_test:@localhost:5432/gql_test' GRAPHQL_ENGINE: '/build/_server_output/graphql-engine' command: | - apt-get update - apt install --yes jq curl OUTPUT_FOLDER=/build/_server_test_output/$PG_VERSION .circleci/test-server.sh - run: name: Generate coverage report diff --git a/.circleci/pgbouncer/pgbouncer.ini b/.circleci/pgbouncer/pgbouncer.ini new file mode 100644 index 0000000000000..7aeb5e9c495b8 --- /dev/null +++ b/.circleci/pgbouncer/pgbouncer.ini @@ -0,0 +1,11 @@ +[databases] +hs_hge_test = host=localhost port=5432 dbname=hs_hge_test user=gql_test + +[pgbouncer] +listen_port = 6543 +listen_addr = 127.0.0.1 +logfile = pgbouncer/pgbouncer.log +pidfile = pgbouncer/pgbouncer.pid +auth_type = md5 +auth_file = pgbouncer/users.txt +admin_users = postgres \ No newline at end of file diff --git a/.circleci/pgbouncer/users.txt b/.circleci/pgbouncer/users.txt new file mode 100644 index 0000000000000..d67de99b740d3 --- /dev/null +++ b/.circleci/pgbouncer/users.txt @@ -0,0 +1 @@ +"postgres" "postgres" \ No newline at end of file diff --git a/.circleci/test-server.sh b/.circleci/test-server.sh index f711e4f571e0c..ab80fcbdc97ac 100755 --- a/.circleci/test-server.sh +++ b/.circleci/test-server.sh @@ -128,6 +128,8 @@ export HASURA_GRAPHQL_STRINGIFY_NUMERIC_TYPES=true PID="" WH_PID="" +HS_PID="" + trap stop_services ERR trap stop_services INT @@ -356,4 +358,66 @@ if [ "$RUN_WEBHOOK_TESTS" == "true" ] ; then fi +# horizontal scale test +unset HASURA_GRAPHQL_AUTH_HOOK +unset HASURA_GRAPHQL_AUTH_HOOK_MODE +unset HASURA_GRAPHQL_ADMIN_SECRET + +echo -e "\n<########## TEST GRAPHQL-ENGINE WITH HORIZONTAL SCALING ########>\n" + +HASURA_HS_TEST_DB='postgres://postgres:postgres@localhost:6543/hs_hge_test' +psql "$HASURA_GRAPHQL_DATABASE_URL" -c "create database hs_hge_test;" + +# create pgbouncer user +useradd pgbouncer +cd $CIRCLECI_FOLDER +chown -R pgbouncer:pgbouncer pgbouncer + +# start pgbouncer +pgbouncer -u pgbouncer -d pgbouncer/pgbouncer.ini + +cd $PYTEST_ROOT + +# start 1st server +"$GRAPHQL_ENGINE" --database-url "$HASURA_HS_TEST_DB" serve >> "$OUTPUT_FOLDER/graphql-engine.log" 2>&1 & PID=$! +wait_for_port 8080 + +# start 2nd server +"$GRAPHQL_ENGINE" --database-url "$HASURA_HS_TEST_DB" serve \ + --server-port 8081 \ + >> "$OUTPUT_FOLDER/hs-graphql-engine.log" 2>&1 & HS_PID=$! +wait_for_port 8081 + +# run test +pytest -vv --hge-url="$HGE_URL" --pg-url="$HASURA_GRAPHQL_DATABASE_URL" --test-hge-scale-url="http://localhost:8081" test_horizontal_scale.py + +# Shutdown pgbouncer +psql "postgres://postgres:postgres@localhost:6543/pgbouncer" -c "SHUTDOWN;" || true + +cd $CIRCLECI_FOLDER + +# start pgbouncer again +pgbouncer -u pgbouncer -d pgbouncer/pgbouncer.ini + +cd $PYTEST_ROOT + +# sleep for 30 seconds +sleep 30 + +# run test +pytest -vv --hge-url="$HGE_URL" --pg-url="$HASURA_GRAPHQL_DATABASE_URL" --test-hge-scale-url="http://localhost:8081" test_horizontal_scale.py + +# Shutdown pgbouncer +psql "postgres://postgres:postgres@localhost:6543/pgbouncer" -c "SHUTDOWN;" || true + +kill $PID +kill $HS_PID +psql "$HASURA_GRAPHQL_DATABASE_URL" -c "drop database hs_hge_test;" +sleep 4 +combine_hpc_reports +unset HASURA_HS_TEST_DB + + +# end horizontal scale test + mv graphql-engine-combined.tix "$OUTPUT_FOLDER/graphql-engine.tix" || true diff --git a/server/graphql-engine.cabal b/server/graphql-engine.cabal index bec9393f47b7c..76e0f1830a2d3 100644 --- a/server/graphql-engine.cabal +++ b/server/graphql-engine.cabal @@ -150,6 +150,7 @@ library , Hasura.Server.Version , Hasura.Server.CheckUpdates , Hasura.Server.Telemetry + , Hasura.Server.SchemaUpdate , Hasura.RQL.Types , Hasura.RQL.Instances , Hasura.RQL.Types.SchemaCache @@ -322,6 +323,7 @@ executable graphql-engine , wreq , connection , string-conversions + , uuid other-modules: Ops , Migrate diff --git a/server/src-exec/Main.hs b/server/src-exec/Main.hs index e0242702b6d1d..188d5d3f848ff 100644 --- a/server/src-exec/Main.hs +++ b/server/src-exec/Main.hs @@ -9,6 +9,7 @@ import Options.Applicative import System.Environment (getEnvironment, lookupEnv) import System.Exit (exitFailure) + import qualified Control.Concurrent as C import qualified Data.Aeson as A import qualified Data.ByteString.Char8 as BC @@ -21,17 +22,17 @@ import qualified Network.HTTP.Client.TLS as HTTP import qualified Network.Wai.Handler.Warp as Warp import Hasura.Events.Lib -import Hasura.Logging (Logger (..), defaultLoggerSettings, - mkLogger, mkLoggerCtx) +import Hasura.Logging import Hasura.Prelude import Hasura.RQL.DDL.Metadata (fetchMetadata) -import Hasura.RQL.Types (QErr, adminUserInfo, - emptySchemaCache) -import Hasura.Server.App (mkWaiApp) +import Hasura.RQL.Types (adminUserInfo, emptySchemaCache) +import Hasura.Server.App (SchemaCacheRef (..), mkWaiApp) import Hasura.Server.Auth import Hasura.Server.CheckUpdates (checkForUpdates) import Hasura.Server.Init +import Hasura.Server.Logging import Hasura.Server.Query (peelRun) +import Hasura.Server.SchemaUpdate import Hasura.Server.Telemetry import Hasura.Server.Version (currentVersion) @@ -97,13 +98,19 @@ printJSON = BLC.putStrLn . A.encode printYaml :: (A.ToJSON a) => a -> IO () printYaml = BC.putStrLn . Y.encode +mkPGLogger :: Logger -> Q.PGLogger +mkPGLogger (Logger logger) (Q.PLERetryMsg msg) = + logger $ PGLog LevelWarn msg + main :: IO () main = do (HGEOptionsG rci hgeCmd) <- parseArgs -- global http manager httpManager <- HTTP.newManager HTTP.tlsManagerSettings loggerCtx <- mkLoggerCtx $ defaultLoggerSettings True + instanceId <- mkInstanceId let logger = mkLogger loggerCtx + pgLogger = mkPGLogger logger case hgeCmd of HCServe so@(ServeOptions port host cp isoL mAdminSecret mAuthHook mJwtSecret mUnAuthRole corsCfg enableConsole enableTelemetry strfyNum enabledAPIs) -> do @@ -120,15 +127,21 @@ main = do -- log postgres connection info unLogger logger $ connInfoToLog ci + pool <- Q.initPGPool ci cp pgLogger + -- safe init catalog initRes <- initialise logger ci httpManager -- prepare event triggers data prepareEvents logger ci - pool <- Q.initPGPool ci cp - (app, cacheRef) <- mkWaiApp isoL loggerCtx pool httpManager - strfyNum am corsCfg enableConsole enableTelemetry enabledAPIs + (app, cacheRef, cacheInitTime) <- + mkWaiApp isoL loggerCtx strfyNum pool httpManager am + corsCfg enableConsole enableTelemetry instanceId enabledAPIs + + -- start a background thread for schema sync + startSchemaSync strfyNum pool logger httpManager + cacheRef instanceId cacheInitTime let warpSettings = Warp.setPort port $ Warp.setHost host Warp.defaultSettings @@ -138,9 +151,10 @@ main = do eventEngineCtx <- atomically $ initEventEngineCtx maxEvThrds evFetchMilliSec + let scRef = _scrCache cacheRef unLogger logger $ mkGenericStrLog "event_triggers" "starting workers" - void $ C.forkIO $ processEventQueue hloggerCtx logEnvHeaders httpManager pool cacheRef eventEngineCtx + void $ C.forkIO $ processEventQueue hloggerCtx logEnvHeaders httpManager pool scRef eventEngineCtx -- start a background thread to check for updates void $ C.forkIO $ checkForUpdates loggerCtx httpManager @@ -148,7 +162,7 @@ main = do -- start a background thread for telemetry when enableTelemetry $ do unLogger logger $ mkGenericStrLog "telemetry" telemetryNotice - void $ C.forkIO $ runTelemetry logger httpManager cacheRef initRes + void $ C.forkIO $ runTelemetry logger httpManager scRef initRes unLogger logger $ mkGenericStrLog "server" "starting API server" @@ -156,30 +170,29 @@ main = do HCExport -> do ci <- procConnInfo rci - res <- runTx ci fetchMetadata + res <- runTx pgLogger ci fetchMetadata either printErrJExit printJSON res HCClean -> do ci <- procConnInfo rci - res <- runTx ci cleanCatalog + res <- runTx pgLogger ci cleanCatalog either printErrJExit (const cleanSuccess) res HCExecute -> do queryBs <- BL.getContents ci <- procConnInfo rci - res <- runAsAdmin ci httpManager $ execQuery queryBs + res <- runAsAdmin pgLogger ci httpManager $ execQuery queryBs either printErrJExit BLC.putStrLn res HCVersion -> putStrLn $ "Hasura GraphQL Engine: " ++ T.unpack currentVersion where - runTx :: Q.ConnInfo -> Q.TxE QErr a -> IO (Either QErr a) - runTx ci tx = do - pool <- getMinimalPool ci + runTx pgLogger ci tx = do + pool <- getMinimalPool pgLogger ci runExceptT $ Q.runTx pool (Q.Serializable, Nothing) tx - runAsAdmin ci httpManager m = do - pool <- getMinimalPool ci + runAsAdmin pgLogger ci httpManager m = do + pool <- getMinimalPool pgLogger ci res <- runExceptT $ peelRun emptySchemaCache adminUserInfo httpManager False pool Q.Serializable m return $ fmap fst res @@ -188,31 +201,32 @@ main = do either (printErrExit . connInfoErrModifier) return $ mkConnInfo rci - getMinimalPool ci = do + getMinimalPool pgLogger ci = do let connParams = Q.defaultConnParams { Q.cpConns = 1 } - Q.initPGPool ci connParams + Q.initPGPool ci connParams pgLogger initialise (Logger logger) ci httpMgr = do currentTime <- getCurrentTime - + let pgLogger = mkPGLogger $ Logger logger -- initialise the catalog - initRes <- runAsAdmin ci httpMgr $ initCatalogSafe currentTime + initRes <- runAsAdmin pgLogger ci httpMgr $ initCatalogSafe currentTime either printErrJExit (logger . mkGenericStrLog "db_init") initRes -- migrate catalog if necessary - migRes <- runAsAdmin ci httpMgr $ migrateCatalog currentTime + migRes <- runAsAdmin pgLogger ci httpMgr $ migrateCatalog currentTime either printErrJExit (logger . mkGenericStrLog "db_migrate") migRes -- generate and retrieve uuids - getUniqIds ci + getUniqIds pgLogger ci prepareEvents (Logger logger) ci = do + let pgLogger = mkPGLogger $ Logger logger logger $ mkGenericStrLog "event_triggers" "preparing data" - res <- runTx ci unlockAllEvents + res <- runTx pgLogger ci unlockAllEvents either printErrJExit return res - getUniqIds ci = do - eDbId <- runTx ci getDbId + getUniqIds pgLogger ci = do + eDbId <- runTx pgLogger ci getDbId dbId <- either printErrJExit return eDbId fp <- liftIO generateFingerprint return (dbId, fp) diff --git a/server/src-exec/Migrate.hs b/server/src-exec/Migrate.hs index 955ed446af3b5..051fe1e3c5258 100644 --- a/server/src-exec/Migrate.hs +++ b/server/src-exec/Migrate.hs @@ -19,7 +19,7 @@ import qualified Data.Yaml.TH as Y import qualified Database.PG.Query as Q curCatalogVer :: T.Text -curCatalogVer = "10" +curCatalogVer = "11" migrateMetadata :: ( MonadTx m @@ -251,6 +251,13 @@ from9To10 = liftTx $ do $(Q.sqlFromFile "src-rsr/migrate_from_9_to_10.sql") return () +from10To11 :: (MonadTx m) => m () +from10To11 = liftTx $ do + -- Migrate database + Q.Discard () <- Q.multiQE defaultTxErrorHandler + $(Q.sqlFromFile "src-rsr/migrate_from_10_to_11.sql") + return () + migrateCatalog :: ( MonadTx m , CacheRWM m @@ -274,10 +281,13 @@ migrateCatalog migrationTime = do | preVer == "7" -> from7ToCurrent | preVer == "8" -> from8ToCurrent | preVer == "9" -> from9ToCurrent + | preVer == "10" -> from10ToCurrent | otherwise -> throw400 NotSupported $ "unsupported version : " <> preVer where - from9ToCurrent = from9To10 >> postMigrate + from10ToCurrent = from10To11 >> postMigrate + + from9ToCurrent = from9To10 >> from10ToCurrent from8ToCurrent = from8To9 >> from9ToCurrent diff --git a/server/src-lib/Hasura/Server/App.hs b/server/src-lib/Hasura/Server/App.hs index 31dffaa61fa59..b93073a01ed9c 100644 --- a/server/src-lib/Hasura/Server/App.hs +++ b/server/src-lib/Hasura/Server/App.hs @@ -88,17 +88,39 @@ mkConsoleHTML path authMode enableTelemetry = errMsg = "console template rendering failed: " ++ show errs +data SchemaCacheRef + = SchemaCacheRef + { _scrLock :: MVar () + , _scrCache :: IORef SchemaCache + } + +withSCUpdate + :: (MonadIO m, MonadError e m) + => SchemaCacheRef -> m (a, SchemaCache) -> m a +withSCUpdate scr action = do + acquireLock + (res, newSC) <- action `catchError` onError + -- update schemacache in IO reference + liftIO $ writeIORef cacheRef newSC + releaseLock + return res + where + SchemaCacheRef lk cacheRef = scr + onError e = releaseLock >> throwError e + acquireLock = liftIO $ takeMVar lk + releaseLock = liftIO $ putMVar lk () + data ServerCtx = ServerCtx { scIsolation :: Q.TxIsolation , scPGPool :: Q.PGPool , scLogger :: L.Logger - , scCacheRef :: IORef SchemaCache - , scCacheLock :: MVar () + , scCacheRef :: SchemaCacheRef , scAuthMode :: AuthMode , scManager :: HTTP.Manager , scStringifyNum :: Bool , scEnabledAPIs :: S.HashSet API + , scInstanceId :: InstanceId } data HandlerCtx @@ -135,7 +157,7 @@ buildQCtx :: Handler QCtx buildQCtx = do scRef <- scCacheRef . hcServerCtx <$> ask userInfo <- asks hcUser - cache <- liftIO $ readIORef scRef + cache <- liftIO $ readIORef $ _scrCache scRef strfyNum <- scStringifyNum . hcServerCtx <$> ask return $ QCtx userInfo cache $ SQLGenCtx strfyNum @@ -198,39 +220,27 @@ mkSpockAction qErrEncoder serverCtx handler = do uncurry setHeader jsonHeader lazyBytes resp -withLock :: (MonadIO m, MonadError e m) - => MVar () -> m a -> m a -withLock lk action = do - acquireLock - res <- action `catchError` onError - releaseLock - return res - where - onError e = releaseLock >> throwError e - acquireLock = liftIO $ takeMVar lk - releaseLock = liftIO $ putMVar lk () - v1QueryHandler :: RQLQuery -> Handler BL.ByteString v1QueryHandler query = do - lk <- scCacheLock . hcServerCtx <$> ask - bool (fst <$> dbAction) (withLock lk dbActionReload) $ + scRef <- scCacheRef . hcServerCtx <$> ask + bool (fst <$> dbAction) (withSCUpdate scRef dbActionReload) $ queryNeedsReload query where -- Hit postgres dbAction = do userInfo <- asks hcUser scRef <- scCacheRef . hcServerCtx <$> ask - schemaCache <- liftIO $ readIORef scRef + schemaCache <- liftIO $ readIORef $ _scrCache scRef httpMgr <- scManager . hcServerCtx <$> ask strfyNum <- scStringifyNum . hcServerCtx <$> ask pool <- scPGPool . hcServerCtx <$> ask isoL <- scIsolation . hcServerCtx <$> ask - runQuery pool isoL userInfo schemaCache httpMgr strfyNum query + instanceId <- scInstanceId . hcServerCtx <$> ask + runQuery pool isoL instanceId userInfo schemaCache httpMgr strfyNum query -- Also update the schema cache dbActionReload = do (resp, newSc) <- dbAction - scRef <- scCacheRef . hcServerCtx <$> ask httpMgr <- scManager . hcServerCtx <$> ask --FIXME: should we be fetching the remote schema again? if not how do we get the remote schema? newGCtxMap <- GS.mkGCtxMap (scTables newSc) (scFunctions newSc) @@ -238,8 +248,7 @@ v1QueryHandler query = do mergeSchemas (scRemoteResolvers newSc) newGCtxMap httpMgr let newSc' = newSc { scGCtxMap = mergedGCtxMap, scDefaultRemoteGCtx = defGCtx } - liftIO $ writeIORef scRef newSc' - return resp + return (resp, newSc') v1Alpha1GQHandler :: GH.GraphQLRequest -> Handler BL.ByteString v1Alpha1GQHandler query = do @@ -248,7 +257,7 @@ v1Alpha1GQHandler query = do reqHeaders <- asks hcReqHeaders manager <- scManager . hcServerCtx <$> ask scRef <- scCacheRef . hcServerCtx <$> ask - sc <- liftIO $ readIORef scRef + sc <- liftIO $ readIORef $ _scrCache scRef pool <- scPGPool . hcServerCtx <$> ask isoL <- scIsolation . hcServerCtx <$> ask strfyNum <- scStringifyNum . hcServerCtx <$> ask @@ -258,7 +267,7 @@ gqlExplainHandler :: GE.GQLExplain -> Handler BL.ByteString gqlExplainHandler query = do onlyAdmin scRef <- scCacheRef . hcServerCtx <$> ask - sc <- liftIO $ readIORef scRef + sc <- liftIO $ readIORef $ _scrCache scRef pool <- scPGPool . hcServerCtx <$> ask isoL <- scIsolation . hcServerCtx <$> ask strfyNum <- scStringifyNum . hcServerCtx <$> ask @@ -294,28 +303,27 @@ legacyQueryHandler tn queryType = mkWaiApp - :: Q.TxIsolation - -> L.LoggerCtx - -> Q.PGPool - -> HTTP.Manager - -> Bool - -> AuthMode - -> CorsConfig - -> Bool - -> Bool - -> S.HashSet API - -> IO (Wai.Application, IORef SchemaCache) -mkWaiApp isoLevel loggerCtx pool httpManager strfyNum mode corsCfg enableConsole enableTelemetry apis = do - cacheRef <- do + :: Q.TxIsolation -> L.LoggerCtx -> Bool + -> Q.PGPool -> HTTP.Manager -> AuthMode + -> CorsConfig -> Bool -> Bool + -> InstanceId -> S.HashSet API + -> IO (Wai.Application, SchemaCacheRef, Maybe UTCTime) +mkWaiApp isoLevel loggerCtx strfyNum pool httpManager mode corsCfg + enableConsole enableTelemetry instanceId apis = do + (cacheRef, cacheBuiltTime) <- do pgResp <- runExceptT $ peelRun emptySchemaCache adminUserInfo - httpManager strfyNum pool Q.Serializable buildSchemaCache - either initErrExit return pgResp >>= newIORef . snd + httpManager strfyNum pool Q.Serializable $ do + buildSchemaCache + liftTx fetchLastUpdate + (time, sc) <- either initErrExit return pgResp + scRef <- newIORef sc + return (scRef, snd <$> time) cacheLock <- newMVar () - let serverCtx = - ServerCtx isoLevel pool (L.mkLogger loggerCtx) cacheRef - cacheLock mode httpManager strfyNum apis + let schemaCacheRef = SchemaCacheRef cacheLock cacheRef + serverCtx = ServerCtx isoLevel pool (L.mkLogger loggerCtx) + schemaCacheRef mode httpManager strfyNum apis instanceId spockApp <- spockAsApp $ spockT id $ httpApp corsCfg serverCtx enableConsole enableTelemetry @@ -328,7 +336,10 @@ mkWaiApp isoLevel loggerCtx pool httpManager strfyNum mode corsCfg enableConsole cacheRef runTx corsPolicy let wsServerApp = WS.createWSServerApp mode wsServerEnv - return (WS.websocketsOr WS.defaultConnectionOptions wsServerApp spockApp, cacheRef) + return ( WS.websocketsOr WS.defaultConnectionOptions wsServerApp spockApp + , schemaCacheRef + , cacheBuiltTime + ) httpApp :: CorsConfig -> ServerCtx -> Bool -> Bool -> SpockT IO () httpApp corsCfg serverCtx enableConsole enableTelemetry = do diff --git a/server/src-lib/Hasura/Server/Init.hs b/server/src-lib/Hasura/Server/Init.hs index 800ef1a82f669..183478914a03c 100644 --- a/server/src-lib/Hasura/Server/Init.hs +++ b/server/src-lib/Hasura/Server/Init.hs @@ -9,6 +9,9 @@ import qualified Data.Aeson as J import qualified Data.HashSet as Set import qualified Data.String as DataString import qualified Data.Text as T +import qualified Data.UUID as UUID +import qualified Data.UUID.V4 as UUID +import qualified Hasura.Logging as L import qualified Text.PrettyPrint.ANSI.Leijen as PP import Hasura.Prelude @@ -19,8 +22,12 @@ import Hasura.Server.Logging import Hasura.Server.Utils import Network.Wai.Handler.Warp -import qualified Hasura.Logging as L +newtype InstanceId + = InstanceId {getInstanceId :: T.Text} + deriving (Show, Eq, J.ToJSON, J.FromJSON) +mkInstanceId :: IO InstanceId +mkInstanceId = (InstanceId . UUID.toText) <$> UUID.nextRandom initErrExit :: (Show e) => e -> IO a initErrExit e = print e >> exitFailure @@ -79,6 +86,7 @@ data RawConnInfo = , connUrl :: !(Maybe String) , connDatabase :: !(Maybe String) , connOptions :: !(Maybe String) + , connRetries :: !(Maybe Int) } deriving (Eq, Read, Show) data HGECommandG a @@ -219,9 +227,13 @@ mkHGEOptions (HGEOptionsG rawConnInfo rawCmd) = mkRawConnInfo :: RawConnInfo -> WithEnv RawConnInfo mkRawConnInfo rawConnInfo = do withEnvUrl <- withEnv rawDBUrl $ fst databaseUrlEnv - return $ rawConnInfo {connUrl = withEnvUrl} + withEnvRetries <- withEnv retries $ fst retriesNumEnv + return $ rawConnInfo { connUrl = withEnvUrl + , connRetries = withEnvRetries + } where rawDBUrl = connUrl rawConnInfo + retries = connRetries rawConnInfo mkServeOptions :: RawServeOptions -> WithEnv ServeOptions mkServeOptions rso = do @@ -309,7 +321,7 @@ mainCmdFooter = ] ] - envVarDoc = mkEnvVarDoc [databaseUrlEnv] + envVarDoc = mkEnvVarDoc [databaseUrlEnv, retriesNumEnv] databaseUrlEnv :: (String, String) databaseUrlEnv = @@ -357,7 +369,8 @@ serveCmdFooter = envVarDoc = mkEnvVarDoc $ envVars <> eventEnvs envVars = - [ servePortEnv, serveHostEnv, pgStripesEnv, pgConnsEnv, pgTimeoutEnv + [ databaseUrlEnv, retriesNumEnv, servePortEnv, serveHostEnv, + pgStripesEnv, pgConnsEnv, pgTimeoutEnv , pgUsePrepareEnv, txIsoEnv, adminSecretEnv , accessKeyEnv, authHookEnv, authHookModeEnv , jwtSecretEnv, unAuthRoleEnv, corsDomainEnv, enableConsoleEnv @@ -373,6 +386,12 @@ serveCmdFooter = ) ] +retriesNumEnv :: (String, String) +retriesNumEnv = + ( "HASURA_GRAPHQL_NO_OF_RETRIES" + , "No.of retries if Postgres connection error occurs (default: 1)" + ) + servePortEnv :: (String, String) servePortEnv = ( "HASURA_GRAPHQL_SERVER_PORT" @@ -496,6 +515,7 @@ parseRawConnInfo :: Parser RawConnInfo parseRawConnInfo = RawConnInfo <$> host <*> port <*> user <*> password <*> dbUrl <*> dbName <*> pure Nothing + <*> retries where host = optional $ strOption ( long "host" <> @@ -534,24 +554,31 @@ parseRawConnInfo = metavar "" <> help "Database name to connect to" ) + retries = optional $ + option auto ( long "retries" <> + metavar "NO OF RETRIES" <> + help (snd retriesNumEnv) + ) connInfoErrModifier :: String -> String connInfoErrModifier s = "Fatal Error : " ++ s mkConnInfo ::RawConnInfo -> Either String Q.ConnInfo -mkConnInfo (RawConnInfo mHost mPort mUser pass mURL mDB opts) = +mkConnInfo (RawConnInfo mHost mPort mUser pass mURL mDB opts mRetries) = case (mHost, mPort, mUser, mDB, mURL) of (Just host, Just port, Just user, Just db, Nothing) -> - return $ Q.ConnInfo host port user pass db opts + return $ Q.ConnInfo host port user pass db opts retries (_, _, _, _, Just dbURL) -> maybe (throwError invalidUrlMsg) - return $ parseDatabaseUrl dbURL opts + withRetries $ parseDatabaseUrl dbURL opts _ -> throwError $ "Invalid options. " ++ "Expecting all database connection params " ++ "(host, port, user, dbname, password) or " ++ "database-url (HASURA_GRAPHQL_DATABASE_URL)" where + retries = fromMaybe 1 mRetries + withRetries ci = return $ ci{Q.connRetries = retries} invalidUrlMsg = "Invalid database-url (HASURA_GRAPHQL_DATABASE_URL). " ++ "Example postgres://foo:bar@example.com:2345/database" @@ -730,13 +757,14 @@ parseEnabledAPIs = optional $ -- Init logging related connInfoToLog :: Q.ConnInfo -> StartupLog -connInfoToLog (Q.ConnInfo host port user _ db _) = +connInfoToLog (Q.ConnInfo host port user _ db _ retries) = StartupLog L.LevelInfo "postgres_connection" infoVal where infoVal = J.object [ "host" J..= host , "port" J..= port , "user" J..= user , "database" J..= db + , "retries" J..= retries ] serveOptsToLog :: ServeOptions -> StartupLog diff --git a/server/src-lib/Hasura/Server/Logging.hs b/server/src-lib/Hasura/Server/Logging.hs index 9e23b3b6abfdf..89947907d3827 100644 --- a/server/src-lib/Hasura/Server/Logging.hs +++ b/server/src-lib/Hasura/Server/Logging.hs @@ -2,6 +2,7 @@ module Hasura.Server.Logging ( StartupLog(..) + , PGLog(..) , mkAccessLog , getRequestHeader , WebHookLog(..) @@ -54,6 +55,20 @@ instance L.ToEngineLog StartupLog where toEngineLog startupLog = (slLogLevel startupLog, "startup", toJSON startupLog) +data PGLog + = PGLog + { plLogLevel :: !L.LogLevel + , plMessage :: !T.Text + } deriving (Show, Eq) + +instance ToJSON PGLog where + toJSON (PGLog _ msg) = + object ["message" .= msg] + +instance L.ToEngineLog PGLog where + toEngineLog pgLog = + (plLogLevel pgLog, "pg-client", toJSON pgLog) + data WebHookLog = WebHookLog { whlLogLevel :: !L.LogLevel diff --git a/server/src-lib/Hasura/Server/Query.hs b/server/src-lib/Hasura/Server/Query.hs index 8a5c5bb8d412e..66b22b7f12fb9 100644 --- a/server/src-lib/Hasura/Server/Query.hs +++ b/server/src-lib/Hasura/Server/Query.hs @@ -3,6 +3,7 @@ module Hasura.Server.Query where import Data.Aeson import Data.Aeson.Casing import Data.Aeson.TH +import Data.Time (UTCTime) import Language.Haskell.TH.Syntax (Lift) import qualified Data.ByteString.Builder as BB @@ -10,6 +11,7 @@ import qualified Data.ByteString.Lazy as BL import qualified Data.Vector as V import qualified Network.HTTP.Client as HTTP + import Hasura.Prelude import Hasura.RQL.DDL.Metadata import Hasura.RQL.DDL.Permission @@ -28,6 +30,7 @@ import Hasura.RQL.DML.Returning (encodeJSONVector) import Hasura.RQL.DML.Select import Hasura.RQL.DML.Update import Hasura.RQL.Types +import Hasura.Server.Init (InstanceId (..)) import Hasura.Server.Utils import qualified Database.PG.Query as Q @@ -115,6 +118,30 @@ instance HasHttpManager Run where instance HasSQLGenCtx Run where askSQLGenCtx = asks _3 +fetchLastUpdate :: Q.TxE QErr (Maybe (InstanceId, UTCTime)) +fetchLastUpdate = do + l <- Q.listQE defaultTxErrorHandler + [Q.sql| + SELECT instance_id::text, occurred_at + FROM hdb_catalog.hdb_schema_update_event + ORDER BY occurred_at DESC LIMIT 1 + |] () True + case l of + [] -> return Nothing + [(instId, occurredAt)] -> + return $ Just (InstanceId instId, occurredAt) + -- never happens + _ -> throw500 "more than one row returned by query" + +recordSchemaUpdate :: InstanceId -> Q.TxE QErr () +recordSchemaUpdate instanceId = + liftTx $ Q.unitQE defaultTxErrorHandler [Q.sql| + INSERT INTO + hdb_catalog.hdb_schema_update_event + (instance_id, occurred_at) + VALUES ($1::uuid, DEFAULT) + |] (Identity $ getInstanceId instanceId) True + peelRun :: SchemaCache -> UserInfo @@ -130,13 +157,20 @@ peelRun sc userInfo httMgr strfyNum pgPool txIso (Run m) = runQuery :: (MonadIO m, MonadError QErr m) - => Q.PGPool -> Q.TxIsolation -> UserInfo - -> SchemaCache -> HTTP.Manager -> Bool - -> RQLQuery -> m (BL.ByteString, SchemaCache) -runQuery pool isoL userInfo sc hMgr strfyNum query = do - res <- liftIO $ runExceptT $ - peelRun sc userInfo hMgr strfyNum pool isoL $ runQueryM query - liftEither res + => Q.PGPool -> Q.TxIsolation -> InstanceId + -> UserInfo -> SchemaCache -> HTTP.Manager + -> Bool -> RQLQuery -> m (BL.ByteString, SchemaCache) +runQuery pool isoL instanceId userInfo sc hMgr strfyNum query = do + resE <- liftIO $ runExceptT $ + peelRun sc userInfo hMgr strfyNum pool isoL $ runQueryM query + either throwError withReload resE + where + withReload r = do + when (queryNeedsReload query) $ do + e <- liftIO $ runExceptT $ Q.runTx pool (isoL, Nothing) + $ recordSchemaUpdate instanceId + liftEither e + return r queryNeedsReload :: RQLQuery -> Bool queryNeedsReload qi = case qi of diff --git a/server/src-lib/Hasura/Server/SchemaUpdate.hs b/server/src-lib/Hasura/Server/SchemaUpdate.hs new file mode 100644 index 0000000000000..0c6993ae857a1 --- /dev/null +++ b/server/src-lib/Hasura/Server/SchemaUpdate.hs @@ -0,0 +1,210 @@ +module Hasura.Server.SchemaUpdate + (startSchemaSync) +where + +import Hasura.Prelude + +import Hasura.Logging +import Hasura.RQL.DDL.Schema.Table (buildSchemaCache) +import Hasura.RQL.Types +import Hasura.Server.App (SchemaCacheRef (..), withSCUpdate) +import Hasura.Server.Init (InstanceId (..)) +import Hasura.Server.Logging +import Hasura.Server.Query + +import Data.Aeson +import Data.Aeson.Casing +import Data.Aeson.TH + +import qualified Control.Concurrent as C +import qualified Control.Concurrent.STM as STM +import qualified Data.Text as T +import qualified Data.Time as UTC +import qualified Database.PG.Query as PG +import qualified Database.PostgreSQL.LibPQ as PQ +import qualified Network.HTTP.Client as HTTP + +pgChannel :: PG.PGChannel +pgChannel = "hasura_schema_update" + +data ThreadType + = TTListener + | TTProcessor + deriving (Eq) + +instance Show ThreadType where + show TTListener = "listener" + show TTProcessor = "processor" + + +data SchemaSyncThreadLog + = SchemaSyncThreadLog + { suelLogLevel :: !LogLevel + , suelThreadType :: !ThreadType + , suelInfo :: !Value + } deriving (Show, Eq) + +instance ToJSON SchemaSyncThreadLog where + toJSON (SchemaSyncThreadLog _ t info) = + object [ "thread_type" .= show t + , "info" .= info + ] + +instance ToEngineLog SchemaSyncThreadLog where + toEngineLog threadLog = + (suelLogLevel threadLog, "schema_sync_thread", toJSON threadLog) + +data EventPayload + = EventPayload + { _epInstanceId :: !InstanceId + , _epOccurredAt :: !UTC.UTCTime + } deriving (Show, Eq) +$(deriveJSON (aesonDrop 3 snakeCase) ''EventPayload) + +data ThreadError + = TEJsonParse !T.Text + | TEQueryError !QErr +$(deriveToJSON + defaultOptions { constructorTagModifier = snakeCase . drop 2 + , sumEncoding = TaggedObject "type" "info" + } + ''ThreadError) + +-- | An IO action that enables metadata syncing +startSchemaSync + :: Bool + -> PG.PGPool + -> Logger + -> HTTP.Manager + -> SchemaCacheRef + -> InstanceId + -> Maybe UTC.UTCTime -> IO () +startSchemaSync strfyNum pool logger httpMgr cacheRef instanceId cacheInitTime = do + -- Init events queue + eventsQueue <- STM.newTQueueIO + + -- Start listener thread + lTId <- C.forkIO $ listener strfyNum pool + logger httpMgr eventsQueue cacheRef instanceId cacheInitTime + logThreadStarted TTListener lTId + + -- Start processor thread + pTId <- C.forkIO $ processor strfyNum pool + logger httpMgr eventsQueue cacheRef instanceId + logThreadStarted TTProcessor pTId + + where + logThreadStarted threadType threadId = + let msg = T.pack (show threadType) <> " thread started" + in unLogger logger $ + StartupLog LevelInfo "threads" $ + object [ "instance_id" .= getInstanceId instanceId + , "thread_id" .= show threadId + , "message" .= msg + ] + +-- | An IO action that listens to postgres for events and pushes them to a Queue +listener + :: Bool + -> PG.PGPool + -> Logger + -> HTTP.Manager + -> STM.TQueue EventPayload + -> SchemaCacheRef + -> InstanceId + -> Maybe UTC.UTCTime -> IO () +listener strfyNum pool logger httpMgr eventsQueue + cacheRef instanceId cacheInitTime = + -- Never exits + forever $ do + listenResE <- + liftIO $ runExceptT $ PG.listen pool pgChannel notifyHandler + either onError return listenResE + logWarn + C.threadDelay $ 1 * 1000 * 1000 -- 1 second + where + threadType = TTListener + + shouldRefresh dbInstId accrdAt = + case cacheInitTime of + Nothing -> True + Just time -> (dbInstId /= instanceId) && accrdAt > time + + refreshCache Nothing = return () + refreshCache (Just (dbInstId, accrdAt)) = + when (shouldRefresh dbInstId accrdAt) $ + refreshSchemaCache strfyNum pool logger httpMgr cacheRef + threadType "schema cache reloaded after postgres listen init" + + notifyHandler = \case + PG.PNEOnStart -> do + eRes <- runExceptT $ PG.runTx pool + (PG.Serializable, Nothing) fetchLastUpdate + case eRes of + Left e -> onError e + Right mLastUpd -> refreshCache mLastUpd + + PG.PNEPQNotify notif -> + case eitherDecodeStrict $ PQ.notifyExtra notif of + Left e -> logError logger threadType $ TEJsonParse $ T.pack e + Right payload -> do + logInfo logger threadType $ object ["received_event" .= payload] + -- Push a notify event to Queue + STM.atomically $ STM.writeTQueue eventsQueue payload + + onError = logError logger threadType . TEQueryError + logWarn = unLogger logger $ + SchemaSyncThreadLog LevelWarn TTListener $ String + "error occurred, retrying postgres listen after 1 second" + + +-- | An IO action that processes events from Queue +processor + :: Bool + -> PG.PGPool + -> Logger + -> HTTP.Manager + -> STM.TQueue EventPayload + -> SchemaCacheRef + -> InstanceId -> IO () +processor strfyNum pool logger httpMgr eventsQueue + cacheRef instanceId = + -- Never exits + forever $ do + event <- STM.atomically $ STM.readTQueue eventsQueue + logInfo logger threadType $ object ["processed_event" .= event] + when (shouldReload event) $ + refreshSchemaCache strfyNum pool logger httpMgr cacheRef + threadType "schema cache reloaded" + where + threadType = TTProcessor + + -- If event is from another server + shouldReload payload = _epInstanceId payload /= instanceId + +refreshSchemaCache + :: Bool + -> PG.PGPool + -> Logger + -> HTTP.Manager + -> SchemaCacheRef + -> ThreadType + -> T.Text -> IO () +refreshSchemaCache strfyNum pool logger httpManager cacheRef threadType msg = do + -- Reload schema cache from catalog + resE <- liftIO $ runExceptT $ withSCUpdate cacheRef $ + peelRun emptySchemaCache adminUserInfo + httpManager strfyNum pool PG.Serializable buildSchemaCache + case resE of + Left e -> logError logger threadType $ TEQueryError e + Right _ -> + logInfo logger threadType $ object ["message" .= msg] + +logInfo :: Logger -> ThreadType -> Value -> IO () +logInfo logger threadType val = unLogger logger $ + SchemaSyncThreadLog LevelInfo threadType val + +logError :: ToJSON a => Logger -> ThreadType -> a -> IO () +logError logger threadType err = + unLogger logger $ SchemaSyncThreadLog LevelError threadType $ + object ["error" .= toJSON err] diff --git a/server/src-rsr/initialise.sql b/server/src-rsr/initialise.sql index 25d7d548499c7..c7efa93b426d8 100644 --- a/server/src-rsr/initialise.sql +++ b/server/src-rsr/initialise.sql @@ -405,3 +405,32 @@ CREATE TABLE hdb_catalog.remote_schemas ( definition JSON, comment TEXT ); + +CREATE TABLE hdb_catalog.hdb_schema_update_event ( + id BIGSERIAL PRIMARY KEY, + instance_id uuid NOT NULL, + occurred_at timestamptz NOT NULL DEFAULT NOW() +); + +CREATE FUNCTION hdb_catalog.hdb_schema_update_event_notifier() RETURNS trigger AS +$function$ + DECLARE + instance_id uuid; + occurred_at timestamptz; + curr_rec record; + BEGIN + instance_id = NEW.instance_id; + occurred_at = NEW.occurred_at; + PERFORM pg_notify('hasura_schema_update', json_build_object( + 'instance_id', instance_id, + 'occurred_at', occurred_at + )::text); + RETURN curr_rec; + END; +$function$ +LANGUAGE plpgsql; + +CREATE TRIGGER hdb_schema_update_event_notifier AFTER INSERT ON hdb_catalog.hdb_schema_update_event + FOR EACH ROW EXECUTE PROCEDURE hdb_catalog.hdb_schema_update_event_notifier(); + + diff --git a/server/src-rsr/migrate_from_10_to_11.sql b/server/src-rsr/migrate_from_10_to_11.sql new file mode 100644 index 0000000000000..517debe72a9f2 --- /dev/null +++ b/server/src-rsr/migrate_from_10_to_11.sql @@ -0,0 +1,26 @@ +CREATE TABLE hdb_catalog.hdb_schema_update_event ( + id BIGSERIAL PRIMARY KEY, + instance_id uuid NOT NULL, + occurred_at timestamptz NOT NULL DEFAULT NOW() +); + +CREATE FUNCTION hdb_catalog.hdb_schema_update_event_notifier() RETURNS trigger AS + $function$ + DECLARE + instance_id uuid; + occurred_at timestamptz; + curr_rec record; +BEGIN + instance_id = NEW.instance_id; + occurred_at = NEW.occurred_at; + PERFORM pg_notify('hasura_schema_update', json_build_object( + 'instance_id', instance_id, + 'occurred_at', occurred_at + )::text); + RETURN curr_rec; +END; +$function$ + LANGUAGE plpgsql; + +CREATE TRIGGER hdb_schema_update_event_notifier AFTER INSERT ON hdb_catalog.hdb_schema_update_event + FOR EACH ROW EXECUTE PROCEDURE hdb_catalog.hdb_schema_update_event_notifier(); diff --git a/server/stack.yaml b/server/stack.yaml index 7e275c8a067e7..d25e2d1db5532 100644 --- a/server/stack.yaml +++ b/server/stack.yaml @@ -16,8 +16,8 @@ packages: # Packages to be pulled from upstream that are not in the resolver (e.g., acme-missiles-0.3) extra-deps: # use https URLs so that build systems can clone these repos -- git: https://github.com/hasura/pg-client-hs.git - commit: f3d1e9e67bdfbfa3de85b7cbdb4c557dce7fd84d +- git: https://github.com/rakeshkky/pg-client-hs.git + commit: ed3dcfb864a2a23ac6c22ed947a5095b0d03170d - git: https://github.com/hasura/graphql-parser-hs.git commit: ff95d9a96aa5ef9e5390f8712958e4118e3831f6 - ginger-0.8.1.0 @@ -27,6 +27,9 @@ extra-deps: - deferred-folds-0.9.9 - primitive-0.6.4.0 +# extra dep for pg-client-hs +- select-0.4.0.1 + # jose for the x5t bugfix (0.8.0.0) - git: https://github.com/frasertweedale/hs-jose.git commit: d47572fb0650ac6cc2c9e00711c7f99132d897cb diff --git a/server/tests-py/conftest.py b/server/tests-py/conftest.py index 47a8ba97a6e84..8352475d5eb88 100644 --- a/server/tests-py/conftest.py +++ b/server/tests-py/conftest.py @@ -50,6 +50,13 @@ def pytest_addoption(parser): help="Run Test cases with GraphQL queries being disabled" ) + parser.addoption( + "--test-hge-scale-url", + metavar="", + required=False, + help="Run testcases for horizontal scaling" + ) + @pytest.fixture(scope='session') def hge_ctx(request): @@ -63,6 +70,7 @@ def hge_ctx(request): hge_jwt_conf = request.config.getoption('--hge-jwt-conf') ws_read_cookie = request.config.getoption('--test-ws-init-cookie') metadata_disabled = request.config.getoption('--test-metadata-disabled') + hge_scale_url = request.config.getoption('--test-hge-scale-url') try: hge_ctx = HGECtx( hge_url=hge_url, @@ -73,7 +81,8 @@ def hge_ctx(request): hge_jwt_key_file=hge_jwt_key_file, hge_jwt_conf=hge_jwt_conf, ws_read_cookie=ws_read_cookie, - metadata_disabled=metadata_disabled + metadata_disabled=metadata_disabled, + hge_scale_url=hge_scale_url ) except HGECtxError as e: pytest.exit(str(e)) diff --git a/server/tests-py/context.py b/server/tests-py/context.py index deb0671ef6e6a..b8676fc27d33e 100644 --- a/server/tests-py/context.py +++ b/server/tests-py/context.py @@ -75,7 +75,7 @@ def server_bind(self): class HGECtx: def __init__(self, hge_url, pg_url, hge_key, hge_webhook, webhook_insecure, - hge_jwt_key_file, hge_jwt_conf, metadata_disabled, ws_read_cookie): + hge_jwt_key_file, hge_jwt_conf, metadata_disabled, ws_read_cookie, hge_scale_url): server_address = ('0.0.0.0', 5592) self.resp_queue = queue.Queue(maxsize=1) @@ -118,6 +118,8 @@ def __init__(self, hge_url, pg_url, hge_key, hge_webhook, webhook_insecure, self.ws_read_cookie = ws_read_cookie + self.hge_scale_url = hge_scale_url + result = subprocess.run(['../../scripts/get-version.sh'], shell=False, stdout=subprocess.PIPE, check=True) self.version = result.stdout.decode('utf-8').strip() if not self.metadata_disabled: diff --git a/server/tests-py/queries/horizontal_scale/basic/steps.yaml b/server/tests-py/queries/horizontal_scale/basic/steps.yaml new file mode 100644 index 0000000000000..706ffecf2dc7e --- /dev/null +++ b/server/tests-py/queries/horizontal_scale/basic/steps.yaml @@ -0,0 +1,93 @@ +- + operation: + server: '1' + query: + type: bulk + args: + - type: run_sql + args: + sql: | + create table test_t1( + t1_c1 int, + t1_c2 text, + PRIMARY KEY (t1_c1) + ); + - type: track_table + args: + schema: public + name: test_t1 + - type: run_sql + args: + sql: | + insert into test_t1(t1_c1, t1_c2) VALUES(1, 'table1'); + validate: + server: '2' + response: + data: + test_t1: + - t1_c1: 1 + t1_c2: table1 + query: + query: | + query { + test_t1 { + t1_c1 + t1_c2 + } + } +- + operation: + server: '2' + query: + type: bulk + args: + - type: run_sql + args: + sql: | + create table test_t2( + t2_c1 int, + t2_c2 text, + PRIMARY KEY (t2_c1) + ); + - type: run_sql + args: + sql: | + ALTER TABLE test_t2 ADD FOREIGN KEY (t2_c1) REFERENCES test_t1 (t1_c1); + - type: track_table + args: + schema: public + name: test_t2 + - type: create_object_relationship + args: + name: testT1Byc1 + table: + name: test_t2 + schema: public + using: + foreign_key_constraint_on: t2_c1 + - type: run_sql + args: + sql: | + insert into test_t2(t2_c1, t2_c2) VALUES(1, 'table2'); + validate: + server: '1' + query: + query: | + query { + test_t2 { + t2_c1 + t2_c2 + testT1Byc1 { + t1_c1 + t1_c2 + } + } + } + response: + data: + test_t2: + - t2_c1: 1 + t2_c2: table2 + testT1Byc1: + t1_c1: 1 + t1_c2: table1 diff --git a/server/tests-py/queries/horizontal_scale/basic/teardown.yaml b/server/tests-py/queries/horizontal_scale/basic/teardown.yaml new file mode 100644 index 0000000000000..e019e15231abb --- /dev/null +++ b/server/tests-py/queries/horizontal_scale/basic/teardown.yaml @@ -0,0 +1,12 @@ +type: bulk +args: +- type: clear_metadata + args: {} +- type: run_sql + args: + sql: | + drop table test_t2; +- type: run_sql + args: + sql: | + drop table test_t1; \ No newline at end of file diff --git a/server/tests-py/test_horizontal_scale.py b/server/tests-py/test_horizontal_scale.py new file mode 100644 index 0000000000000..bbfff0fe2c64c --- /dev/null +++ b/server/tests-py/test_horizontal_scale.py @@ -0,0 +1,60 @@ +import pytest +import yaml +import time +import jsondiff + +from validate import json_ordered + + +if not pytest.config.getoption("--test-hge-scale-url"): + pytest.skip("--test-hge-scale-url flag is missing, skipping tests", allow_module_level=True) + + +class TestHorizantalScaleBasic(): + servers = {} + + @pytest.fixture(autouse=True, scope='class') + def transact(self, hge_ctx): + self.servers['1'] = hge_ctx.hge_url + self.servers['2'] = hge_ctx.hge_scale_url + yield + # teardown + st_code, resp = hge_ctx.v1q_f(self.dir() + '/teardown.yaml') + assert st_code == 200, resp + + def test_horizontal_scale_basic(self, hge_ctx): + with open(self.dir() + "/steps.yaml") as c: + conf = yaml.load(c) + + assert isinstance(conf, list) == True, 'Not an list' + for _, step in enumerate(conf): + # execute operation + response = hge_ctx.http.post( + self.servers[step['operation']['server']] + "/v1/query", + json=step['operation']['query'] + ) + st_code = response.status_code + resp = response.json() + assert st_code == 200, resp + + # wait for 30 sec + time.sleep(30) + # validate data + response = hge_ctx.http.post( + self.servers[step['validate']['server']] + "/v1alpha1/graphql", + json=step['validate']['query'] + ) + st_code = response.status_code + resp = response.json() + assert st_code == 200, resp + + if 'response' in step['validate']: + assert json_ordered(resp) == json_ordered(step['validate']['response']), yaml.dump({ + 'response': resp, + 'expected': step['validate']['response'], + 'diff': jsondiff.diff(step['validate']['response'], resp) + }) + + @classmethod + def dir(cls): + return 'queries/horizontal_scale/basic'