
Commit e32f5a1

rakeshkky authored and shahidhk committed
sync metadata cache across multiple instances connected to same db (closes #1182) (#1574)
1. The Haskell library `pg-client-hs` has been updated to expose a function that listens to `postgres` notifications over a `channel`, in this [PR](hasura/pg-client-hs#5).
2. The server records an event in the `hdb_catalog.hdb_cache_update_event` table whenever a `/v1/query` request that changes metadata is made. A trigger then notifies a `cache update` event over the `hasura_cache_update` channel.
3. The server runs two concurrent threads, `listener` and `processor`. The `listener` thread listens for events on the `hasura_cache_update` channel and pushes them onto a `Queue`. The `processor` thread fetches events from that `Queue` and processes them, rebuilding the schema cache from the database and updating it.
1 parent a5b4c5f commit e32f5a1
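To make the listener/processor pattern described in the commit message concrete, here is a minimal, self-contained sketch of the same idea. It is illustrative only: it uses `postgresql-simple` and an STM `TQueue` rather than the project's `pg-client-hs` and the new `Hasura.Server.SchemaUpdate` module, the connection string is hypothetical, and the processing step merely prints the payload where the real server would rebuild its schema cache from the database.

```haskell
-- Illustrative sketch only; not the code added in this commit.
{-# LANGUAGE OverloadedStrings #-}
module SchemaSyncSketch (main) where

import           Control.Concurrent                      (forkIO)
import           Control.Concurrent.STM                  (TQueue, atomically,
                                                          newTQueueIO, readTQueue,
                                                          writeTQueue)
import           Control.Monad                           (forever, void)
import qualified Data.ByteString.Char8                   as BC
import           Database.PostgreSQL.Simple              (Connection,
                                                          connectPostgreSQL, execute_)
import           Database.PostgreSQL.Simple.Notification (Notification (..),
                                                          getNotification)

-- listener thread: LISTEN on the channel and push each payload into the queue
listener :: Connection -> TQueue BC.ByteString -> IO ()
listener conn queue = do
  _ <- execute_ conn "LISTEN hasura_cache_update"
  forever $ do
    Notification _ _ payload <- getNotification conn
    atomically $ writeTQueue queue payload

-- processor thread: drain the queue and act on each event
processor :: TQueue BC.ByteString -> IO ()
processor queue = forever $ do
  payload <- atomically $ readTQueue queue
  -- the real server rebuilds its schema cache from the database here
  BC.putStrLn $ "cache update event: " <> payload

main :: IO ()
main = do
  conn  <- connectPostgreSQL "postgres://localhost/hs_test"  -- hypothetical database
  queue <- newTQueueIO
  void $ forkIO $ listener conn queue
  processor queue
```

In the actual change below, an `instanceId` is also threaded through `mkWaiApp` and `startSchemaSync`, presumably so each instance can tell updates it made itself apart from those made by its peers.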

20 files changed: +730 -93 lines changed

.circleci/config.yml

+5 -2

@@ -101,14 +101,17 @@ refs:
       keys:
       - server-deps-cache-{{ checksum "server/graphql-engine.cabal" }}-{{ checksum "server/stack.yaml" }}
     - *wait_for_postgres
+    - run:
+        name: Install deps
+        command: |
+          apt-get update
+          apt install --yes pgbouncer jq curl postgresql-client
     - run:
         name: Run Python tests
         environment:
           HASURA_GRAPHQL_DATABASE_URL: 'postgres://gql_test:@localhost:5432/gql_test'
           GRAPHQL_ENGINE: '/build/_server_output/graphql-engine'
         command: |
-          apt-get update
-          apt install --yes jq curl
           OUTPUT_FOLDER=/build/_server_test_output/$PG_VERSION .circleci/test-server.sh
     - run:
         name: Generate coverage report

.circleci/pgbouncer/pgbouncer.ini

+11

@@ -0,0 +1,11 @@
+[databases]
+hs_hge_test = host=localhost port=5432 dbname=hs_hge_test user=gql_test
+
+[pgbouncer]
+listen_port = 6543
+listen_addr = 127.0.0.1
+logfile = pgbouncer/pgbouncer.log
+pidfile = pgbouncer/pgbouncer.pid
+auth_type = md5
+auth_file = pgbouncer/users.txt
+admin_users = postgres

.circleci/pgbouncer/users.txt

+1

@@ -0,0 +1 @@
+"postgres" "postgres"

.circleci/test-server.sh

+64

@@ -128,6 +128,8 @@ export HASURA_GRAPHQL_STRINGIFY_NUMERIC_TYPES=true
 
 PID=""
 WH_PID=""
+HS_PID=""
+
 trap stop_services ERR
 trap stop_services INT
 
@@ -356,4 +358,66 @@ if [ "$RUN_WEBHOOK_TESTS" == "true" ] ; then
 
 fi
 
+# horizontal scale test
+unset HASURA_GRAPHQL_AUTH_HOOK
+unset HASURA_GRAPHQL_AUTH_HOOK_MODE
+unset HASURA_GRAPHQL_ADMIN_SECRET
+
+echo -e "\n<########## TEST GRAPHQL-ENGINE WITH HORIZONTAL SCALING ########>\n"
+
+HASURA_HS_TEST_DB='postgres://postgres:postgres@localhost:6543/hs_hge_test'
+psql "$HASURA_GRAPHQL_DATABASE_URL" -c "create database hs_hge_test;"
+
+# create pgbouncer user
+useradd pgbouncer
+cd $CIRCLECI_FOLDER
+chown -R pgbouncer:pgbouncer pgbouncer
+
+# start pgbouncer
+pgbouncer -u pgbouncer -d pgbouncer/pgbouncer.ini
+
+cd $PYTEST_ROOT
+
+# start 1st server
+"$GRAPHQL_ENGINE" --database-url "$HASURA_HS_TEST_DB" serve >> "$OUTPUT_FOLDER/graphql-engine.log" 2>&1 & PID=$!
+wait_for_port 8080
+
+# start 2nd server
+"$GRAPHQL_ENGINE" --database-url "$HASURA_HS_TEST_DB" serve \
+                  --server-port 8081 \
+                  >> "$OUTPUT_FOLDER/hs-graphql-engine.log" 2>&1 & HS_PID=$!
+wait_for_port 8081
+
+# run test
+pytest -vv --hge-url="$HGE_URL" --pg-url="$HASURA_GRAPHQL_DATABASE_URL" --test-hge-scale-url="http://localhost:8081" test_horizontal_scale.py
+
+# Shutdown pgbouncer
+psql "postgres://postgres:postgres@localhost:6543/pgbouncer" -c "SHUTDOWN;" || true
+
+cd $CIRCLECI_FOLDER
+
+# start pgbouncer again
+pgbouncer -u pgbouncer -d pgbouncer/pgbouncer.ini
+
+cd $PYTEST_ROOT
+
+# sleep for 30 seconds
+sleep 30
+
+# run test
+pytest -vv --hge-url="$HGE_URL" --pg-url="$HASURA_GRAPHQL_DATABASE_URL" --test-hge-scale-url="http://localhost:8081" test_horizontal_scale.py
+
+# Shutdown pgbouncer
+psql "postgres://postgres:postgres@localhost:6543/pgbouncer" -c "SHUTDOWN;" || true
+
+kill $PID
+kill $HS_PID
+psql "$HASURA_GRAPHQL_DATABASE_URL" -c "drop database hs_hge_test;"
+sleep 4
+combine_hpc_reports
+unset HASURA_HS_TEST_DB
+
+
+# end horizontal scale test
+
 mv graphql-engine-combined.tix "$OUTPUT_FOLDER/graphql-engine.tix" || true

server/graphql-engine.cabal

+2

@@ -150,6 +150,7 @@ library
                      , Hasura.Server.Version
                      , Hasura.Server.CheckUpdates
                      , Hasura.Server.Telemetry
+                     , Hasura.Server.SchemaUpdate
                      , Hasura.RQL.Types
                      , Hasura.RQL.Instances
                      , Hasura.RQL.Types.SchemaCache
@@ -322,6 +323,7 @@ executable graphql-engine
                      , wreq
                      , connection
                      , string-conversions
+                     , uuid
 
   other-modules:      Ops
                      , Migrate

server/src-exec/Main.hs

+41 -27

@@ -9,6 +9,7 @@ import Options.Applicative
 import System.Environment (getEnvironment, lookupEnv)
 import System.Exit (exitFailure)
 
+
 import qualified Control.Concurrent as C
 import qualified Data.Aeson as A
 import qualified Data.ByteString.Char8 as BC
@@ -21,17 +22,17 @@ import qualified Network.HTTP.Client.TLS as HTTP
 import qualified Network.Wai.Handler.Warp as Warp
 
 import Hasura.Events.Lib
-import Hasura.Logging (Logger (..), defaultLoggerSettings,
-                       mkLogger, mkLoggerCtx)
+import Hasura.Logging
 import Hasura.Prelude
 import Hasura.RQL.DDL.Metadata (fetchMetadata)
-import Hasura.RQL.Types (QErr, adminUserInfo,
-                         emptySchemaCache)
-import Hasura.Server.App (mkWaiApp)
+import Hasura.RQL.Types (adminUserInfo, emptySchemaCache)
+import Hasura.Server.App (SchemaCacheRef (..), mkWaiApp)
 import Hasura.Server.Auth
 import Hasura.Server.CheckUpdates (checkForUpdates)
 import Hasura.Server.Init
+import Hasura.Server.Logging
 import Hasura.Server.Query (peelRun)
+import Hasura.Server.SchemaUpdate
 import Hasura.Server.Telemetry
 import Hasura.Server.Version (currentVersion)
 
@@ -97,13 +98,19 @@ printJSON = BLC.putStrLn . A.encode
 printYaml :: (A.ToJSON a) => a -> IO ()
 printYaml = BC.putStrLn . Y.encode
 
+mkPGLogger :: Logger -> Q.PGLogger
+mkPGLogger (Logger logger) (Q.PLERetryMsg msg) =
+  logger $ PGLog LevelWarn msg
+
 main :: IO ()
 main = do
   (HGEOptionsG rci hgeCmd) <- parseArgs
   -- global http manager
   httpManager <- HTTP.newManager HTTP.tlsManagerSettings
   loggerCtx <- mkLoggerCtx $ defaultLoggerSettings True
+  instanceId <- mkInstanceId
   let logger = mkLogger loggerCtx
+      pgLogger = mkPGLogger logger
   case hgeCmd of
     HCServe so@(ServeOptions port host cp isoL mAdminSecret mAuthHook mJwtSecret
                 mUnAuthRole corsCfg enableConsole enableTelemetry strfyNum enabledAPIs) -> do
@@ -120,15 +127,21 @@ main = do
       -- log postgres connection info
       unLogger logger $ connInfoToLog ci
 
+      pool <- Q.initPGPool ci cp pgLogger
+
       -- safe init catalog
       initRes <- initialise logger ci httpManager
 
       -- prepare event triggers data
       prepareEvents logger ci
 
-      pool <- Q.initPGPool ci cp
-      (app, cacheRef) <- mkWaiApp isoL loggerCtx pool httpManager
-                         strfyNum am corsCfg enableConsole enableTelemetry enabledAPIs
+      (app, cacheRef, cacheInitTime) <-
+        mkWaiApp isoL loggerCtx strfyNum pool httpManager am
+          corsCfg enableConsole enableTelemetry instanceId enabledAPIs
+
+      -- start a background thread for schema sync
+      startSchemaSync strfyNum pool logger httpManager
+        cacheRef instanceId cacheInitTime
 
      let warpSettings = Warp.setPort port $ Warp.setHost host Warp.defaultSettings
 
@@ -138,48 +151,48 @@ main = do
 
      eventEngineCtx <- atomically $ initEventEngineCtx maxEvThrds evFetchMilliSec
 
+      let scRef = _scrCache cacheRef
      unLogger logger $
        mkGenericStrLog "event_triggers" "starting workers"
-      void $ C.forkIO $ processEventQueue hloggerCtx logEnvHeaders httpManager pool cacheRef eventEngineCtx
+      void $ C.forkIO $ processEventQueue hloggerCtx logEnvHeaders httpManager pool scRef eventEngineCtx
 
      -- start a background thread to check for updates
      void $ C.forkIO $ checkForUpdates loggerCtx httpManager
 
      -- start a background thread for telemetry
      when enableTelemetry $ do
        unLogger logger $ mkGenericStrLog "telemetry" telemetryNotice
-        void $ C.forkIO $ runTelemetry logger httpManager cacheRef initRes
+        void $ C.forkIO $ runTelemetry logger httpManager scRef initRes
 
      unLogger logger $
        mkGenericStrLog "server" "starting API server"
      Warp.runSettings warpSettings app
 
    HCExport -> do
      ci <- procConnInfo rci
-      res <- runTx ci fetchMetadata
+      res <- runTx pgLogger ci fetchMetadata
      either printErrJExit printJSON res
 
    HCClean -> do
      ci <- procConnInfo rci
-      res <- runTx ci cleanCatalog
+      res <- runTx pgLogger ci cleanCatalog
      either printErrJExit (const cleanSuccess) res
 
    HCExecute -> do
      queryBs <- BL.getContents
      ci <- procConnInfo rci
-      res <- runAsAdmin ci httpManager $ execQuery queryBs
+      res <- runAsAdmin pgLogger ci httpManager $ execQuery queryBs
      either printErrJExit BLC.putStrLn res
 
    HCVersion -> putStrLn $ "Hasura GraphQL Engine: " ++ T.unpack currentVersion
  where
 
-    runTx :: Q.ConnInfo -> Q.TxE QErr a -> IO (Either QErr a)
-    runTx ci tx = do
-      pool <- getMinimalPool ci
+    runTx pgLogger ci tx = do
+      pool <- getMinimalPool pgLogger ci
      runExceptT $ Q.runTx pool (Q.Serializable, Nothing) tx
 
-    runAsAdmin ci httpManager m = do
-      pool <- getMinimalPool ci
+    runAsAdmin pgLogger ci httpManager m = do
+      pool <- getMinimalPool pgLogger ci
      res <- runExceptT $ peelRun emptySchemaCache adminUserInfo
             httpManager False pool Q.Serializable m
      return $ fmap fst res
@@ -188,31 +201,32 @@ main = do
      either (printErrExit . connInfoErrModifier) return $
        mkConnInfo rci
 
-    getMinimalPool ci = do
+    getMinimalPool pgLogger ci = do
      let connParams = Q.defaultConnParams { Q.cpConns = 1 }
-      Q.initPGPool ci connParams
+      Q.initPGPool ci connParams pgLogger
 
    initialise (Logger logger) ci httpMgr = do
      currentTime <- getCurrentTime
-
+      let pgLogger = mkPGLogger $ Logger logger
      -- initialise the catalog
-      initRes <- runAsAdmin ci httpMgr $ initCatalogSafe currentTime
+      initRes <- runAsAdmin pgLogger ci httpMgr $ initCatalogSafe currentTime
      either printErrJExit (logger . mkGenericStrLog "db_init") initRes
 
      -- migrate catalog if necessary
-      migRes <- runAsAdmin ci httpMgr $ migrateCatalog currentTime
+      migRes <- runAsAdmin pgLogger ci httpMgr $ migrateCatalog currentTime
      either printErrJExit (logger . mkGenericStrLog "db_migrate") migRes
 
      -- generate and retrieve uuids
-      getUniqIds ci
+      getUniqIds pgLogger ci
 
    prepareEvents (Logger logger) ci = do
+      let pgLogger = mkPGLogger $ Logger logger
      logger $ mkGenericStrLog "event_triggers" "preparing data"
-      res <- runTx ci unlockAllEvents
+      res <- runTx pgLogger ci unlockAllEvents
      either printErrJExit return res
 
-    getUniqIds ci = do
-      eDbId <- runTx ci getDbId
+    getUniqIds pgLogger ci = do
+      eDbId <- runTx pgLogger ci getDbId
      dbId <- either printErrJExit return eDbId
      fp <- liftIO generateFingerprint
      return (dbId, fp)

server/src-exec/Migrate.hs

+12 -2

@@ -19,7 +19,7 @@ import qualified Data.Yaml.TH as Y
 import qualified Database.PG.Query as Q
 
 curCatalogVer :: T.Text
-curCatalogVer = "10"
+curCatalogVer = "11"
 
 migrateMetadata
   :: ( MonadTx m
@@ -251,6 +251,13 @@ from9To10 = liftTx $ do
     $(Q.sqlFromFile "src-rsr/migrate_from_9_to_10.sql")
   return ()
 
+from10To11 :: (MonadTx m) => m ()
+from10To11 = liftTx $ do
+  -- Migrate database
+  Q.Discard () <- Q.multiQE defaultTxErrorHandler
+    $(Q.sqlFromFile "src-rsr/migrate_from_10_to_11.sql")
+  return ()
+
 migrateCatalog
   :: ( MonadTx m
      , CacheRWM m
@@ -274,10 +281,13 @@ migrateCatalog migrationTime = do
      | preVer == "7"  -> from7ToCurrent
      | preVer == "8"  -> from8ToCurrent
      | preVer == "9"  -> from9ToCurrent
+      | preVer == "10" -> from10ToCurrent
      | otherwise -> throw400 NotSupported $
        "unsupported version : " <> preVer
  where
-    from9ToCurrent = from9To10 >> postMigrate
+    from10ToCurrent = from10To11 >> postMigrate
+
+    from9ToCurrent = from9To10 >> from10ToCurrent
 
    from8ToCurrent = from8To9 >> from9ToCurrent
283293

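The 10 to 11 catalog migration above runs `src-rsr/migrate_from_10_to_11.sql`, which, per the commit description, has to create the `hdb_catalog.hdb_cache_update_event` table and a trigger that NOTIFYs the `hasura_cache_update` channel on insert. The sketch below is not that migration file: the column list and the function/trigger names are hypothetical (only the table and channel names come from the commit message), and the SQL is issued through `postgresql-simple` purely to keep the example self-contained.

```haskell
-- Hypothetical sketch of the table-plus-NOTIFY-trigger shape; not the
-- project's actual migrate_from_10_to_11.sql.
{-# LANGUAGE OverloadedStrings #-}
module CacheUpdateTriggerSketch (installCacheUpdateNotifier) where

import           Control.Monad              (void)
import           Database.PostgreSQL.Simple (Connection, execute_)

installCacheUpdateNotifier :: Connection -> IO ()
installCacheUpdateNotifier conn = do
  -- a row is inserted here whenever a metadata-changing /v1/query runs
  void $ execute_ conn
    "CREATE TABLE IF NOT EXISTS hdb_catalog.hdb_cache_update_event \
    \( id BIGSERIAL PRIMARY KEY \
    \, occurred_at timestamptz NOT NULL DEFAULT now() )"
  -- trigger function: forward every insert as a notification payload
  void $ execute_ conn
    "CREATE OR REPLACE FUNCTION hdb_catalog.hdb_cache_update_event_notifier() \
    \RETURNS trigger AS $$ \
    \BEGIN \
    \  PERFORM pg_notify('hasura_cache_update', row_to_json(NEW)::text); \
    \  RETURN NEW; \
    \END; $$ LANGUAGE plpgsql"
  void $ execute_ conn
    "CREATE TRIGGER hdb_cache_update_event_notifier \
    \AFTER INSERT ON hdb_catalog.hdb_cache_update_event \
    \FOR EACH ROW EXECUTE PROCEDURE hdb_catalog.hdb_cache_update_event_notifier()"
```

With something of this shape in place, every instance connected to the same database is notified whenever any instance records a metadata change, which is what the listener/processor threads react to.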