Skip to content

Commit 6338341

Browse files
committed
improve schema sync logic to support latest pg-client-hs lib
1 parent 4472a21 commit 6338341

File tree

6 files changed

+140
-106
lines changed

6 files changed

+140
-106
lines changed

server/src-exec/Main.hs

+6-24
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import System.Exit (exitFailure)
1111

1212

1313
import qualified Control.Concurrent as C
14-
import qualified Control.Concurrent.STM as STM
1514
import qualified Data.Aeson as A
1615
import qualified Data.ByteString.Char8 as BC
1716
import qualified Data.ByteString.Lazy as BL
@@ -100,7 +99,7 @@ printYaml :: (A.ToJSON a) => a -> IO ()
10099
printYaml = BC.putStrLn . Y.encode
101100

102101
mkPGLogger :: Logger -> Q.PGLogger
103-
mkPGLogger (Logger logger) msg =
102+
mkPGLogger (Logger logger) (Q.PLERetryMsg msg) =
104103
logger $ PGLog LevelWarn msg
105104

106105
main :: IO ()
@@ -128,31 +127,21 @@ main = do
128127
-- log postgres connection info
129128
unLogger logger $ connInfoToLog ci
130129

131-
-- create empty cache update events queue
132-
eventsQueue <- STM.newTQueueIO
133-
134130
pool <- Q.initPGPool ci cp pgLogger
135131

136-
-- start postgres cache update events listener thread in background
137-
listenerTId <- C.forkIO $ schemaUpdateEventListener pool logger eventsQueue
138-
unLogger logger $ mkThreadLog listenerTId instanceId TTListener
139-
140132
-- safe init catalog
141133
initRes <- initialise logger ci httpManager
142134

143135
-- prepare event triggers data
144136
prepareEvents logger ci
145137

146-
(app, cacheRef, cacheBuiltTime) <-
138+
(app, cacheRef, cacheInitTime) <-
147139
mkWaiApp isoL loggerCtx strfyNum pool httpManager am
148140
corsCfg enableConsole enableTelemetry instanceId enabledAPIs
149141

150-
let scRef = _scrCache cacheRef
151-
152-
-- start cache update events processor thread in background
153-
procTId <- C.forkIO $ schemaUpdateEventProcessor strfyNum pool logger httpManager
154-
eventsQueue cacheRef instanceId cacheBuiltTime
155-
unLogger logger $ mkThreadLog procTId instanceId TTProcessor
142+
-- start a background thread for schema sync
143+
void $ C.forkIO $ startSchemaSync strfyNum pool logger httpManager
144+
cacheRef instanceId cacheInitTime
156145

157146
let warpSettings = Warp.setPort port $ Warp.setHost host Warp.defaultSettings
158147

@@ -162,6 +151,7 @@ main = do
162151

163152
eventEngineCtx <- atomically $ initEventEngineCtx maxEvThrds evFetchMilliSec
164153

154+
let scRef = _scrCache cacheRef
165155
unLogger logger $
166156
mkGenericStrLog "event_triggers" "starting workers"
167157
void $ C.forkIO $ processEventQueue hloggerCtx logEnvHeaders httpManager pool scRef eventEngineCtx
@@ -235,14 +225,6 @@ main = do
235225
res <- runTx pgLogger ci unlockAllEvents
236226
either printErrJExit return res
237227

238-
mkThreadLog threadId instanceId threadType =
239-
let msg = T.pack (show threadType) <> " thread started"
240-
in StartupLog LevelInfo "threads" $
241-
A.object [ "instance_id" A..= getInstanceId instanceId
242-
, "thread_id" A..= show threadId
243-
, "message" A..= msg
244-
]
245-
246228
getUniqIds pgLogger ci = do
247229
eDbId <- runTx pgLogger ci getDbId
248230
dbId <- either printErrJExit return eDbId

server/src-lib/Hasura/Server/App.hs

