Skip to content

Commit

Permalink
add workerQueue
Browse files Browse the repository at this point in the history
  • Loading branch information
soulomoon committed Jun 8, 2024
1 parent 82da337 commit 06e3b8f
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 22 deletions.
6 changes: 3 additions & 3 deletions ghcide/session-loader/Development/IDE/Session.hs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ import Data.Void

import Control.Concurrent.STM.Stats (atomically, modifyTVar',
readTVar, writeTVar)
import Control.Concurrent.STM.TQueue
import Control.DeepSeq
import Control.Exception (evaluate)
import Control.Monad.IO.Unlift (MonadUnliftIO)
Expand All @@ -105,7 +104,8 @@ import Data.HashSet (HashSet)
import qualified Data.HashSet as Set
import Database.SQLite.Simple
import Development.IDE.Core.Tracing (withTrace)
import Development.IDE.Core.WorkerThread (awaitRunInThread,
import Development.IDE.Core.WorkerThread (WorkerQueue,
awaitRunInThread,
withWorkerQueue)
import Development.IDE.Session.Diagnostics (renderCradleError)
import Development.IDE.Types.Shake (WithHieDb,
Expand Down Expand Up @@ -438,7 +438,7 @@ getHieDbLoc dir = do
-- components mapping to the same hie.yaml file are mapped to the same
-- HscEnv which is updated as new components are discovered.

loadSessionWithOptions :: Recorder (WithPriority Log) -> SessionLoadingOptions -> FilePath -> TQueue (IO ()) -> IO (Action IdeGhcSession)
loadSessionWithOptions :: Recorder (WithPriority Log) -> SessionLoadingOptions -> FilePath -> WorkerQueue (IO ()) -> IO (Action IdeGhcSession)
loadSessionWithOptions recorder SessionLoadingOptions{..} rootDir que = do
let toAbsolutePath = toAbsolute rootDir -- see Note [Root Directory]
cradle_files <- newIORef []
Expand Down
3 changes: 2 additions & 1 deletion ghcide/src/Development/IDE/Core/Compile.hs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ import GHC.Driver.Config.CoreToStg.Prep
#if MIN_VERSION_ghc(9,7,0)
import Data.Foldable (toList)
import GHC.Unit.Module.Warnings
import Development.IDE.Core.WorkerThread (writeWorkerQueue)
#else
import Development.IDE.Core.FileStore (shareFilePath)
#endif
Expand Down Expand Up @@ -899,7 +900,7 @@ indexHieFile se mod_summary srcPath !hash hf = do
-- hiedb doesn't use the Haskell src, so we clear it to avoid unnecessarily keeping it around
let !hf' = hf{hie_hs_src = mempty}
modifyTVar' indexPending $ HashMap.insert srcPath hash
writeTQueue indexQueue $ \withHieDb -> do
writeWorkerQueue indexQueue $ \withHieDb -> do
-- We are now in the worker thread
-- Check if a newer index of this file has been scheduled, and if so skip this one
newerScheduled <- atomically $ do
Expand Down
4 changes: 2 additions & 2 deletions ghcide/src/Development/IDE/Core/FileStore.hs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ module Development.IDE.Core.FileStore(
) where

import Control.Concurrent.STM.Stats (STM, atomically)
import Control.Concurrent.STM.TQueue (writeTQueue)
import Control.Exception
import Control.Monad.Extra
import Control.Monad.IO.Class
Expand All @@ -40,6 +39,7 @@ import Development.IDE.Core.IdeConfiguration (isWorkspaceFile)
import Development.IDE.Core.RuleTypes
import Development.IDE.Core.Shake hiding (Log)
import qualified Development.IDE.Core.Shake as Shake
import Development.IDE.Core.WorkerThread (writeWorkerQueue)
import Development.IDE.GHC.Orphans ()
import Development.IDE.Graph
import Development.IDE.Import.DependencyInformation
Expand Down Expand Up @@ -247,7 +247,7 @@ typecheckParentsAction recorder nfp = do
setSomethingModified :: VFSModified -> IdeState -> String -> IO [Key] -> IO ()
setSomethingModified vfs state reason actionBetweenSession = do
-- Update database to remove any files that might have been renamed/deleted
atomically $ writeTQueue (indexQueue $ hiedbWriter $ shakeExtras state) (\withHieDb -> withHieDb deleteMissingRealFiles)
atomically $ writeWorkerQueue (indexQueue $ hiedbWriter $ shakeExtras state) (\withHieDb -> withHieDb deleteMissingRealFiles)
void $ restartShakeSession (shakeExtras state) vfs reason [] actionBetweenSession

registerFileWatches :: [String] -> LSP.LspT Config IO Bool
Expand Down
10 changes: 5 additions & 5 deletions ghcide/src/Development/IDE/Core/Shake.hs
Original file line number Diff line number Diff line change
Expand Up @@ -262,12 +262,12 @@ data HieDbWriter
-- | Actions to queue up on the index worker thread
-- The inner `(HieDb -> IO ()) -> IO ()` wraps `HieDb -> IO ()`
-- with (currently) retry functionality
type IndexQueue = TQueue (((HieDb -> IO ()) -> IO ()) -> IO ())
type IndexQueue = WorkerQueue (((HieDb -> IO ()) -> IO ()) -> IO ())

data ThreadQueue = ThreadQueue {
tIndexQueue :: IndexQueue
, tRestartQueue :: TQueue (IO ())
, tLoaderQueue :: TQueue (IO ())
, tRestartQueue :: WorkerQueue (IO ())
, tLoaderQueue :: WorkerQueue (IO ())
}

-- Note [Semantic Tokens Cache Location]
Expand Down Expand Up @@ -342,9 +342,9 @@ data ShakeExtras = ShakeExtras
-- ^ Default HLS config, only relevant if the client does not provide any Config
, dirtyKeys :: TVar KeySet
-- ^ Set of dirty rule keys since the last Shake run
, restartQueue :: TQueue (IO ())
, restartQueue :: WorkerQueue (IO ())
-- ^ Queue of restart actions to be run.
, loaderQueue :: TQueue (IO ())
, loaderQueue :: WorkerQueue (IO ())
-- ^ Queue of loader actions to be run.
}

Expand Down
52 changes: 43 additions & 9 deletions ghcide/src/Development/IDE/Core/WorkerThread.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,17 @@ Description : This module provides an API for managing worker threads in the IDE
see Note [Serializing runs in separate thread]
-}
module Development.IDE.Core.WorkerThread
(withWorkerQueue, awaitRunInThread)
(withWorkerQueue, awaitRunInThread, withWorkerQueueOfOne, WorkerQueue, writeWorkerQueue)
where

