diff --git a/auto-update/Control/AutoUpdate.hs b/auto-update/Control/AutoUpdate.hs index 0c67a3be4..a07692df5 100644 --- a/auto-update/Control/AutoUpdate.hs +++ b/auto-update/Control/AutoUpdate.hs @@ -1,7 +1,4 @@ -{-# LANGUAGE CPP #-} - --- | In a multithreaded environment, running actions on a regularly scheduled --- background thread can dramatically improve performance. +-- | In a multithreaded environment, sharing results of actions can dramatically improve performance. -- For example, web servers need to return the current time with each HTTP response. -- For a high-volume server, it's much faster for a dedicated thread to run every -- second, and write the current time to a shared 'IORef', than it is for each @@ -43,187 +40,9 @@ module Control.AutoUpdate ( -- * Creation mkAutoUpdate, mkAutoUpdateWithModify, -) where - -#if __GLASGOW_HASKELL__ < 709 -import Control.Applicative ((<*>)) -#endif -import Control.Concurrent (forkIO, threadDelay) -import Control.Concurrent.MVar ( - newEmptyMVar, - putMVar, - readMVar, - takeMVar, - tryPutMVar, - ) -import Control.Exception ( - SomeException, - catch, - mask_, - throw, - try, - ) -import Control.Monad (void) -import Data.IORef (newIORef, readIORef, writeIORef) -import Data.Maybe (fromMaybe) -import GHC.Conc.Sync (labelThread) - --- | Default value for creating an 'UpdateSettings'. --- --- @since 0.1.0 -defaultUpdateSettings :: UpdateSettings () -defaultUpdateSettings = - UpdateSettings - { updateFreq = 1000000 - , updateSpawnThreshold = 3 - , updateAction = return () - , updateThreadName = "AutoUpdate" - } - --- | Settings to control how values are updated. --- --- This should be constructed using 'defaultUpdateSettings' and record --- update syntax, e.g.: --- --- @ --- let settings = 'defaultUpdateSettings' { 'updateAction' = 'Data.Time.Clock.getCurrentTime' } --- @ --- --- @since 0.1.0 -data UpdateSettings a = UpdateSettings - { updateFreq :: Int - -- ^ Microseconds between update calls. Same considerations as - -- 'threadDelay' apply. - -- - -- Default: 1 second (1000000) - -- - -- @since 0.1.0 - , updateSpawnThreshold :: Int - -- ^ NOTE: This value no longer has any effect, since worker threads are - -- dedicated instead of spawned on demand. - -- - -- Previously, this determined how many times the data must be requested - -- before we decide to spawn a dedicated thread. - -- - -- Default: 3 - -- - -- @since 0.1.0 - , updateAction :: IO a - -- ^ Action to be performed to get the current value. - -- - -- Default: does nothing. - -- - -- @since 0.1.0 - , updateThreadName :: String - -- ^ Label of the thread being forked. - -- - -- Default: @"AutoUpdate"@ - -- - -- @since 0.2.2 - } - --- | Generate an action which will either read from an automatically --- updated value, or run the update action in the current thread. --- --- @since 0.1.0 -mkAutoUpdate :: UpdateSettings a -> IO (IO a) -mkAutoUpdate us = mkAutoUpdateHelper us Nothing - --- | Generate an action which will either read from an automatically --- updated value, or run the update action in the current thread if --- the first time or the provided modify action after that. --- --- @since 0.1.4 -mkAutoUpdateWithModify :: UpdateSettings a -> (a -> IO a) -> IO (IO a) -mkAutoUpdateWithModify us f = mkAutoUpdateHelper us (Just f) - -mkAutoUpdateHelper :: UpdateSettings a -> Maybe (a -> IO a) -> IO (IO a) -mkAutoUpdateHelper us updateActionModify = do - -- A baton to tell the worker thread to generate a new value. - needsRunning <- newEmptyMVar - - -- The initial response variable. Response variables allow the requesting - -- thread to block until a value is generated by the worker thread. - responseVar0 <- newEmptyMVar - - -- The current value, if available. We start off with a Left value - -- indicating no value is available, and the above-created responseVar0 to - -- give a variable to block on. - currRef <- newIORef $ Left responseVar0 - - -- This is used to set a value in the currRef variable when the worker - -- thread exits. In reality, that value should never be used, since the - -- worker thread exiting only occurs if an async exception is thrown, which - -- should only occur if there are no references to needsRunning left. - -- However, this handler will make error messages much clearer if there's a - -- bug in the implementation. - let fillRefOnExit f = do - eres <- try f - case eres of - Left e -> - writeIORef currRef $ - error $ - "Control.AutoUpdate.mkAutoUpdate: worker thread exited with exception: " - ++ show (e :: SomeException) - Right () -> - writeIORef currRef $ - error $ - "Control.AutoUpdate.mkAutoUpdate: worker thread exited normally, " - ++ "which should be impossible due to usage of infinite loop" - - -- fork the worker thread immediately. Note that we mask async exceptions, - -- but *not* in an uninterruptible manner. This will allow a - -- BlockedIndefinitelyOnMVar exception to still be thrown, which will take - -- down this thread when all references to the returned function are - -- garbage collected, and therefore there is no thread that can fill the - -- needsRunning MVar. - -- - -- Note that since we throw away the ThreadId of this new thread and never - -- calls myThreadId, normal async exceptions can never be thrown to it, - -- only RTS exceptions. - tid <- mask_ $ forkIO $ fillRefOnExit $ do - -- This infinite loop makes up out worker thread. It takes an a - -- responseVar value where the next value should be putMVar'ed to for - -- the benefit of any requesters currently blocked on it. - let loop responseVar maybea = do - -- block until a value is actually needed - takeMVar needsRunning - - -- new value requested, so run the updateAction - a <- catchSome $ fromMaybe (updateAction us) (updateActionModify <*> maybea) - - -- we got a new value, update currRef and lastValue - writeIORef currRef $ Right a - putMVar responseVar a - - -- delay until we're needed again - threadDelay $ updateFreq us - - -- delay's over. create a new response variable and set currRef - -- to use it, so that the next requester will block on that - -- variable. Then loop again with the updated response - -- variable. - responseVar' <- newEmptyMVar - writeIORef currRef $ Left responseVar' - loop responseVar' (Just a) - - -- Kick off the loop, with the initial responseVar0 variable. - loop responseVar0 Nothing - labelThread tid $ updateThreadName us - return $ do - mval <- readIORef currRef - case mval of - Left responseVar -> do - -- no current value, force the worker thread to run... - void $ tryPutMVar needsRunning () +) +where - -- and block for the result from the worker - readMVar responseVar - -- we have a current value, use it - Right val -> return val +-- GHC packages --- | Turn a runtime exception into an impure exception, so that all 'IO' --- actions will complete successfully. This simply defers the exception until --- the value is forced. -catchSome :: IO a -> IO a -catchSome act = Control.Exception.catch act $ \e -> return $ throw (e :: SomeException) +import Control.AutoUpdate.Internal diff --git a/auto-update/Control/AutoUpdate/Internal.hs b/auto-update/Control/AutoUpdate/Internal.hs new file mode 100644 index 000000000..f76399def --- /dev/null +++ b/auto-update/Control/AutoUpdate/Internal.hs @@ -0,0 +1,163 @@ +{-# LANGUAGE RecordWildCards #-} + +module Control.AutoUpdate.Internal ( + -- * Type + UpdateSettings (..), + defaultUpdateSettings, + + -- * Creation + mkAutoUpdate, + mkAutoUpdateWithModify, + + -- * Debugging + mkClosableAutoUpdate, + mkClosableAutoUpdate', + UpdateState (..), +) +where + +-- GHC packages + +import Control.Concurrent.STM +import Control.Monad +import Data.IORef +import GHC.Event (getSystemTimerManager, registerTimeout, unregisterTimeout) + +-- | Default value for creating an 'UpdateSettings'. +-- +-- @since 0.1.0 +defaultUpdateSettings :: UpdateSettings () +defaultUpdateSettings = + UpdateSettings + { updateFreq = 1000000 + , updateSpawnThreshold = 3 + , updateAction = return () + , updateThreadName = "AutoUpdate" + } + +-- | Settings to control how values are updated. +-- +-- This should be constructed using 'defaultUpdateSettings' and record +-- update syntax, e.g.: +-- +-- @ +-- let settings = 'defaultUpdateSettings' { 'updateAction' = 'Data.Time.Clock.getCurrentTime' } +-- @ +-- +-- @since 0.1.0 +data UpdateSettings a = UpdateSettings + { updateFreq :: Int + -- ^ Microseconds between update calls. Same considerations as + -- 'threadDelay' apply. + -- + -- Default: 1000000 microseconds (1 second) + -- + -- @since 0.1.0 + , updateSpawnThreshold :: Int + -- ^ Obsoleted field. + -- + -- @since 0.1.0 + , updateAction :: IO a + -- ^ Action to be performed to get the current value. + -- + -- Default: does nothing. + -- + -- @since 0.1.0 + , updateThreadName :: String + -- ^ Label of the thread being forked. + -- + -- Default: @"AutoUpdate"@ + -- + -- @since 0.2.2 + } + +-- | Generate an action which will either read from an automatically +-- updated value, or run the update action in the current thread. +-- +-- @since 0.1.0 +mkAutoUpdate :: UpdateSettings a -> IO (IO a) +mkAutoUpdate settings = fst <$> mkClosableAutoUpdate settings + +-- $setup +-- >>> :set -XNumericUnderscores +-- >>> import Control.Concurrent + +-- | +-- >>> iref <- newIORef (0 :: Int) +-- >>> action = modifyIORef iref (+ 1) >> readIORef iref +-- >>> (getValue, closeState) <- mkClosableAutoUpdate $ defaultUpdateSettings { updateFreq = 200_000, updateAction = action } +-- >>> getValue +-- 1 +-- >>> threadDelay 100_000 >> getValue +-- 1 +-- >>> threadDelay 200_000 >> getValue +-- 2 +-- >>> closeState +mkClosableAutoUpdate :: UpdateSettings a -> IO (IO a, IO ()) +mkClosableAutoUpdate = mkAutoUpdateThings $ \g c _ -> (g, c) + +-- | provide `UpdateState` for debugging +mkClosableAutoUpdate' :: UpdateSettings a -> IO (IO a, IO (), UpdateState a) +mkClosableAutoUpdate' = mkAutoUpdateThings (,,) + +mkAutoUpdateThings + :: (IO a -> IO () -> UpdateState a -> b) -> UpdateSettings a -> IO b +mkAutoUpdateThings mk settings = do + us <- openUpdateState settings + pure $ mk (getUpdateResult us) (closeUpdateState us) us + +-------------------------------------------------------------------------------- + +{- FOURMOLU_DISABLE -} +data UpdateState a = + UpdateState + { usUpdateAction_ :: IO a + , usLastResult_ :: IORef a + , usIntervalMicro_ :: Int + , usTimeHasCome_ :: TVar Bool + , usDeleteTimeout_ :: IORef (IO ()) + } +{- FOURMOLU_ENABLE -} + +mkDeleteTimeout :: TVar Bool -> Int -> IO (IO ()) +mkDeleteTimeout thc micro = do + mgr <- getSystemTimerManager + key <- registerTimeout mgr micro (atomically $ writeTVar thc True) + pure $ unregisterTimeout mgr key + +openUpdateState :: UpdateSettings a -> IO (UpdateState a) +openUpdateState UpdateSettings{..} = do + thc <- newTVarIO False + UpdateState updateAction + <$> (newIORef =<< updateAction) + <*> pure updateFreq + <*> pure thc + <*> (newIORef =<< mkDeleteTimeout thc updateFreq) + +closeUpdateState :: UpdateState a -> IO () +closeUpdateState UpdateState{..} = do + delete <- readIORef usDeleteTimeout_ + delete + +onceOnTimeHasCome :: UpdateState a -> IO () -> IO () +onceOnTimeHasCome UpdateState{..} action = do + action' <- atomically $ do + timeHasCome <- readTVar usTimeHasCome_ + when timeHasCome $ writeTVar usTimeHasCome_ False + pure $ when timeHasCome action + action' + +getUpdateResult :: UpdateState a -> IO a +getUpdateResult us@UpdateState{..} = do + onceOnTimeHasCome us $ do + writeIORef usLastResult_ =<< usUpdateAction_ + writeIORef usDeleteTimeout_ =<< mkDeleteTimeout usTimeHasCome_ usIntervalMicro_ + readIORef usLastResult_ + +-- | Generate an action which will either read from an automatically +-- updated value, or run the update action in the current thread if +-- the first time or the provided modify action after that. +-- +-- @since 0.1.4 +mkAutoUpdateWithModify :: UpdateSettings a -> (a -> IO a) -> IO (IO a) +mkAutoUpdateWithModify us f = undefined diff --git a/auto-update/auto-update.cabal b/auto-update/auto-update.cabal index 06a00da90..8242eeb08 100644 --- a/auto-update/auto-update.cabal +++ b/auto-update/auto-update.cabal @@ -16,11 +16,13 @@ cabal-version: >=1.10 library ghc-options: -Wall exposed-modules: Control.AutoUpdate + Control.AutoUpdate.Internal Control.Debounce Control.Debounce.Internal Control.Reaper Control.Reaper.Internal - build-depends: base >= 4.12 && < 5 + build-depends: base >= 4.12 && < 5, + stm default-language: Haskell2010 if impl(ghc >= 8) default-extensions: Strict StrictData