+2-2
Original file line numberDiff line numberDiff line change
@@ -314,10 +314,10 @@ mkWaiApp isoLevel loggerCtx strfyNum pool httpManager mode corsCfg
314314
pgResp <- runExceptT $ peelRun emptySchemaCache adminUserInfo
315315
httpManager strfyNum pool Q.Serializable $ do
316316
buildSchemaCache
317-
fetchLastUpdateTime
317+
liftTx fetchLastUpdate
318318
(time, sc) <- either initErrExit return pgResp
319319
scRef <- newIORef sc
320-
return (scRef, time)
320+
return (scRef, snd <$> time)
321321

322322
cacheLock <- newMVar ()
323323

server/src-lib/Hasura/Server/Init.hs

+3-6
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,11 @@ import Hasura.Server.Utils
2323
import Network.Wai.Handler.Warp
2424

2525
newtype InstanceId
26-
= InstanceId {getInstanceId :: UUID.UUID}
27-
deriving (Show, Eq)
28-
29-
instanceIdToTxt :: InstanceId -> T.Text
30-
instanceIdToTxt = UUID.toText . getInstanceId
26+
= InstanceId {getInstanceId :: T.Text}
27+
deriving (Show, Eq, J.ToJSON, J.FromJSON)
3128

3229
mkInstanceId :: IO InstanceId
33-
mkInstanceId = InstanceId <$> UUID.nextRandom
30+
mkInstanceId = (InstanceId . UUID.toText) <$> UUID.nextRandom
3431

3532
-- | Abort initialisation: print the error via its 'Show' instance and then
-- run 'exitFailure'. The result type is polymorphic, so callers can use it in
-- any 'IO' position (e.g. as the failure branch of an 'either').
initErrExit :: (Show e) => e -> IO a
3633
initErrExit e = print e >> exitFailure

server/src-lib/Hasura/Server/Query.hs

+10-9
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,7 @@ import Hasura.RQL.DML.Returning (encodeJSONVector)
3030
import Hasura.RQL.DML.Select
3131
import Hasura.RQL.DML.Update
3232
import Hasura.RQL.Types
33-
import Hasura.Server.Init (InstanceId (..),
34-
instanceIdToTxt)
33+
import Hasura.Server.Init (InstanceId (..))
3534
import Hasura.Server.Utils
3635

3736
import qualified Database.PG.Query as Q
@@ -119,16 +118,18 @@ instance HasHttpManager Run where
119118
instance HasSQLGenCtx Run where
120119
askSQLGenCtx = asks _3
121120

122-
fetchLastUpdateTime :: Run (Maybe UTCTime)
123-
fetchLastUpdateTime = do
124-
l <- liftTx $ Q.listQE defaultTxErrorHandler
121+
fetchLastUpdate :: Q.TxE QErr (Maybe (InstanceId, UTCTime))
122+
fetchLastUpdate = do
123+
l <- Q.listQE defaultTxErrorHandler
125124
[Q.sql|
126-
SELECT occurred_at FROM hdb_catalog.hdb_schema_update_event
127-
ORDER BY id DESC LIMIT 1
125+
SELECT instance_id::text, occurred_at
126+
FROM hdb_catalog.hdb_schema_update_event
127+
ORDER BY occurred_at DESC LIMIT 1
128128
|] () True
129129
case l of
130130
[] -> return Nothing
131-
[Identity t] -> return $ Just t
131+
[(instId, occurredAt)] ->
132+
return $ Just (InstanceId instId, occurredAt)
132133
-- never happens
133134
_ -> throw500 "more than one row returned by query"
134135

@@ -139,7 +140,7 @@ recordSchemaUpdate instanceId =
139140
hdb_catalog.hdb_schema_update_event
140141
(instance_id, occurred_at)
141142
VALUES ($1::uuid, DEFAULT)
142-
|] (Identity $ instanceIdToTxt instanceId) True
143+
|] (Identity $ getInstanceId instanceId) True
143144

144145
peelRun
145146
:: SchemaCache
+118-64
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
11
module Hasura.Server.SchemaUpdate
2-
( ThreadType(..)
3-
, schemaUpdateEventListener
4-
, schemaUpdateEventProcessor
5-
)
2+
(startSchemaSync)
63
where
74