import Control.Concurrent.Async (withAsync)
import Control.Concurrent.STM
import Control.Concurrent.Strict (newBarrier, signalBarrier,
waitBarrier)
import Control.Exception (finally)
import Control.Monad (forever)
import Control.Monad.Cont (ContT (ContT))
import Control.Monad.IO.Class (liftIO)

{-
Note [Serializing runs in separate thread]
Expand All @@ -28,27 +30,59 @@ Originally we used various ways to implement this, but it was hard to maintain a
Moreover, we can not stop these threads uniformly when we are shutting down the server.
-}

-- | 'withWorkerQueue' creates a new 'TQueue', and launches a worker
data WorkerQueue a = WorkerQueueOfOne (TMVar a) | WorkerQueueOfMany (TQueue a)

writeWorkerQueue :: WorkerQueue a -> a -> STM ()
writeWorkerQueue (WorkerQueueOfOne tvar) action = putTMVar tvar action
writeWorkerQueue (WorkerQueueOfMany tqueue) action = writeTQueue tqueue action

newWorkerQueue :: STM (WorkerQueue a)
newWorkerQueue = WorkerQueueOfMany <$> newTQueue

newWorkerQueueOfOne :: STM (WorkerQueue a)
newWorkerQueueOfOne = WorkerQueueOfOne <$> newEmptyTMVar


