Skip to content

Commit

Permalink
manage schema cache when horizontally scaled, closes hasura#1182
Browse files Browse the repository at this point in the history
  • Loading branch information
rakeshkky committed Feb 7, 2019
1 parent 4bf9d19 commit 5fb6dea
Show file tree
Hide file tree
Showing 9 changed files with 350 additions and 43 deletions.
2 changes: 2 additions & 0 deletions server/graphql-engine.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ library
, Hasura.Server.Version
, Hasura.Server.CheckUpdates
, Hasura.Server.Telemetry
, Hasura.Server.MetadataUpdate
, Hasura.RQL.Types
, Hasura.RQL.Instances
, Hasura.RQL.Types.SchemaCache
Expand Down Expand Up @@ -316,6 +317,7 @@ executable graphql-engine
, wreq
, connection
, string-conversions
, uuid

other-modules: Ops

Expand Down
86 changes: 57 additions & 29 deletions server/src-exec/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2,41 +2,45 @@ module Main where

import Ops

import Control.Monad.STM (atomically)
import Data.Time.Clock (getCurrentTime)
import Control.Monad.STM (atomically)
import Data.Time.Clock (getCurrentTime)
import Options.Applicative
import System.Environment (getEnvironment, lookupEnv)
import System.Exit (exitFailure)

import qualified Control.Concurrent as C
import qualified Data.Aeson as A
import qualified Data.ByteString.Char8 as BC
import qualified Data.ByteString.Lazy as BL
import qualified Data.ByteString.Lazy.Char8 as BLC
import qualified Data.Text as T
import qualified Data.Yaml as Y
import qualified Network.HTTP.Client as HTTP
import qualified Network.HTTP.Client.TLS as HTTP
import qualified Network.Wai.Handler.Warp as Warp
import System.Environment (getEnvironment, lookupEnv)
import System.Exit (exitFailure)


import qualified Control.Concurrent as C
import qualified Control.Concurrent.STM as STM
import qualified Data.Aeson as A
import qualified Data.ByteString.Char8 as BC
import qualified Data.ByteString.Lazy as BL
import qualified Data.ByteString.Lazy.Char8 as BLC
import qualified Data.Text as T
import qualified Data.UUID.V4 as UUID
import qualified Data.Yaml as Y
import qualified Network.HTTP.Client as HTTP
import qualified Network.HTTP.Client.TLS as HTTP
import qualified Network.Wai.Handler.Warp as Warp

import Hasura.Events.Lib
import Hasura.Logging (Logger (..), defaultLoggerSettings,
mkLogger, mkLoggerCtx)
import Hasura.Logging
import Hasura.Prelude
import Hasura.RQL.DDL.Metadata (fetchMetadata)
import Hasura.RQL.Types (QErr, adminUserInfo,
emptySchemaCache)
import Hasura.Server.App (mkWaiApp)
import Hasura.RQL.DDL.Metadata (fetchMetadata)
import Hasura.RQL.Types (QErr, adminUserInfo,
emptySchemaCache)
import Hasura.Server.App (mkWaiApp)
import Hasura.Server.Auth
import Hasura.Server.CheckUpdates (checkForUpdates)
import Hasura.Server.CheckUpdates (checkForUpdates)
import Hasura.Server.Init
import Hasura.Server.Query (peelRun)
import Hasura.Server.Logging
import Hasura.Server.MetadataUpdate
import Hasura.Server.Query (peelRun)
import Hasura.Server.Telemetry
import Hasura.Server.Version (currentVersion)
import Hasura.Server.Version (currentVersion)

import qualified Database.PG.Query as Q
import qualified Network.HTTP.Client.TLS as TLS
import qualified Network.Wreq.Session as WrqS
import qualified Database.PG.Query as Q
import qualified Network.HTTP.Client.TLS as TLS
import qualified Network.Wreq.Session as WrqS