85
import Hasura.Prelude
@@ -12,13 +9,15 @@ import Hasura.RQL.DDL.Schema.Table (buildSchemaCache)
129
import Hasura.RQL.Types
1310
import Hasura.Server.App (SchemaCacheRef (..), withSCUpdate)
1411
import Hasura.Server.Init (InstanceId (..))
12+
import Hasura.Server.Logging
1513
import Hasura.Server.Query
1614

1715
import Data.Aeson
1816
import Data.Aeson.Casing
1917
import Data.Aeson.TH
20-
import Data.UUID
2118

19+
import qualified Control.Concurrent as C
20+
import qualified Control.Concurrent.Async as A
2221
import qualified Control.Concurrent.STM as STM
2322
import qualified Data.Text as T
2423
import qualified Data.Time as UTC
@@ -58,20 +57,11 @@ instance ToEngineLog SchemaUpdateEventLog where
5857

5958
data EventPayload
6059
= EventPayload
61-
{ _epInstanceId :: !UUID
60+
{ _epInstanceId :: !InstanceId
6261
, _epOccurredAt :: !UTC.UTCTime
6362
} deriving (Show, Eq)
6463
$(deriveJSON (aesonDrop 3 snakeCase) ''EventPayload)
6564

66-
data SchemaUpdateEvent
67-
= SUESuccess !EventPayload
68-
| SUEPGReConn
69-
deriving (Show, Eq)
70-
71-
instance ToJSON SchemaUpdateEvent where
72-
toJSON (SUESuccess payload) = toJSON payload
73-
toJSON SUEPGReConn = String "postgres reconnection"
74-
7565
data ThreadError
7666
= TEJsonParse !T.Text
7767
| TEQueryError !QErr
@@ -81,83 +71,147 @@ $(deriveToJSON
8171
}
8272
''ThreadError)
8373

74+
-- | An IO action that enables metadata syncing.
--
-- Creates the events queue shared by the two workers, starts 'listener' and
-- 'processor' as asyncs, and then waits on both with 'A.waitAny'.
--
-- All arguments are forwarded unchanged to the workers; @cacheInitTime@ is
-- passed to 'listener' only, 'processor' does not receive it.
--
-- NOTE(review): when 'A.waitAny' returns, the async that is still running is
-- not cancelled or waited on here — confirm this is intended.
75+
startSchemaSync
76+
:: Bool
77+
-> PG.PGPool
78+
-> Logger
79+
-> HTTP.Manager
80+
-> SchemaCacheRef
81+
-> InstanceId
82+
-> Maybe UTC.UTCTime -> IO ()
83+
startSchemaSync strfyNum pool logger httpMgr cacheRef instanceId cacheInitTime = do
84+
-- Init events queue: the listener writes to it, the processor reads from it
85+
eventsQueue <- STM.newTQueueIO
86+
-- Start listener thread (the only worker that receives cacheInitTime)
87+
lAsync <- A.async $ listener strfyNum pool
88+
logger httpMgr eventsQueue cacheRef instanceId cacheInitTime
89+
90+
-- Start processor thread
91+
pAsync <- A.async $ processor strfyNum pool
92+
logger httpMgr eventsQueue cacheRef instanceId
93+
94+
-- Wait until either worker completes; the returned (async, result) pair is
-- discarded.
void $ A.waitAny [lAsync, pAsync]
95+
8496
-- | An IO action that listens to postgres for events and pushes them to a Queue
85-
schemaUpdateEventListener
86-
:: PG.PGPool
97+
listener
98+
:: Bool
99+
-> PG.PGPool
87100
-> Logger
88-
-> STM.TQueue SchemaUpdateEvent
89-
-> IO ()
90-
schemaUpdateEventListener pool logger eventsQueue =
101+
-> HTTP.Manager
102+
-> STM.TQueue EventPayload
103+
-> SchemaCacheRef
104+
-> InstanceId
105+
-> Maybe UTC.UTCTime -> IO ()
106+
listener strfyNum pool logger httpMgr eventsQueue
107+
cacheRef instanceId cacheInitTime = do
108+
logThreadStartup logger instanceId threadType
91109
-- Never exits
92110
forever $ do
93-
listenResE <- liftIO $ runExceptT $ PG.listen pool pgChannel notifyHandler
111+
listenResE <-
112+
liftIO $ runExceptT $ PG.listen pool pgChannel notifyHandler
94113
either onError return listenResE
114+
logWarn
115+
C.threadDelay $ 1 * 1000 * 1000 -- 1 second
95116
where
96-
notifyHandler = PG.NotifyHandler onReconn onMessage
97117
threadType = TTListener
98118