-- | 'withWorkerQueue' creates a new 'WorkerQueue', and launches a worker
-- thread which polls the queue for requests and runs the given worker
-- function on them.
withWorkerQueue :: (t -> IO a) -> ContT () IO (TQueue t)
withWorkerQueue workerAction = ContT $ \mainAction -> do
q <- newTQueueIO
withWorkerQueue :: (t -> IO a) -> ContT () IO (WorkerQueue t)
withWorkerQueue workerAction = do
q <- liftIO $ atomically newWorkerQueue
runWorkerQueue q workerAction

-- | 'withWorkerQueueOfOne' creates a new 'WorkerQueue' that only allows one action to be queued at a time.
-- and one action can only be queued after the previous action has been done.
-- this is useful when we want to cancel the action waiting in the queue, if it's thread is cancelled.
-- e.g. session loading in session loader. When a shake session is restarted, we want to cancel the previous pending session loading.
withWorkerQueueOfOne :: (t -> IO a) -> ContT () IO (WorkerQueue t)
withWorkerQueueOfOne workerAction = do
q <- liftIO $ atomically newWorkerQueueOfOne
runWorkerQueue q workerAction

runWorkerQueue :: WorkerQueue t -> (t -> IO a) -> ContT () IO (WorkerQueue t)
runWorkerQueue q workerAction = ContT $ \mainAction -> do
withAsync (writerThread q) $ \_ -> mainAction q
where
writerThread q =
forever $ do
l <- atomically $ readTQueue q
workerAction l
case q of
-- only remove the action from the queue after it has been run if it is a one-shot queue
WorkerQueueOfOne tvar -> do
l <- atomically $ readTMVar tvar
workerAction l `finally` atomically (takeTMVar tvar)
WorkerQueueOfMany q -> do
l <- atomically $ readTQueue q
workerAction l

-- | 'awaitRunInThread' queues up an 'IO' action to be run by a worker thread,
-- and then blocks until the result is computed.
awaitRunInThread :: TQueue (IO ()) -> IO result -> IO result
awaitRunInThread :: WorkerQueue (IO ()) -> IO result -> IO result
awaitRunInThread q act = do
-- Take an action from TQueue, run it and
-- use barrier to wait for the result
barrier <- newBarrier
atomically $ writeTQueue q $ do
atomically $ writeWorkerQueue q $ do
res <- act
signalBarrier barrier res
waitBarrier barrier
5 changes: 3 additions & 2 deletions ghcide/src/Development/IDE/LSP/LanguageServer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ import Control.Monad.Trans.Cont (evalContT)
import Development.IDE.Core.IdeConfiguration
import Development.IDE.Core.Shake hiding (Log)
import Development.IDE.Core.Tracing
import Development.IDE.Core.WorkerThread (withWorkerQueue)
import Development.IDE.Core.WorkerThread (withWorkerQueue,
withWorkerQueueOfOne)
import qualified Development.IDE.Session as Session
import Development.IDE.Types.Shake (WithHieDb,
WithHieDbShield (..))
Expand Down Expand Up @@ -261,7 +262,7 @@ handleInit recorder defaultRoot getHieDbLoc getIdeState lifetime exitClientMsg c
runWithWorkerThreads :: Recorder (WithPriority Session.Log) -> FilePath -> (WithHieDb -> ThreadQueue -> IO ()) -> IO ()
runWithWorkerThreads recorder dbLoc f = evalContT $ do
sessionRestartTQueue <- withWorkerQueue id
sessionLoaderTQueue <- withWorkerQueue id
sessionLoaderTQueue <- withWorkerQueueOfOne id
(WithHieDbShield hiedb, threadQueue) <- runWithDb recorder dbLoc
liftIO $ f hiedb (ThreadQueue threadQueue sessionRestartTQueue sessionLoaderTQueue)

Expand Down

0 comments on commit 06e3b8f

Please sign in to comment.