Skip to content

Commit

Permalink
Merge pull request #1018 from kazu-yamamoto/thread-less-autoupdate
Browse files Browse the repository at this point in the history
Thread less autoupdate
  • Loading branch information
kazu-yamamoto authored Dec 18, 2024
2 parents f268f20 + d16ec9a commit b9cf8ed
Show file tree
Hide file tree
Showing 8 changed files with 334 additions and 212 deletions.
192 changes: 8 additions & 184 deletions auto-update/Control/AutoUpdate.hs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
{-# LANGUAGE CPP #-}

-- | In a multithreaded environment, running actions on a regularly scheduled
-- background thread can dramatically improve performance.
-- | In a multithreaded environment, sharing results of actions can dramatically improve performance.
-- For example, web servers need to return the current time with each HTTP response.
-- For a high-volume server, it's much faster for a dedicated thread to run every
-- second, and write the current time to a shared 'IORef', than it is for each
Expand Down Expand Up @@ -43,187 +42,12 @@ module Control.AutoUpdate (
-- * Creation
mkAutoUpdate,
mkAutoUpdateWithModify,
) where
)
where

#if __GLASGOW_HASKELL__ < 709
import Control.Applicative ((<*>))
#ifdef mingw32_HOST_OS
import Control.AutoUpdate.Thread
#else
import Control.AutoUpdate.Event
#endif
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar (
newEmptyMVar,
putMVar,
readMVar,
takeMVar,
tryPutMVar,
)
import Control.Exception (
SomeException,
catch,
mask_,
throw,
try,
)
import Control.Monad (void)
import Data.IORef (newIORef, readIORef, writeIORef)
import Data.Maybe (fromMaybe)
import GHC.Conc.Sync (labelThread)

-- | Default value for creating an 'UpdateSettings'.
--
-- @since 0.1.0
defaultUpdateSettings :: UpdateSettings ()
defaultUpdateSettings =
UpdateSettings
{ updateFreq = 1000000
, updateSpawnThreshold = 3
, updateAction = return ()
, updateThreadName = "AutoUpdate"
}

-- | Settings to control how values are updated.
--
-- This should be constructed using 'defaultUpdateSettings' and record
-- update syntax, e.g.:
--
-- @
-- let settings = 'defaultUpdateSettings' { 'updateAction' = 'Data.Time.Clock.getCurrentTime' }
-- @
--
-- @since 0.1.0
data UpdateSettings a = UpdateSettings
{ updateFreq :: Int
-- ^ Microseconds between update calls. Same considerations as
-- 'threadDelay' apply.
--
-- Default: 1 second (1000000)
--
-- @since 0.1.0
, updateSpawnThreshold :: Int
-- ^ NOTE: This value no longer has any effect, since worker threads are
-- dedicated instead of spawned on demand.
--
-- Previously, this determined how many times the data must be requested
-- before we decide to spawn a dedicated thread.
--
-- Default: 3
--
-- @since 0.1.0
, updateAction :: IO a
-- ^ Action to be performed to get the current value.
--
-- Default: does nothing.
--
-- @since 0.1.0
, updateThreadName :: String
-- ^ Label of the thread being forked.
--
-- Default: @"AutoUpdate"@
--
-- @since 0.2.2
}

-- | Generate an action which will either read from an automatically
-- updated value, or run the update action in the current thread.
--
-- @since 0.1.0
mkAutoUpdate :: UpdateSettings a -> IO (IO a)
mkAutoUpdate us = mkAutoUpdateHelper us Nothing

-- | Generate an action which will either read from an automatically
-- updated value, or run the update action in the current thread if
-- the first time or the provided modify action after that.
--
-- @since 0.1.4
mkAutoUpdateWithModify :: UpdateSettings a -> (a -> IO a) -> IO (IO a)
mkAutoUpdateWithModify us f = mkAutoUpdateHelper us (Just f)

mkAutoUpdateHelper :: UpdateSettings a -> Maybe (a -> IO a) -> IO (IO a)
mkAutoUpdateHelper us updateActionModify = do
-- A baton to tell the worker thread to generate a new value.
needsRunning <- newEmptyMVar

-- The initial response variable. Response variables allow the requesting
-- thread to block until a value is generated by the worker thread.
responseVar0 <- newEmptyMVar

