Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,9 @@ public interface ISyncConfig : IConfig
[ConfigItem(Description = "_Technical._ Enable storage range split.", DefaultValue = "false", HiddenFromDocs = true)]
bool EnableSnapSyncStorageRangeSplit { get; set; }

[ConfigItem(Description = "_Technical._ Max tx in forward sync buffer.", DefaultValue = "200000", HiddenFromDocs = true)]
int MaxTxInForwardSyncBuffer { get; set; }
[ConfigItem(Description = "_Technical._ Estimated size of memory for storing blocks during download.", DefaultValue = "200000000", HiddenFromDocs = true)]
long ForwardSyncDownloadBufferMemoryBudget { get; set; }

[ConfigItem(Description = "_Technical._ Estimated max size of blocks in block processing queue before stop downloading.", DefaultValue = "200000000", HiddenFromDocs = true)]
long ForwardSyncBlockProcessingQueueMemoryBudget { get; set; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ public string? PivotHash

public ulong FastHeadersMemoryBudget { get; set; } = (ulong)128.MB();
public bool EnableSnapSyncStorageRangeSplit { get; set; } = false;
public int MaxTxInForwardSyncBuffer { get; set; } = 20000;
public long ForwardSyncDownloadBufferMemoryBudget { get; set; } = 200.MiB();
public long ForwardSyncBlockProcessingQueueMemoryBudget { get; set; } = 200.MiB();

public override string ToString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using Nethermind.Blockchain;
using Nethermind.Blockchain.Receipts;
using Nethermind.Blockchain.Synchronization;
using Nethermind.Consensus.Processing;
using Nethermind.Consensus.Validators;
using Nethermind.Core;
using Nethermind.Core.Specs;
Expand All @@ -18,40 +19,46 @@

namespace Nethermind.Merge.Plugin.Synchronization
{
public class MergeBlockDownloader : BlockDownloader
public class MergeBlockDownloader(
IBeaconPivot beaconPivot,
IBlockTree blockTree,
IBlockValidator blockValidator,
ISyncReport syncReport,
IReceiptStorage receiptStorage,
ISpecProvider specProvider,
IBetterPeerStrategy betterPeerStrategy,
IFullStateFinder fullStateFinder,
IForwardHeaderProvider forwardHeaderProvider,
ISyncPeerPool syncPeerPool,
IReceiptsRecovery receiptsRecovery,
IBlockProcessingQueue blockProcessingQueue,
ISyncConfig syncConfig,
ILogManager logManager)
: BlockDownloader(
blockTree,
blockValidator,
syncReport,
receiptStorage,
specProvider,
betterPeerStrategy,
fullStateFinder,
forwardHeaderProvider,
syncPeerPool,
receiptsRecovery,
blockProcessingQueue,
syncConfig,
logManager)
{
private readonly IBeaconPivot _beaconPivot;
private readonly IBlockTree _blockTree;
private readonly ILogger _logger;

public MergeBlockDownloader(
IBeaconPivot beaconPivot,
IBlockTree? blockTree,
IBlockValidator? blockValidator,
ISyncReport? syncReport,
IReceiptStorage? receiptStorage,
ISpecProvider specProvider,
IBetterPeerStrategy betterPeerStrategy,
IFullStateFinder fullStateFinder,
IForwardHeaderProvider forwardHeaderProvider,
ISyncPeerPool syncPeerPool,
ISyncConfig syncConfig,
ILogManager logManager)
: base(blockTree, blockValidator, syncReport, receiptStorage,
specProvider, betterPeerStrategy, fullStateFinder, forwardHeaderProvider, syncPeerPool, syncConfig, logManager)
{
_blockTree = blockTree ?? throw new ArgumentNullException(nameof(blockTree));
_beaconPivot = beaconPivot;
_logger = logManager.GetClassLogger();
}
private readonly IBlockTree _blockTree = blockTree;
private readonly ILogger _logger = logManager.GetClassLogger();

protected override BlockTreeSuggestOptions GetSuggestOption(bool shouldProcess, Block currentBlock)
{
BlockTreeSuggestOptions suggestOptions =
shouldProcess ? BlockTreeSuggestOptions.ShouldProcess : BlockTreeSuggestOptions.None;

bool isKnownBeaconBlock = _blockTree.IsKnownBeaconBlock(currentBlock.Number, currentBlock.GetOrCalculateHash());
if (_logger.IsTrace) _logger.Trace($"Current block {currentBlock}, BeaconPivot: {_beaconPivot.PivotNumber}, IsKnownBeaconBlock: {isKnownBeaconBlock}");
if (_logger.IsTrace) _logger.Trace($"Current block {currentBlock}, BeaconPivot: {beaconPivot.PivotNumber}, IsKnownBeaconBlock: {isKnownBeaconBlock}");

if (isKnownBeaconBlock)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
using Autofac.Features.AttributeFilters;
using Humanizer;
using Nethermind.Config;
using Nethermind.Consensus.Processing;
using Nethermind.Core.Events;
using Nethermind.Core.Test;
using Nethermind.Core.Test.Modules;
Expand Down Expand Up @@ -208,6 +209,32 @@ public async Task Catch_exception_from_forwardHeaderProvider()
await act.Should().NotThrowAsync();
}

[Test]
public async Task Skit_spawning_request_when_block_processing_queue_is_high()
{
IForwardHeaderProvider mockForwardHeaderProvider = Substitute.For<IForwardHeaderProvider>();
IBlockProcessingQueue blockProcessingQueue = Substitute.For<IBlockProcessingQueue>();
blockProcessingQueue.Count.Returns(10000);

await using IContainer node = CreateNode(configProvider: new ConfigProvider(new SyncConfig()
{
FastSync = true
}),
configurer: (builder) => builder
.AddSingleton<IForwardHeaderProvider>(mockForwardHeaderProvider)
.AddSingleton<IBlockProcessingQueue>(blockProcessingQueue));

Context ctx = node.Resolve<Context>();
var request = await ctx.FastSyncFeedComponent.BlockDownloader.PrepareRequest(
DownloaderOptions.Insert,
0,
CancellationToken.None);

request.Should().BeNull();
await mockForwardHeaderProvider.DidNotReceive()
.GetBlockHeaders(Arg.Any<int>(), Arg.Any<int>(), Arg.Any<CancellationToken>());
}

[Test]
public async Task Ancestor_lookup_simple()
{
Expand Down Expand Up @@ -492,7 +519,7 @@ public async Task Prune_download_requests_map()
await using IContainer node = CreateNode(builder => builder
.AddDecorator<ISyncConfig>((_, syncConfig) =>
{
syncConfig.MaxTxInForwardSyncBuffer = 3200;
syncConfig.ForwardSyncDownloadBufferMemoryBudget = 3200000;
return syncConfig;
})
.AddSingleton<IBlockValidator>(Always.Invalid));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using Nethermind.Blockchain;
using Nethermind.Blockchain.Receipts;
using Nethermind.Blockchain.Synchronization;
using Nethermind.Consensus.Processing;
using Nethermind.Consensus.Validators;
using Nethermind.Core;
using Nethermind.Core.Collections;
Expand Down Expand Up @@ -47,10 +48,12 @@ public class BlockDownloader : IForwardSyncController
private readonly IFullStateFinder _fullStateFinder;
private readonly IForwardHeaderProvider _forwardHeaderProvider;
private readonly ISyncPeerPool _syncPeerPool;
private readonly IBlockProcessingQueue _processingQueue;
private readonly ILogger _logger;

// Estimated maximum tx in buffer used to estimate memory limit. Each tx is on average about 1KB.
private readonly int _maxTxInBuffer;
private readonly int _maxTxInInProcessingQueue;
private const int MinEstimateTxPerBlock = 10;

// Header lookup need to be limited, because `IForwardHeaderProvider.GetBlockHeaders` can be slow.
Expand All @@ -72,31 +75,35 @@ public class BlockDownloader : IForwardSyncController
private SemaphoreSlim _requestLock = new(1);

public BlockDownloader(
IBlockTree? blockTree,
IBlockValidator? blockValidator,
ISyncReport? syncReport,
IReceiptStorage? receiptStorage,
ISpecProvider? specProvider,
IBlockTree blockTree,
IBlockValidator blockValidator,
ISyncReport syncReport,
IReceiptStorage receiptStorage,
ISpecProvider specProvider,
IBetterPeerStrategy betterPeerStrategy,
IFullStateFinder fullStateFinder,
IForwardHeaderProvider forwardHeaderProvider,
ISyncPeerPool syncPeerPool,
IReceiptsRecovery receiptsRecovery,
IBlockProcessingQueue processingQueue,
ISyncConfig syncConfig,
ILogManager? logManager)
ILogManager logManager)
{
_blockTree = blockTree ?? throw new ArgumentNullException(nameof(blockTree));
_blockValidator = blockValidator ?? throw new ArgumentNullException(nameof(blockValidator));
_syncReport = syncReport ?? throw new ArgumentNullException(nameof(syncReport));
_receiptStorage = receiptStorage ?? throw new ArgumentNullException(nameof(receiptStorage));
_specProvider = specProvider ?? throw new ArgumentNullException(nameof(specProvider));
_betterPeerStrategy = betterPeerStrategy ?? throw new ArgumentNullException(nameof(betterPeerStrategy));
_fullStateFinder = fullStateFinder ?? throw new ArgumentNullException(nameof(fullStateFinder));
_blockTree = blockTree;
_blockValidator = blockValidator;
_syncReport = syncReport;
_receiptStorage = receiptStorage;
_specProvider = specProvider;
_betterPeerStrategy = betterPeerStrategy;
_fullStateFinder = fullStateFinder;
_forwardHeaderProvider = forwardHeaderProvider;
_syncPeerPool = syncPeerPool;
_logger = logManager?.GetClassLogger() ?? throw new ArgumentNullException(nameof(logManager));
_maxTxInBuffer = syncConfig.MaxTxInForwardSyncBuffer;

_receiptsRecovery = new ReceiptsRecovery(new EthereumEcdsa(_specProvider.ChainId), _specProvider);
_logger = logManager.GetClassLogger();
// Assume that each tx cost about 1kb.
_maxTxInBuffer = (int)(syncConfig.ForwardSyncDownloadBufferMemoryBudget / 1000);
_maxTxInInProcessingQueue = (int)(syncConfig.ForwardSyncBlockProcessingQueueMemoryBudget / 1000);
_receiptsRecovery = receiptsRecovery;
_processingQueue = processingQueue;
_blockTree.NewHeadBlock += BlockTreeOnNewHeadBlock;
}

Expand Down Expand Up @@ -145,6 +152,12 @@ private void BlockTreeOnNewHeadBlock(object? sender, BlockEventArgs e)

while (true)
{
if (_processingQueue.Count > _maxTxInInProcessingQueue / _estimateTxPerBlock)
{
if (_logger.IsTrace) _logger.Trace("Processing queue full");
return null;
}

using IOwnedReadOnlyList<BlockHeader?>? headers = await _forwardHeaderProvider.GetBlockHeaders(fastSyncLag, HeaderLookupSize + 1, cancellation);
if (cancellation.IsCancellationRequested) return null; // check before every heavy operation
if (headers is null || headers.Count <= 1) return null;
Expand Down