Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Thread less autoupdate #1018

Merged
merged 9 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 8 additions & 184 deletions auto-update/Control/AutoUpdate.hs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
{-# LANGUAGE CPP #-}

-- | In a multithreaded environment, running actions on a regularly scheduled
-- background thread can dramatically improve performance.
-- | In a multithreaded environment, sharing results of actions can dramatically improve performance.
-- For example, web servers need to return the current time with each HTTP response.
-- For a high-volume server, it's much faster for a dedicated thread to run every
-- second, and write the current time to a shared 'IORef', than it is for each
Expand Down Expand Up @@ -43,187 +42,12 @@ module Control.AutoUpdate (
-- * Creation
mkAutoUpdate,
mkAutoUpdateWithModify,
) where
)
where

#if __GLASGOW_HASKELL__ < 709
import Control.Applicative ((<*>))
#ifdef mingw32_HOST_OS
import Control.AutoUpdate.Thread
#else
import Control.AutoUpdate.Event
#endif
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar (
newEmptyMVar,
putMVar,
readMVar,
takeMVar,
tryPutMVar,
)
import Control.Exception (
SomeException,
catch,
mask_,
throw,
try,
)
import Control.Monad (void)
import Data.IORef (newIORef, readIORef, writeIORef)
import Data.Maybe (fromMaybe)
import GHC.Conc.Sync (labelThread)

-- | Default value for creating an 'UpdateSettings'.
--
-- @since 0.1.0
defaultUpdateSettings :: UpdateSettings ()
defaultUpdateSettings =
UpdateSettings
{ updateFreq = 1000000
, updateSpawnThreshold = 3
, updateAction = return ()
, updateThreadName = "AutoUpdate"
}

-- | Settings to control how values are updated.
--
-- This should be constructed using 'defaultUpdateSettings' and record
-- update syntax, e.g.:
--
-- @
-- let settings = 'defaultUpdateSettings' { 'updateAction' = 'Data.Time.Clock.getCurrentTime' }
-- @
--
-- @since 0.1.0
data UpdateSettings a = UpdateSettings
{ updateFreq :: Int
-- ^ Microseconds between update calls. Same considerations as
-- 'threadDelay' apply.
--
-- Default: 1 second (1000000)
--
-- @since 0.1.0
, updateSpawnThreshold :: Int
-- ^ NOTE: This value no longer has any effect, since worker threads are
-- dedicated instead of spawned on demand.
--
-- Previously, this determined how many times the data must be requested
-- before we decide to spawn a dedicated thread.
--
-- Default: 3
--
-- @since 0.1.0
, updateAction :: IO a
-- ^ Action to be performed to get the current value.
--
-- Default: does nothing.
--
-- @since 0.1.0
, updateThreadName :: String
-- ^ Label of the thread being forked.
--
-- Default: @"AutoUpdate"@
--
-- @since 0.2.2
}

-- | Generate an action which will either read from an automatically
-- updated value, or run the update action in the current thread.
--
-- @since 0.1.0
mkAutoUpdate :: UpdateSettings a -> IO (IO a)
mkAutoUpdate us = mkAutoUpdateHelper us Nothing

-- | Generate an action which will either read from an automatically
-- updated value, or run the update action in the current thread if
-- the first time or the provided modify action after that.
--
-- @since 0.1.4
mkAutoUpdateWithModify :: UpdateSettings a -> (a -> IO a) -> IO (IO a)
mkAutoUpdateWithModify us f = mkAutoUpdateHelper us (Just f)

mkAutoUpdateHelper :: UpdateSettings a -> Maybe (a -> IO a) -> IO (IO a)
mkAutoUpdateHelper us updateActionModify = do
-- A baton to tell the worker thread to generate a new value.
needsRunning <- newEmptyMVar

-- The initial response variable. Response variables allow the requesting
-- thread to block until a value is generated by the worker thread.
responseVar0 <- newEmptyMVar

-- The current value, if available. We start off with a Left value
-- indicating no value is available, and the above-created responseVar0 to
-- give a variable to block on.
currRef <- newIORef $ Left responseVar0