printErrExit :: forall a . String -> IO a
printErrExit = (>> exitFailure) . putStrLn
Expand Down Expand Up @@ -100,6 +104,7 @@ main = do
-- global http manager
httpManager <- HTTP.newManager HTTP.tlsManagerSettings
loggerCtx <- mkLoggerCtx $ defaultLoggerSettings True
serverId <- UUID.nextRandom
let logger = mkLogger loggerCtx
case hgeCmd of
HCServe so@(ServeOptions port host cp isoL mAccessKey mAuthHook mJwtSecret
Expand All @@ -117,15 +122,30 @@ main = do
-- log postgres connection info
unLogger logger $ connInfoToLog ci

-- create empty metadata update events queue
eventsQueue <- STM.newTQueueIO

-- start postgres metadata update events listener thread in backgroud
minPool <- getMinimalPool ci
listenerTId <- C.forkIO $ metadataEventListener minPool logger eventsQueue
unLogger logger $ mkThreadLog listenerTId serverId TTListener

-- safe init catalog
initRes <- initialise logger ci httpManager

-- prepare event triggers data
prepareEvents logger ci

pool <- Q.initPGPool ci cp
(app, cacheRef) <- mkWaiApp isoL loggerCtx pool httpManager
am corsCfg enableConsole enableTelemetry

(app, cacheRef, lk, cacheBuiltTime) <-
mkWaiApp isoL loggerCtx pool httpManager am corsCfg enableConsole
enableTelemetry serverId

-- start metadata update events processor thread in backgroud
procTId <- C.forkIO $ metadataEventProcessor pool logger httpManager
eventsQueue cacheRef lk serverId cacheBuiltTime
unLogger logger $ mkThreadLog procTId serverId TTProcessor

let warpSettings = Warp.setPort port $ Warp.setHost host Warp.defaultSettings

Expand Down Expand Up @@ -209,6 +229,14 @@ main = do
res <- runTx ci unlockAllEvents
either printErrJExit return res

mkThreadLog threadId serverId threadType =
let msg = T.pack (show threadType) <> " thread started"
in StartupLog LevelInfo "threads" $
A.object [ "server_id" A..= serverId
, "thread_id" A..= show threadId
, "message" A..= msg
]

getUniqIds ci = do
eDbId <- runTx ci getDbId
dbId <- either printErrJExit return eDbId
Expand Down
15 changes: 13 additions & 2 deletions server/src-exec/Ops.hs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import qualified Database.PG.Query as Q
import qualified Database.PG.Query.Connection as Q

curCatalogVer :: T.Text
curCatalogVer = "9"
curCatalogVer = "10"

initCatalogSafe
:: (QErrM m, UserInfoM m, CacheRWM m, MonadTx m, MonadIO m, HasHttpManager m)
Expand Down Expand Up @@ -318,6 +318,12 @@ from8To9 = do
migrateMetadataFrom8 =
$(unTypeQ (Y.decodeFile "src-rsr/migrate_metadata_from_8_to_9.yaml" :: Q (TExp RQLQuery)))

from9To10 :: MonadTx m => m ()
from9To10 = do
-- migrate database
Q.Discard () <- liftTx $ Q.multiQE defaultTxErrorHandler
$(Q.sqlFromFile "src-rsr/migrate_from_9_to_10.sql")
return ()

migrateCatalog
:: (MonadTx m, CacheRWM m, MonadIO m, UserInfoM m, HasHttpManager m)
Expand All @@ -335,12 +341,17 @@ migrateCatalog migrationTime = do
| preVer == "6" -> from6ToCurrent
| preVer == "7" -> from7ToCurrent
| preVer == "8" -> from8ToCurrent
| preVer == "9" -> from9ToCurrent
| otherwise -> throw400 NotSupported $
"unsupported version : " <> preVer
where
from9ToCurrent = do
from9To10
postMigrate

from8ToCurrent = do
from8To9
postMigrate
from9ToCurrent

from7ToCurrent = do
from7To8
Expand Down
29 changes: 21 additions & 8 deletions server/src-lib/Hasura/Server/App.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import Data.Aeson hiding (json)
import Data.IORef
import Data.Time.Clock (UTCTime,
getCurrentTime)
import Data.UUID (UUID)
import Network.Wai (requestHeaders,
strictRequestBody)
import Web.Spock.Core
Expand Down Expand Up @@ -96,6 +97,7 @@ data ServerCtx
, scCacheLock :: MVar ()
, scAuthMode :: AuthMode
, scManager :: HTTP.Manager
, scServerId :: UUID
}

data HandlerCtx
Expand Down Expand Up @@ -214,7 +216,8 @@ v1QueryHandler query = do
httpMgr <- scManager . hcServerCtx <$> ask
pool <- scPGPool . hcServerCtx <$> ask
isoL <- scIsolation . hcServerCtx <$> ask
runQuery pool isoL userInfo schemaCache httpMgr query
serverId <- scServerId . hcServerCtx <$> ask
runQuery pool isoL serverId userInfo schemaCache httpMgr query

-- Also update the schema cache
dbActionReload = do
Expand Down Expand Up @@ -289,18 +292,24 @@ mkWaiApp
-> CorsConfig
-> Bool
-> Bool
-> IO (Wai.Application, IORef SchemaCache)
mkWaiApp isoLevel loggerCtx pool httpManager mode corsCfg enableConsole enableTelemetry = do
cacheRef <- do
-> UUID
-> IO (Wai.Application, IORef SchemaCache, MVar (), UTCTime)
mkWaiApp isoLevel loggerCtx pool httpManager mode corsCfg
enableConsole enableTelemetry serverId = do
(cacheRef, cacheBuiltTime) <- do
pgResp <- runExceptT $ peelRun emptySchemaCache adminUserInfo
httpManager pool Q.Serializable buildSchemaCache
either initErrExit return pgResp >>= newIORef . snd
httpManager pool Q.Serializable $ do
buildSchemaCache
liftIO getCurrentTime
(time, sc) <- either initErrExit return pgResp
scRef <- newIORef sc
return (scRef, time)

cacheLock <- newMVar ()

let serverCtx =
ServerCtx isoLevel pool (L.mkLogger loggerCtx) cacheRef
cacheLock mode httpManager
cacheLock mode httpManager serverId

spockApp <- spockAsApp $ spockT id $
httpApp corsCfg serverCtx enableConsole enableTelemetry
Expand All @@ -309,7 +318,11 @@ mkWaiApp isoLevel loggerCtx pool httpManager mode corsCfg enableConsole enableTe

wsServerEnv <- WS.createWSServerEnv (scLogger serverCtx) httpManager cacheRef runTx
let wsServerApp = WS.createWSServerApp mode wsServerEnv
return (WS.websocketsOr WS.defaultConnectionOptions wsServerApp spockApp, cacheRef)
return ( WS.websocketsOr WS.defaultConnectionOptions wsServerApp spockApp
, cacheRef
, cacheLock
, cacheBuiltTime
)

httpApp :: CorsConfig -> ServerCtx -> Bool -> Bool -> SpockT IO ()
httpApp corsCfg serverCtx enableConsole enableTelemetry = do
Expand Down
Loading

0 comments on commit 5fb6dea

Please sign in to comment.