diff --git a/auto-update/Control/AutoUpdate.hs b/auto-update/Control/AutoUpdate.hs index 0c67a3be4..66b348efb 100644 --- a/auto-update/Control/AutoUpdate.hs +++ b/auto-update/Control/AutoUpdate.hs @@ -1,7 +1,6 @@ {-# LANGUAGE CPP #-} --- | In a multithreaded environment, running actions on a regularly scheduled --- background thread can dramatically improve performance. +-- | In a multithreaded environment, sharing results of actions can dramatically improve performance. -- For example, web servers need to return the current time with each HTTP response. -- For a high-volume server, it's much faster for a dedicated thread to run every -- second, and write the current time to a shared 'IORef', than it is for each @@ -43,187 +42,12 @@ module Control.AutoUpdate ( -- * Creation mkAutoUpdate, mkAutoUpdateWithModify, -) where +) +where -#if __GLASGOW_HASKELL__ < 709 -import Control.Applicative ((<*>)) +#ifdef mingw32_HOST_OS +import Control.AutoUpdate.Thread +#else +import Control.AutoUpdate.Event #endif -import Control.Concurrent (forkIO, threadDelay) -import Control.Concurrent.MVar ( - newEmptyMVar, - putMVar, - readMVar, - takeMVar, - tryPutMVar, - ) -import Control.Exception ( - SomeException, - catch, - mask_, - throw, - try, - ) -import Control.Monad (void) -import Data.IORef (newIORef, readIORef, writeIORef) -import Data.Maybe (fromMaybe) -import GHC.Conc.Sync (labelThread) - --- | Default value for creating an 'UpdateSettings'. --- --- @since 0.1.0 -defaultUpdateSettings :: UpdateSettings () -defaultUpdateSettings = - UpdateSettings - { updateFreq = 1000000 - , updateSpawnThreshold = 3 - , updateAction = return () - , updateThreadName = "AutoUpdate" - } - --- | Settings to control how values are updated. --- --- This should be constructed using 'defaultUpdateSettings' and record --- update syntax, e.g.: --- --- @ --- let settings = 'defaultUpdateSettings' { 'updateAction' = 'Data.Time.Clock.getCurrentTime' } --- @ --- --- @since 0.1.0 -data UpdateSettings a = UpdateSettings - { updateFreq :: Int - -- ^ Microseconds between update calls. Same considerations as - -- 'threadDelay' apply. - -- - -- Default: 1 second (1000000) - -- - -- @since 0.1.0 - , updateSpawnThreshold :: Int - -- ^ NOTE: This value no longer has any effect, since worker threads are - -- dedicated instead of spawned on demand. - -- - -- Previously, this determined how many times the data must be requested - -- before we decide to spawn a dedicated thread. - -- - -- Default: 3 - -- - -- @since 0.1.0 - , updateAction :: IO a - -- ^ Action to be performed to get the current value. - -- - -- Default: does nothing. - -- - -- @since 0.1.0 - , updateThreadName :: String - -- ^ Label of the thread being forked. - -- - -- Default: @"AutoUpdate"@ - -- - -- @since 0.2.2 - } - --- | Generate an action which will either read from an automatically --- updated value, or run the update action in the current thread. --- --- @since 0.1.0 -mkAutoUpdate :: UpdateSettings a -> IO (IO a) -mkAutoUpdate us = mkAutoUpdateHelper us Nothing - --- | Generate an action which will either read from an automatically --- updated value, or run the update action in the current thread if --- the first time or the provided modify action after that. --- --- @since 0.1.4 -mkAutoUpdateWithModify :: UpdateSettings a -> (a -> IO a) -> IO (IO a) -mkAutoUpdateWithModify us f = mkAutoUpdateHelper us (Just f) - -mkAutoUpdateHelper :: UpdateSettings a -> Maybe (a -> IO a) -> IO (IO a) -mkAutoUpdateHelper us updateActionModify = do - -- A baton to tell the worker thread to generate a new value. - needsRunning <- newEmptyMVar - - -- The initial response variable. Response variables allow the requesting - -- thread to block until a value is generated by the worker thread. - responseVar0 <- newEmptyMVar - - -- The current value, if available. We start off with a Left value - -- indicating no value is available, and the above-created responseVar0 to - -- give a variable to block on. - currRef <- newIORef $ Left responseVar0 - - -- This is used to set a value in the currRef variable when the worker - -- thread exits. In reality, that value should never be used, since the - -- worker thread exiting only occurs if an async exception is thrown, which - -- should only occur if there are no references to needsRunning left. - -- However, this handler will make error messages much clearer if there's a - -- bug in the implementation. - let fillRefOnExit f = do - eres <- try f - case eres of - Left e -> - writeIORef currRef $ - error $ - "Control.AutoUpdate.mkAutoUpdate: worker thread exited with exception: " - ++ show (e :: SomeException) - Right () -> - writeIORef currRef $ - error $ - "Control.AutoUpdate.mkAutoUpdate: worker thread exited normally, " - ++ "which should be impossible due to usage of infinite loop" - - -- fork the worker thread immediately. Note that we mask async exceptions, - -- but *not* in an uninterruptible manner. This will allow a - -- BlockedIndefinitelyOnMVar exception to still be thrown, which will take - -- down this thread when all references to the returned function are - -- garbage collected, and therefore there is no thread that can fill the - -- needsRunning MVar. - -- - -- Note that since we throw away the ThreadId of this new thread and never - -- calls myThreadId, normal async exceptions can never be thrown to it, - -- only RTS exceptions. - tid <- mask_ $ forkIO $ fillRefOnExit $ do - -- This infinite loop makes up out worker thread. It takes an a - -- responseVar value where the next value should be putMVar'ed to for - -- the benefit of any requesters currently blocked on it. - let loop responseVar maybea = do - -- block until a value is actually needed - takeMVar needsRunning - - -- new value requested, so run the updateAction - a <- catchSome $ fromMaybe (updateAction us) (updateActionModify <*> maybea) - - -- we got a new value, update currRef and lastValue - writeIORef currRef $ Right a - putMVar responseVar a - - -- delay until we're needed again - threadDelay $ updateFreq us - - -- delay's over. create a new response variable and set currRef - -- to use it, so that the next requester will block on that - -- variable. Then loop again with the updated response - -- variable. - responseVar' <- newEmptyMVar - writeIORef currRef $ Left responseVar' - loop responseVar' (Just a) - - -- Kick off the loop, with the initial responseVar0 variable. - loop responseVar0 Nothing - labelThread tid $ updateThreadName us - return $ do - mval <- readIORef currRef - case mval of - Left responseVar -> do - -- no current value, force the worker thread to run... - void $ tryPutMVar needsRunning () - - -- and block for the result from the worker - readMVar responseVar - -- we have a current value, use it - Right val -> return val - --- | Turn a runtime exception into an impure exception, so that all 'IO' --- actions will complete successfully. This simply defers the exception until --- the value is forced. -catchSome :: IO a -> IO a -catchSome act = Control.Exception.catch act $ \e -> return $ throw (e :: SomeException) +import Control.AutoUpdate.Types diff --git a/auto-update/Control/AutoUpdate/Event.hs b/auto-update/Control/AutoUpdate/Event.hs new file mode 100644 index 000000000..e2a5e29cd --- /dev/null +++ b/auto-update/Control/AutoUpdate/Event.hs @@ -0,0 +1,124 @@ +{-# LANGUAGE RecordWildCards #-} + +module Control.AutoUpdate.Event ( + -- * Creation + mkAutoUpdate, + mkAutoUpdateWithModify, + + -- * Internal + UpdateState (..), + mkClosableAutoUpdate, + mkClosableAutoUpdate', +) +where + +import Control.Concurrent.STM +import Control.Monad +import Data.IORef +import GHC.Event (getSystemTimerManager, registerTimeout, unregisterTimeout) + +import Control.AutoUpdate.Types + +-------------------------------------------------------------------------------- + +-- | Generate an action which will either read from an automatically +-- updated value, or run the update action in the current thread. +-- +-- @since 0.1.0 +mkAutoUpdate :: UpdateSettings a -> IO (IO a) +mkAutoUpdate = mkAutoUpdateThings $ \g _ _ -> g + +-- | Generate an action which will either read from an automatically +-- updated value, or run the update action in the current thread if +-- the first time or the provided modify action after that. +-- +-- @since 0.1.4 +mkAutoUpdateWithModify :: UpdateSettings a -> (a -> IO a) -> IO (IO a) +mkAutoUpdateWithModify us f = mkAutoUpdateThingsWithModify (\g _ _ -> g) us f + +-------------------------------------------------------------------------------- + +{- FOURMOLU_DISABLE -} +data UpdateState a = + UpdateState + { usUpdateAction_ :: a -> IO a + , usLastResult_ :: IORef a + , usIntervalMicro_ :: Int + , usTimeHasCome_ :: TVar Bool + , usDeleteTimeout_ :: IORef (IO ()) + } +{- FOURMOLU_ENABLE -} + +-------------------------------------------------------------------------------- + +mkAutoUpdateThings + :: (IO a -> IO () -> UpdateState a -> b) -> UpdateSettings a -> IO b +mkAutoUpdateThings mk settings@UpdateSettings{..} = + mkAutoUpdateThingsWithModify mk settings (const updateAction) + +mkAutoUpdateThingsWithModify + :: (IO a -> IO () -> UpdateState a -> b) -> UpdateSettings a -> (a -> IO a) -> IO b +mkAutoUpdateThingsWithModify mk settings update1 = do + us <- openUpdateState settings update1 + pure $ mk (getUpdateResult us) (closeUpdateState us) us + +-------------------------------------------------------------------------------- + +-- $setup +-- >>> :set -XNumericUnderscores +-- >>> import Control.Concurrent + +-- | +-- >>> iref <- newIORef (0 :: Int) +-- >>> action = modifyIORef iref (+ 1) >> readIORef iref +-- >>> (getValue, closeState) <- mkClosableAutoUpdate $ defaultUpdateSettings { updateFreq = 200_000, updateAction = action } +-- >>> getValue +-- 1 +-- >>> threadDelay 100_000 >> getValue +-- 1 +-- >>> threadDelay 200_000 >> getValue +-- 2 +-- >>> closeState +mkClosableAutoUpdate :: UpdateSettings a -> IO (IO a, IO ()) +mkClosableAutoUpdate = mkAutoUpdateThings $ \g c _ -> (g, c) + +-- | provide `UpdateState` for debugging +mkClosableAutoUpdate' :: UpdateSettings a -> IO (IO a, IO (), UpdateState a) +mkClosableAutoUpdate' = mkAutoUpdateThings (,,) + +-------------------------------------------------------------------------------- + +mkDeleteTimeout :: TVar Bool -> Int -> IO (IO ()) +mkDeleteTimeout thc micro = do + mgr <- getSystemTimerManager + key <- registerTimeout mgr micro (atomically $ writeTVar thc True) + pure $ unregisterTimeout mgr key + +openUpdateState :: UpdateSettings a -> (a -> IO a) -> IO (UpdateState a) +openUpdateState UpdateSettings{..} update1 = do + thc <- newTVarIO False + UpdateState update1 + <$> (newIORef =<< updateAction) + <*> pure updateFreq + <*> pure thc + <*> (newIORef =<< mkDeleteTimeout thc updateFreq) + +closeUpdateState :: UpdateState a -> IO () +closeUpdateState UpdateState{..} = do + delete <- readIORef usDeleteTimeout_ + delete + +onceOnTimeHasCome :: UpdateState a -> IO () -> IO () +onceOnTimeHasCome UpdateState{..} action = do + action' <- atomically $ do + timeHasCome <- readTVar usTimeHasCome_ + when timeHasCome $ writeTVar usTimeHasCome_ False + pure $ when timeHasCome action + action' + +getUpdateResult :: UpdateState a -> IO a +getUpdateResult us@UpdateState{..} = do + onceOnTimeHasCome us $ do + writeIORef usLastResult_ =<< usUpdateAction_ =<< readIORef usLastResult_ + writeIORef usDeleteTimeout_ =<< mkDeleteTimeout usTimeHasCome_ usIntervalMicro_ + readIORef usLastResult_ diff --git a/auto-update/Control/AutoUpdate/Internal.hs b/auto-update/Control/AutoUpdate/Internal.hs new file mode 100644 index 000000000..bbda5c957 --- /dev/null +++ b/auto-update/Control/AutoUpdate/Internal.hs @@ -0,0 +1,11 @@ +{-# LANGUAGE RecordWildCards #-} + +module Control.AutoUpdate.Internal ( + -- * Debugging + UpdateState (..), + mkClosableAutoUpdate, + mkClosableAutoUpdate', +) +where + +import Control.AutoUpdate.Event diff --git a/auto-update/Control/AutoUpdate/Thread.hs b/auto-update/Control/AutoUpdate/Thread.hs new file mode 100644 index 000000000..7cc4e8ac3 --- /dev/null +++ b/auto-update/Control/AutoUpdate/Thread.hs @@ -0,0 +1,133 @@ +module Control.AutoUpdate.Thread ( + -- * Creation + mkAutoUpdate, + mkAutoUpdateWithModify, +) where + +import Control.Concurrent (forkIO, threadDelay) +import Control.Concurrent.MVar ( + newEmptyMVar, + putMVar, + readMVar, + takeMVar, + tryPutMVar, + ) +import Control.Exception ( + SomeException, + catch, + mask_, + throw, + try, + ) +import Control.Monad (void) +import Data.IORef (newIORef, readIORef, writeIORef) +import Data.Maybe (fromMaybe) +import GHC.Conc.Sync (labelThread) + +import Control.AutoUpdate.Types + +-- | Generate an action which will either read from an automatically +-- updated value, or run the update action in the current thread. +-- +-- @since 0.1.0 +mkAutoUpdate :: UpdateSettings a -> IO (IO a) +mkAutoUpdate us = mkAutoUpdateHelper us Nothing + +-- | Generate an action which will either read from an automatically +-- updated value, or run the update action in the current thread if +-- the first time or the provided modify action after that. +-- +-- @since 0.1.4 +mkAutoUpdateWithModify :: UpdateSettings a -> (a -> IO a) -> IO (IO a) +mkAutoUpdateWithModify us f = mkAutoUpdateHelper us (Just f) + +mkAutoUpdateHelper :: UpdateSettings a -> Maybe (a -> IO a) -> IO (IO a) +mkAutoUpdateHelper us updateActionModify = do + -- A baton to tell the worker thread to generate a new value. + needsRunning <- newEmptyMVar + + -- The initial response variable. Response variables allow the requesting + -- thread to block until a value is generated by the worker thread. + responseVar0 <- newEmptyMVar + + -- The current value, if available. We start off with a Left value + -- indicating no value is available, and the above-created responseVar0 to + -- give a variable to block on. + currRef <- newIORef $ Left responseVar0 + + -- This is used to set a value in the currRef variable when the worker + -- thread exits. In reality, that value should never be used, since the + -- worker thread exiting only occurs if an async exception is thrown, which + -- should only occur if there are no references to needsRunning left. + -- However, this handler will make error messages much clearer if there's a + -- bug in the implementation. + let fillRefOnExit f = do + eres <- try f + case eres of + Left e -> + writeIORef currRef $ + error $ + "Control.AutoUpdate.mkAutoUpdate: worker thread exited with exception: " + ++ show (e :: SomeException) + Right () -> + writeIORef currRef $ + error $ + "Control.AutoUpdate.mkAutoUpdate: worker thread exited normally, " + ++ "which should be impossible due to usage of infinite loop" + + -- fork the worker thread immediately. Note that we mask async exceptions, + -- but *not* in an uninterruptible manner. This will allow a + -- BlockedIndefinitelyOnMVar exception to still be thrown, which will take + -- down this thread when all references to the returned function are + -- garbage collected, and therefore there is no thread that can fill the + -- needsRunning MVar. + -- + -- Note that since we throw away the ThreadId of this new thread and never + -- calls myThreadId, normal async exceptions can never be thrown to it, + -- only RTS exceptions. + tid <- mask_ $ forkIO $ fillRefOnExit $ do + -- This infinite loop makes up out worker thread. It takes an a + -- responseVar value where the next value should be putMVar'ed to for + -- the benefit of any requesters currently blocked on it. + let loop responseVar maybea = do + -- block until a value is actually needed + takeMVar needsRunning + + -- new value requested, so run the updateAction + a <- catchSome $ fromMaybe (updateAction us) (updateActionModify <*> maybea) + + -- we got a new value, update currRef and lastValue + writeIORef currRef $ Right a + putMVar responseVar a + + -- delay until we're needed again + threadDelay $ updateFreq us + + -- delay's over. create a new response variable and set currRef + -- to use it, so that the next requester will block on that + -- variable. Then loop again with the updated response + -- variable. + responseVar' <- newEmptyMVar + writeIORef currRef $ Left responseVar' + loop responseVar' (Just a) + + -- Kick off the loop, with the initial responseVar0 variable. + loop responseVar0 Nothing + labelThread tid $ updateThreadName us + return $ do + mval <- readIORef currRef + case mval of + Left responseVar -> do + -- no current value, force the worker thread to run... + void $ tryPutMVar needsRunning () + + -- and block for the result from the worker + readMVar responseVar + -- we have a current value, use it + Right val -> return val + +-- | Turn a runtime exception into an impure exception, so that all 'IO' +-- actions will complete successfully. This simply defers the exception until +-- the value is forced. +catchSome :: IO a -> IO a +catchSome act = Control.Exception.catch act $ \e -> return $ throw (e :: SomeException) diff --git a/auto-update/Control/AutoUpdate/Types.hs b/auto-update/Control/AutoUpdate/Types.hs new file mode 100644 index 000000000..e3e59c2cf --- /dev/null +++ b/auto-update/Control/AutoUpdate/Types.hs @@ -0,0 +1,49 @@ +module Control.AutoUpdate.Types where + +-- | Settings to control how values are updated. +-- +-- This should be constructed using 'defaultUpdateSettings' and record +-- update syntax, e.g.: +-- +-- @ +-- let settings = 'defaultUpdateSettings' { 'updateAction' = 'Data.Time.Clock.getCurrentTime' } +-- @ +-- +-- @since 0.1.0 +data UpdateSettings a = UpdateSettings + { updateFreq :: Int + -- ^ Microseconds between update calls. Same considerations as + -- 'threadDelay' apply. + -- + -- Default: 1000000 microseconds (1 second) + -- + -- @since 0.1.0 + , updateSpawnThreshold :: Int + -- ^ Obsoleted field. + -- + -- @since 0.1.0 + , updateAction :: IO a + -- ^ Action to be performed to get the current value. + -- + -- Default: does nothing. + -- + -- @since 0.1.0 + , updateThreadName :: String + -- ^ Label of the thread being forked. + -- + -- Default: @"AutoUpdate"@ + -- + -- @since 0.2.2 + } + +-- | Default value for creating an 'UpdateSettings'. +-- +-- @since 0.1.0 +defaultUpdateSettings :: UpdateSettings () +defaultUpdateSettings = + UpdateSettings + { updateFreq = 1000000 + , updateSpawnThreshold = 3 + , updateAction = return () + , updateThreadName = "AutoUpdate" + } diff --git a/auto-update/Control/AutoUpdate/Util.hs b/auto-update/Control/AutoUpdate/Util.hs deleted file mode 100644 index 7e2348911..000000000 --- a/auto-update/Control/AutoUpdate/Util.hs +++ /dev/null @@ -1,24 +0,0 @@ -{-# LANGUAGE CPP #-} - -module Control.AutoUpdate.Util ( - atomicModifyIORef', -) where - -#ifndef MIN_VERSION_base -#define MIN_VERSION_base(x,y,z) 1 -#endif - -#if MIN_VERSION_base(4,6,0) -import Data.IORef (atomicModifyIORef') -#else -import Data.IORef (IORef, atomicModifyIORef) --- | Strict version of 'atomicModifyIORef'. This forces both the value stored --- in the 'IORef' as well as the value returned. -atomicModifyIORef' :: IORef a -> (a -> (a,b)) -> IO b -atomicModifyIORef' ref f = do - c <- atomicModifyIORef ref - (\x -> let (a, b) = f x -- Lazy application of "f" - in (a, a `seq` b)) -- Lazy application of "seq" - -- The following forces "a `seq` b", so it also forces "f x". - c `seq` return c -#endif diff --git a/auto-update/Control/Reaper.hs b/auto-update/Control/Reaper.hs index 34db68a75..ef08bae00 100644 --- a/auto-update/Control/Reaper.hs +++ b/auto-update/Control/Reaper.hs @@ -42,11 +42,10 @@ module Control.Reaper ( mkListAction, ) where -import Control.AutoUpdate.Util (atomicModifyIORef') import Control.Concurrent (ThreadId, forkIO, killThread, threadDelay) import Control.Exception (mask_) import Control.Reaper.Internal -import Data.IORef (IORef, newIORef, readIORef, writeIORef) +import Data.IORef (IORef, atomicModifyIORef', newIORef, readIORef, writeIORef) import GHC.Conc.Sync (labelThread) -- | Settings for creating a reaper. This type has two parameters: diff --git a/auto-update/auto-update.cabal b/auto-update/auto-update.cabal index 4316e1c0e..0b3cec474 100644 --- a/auto-update/auto-update.cabal +++ b/auto-update/auto-update.cabal @@ -20,8 +20,14 @@ library Control.Debounce.Internal Control.Reaper Control.Reaper.Internal - other-modules: Control.AutoUpdate.Util - build-depends: base >= 4.12 && < 5 + other-modules: Control.AutoUpdate.Types + if os(windows) + other-modules: Control.AutoUpdate.Thread + else + exposed-modules: Control.AutoUpdate.Internal + other-modules: Control.AutoUpdate.Event + build-depends: base >= 4.12 && < 5, + stm default-language: Haskell2010 if impl(ghc >= 8) default-extensions: Strict StrictData