diff --git a/ouroboros-consensus-cardano/src/unstable-cardano-tools/Cardano/Tools/DBAnalyser/Run.hs b/ouroboros-consensus-cardano/src/unstable-cardano-tools/Cardano/Tools/DBAnalyser/Run.hs index 0b661b1337..cb86707906 100644 --- a/ouroboros-consensus-cardano/src/unstable-cardano-tools/Cardano/Tools/DBAnalyser/Run.hs +++ b/ouroboros-consensus-cardano/src/unstable-cardano-tools/Cardano/Tools/DBAnalyser/Run.hs @@ -53,7 +53,7 @@ import Ouroboros.Consensus.Util.Orphans () import Ouroboros.Network.Block (genesisPoint) import System.FS.API import System.IO -import System.Random +import System.Random (genWord64, newStdGen) import Text.Printf (printf) {------------------------------------------------------------------------------- @@ -140,6 +140,7 @@ analyse dbaConfig args = lsmSalt <- fst . genWord64 <$> newStdGen ProtocolInfo{pInfoInitLedger = genesisLedger, pInfoConfig = cfg} <- mkProtocolInfo args + snapshotDelayRng <- newStdGen let shfs = Node.stdMkChainDbHasFS dbDir chunkInfo = Node.nodeImmutableDbChunkInfo (configStorage cfg) flavargs = case ldbBackend of @@ -169,6 +170,7 @@ analyse dbaConfig args = (const True) shfs shfs + snapshotDelayRng flavargs $ ChainDB.defaultArgs -- Set @k=1@ to reduce the memory usage of the LedgerDB. We only ever diff --git a/ouroboros-consensus-cardano/src/unstable-cardano-tools/Cardano/Tools/DBSynthesizer/Run.hs b/ouroboros-consensus-cardano/src/unstable-cardano-tools/Cardano/Tools/DBSynthesizer/Run.hs index 885af6c734..5c2d294875 100644 --- a/ouroboros-consensus-cardano/src/unstable-cardano-tools/Cardano/Tools/DBSynthesizer/Run.hs +++ b/ouroboros-consensus-cardano/src/unstable-cardano-tools/Cardano/Tools/DBSynthesizer/Run.hs @@ -59,6 +59,7 @@ import Ouroboros.Network.Block import Ouroboros.Network.Point (WithOrigin (..)) import System.Directory import System.FilePath (takeDirectory, ()) +import System.Random (newStdGen) initialize :: NodeFilePaths -> @@ -145,6 +146,7 @@ synthesize :: IO ForgeResult synthesize genTxs DBSynthesizerConfig{confOptions, confShelleyGenesis, confDbDir} runP = withRegistry $ \registry -> do + snapshotDelayRng <- newStdGen let epochSize = sgEpochLength confShelleyGenesis chunkInfo = Node.nodeImmutableDbChunkInfo (configStorage pInfoConfig) @@ -158,6 +160,7 @@ synthesize genTxs DBSynthesizerConfig{confOptions, confShelleyGenesis, confDbDir (const True) (Node.stdMkChainDbHasFS confDbDir) (Node.stdMkChainDbHasFS confDbDir) + snapshotDelayRng flavargs $ ChainDB.defaultArgs diff --git a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node.hs b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node.hs index efd7018046..811fa64ea8 100644 --- a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node.hs +++ b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node.hs @@ -523,6 +523,7 @@ runWith RunNodeArgs{..} encAddrNtN decAddrNtN LowLevelRunNodeArgs{..} = initLedger llrnMkImmutableHasFS llrnMkVolatileHasFS + snapshotDelayRng llrnLdbFlavorArgs llrnChainDbArgsDefaults ( setLoEinChainDbArgs @@ -624,7 +625,8 @@ runWith RunNodeArgs{..} encAddrNtN decAddrNtN LowLevelRunNodeArgs{..} = where (gsmAntiThunderingHerd, rng') = split llrnRng (peerSelectionRng, rng'') = split rng' - (keepAliveRng, ntnAppsRng) = split rng'' + (keepAliveRng, rng''') = split rng'' + (ntnAppsRng, snapshotDelayRng) = split rng''' ProtocolInfo { pInfoConfig = cfg @@ -816,13 +818,14 @@ openChainDB :: (ChainDB.RelativeMountPoint -> SomeHasFS m) -> -- | Volatile FS, see 'NodeDatabasePaths' (ChainDB.RelativeMountPoint -> SomeHasFS m) -> + StdGen -> LedgerDbBackendArgs m blk -> -- | A set of default arguments (possibly modified from 'defaultArgs') Incomplete ChainDbArgs m blk -> -- | Customise the 'ChainDbArgs' (Complete ChainDbArgs m blk -> Complete ChainDbArgs m blk) -> m (ChainDB m blk, Complete ChainDbArgs m blk) -openChainDB registry cfg initLedger fsImm fsVol flavorArgs defArgs customiseArgs = +openChainDB registry cfg initLedger fsImm fsVol delayRng flavorArgs defArgs customiseArgs = let args = customiseArgs $ ChainDB.completeChainDbArgs @@ -833,6 +836,7 @@ openChainDB registry cfg initLedger fsImm fsVol flavorArgs defArgs customiseArgs (nodeCheckIntegrity (configStorage cfg)) fsImm fsVol + delayRng flavorArgs defArgs in (,args) <$> ChainDB.openDB args diff --git a/ouroboros-consensus/ouroboros-consensus.cabal b/ouroboros-consensus/ouroboros-consensus.cabal index e16d0c2f71..471d0cbf6e 100644 --- a/ouroboros-consensus/ouroboros-consensus.cabal +++ b/ouroboros-consensus/ouroboros-consensus.cabal @@ -341,6 +341,7 @@ library primitive, psqueues ^>=0.2.3, quiet ^>=0.2, + random, rawlock ^>=0.1.1, resource-registry ^>=0.1, semialign >=1.1, diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl.hs index e23888d4b7..ffbe1d2969 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl.hs @@ -225,6 +225,7 @@ openDBInternal args launchBgTasks = runWithTempRegistry $ do chainSelFuse <- newFuse "chain selection" chainSelQueue <- newChainSelQueue (Args.cdbsBlocksToAddSize cdbSpecificArgs) varChainSelStarvation <- newTVarIO ChainSelStarvationOngoing + varSnapshotDelayRNG <- newTVarIO (Args.cdbsSnapshotDelayRNG cdbSpecificArgs) let env = CDB @@ -250,6 +251,7 @@ openDBInternal args launchBgTasks = runWithTempRegistry $ do , cdbChainSelQueue = chainSelQueue , cdbLoE = Args.cdbsLoE cdbSpecificArgs , cdbChainSelStarvation = varChainSelStarvation + , cdbSnapshotDelayRNG = varSnapshotDelayRNG } setGetCurrentChainForLedgerDB $ Query.getCurrentChain env @@ -289,7 +291,7 @@ openDBInternal args launchBgTasks = runWithTempRegistry $ do , intGarbageCollect = \slot -> getEnv h $ \e -> do Background.garbageCollectBlocks e slot LedgerDB.garbageCollect (cdbLedgerDB e) slot - , intTryTakeSnapshot = getEnv h $ LedgerDB.tryTakeSnapshot . cdbLedgerDB + , intTryTakeSnapshot = getEnv2 h $ LedgerDB.tryTakeSnapshot . cdbLedgerDB , intAddBlockRunner = getEnv h (Background.addBlockRunner addBlockTestFuse) , intKillBgThreads = varKillBgThreads } diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Args.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Args.hs index a5b95d537d..108e81b7d2 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Args.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Args.hs @@ -48,6 +48,7 @@ import qualified Ouroboros.Consensus.Storage.VolatileDB as VolatileDB import Ouroboros.Consensus.Util.Args import Ouroboros.Consensus.Util.IOLike import System.FS.API +import System.Random (StdGen) {------------------------------------------------------------------------------- Arguments @@ -91,6 +92,7 @@ data ChainDbSpecificArgs f m blk = ChainDbSpecificArgs cdbsLoE :: GetLoEFragment m blk -- ^ If this is 'LoEEnabled', it contains an action that returns the -- current LoE fragment. + , cdbsSnapshotDelayRNG :: HKD f StdGen } -- | Default arguments @@ -125,6 +127,7 @@ defaultSpecificArgs = , cdbsHasFSGsmDB = noDefault , cdbsTopLevelConfig = noDefault , cdbsLoE = pure LoEDisabled + , cdbsSnapshotDelayRNG = noDefault } -- | Default arguments @@ -176,6 +179,7 @@ completeChainDbArgs :: (RelativeMountPoint -> SomeHasFS m) -> -- | Volatile FS, see 'NodeDatabasePaths' (RelativeMountPoint -> SomeHasFS m) -> + StdGen -> LedgerDbBackendArgs m blk -> -- | A set of incomplete arguments, possibly modified wrt @defaultArgs@ Incomplete ChainDbArgs m blk -> @@ -188,6 +192,7 @@ completeChainDbArgs checkIntegrity mkImmFS mkVolFS + snapshotDelayRNG flavorArgs defArgs = defArgs @@ -221,6 +226,7 @@ completeChainDbArgs { cdbsRegistry = registry , cdbsTopLevelConfig , cdbsHasFSGsmDB = mkVolFS $ RelativeMountPoint "gsm" + , cdbsSnapshotDelayRNG = snapshotDelayRNG } } diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Background.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Background.hs index 4a4e4f32e9..790dc6945d 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Background.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Background.hs @@ -44,6 +44,7 @@ import Control.Monad (forM_, forever, void) import Control.Monad.Trans.Class (lift) import Control.ResourceRegistry import Control.Tracer +import Data.Bifunctor import Data.Foldable (toList) import qualified Data.Map.Strict as Map import Data.Sequence.Strict (StrictSeq (..)) @@ -75,6 +76,7 @@ import Ouroboros.Consensus.Util.IOLike import Ouroboros.Consensus.Util.STM (Watcher (..), blockUntilJust, forkLinkedWatcher) import Ouroboros.Network.AnchoredFragment (AnchoredSeq (..)) import qualified Ouroboros.Network.AnchoredFragment as AF +import System.Random {------------------------------------------------------------------------------- Launch background tasks @@ -96,7 +98,7 @@ launchBgTasks cdb@CDB{..} = do addBlockRunner cdbChainSelFuse cdb ledgerDbTasksTrigger <- newLedgerDbTasksTrigger - !ledgerDbMaintenaceThread <- + !ledgerDbMaintenanceThread <- forkLinkedWatcher cdbRegistry "ChainDB.ledgerDbTaskWatcher" $ ledgerDbTaskWatcher cdb ledgerDbTasksTrigger @@ -114,7 +116,7 @@ launchBgTasks cdb@CDB{..} = do writeTVar cdbKillBgThreads $ sequence_ [ addBlockThread - , cancelThread ledgerDbMaintenaceThread + , cancelThread ledgerDbMaintenanceThread , gcThread , copyToImmutableDBThread ] @@ -320,9 +322,23 @@ ledgerDbTaskWatcher CDB{..} (LedgerDbTasksTrigger varSt) = , wReader = blockUntilJust $ withOriginToMaybe <$> readTVar varSt , wNotify = \slotNo -> do LedgerDB.tryFlush cdbLedgerDB - LedgerDB.tryTakeSnapshot cdbLedgerDB + randomizedDelay <- + atomically $ + stateTVar cdbSnapshotDelayRNG randomSnapshotDelay + now <- getMonotonicTime + LedgerDB.tryTakeSnapshot cdbLedgerDB now randomizedDelay LedgerDB.garbageCollect cdbLedgerDB slotNo } + where + randomSnapshotDelay :: StdGen -> (DiffTime, StdGen) + randomSnapshotDelay rng = + first fromInteger $ uniformR (fiveMinutes, tenMinutes) rng + + fiveMinutes :: Integer + fiveMinutes = 5 * 60 + + tenMinutes :: Integer + tenMinutes = 10 * 60 {------------------------------------------------------------------------------- Executing garbage collection diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Types.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Types.hs index 4a88b8d40b..eff0dc1381 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Types.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Types.hs @@ -142,6 +142,7 @@ import Ouroboros.Network.Block (MaxSlotNo (..)) import Ouroboros.Network.BlockFetch.ConsensusInterface ( ChainSelStarvation (..) ) +import System.Random (StdGen) -- | All the serialisation related constraints needed by the ChainDB. class @@ -349,6 +350,9 @@ data ChainDbEnv m blk = CDB , cdbChainSelStarvation :: !(StrictTVar m ChainSelStarvation) -- ^ Information on the last starvation of ChainSel, whether ongoing or -- ended recently. + , cdbSnapshotDelayRNG :: !(StrictTVar m StdGen) + -- ^ PRNG for determining the random delay we'll wait before actually + -- performing the snapshot when one has been requested. } deriving Generic @@ -374,7 +378,7 @@ data Internal m blk = Internal -- returned. This can be used for a garbage collection on the VolatileDB. , intGarbageCollect :: SlotNo -> m () -- ^ Perform garbage collection for blocks <= the given 'SlotNo'. - , intTryTakeSnapshot :: m () + , intTryTakeSnapshot :: Time -> DiffTime -> m () -- ^ Write a new LedgerDB snapshot to disk and remove the oldest one(s). , intAddBlockRunner :: m Void -- ^ Start the loop that adds blocks to the ChainDB retrieved from the diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/API.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/API.hs index e05e210af8..33408da299 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/API.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/API.hs @@ -267,6 +267,8 @@ data LedgerDB m l blk = LedgerDB -- * The set of previously applied points. , tryTakeSnapshot :: l ~ ExtLedgerState blk => + Time -> + DiffTime -> m () -- ^ If the provided arguments indicate so (based on the SnapshotPolicy with -- which this LedgerDB was opened), take a snapshot and delete stale ones. diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/Snapshots.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/Snapshots.hs index b73a2072f7..aa4271c567 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/Snapshots.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/Snapshots.hs @@ -677,6 +677,10 @@ defaultSnapshotPolicy (SecurityParam k) args = data TraceSnapshotEvent blk = -- | An on disk snapshot was skipped because it was invalid. InvalidSnapshot DiskSnapshot (SnapshotFailure blk) + | -- | A snapshot request was requested and delayed + SnapshotRequestDelayed Time DiffTime Int + | -- | A snapshot request was completed + SnapshotRequestCompleted | -- | A snapshot was written to disk. TookSnapshot DiskSnapshot (RealPoint blk) EnclosingTimed | -- | An old or invalid on-disk snapshot was deleted diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V1.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V1.hs index 9dac9ed4c3..bf15ad8510 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V1.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V1.hs @@ -128,7 +128,7 @@ mkInitDb args bss getBlock snapManager getVolatileSuffix = flushLock <- mkLedgerDBLock forkers <- newTVarIO Map.empty nextForkerKey <- newTVarIO (ForkerKey 0) - lastSnapshotWrite <- newTVarIO Nothing + lastSnapshotRequestedAt <- newTVarIO Nothing let env = LedgerDBEnv { ldbChangelog = varDB @@ -146,7 +146,7 @@ mkInitDb args bss getBlock snapManager getVolatileSuffix = , ldbQueryBatchSize = lgrQueryBatchSize , ldbResolveBlock = getBlock , ldbGetVolatileSuffix = getVolatileSuffix - , ldbLastSnapshotWrite = lastSnapshotWrite + , ldbLastSnapshotRequestedAt = lastSnapshotRequestedAt } h <- LDBHandle <$> newTVarIO (LedgerDBOpen env) pure $ implMkLedgerDb h snapManager @@ -192,7 +192,7 @@ implMkLedgerDb h snapManager = , validateFork = getEnv5 h (implValidate h) , getPrevApplied = getEnvSTM h implGetPrevApplied , garbageCollect = getEnv1 h implGarbageCollect - , tryTakeSnapshot = getEnv h (implTryTakeSnapshot snapManager) + , tryTakeSnapshot = getEnv2 h (implTryTakeSnapshot snapManager) , tryFlush = getEnv h implTryFlush , closeDB = implCloseDB h } @@ -323,13 +323,14 @@ implTryTakeSnapshot :: ) => SnapshotManagerV1 m blk -> LedgerDBEnv m l blk -> + Time -> + DiffTime -> m () -implTryTakeSnapshot snapManager env = do - timeSinceLastWrite <- do - mLastWrite <- readTVarIO $ ldbLastSnapshotWrite env - forM mLastWrite $ \lastWrite -> do - now <- getMonotonicTime - pure $ now `diffTime` lastWrite +implTryTakeSnapshot snapManager env snapshotRequestTime delayBeforeSnapshotting = do + timeSinceLastSnapshot <- do + mLastSnapshotRequested <- readTVarIO $ ldbLastSnapshotRequestedAt env + forM mLastSnapshotRequested $ \lastSnapshotRequested -> do + pure $ snapshotRequestTime `diffTime` lastSnapshotRequested -- Get all states before the volatile suffix. immutableStates <- atomically $ do states <- changelogStates <$> readTVar (ldbChangelog env) @@ -343,29 +344,38 @@ implTryTakeSnapshot snapManager env = do onDiskSnapshotSelector (ldbSnapshotPolicy env) SnapshotSelectorContext - { sscTimeSinceLast = timeSinceLastWrite + { sscTimeSinceLast = timeSinceLastSnapshot , sscSnapshotSlots = immutableSlots } - forM_ snapshotSlots $ \slot -> do - -- Prune the 'DbChangelog' such that the resulting anchor state has slot - -- number @slot@. - let pruneStrat = LedgerDbPruneBeforeSlot (slot + 1) - atomically $ modifyTVar (ldbChangelog env) (prune pruneStrat) - -- Flush the LedgerDB such that we can take a snapshot for the new anchor - -- state due to the previous prune. - withWriteLock - (ldbLock env) - (flushLedgerDB (ldbChangelog env) (ldbBackingStore env)) - -- Now, taking a snapshot (for the last flushed state) will do what we want. - void $ - withReadLock (ldbLock env) $ - takeSnapshot - snapManager - Nothing - (ldbChangelog env, ldbBackingStore env) - finished <- getMonotonicTime - atomically $ writeTVar (ldbLastSnapshotWrite env) (Just $! finished) - void $ trimSnapshots snapManager (ldbSnapshotPolicy env) + case snapshotSlots of + [] -> pure () + _ -> do + traceWith (LedgerDBSnapshotEvent >$< ldbTracer env) $ + SnapshotRequestDelayed snapshotRequestTime delayBeforeSnapshotting (length snapshotSlots) + threadDelay delayBeforeSnapshotting + forM_ snapshotSlots $ \slot -> do + -- Prune the 'DbChangelog' such that the resulting anchor state has slot + -- number @slot@. + let pruneStrat = LedgerDbPruneBeforeSlot (slot + 1) + atomically $ modifyTVar (ldbChangelog env) (prune pruneStrat) + -- Flush the LedgerDB such that we can take a snapshot for the new anchor + -- state due to the previous prune. + withWriteLock + (ldbLock env) + (flushLedgerDB (ldbChangelog env) (ldbBackingStore env)) + -- Now, taking a snapshot (for the last flushed state) will do what we want. + void $ + withReadLock (ldbLock env) $ + takeSnapshot + snapManager + Nothing + (ldbChangelog env, ldbBackingStore env) + finished <- getMonotonicTime + atomically $ writeTVar (ldbLastSnapshotRequestedAt env) (Just $! finished) + void $ + trimSnapshots snapManager (ldbSnapshotPolicy env) + traceWith (LedgerDBSnapshotEvent >$< ldbTracer env) $ + SnapshotRequestCompleted -- If the DbChangelog in the LedgerDB can flush (based on the SnapshotPolicy -- with which this LedgerDB was opened), flush differences to the backing @@ -603,8 +613,10 @@ data LedgerDBEnv m l blk = LedgerDBEnv , ldbQueryBatchSize :: !QueryBatchSize , ldbResolveBlock :: !(ResolveBlock m blk) , ldbGetVolatileSuffix :: !(GetVolatileSuffix m blk) - , ldbLastSnapshotWrite :: !(StrictTVar m (Maybe Time)) - -- ^ When did we finish writing the last snapshot. + , ldbLastSnapshotRequestedAt :: !(StrictTVar m (Maybe Time)) + -- ^ The time at which the latest snapshot was requested. Note that this is + -- not the the last time a snapshot was requested -- this is only updated + -- with the request time when a snapshot is successfully made. } deriving Generic diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2.hs index 5398ffa711..b6655d1cbb 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2.hs @@ -14,7 +14,7 @@ module Ouroboros.Consensus.Storage.LedgerDB.V2 (mkInitDb) where import Control.Arrow ((>>>)) -import qualified Control.Monad as Monad (join, void) +import qualified Control.Monad as Monad (forM, join, void) import Control.Monad.Except import Control.RAWLock import qualified Control.RAWLock as RAWLock @@ -103,7 +103,7 @@ mkInitDb args getBlock snapManager getVolatileSuffix res = do lock <- RAWLock.new () forkers <- newTVarIO Map.empty nextForkerKey <- newTVarIO (ForkerKey 0) - lastSnapshotWrite <- newTVarIO Nothing + lastSnapshotRequestedAt <- newTVarIO Nothing let env = LedgerDBEnv { ldbSeq = varDB @@ -119,7 +119,7 @@ mkInitDb args getBlock snapManager getVolatileSuffix res = do , ldbOpenHandlesLock = lock , ldbGetVolatileSuffix = getVolatileSuffix , ldbResourceKeys = SomeResources res - , ldbLastSnapshotWrite = lastSnapshotWrite + , ldbLastSnapshotRequestedAt = lastSnapshotRequestedAt } h <- LDBHandle <$> newTVarIO (LedgerDBOpen env) pure $ implMkLedgerDb h snapManager @@ -168,7 +168,7 @@ implMkLedgerDb h snapManager = , validateFork = getEnv5 h (implValidate h) , getPrevApplied = getEnvSTM h implGetPrevApplied , garbageCollect = \s -> getEnv h (flip implGarbageCollect s) - , tryTakeSnapshot = getEnv h (implTryTakeSnapshot snapManager) + , tryTakeSnapshot = getEnv2 h (implTryTakeSnapshot snapManager) , tryFlush = getEnv h implTryFlush , closeDB = implCloseDB h } @@ -349,14 +349,15 @@ implTryTakeSnapshot :: ) => SnapshotManager m m blk (StateRef m (ExtLedgerState blk)) -> LedgerDBEnv m l blk -> + Time -> + DiffTime -> m () -implTryTakeSnapshot snapManager env = do - timeSinceLastWrite <- do - mLastWrite <- readTVarIO $ ldbLastSnapshotWrite env - for mLastWrite $ \lastWrite -> do - now <- getMonotonicTime - pure $ now `diffTime` lastWrite - RAWLock.withReadAccess (ldbOpenHandlesLock env) $ \() -> do +implTryTakeSnapshot snapManager env snapshotRequestTime delayBeforeSnapshotting = do + timeSinceLastSnapshot <- do + mLastSnapshotRequested <- readTVarIO $ ldbLastSnapshotRequestedAt env + for mLastSnapshotRequested $ \lastSnapshotRequested -> do + pure $ snapshotRequestTime `diffTime` lastSnapshotRequested + handles <- RAWLock.withReadAccess (ldbOpenHandlesLock env) $ \() -> do lseq@(LedgerSeq immutableStates) <- atomically $ do LedgerSeq states <- readTVar $ ldbSeq env volSuffix <- getVolatileSuffix (ldbGetVolatileSuffix env) @@ -369,18 +370,31 @@ implTryTakeSnapshot snapManager env = do onDiskSnapshotSelector (ldbSnapshotPolicy env) SnapshotSelectorContext - { sscTimeSinceLast = timeSinceLastWrite + { sscTimeSinceLast = timeSinceLastSnapshot , sscSnapshotSlots = immutableSlots } - for_ snapshotSlots $ \slot -> do + Monad.forM snapshotSlots $ \slot -> do -- Prune the 'DbChangelog' such that the resulting anchor state has slot -- number @slot@. let pruneStrat = LedgerDbPruneBeforeSlot (slot + 1) - st = anchorHandle $ snd $ prune pruneStrat lseq - Monad.void $ takeSnapshot snapManager Nothing st - finished <- getMonotonicTime - atomically $ writeTVar (ldbLastSnapshotWrite env) (Just $! finished) + duplicateStateRef $ anchorHandle $ snd $ prune pruneStrat lseq + + case handles of + [] -> pure () + _ -> do + traceWith (LedgerDBSnapshotEvent >$< ldbTracer env) $ + SnapshotRequestDelayed snapshotRequestTime delayBeforeSnapshotting (length handles) + threadDelay delayBeforeSnapshotting + for_ handles $ \h -> do + Monad.void $ takeSnapshot snapManager Nothing h + + atomically $ writeTVar (ldbLastSnapshotRequestedAt env) (Just $! snapshotRequestTime) Monad.void $ trimSnapshots snapManager (ldbSnapshotPolicy env) + traceWith (LedgerDBSnapshotEvent >$< ldbTracer env) $ + SnapshotRequestCompleted + where + duplicateStateRef :: StateRef m (ExtLedgerState blk) -> m (StateRef m (ExtLedgerState blk)) + duplicateStateRef StateRef{state, tables} = StateRef state <$> duplicate tables -- In the first version of the LedgerDB for UTxO-HD, there is a need to -- periodically flush the accumulated differences to the disk. However, in the @@ -465,8 +479,10 @@ data LedgerDBEnv m l blk = LedgerDBEnv -- in tests can release such resources. These are the resource keys for the -- LSM session and the resource key for the BlockIO interface. , ldbGetVolatileSuffix :: !(GetVolatileSuffix m blk) - , ldbLastSnapshotWrite :: !(StrictTVar m (Maybe Time)) - -- ^ When did we finish writing the last snapshot. + , ldbLastSnapshotRequestedAt :: !(StrictTVar m (Maybe Time)) + -- ^ The time at which the latest snapshot was requested. Note that this is + -- not the the last time a snapshot was requested -- this is only updated + -- with the request time when a snapshot is successfully made. } deriving Generic @@ -519,6 +535,16 @@ getEnv (LDBHandle varState) f = LedgerDBOpen env -> f env LedgerDBClosed -> throwIO $ ClosedDBError @blk prettyCallStack +-- | Variant 'of 'getEnv' for functions taking two arguments. +getEnv2 :: + (IOLike m, HasCallStack, HasHeader blk) => + LedgerDBHandle m l blk -> + (LedgerDBEnv m l blk -> a -> b -> m r) -> + a -> + b -> + m r +getEnv2 h f a b = getEnv h (\env -> f env a b) + -- | Variant 'of 'getEnv' for functions taking five arguments. getEnv5 :: (IOLike m, HasCallStack, HasHeader blk) => diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Util/Orphans.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Util/Orphans.hs index a623d0b9a9..df2374a3ba 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Util/Orphans.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Util/Orphans.hs @@ -28,6 +28,7 @@ import NoThunks.Class ( InspectHeap (..) , InspectHeapNamed (..) , NoThunks (..) + , OnlyCheckWhnf (..) , OnlyCheckWhnfNamed (..) , allNoThunks ) @@ -35,6 +36,8 @@ import Ouroboros.Network.Util.ShowProxy import System.FS.API (SomeHasFS) import System.FS.API.Types (FsPath, Handle) import System.FS.CRC (CRC (CRC)) +import System.Random (StdGen) +import qualified System.Random.Internal as Random {------------------------------------------------------------------------------- Serialise @@ -85,6 +88,10 @@ instance NoThunks a => NoThunks (MultiSet a) where showTypeOf _ = "MultiSet" wNoThunks ctxt = wNoThunks ctxt . MultiSet.toMap +instance NoThunks StdGen where + showTypeOf _ = "StdGen" + wNoThunks ctx = wNoThunks ctx . OnlyCheckWhnf . Random.unStdGen + {------------------------------------------------------------------------------- fs-api -------------------------------------------------------------------------------} diff --git a/ouroboros-consensus/src/unstable-consensus-testlib/Test/Util/ChainDB.hs b/ouroboros-consensus/src/unstable-consensus-testlib/Test/Util/ChainDB.hs index cf9c7775a9..9f4ba7b326 100644 --- a/ouroboros-consensus/src/unstable-consensus-testlib/Test/Util/ChainDB.hs +++ b/ouroboros-consensus/src/unstable-consensus-testlib/Test/Util/ChainDB.hs @@ -40,6 +40,7 @@ import System.FS.API (SomeHasFS (..)) import System.FS.Sim.MockFS import qualified System.FS.Sim.MockFS as Mock import System.FS.Sim.STM (simHasFS) +import System.Random (mkStdGen) import Test.Util.Orphans.NoThunks () import Test.Util.TestBlock (TestBlock, TestBlockLedgerConfig (..)) @@ -141,5 +142,6 @@ fromMinimalChainDbArgs MinimalChainDbArgs{..} = , cdbsTracer = nullTracer , cdbsTopLevelConfig = mcdbTopLevelConfig , cdbsLoE = pure LoEDisabled + , cdbsSnapshotDelayRNG = mkStdGen 0 } } diff --git a/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/ChainDB/LedgerSnapshots.hs b/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/ChainDB/LedgerSnapshots.hs index 3f7011abb9..fff26862c7 100644 --- a/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/ChainDB/LedgerSnapshots.hs +++ b/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/ChainDB/LedgerSnapshots.hs @@ -18,7 +18,7 @@ module Test.Ouroboros.Storage.ChainDB.LedgerSnapshots (tests) where import Cardano.Ledger.BaseTypes.NonZero -import Control.Monad (replicateM) +import Control.Monad (guard, replicateM) import Control.Monad.IOSim (runSim) import Control.ResourceRegistry import Control.Tracer @@ -249,11 +249,14 @@ runTest :: runTest lgrDbBackendArgs testSetup = withRegistry \registry -> do (withTime -> tracer, getTrace) <- recordingTracerTVar - (chainDB, lgrHasFS) <- openChainDB registry tracer + isSnapshottingTMVar :: StrictTMVar m () <- newEmptyTMVarIO + + (chainDB, lgrHasFS) <- openChainDB registry (tracer <> isSnapshottingTracer isSnapshottingTMVar) for_ (tsBlocksToAdd testSetup) \blk -> do ChainDB.addBlock_ chainDB Punishment.noPunishment blk threadDelay 1 + atomically $ isEmptyTMVar isSnapshottingTMVar >>= guard toutImmutableTip <- AF.castAnchor . AF.anchor <$> atomically (ChainDB.getCurrentChain chainDB) @@ -301,6 +304,14 @@ runTest lgrDbBackendArgs testSetup = withRegistry \registry -> do withTime = contramapM \ev -> (,ev) <$> getMonotonicTime + isSnapshottingTracer :: StrictTMVar m () -> Tracer m (ChainDB.TraceEvent TestBlock) + isSnapshottingTracer tmvar = Tracer \case + ChainDB.TraceLedgerDBEvent (LedgerDB.LedgerDBSnapshotEvent (SnapshotRequestDelayed _ _ _)) -> + atomically $ putTMVar tmvar () + ChainDB.TraceLedgerDBEvent (LedgerDB.LedgerDBSnapshotEvent SnapshotRequestCompleted) -> + atomically $ takeTMVar tmvar + _ -> pure () + {------------------------------------------------------------------------------- Assess a test outcome -------------------------------------------------------------------------------} diff --git a/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/ChainDB/StateMachine.hs b/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/ChainDB/StateMachine.hs index 4e97810d90..126c309494 100644 --- a/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/ChainDB/StateMachine.hs +++ b/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/ChainDB/StateMachine.hs @@ -425,7 +425,9 @@ run cfg env@ChainDBEnv{varDB, ..} cmd = Reopen -> Unit <$> reopen env PersistBlks -> ignore <$> persistBlks DoNotGarbageCollect internal PersistBlksThenGC -> ignore <$> persistBlks GarbageCollect internal - UpdateLedgerSnapshots -> ignore <$> intTryTakeSnapshot internal + UpdateLedgerSnapshots -> do + now <- getMonotonicTime + ignore <$> intTryTakeSnapshot internal now 0 WipeVolatileDB -> Point <$> wipeVolatileDB st where mbGCedAllComponents = MbGCedAllComponents . MaybeGCedBlock True