-- The current value, if available. We start off with a Left value
-- indicating no value is available, and the above-created responseVar0 to
-- give a variable to block on.
currRef <- newIORef $ Left responseVar0

-- This is used to set a value in the currRef variable when the worker
-- thread exits. In reality, that value should never be used, since the
-- worker thread exiting only occurs if an async exception is thrown, which
-- should only occur if there are no references to needsRunning left.
-- However, this handler will make error messages much clearer if there's a
-- bug in the implementation.
let fillRefOnExit f = do
eres <- try f
case eres of
Left e ->
writeIORef currRef $
error $
"Control.AutoUpdate.mkAutoUpdate: worker thread exited with exception: "
++ show (e :: SomeException)
Right () ->
writeIORef currRef $
error $
"Control.AutoUpdate.mkAutoUpdate: worker thread exited normally, "
++ "which should be impossible due to usage of infinite loop"

-- fork the worker thread immediately. Note that we mask async exceptions,
-- but *not* in an uninterruptible manner. This will allow a
-- BlockedIndefinitelyOnMVar exception to still be thrown, which will take
-- down this thread when all references to the returned function are
-- garbage collected, and therefore there is no thread that can fill the
-- needsRunning MVar.
--
-- Note that since we throw away the ThreadId of this new thread and never
-- calls myThreadId, normal async exceptions can never be thrown to it,
-- only RTS exceptions.
tid <- mask_ $ forkIO $ fillRefOnExit $ do
-- This infinite loop makes up out worker thread. It takes an a
-- responseVar value where the next value should be putMVar'ed to for
-- the benefit of any requesters currently blocked on it.
let loop responseVar maybea = do
-- block until a value is actually needed
takeMVar needsRunning

-- new value requested, so run the updateAction
a <- catchSome $ fromMaybe (updateAction us) (updateActionModify <*> maybea)

-- we got a new value, update currRef and lastValue
writeIORef currRef $ Right a
putMVar responseVar a

-- delay until we're needed again
threadDelay $ updateFreq us

-- delay's over. create a new response variable and set currRef
-- to use it, so that the next requester will block on that
-- variable. Then loop again with the updated response
-- variable.
responseVar' <- newEmptyMVar
writeIORef currRef $ Left responseVar'
loop responseVar' (Just a)

-- Kick off the loop, with the initial responseVar0 variable.
loop responseVar0 Nothing
labelThread tid $ updateThreadName us
return $ do
mval <- readIORef currRef
case mval of
Left responseVar -> do
-- no current value, force the worker thread to run...
void $ tryPutMVar needsRunning ()

-- and block for the result from the worker
readMVar responseVar
-- we have a current value, use it
Right val -> return val