119+
shouldRefresh dbInstId accrdAt =
120+
case cacheInitTime of
121+
Nothing -> True
122+
Just time -> (dbInstId /= instanceId) && accrdAt > time
123+
124+
refreshCache Nothing = return ()
125+
refreshCache (Just (dbInstId, accrdAt)) =
126+
when (shouldRefresh dbInstId accrdAt) $
127+
refreshSchemaCache strfyNum pool logger httpMgr cacheRef
128+
threadType "reloading schema cache on listen start"
129+
130+
notifyHandler = \case
131+
PG.PNEOnStart -> do
132+
eRes <- runExceptT $ PG.runTx pool
133+
(PG.Serializable, Nothing) fetchLastUpdate
134+
case eRes of
135+
Left e -> onError e
136+
Right mLastUpd -> refreshCache mLastUpd
137+
138+
PG.PNEPQNotify notif ->
139+
case eitherDecodeStrict $ PQ.notifyExtra notif of
140+
Left e -> logError logger threadType $ TEJsonParse $ T.pack e
141+
Right payload -> do
142+
logInfo logger threadType $ object ["received_event" .= payload]
143+
-- Push a notify event to Queue
144+
STM.atomically $ STM.writeTQueue eventsQueue payload
145+
99146
onError = logError logger threadType . TEQueryError
147+
logWarn = unLogger logger $
148+
SchemaUpdateEventLog LevelWarn TTListener $ String
149+
"error occured retrying pg listen after 1 second"
100150

101-
onReconn = do
102-
-- emit postgres reconnection event
103-
let event = SUEPGReConn
104-
logInfo logger threadType $ object ["received_event" .= event]
105-
STM.atomically $ STM.writeTQueue eventsQueue event
106-
107-
-- Postgres notification handler
108-
onMessage notif =
109-
case eitherDecodeStrict $ PQ.notifyExtra notif of
110-
Left e -> logError logger threadType $ TEJsonParse $ T.pack e
111-
Right payload -> do
112-
let newEvent = SUESuccess payload
113-
logInfo logger threadType $ object ["received_event" .= newEvent]
114-
-- Push a success event to Queue along with event payload
115-
STM.atomically $ STM.writeTQueue eventsQueue newEvent
116151

117152
-- | An IO action that processes events from Queue
118-
schemaUpdateEventProcessor
153+
processor
119154
:: Bool
120155
-> PG.PGPool
121156
-> Logger
122157
-> HTTP.Manager
123-
-> STM.TQueue SchemaUpdateEvent
158+
-> STM.TQueue EventPayload
124159
-> SchemaCacheRef
125-
-> InstanceId
126-
-> Maybe UTC.UTCTime
127-
-> IO ()
128-
schemaUpdateEventProcessor strfyNum pool logger httpManager
129-
eventsQueue cacheRef instanceId cacheInit =
160+
-> InstanceId -> IO ()
161+
processor strfyNum pool logger httpMgr eventsQueue
162+
cacheRef instanceId = do
163+
logThreadStartup logger instanceId threadType
130164
-- Never exits
131165
forever $ do
132166
event <- STM.atomically $ STM.readTQueue eventsQueue
133167
logInfo logger threadType $ object ["processed_event" .= event]
134-
when (shouldReload event) $ do
135-
-- Reload schema cache from catalog
136-
resE <- liftIO $ runExceptT $ withSCUpdate cacheRef $
137-
peelRun emptySchemaCache adminUserInfo
138-
httpManager strfyNum pool PG.Serializable buildSchemaCache
139-
case resE of
140-
Left e -> logError logger threadType $ TEQueryError e
141-
Right _ ->
142-
logInfo logger threadType $
143-
object ["message" .= ("schema cache reloaded" :: T.Text)]
168+
when (shouldReload event) $
169+
refreshSchemaCache strfyNum pool logger httpMgr cacheRef
170+
threadType "schema cache reloaded"
144171
where
145172
threadType = TTProcessor
146173

