diff --git a/src/Nethermind/Nethermind.Core.Test/Caching/ClockCacheTests.cs b/src/Nethermind/Nethermind.Core.Test/Caching/ClockCacheTests.cs index f4bdf703576e..0dd477bcaeb9 100644 --- a/src/Nethermind/Nethermind.Core.Test/Caching/ClockCacheTests.cs +++ b/src/Nethermind/Nethermind.Core.Test/Caching/ClockCacheTests.cs @@ -1,4 +1,4 @@ -// SPDX-FileCopyrightText: 2022 Demerzel Solutions Limited +// SPDX-FileCopyrightText: 2025 Demerzel Solutions Limited // SPDX-License-Identifier: LGPL-3.0-only using System; @@ -242,6 +242,19 @@ public void Can_delete() cache.Delete(_addresses[0]).Should().BeFalse(); } + [Test] + public void Delete_returns_value() + { + Cache cache = Create(); + cache.Set(_addresses[0], _accounts[0]); + + cache.Delete(_addresses[0], out Account? value).Should().BeTrue(); + value.Should().Be(_accounts[0]); + + cache.Delete(_addresses[0], out Account? noValue).Should().BeFalse(); + noValue.Should().BeNull(); + } + [Test] public void Clear_should_free_all_capacity() { diff --git a/src/Nethermind/Nethermind.Core/Caching/ClockCache.cs b/src/Nethermind/Nethermind.Core/Caching/ClockCache.cs index a13c6eaef1fe..ee94fa68bedb 100644 --- a/src/Nethermind/Nethermind.Core/Caching/ClockCache.cs +++ b/src/Nethermind/Nethermind.Core/Caching/ClockCache.cs @@ -1,4 +1,4 @@ -// SPDX-FileCopyrightText: 2022 Demerzel Solutions Limited +// SPDX-FileCopyrightText: 2025 Demerzel Solutions Limited // SPDX-License-Identifier: LGPL-3.0-only using System; @@ -136,9 +136,15 @@ void ThrowInvalidOperationException() } } - public bool Delete(TKey key) + public bool Delete(TKey key) => Delete(key, out _); + + public bool Delete(TKey key, [NotNullWhen(true)] out TValue? value) { - if (MaxCapacity == 0) return false; + if (MaxCapacity == 0) + { + value = default; + return false; + } using var lockRelease = _lock.Acquire(); @@ -148,9 +154,11 @@ public bool Delete(TKey key) KeyToOffset[ov.Offset] = default; ClearAccessed(ov.Offset); FreeOffsets.Enqueue(ov.Offset); - return true; + value = ov.Value; + return ov.Value != null; } + value = default; return false; } diff --git a/src/Nethermind/Nethermind.Network.Test/P2P/Subprotocols/Eth/V68/Eth68ProtocolHandlerTests.cs b/src/Nethermind/Nethermind.Network.Test/P2P/Subprotocols/Eth/V68/Eth68ProtocolHandlerTests.cs index 1ecc4d466cc9..657350833df7 100644 --- a/src/Nethermind/Nethermind.Network.Test/P2P/Subprotocols/Eth/V68/Eth68ProtocolHandlerTests.cs +++ b/src/Nethermind/Nethermind.Network.Test/P2P/Subprotocols/Eth/V68/Eth68ProtocolHandlerTests.cs @@ -1,8 +1,6 @@ // SPDX-FileCopyrightText: 2025 Demerzel Solutions Limited // SPDX-License-Identifier: LGPL-3.0-only -using System; -using System.Net; using DotNetty.Buffers; using FluentAssertions; using Nethermind.Consensus; @@ -19,6 +17,7 @@ using Nethermind.Network.P2P.Messages; using Nethermind.Network.P2P.Subprotocols; using Nethermind.Network.P2P.Subprotocols.Eth.V62.Messages; +using Nethermind.Network.P2P.Subprotocols.Eth.V66; using Nethermind.Network.P2P.Subprotocols.Eth.V66.Messages; using Nethermind.Network.P2P.Subprotocols.Eth.V68; using Nethermind.Network.P2P.Subprotocols.Eth.V68.Messages; @@ -31,6 +30,8 @@ using Nethermind.TxPool; using NSubstitute; using NUnit.Framework; +using System; +using System.Net; namespace Nethermind.Network.Test.P2P.Subprotocols.Eth.V68; @@ -146,6 +147,38 @@ public void Should_throw_when_sizes_doesnt_match(bool removeSize) action.Should().Throw(); } + + [Test] + public void Should_disconnect_if_tx_size_is_wrong() + { + GenerateTxLists(4, out ArrayPoolList types, out ArrayPoolList sizes, out ArrayPoolList hashes, out ArrayPoolList txs); + sizes[0] += 10; + using NewPooledTransactionHashesMessage68 hashesMsg = new(types, sizes, hashes); + using PooledTransactionsMessage txsMsg = new(1111, new(txs)); + + HandleIncomingStatusMessage(); + HandleZeroMessage(hashesMsg, Eth68MessageCode.NewPooledTransactionHashes); + HandleZeroMessage(txsMsg, Eth66MessageCode.PooledTransactions); + + _session.Received().InitiateDisconnect(DisconnectReason.BackgroundTaskFailure, "invalid pooled tx type or size"); + } + + + [Test] + public void Should_disconnect_if_tx_type_is_wrong() + { + GenerateTxLists(4, out ArrayPoolList types, out ArrayPoolList sizes, out ArrayPoolList hashes, out ArrayPoolList txs); + types[0]++; + using NewPooledTransactionHashesMessage68 hashesMsg = new(types, sizes, hashes); + using PooledTransactionsMessage txsMsg = new(1111, new(txs)); + + HandleIncomingStatusMessage(); + HandleZeroMessage(hashesMsg, Eth68MessageCode.NewPooledTransactionHashes); + HandleZeroMessage(txsMsg, Eth66MessageCode.PooledTransactions); + + _session.Received().InitiateDisconnect(DisconnectReason.BackgroundTaskFailure, "invalid pooled tx type or size"); + } + [Test] public void Should_process_huge_transaction() { @@ -273,20 +306,28 @@ private void HandleZeroMessage(T msg, byte messageCode) where T : MessageBase } private void GenerateLists(int txCount, out ArrayPoolList types, out ArrayPoolList sizes, out ArrayPoolList hashes) + { + GenerateTxLists(txCount, out types, out sizes, out hashes, out ArrayPoolList txs); + txs.Dispose(); + } + + private void GenerateTxLists(int txCount, out ArrayPoolList types, out ArrayPoolList sizes, out ArrayPoolList hashes, out ArrayPoolList txs) { TxDecoder txDecoder = TxDecoder.Instance; types = new(txCount); sizes = new(txCount); hashes = new(txCount); + txs = new(txCount); for (int i = 0; i < txCount; ++i) { - Transaction tx = Build.A.Transaction.WithType((TxType)(i % 3)).WithData(new byte[i]) + Transaction tx = Build.A.Transaction.WithType((TxType)(i % 3)).SignedAndResolved().WithData(new byte[i]) .WithHash(i % 2 == 0 ? TestItem.KeccakA : TestItem.KeccakB).TestObject; types.Add((byte)tx.Type); sizes.Add(txDecoder.GetLength(tx, RlpBehaviors.None)); hashes.Add(tx.Hash); + txs.Add(tx); } } } diff --git a/src/Nethermind/Nethermind.Network/P2P/ProtocolHandlers/ProtocolHandlerBase.cs b/src/Nethermind/Nethermind.Network/P2P/ProtocolHandlers/ProtocolHandlerBase.cs index e900dbaa0fb0..3d983ab69f55 100644 --- a/src/Nethermind/Nethermind.Network/P2P/ProtocolHandlers/ProtocolHandlerBase.cs +++ b/src/Nethermind/Nethermind.Network/P2P/ProtocolHandlers/ProtocolHandlerBase.cs @@ -16,7 +16,6 @@ using Nethermind.Serialization.Rlp; using Nethermind.Stats; using Nethermind.Stats.Model; -using Nethermind.Synchronization; namespace Nethermind.Network.P2P.ProtocolHandlers { @@ -90,6 +89,8 @@ protected T Deserialize(IByteBuffer data) where T : P2PMessage int originalReaderIndex = data.ReaderIndex; T result = _serializer.Deserialize(data); if (data.IsReadable()) ThrowIncompleteDeserializationException(data, originalReaderIndex); + Logger.Warn($"{Counter} Got {typeof(T).Name}"); + return result; } catch (RlpLimitException e) @@ -113,7 +114,7 @@ private static void ThrowIncompleteDeserializationException(IByteBuffer data, in protected internal void Send(T message) where T : P2PMessage { Interlocked.Increment(ref Counter); - if (Logger.IsTrace) Logger.Trace($"{Counter} Sending {typeof(T).Name}"); + Logger.Warn($"{Counter} Sending {typeof(T).Name}"); if (NetworkDiagTracer.IsEnabled) { string messageString = message.ToString(); diff --git a/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Eth/V62/Eth62ProtocolHandler.cs b/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Eth/V62/Eth62ProtocolHandler.cs index d0857c003cd2..ac6795f86d1f 100644 --- a/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Eth/V62/Eth62ProtocolHandler.cs +++ b/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Eth/V62/Eth62ProtocolHandler.cs @@ -55,10 +55,7 @@ public Eth62ProtocolHandler(ISession session, EnsureGossipPolicy(); } - public void DisableTxFiltering() - { - _floodController.IsEnabled = false; - } + public void DisableTxFiltering() => _floodController.IsEnabled = false; public override byte ProtocolVersion => EthVersions.Eth62; public override string ProtocolCode => Protocol.Eth; @@ -242,7 +239,7 @@ protected void Handle(TransactionsMessage msg) BackgroundTaskScheduler.ScheduleBackgroundTask((iList, 0), _handleSlow); } - private ValueTask HandleSlow((IOwnedReadOnlyList txs, int startIndex) request, CancellationToken cancellationToken) + protected virtual ValueTask HandleSlow((IOwnedReadOnlyList txs, int startIndex) request, CancellationToken cancellationToken) { IOwnedReadOnlyList transactions = request.txs; ReadOnlySpan transactionsSpan = transactions.AsSpan(); @@ -250,6 +247,7 @@ private ValueTask HandleSlow((IOwnedReadOnlyList txs, int startInde { int startIdx = request.startIndex; bool isTrace = Logger.IsTrace; + for (int i = startIdx; i < transactionsSpan.Length; i++) { if (cancellationToken.IsCancellationRequested) diff --git a/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Eth/V65/Eth65ProtocolHandler.cs b/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Eth/V65/Eth65ProtocolHandler.cs index 3c6d6174d584..6861021228a8 100644 --- a/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Eth/V65/Eth65ProtocolHandler.cs +++ b/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Eth/V65/Eth65ProtocolHandler.cs @@ -223,6 +223,7 @@ protected virtual void RequestPooledTransactions(I private ArrayPoolList AddMarkUnknownHashes(ReadOnlySpan hashes) { ArrayPoolList discoveredTxHashesAndSizes = new(hashes.Length); + for (int i = 0; i < hashes.Length; i++) { Hash256 hash = hashes[i]; @@ -240,7 +241,7 @@ private ArrayPoolList AddMarkUnknownHashes(ReadOnlySpan hashes public virtual void HandleMessage(PooledTransactionRequestMessage message) { - ArrayPoolList hashesToRetry = new(1) { new Hash256(message.TxHash) }; + using ArrayPoolList hashesToRetry = new(1) { new Hash256(message.TxHash) }; RequestPooledTransactions(hashesToRetry); } } diff --git a/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Eth/V68/Eth68ProtocolHandler.cs b/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Eth/V68/Eth68ProtocolHandler.cs index 093532f45122..8b44ac53396f 100644 --- a/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Eth/V68/Eth68ProtocolHandler.cs +++ b/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Eth/V68/Eth68ProtocolHandler.cs @@ -4,6 +4,7 @@ using Nethermind.Consensus; using Nethermind.Consensus.Scheduler; using Nethermind.Core; +using Nethermind.Core.Caching; using Nethermind.Core.Collections; using Nethermind.Core.Crypto; using Nethermind.Core.Extensions; @@ -20,6 +21,8 @@ using System; using System.Collections.Generic; using System.Diagnostics; +using System.Threading; +using System.Threading.Tasks; namespace Nethermind.Network.P2P.Subprotocols.Eth.V68; @@ -45,6 +48,8 @@ public class Eth68ProtocolHandler(ISession session, ? long.MaxValue : txPoolConfig.MaxBlobTxSize.Value + (long)specProvider.GetFinalMaxBlobGasPerBlock(); + private ClockCache TxShapeAnnouncements { get; } = new(MemoryAllowance.TxHashCacheSize / 10); + public override string Name => "eth68"; public override byte ProtocolVersion => EthVersions.Eth68; @@ -125,6 +130,7 @@ protected void RequestPooledTransactions(IOwnedReadOnlyList hashes, IOw Hash256 hash = hashes[index]; int txSize = sizes[index]; TxType txType = (TxType)types[index]; + TxShapeAnnouncements.Set(hash, (txSize, txType)); long maxTxSize = txType.SupportsBlobs() ? _configuredMaxBlobTxSize : _configuredMaxTxSize; @@ -239,4 +245,21 @@ private void SendMessage(IOwnedReadOnlyList types, IOwnedReadOnlyList NewPooledTransactionHashesMessage68 message = new(types, sizes, hashes); Send(message); } + + protected override ValueTask HandleSlow((IOwnedReadOnlyList txs, int startIndex) request, CancellationToken cancellationToken) + { + int startIdx = request.startIndex; + for (int i = startIdx; i < request.txs.Count; i++) + { + if (!ValidateSizeAndType(request.txs[i])) + { + throw new SubprotocolException("invalid pooled tx type or size"); + } + } + + return base.HandleSlow(request, cancellationToken); + } + + private bool ValidateSizeAndType(Transaction tx) + => !TxShapeAnnouncements.Delete(tx.Hash!, out (int Size, TxType Type) txShape) || (tx.GetLength() == txShape.Size && tx.Type == txShape.Type); } diff --git a/src/Nethermind/Nethermind.TxPool/RetryCache.cs b/src/Nethermind/Nethermind.TxPool/RetryCache.cs index 130c6fc344dd..13cbf60b5a96 100644 --- a/src/Nethermind/Nethermind.TxPool/RetryCache.cs +++ b/src/Nethermind/Nethermind.TxPool/RetryCache.cs @@ -18,7 +18,6 @@ public class RetryCache { private readonly int _timeoutMs; private readonly int _checkMs; - private readonly int _requestingCacheSize; private readonly ConcurrentDictionary>> _retryRequests = new(); private readonly ConcurrentQueue<(TResourceId ResourceId, DateTimeOffset ExpiresAfter)> _expiringQueue = new(); @@ -31,8 +30,7 @@ public RetryCache(ILogManager logManager, int timeoutMs = 2500, int requestingCa _timeoutMs = timeoutMs; _checkMs = _timeoutMs / 5; - _requestingCacheSize = requestingCacheSize; - _requestingResources = new(_requestingCacheSize); + _requestingResources = new(requestingCacheSize); Task.Run(async () => { @@ -46,23 +44,26 @@ public RetryCache(ILogManager logManager, int timeoutMs = 2500, int requestingCa if (_retryRequests.TryRemove(item.ResourceId, out PooledSet>? requests)) { - if (requests.Count > 0) + using (requests) { - _requestingResources.Set(item.ResourceId); - } + if (requests.Count > 0) + { + _requestingResources.Set(item.ResourceId); + } - if (_logger.IsTrace) _logger.Trace($"Sending retry requests for {item.ResourceId} after timeout"); + if (_logger.IsWarn) _logger.Warn($"Sending retry requests for {item.ResourceId} after timeout"); - foreach (IMessageHandler retryHandler in requests) - { - try - { - retryHandler.HandleMessage(TMessage.New(item.ResourceId)); - } - catch (Exception ex) + foreach (IMessageHandler retryHandler in requests) { - if (_logger.IsTrace) _logger.Error($"Failed to send retry request to {retryHandler} for {item.ResourceId}", ex); + try + { + retryHandler.HandleMessage(TMessage.New(item.ResourceId)); + } + catch (Exception ex) + { + if (_logger.IsWarn) _logger.Error($"Failed to send retry request to {retryHandler} for {item.ResourceId}", ex); + } } } } @@ -79,7 +80,7 @@ public AnnounceResult Announced(TResourceId resourceId, IMessageHandler { - if (_logger.IsTrace) _logger.Trace($"Announced {resourceId} by {retryHandler}: NEW"); + if (_logger.IsWarn) _logger.Warn($"Announced {resourceId} by {retryHandler}: NEW"); _expiringQueue.Enqueue((resourceId, DateTimeOffset.UtcNow.AddMilliseconds(_timeoutMs))); added = true; @@ -87,7 +88,7 @@ public AnnounceResult Announced(TResourceId resourceId, IMessageHandler { - if (_logger.IsTrace) _logger.Trace($"Announced {resourceId} by {retryHandler}: UPDATE"); + if (_logger.IsWarn) _logger.Warn($"Announced {resourceId} by {retryHandler}: UPDATE"); dict.Add(retryHandler); return dict; @@ -96,16 +97,17 @@ public AnnounceResult Announced(TResourceId resourceId, IMessageHandler>? _); + _requestingResources.Delete(resourceId); } } diff --git a/src/Nethermind/Nethermind.TxPool/TxPool.cs b/src/Nethermind/Nethermind.TxPool/TxPool.cs index 59e9fbd0143e..d6c8275f1271 100644 --- a/src/Nethermind/Nethermind.TxPool/TxPool.cs +++ b/src/Nethermind/Nethermind.TxPool/TxPool.cs @@ -489,8 +489,6 @@ public void RemovePeer(PublicKey nodeId) public AcceptTxResult SubmitTx(Transaction tx, TxHandlingOptions handlingOptions) { - _retryCache.Received(tx.Hash!); - bool startBroadcast = _txPoolConfig.PersistentBroadcastEnabled && (handlingOptions & TxHandlingOptions.PersistentBroadcast) == TxHandlingOptions.PersistentBroadcast; @@ -500,6 +498,7 @@ public AcceptTxResult SubmitTx(Transaction tx, TxHandlingOptions handlingOptions // If local tx allow it to be accepted even when syncing !startBroadcast) { + _retryCache.Received(tx.Hash!); return AcceptTxResult.Syncing; } @@ -539,6 +538,11 @@ public AcceptTxResult SubmitTx(Transaction tx, TxHandlingOptions handlingOptions _newHeadLock.ExitReadLock(); } + if (accepted != AcceptTxResult.Invalid) + { + _retryCache.Received(tx.Hash!); + } + if (accepted) { // Clear proper snapshot