Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Nethermind/Nethermind.Api/IInitConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public interface IInitConfig : IConfig
[ConfigItem(Description = "[TECHNICAL] Exit when block number is reached. Useful for scripting and testing.", DefaultValue = "null", HiddenFromDocs = true)]
long? ExitOnBlockNumber { get; set; }

[ConfigItem(Description = "[TECHNICAL] Specify concurrency limit for background task.", DefaultValue = "1", HiddenFromDocs = true)]
[ConfigItem(Description = "[TECHNICAL] Specify concurrency limit for background task.", DefaultValue = "2", HiddenFromDocs = true)]
int BackgroundTaskConcurrency { get; set; }

[ConfigItem(Description = "[TECHNICAL] Specify max number of background task.", DefaultValue = "1024", HiddenFromDocs = true)]
Expand Down
2 changes: 1 addition & 1 deletion src/Nethermind/Nethermind.Api/InitConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class InitConfig : IInitConfig
public bool DisableMallocOpts { get; set; } = false;
public INodeStorage.KeyScheme StateDbKeyScheme { get; set; } = INodeStorage.KeyScheme.Current;
public long? ExitOnBlockNumber { get; set; } = null;
public int BackgroundTaskConcurrency { get; set; } = 1;
public int BackgroundTaskConcurrency { get; set; } = 2;
public int BackgroundTaskMaxNumber { get; set; } = 1024;
public bool InRunnerTest { get; set; } = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using Nethermind.Consensus.Scheduler;
using Nethermind.Core.Extensions;
using Nethermind.Logging;
using Nethermind.TxPool;
using NSubstitute;
using NUnit.Framework;
using TaskCompletionSource = DotNetty.Common.Concurrency.TaskCompletionSource;
Expand All @@ -18,18 +19,21 @@ namespace Nethermind.Consensus.Test.Scheduler;
public class BackgroundTaskSchedulerTests
{
private IBlockProcessor _blockProcessor;
private IChainHeadInfoProvider _chainHeadInfo;

[SetUp]
public void Setup()
{
_blockProcessor = Substitute.For<IBlockProcessor>();
_chainHeadInfo = Substitute.For<IChainHeadInfoProvider>();
_chainHeadInfo.IsSyncing.Returns(false);
}

[Test]
public async Task Test_task_will_execute()
{
TaskCompletionSource tcs = new TaskCompletionSource();
await using BackgroundTaskScheduler scheduler = new BackgroundTaskScheduler(_blockProcessor, 1, 65536, LimboLogs.Instance);
await using BackgroundTaskScheduler scheduler = new BackgroundTaskScheduler(_blockProcessor, _chainHeadInfo, 1, 65536, LimboLogs.Instance);

scheduler.ScheduleTask(1, (_, token) =>
{
Expand All @@ -43,7 +47,7 @@ public async Task Test_task_will_execute()
[Test]
public async Task Test_task_will_execute_concurrently_when_configured_so()
{
await using BackgroundTaskScheduler scheduler = new BackgroundTaskScheduler(_blockProcessor, 2, 65536, LimboLogs.Instance);
await using BackgroundTaskScheduler scheduler = new BackgroundTaskScheduler(_blockProcessor, _chainHeadInfo, 2, 65536, LimboLogs.Instance);

int counter = 0;

Expand All @@ -68,7 +72,7 @@ public async Task Test_task_will_execute_concurrently_when_configured_so()
[Test]
public async Task Test_task_will_cancel_on_block_processing()
{
await using BackgroundTaskScheduler scheduler = new BackgroundTaskScheduler(_blockProcessor, 2, 65536, LimboLogs.Instance);
await using BackgroundTaskScheduler scheduler = new BackgroundTaskScheduler(_blockProcessor, _chainHeadInfo, 2, 65536, LimboLogs.Instance);

bool wasCancelled = false;

Expand Down Expand Up @@ -96,7 +100,7 @@ public async Task Test_task_will_cancel_on_block_processing()
[Test]
public async Task Test_task_that_is_scheduled_during_block_processing_will_continue_after()
{
await using BackgroundTaskScheduler scheduler = new BackgroundTaskScheduler(_blockProcessor, 2, 65536, LimboLogs.Instance);
await using BackgroundTaskScheduler scheduler = new BackgroundTaskScheduler(_blockProcessor, _chainHeadInfo, 2, 65536, LimboLogs.Instance);
_blockProcessor.BlocksProcessing += Raise.EventWith(new BlocksProcessingEventArgs(null));

int executionCount = 0;
Expand All @@ -119,7 +123,7 @@ public async Task Test_task_that_is_scheduled_during_block_processing_will_conti
[Test]
public async Task Test_task_that_is_scheduled_during_block_processing_but_deadlined_will_get_called_and_cancelled()
{
await using BackgroundTaskScheduler scheduler = new BackgroundTaskScheduler(_blockProcessor, 2, 65536, LimboLogs.Instance);
await using BackgroundTaskScheduler scheduler = new BackgroundTaskScheduler(_blockProcessor, _chainHeadInfo, 2, 65536, LimboLogs.Instance);
_blockProcessor.BlocksProcessing += Raise.EventWith(new BlocksProcessingEventArgs(null));

bool wasCancelled = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using Nethermind.Consensus.Processing;
using Nethermind.Core.Extensions;
using Nethermind.Logging;
using Nethermind.TxPool;

namespace Nethermind.Consensus.Scheduler;

Expand Down Expand Up @@ -38,11 +39,12 @@ public class BackgroundTaskScheduler : IBackgroundTaskScheduler, IAsyncDisposabl
private readonly Task[] _tasksExecutors;
private readonly ILogger _logger;
private readonly IBlockProcessor _blockProcessor;
private readonly IChainHeadInfoProvider _headInfo;

private CancellationTokenSource _blockProcessorCancellationTokenSource;
private bool _disposed = false;

public BackgroundTaskScheduler(IBlockProcessor blockProcessor, int concurrency, int capacity, ILogManager logManager)
public BackgroundTaskScheduler(IBlockProcessor blockProcessor, IChainHeadInfoProvider headInfo, int concurrency, int capacity, ILogManager logManager)
{
ArgumentOutOfRangeException.ThrowIfLessThan(concurrency, 1);
ArgumentOutOfRangeException.ThrowIfLessThan(capacity, 1);
Expand All @@ -55,6 +57,7 @@ public BackgroundTaskScheduler(IBlockProcessor blockProcessor, int concurrency,
_taskQueue = Channel.CreateUnboundedPrioritized<IActivity>();
_logger = logManager.GetClassLogger();
_blockProcessor = blockProcessor;
_headInfo = headInfo;
_restartQueueSignal = new ManualResetEventSlim(initialState: true);
// As channel is unbounded (to be prioritized) we gate capacity with a semaphore
_capacity = new SemaphoreSlim(initialCount: capacity);
Expand All @@ -75,10 +78,15 @@ public BackgroundTaskScheduler(IBlockProcessor blockProcessor, int concurrency,

private void BlockProcessorOnBlocksProcessing(object? sender, BlocksProcessingEventArgs e)
{
// Reset background queue processing signal, causing it to wait
_restartQueueSignal.Reset();
// On block processing, we cancel the block process cts, causing current task to get cancelled.
_blockProcessorCancellationTokenSource.Cancel();
// If we are syncing we don't block background task processing
// as there are potentially no gaps between blocks
if (!_headInfo.IsSyncing)
{
// Reset background queue processing signal, causing it to wait
_restartQueueSignal.Reset();
// On block processing, we cancel the block process cts, causing current task to get cancelled.
_blockProcessorCancellationTokenSource.Cancel();
}
}

private void BlockProcessorOnBlockProcessed(object? sender, BlockProcessedEventArgs e)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.IO.Abstractions;
using System.Reflection;
using Autofac;
using Nethermind.Api;
using Nethermind.Blockchain.Filters;
using Nethermind.Config;
using Nethermind.Consensus;
using Nethermind.Consensus.Scheduler;
using Nethermind.Core.Specs;
using Nethermind.Core.Timers;
using Nethermind.Crypto;
using Nethermind.Db;
using Nethermind.Evm;
using Nethermind.Evm.State;
using Nethermind.Init.Modules;
using Nethermind.Int256;
using Nethermind.JsonRpc;
using Nethermind.KeyStore;
using Nethermind.Logging;
Expand Down Expand Up @@ -53,6 +56,7 @@ protected override void Load(ContainerBuilder builder)
.AddSingleton<ITimerFactory, TimerFactory>()
.AddSingleton<IBackgroundTaskScheduler, MainBlockProcessingContext>((blockProcessingContext) => new BackgroundTaskScheduler(
blockProcessingContext.BlockProcessor,
new ChainHeadInfoMock(),
initConfig.BackgroundTaskConcurrency,
initConfig.BackgroundTaskMaxNumber,
logManager))
Expand Down Expand Up @@ -85,4 +89,19 @@ protected override void Load(ContainerBuilder builder)
}
});
}

private class ChainHeadInfoMock : IChainHeadInfoProvider
{
public IChainHeadSpecProvider SpecProvider { get; } = null!;
public IReadOnlyStateProvider ReadOnlyStateProvider { get; } = null!;
public ICodeInfoRepository CodeInfoRepository { get; } = null!;
public long HeadNumber { get; }
public long? BlockGasLimit { get; }
public UInt256 CurrentBaseFee { get; }
public UInt256 CurrentFeePerBlobGas { get; }
public ProofVersion CurrentProofVersion { get; }
public bool IsSyncing { get => false; }
public bool IsProcessingBlock { get; }
public event EventHandler<BlockReplacementEventArgs> HeadChanged { add { } remove { } }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ protected virtual Task InitBlockchain()

BackgroundTaskScheduler backgroundTaskScheduler = new BackgroundTaskScheduler(
mainBlockProcessor,
chainHeadInfoProvider,
initConfig.BackgroundTaskConcurrency,
initConfig.BackgroundTaskMaxNumber,
_api.LogManager);
Expand Down