-- This is used to set a value in the currRef variable when the worker
-- thread exits. In reality, that value should never be used, since the
-- worker thread exiting only occurs if an async exception is thrown, which
-- should only occur if there are no references to needsRunning left.
-- However, this handler will make error messages much clearer if there's a
-- bug in the implementation.
let fillRefOnExit f = do
eres <- try f
case eres of
Left e ->
writeIORef currRef $
error $
"Control.AutoUpdate.mkAutoUpdate: worker thread exited with exception: "
++ show (e :: SomeException)
Right () ->
writeIORef currRef $
error $
"Control.AutoUpdate.mkAutoUpdate: worker thread exited normally, "
++ "which should be impossible due to usage of infinite loop"

-- fork the worker thread immediately. Note that we mask async exceptions,
-- but *not* in an uninterruptible manner. This will allow a
-- BlockedIndefinitelyOnMVar exception to still be thrown, which will take
-- down this thread when all references to the returned function are
-- garbage collected, and therefore there is no thread that can fill the
-- needsRunning MVar.
--
-- Note that since we throw away the ThreadId of this new thread and never
-- calls myThreadId, normal async exceptions can never be thrown to it,
-- only RTS exceptions.
tid <- mask_ $ forkIO $ fillRefOnExit $ do
-- This infinite loop makes up out worker thread. It takes an a
-- responseVar value where the next value should be putMVar'ed to for
-- the benefit of any requesters currently blocked on it.
let loop responseVar maybea = do
-- block until a value is actually needed
takeMVar needsRunning

-- new value requested, so run the updateAction
a <- catchSome $ fromMaybe (updateAction us) (updateActionModify <*> maybea)

-- we got a new value, update currRef and lastValue
writeIORef currRef $ Right a
putMVar responseVar a

-- delay until we're needed again
threadDelay $ updateFreq us

-- delay's over. create a new response variable and set currRef
-- to use it, so that the next requester will block on that
-- variable. Then loop again with the updated response
-- variable.
responseVar' <- newEmptyMVar
writeIORef currRef $ Left responseVar'
loop responseVar' (Just a)

-- Kick off the loop, with the initial responseVar0 variable.
loop responseVar0 Nothing
labelThread tid $ updateThreadName us
return $ do
mval <- readIORef currRef
case mval of
Left responseVar -> do
-- no current value, force the worker thread to run...
void $ tryPutMVar needsRunning ()

-- and block for the result from the worker
readMVar responseVar
-- we have a current value, use it
Right val -> return val

