diff --git a/src/Nethermind/Nethermind.Blockchain.Test/BlockhashProviderTests.cs b/src/Nethermind/Nethermind.Blockchain.Test/BlockhashProviderTests.cs index 1f218bdddc2a..ec9997f9df50 100644 --- a/src/Nethermind/Nethermind.Blockchain.Test/BlockhashProviderTests.cs +++ b/src/Nethermind/Nethermind.Blockchain.Test/BlockhashProviderTests.cs @@ -438,6 +438,7 @@ public void BlockhashStore_uses_custom_ring_buffer_size() } [Test, MaxTime(Timeout.MaxTestTime)] + [NonParallelizable] public async Task Prefetches_come_in_wrong_order() { const int chainLength = 261; diff --git a/src/Nethermind/Nethermind.Blockchain.Test/Builders/FilterBuilder.cs b/src/Nethermind/Nethermind.Blockchain.Test/Builders/FilterBuilder.cs index 86c7706fe3f5..4dbb1cc3e31a 100644 --- a/src/Nethermind/Nethermind.Blockchain.Test/Builders/FilterBuilder.cs +++ b/src/Nethermind/Nethermind.Blockchain.Test/Builders/FilterBuilder.cs @@ -11,14 +11,15 @@ namespace Nethermind.Blockchain.Test.Builders { public class FilterBuilder { - private static int _id; + private int _id; private BlockParameter _fromBlock = new(BlockParameterType.Latest); private BlockParameter _toBlock = new(BlockParameterType.Latest); private AddressFilter _address = AddressFilter.AnyAddress; private SequenceTopicsFilter _topicsFilter = new(); - private FilterBuilder() + private FilterBuilder(int id) { + _id = id; } public static FilterBuilder New() @@ -29,9 +30,9 @@ public static FilterBuilder New() public static FilterBuilder New(ref int currentFilterIndex) { - _id = currentFilterIndex; + int id = currentFilterIndex; currentFilterIndex++; - return new FilterBuilder(); + return new FilterBuilder(id); } public FilterBuilder WithId(int id) diff --git a/src/Nethermind/Nethermind.Blockchain.Test/Filters/FilterManagerTests.cs b/src/Nethermind/Nethermind.Blockchain.Test/Filters/FilterManagerTests.cs index 0b863d6112bc..a8dd1a2a6c9c 100644 --- a/src/Nethermind/Nethermind.Blockchain.Test/Filters/FilterManagerTests.cs +++ b/src/Nethermind/Nethermind.Blockchain.Test/Filters/FilterManagerTests.cs @@ -21,11 +21,11 @@ namespace Nethermind.Blockchain.Test.Filters; +[Parallelizable(ParallelScope.None)] public class FilterManagerTests { private FilterStore _filterStore = null!; - private IBranchProcessor _branchProcessor = null!; - private IMainProcessingContext _mainProcessingContext = null!; + private TestMainProcessingContext _mainProcessingContext = null!; private ITxPool _txPool = null!; private ILogManager _logManager = null!; private FilterManager _filterManager = null!; @@ -36,10 +36,8 @@ public class FilterManagerTests public void Setup() { _currentFilterId = 0; - _filterStore = new FilterStore(new TimerFactory(), 20, 10); - _branchProcessor = Substitute.For(); - _mainProcessingContext = Substitute.For(); - _mainProcessingContext.BranchProcessor.Returns(_branchProcessor); + _filterStore = new FilterStore(new TimerFactory(), 400, 100); + _mainProcessingContext = new TestMainProcessingContext(); _txPool = Substitute.For(); _logManager = LimboLogs.Instance; } @@ -55,7 +53,7 @@ public async Task removing_filter_removes_data() { LogsShouldNotBeEmpty(static _ => { }, static _ => { }); _filterManager.GetLogs(0).Should().NotBeEmpty(); - await Task.Delay(60); + await Task.Delay(600); _filterManager.GetLogs(0).Should().BeEmpty(); } @@ -257,6 +255,7 @@ public void logs_should_be_empty_for_existing_block_and_addresses_and_non_existi [Test, MaxTime(Timeout.MaxTestTime)] [TestCase(1, 1)] [TestCase(5, 3)] + [NonParallelizable] public void logs_should_have_correct_log_indexes(int filtersCount, int logsPerTx) { const int txCount = 10; @@ -330,12 +329,12 @@ private void Assert(IEnumerable> filterBuilders, _filterStore.SaveFilters(filters.OfType()); _filterManager = new FilterManager(_filterStore, _mainProcessingContext, _txPool, _logManager); - _branchProcessor.BlockProcessed += Raise.EventWith(_branchProcessor, new BlockProcessedEventArgs(block, [])); + _mainProcessingContext.TestBranchProcessor.RaiseBlockProcessed(new BlockProcessedEventArgs(block, [])); int index = 1; foreach (TxReceipt receipt in receipts) { - _mainProcessingContext.TransactionProcessed += Raise.EventWith(_branchProcessor, + _mainProcessingContext.RaiseTransactionProcessed( new TxProcessedEventArgs(index, Build.A.Transaction.TestObject, block.Header, receipt)); index++; } diff --git a/src/Nethermind/Nethermind.Blockchain.Test/Filters/TestProcessingContext.cs b/src/Nethermind/Nethermind.Blockchain.Test/Filters/TestProcessingContext.cs new file mode 100644 index 000000000000..5b089c3f11c3 --- /dev/null +++ b/src/Nethermind/Nethermind.Blockchain.Test/Filters/TestProcessingContext.cs @@ -0,0 +1,55 @@ +// SPDX-FileCopyrightText: 2025 Demerzel Solutions Limited +// SPDX-License-Identifier: LGPL-3.0-only + +#nullable enable + +using System; +using System.Collections.Generic; +using System.Threading; +using Nethermind.Consensus.Processing; +using Nethermind.Core; +using Nethermind.Evm.State; +using Nethermind.Evm.Tracing; +using Nethermind.Evm.TransactionProcessing; + +namespace Nethermind.Blockchain.Test.Filters; + +/// +/// Test implementation of IBranchProcessor that allows manual event raising. +/// +internal class TestBranchProcessor : IBranchProcessor +{ + public event EventHandler? BlockProcessed; + public event EventHandler? BlocksProcessing { add { } remove { } } + public event EventHandler? BlockProcessing { add { } remove { } } + + public Block[] Process(BlockHeader? baseBlock, IReadOnlyList suggestedBlocks, + ProcessingOptions processingOptions, IBlockTracer blockTracer, CancellationToken token = default) + => []; + + public void RaiseBlockProcessed(BlockProcessedEventArgs args) + => BlockProcessed?.Invoke(this, args); +} + +/// +/// Test implementation of IMainProcessingContext that allows manual event raising. +/// +internal class TestMainProcessingContext : IMainProcessingContext +{ + private readonly TestBranchProcessor _branchProcessor = new(); + + public ITransactionProcessor TransactionProcessor => null!; + public IBranchProcessor BranchProcessor => _branchProcessor; + public IBlockProcessor BlockProcessor => null!; + public IBlockchainProcessor BlockchainProcessor => null!; + public IBlockProcessingQueue BlockProcessingQueue => null!; + public IWorldState WorldState => null!; + public IGenesisLoader GenesisLoader => null!; + + public event EventHandler? TransactionProcessed; + + public TestBranchProcessor TestBranchProcessor => _branchProcessor; + + public void RaiseTransactionProcessed(TxProcessedEventArgs args) + => TransactionProcessed?.Invoke(this, args); +} diff --git a/src/Nethermind/Nethermind.Blockchain.Test/Find/LogFinderTests.cs b/src/Nethermind/Nethermind.Blockchain.Test/Find/LogFinderTests.cs index 625ded52d824..ecf78c3fc6e7 100644 --- a/src/Nethermind/Nethermind.Blockchain.Test/Find/LogFinderTests.cs +++ b/src/Nethermind/Nethermind.Blockchain.Test/Find/LogFinderTests.cs @@ -318,6 +318,7 @@ public void complex_filter(LogFilter filter, int expectedCount, bool withBloomDb } [Test, MaxTime(Timeout.MaxTestTime)] + [NonParallelizable] public async Task Throw_log_finder_operation_canceled_after_given_timeout([Values(2, 0.01)] double waitTime) { var timeout = TimeSpan.FromMilliseconds(Timeout.MaxWaitTime); @@ -335,7 +336,7 @@ public async Task Throw_log_finder_operation_canceled_after_given_timeout([Value if (waitTime > 1) { - action.Should().Throw().WithInnerException(); + action.Should().Throw(); } else { diff --git a/src/Nethermind/Nethermind.Network/P2P/ProtocolHandlers/SyncPeerProtocolHandlerBase.cs b/src/Nethermind/Nethermind.Network/P2P/ProtocolHandlers/SyncPeerProtocolHandlerBase.cs index 598cf8a0dc53..a9f15b75d8dc 100644 --- a/src/Nethermind/Nethermind.Network/P2P/ProtocolHandlers/SyncPeerProtocolHandlerBase.cs +++ b/src/Nethermind/Nethermind.Network/P2P/ProtocolHandlers/SyncPeerProtocolHandlerBase.cs @@ -20,7 +20,6 @@ using Nethermind.Network.P2P.Subprotocols.Eth.V62; using Nethermind.Network.P2P.Subprotocols.Eth.V62.Messages; using Nethermind.Network.P2P.Subprotocols.Eth.V63.Messages; -using Nethermind.Serialization.Rlp; using Nethermind.Stats; using Nethermind.Stats.Model; using Nethermind.Synchronization; @@ -50,7 +49,6 @@ public abstract class SyncPeerProtocolHandlerBase : ZeroProtocolHandlerBase, ISy protected Hash256 _remoteHeadBlockHash; protected readonly ITimestamper _timestamper; - protected readonly TxDecoder _txDecoder; protected readonly MessageQueue> _headersRequests; protected readonly MessageQueue _bodiesRequests; @@ -67,7 +65,6 @@ protected SyncPeerProtocolHandlerBase(ISession session, { SyncServer = syncServer ?? throw new ArgumentNullException(nameof(syncServer)); _timestamper = Timestamper.Default; - _txDecoder = TxDecoder.Instance; _headersRequests = new MessageQueue>(Send); _bodiesRequests = new MessageQueue(Send); } diff --git a/src/Nethermind/Nethermind.Shutter.Test/ShutterIntegrationTests.cs b/src/Nethermind/Nethermind.Shutter.Test/ShutterIntegrationTests.cs index 5bd3e4fc9a97..c50e687cbce7 100644 --- a/src/Nethermind/Nethermind.Shutter.Test/ShutterIntegrationTests.cs +++ b/src/Nethermind/Nethermind.Shutter.Test/ShutterIntegrationTests.cs @@ -115,7 +115,9 @@ public async Task Can_increment_metric_on_missed_keys() time += (long)ShutterTestsCommon.SlotLength.TotalSeconds; } - Assert.That(Metrics.ShutterKeysMissed, Is.EqualTo(5)); + // ImproveBlock tasks run in the background and may not have completed yet + // when GetPayload returns (it only waits 50ms), so poll until all increments land. + Assert.That(() => Metrics.ShutterKeysMissed, Is.EqualTo((ulong)5).After(5000, 50)); } } diff --git a/src/Nethermind/Nethermind.Synchronization.Test/FastSync/StateSyncFeedHealingTests.cs b/src/Nethermind/Nethermind.Synchronization.Test/FastSync/StateSyncFeedHealingTests.cs index d6d7f65685d3..92ffa310957e 100644 --- a/src/Nethermind/Nethermind.Synchronization.Test/FastSync/StateSyncFeedHealingTests.cs +++ b/src/Nethermind/Nethermind.Synchronization.Test/FastSync/StateSyncFeedHealingTests.cs @@ -19,6 +19,7 @@ namespace Nethermind.Synchronization.Test.FastSync; [Parallelizable(ParallelScope.All)] +[FixtureLifeCycle(LifeCycle.InstancePerTestCase)] public class StateSyncFeedHealingTests : StateSyncFeedTestsBase { [Test] diff --git a/src/Nethermind/Nethermind.Trie.Test/Pruning/TreeStoreTests.cs b/src/Nethermind/Nethermind.Trie.Test/Pruning/TreeStoreTests.cs index 319ec65f7f33..b75849af8999 100644 --- a/src/Nethermind/Nethermind.Trie.Test/Pruning/TreeStoreTests.cs +++ b/src/Nethermind/Nethermind.Trie.Test/Pruning/TreeStoreTests.cs @@ -897,6 +897,7 @@ public void After_commit_should_have_has_root() [Test] [Retry(3)] + [NonParallelizable] public async Task Will_RemovePastKeys_OnSnapshot() { MemDb memDb = new(); @@ -936,6 +937,8 @@ public async Task Will_RemovePastKeys_OnSnapshot() } [Test] + [Retry(3)] + [NonParallelizable] public async Task Will_Trigger_ReorgBoundaryEvent_On_Prune() { // TODO: Check why slow diff --git a/src/Nethermind/Nethermind.Trie.Test/TrieTests.cs b/src/Nethermind/Nethermind.Trie.Test/TrieTests.cs index 15cea826d4b4..39acb58ec63e 100644 --- a/src/Nethermind/Nethermind.Trie.Test/TrieTests.cs +++ b/src/Nethermind/Nethermind.Trie.Test/TrieTests.cs @@ -28,6 +28,7 @@ namespace Nethermind.Trie.Test { [TestFixture] [Parallelizable(ParallelScope.All)] + [FixtureLifeCycle(LifeCycle.InstancePerTestCase)] public class TrieTests { private ILogger _logger; @@ -1082,6 +1083,8 @@ public void Fuzz_accounts_with_reorganizations( } [TestCaseSource(nameof(FuzzAccountsWithStorageScenarios))] + [Retry(3)] + [NonParallelizable] public void Fuzz_accounts_with_storage( (TrieStoreConfigurations trieStoreConfigurations, int accountsCount, diff --git a/src/Nethermind/Nethermind.TxPool.Test/BlobTxStorageTests.cs b/src/Nethermind/Nethermind.TxPool.Test/BlobTxStorageTests.cs index b8333b8af243..f4cfb5149371 100644 --- a/src/Nethermind/Nethermind.TxPool.Test/BlobTxStorageTests.cs +++ b/src/Nethermind/Nethermind.TxPool.Test/BlobTxStorageTests.cs @@ -10,6 +10,7 @@ namespace Nethermind.TxPool.Test; [TestFixture] +[Parallelizable(ParallelScope.All)] public class BlobTxStorageTests { [Test] diff --git a/src/Nethermind/Nethermind.TxPool.Test/Collections/SortedPoolTests.cs b/src/Nethermind/Nethermind.TxPool.Test/Collections/SortedPoolTests.cs index 758a9c9ff3da..b2820b770334 100644 --- a/src/Nethermind/Nethermind.TxPool.Test/Collections/SortedPoolTests.cs +++ b/src/Nethermind/Nethermind.TxPool.Test/Collections/SortedPoolTests.cs @@ -20,6 +20,8 @@ namespace Nethermind.TxPool.Test.Collections { [TestFixture] + [Parallelizable(ParallelScope.All)] + [FixtureLifeCycle(LifeCycle.InstancePerTestCase)] public class SortedPoolTests { private const int Capacity = 16; diff --git a/src/Nethermind/Nethermind.TxPool.Test/CompetingTransactionEqualityComparerTests.cs b/src/Nethermind/Nethermind.TxPool.Test/CompetingTransactionEqualityComparerTests.cs index 9d316746a3f5..c5fab4a47687 100644 --- a/src/Nethermind/Nethermind.TxPool.Test/CompetingTransactionEqualityComparerTests.cs +++ b/src/Nethermind/Nethermind.TxPool.Test/CompetingTransactionEqualityComparerTests.cs @@ -9,6 +9,7 @@ namespace Nethermind.TxPool.Test { + [Parallelizable(ParallelScope.All)] public class CompetingTransactionEqualityComparerTests { public static IEnumerable TestCases diff --git a/src/Nethermind/Nethermind.TxPool.Test/DelegatedAccountFilterTest.cs b/src/Nethermind/Nethermind.TxPool.Test/DelegatedAccountFilterTest.cs index c7d004f471f2..b7d7e3590d5c 100644 --- a/src/Nethermind/Nethermind.TxPool.Test/DelegatedAccountFilterTest.cs +++ b/src/Nethermind/Nethermind.TxPool.Test/DelegatedAccountFilterTest.cs @@ -19,6 +19,7 @@ namespace Nethermind.TxPool.Test; +[Parallelizable(ParallelScope.All)] internal class DelegatedAccountFilterTest { [Test] @@ -26,7 +27,7 @@ public void Accept_SenderIsNotDelegated_ReturnsAccepted() { IChainHeadSpecProvider headInfoProvider = Substitute.For(); headInfoProvider.GetCurrentHeadSpec().Returns(Prague.Instance); - TxDistinctSortedPool standardPool = new TxDistinctSortedPool(MemoryAllowance.MemPoolSize, Substitute.For>(), NullLogManager.Instance); + TxDistinctSortedPool standardPool = new TxDistinctSortedPool(new TxPoolConfig().Size, Substitute.For>(), NullLogManager.Instance); TxDistinctSortedPool blobPool = new BlobTxDistinctSortedPool(10, Substitute.For>(), NullLogManager.Instance); DelegatedAccountFilter filter = new(headInfoProvider, standardPool, blobPool, Substitute.For(), new DelegationCache()); Transaction transaction = Build.A.Transaction.SignedAndResolved(new EthereumEcdsa(0), TestItem.PrivateKeyA).TestObject; @@ -46,7 +47,7 @@ public void Accept_SenderIsDelegatedWithNoTransactionsInPool_ReturnsAccepted() stateProvider.InsertCode(code, TestItem.AddressA); IChainHeadSpecProvider headInfoProvider = Substitute.For(); headInfoProvider.GetCurrentHeadSpec().Returns(Prague.Instance); - TxDistinctSortedPool standardPool = new TxDistinctSortedPool(MemoryAllowance.MemPoolSize, Substitute.For>(), NullLogManager.Instance); + TxDistinctSortedPool standardPool = new TxDistinctSortedPool(new TxPoolConfig().Size, Substitute.For>(), NullLogManager.Instance); TxDistinctSortedPool blobPool = new BlobTxDistinctSortedPool(10, Substitute.For>(), NullLogManager.Instance); DelegatedAccountFilter filter = new(headInfoProvider, standardPool, blobPool, stateProvider, new DelegationCache()); Transaction transaction = Build.A.Transaction.SignedAndResolved(new EthereumEcdsa(0), TestItem.PrivateKeyA).TestObject; @@ -62,7 +63,7 @@ public void Accept_SenderIsDelegatedWithOneTransactionInPoolWithSameNonce_Return { IChainHeadSpecProvider headInfoProvider = Substitute.For(); headInfoProvider.GetCurrentHeadSpec().Returns(Prague.Instance); - TxDistinctSortedPool standardPool = new TxDistinctSortedPool(MemoryAllowance.MemPoolSize, Substitute.For>(), NullLogManager.Instance); + TxDistinctSortedPool standardPool = new TxDistinctSortedPool(new TxPoolConfig().Size, Substitute.For>(), NullLogManager.Instance); TxDistinctSortedPool blobPool = new BlobTxDistinctSortedPool(10, Substitute.For>(), NullLogManager.Instance); Transaction inPool = Build.A.Transaction.SignedAndResolved(new EthereumEcdsa(0), TestItem.PrivateKeyA).TestObject; standardPool.TryInsert(inPool.Hash, inPool); @@ -84,7 +85,7 @@ public void Accept_SenderIsDelegatedWithOneTransactionInPoolWithDifferentNonce_R { IChainHeadSpecProvider headInfoProvider = Substitute.For(); headInfoProvider.GetCurrentHeadSpec().Returns(Prague.Instance); - TxDistinctSortedPool standardPool = new TxDistinctSortedPool(MemoryAllowance.MemPoolSize, Substitute.For>(), NullLogManager.Instance); + TxDistinctSortedPool standardPool = new TxDistinctSortedPool(new TxPoolConfig().Size, Substitute.For>(), NullLogManager.Instance); TxDistinctSortedPool blobPool = new BlobTxDistinctSortedPool(10, Substitute.For>(), NullLogManager.Instance); Transaction inPool = Build.A.Transaction.SignedAndResolved(new EthereumEcdsa(0), TestItem.PrivateKeyA).TestObject; standardPool.TryInsert(inPool.Hash, inPool); @@ -111,7 +112,7 @@ public void Accept_Eip7702IsNotActivated_ReturnsExpected(bool isActive, AcceptTx { IChainHeadSpecProvider headInfoProvider = Substitute.For(); headInfoProvider.GetCurrentHeadSpec().Returns(isActive ? Prague.Instance : Cancun.Instance); - TxDistinctSortedPool standardPool = new TxDistinctSortedPool(MemoryAllowance.MemPoolSize, Substitute.For>(), NullLogManager.Instance); + TxDistinctSortedPool standardPool = new TxDistinctSortedPool(new TxPoolConfig().Size, Substitute.For>(), NullLogManager.Instance); TxDistinctSortedPool blobPool = new BlobTxDistinctSortedPool(10, Substitute.For>(), NullLogManager.Instance); Transaction inPool = Build.A.Transaction.WithNonce(0).SignedAndResolved(new EthereumEcdsa(0), TestItem.PrivateKeyA).TestObject; standardPool.TryInsert(inPool.Hash, inPool); @@ -140,7 +141,7 @@ public void Accept_SenderHasPendingDelegation_OnlyAcceptsIfNonceIsExactMatch(int { IChainHeadSpecProvider headInfoProvider = Substitute.For(); headInfoProvider.GetCurrentHeadSpec().Returns(Prague.Instance); - TxDistinctSortedPool standardPool = new TxDistinctSortedPool(MemoryAllowance.MemPoolSize, Substitute.For>(), NullLogManager.Instance); + TxDistinctSortedPool standardPool = new TxDistinctSortedPool(new TxPoolConfig().Size, Substitute.For>(), NullLogManager.Instance); TxDistinctSortedPool blobPool = new BlobTxDistinctSortedPool(10, Substitute.For>(), NullLogManager.Instance); DelegationCache pendingDelegations = new(); pendingDelegations.IncrementDelegationCount(TestItem.AddressA); @@ -161,7 +162,7 @@ public void Accept_AuthorityHasPendingTransaction_ReturnsDelegatorHasPendingTx(b { IChainHeadSpecProvider headInfoProvider = Substitute.For(); headInfoProvider.GetCurrentHeadSpec().Returns(Prague.Instance); - TxDistinctSortedPool standardPool = new TxDistinctSortedPool(MemoryAllowance.MemPoolSize, Substitute.For>(), NullLogManager.Instance); + TxDistinctSortedPool standardPool = new TxDistinctSortedPool(new TxPoolConfig().Size, Substitute.For>(), NullLogManager.Instance); TxDistinctSortedPool blobPool = new BlobTxDistinctSortedPool(10, Substitute.For>(), NullLogManager.Instance); DelegatedAccountFilter filter = new(headInfoProvider, standardPool, blobPool, Substitute.For(), new()); Transaction transaction; @@ -200,7 +201,7 @@ public void Accept_SetCodeTxHasAuthorityWithPendingTx_ReturnsDelegatorHasPending { IChainHeadSpecProvider headInfoProvider = Substitute.For(); headInfoProvider.GetCurrentHeadSpec().Returns(Prague.Instance); - TxDistinctSortedPool standardPool = new TxDistinctSortedPool(MemoryAllowance.MemPoolSize, Substitute.For>(), NullLogManager.Instance); + TxDistinctSortedPool standardPool = new TxDistinctSortedPool(new TxPoolConfig().Size, Substitute.For>(), NullLogManager.Instance); TxDistinctSortedPool blobPool = new BlobTxDistinctSortedPool(10, Substitute.For>(), NullLogManager.Instance); DelegationCache pendingDelegations = new(); pendingDelegations.IncrementDelegationCount(TestItem.AddressA); diff --git a/src/Nethermind/Nethermind.TxPool.Test/NonceManagerTests.cs b/src/Nethermind/Nethermind.TxPool.Test/NonceManagerTests.cs index dce8e3bde40c..f5d801d747a8 100644 --- a/src/Nethermind/Nethermind.TxPool.Test/NonceManagerTests.cs +++ b/src/Nethermind/Nethermind.TxPool.Test/NonceManagerTests.cs @@ -19,6 +19,8 @@ namespace Nethermind.TxPool.Test; +[Parallelizable(ParallelScope.All)] +[FixtureLifeCycle(LifeCycle.InstancePerTestCase)] public class NonceManagerTests { private ISpecProvider _specProvider; diff --git a/src/Nethermind/Nethermind.TxPool.Test/ReceiptStorageTests.cs b/src/Nethermind/Nethermind.TxPool.Test/ReceiptStorageTests.cs index cf9b832a7cb3..4bebaa002d24 100644 --- a/src/Nethermind/Nethermind.TxPool.Test/ReceiptStorageTests.cs +++ b/src/Nethermind/Nethermind.TxPool.Test/ReceiptStorageTests.cs @@ -19,6 +19,8 @@ namespace Nethermind.TxPool.Test { [TestFixture(true)] [TestFixture(false)] + [Parallelizable(ParallelScope.All)] + [FixtureLifeCycle(LifeCycle.InstancePerTestCase)] public class ReceiptStorageTests { private readonly bool _useEip2718; diff --git a/src/Nethermind/Nethermind.TxPool.Test/RetryCacheTests.cs b/src/Nethermind/Nethermind.TxPool.Test/RetryCacheTests.cs index 9b09028fedaa..1ee4370de27c 100644 --- a/src/Nethermind/Nethermind.TxPool.Test/RetryCacheTests.cs +++ b/src/Nethermind/Nethermind.TxPool.Test/RetryCacheTests.cs @@ -3,16 +3,16 @@ using Nethermind.Core; using Nethermind.Logging; -using NSubstitute; using NUnit.Framework; using System; using System.Threading; using System.Threading.Tasks; -using Nethermind.Core.Test; namespace Nethermind.TxPool.Test; [TestFixture] +[Parallelizable(ParallelScope.All)] +[FixtureLifeCycle(LifeCycle.InstancePerTestCase)] public class RetryCacheTests { public readonly struct ResourceRequestMessage : INew @@ -23,10 +23,27 @@ public class RetryCacheTests public interface ITestHandler : IMessageHandler; + /// + /// A test handler that tracks HandleMessage calls without using NSubstitute. + /// + private class TestHandler : ITestHandler + { + private int _handleMessageCallCount; + public int HandleMessageCallCount => _handleMessageCallCount; + public bool WasCalled => _handleMessageCallCount > 0; + public Action OnHandleMessage { get; set; } + + public void HandleMessage(ResourceRequestMessage message) + { + Interlocked.Increment(ref _handleMessageCallCount); + OnHandleMessage?.Invoke(message); + } + } + private CancellationTokenSource _cancellationTokenSource; private RetryCache _cache; - private readonly int Timeout = 5000; + private readonly int Timeout = 10000; [SetUp] public void Setup() @@ -45,8 +62,8 @@ public void TearDown() [Test] public void Announced_SameResourceDifferentNode_ReturnsEnqueued() { - AnnounceResult result1 = _cache.Announced(1, Substitute.For()); - AnnounceResult result2 = _cache.Announced(1, Substitute.For()); + AnnounceResult result1 = _cache.Announced(1, new TestHandler()); + AnnounceResult result2 = _cache.Announced(1, new TestHandler()); Assert.That(result1, Is.EqualTo(AnnounceResult.RequestRequired)); Assert.That(result2, Is.EqualTo(AnnounceResult.Delayed)); @@ -55,58 +72,57 @@ public void Announced_SameResourceDifferentNode_ReturnsEnqueued() [Test] public void Announced_AfterTimeout_ExecutesRetryRequests() { - ITestHandler request1 = Substitute.For(); - ITestHandler request2 = Substitute.For(); + TestHandler request1 = new(); + TestHandler request2 = new(); _cache.Announced(1, request1); _cache.Announced(1, request2); - Assert.That(() => request2.ReceivedCallsMatching(r => r.HandleMessage(Arg.Any())), Is.True.After(Timeout, 100)); - request1.DidNotReceive().HandleMessage(Arg.Any()); + Assert.That(() => request2.WasCalled, Is.True.After(Timeout, 100)); + Assert.That(request1.WasCalled, Is.False); } [Test] public void Announced_MultipleResources_ExecutesAllRetryRequestsExceptInitialOne() { - ITestHandler request1 = Substitute.For(); - ITestHandler request2 = Substitute.For(); - ITestHandler request3 = Substitute.For(); - ITestHandler request4 = Substitute.For(); + TestHandler request1 = new(); + TestHandler request2 = new(); + TestHandler request3 = new(); + TestHandler request4 = new(); _cache.Announced(1, request1); _cache.Announced(1, request2); _cache.Announced(2, request3); _cache.Announced(2, request4); - Assert.That(() => request2.ReceivedCallsMatching(r => r.HandleMessage(Arg.Any())), Is.True.After(Timeout, 100)); - Assert.That(() => request4.ReceivedCallsMatching(r => r.HandleMessage(Arg.Any())), Is.True.After(Timeout, 100)); - request1.DidNotReceive().HandleMessage(Arg.Any()); - request3.DidNotReceive().HandleMessage(Arg.Any()); + Assert.That(() => request2.WasCalled, Is.True.After(Timeout, 100)); + Assert.That(() => request4.WasCalled, Is.True.After(Timeout, 100)); + Assert.That(request1.WasCalled, Is.False); + Assert.That(request3.WasCalled, Is.False); } [Test] public void Received_RemovesResourceFromRetryQueue() { - _cache.Announced(1, Substitute.For()); + _cache.Announced(1, new TestHandler()); _cache.Received(1); - AnnounceResult result = _cache.Announced(1, Substitute.For()); + AnnounceResult result = _cache.Announced(1, new TestHandler()); Assert.That(result, Is.EqualTo(AnnounceResult.RequestRequired)); } [Test] public async Task Received_BeforeTimeout_PreventsRetryExecution() { - ITestHandler request = Substitute.For(); + TestHandler request = new(); _cache.Announced(1, request); _cache.Announced(1, request); _cache.Received(1); - await Task.Delay(Timeout, _cancellationTokenSource.Token); - request.DidNotReceive().HandleMessage(ResourceRequestMessage.New(1)); + Assert.That(request.WasCalled, Is.False); } [Test] @@ -116,9 +132,7 @@ public async Task Clear_cache_after_timeout() { Parallel.For(0, 100, (j) => { - ITestHandler request = Substitute.For(); - - _cache.Announced(i, request); + _cache.Announced(i, new TestHandler()); }); }); @@ -128,40 +142,39 @@ public async Task Clear_cache_after_timeout() } [Test] + [Retry(3)] public void RetryExecution_HandlesExceptions() { - ITestHandler faultyRequest = Substitute.For(); - ITestHandler normalRequest = Substitute.For(); - - faultyRequest.When(x => x.HandleMessage(Arg.Any())).Do(x => throw new InvalidOperationException("Test exception")); + TestHandler faultyRequest = new() { OnHandleMessage = _ => throw new InvalidOperationException("Test exception") }; + TestHandler normalRequest = new(); - _cache.Announced(1, Substitute.For()); + _cache.Announced(1, new TestHandler()); _cache.Announced(1, faultyRequest); _cache.Announced(1, normalRequest); - Assert.That(() => normalRequest.ReceivedCallsMatching(r => r.HandleMessage(ResourceRequestMessage.New(1))), Is.True.After(Timeout, 100)); + Assert.That(() => normalRequest.WasCalled, Is.True.After(Timeout, 100)); } [Test] public async Task CancellationToken_StopsProcessing() { - ITestHandler request = Substitute.For(); + TestHandler request = new(); _cache.Announced(1, request); await _cancellationTokenSource.CancelAsync(); await Task.Delay(Timeout); - request.DidNotReceive().HandleMessage(Arg.Any()); + Assert.That(request.WasCalled, Is.False); } [Test] public async Task Announced_AfterRetryInProgress_ReturnsNew() { - _cache.Announced(1, Substitute.For()); + _cache.Announced(1, new TestHandler()); await Task.Delay(Timeout, _cancellationTokenSource.Token); - Assert.That(() => _cache.Announced(1, Substitute.For()), Is.EqualTo(AnnounceResult.RequestRequired).After(Timeout, 100)); + Assert.That(() => _cache.Announced(1, new TestHandler()), Is.EqualTo(AnnounceResult.RequestRequired).After(Timeout, 100)); } [Test] diff --git a/src/Nethermind/Nethermind.TxPool.Test/TestBlockTree.cs b/src/Nethermind/Nethermind.TxPool.Test/TestBlockTree.cs new file mode 100644 index 000000000000..a8473dc82663 --- /dev/null +++ b/src/Nethermind/Nethermind.TxPool.Test/TestBlockTree.cs @@ -0,0 +1,108 @@ +// SPDX-FileCopyrightText: 2025 Demerzel Solutions Limited +// SPDX-License-Identifier: LGPL-3.0-only + +#nullable enable + +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Nethermind.Blockchain; +using Nethermind.Blockchain.Find; +using Nethermind.Blockchain.Visitors; +using Nethermind.Core; +using Nethermind.Core.Collections; +using Nethermind.Core.Crypto; + +namespace Nethermind.TxPool.Test; + +/// +/// A minimal IBlockTree implementation for testing that avoids NSubstitute's +/// static state issues when running tests in parallel. +/// +internal class TestBlockTree : IBlockTree +{ + public Block? Head { get; set; } + public BlockHeader? BestSuggestedHeader { get; set; } + + public event EventHandler? BlockAddedToMain; + public event EventHandler? NewBestSuggestedBlock { add { } remove { } } + public event EventHandler? NewSuggestedBlock { add { } remove { } } + public event EventHandler? NewHeadBlock { add { } remove { } } + public event EventHandler? OnUpdateMainChain { add { } remove { } } + public event EventHandler? OnForkChoiceUpdated { add { } remove { } } + + public void RaiseBlockAddedToMain(BlockReplacementEventArgs args) + { + BlockAddedToMain?.Invoke(this, args); + } + + public BlockHeader FindBestSuggestedHeader() => BestSuggestedHeader!; + + // IBlockFinder implementation + public Hash256 HeadHash => Head?.Hash ?? Keccak.Zero; + public Hash256 GenesisHash => Keccak.Zero; + public Hash256? PendingHash => null; + public Hash256? FinalizedHash => null; + public Hash256? SafeHash => null; + public long? BestPersistedState { get; set; } + + public Block? FindBlock(Hash256 blockHash, BlockTreeLookupOptions options, long? blockNumber = null) => null; + public Block? FindBlock(long blockNumber, BlockTreeLookupOptions options) => null; + public bool HasBlock(long blockNumber, Hash256 blockHash) => false; + public BlockHeader? FindHeader(Hash256 blockHash, BlockTreeLookupOptions options, long? blockNumber = null) => null; + public BlockHeader? FindHeader(long blockNumber, BlockTreeLookupOptions options) => null; + public Hash256? FindBlockHash(long blockNumber) => null; + public bool IsMainChain(BlockHeader blockHeader) => false; + public bool IsMainChain(Hash256 blockHash, bool throwOnMissingHash = true) => false; + public long GetLowestBlock() => 0; + + // IBlockTree implementation + public ulong NetworkId => 1; + public ulong ChainId => 1; + public BlockHeader? Genesis => null; + public Block? BestSuggestedBody => null; + public BlockHeader? BestSuggestedBeaconHeader => null; + public BlockHeader? LowestInsertedHeader { get; set; } + public BlockHeader? LowestInsertedBeaconHeader { get; set; } + public long BestKnownNumber => Head?.Number ?? 0; + public long BestKnownBeaconNumber => 0; + public bool CanAcceptNewBlocks => true; + public (long BlockNumber, Hash256 BlockHash) SyncPivot { get; set; } + public bool IsProcessingBlock { get; set; } + + public AddBlockResult Insert(BlockHeader header, BlockTreeInsertHeaderOptions headerOptions = BlockTreeInsertHeaderOptions.None) + => AddBlockResult.Added; + + public void BulkInsertHeader(IReadOnlyList headers, BlockTreeInsertHeaderOptions headerOptions = BlockTreeInsertHeaderOptions.None) { } + + public AddBlockResult Insert(Block block, BlockTreeInsertBlockOptions insertBlockOptions = BlockTreeInsertBlockOptions.None, + BlockTreeInsertHeaderOptions insertHeaderOptions = BlockTreeInsertHeaderOptions.None, WriteFlags bodiesWriteFlags = WriteFlags.None) + => AddBlockResult.Added; + + public void UpdateHeadBlock(Hash256 blockHash) { } + public void NewOldestBlock(long oldestBlock) { } + public AddBlockResult SuggestBlock(Block block, BlockTreeSuggestOptions options = BlockTreeSuggestOptions.ShouldProcess) => AddBlockResult.Added; + public ValueTask SuggestBlockAsync(Block block, BlockTreeSuggestOptions options = BlockTreeSuggestOptions.ShouldProcess) + => ValueTask.FromResult(AddBlockResult.Added); + public AddBlockResult SuggestHeader(BlockHeader header) => AddBlockResult.Added; + public bool IsKnownBlock(long number, Hash256 blockHash) => false; + public bool IsKnownBeaconBlock(long number, Hash256 blockHash) => false; + public bool WasProcessed(long number, Hash256 blockHash) => false; + public void UpdateMainChain(IReadOnlyList blocks, bool wereProcessed, bool forceHeadBlock = false) { } + public void MarkChainAsProcessed(IReadOnlyList blocks) { } + public Task Accept(IBlockTreeVisitor blockTreeVisitor, CancellationToken cancellationToken) => Task.CompletedTask; + public (BlockInfo? Info, ChainLevelInfo? Level) GetInfo(long number, Hash256 blockHash) => (null, null); + public ChainLevelInfo? FindLevel(long number) => null; + public BlockInfo FindCanonicalBlockInfo(long blockNumber) => null!; + public Hash256? FindHash(long blockNumber) => null; + public IOwnedReadOnlyList FindHeaders(Hash256 hash, int numberOfBlocks, int skip, bool reverse) + => new ArrayPoolList(0); + public void DeleteInvalidBlock(Block invalidBlock) { } + public void DeleteOldBlock(long blockNumber, Hash256 blockHash) { } + public void ForkChoiceUpdated(Hash256? finalizedBlockHash, Hash256? safeBlockBlockHash) { } + public int DeleteChainSlice(in long startNumber, long? endNumber = null, bool force = false) => 0; + public bool IsBetterThanHead(BlockHeader? header) => false; + public void UpdateBeaconMainChain(BlockInfo[]? blockInfos, long clearBeaconMainChainStartPoint) { } + public void RecalculateTreeLevels() { } +} diff --git a/src/Nethermind/Nethermind.TxPool.Test/TestChainHeadInfoProvider.cs b/src/Nethermind/Nethermind.TxPool.Test/TestChainHeadInfoProvider.cs new file mode 100644 index 000000000000..9cb26ba9b4ea --- /dev/null +++ b/src/Nethermind/Nethermind.TxPool.Test/TestChainHeadInfoProvider.cs @@ -0,0 +1,35 @@ +// SPDX-FileCopyrightText: 2025 Demerzel Solutions Limited +// SPDX-License-Identifier: LGPL-3.0-only + +#nullable enable + +using System; +using Nethermind.Core; +using Nethermind.Core.Specs; +using Nethermind.Evm.State; +using Nethermind.Int256; + +namespace Nethermind.TxPool.Test; + +/// +/// A minimal IChainHeadInfoProvider implementation for testing that avoids NSubstitute's +/// static state issues when running tests in parallel. +/// +internal class TestChainHeadInfoProvider : IChainHeadInfoProvider +{ + public IChainHeadSpecProvider SpecProvider { get; set; } = null!; + public IReadOnlyStateProvider ReadOnlyStateProvider { get; set; } = null!; + public long HeadNumber { get; set; } + public long? BlockGasLimit { get; set; } = 30_000_000; + public UInt256 CurrentBaseFee { get; set; } + public UInt256 CurrentFeePerBlobGas { get; set; } + public ProofVersion CurrentProofVersion { get; set; } + public bool IsSyncing { get; set; } + public bool IsProcessingBlock { get; set; } + public event EventHandler? HeadChanged; + + public void RaiseHeadChanged(BlockReplacementEventArgs args) + { + HeadChanged?.Invoke(this, args); + } +} diff --git a/src/Nethermind/Nethermind.TxPool.Test/TransactionExtensionsTests.cs b/src/Nethermind/Nethermind.TxPool.Test/TransactionExtensionsTests.cs index f8baecc5b94f..1028eddc5577 100644 --- a/src/Nethermind/Nethermind.TxPool.Test/TransactionExtensionsTests.cs +++ b/src/Nethermind/Nethermind.TxPool.Test/TransactionExtensionsTests.cs @@ -10,6 +10,7 @@ namespace Nethermind.TxPool.Test { [TestFixture] + [Parallelizable(ParallelScope.All)] public class TransactionExtensionsTests { [Test] diff --git a/src/Nethermind/Nethermind.TxPool.Test/TransactionPoolInfoProviderTests.cs b/src/Nethermind/Nethermind.TxPool.Test/TransactionPoolInfoProviderTests.cs index f9486f29e11c..722f7ba43e68 100644 --- a/src/Nethermind/Nethermind.TxPool.Test/TransactionPoolInfoProviderTests.cs +++ b/src/Nethermind/Nethermind.TxPool.Test/TransactionPoolInfoProviderTests.cs @@ -13,6 +13,8 @@ namespace Nethermind.TxPool.Test { [TestFixture] + [Parallelizable(ParallelScope.All)] + [FixtureLifeCycle(LifeCycle.InstancePerTestCase)] public class TxPoolInfoProviderTests { private Address _address; diff --git a/src/Nethermind/Nethermind.TxPool.Test/TxBroadcasterTests.cs b/src/Nethermind/Nethermind.TxPool.Test/TxBroadcasterTests.cs index 696f7f1385ee..17daab08c3ac 100644 --- a/src/Nethermind/Nethermind.TxPool.Test/TxBroadcasterTests.cs +++ b/src/Nethermind/Nethermind.TxPool.Test/TxBroadcasterTests.cs @@ -39,6 +39,8 @@ namespace Nethermind.TxPool.Test; [TestFixture] +[Parallelizable(ParallelScope.All)] +[FixtureLifeCycle(LifeCycle.InstancePerTestCase)] public class TxBroadcasterTests { private ILogManager _logManager; @@ -48,7 +50,7 @@ public class TxBroadcasterTests private TxBroadcaster _broadcaster; private EthereumEcdsa _ethereumEcdsa; private TxPoolConfig _txPoolConfig; - private IChainHeadInfoProvider _headInfo; + private TestChainHeadInfoProvider _headInfo; [SetUp] public void Setup() @@ -59,18 +61,19 @@ public void Setup() _blockTree = Substitute.For(); _comparer = new TransactionComparerProvider(_specProvider, _blockTree).GetDefaultComparer(); _txPoolConfig = new TxPoolConfig(); - _headInfo = Substitute.For(); + _headInfo = new TestChainHeadInfoProvider(); } [TearDown] public void TearDown() => _broadcaster?.Dispose(); [Test] - public async Task should_not_broadcast_persisted_tx_to_peer_too_quickly() + public void should_not_broadcast_persisted_tx_to_peer_too_quickly() { + ManualTimestamper timestamper = new(DateTime.UtcNow); _txPoolConfig = new TxPoolConfig() { PeerNotificationThreshold = 100 }; - _broadcaster = new TxBroadcaster(_comparer, TimerFactory.Default, _txPoolConfig, _headInfo, _logManager); - _headInfo.CurrentBaseFee.Returns(0.GWei()); + _broadcaster = new TxBroadcaster(_comparer, TimerFactory.Default, _txPoolConfig, _headInfo, _logManager, timestamper: timestamper); + _headInfo.CurrentBaseFee = 0.GWei(); int addedTxsCount = TestItem.PrivateKeys.Length; Transaction[] transactions = new Transaction[addedTxsCount]; @@ -102,7 +105,8 @@ public async Task should_not_broadcast_persisted_tx_to_peer_too_quickly() peer.Received(1).SendNewTransactions(Arg.Any>(), true); - await Task.Delay(TimeSpan.FromMilliseconds(1001)); + // Advance time by 2 seconds (throttle is 1 second) + timestamper.Add(TimeSpan.FromSeconds(2)); peer.Received(1).SendNewTransactions(Arg.Any>(), true); @@ -120,7 +124,7 @@ public void should_pick_best_persistent_txs_to_broadcast([Values(1, 2, 99, 100, { _txPoolConfig = new TxPoolConfig() { PeerNotificationThreshold = threshold }; _broadcaster = new TxBroadcaster(_comparer, TimerFactory.Default, _txPoolConfig, _headInfo, _logManager); - _headInfo.CurrentBaseFee.Returns(0.GWei()); + _headInfo.CurrentBaseFee = 0.GWei(); int addedTxsCount = TestItem.PrivateKeys.Length; Transaction[] transactions = new Transaction[addedTxsCount]; @@ -203,7 +207,7 @@ public void should_skip_large_txs_when_picking_best_persistent_txs_to_broadcast( { _txPoolConfig = new TxPoolConfig() { PeerNotificationThreshold = threshold }; _broadcaster = new TxBroadcaster(_comparer, TimerFactory.Default, _txPoolConfig, _headInfo, _logManager); - _headInfo.CurrentBaseFee.Returns(0.GWei()); + _headInfo.CurrentBaseFee = 0.GWei(); // add 256 transactions, 10% of them is large int addedTxsCount = TestItem.PrivateKeys.Length; @@ -249,7 +253,7 @@ public void should_skip_blob_txs_when_picking_best_persistent_txs_to_broadcast([ { _txPoolConfig = new TxPoolConfig() { PeerNotificationThreshold = threshold }; _broadcaster = new TxBroadcaster(_comparer, TimerFactory.Default, _txPoolConfig, _headInfo, _logManager); - _headInfo.CurrentBaseFee.Returns(0.GWei()); + _headInfo.CurrentBaseFee = 0.GWei(); // add 256 transactions, 10% of them is blob type int addedTxsCount = TestItem.PrivateKeys.Length; @@ -297,7 +301,7 @@ public void should_not_pick_txs_with_GasPrice_lower_than_CurrentBaseFee([Values( _broadcaster = new TxBroadcaster(_comparer, TimerFactory.Default, _txPoolConfig, _headInfo, _logManager); const int currentBaseFeeInGwei = 250; - _headInfo.CurrentBaseFee.Returns(currentBaseFeeInGwei.GWei()); + _headInfo.CurrentBaseFee = currentBaseFeeInGwei.GWei(); Block headBlock = Build.A.Block .WithNumber(MainnetSpecProvider.LondonBlockNumber) .WithBaseFeePerGas(currentBaseFeeInGwei.GWei()) @@ -341,7 +345,7 @@ public void should_not_pick_txs_with_GasPrice_lower_than_CurrentBaseFee([Values( [TestCase(150, true)] public void should_not_broadcast_tx_with_MaxFeePerGas_lower_than_70_percent_of_CurrentBaseFee(int maxFeePerGas, bool shouldBroadcast) { - _headInfo.CurrentBaseFee.Returns((UInt256)100); + _headInfo.CurrentBaseFee = (UInt256)100; _broadcaster = new TxBroadcaster(_comparer, TimerFactory.Default, _txPoolConfig, _headInfo, _logManager); @@ -371,7 +375,7 @@ public void should_not_pick_1559_txs_with_MaxFeePerGas_lower_than_CurrentBaseFee _broadcaster = new TxBroadcaster(_comparer, TimerFactory.Default, _txPoolConfig, _headInfo, _logManager); const int currentBaseFeeInGwei = 250; - _headInfo.CurrentBaseFee.Returns(currentBaseFeeInGwei.GWei()); + _headInfo.CurrentBaseFee = currentBaseFeeInGwei.GWei(); Block headBlock = Build.A.Block .WithNumber(MainnetSpecProvider.LondonBlockNumber) .WithBaseFeePerGas(currentBaseFeeInGwei.GWei()) @@ -416,7 +420,7 @@ public void should_not_pick_blob_txs_with_MaxFeePerBlobGas_lower_than_CurrentFee _broadcaster = new TxBroadcaster(_comparer, TimerFactory.Default, _txPoolConfig, _headInfo, _logManager); const int currentFeePerBlobGas = 250; - _headInfo.CurrentFeePerBlobGas.Returns(currentFeePerBlobGas.GWei()); + _headInfo.CurrentFeePerBlobGas = currentFeePerBlobGas.GWei(); // add 256 transactions with MaxFeePerBlobGas 0-255 int addedTxsCount = TestItem.PrivateKeys.Length; @@ -463,7 +467,7 @@ public void should_pick_tx_with_lowest_nonce_from_bucket() { _txPoolConfig = new TxPoolConfig() { PeerNotificationThreshold = 5 }; _broadcaster = new TxBroadcaster(_comparer, TimerFactory.Default, _txPoolConfig, _headInfo, _logManager); - _headInfo.CurrentBaseFee.Returns(0.GWei()); + _headInfo.CurrentBaseFee = 0.GWei(); const int addedTxsCount = 5; Transaction[] transactions = new Transaction[addedTxsCount]; @@ -508,7 +512,7 @@ public void should_broadcast_local_tx_immediately_after_receiving_it() public void should_broadcast_full_local_tx_immediately_after_receiving_it() { _broadcaster = new TxBroadcaster(_comparer, TimerFactory.Default, _txPoolConfig, _headInfo, _logManager); - _headInfo.CurrentBaseFee.Returns(0.GWei()); + _headInfo.CurrentBaseFee = 0.GWei(); ISession session = Substitute.For(); session.Node.Returns(new Node(TestItem.PublicKeyA, TestItem.IPEndPointA)); @@ -646,7 +650,7 @@ public void should_check_tx_policy_for_broadcast(bool canGossipTransactions, boo { ITxGossipPolicy txGossipPolicy = Substitute.For(); _broadcaster = new TxBroadcaster(_comparer, TimerFactory.Default, _txPoolConfig, _headInfo, _logManager, txGossipPolicy); - _headInfo.CurrentBaseFee.Returns(0.GWei()); + _headInfo.CurrentBaseFee = 0.GWei(); ISession session = Substitute.For(); session.Node.Returns(new Node(TestItem.PublicKeyA, TestItem.IPEndPointA)); @@ -708,7 +712,7 @@ public void should_rebroadcast_all_persistent_transactions_if_PeerNotificationTh [TestCase(10000, 7000)] public void should_calculate_baseFeeThreshold_correctly(int baseFee, int expectedThreshold) { - _headInfo.CurrentBaseFee.Returns((UInt256)baseFee); + _headInfo.CurrentBaseFee = (UInt256)baseFee; _broadcaster = new TxBroadcaster(_comparer, TimerFactory.Default, _txPoolConfig, _headInfo, _logManager); _broadcaster.CalculateBaseFeeThreshold().Should().Be((UInt256)expectedThreshold); } @@ -717,7 +721,7 @@ public void should_calculate_baseFeeThreshold_correctly(int baseFee, int expecte public void calculation_of_baseFeeThreshold_should_handle_overflow_correctly([Values(0, 70, 100, 101, 500)] int threshold, [Values(2, 3, 4, 5, 6, 7, 8, 9, 10, 11)] int divisor) { UInt256.Divide(UInt256.MaxValue, (UInt256)divisor, out UInt256 baseFee); - _headInfo.CurrentBaseFee.Returns(baseFee); + _headInfo.CurrentBaseFee = baseFee; _txPoolConfig = new TxPoolConfig() { MinBaseFeeThreshold = threshold }; _broadcaster = new TxBroadcaster(_comparer, TimerFactory.Default, _txPoolConfig, _headInfo, _logManager); diff --git a/src/Nethermind/Nethermind.TxPool.Test/TxPoolTests.Blobs.cs b/src/Nethermind/Nethermind.TxPool.Test/TxPoolTests.Blobs.cs index 17764368551b..84b47a6be499 100644 --- a/src/Nethermind/Nethermind.TxPool.Test/TxPoolTests.Blobs.cs +++ b/src/Nethermind/Nethermind.TxPool.Test/TxPoolTests.Blobs.cs @@ -607,7 +607,8 @@ Transaction GetTx(bool isBlob, UInt256 nonce) _txPool.GetPendingTransactionsCount().Should().Be(firstIsBlob ? 0 : 1); _txPool.GetPendingBlobTransactionsCount().Should().Be(firstIsBlob ? 1 : 0); _stateProvider.IncrementNonce(TestItem.AddressA); - await RaiseBlockAddedToMainAndWaitForTransactions(1); + Block block = Build.A.Block.WithNumber(1).TestObject; + await RaiseBlockAddedToMainAndWaitForNewHead(block); _txPool.GetPendingTransactionsCount().Should().Be(0); _txPool.GetPendingBlobTransactionsCount().Should().Be(0); @@ -951,7 +952,7 @@ public void should_convert_blob_proofs_to_cell_proofs_if_enabled([Values(true, f EnsureSenderBalance(TestItem.AddressA, UInt256.MaxValue); // update head and set correct current proof version - _blockTree.BlockAddedToMain += Raise.EventWith(new BlockReplacementEventArgs(Build.A.Block.TestObject)); + _blockTree.RaiseBlockAddedToMain(new BlockReplacementEventArgs(Build.A.Block.TestObject)); Transaction blobTxAdded = Build.A.Transaction .WithShardBlobTxTypeAndFields() @@ -985,7 +986,7 @@ public void should_convert_blob_proofs_to_cell_proofs_if_enabled([Values(true, f public async Task should_evict_based_on_proof_version_and_fork(BlobsSupportMode poolMode, TestAction[] testActions) { Block head = _blockTree.Head; - _blockTree.FindBestSuggestedHeader().Returns(head.Header); + _blockTree.BestSuggestedHeader = head.Header; (ChainSpecBasedSpecProvider provider, _) = TestSpecHelper.LoadChainSpec(new ChainSpecJson { @@ -1053,12 +1054,12 @@ TestCaseData MakeTestCase(string testName, int finalCount, BlobsSupportMode mode } } - private Task AddEmptyBlock() + private async Task AddEmptyBlock() { BlockHeader bh = new(_blockTree.Head.Hash, Keccak.EmptyTreeHash, TestItem.AddressA, 0, _blockTree.Head.Number + 1, _blockTree.Head.GasLimit, _blockTree.Head.Timestamp + 1, []); - _blockTree.FindBestSuggestedHeader().Returns(bh); - _blockTree.BlockAddedToMain += Raise.EventWith(new BlockReplacementEventArgs(new Block(bh, new BlockBody([], [])), _blockTree.Head)); - return Task.Delay(300); + _blockTree.BestSuggestedHeader = bh; + Block block = new Block(bh, new BlockBody([], [])); + await RaiseBlockAddedToMainAndWaitForNewHead(block, _blockTree.Head); } private Transaction CreateBlobTx(PrivateKey sender, UInt256 nonce = default, int blobCount = 1, IReleaseSpec releaseSpec = default) @@ -1092,7 +1093,7 @@ public async Task should_evict_txs_with_too_many_blobs_per_tx_after_fork() }; Block head = _blockTree.Head; - _blockTree.FindBestSuggestedHeader().Returns(head.Header); + _blockTree.BestSuggestedHeader = head.Header; _txPool = CreatePool(specProvider: provider); EnsureSenderBalance(TestItem.AddressA, UInt256.MaxValue); @@ -1126,7 +1127,7 @@ public async Task should_evict_txs_with_too_many_blobs_per_block_after_fork() }; Block head = _blockTree.Head; - _blockTree.FindBestSuggestedHeader().Returns(head.Header); + _blockTree.BestSuggestedHeader = head.Header; _txPool = CreatePool(specProvider: provider); EnsureSenderBalance(TestItem.AddressA, UInt256.MaxValue); diff --git a/src/Nethermind/Nethermind.TxPool.Test/TxPoolTests.cs b/src/Nethermind/Nethermind.TxPool.Test/TxPoolTests.cs index 6f431b074d40..537752efcb27 100644 --- a/src/Nethermind/Nethermind.TxPool.Test/TxPoolTests.cs +++ b/src/Nethermind/Nethermind.TxPool.Test/TxPoolTests.cs @@ -36,16 +36,25 @@ namespace Nethermind.TxPool.Test { [TestFixture] + [Parallelizable(ParallelScope.All)] + [FixtureLifeCycle(LifeCycle.InstancePerTestCase)] public partial class TxPoolTests { + private const int Timeout = 10000; private ILogManager _logManager; private IEthereumEcdsa _ethereumEcdsa; private ISpecProvider _specProvider; private TxPool _txPool; private TestReadOnlyStateProvider _stateProvider; - private IBlockTree _blockTree; + private TestBlockTree _blockTree; - private readonly int _txGasLimit = 1_000_000; + private const int TxGasLimit = 1_000_000; + + [OneTimeSetUp] + public static void OneTimeSetup() + { + KzgPolynomialCommitments.InitializeAsync().Wait(); + } [SetUp] public void Setup() @@ -54,19 +63,17 @@ public void Setup() _specProvider = MainnetSpecProvider.Instance; _ethereumEcdsa = new EthereumEcdsa(_specProvider.ChainId); _stateProvider = new TestReadOnlyStateProvider(); - _blockTree = Substitute.For(); - Block block = Build.A.Block.WithNumber(10000000 - 1).TestObject; - _blockTree.Head.Returns(block); - _blockTree.FindBestSuggestedHeader().Returns(Build.A.BlockHeader.WithNumber(10000000).TestObject); - - KzgPolynomialCommitments.InitializeAsync().Wait(); + _blockTree = new TestBlockTree(); + Block block = Build.A.Block.WithNumber(10000000 - 1).WithBaseFeePerGas(0).TestObject; + _blockTree.Head = block; + _blockTree.BestSuggestedHeader = Build.A.BlockHeader.WithNumber(10000000).WithBaseFee(0).TestObject; } [Test] public void should_add_peers() { _txPool = CreatePool(); - var peers = GetPeers(); + IDictionary peers = GetPeers(); foreach ((ITxPoolPeer peer, _) in peers) { @@ -78,7 +85,7 @@ public void should_add_peers() public void should_delete_peers() { _txPool = CreatePool(); - var peers = GetPeers(); + IDictionary peers = GetPeers(); foreach ((ITxPoolPeer peer, _) in peers) { @@ -152,7 +159,7 @@ public void should_add_valid_transactions_recovering_its_address() { _txPool = CreatePool(); Transaction tx = Build.A.Transaction - .WithGasLimit(_txGasLimit) + .WithGasLimit(TxGasLimit) .SignedAndResolved(_ethereumEcdsa, TestItem.PrivateKeyA).TestObject; EnsureSenderBalance(tx); tx.SenderAddress = null; @@ -166,7 +173,7 @@ public void should_reject_transactions_from_contract_address() { _txPool = CreatePool(); Transaction tx = Build.A.Transaction - .WithGasLimit(_txGasLimit) + .WithGasLimit(TxGasLimit) .SignedAndResolved(_ethereumEcdsa, TestItem.PrivateKeyA).TestObject; EnsureSenderBalance(tx); _stateProvider.InsertCode(TestItem.AddressA, "A"u8.ToArray(), _specProvider.GetSpec((ForkActivation)1)); @@ -192,7 +199,7 @@ public void should_accept_1559_transactions_only_when_eip1559_enabled([Values(fa .WithMaxPriorityFeePerGas(5.GWei()) .SignedAndResolved(_ethereumEcdsa, TestItem.PrivateKeyA).TestObject; EnsureSenderBalance(tx); - _blockTree.BlockAddedToMain += Raise.EventWith(_blockTree, new BlockReplacementEventArgs(Build.A.Block.WithGasLimit(10000000).TestObject)); + _blockTree.RaiseBlockAddedToMain(new BlockReplacementEventArgs(Build.A.Block.WithGasLimit(10000000).TestObject)); AcceptTxResult result = txPool.SubmitTx(tx, TxHandlingOptions.PersistentBroadcast); txPool.GetPendingTransactionsCount().Should().Be(eip1559Enabled ? 1 : 0); result.Should().Be(eip1559Enabled ? AcceptTxResult.Accepted : AcceptTxResult.Invalid); @@ -215,7 +222,7 @@ public void should_not_ignore_insufficient_funds_for_eip1559_transactions() var headProcessed = new ManualResetEventSlim(false); txPool.TxPoolHeadChanged += (s, a) => headProcessed.Set(); - _blockTree.BlockAddedToMain += Raise.EventWith(_blockTree, new BlockReplacementEventArgs(Build.A.Block.WithGasLimit(10000000).TestObject)); + _blockTree.RaiseBlockAddedToMain(new BlockReplacementEventArgs(Build.A.Block.WithGasLimit(10000000).TestObject)); headProcessed.Wait(); result = txPool.SubmitTx(tx, TxHandlingOptions.PersistentBroadcast); @@ -332,7 +339,7 @@ public void should_ignore_overflow_transactions() public void should_ignore_overflow_transactions_gas_premium_and_fee_cap() { ISpecProvider specProvider = GetLondonSpecProvider(); - var txPool = CreatePool(null, specProvider); + TxPool txPool = CreatePool(null, specProvider); Transaction tx = Build.A.Transaction.WithGasPrice(UInt256.MaxValue / Transaction.BaseTxGasCost) .WithGasLimit(Transaction.BaseTxGasCost) .WithValue(Transaction.BaseTxGasCost) @@ -366,7 +373,7 @@ public void should_reject_tx_if_max_size_is_exceeded([Values(true, false)] bool Transaction tx = Build.A.Transaction.SignedAndResolved(_ethereumEcdsa, TestItem.PrivateKeyA).TestObject; EnsureSenderBalance(tx); - var txPoolConfig = new TxPoolConfig() { MaxTxSize = tx.GetLength() - (sizeExceeded ? 1 : 0) }; + TxPoolConfig txPoolConfig = new() { MaxTxSize = tx.GetLength() - (sizeExceeded ? 1 : 0) }; _txPool = CreatePool(txPoolConfig); AcceptTxResult result = _txPool.SubmitTx(tx, TxHandlingOptions.PersistentBroadcast); @@ -399,7 +406,7 @@ public void should_ignore_tx_gas_limit_exceeded() { _txPool = CreatePool(); Transaction tx = Build.A.Transaction - .WithGasLimit(_txGasLimit + 1) + .WithGasLimit(TxGasLimit + 1) .SignedAndResolved(_ethereumEcdsa, TestItem.PrivateKeyA).TestObject; EnsureSenderBalance(tx); AcceptTxResult result = _txPool.SubmitTx(tx, TxHandlingOptions.PersistentBroadcast); @@ -558,7 +565,7 @@ public void should_not_add_tx_if_already_pending_lower_nonces_are_exhausting_bal { const int gasPrice = 10; const int value = 1; - int oneTxPrice = _txGasLimit * gasPrice + value; + int oneTxPrice = TxGasLimit * gasPrice + value; _txPool = CreatePool(); Transaction[] transactions = new Transaction[10]; @@ -570,7 +577,7 @@ public void should_not_add_tx_if_already_pending_lower_nonces_are_exhausting_bal .WithSenderAddress(TestItem.AddressA) .WithNonce((UInt256)i) .WithGasPrice((UInt256)gasPrice) - .WithGasLimit(_txGasLimit) + .WithGasLimit(TxGasLimit) .WithValue(value) .SignedAndResolved(_ethereumEcdsa, TestItem.PrivateKeyA).TestObject; }); @@ -591,7 +598,7 @@ public void should_not_count_txs_with_stale_nonces_when_calculating_cumulative_c { const int gasPrice = 10; const int value = 1; - int oneTxPrice = _txGasLimit * gasPrice + value; + int oneTxPrice = TxGasLimit * gasPrice + value; _txPool = CreatePool(); EnsureSenderBalance(TestItem.AddressA, (UInt256)(oneTxPrice * numberOfTxsPossibleToExecuteBeforeGasExhaustion)); @@ -604,7 +611,7 @@ public void should_not_count_txs_with_stale_nonces_when_calculating_cumulative_c .WithSenderAddress(TestItem.AddressA) .WithNonce((UInt256)i) .WithGasPrice((UInt256)gasPrice) - .WithGasLimit(_txGasLimit) + .WithGasLimit(TxGasLimit) .WithValue(value) .SignedAndResolved(_ethereumEcdsa, TestItem.PrivateKeyA).TestObject; }); @@ -631,7 +638,7 @@ public void should_add_tx_if_cost_of_executing_all_txs_in_bucket_exceeds_balance const int count = 10; const int gasPrice = 10; const int value = 1; - int oneTxPrice = _txGasLimit * gasPrice + value; + int oneTxPrice = TxGasLimit * gasPrice + value; _txPool = CreatePool(); Transaction[] transactions = new Transaction[count]; @@ -643,7 +650,7 @@ public void should_add_tx_if_cost_of_executing_all_txs_in_bucket_exceeds_balance .WithSenderAddress(TestItem.AddressA) .WithNonce((UInt256)i) .WithGasPrice((UInt256)gasPrice) - .WithGasLimit(_txGasLimit) + .WithGasLimit(TxGasLimit) .WithValue(value) .SignedAndResolved(_ethereumEcdsa, TestItem.PrivateKeyA).TestObject; }); @@ -777,7 +784,7 @@ public void should_remove_txHash_from_hashCache_when_tx_removed_because_of_txPoo var headProcessed = new ManualResetEventSlim(false); _txPool.TxPoolHeadChanged += (s, a) => headProcessed.Set(); - _blockTree.BlockAddedToMain += Raise.EventWith(new BlockReplacementEventArgs(Build.A.Block.TestObject)); + _blockTree.RaiseBlockAddedToMain(new BlockReplacementEventArgs(Build.A.Block.TestObject)); headProcessed.Wait(); _txPool.IsKnown(higherPriorityTx.Hash).Should().BeTrue(); @@ -888,10 +895,11 @@ public void should_remove_stale_txs_from_persistent_transactions(int numberOfTxs BlockReplacementEventArgs blockReplacementEventArgs = new(block, null); ManualResetEvent manualResetEvent = new(false); - _txPool.RemoveTransaction(Arg.Do(t => manualResetEvent.Set())); - _blockTree.BlockAddedToMain += Raise.EventWith(new object(), blockReplacementEventArgs); - manualResetEvent.WaitOne(TimeSpan.FromMilliseconds(200)); + _txPool.TxPoolHeadChanged += (o, e) => manualResetEvent.Set(); + _blockTree.RaiseBlockAddedToMain(blockReplacementEventArgs); + bool signaled = manualResetEvent.WaitOne(TimeSpan.FromMilliseconds(Timeout)); + signaled.Should().BeTrue("TxPoolHeadChanged event should have been raised"); // transactions[nonceIncludedInBlock] was included in the block and should be removed, as well as all lower nonces. _txPool.GetOwnPendingTransactions().Length.Should().Be(numberOfTxs - nonceIncludedInBlock - 1); } @@ -926,10 +934,11 @@ public void broadcaster_should_work_well_when_there_are_no_txs_in_persistent_txs BlockReplacementEventArgs blockReplacementEventArgs = new(block, null); ManualResetEvent manualResetEvent = new(false); - _txPool.RemoveTransaction(Arg.Do(t => manualResetEvent.Set())); - _blockTree.BlockAddedToMain += Raise.EventWith(new object(), blockReplacementEventArgs); - manualResetEvent.WaitOne(TimeSpan.FromMilliseconds(200)); + _txPool.TxPoolHeadChanged += (o, e) => manualResetEvent.Set(); + _blockTree.RaiseBlockAddedToMain(blockReplacementEventArgs); + bool signaled = manualResetEvent.WaitOne(TimeSpan.FromMilliseconds(Timeout)); + signaled.Should().BeTrue("TxPoolHeadChanged event should have been raised"); _txPool.GetPendingTransactionsCount().Should().Be(1); _txPool.GetOwnPendingTransactions().Length.Should().Be(1); } @@ -960,7 +969,7 @@ public async Task should_remove_transactions_concurrently() public void should_add_transactions_concurrently() { int size = 3; - TxPoolConfig config = new() { GasLimit = _txGasLimit, Size = size }; + TxPoolConfig config = new() { GasLimit = TxGasLimit, Size = size }; _txPool = CreatePool(config); foreach (PrivateKey privateKey in TestItem.PrivateKeys) @@ -1007,10 +1016,11 @@ public void should_remove_tx_from_txPool_when_included_in_block(bool sameTransac BlockReplacementEventArgs blockReplacementEventArgs = new(block, null); ManualResetEvent manualResetEvent = new(false); - _txPool.RemoveTransaction(Arg.Do(t => manualResetEvent.Set())); - _blockTree.BlockAddedToMain += Raise.EventWith(new object(), blockReplacementEventArgs); - manualResetEvent.WaitOne(TimeSpan.FromMilliseconds(200)); + _txPool.TxPoolHeadChanged += (o, e) => manualResetEvent.Set(); + _blockTree.RaiseBlockAddedToMain(blockReplacementEventArgs); + bool signaled = manualResetEvent.WaitOne(TimeSpan.FromMilliseconds(Timeout)); + signaled.Should().BeTrue("TxPoolHeadChanged event should have been raised"); _txPool.GetPendingTransactionsCount().Should().Be(0); } @@ -1030,10 +1040,11 @@ public void should_not_remove_txHash_from_hashCache_when_tx_removed_because_of_i BlockReplacementEventArgs blockReplacementEventArgs = new(block, null); ManualResetEvent manualResetEvent = new(false); - _txPool.RemoveTransaction(Arg.Do(t => manualResetEvent.Set())); - _blockTree.BlockAddedToMain += Raise.EventWith(new object(), blockReplacementEventArgs); - manualResetEvent.WaitOne(TimeSpan.FromMilliseconds(200)); + _txPool.TxPoolHeadChanged += (o, e) => manualResetEvent.Set(); + _blockTree.RaiseBlockAddedToMain(blockReplacementEventArgs); + bool signaled = manualResetEvent.WaitOne(TimeSpan.FromMilliseconds(Timeout)); + signaled.Should().BeTrue("TxPoolHeadChanged event should have been raised"); foreach (Transaction transaction in transactions) { _txPool.IsKnown(transaction.Hash).Should().BeTrue(); @@ -1168,9 +1179,9 @@ public void should_accept_access_list_transactions_only_when_eip2930_enabled([Va { if (!eip2930Enabled) { - _blockTree.FindBestSuggestedHeader().Returns(Build.A.BlockHeader.WithNumber(MainnetSpecProvider.BerlinBlockNumber - 1).TestObject); + _blockTree.BestSuggestedHeader = Build.A.BlockHeader.WithNumber(MainnetSpecProvider.BerlinBlockNumber - 1).TestObject; Block block = Build.A.Block.WithNumber(MainnetSpecProvider.BerlinBlockNumber - 2).TestObject; - _blockTree.Head.Returns(block); + _blockTree.Head = block; } _txPool = CreatePool(null, new TestSpecProvider(eip2930Enabled ? Berlin.Instance : Istanbul.Instance)); @@ -1189,9 +1200,9 @@ public void should_accept_only_when_synced([Values(false, true)] bool isSynced, { if (!isSynced) { - _blockTree.FindBestSuggestedHeader().Returns(Build.A.BlockHeader.WithNumber(MainnetSpecProvider.BerlinBlockNumber - 1).TestObject); + _blockTree.BestSuggestedHeader = Build.A.BlockHeader.WithNumber(MainnetSpecProvider.BerlinBlockNumber - 1).TestObject; Block block = Build.A.Block.WithNumber(1).TestObject; - _blockTree.Head.Returns(block); + _blockTree.Head = block; } _txPool = CreatePool(null, new TestSpecProvider(Berlin.Instance)); @@ -1488,6 +1499,8 @@ public void should_increase_nonce_when_transaction_not_included_in_txPool_but_br } [Test] + [Retry(3)] + [NonParallelizable] public void should_include_transaction_after_removal() { ISpecProvider specProvider = GetLondonSpecProvider(); @@ -1523,12 +1536,10 @@ public void should_include_transaction_after_removal() _txPool.SubmitTx(expensiveTx2, TxHandlingOptions.None).Should().Be(AcceptTxResult.Accepted); // Rise new block event to cleanup cash and remove one expensive tx - _blockTree.BlockAddedToMain += - Raise.Event>(this, - new BlockReplacementEventArgs(Build.A.Block.WithTransactions(expensiveTx1).TestObject)); + _blockTree.RaiseBlockAddedToMain(new BlockReplacementEventArgs(Build.A.Block.WithTransactions(expensiveTx1).TestObject)); // Wait for event processing and send txA again => should be Accepted - Assert.That(() => _txPool.SubmitTx(txA, TxHandlingOptions.None), Is.EqualTo(AcceptTxResult.Accepted).After(100, 10)); + Assert.That(() => _txPool.SubmitTx(txA, TxHandlingOptions.None), Is.EqualTo(AcceptTxResult.Accepted).After(Timeout, 10)); } [TestCase(true, 1, 1, true)] @@ -2245,7 +2256,7 @@ private TxPool CreatePool( _ethereumEcdsa, txStorage, _headInfo, - config ?? new TxPoolConfig() { GasLimit = _txGasLimit }, + config ?? new TxPoolConfig() { GasLimit = TxGasLimit }, new TxValidator(_specProvider.ChainId), _logManager, transactionComparerProvider.GetDefaultComparer(), @@ -2263,33 +2274,13 @@ private ITxPoolPeer GetPeer(PublicKey publicKey) return peer; } - private static ISpecProvider GetLondonSpecProvider() - { - var specProvider = Substitute.For(); - specProvider.GetSpec(Arg.Any()).Returns(London.Instance); - return specProvider; - } + private static ISpecProvider GetLondonSpecProvider() => new TestSpecProvider(London.Instance); - private static ISpecProvider GetCancunSpecProvider() - { - var specProvider = Substitute.For(); - specProvider.GetSpec(Arg.Any()).Returns(Cancun.Instance); - return specProvider; - } + private static ISpecProvider GetCancunSpecProvider() => new TestSpecProvider(Cancun.Instance); - private static ISpecProvider GetPragueSpecProvider() - { - var specProvider = Substitute.For(); - specProvider.GetSpec(Arg.Any()).Returns(Prague.Instance); - return specProvider; - } + private static ISpecProvider GetPragueSpecProvider() => new TestSpecProvider(Prague.Instance); - private static ISpecProvider GetOsakaSpecProvider() - { - var specProvider = Substitute.For(); - specProvider.GetSpec(Arg.Any()).Returns(Osaka.Instance); - return specProvider; - } + private static ISpecProvider GetOsakaSpecProvider() => new TestSpecProvider(Osaka.Instance); private Transaction[] AddTransactionsToPool(bool sameTransactionSenderPerPeer = true, bool sameNoncePerPeer = false, int transactionsPerPeer = 10) { @@ -2397,10 +2388,10 @@ private async Task RaiseBlockAddedToMainAndWaitForTransactions(int txCount, Bloc SemaphoreSlim semaphoreSlim = new(0, txCount); _txPool.NewPending += (o, e) => semaphoreSlim.Release(); - _blockTree.BlockAddedToMain += Raise.EventWith(blockReplacementEventArgs); + _blockTree.RaiseBlockAddedToMain(blockReplacementEventArgs); for (int i = 0; i < txCount; i++) { - await semaphoreSlim.WaitAsync(10); + await semaphoreSlim.WaitAsync(1000); } } @@ -2417,7 +2408,7 @@ private async Task RaiseBlockAddedToMainAndWaitForNewHead(Block block, Block pre e => e.Number == block.Number ); - _blockTree.BlockAddedToMain += Raise.EventWith(blockReplacementEventArgs); + _blockTree.RaiseBlockAddedToMain(blockReplacementEventArgs); await waitTask; } diff --git a/src/Nethermind/Nethermind.TxPool/RetryCache.cs b/src/Nethermind/Nethermind.TxPool/RetryCache.cs index 4bd9b40626c9..9a797f733b86 100644 --- a/src/Nethermind/Nethermind.TxPool/RetryCache.cs +++ b/src/Nethermind/Nethermind.TxPool/RetryCache.cs @@ -122,11 +122,7 @@ public AnnounceResult Announced(in TResourceId resourceId, IMessageHandler - { - return AnnounceAdd(resourceId, retryHandler, out result); - }, _announceUpdate, handler); - + _retryRequests.AddOrUpdate(resourceId, (resourceId, retryHandler) => AnnounceAdd(resourceId, retryHandler, out result), _announceUpdate, handler); return result; } diff --git a/src/Nethermind/Nethermind.TxPool/TxBroadcaster.cs b/src/Nethermind/Nethermind.TxPool/TxBroadcaster.cs index 2c3ec6bbca67..9b481c5b10fd 100644 --- a/src/Nethermind/Nethermind.TxPool/TxBroadcaster.cs +++ b/src/Nethermind/Nethermind.TxPool/TxBroadcaster.cs @@ -68,21 +68,24 @@ internal class TxBroadcaster : IDisposable private readonly TimeSpan _minTimeBetweenPersistedTxBroadcast = TimeSpan.FromSeconds(1); private readonly ILogger _logger; + private readonly ITimestamper _timestamper; public TxBroadcaster(IComparer comparer, ITimerFactory timerFactory, ITxPoolConfig txPoolConfig, IChainHeadInfoProvider chainHeadInfoProvider, ILogManager? logManager, - ITxGossipPolicy? transactionsGossipPolicy = null) + ITxGossipPolicy? transactionsGossipPolicy = null, + ITimestamper? timestamper = null) { _txPoolConfig = txPoolConfig; _headInfo = chainHeadInfoProvider; + _timestamper = timestamper ?? Timestamper.Default; _txGossipPolicy = transactionsGossipPolicy ?? ShouldGossip.Instance; // Allocate closure once _gossipFilter = _txGossipPolicy.ShouldGossipTransaction; _logger = logManager?.GetClassLogger() ?? throw new ArgumentNullException(nameof(logManager)); - _persistentTxs = new TxDistinctSortedPool(MemoryAllowance.MemPoolSize, comparer, logManager); + _persistentTxs = new TxDistinctSortedPool(txPoolConfig.Size, comparer, logManager); _accumulatedTemporaryTxs = new ResettableList(512, 4); _txsToSend = new ResettableList(512, 4); @@ -172,7 +175,7 @@ internal void BroadcastPersistentTxs() return; } - DateTimeOffset now = DateTimeOffset.UtcNow; + DateTimeOffset now = _timestamper.UtcNowOffset; if (_lastPersistedTxBroadcast + _minTimeBetweenPersistedTxBroadcast > now) { if (_logger.IsTrace) _logger.Trace($"Minimum time between persistent tx broadcast not reached."); diff --git a/src/Nethermind/Nethermind.TxPool/TxPool.cs b/src/Nethermind/Nethermind.TxPool/TxPool.cs index 20aaa898432b..e94f67414948 100644 --- a/src/Nethermind/Nethermind.TxPool/TxPool.cs +++ b/src/Nethermind/Nethermind.TxPool/TxPool.cs @@ -135,7 +135,7 @@ public TxPool(IEthereumEcdsa ecdsa, _broadcaster = new TxBroadcaster(comparer, TimerFactory.Default, txPoolConfig, chainHeadInfoProvider, logManager, transactionsGossipPolicy); TxPoolHeadChanged += _broadcaster.OnNewHead; - _transactions = new TxDistinctSortedPool(MemoryAllowance.MemPoolSize, comparer, logManager); + _transactions = new TxDistinctSortedPool(txPoolConfig.Size, comparer, logManager); _transactions.Removed += OnRemovedTx; _blobTransactions = txPoolConfig.BlobsSupport.IsPersistentStorage()