Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ import Ouroboros.Consensus.Util.Orphans ()
import Ouroboros.Network.Block (genesisPoint)
import System.FS.API
import System.IO
import System.Random
import System.Random (genWord64, newStdGen)
import Text.Printf (printf)

{-------------------------------------------------------------------------------
Expand Down Expand Up @@ -140,6 +140,7 @@ analyse dbaConfig args =
lsmSalt <- fst . genWord64 <$> newStdGen
ProtocolInfo{pInfoInitLedger = genesisLedger, pInfoConfig = cfg} <-
mkProtocolInfo args
snapshotDelayRng <- newStdGen
let shfs = Node.stdMkChainDbHasFS dbDir
chunkInfo = Node.nodeImmutableDbChunkInfo (configStorage cfg)
flavargs = case ldbBackend of
Expand Down Expand Up @@ -169,6 +170,7 @@ analyse dbaConfig args =
(const True)
shfs
shfs
snapshotDelayRng
flavargs
$ ChainDB.defaultArgs
-- Set @k=1@ to reduce the memory usage of the LedgerDB. We only ever
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import Ouroboros.Network.Block
import Ouroboros.Network.Point (WithOrigin (..))
import System.Directory
import System.FilePath (takeDirectory, (</>))
import System.Random (newStdGen)

initialize ::
NodeFilePaths ->
Expand Down Expand Up @@ -145,6 +146,7 @@ synthesize ::
IO ForgeResult
synthesize genTxs DBSynthesizerConfig{confOptions, confShelleyGenesis, confDbDir} runP =
withRegistry $ \registry -> do
snapshotDelayRng <- newStdGen
let
epochSize = sgEpochLength confShelleyGenesis
chunkInfo = Node.nodeImmutableDbChunkInfo (configStorage pInfoConfig)
Expand All @@ -158,6 +160,7 @@ synthesize genTxs DBSynthesizerConfig{confOptions, confShelleyGenesis, confDbDir
(const True)
(Node.stdMkChainDbHasFS confDbDir)
(Node.stdMkChainDbHasFS confDbDir)
snapshotDelayRng
flavargs
$ ChainDB.defaultArgs

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,7 @@ runWith RunNodeArgs{..} encAddrNtN decAddrNtN LowLevelRunNodeArgs{..} =
initLedger
llrnMkImmutableHasFS
llrnMkVolatileHasFS
snapshotDelayRng
llrnLdbFlavorArgs
llrnChainDbArgsDefaults
( setLoEinChainDbArgs
Expand Down Expand Up @@ -624,7 +625,8 @@ runWith RunNodeArgs{..} encAddrNtN decAddrNtN LowLevelRunNodeArgs{..} =
where
(gsmAntiThunderingHerd, rng') = split llrnRng
(peerSelectionRng, rng'') = split rng'
(keepAliveRng, ntnAppsRng) = split rng''
(keepAliveRng, rng''') = split rng''
(ntnAppsRng, snapshotDelayRng) = split rng'''

ProtocolInfo
{ pInfoConfig = cfg
Expand Down Expand Up @@ -816,13 +818,14 @@ openChainDB ::
(ChainDB.RelativeMountPoint -> SomeHasFS m) ->
-- | Volatile FS, see 'NodeDatabasePaths'
(ChainDB.RelativeMountPoint -> SomeHasFS m) ->
StdGen ->
LedgerDbBackendArgs m blk ->
-- | A set of default arguments (possibly modified from 'defaultArgs')
Incomplete ChainDbArgs m blk ->
-- | Customise the 'ChainDbArgs'
(Complete ChainDbArgs m blk -> Complete ChainDbArgs m blk) ->
m (ChainDB m blk, Complete ChainDbArgs m blk)
openChainDB registry cfg initLedger fsImm fsVol flavorArgs defArgs customiseArgs =
openChainDB registry cfg initLedger fsImm fsVol delayRng flavorArgs defArgs customiseArgs =
let args =
customiseArgs $
ChainDB.completeChainDbArgs
Expand All @@ -833,6 +836,7 @@ openChainDB registry cfg initLedger fsImm fsVol flavorArgs defArgs customiseArgs
(nodeCheckIntegrity (configStorage cfg))
fsImm
fsVol
delayRng
flavorArgs
defArgs
in (,args) <$> ChainDB.openDB args
Expand Down
1 change: 1 addition & 0 deletions ouroboros-consensus/ouroboros-consensus.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ library
primitive,
psqueues ^>=0.2.3,
quiet ^>=0.2,
random,
rawlock ^>=0.1.1,
resource-registry ^>=0.1,
semialign >=1.1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ openDBInternal args launchBgTasks = runWithTempRegistry $ do
chainSelFuse <- newFuse "chain selection"
chainSelQueue <- newChainSelQueue (Args.cdbsBlocksToAddSize cdbSpecificArgs)
varChainSelStarvation <- newTVarIO ChainSelStarvationOngoing
varSnapshotDelayRNG <- newTVarIO (Args.cdbsSnapshotDelayRNG cdbSpecificArgs)

let env =
CDB
Expand All @@ -250,6 +251,7 @@ openDBInternal args launchBgTasks = runWithTempRegistry $ do
, cdbChainSelQueue = chainSelQueue
, cdbLoE = Args.cdbsLoE cdbSpecificArgs
, cdbChainSelStarvation = varChainSelStarvation
, cdbSnapshotDelayRNG = varSnapshotDelayRNG
}

setGetCurrentChainForLedgerDB $ Query.getCurrentChain env
Expand Down Expand Up @@ -289,7 +291,7 @@ openDBInternal args launchBgTasks = runWithTempRegistry $ do
, intGarbageCollect = \slot -> getEnv h $ \e -> do
Background.garbageCollectBlocks e slot
LedgerDB.garbageCollect (cdbLedgerDB e) slot
, intTryTakeSnapshot = getEnv h $ LedgerDB.tryTakeSnapshot . cdbLedgerDB
, intTryTakeSnapshot = getEnv2 h $ LedgerDB.tryTakeSnapshot . cdbLedgerDB
, intAddBlockRunner = getEnv h (Background.addBlockRunner addBlockTestFuse)
, intKillBgThreads = varKillBgThreads
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import qualified Ouroboros.Consensus.Storage.VolatileDB as VolatileDB
import Ouroboros.Consensus.Util.Args
import Ouroboros.Consensus.Util.IOLike
import System.FS.API
import System.Random (StdGen)

{-------------------------------------------------------------------------------
Arguments
Expand Down Expand Up @@ -91,6 +92,7 @@ data ChainDbSpecificArgs f m blk = ChainDbSpecificArgs
cdbsLoE :: GetLoEFragment m blk
-- ^ If this is 'LoEEnabled', it contains an action that returns the
-- current LoE fragment.
, cdbsSnapshotDelayRNG :: HKD f StdGen
}

-- | Default arguments
Expand Down Expand Up @@ -125,6 +127,7 @@ defaultSpecificArgs =
, cdbsHasFSGsmDB = noDefault
, cdbsTopLevelConfig = noDefault
, cdbsLoE = pure LoEDisabled
, cdbsSnapshotDelayRNG = noDefault
}

-- | Default arguments
Expand Down Expand Up @@ -176,6 +179,7 @@ completeChainDbArgs ::
(RelativeMountPoint -> SomeHasFS m) ->
-- | Volatile FS, see 'NodeDatabasePaths'
(RelativeMountPoint -> SomeHasFS m) ->
StdGen ->
LedgerDbBackendArgs m blk ->
-- | A set of incomplete arguments, possibly modified wrt @defaultArgs@
Incomplete ChainDbArgs m blk ->
Expand All @@ -188,6 +192,7 @@ completeChainDbArgs
checkIntegrity
mkImmFS
mkVolFS
snapshotDelayRNG
flavorArgs
defArgs =
defArgs
Expand Down Expand Up @@ -221,6 +226,7 @@ completeChainDbArgs
{ cdbsRegistry = registry
, cdbsTopLevelConfig
, cdbsHasFSGsmDB = mkVolFS $ RelativeMountPoint "gsm"
, cdbsSnapshotDelayRNG = snapshotDelayRNG
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import Control.Monad (forM_, forever, void)
import Control.Monad.Trans.Class (lift)
import Control.ResourceRegistry
import Control.Tracer
import Data.Bifunctor
import Data.Foldable (toList)
import qualified Data.Map.Strict as Map
import Data.Sequence.Strict (StrictSeq (..))
Expand Down Expand Up @@ -75,6 +76,7 @@ import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Consensus.Util.STM (Watcher (..), blockUntilJust, forkLinkedWatcher)
import Ouroboros.Network.AnchoredFragment (AnchoredSeq (..))
import qualified Ouroboros.Network.AnchoredFragment as AF
import System.Random

{-------------------------------------------------------------------------------
Launch background tasks
Expand All @@ -96,7 +98,7 @@ launchBgTasks cdb@CDB{..} = do
addBlockRunner cdbChainSelFuse cdb

ledgerDbTasksTrigger <- newLedgerDbTasksTrigger
!ledgerDbMaintenaceThread <-
!ledgerDbMaintenanceThread <-
forkLinkedWatcher cdbRegistry "ChainDB.ledgerDbTaskWatcher" $
ledgerDbTaskWatcher cdb ledgerDbTasksTrigger

Expand All @@ -114,7 +116,7 @@ launchBgTasks cdb@CDB{..} = do
writeTVar cdbKillBgThreads $
sequence_
[ addBlockThread
, cancelThread ledgerDbMaintenaceThread
, cancelThread ledgerDbMaintenanceThread
, gcThread
, copyToImmutableDBThread
]
Expand Down Expand Up @@ -320,9 +322,23 @@ ledgerDbTaskWatcher CDB{..} (LedgerDbTasksTrigger varSt) =
, wReader = blockUntilJust $ withOriginToMaybe <$> readTVar varSt
, wNotify = \slotNo -> do
LedgerDB.tryFlush cdbLedgerDB
LedgerDB.tryTakeSnapshot cdbLedgerDB
randomizedDelay <-
atomically $
stateTVar cdbSnapshotDelayRNG randomSnapshotDelay
now <- getMonotonicTime
LedgerDB.tryTakeSnapshot cdbLedgerDB now randomizedDelay
LedgerDB.garbageCollect cdbLedgerDB slotNo
}
where
randomSnapshotDelay :: StdGen -> (DiffTime, StdGen)
randomSnapshotDelay rng =
first fromInteger $ uniformR (fiveMinutes, tenMinutes) rng

fiveMinutes :: Integer
fiveMinutes = 5 * 60

tenMinutes :: Integer
tenMinutes = 10 * 60

{-------------------------------------------------------------------------------
Executing garbage collection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ import Ouroboros.Network.Block (MaxSlotNo (..))
import Ouroboros.Network.BlockFetch.ConsensusInterface
( ChainSelStarvation (..)
)
import System.Random (StdGen)

-- | All the serialisation related constraints needed by the ChainDB.
class
Expand Down Expand Up @@ -349,6 +350,9 @@ data ChainDbEnv m blk = CDB
, cdbChainSelStarvation :: !(StrictTVar m ChainSelStarvation)
-- ^ Information on the last starvation of ChainSel, whether ongoing or
-- ended recently.
, cdbSnapshotDelayRNG :: !(StrictTVar m StdGen)
-- ^ PRNG for determining the random delay we'll wait before actually
-- performing the snapshot when one has been requested.
}
deriving Generic

Expand All @@ -374,7 +378,7 @@ data Internal m blk = Internal
-- returned. This can be used for a garbage collection on the VolatileDB.
, intGarbageCollect :: SlotNo -> m ()
-- ^ Perform garbage collection for blocks <= the given 'SlotNo'.
, intTryTakeSnapshot :: m ()
, intTryTakeSnapshot :: Time -> DiffTime -> m ()
-- ^ Write a new LedgerDB snapshot to disk and remove the oldest one(s).
, intAddBlockRunner :: m Void
-- ^ Start the loop that adds blocks to the ChainDB retrieved from the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,8 @@ data LedgerDB m l blk = LedgerDB
-- * The set of previously applied points.
, tryTakeSnapshot ::
l ~ ExtLedgerState blk =>
Time ->
DiffTime ->
m ()
-- ^ If the provided arguments indicate so (based on the SnapshotPolicy with
-- which this LedgerDB was opened), take a snapshot and delete stale ones.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,10 @@ defaultSnapshotPolicy (SecurityParam k) args =
data TraceSnapshotEvent blk
= -- | An on disk snapshot was skipped because it was invalid.
InvalidSnapshot DiskSnapshot (SnapshotFailure blk)
| -- | A snapshot request was requested and delayed
SnapshotRequestDelayed Time DiffTime Int
| -- | A snapshot request was completed
SnapshotRequestCompleted
| -- | A snapshot was written to disk.
TookSnapshot DiskSnapshot (RealPoint blk) EnclosingTimed
| -- | An old or invalid on-disk snapshot was deleted
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ mkInitDb args bss getBlock snapManager getVolatileSuffix =
flushLock <- mkLedgerDBLock
forkers <- newTVarIO Map.empty
nextForkerKey <- newTVarIO (ForkerKey 0)
lastSnapshotWrite <- newTVarIO Nothing
lastSnapshotRequestedAt <- newTVarIO Nothing
let env =
LedgerDBEnv
{ ldbChangelog = varDB
Expand All @@ -146,7 +146,7 @@ mkInitDb args bss getBlock snapManager getVolatileSuffix =
, ldbQueryBatchSize = lgrQueryBatchSize
, ldbResolveBlock = getBlock
, ldbGetVolatileSuffix = getVolatileSuffix
, ldbLastSnapshotWrite = lastSnapshotWrite
, ldbLastSnapshotRequestedAt = lastSnapshotRequestedAt
}
h <- LDBHandle <$> newTVarIO (LedgerDBOpen env)
pure $ implMkLedgerDb h snapManager
Expand Down Expand Up @@ -192,7 +192,7 @@ implMkLedgerDb h snapManager =
, validateFork = getEnv5 h (implValidate h)
, getPrevApplied = getEnvSTM h implGetPrevApplied
, garbageCollect = getEnv1 h implGarbageCollect
, tryTakeSnapshot = getEnv h (implTryTakeSnapshot snapManager)
, tryTakeSnapshot = getEnv2 h (implTryTakeSnapshot snapManager)
, tryFlush = getEnv h implTryFlush
, closeDB = implCloseDB h
}
Expand Down Expand Up @@ -323,13 +323,14 @@ implTryTakeSnapshot ::
) =>
SnapshotManagerV1 m blk ->
LedgerDBEnv m l blk ->
Time ->
DiffTime ->
m ()
implTryTakeSnapshot snapManager env = do
timeSinceLastWrite <- do
mLastWrite <- readTVarIO $ ldbLastSnapshotWrite env
forM mLastWrite $ \lastWrite -> do
now <- getMonotonicTime
pure $ now `diffTime` lastWrite
implTryTakeSnapshot snapManager env snapshotRequestTime delayBeforeSnapshotting = do
timeSinceLastSnapshot <- do
mLastSnapshotRequested <- readTVarIO $ ldbLastSnapshotRequestedAt env
forM mLastSnapshotRequested $ \lastSnapshotRequested -> do
pure $ snapshotRequestTime `diffTime` lastSnapshotRequested
-- Get all states before the volatile suffix.
immutableStates <- atomically $ do
states <- changelogStates <$> readTVar (ldbChangelog env)
Expand All @@ -343,29 +344,38 @@ implTryTakeSnapshot snapManager env = do
onDiskSnapshotSelector
(ldbSnapshotPolicy env)
SnapshotSelectorContext
{ sscTimeSinceLast = timeSinceLastWrite
{ sscTimeSinceLast = timeSinceLastSnapshot
, sscSnapshotSlots = immutableSlots
}
forM_ snapshotSlots $ \slot -> do
-- Prune the 'DbChangelog' such that the resulting anchor state has slot
-- number @slot@.
let pruneStrat = LedgerDbPruneBeforeSlot (slot + 1)
atomically $ modifyTVar (ldbChangelog env) (prune pruneStrat)
-- Flush the LedgerDB such that we can take a snapshot for the new anchor
-- state due to the previous prune.
withWriteLock
(ldbLock env)
(flushLedgerDB (ldbChangelog env) (ldbBackingStore env))
-- Now, taking a snapshot (for the last flushed state) will do what we want.
void $
withReadLock (ldbLock env) $
takeSnapshot
snapManager
Nothing
(ldbChangelog env, ldbBackingStore env)
finished <- getMonotonicTime
atomically $ writeTVar (ldbLastSnapshotWrite env) (Just $! finished)
void $ trimSnapshots snapManager (ldbSnapshotPolicy env)
case snapshotSlots of
[] -> pure ()
_ -> do
traceWith (LedgerDBSnapshotEvent >$< ldbTracer env) $
SnapshotRequestDelayed snapshotRequestTime delayBeforeSnapshotting (length snapshotSlots)
threadDelay delayBeforeSnapshotting
forM_ snapshotSlots $ \slot -> do
-- Prune the 'DbChangelog' such that the resulting anchor state has slot
-- number @slot@.
let pruneStrat = LedgerDbPruneBeforeSlot (slot + 1)
atomically $ modifyTVar (ldbChangelog env) (prune pruneStrat)
-- Flush the LedgerDB such that we can take a snapshot for the new anchor
-- state due to the previous prune.
withWriteLock
(ldbLock env)
(flushLedgerDB (ldbChangelog env) (ldbBackingStore env))
-- Now, taking a snapshot (for the last flushed state) will do what we want.
void $
withReadLock (ldbLock env) $
takeSnapshot
snapManager
Nothing
(ldbChangelog env, ldbBackingStore env)
finished <- getMonotonicTime
atomically $ writeTVar (ldbLastSnapshotRequestedAt env) (Just $! finished)
void $
trimSnapshots snapManager (ldbSnapshotPolicy env)
traceWith (LedgerDBSnapshotEvent >$< ldbTracer env) $
SnapshotRequestCompleted

-- If the DbChangelog in the LedgerDB can flush (based on the SnapshotPolicy
-- with which this LedgerDB was opened), flush differences to the backing
Expand Down Expand Up @@ -603,8 +613,10 @@ data LedgerDBEnv m l blk = LedgerDBEnv
, ldbQueryBatchSize :: !QueryBatchSize
, ldbResolveBlock :: !(ResolveBlock m blk)
, ldbGetVolatileSuffix :: !(GetVolatileSuffix m blk)
, ldbLastSnapshotWrite :: !(StrictTVar m (Maybe Time))
-- ^ When did we finish writing the last snapshot.
, ldbLastSnapshotRequestedAt :: !(StrictTVar m (Maybe Time))
-- ^ The time at which the latest snapshot was requested. Note that this is
-- not the the last time a snapshot was requested -- this is only updated
-- with the request time when a snapshot is successfully made.
}
deriving Generic

Expand Down
Loading