-- | Turn a runtime exception into an impure exception, so that all 'IO'
-- actions will complete successfully. This simply defers the exception until
-- the value is forced.
catchSome :: IO a -> IO a
catchSome act = Control.Exception.catch act $ \e -> return $ throw (e :: SomeException)
import Control.AutoUpdate.Types
124 changes: 124 additions & 0 deletions auto-update/Control/AutoUpdate/Event.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
{-# LANGUAGE RecordWildCards #-}

module Control.AutoUpdate.Event (
-- * Creation
mkAutoUpdate,
mkAutoUpdateWithModify,

-- * Internal
UpdateState (..),
mkClosableAutoUpdate,
mkClosableAutoUpdate',
)
where

import Control.Concurrent.STM
import Control.Monad
import Data.IORef
import GHC.Event (getSystemTimerManager, registerTimeout, unregisterTimeout)

import Control.AutoUpdate.Types

--------------------------------------------------------------------------------

-- | Generate an action which will either read from an automatically
-- updated value, or run the update action in the current thread.
--
-- @since 0.1.0
mkAutoUpdate :: UpdateSettings a -> IO (IO a)
mkAutoUpdate = mkAutoUpdateThings $ \g _ _ -> g

-- | Generate an action which will either read from an automatically
-- updated value, or run the update action in the current thread if
-- the first time or the provided modify action after that.
--
-- @since 0.1.4
mkAutoUpdateWithModify :: UpdateSettings a -> (a -> IO a) -> IO (IO a)
mkAutoUpdateWithModify us f = mkAutoUpdateThingsWithModify (\g _ _ -> g) us f

--------------------------------------------------------------------------------

{- FOURMOLU_DISABLE -}
data UpdateState a =
UpdateState
{ usUpdateAction_ :: a -> IO a
, usLastResult_ :: IORef a
, usIntervalMicro_ :: Int
, usTimeHasCome_ :: TVar Bool
, usDeleteTimeout_ :: IORef (IO ())
}
{- FOURMOLU_ENABLE -}

--------------------------------------------------------------------------------

mkAutoUpdateThings
:: (IO a -> IO () -> UpdateState a -> b) -> UpdateSettings a -> IO b
mkAutoUpdateThings mk settings@UpdateSettings{..} =
mkAutoUpdateThingsWithModify mk settings (const updateAction)

mkAutoUpdateThingsWithModify
:: (IO a -> IO () -> UpdateState a -> b) -> UpdateSettings a -> (a -> IO a) -> IO b
mkAutoUpdateThingsWithModify mk settings update1 = do
us <- openUpdateState settings update1
pure $ mk (getUpdateResult us) (closeUpdateState us) us

--------------------------------------------------------------------------------

-- $setup
-- >>> :set -XNumericUnderscores
-- >>> import Control.Concurrent

-- |
-- >>> iref <- newIORef (0 :: Int)
-- >>> action = modifyIORef iref (+ 1) >> readIORef iref
-- >>> (getValue, closeState) <- mkClosableAutoUpdate $ defaultUpdateSettings { updateFreq = 200_000, updateAction = action }
-- >>> getValue
-- 1
-- >>> threadDelay 100_000 >> getValue
-- 1
-- >>> threadDelay 200_000 >> getValue
-- 2
-- >>> closeState
mkClosableAutoUpdate :: UpdateSettings a -> IO (IO a, IO ())
mkClosableAutoUpdate = mkAutoUpdateThings $ \g c _ -> (g, c)

-- | provide `UpdateState` for debugging
mkClosableAutoUpdate' :: UpdateSettings a -> IO (IO a, IO (), UpdateState a)
mkClosableAutoUpdate' = mkAutoUpdateThings (,,)

--------------------------------------------------------------------------------

mkDeleteTimeout :: TVar Bool -> Int -> IO (IO ())
mkDeleteTimeout thc micro = do
mgr <- getSystemTimerManager
key <- registerTimeout mgr micro (atomically $ writeTVar thc True)
pure $ unregisterTimeout mgr key

openUpdateState :: UpdateSettings a -> (a -> IO a) -> IO (UpdateState a)
openUpdateState UpdateSettings{..} update1 = do
thc <- newTVarIO False
UpdateState update1
<$> (newIORef =<< updateAction)
<*> pure updateFreq
<*> pure thc
<*> (newIORef =<< mkDeleteTimeout thc updateFreq)

closeUpdateState :: UpdateState a -> IO ()
closeUpdateState UpdateState{..} = do
delete <- readIORef usDeleteTimeout_
delete

onceOnTimeHasCome :: UpdateState a -> IO () -> IO ()
onceOnTimeHasCome UpdateState{..} action = do
action' <- atomically $ do
timeHasCome <- readTVar usTimeHasCome_
when timeHasCome $ writeTVar usTimeHasCome_ False
pure $ when timeHasCome action
action'

getUpdateResult :: UpdateState a -> IO a
getUpdateResult us@UpdateState{..} = do
onceOnTimeHasCome us $ do
writeIORef usLastResult_ =<< usUpdateAction_ =<< readIORef usLastResult_
writeIORef usDeleteTimeout_ =<< mkDeleteTimeout usTimeHasCome_ usIntervalMicro_
readIORef usLastResult_
11 changes: 11 additions & 0 deletions auto-update/Control/AutoUpdate/Internal.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{-# LANGUAGE RecordWildCards #-}

module Control.AutoUpdate.Internal (
-- * Debugging
UpdateState (..),
mkClosableAutoUpdate,
mkClosableAutoUpdate',
)
where

import Control.AutoUpdate.Event
Loading
Loading