147-
-- If postgres reconnect happens reload schema cache
148-
shouldReload SUEPGReConn = True
149-
-- If event is from another server and occurred after
150-
-- init schema cache built then reload
151-
shouldReload (SUESuccess payload) =
152-
(_epInstanceId payload /= getInstanceId instanceId)
153-
&& maybe True (withCacheInit $ _epOccurredAt payload) cacheInit
174+
-- If event is from another server
175+
shouldReload payload = _epInstanceId payload /= instanceId
154176

155-
withCacheInit occurredAt initTime = occurredAt > initTime
177+
-- | Emit a startup log line for a background thread.
--
-- Logs a 'StartupLog' at 'LevelInfo' under the "threads" kind. Its JSON body
-- holds the instance id and a message of the form
-- @"<show threadType> thread started"@. The thread type only needs a 'Show'
-- instance, so any value can be used as the label.
logThreadStartup
178+
:: Show a
179+
=> Logger
180+
-> InstanceId
181+
-> a -> IO ()
182+
logThreadStartup logger instanceId threadType =
183+
unLogger logger threadLog
184+
where
185+
-- The log value; no thread id is included, only instance id and message.
threadLog =
186+
let msg = T.pack (show threadType) <> " thread started"
187+
in StartupLog LevelInfo "threads" $
188+
object [ "instance_id" .= getInstanceId instanceId
189+
, "message" .= msg
190+
]
191+
192+
-- | Rebuild the schema cache from the catalog and log the outcome.
--
-- Runs 'buildSchemaCache' through 'peelRun', starting from
-- 'emptySchemaCache' as 'adminUserInfo' at 'PG.Serializable' isolation, with
-- the whole action wrapped in 'withSCUpdate' on @cacheRef@.
-- (presumably 'withSCUpdate' stores the rebuilt cache in @cacheRef@ —
-- its definition is not visible here, TODO confirm.)
--
-- On failure the 'QErr' is logged as a 'TEQueryError' for @threadType@ and
-- nothing is rethrown; on success @msg@ is logged at info level under the
-- "message" key.
refreshSchemaCache
193+
:: Bool
194+
-> PG.PGPool
195+
-> Logger
196+
-> HTTP.Manager
197+
-> SchemaCacheRef
198+
-> ThreadType
199+
-> T.Text -> IO ()
200+
refreshSchemaCache strfyNum pool logger httpManager cacheRef threadType msg = do
201+
-- Reload schema cache from catalog
202+
resE <- liftIO $ runExceptT $ withSCUpdate cacheRef $
203+
peelRun emptySchemaCache adminUserInfo
204+
httpManager strfyNum pool PG.Serializable buildSchemaCache
205+
-- Failures are only logged; the caller's loop keeps running either way.
case resE of
206+
Left e -> logError logger threadType $ TEQueryError e
207+
Right _ ->
208+
logInfo logger threadType $ object ["message" .= msg]
156209

157210
-- | Log a JSON value as a 'SchemaUpdateEventLog' at 'LevelInfo', tagged with
-- the thread type that produced it.
logInfo :: Logger -> ThreadType -> Value -> IO ()
158211
logInfo logger threadType val = unLogger logger $
159212
SchemaUpdateEventLog LevelInfo threadType val
160213

161214
logError :: ToJSON a => Logger -> ThreadType -> a -> IO ()
162-
logError logger threadType err = unLogger logger $
163-
SchemaUpdateEventLog LevelError threadType $ object ["error" .= toJSON err]
215+
logError logger threadType err =
216+
unLogger logger $ SchemaUpdateEventLog LevelError threadType $
217+
object ["error" .= toJSON err]

0 commit comments

Comments
 (0)