diff --git a/src/Nethermind/Nethermind.Blockchain/Synchronization/ISyncConfig.cs b/src/Nethermind/Nethermind.Blockchain/Synchronization/ISyncConfig.cs index d8a08e3bc9f..dca9b597943 100644 --- a/src/Nethermind/Nethermind.Blockchain/Synchronization/ISyncConfig.cs +++ b/src/Nethermind/Nethermind.Blockchain/Synchronization/ISyncConfig.cs @@ -171,6 +171,9 @@ public interface ISyncConfig : IConfig [ConfigItem(Description = "_Technical._ Enable storage range split.", DefaultValue = "false", HiddenFromDocs = true)] bool EnableSnapSyncStorageRangeSplit { get; set; } - [ConfigItem(Description = "_Technical._ Max tx in forward sync buffer.", DefaultValue = "200000", HiddenFromDocs = true)] - int MaxTxInForwardSyncBuffer { get; set; } + [ConfigItem(Description = "_Technical._ Estimated size of memory for storing blocks during download.", DefaultValue = "200000000", HiddenFromDocs = true)] + long ForwardSyncDownloadBufferMemoryBudget { get; set; } + + [ConfigItem(Description = "_Technical._ Estimated max size of blocks in block processing queue before stop downloading.", DefaultValue = "200000000", HiddenFromDocs = true)] + long ForwardSyncBlockProcessingQueueMemoryBudget { get; set; } } diff --git a/src/Nethermind/Nethermind.Blockchain/Synchronization/SyncConfig.cs b/src/Nethermind/Nethermind.Blockchain/Synchronization/SyncConfig.cs index 30c68bb10fd..096d8f57444 100644 --- a/src/Nethermind/Nethermind.Blockchain/Synchronization/SyncConfig.cs +++ b/src/Nethermind/Nethermind.Blockchain/Synchronization/SyncConfig.cs @@ -83,7 +83,8 @@ public string? PivotHash public ulong FastHeadersMemoryBudget { get; set; } = (ulong)128.MB(); public bool EnableSnapSyncStorageRangeSplit { get; set; } = false; - public int MaxTxInForwardSyncBuffer { get; set; } = 20000; + public long ForwardSyncDownloadBufferMemoryBudget { get; set; } = 200.MiB(); + public long ForwardSyncBlockProcessingQueueMemoryBudget { get; set; } = 200.MiB(); public override string ToString() { diff --git a/src/Nethermind/Nethermind.Merge.Plugin/Synchronization/MergeBlockDownloader.cs b/src/Nethermind/Nethermind.Merge.Plugin/Synchronization/MergeBlockDownloader.cs index c0795285b97..5b3338f5506 100644 --- a/src/Nethermind/Nethermind.Merge.Plugin/Synchronization/MergeBlockDownloader.cs +++ b/src/Nethermind/Nethermind.Merge.Plugin/Synchronization/MergeBlockDownloader.cs @@ -5,6 +5,7 @@ using Nethermind.Blockchain; using Nethermind.Blockchain.Receipts; using Nethermind.Blockchain.Synchronization; +using Nethermind.Consensus.Processing; using Nethermind.Consensus.Validators; using Nethermind.Core; using Nethermind.Core.Specs; @@ -18,32 +19,38 @@ namespace Nethermind.Merge.Plugin.Synchronization { - public class MergeBlockDownloader : BlockDownloader + public class MergeBlockDownloader( + IBeaconPivot beaconPivot, + IBlockTree blockTree, + IBlockValidator blockValidator, + ISyncReport syncReport, + IReceiptStorage receiptStorage, + ISpecProvider specProvider, + IBetterPeerStrategy betterPeerStrategy, + IFullStateFinder fullStateFinder, + IForwardHeaderProvider forwardHeaderProvider, + ISyncPeerPool syncPeerPool, + IReceiptsRecovery receiptsRecovery, + IBlockProcessingQueue blockProcessingQueue, + ISyncConfig syncConfig, + ILogManager logManager) + : BlockDownloader( + blockTree, + blockValidator, + syncReport, + receiptStorage, + specProvider, + betterPeerStrategy, + fullStateFinder, + forwardHeaderProvider, + syncPeerPool, + receiptsRecovery, + blockProcessingQueue, + syncConfig, + logManager) { - private readonly IBeaconPivot _beaconPivot; - private readonly IBlockTree _blockTree; - private readonly ILogger _logger; - - public MergeBlockDownloader( - IBeaconPivot beaconPivot, - IBlockTree? blockTree, - IBlockValidator? blockValidator, - ISyncReport? syncReport, - IReceiptStorage? receiptStorage, - ISpecProvider specProvider, - IBetterPeerStrategy betterPeerStrategy, - IFullStateFinder fullStateFinder, - IForwardHeaderProvider forwardHeaderProvider, - ISyncPeerPool syncPeerPool, - ISyncConfig syncConfig, - ILogManager logManager) - : base(blockTree, blockValidator, syncReport, receiptStorage, - specProvider, betterPeerStrategy, fullStateFinder, forwardHeaderProvider, syncPeerPool, syncConfig, logManager) - { - _blockTree = blockTree ?? throw new ArgumentNullException(nameof(blockTree)); - _beaconPivot = beaconPivot; - _logger = logManager.GetClassLogger(); - } + private readonly IBlockTree _blockTree = blockTree; + private readonly ILogger _logger = logManager.GetClassLogger(); protected override BlockTreeSuggestOptions GetSuggestOption(bool shouldProcess, Block currentBlock) { @@ -51,7 +58,7 @@ protected override BlockTreeSuggestOptions GetSuggestOption(bool shouldProcess, shouldProcess ? BlockTreeSuggestOptions.ShouldProcess : BlockTreeSuggestOptions.None; bool isKnownBeaconBlock = _blockTree.IsKnownBeaconBlock(currentBlock.Number, currentBlock.GetOrCalculateHash()); - if (_logger.IsTrace) _logger.Trace($"Current block {currentBlock}, BeaconPivot: {_beaconPivot.PivotNumber}, IsKnownBeaconBlock: {isKnownBeaconBlock}"); + if (_logger.IsTrace) _logger.Trace($"Current block {currentBlock}, BeaconPivot: {beaconPivot.PivotNumber}, IsKnownBeaconBlock: {isKnownBeaconBlock}"); if (isKnownBeaconBlock) { diff --git a/src/Nethermind/Nethermind.Synchronization.Test/BlockDownloaderTests.cs b/src/Nethermind/Nethermind.Synchronization.Test/BlockDownloaderTests.cs index eabe15cffc8..03102bb2bc4 100644 --- a/src/Nethermind/Nethermind.Synchronization.Test/BlockDownloaderTests.cs +++ b/src/Nethermind/Nethermind.Synchronization.Test/BlockDownloaderTests.cs @@ -39,6 +39,7 @@ using Autofac.Features.AttributeFilters; using Humanizer; using Nethermind.Config; +using Nethermind.Consensus.Processing; using Nethermind.Core.Events; using Nethermind.Core.Test; using Nethermind.Core.Test.Modules; @@ -208,6 +209,32 @@ public async Task Catch_exception_from_forwardHeaderProvider() await act.Should().NotThrowAsync(); } + [Test] + public async Task Skit_spawning_request_when_block_processing_queue_is_high() + { + IForwardHeaderProvider mockForwardHeaderProvider = Substitute.For(); + IBlockProcessingQueue blockProcessingQueue = Substitute.For(); + blockProcessingQueue.Count.Returns(10000); + + await using IContainer node = CreateNode(configProvider: new ConfigProvider(new SyncConfig() + { + FastSync = true + }), + configurer: (builder) => builder + .AddSingleton(mockForwardHeaderProvider) + .AddSingleton(blockProcessingQueue)); + + Context ctx = node.Resolve(); + var request = await ctx.FastSyncFeedComponent.BlockDownloader.PrepareRequest( + DownloaderOptions.Insert, + 0, + CancellationToken.None); + + request.Should().BeNull(); + await mockForwardHeaderProvider.DidNotReceive() + .GetBlockHeaders(Arg.Any(), Arg.Any(), Arg.Any()); + } + [Test] public async Task Ancestor_lookup_simple() { @@ -492,7 +519,7 @@ public async Task Prune_download_requests_map() await using IContainer node = CreateNode(builder => builder .AddDecorator((_, syncConfig) => { - syncConfig.MaxTxInForwardSyncBuffer = 3200; + syncConfig.ForwardSyncDownloadBufferMemoryBudget = 3200000; return syncConfig; }) .AddSingleton(Always.Invalid)); diff --git a/src/Nethermind/Nethermind.Synchronization/Blocks/BlockDownloader.cs b/src/Nethermind/Nethermind.Synchronization/Blocks/BlockDownloader.cs index 36ce050588e..2050a8d7a52 100644 --- a/src/Nethermind/Nethermind.Synchronization/Blocks/BlockDownloader.cs +++ b/src/Nethermind/Nethermind.Synchronization/Blocks/BlockDownloader.cs @@ -9,6 +9,7 @@ using Nethermind.Blockchain; using Nethermind.Blockchain.Receipts; using Nethermind.Blockchain.Synchronization; +using Nethermind.Consensus.Processing; using Nethermind.Consensus.Validators; using Nethermind.Core; using Nethermind.Core.Collections; @@ -47,10 +48,12 @@ public class BlockDownloader : IForwardSyncController private readonly IFullStateFinder _fullStateFinder; private readonly IForwardHeaderProvider _forwardHeaderProvider; private readonly ISyncPeerPool _syncPeerPool; + private readonly IBlockProcessingQueue _processingQueue; private readonly ILogger _logger; // Estimated maximum tx in buffer used to estimate memory limit. Each tx is on average about 1KB. private readonly int _maxTxInBuffer; + private readonly int _maxTxInInProcessingQueue; private const int MinEstimateTxPerBlock = 10; // Header lookup need to be limited, because `IForwardHeaderProvider.GetBlockHeaders` can be slow. @@ -72,31 +75,35 @@ public class BlockDownloader : IForwardSyncController private SemaphoreSlim _requestLock = new(1); public BlockDownloader( - IBlockTree? blockTree, - IBlockValidator? blockValidator, - ISyncReport? syncReport, - IReceiptStorage? receiptStorage, - ISpecProvider? specProvider, + IBlockTree blockTree, + IBlockValidator blockValidator, + ISyncReport syncReport, + IReceiptStorage receiptStorage, + ISpecProvider specProvider, IBetterPeerStrategy betterPeerStrategy, IFullStateFinder fullStateFinder, IForwardHeaderProvider forwardHeaderProvider, ISyncPeerPool syncPeerPool, + IReceiptsRecovery receiptsRecovery, + IBlockProcessingQueue processingQueue, ISyncConfig syncConfig, - ILogManager? logManager) + ILogManager logManager) { - _blockTree = blockTree ?? throw new ArgumentNullException(nameof(blockTree)); - _blockValidator = blockValidator ?? throw new ArgumentNullException(nameof(blockValidator)); - _syncReport = syncReport ?? throw new ArgumentNullException(nameof(syncReport)); - _receiptStorage = receiptStorage ?? throw new ArgumentNullException(nameof(receiptStorage)); - _specProvider = specProvider ?? throw new ArgumentNullException(nameof(specProvider)); - _betterPeerStrategy = betterPeerStrategy ?? throw new ArgumentNullException(nameof(betterPeerStrategy)); - _fullStateFinder = fullStateFinder ?? throw new ArgumentNullException(nameof(fullStateFinder)); + _blockTree = blockTree; + _blockValidator = blockValidator; + _syncReport = syncReport; + _receiptStorage = receiptStorage; + _specProvider = specProvider; + _betterPeerStrategy = betterPeerStrategy; + _fullStateFinder = fullStateFinder; _forwardHeaderProvider = forwardHeaderProvider; _syncPeerPool = syncPeerPool; - _logger = logManager?.GetClassLogger() ?? throw new ArgumentNullException(nameof(logManager)); - _maxTxInBuffer = syncConfig.MaxTxInForwardSyncBuffer; - - _receiptsRecovery = new ReceiptsRecovery(new EthereumEcdsa(_specProvider.ChainId), _specProvider); + _logger = logManager.GetClassLogger(); + // Assume that each tx cost about 1kb. + _maxTxInBuffer = (int)(syncConfig.ForwardSyncDownloadBufferMemoryBudget / 1000); + _maxTxInInProcessingQueue = (int)(syncConfig.ForwardSyncBlockProcessingQueueMemoryBudget / 1000); + _receiptsRecovery = receiptsRecovery; + _processingQueue = processingQueue; _blockTree.NewHeadBlock += BlockTreeOnNewHeadBlock; } @@ -145,6 +152,12 @@ private void BlockTreeOnNewHeadBlock(object? sender, BlockEventArgs e) while (true) { + if (_processingQueue.Count > _maxTxInInProcessingQueue / _estimateTxPerBlock) + { + if (_logger.IsTrace) _logger.Trace("Processing queue full"); + return null; + } + using IOwnedReadOnlyList? headers = await _forwardHeaderProvider.GetBlockHeaders(fastSyncLag, HeaderLookupSize + 1, cancellation); if (cancellation.IsCancellationRequested) return null; // check before every heavy operation if (headers is null || headers.Count <= 1) return null;