-- | Turn a runtime exception into an impure exception, so that all 'IO'
-- actions will complete successfully. This simply defers the exception until
-- the value is forced.
catchSome :: IO a -> IO a
catchSome act = Control.Exception.catch act $ \e -> return $ throw (e :: SomeException)
import Control.AutoUpdate.Types
124 changes: 124 additions & 0 deletions auto-update/Control/AutoUpdate/Event.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
{-# LANGUAGE RecordWildCards #-}

module Control.AutoUpdate.Event (
-- * Creation
mkAutoUpdate,
mkAutoUpdateWithModify,

-- * Internal
UpdateState (..),
mkClosableAutoUpdate,
mkClosableAutoUpdate',
)
where

import Control.Concurrent.STM
import Control.Monad
import Data.IORef
import GHC.Event (getSystemTimerManager, registerTimeout, unregisterTimeout)

import Control.AutoUpdate.Types

--------------------------------------------------------------------------------

-- | Generate an action which will either read from an automatically
-- updated value, or run the update action in the current thread.
--
-- @since 0.1.0
mkAutoUpdate :: UpdateSettings a -> IO (IO a)
mkAutoUpdate = mkAutoUpdateThings $ \g _ _ -> g

-- | Generate an action which will either read from an automatically
-- updated value, or run the update action in the current thread if
-- the first time or the provided modify action after that.
--
-- @since 0.1.4
mkAutoUpdateWithModify :: UpdateSettings a -> (a -> IO a) -> IO (IO a)
mkAutoUpdateWithModify us f = mkAutoUpdateThingsWithModify (\g _ _ -> g) us f

--------------------------------------------------------------------------------

{- FOURMOLU_DISABLE -}
data UpdateState a =
UpdateState
{ usUpdateAction_ :: a -> IO a
, usLastResult_ :: IORef a
, usIntervalMicro_ :: Int
, usTimeHasCome_ :: TVar Bool
, usDeleteTimeout_ :: IORef (IO ())
}
{- FOURMOLU_ENABLE -}

--------------------------------------------------------------------------------

mkAutoUpdateThings
:: (IO a -> IO () -> UpdateState a -> b) -> UpdateSettings a -> IO b
mkAutoUpdateThings mk settings@UpdateSettings{..} =
mkAutoUpdateThingsWithModify mk settings (const updateAction)

mkAutoUpdateThingsWithModify
:: (IO a -> IO () -> UpdateState a -> b) -> UpdateSettings a -> (a -> IO a) -> IO b
mkAutoUpdateThingsWithModify mk settings update1 = do
us <- openUpdateState settings update1
pure $ mk (getUpdateResult us) (closeUpdateState us) us

--------------------------------------------------------------------------------

-- $setup
-- >>> :set -XNumericUnderscores
-- >>> import Control.Concurrent

-- |
-- >>> iref <- newIORef (0 :: Int)
-- >>> action = modifyIORef iref (+ 1) >> readIORef iref
-- >>> (getValue, closeState) <- mkClosableAutoUpdate $ defaultUpdateSettings { updateFreq = 200_000, updateAction = action }
-- >>> getValue
-- 1
-- >>> threadDelay 100_000 >> getValue
-- 1
-- >>> threadDelay 200_000 >> getValue
-- 2
-- >>> closeState
mkClosableAutoUpdate :: UpdateSettings a -> IO (IO a, IO ())
mkClosableAutoUpdate = mkAutoUpdateThings $ \g c _ -> (g, c)

-- | provide `UpdateState` for debugging
mkClosableAutoUpdate' :: UpdateSettings a -> IO (IO a, IO (), UpdateState a)
mkClosableAutoUpdate' = mkAutoUpdateThings (,,)

--------------------------------------------------------------------------------

mkDeleteTimeout :: TVar Bool -> Int -> IO (IO ())
mkDeleteTimeout thc micro = do
mgr <- getSystemTimerManager
key <- registerTimeout mgr micro (atomically $ writeTVar thc True)
pure $ unregisterTimeout mgr key

openUpdateState :: UpdateSettings a -> (a -> IO a) -> IO (UpdateState a)
openUpdateState UpdateSettings{..} update1 = do
thc <- newTVarIO False
UpdateState update1
<$> (newIORef =<< updateAction)
<*> pure updateFreq
<*> pure thc
<*> (newIORef =<< mkDeleteTimeout thc updateFreq)

closeUpdateState :: UpdateState a -> IO ()
closeUpdateState UpdateState{..} = do
delete <- readIORef usDeleteTimeout_
delete

onceOnTimeHasCome :: UpdateState a -> IO () -> IO ()
onceOnTimeHasCome UpdateState{..} action = do
action' <- atomically $ do
timeHasCome <- readTVar usTimeHasCome_
when timeHasCome $ writeTVar usTimeHasCome_ False
pure $ when timeHasCome action
action'

getUpdateResult :: UpdateState a -> IO a
getUpdateResult us@UpdateState{..} = do
onceOnTimeHasCome us $ do
writeIORef usLastResult_ =<< usUpdateAction_ =<< readIORef usLastResult_
writeIORef usDeleteTimeout_ =<< mkDeleteTimeout usTimeHasCome_ usIntervalMicro_
readIORef usLastResult_
11 changes: 11 additions & 0 deletions auto-update/Control/AutoUpdate/Internal.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{-# LANGUAGE RecordWildCards #-}

module Control.AutoUpdate.Internal (
-- * Debugging
UpdateState (..),
mkClosableAutoUpdate,
mkClosableAutoUpdate',
)
where

import Control.AutoUpdate.Event
Loading

0 comments on commit b9cf8ed

Please sign in to comment.