Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
36a0f7e
PoC of delayed sending if requested
flcl42 Sep 8, 2025
d8294a2
Add retry cache
flcl42 Sep 9, 2025
bcc06a3
Merge remote-tracking branch 'origin/master' into dont-cache-requeste…
flcl42 Sep 10, 2025
1d29ad5
Fix logs and multiple requests expiration
flcl42 Sep 29, 2025
2adbcbe
Merge remote-tracking branch 'origin/master' into dont-cache-requeste…
flcl42 Sep 29, 2025
fb8e9e3
Add tests
flcl42 Oct 1, 2025
a474531
Rename
flcl42 Oct 1, 2025
b831531
Add more tests
flcl42 Oct 3, 2025
5cfaf85
Improve
flcl42 Oct 6, 2025
0825d2d
Simplify dependencies
flcl42 Oct 6, 2025
9414c22
Improve
flcl42 Oct 6, 2025
62b1ca8
Rename
flcl42 Oct 6, 2025
a49c3b7
Merge remote-tracking branch 'origin/master' into dont-cache-requeste…
flcl42 Oct 6, 2025
d883a0a
no requestoor draft
flcl42 Oct 10, 2025
d5e60b2
Use messages
flcl42 Oct 15, 2025
34ec276
Merge remote-tracking branch 'origin/master' into dont-cache-requeste…
flcl42 Oct 15, 2025
aab4e49
Remove the requestoor
flcl42 Oct 15, 2025
805f8df
Fix tests
flcl42 Oct 15, 2025
f2db9a6
Fix tests
flcl42 Oct 15, 2025
f615bd5
More fixes
flcl42 Oct 15, 2025
ca79d70
Merge remote-tracking branch 'origin/master' into dont-cache-requeste…
flcl42 Oct 16, 2025
78e4de6
Substitute
flcl42 Oct 16, 2025
d41d386
Disconnect peers with invalid txs
flcl42 Oct 16, 2025
a197f6c
Update src/Nethermind/Nethermind.Network.Contract/Messages/IResourceR…
flcl42 Oct 16, 2025
1c8f173
Merge branch 'dont-cache-requested-txhashes' of github.com:Nethermind…
flcl42 Oct 16, 2025
eb2213e
Merge remote-tracking branch 'origin/master' into dont-cache-requeste…
flcl42 Oct 16, 2025
dc8f823
Compress code
flcl42 Oct 20, 2025
3605567
Try pooled set
flcl42 Oct 21, 2025
f06ff7d
Handle as eth66
flcl42 Oct 21, 2025
0d8fdd3
Fix review
flcl42 Oct 21, 2025
83eee29
Fix tests
flcl42 Oct 21, 2025
ff5da9a
No need in syncing handling
flcl42 Oct 22, 2025
152a0a4
Validate sizes and types
flcl42 Oct 22, 2025
53d16f0
Add a test
flcl42 Oct 22, 2025
f0011fa
Merge remote-tracking branch 'origin/master' into validate-sizes-and-…
flcl42 Oct 23, 2025
8242e69
Fix test
flcl42 Oct 23, 2025
011af56
Add tests
flcl42 Oct 23, 2025
4698baf
Merge branch 'master' into validate-sizes-and-types
flcl42 Oct 23, 2025
35b1382
Fix test
flcl42 Oct 23, 2025
f70d145
Code style
flcl42 Oct 23, 2025
7e4535b
Rollback
flcl42 Oct 23, 2025
45349cf
Fix tests
flcl42 Oct 23, 2025
44ef48b
Move to proper class
flcl42 Oct 24, 2025
7910ec6
Rollback that rollback
flcl42 Oct 24, 2025
b582c66
Merge branch 'master' into validate-sizes-and-types
flcl42 Oct 24, 2025
73ef0c9
Merge branch 'validate-sizes-and-types' of github.com:NethermindEth/n…
flcl42 Oct 24, 2025
3ffb44c
Mark not invalid txs as received
flcl42 Oct 24, 2025
b8c9b52
Move
flcl42 Oct 24, 2025
f6b8876
Fix dispose
flcl42 Oct 24, 2025
9229a78
Fix
flcl42 Oct 24, 2025
e34e86f
Moar
flcl42 Oct 24, 2025
bd5b964
Clean up cache
flcl42 Oct 24, 2025
2a6af80
Using
flcl42 Oct 24, 2025
6fb8387
Merge branch 'master' into validate-sizes-and-types
flcl42 Oct 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion src/Nethermind/Nethermind.Core.Test/Caching/ClockCacheTests.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// SPDX-FileCopyrightText: 2022 Demerzel Solutions Limited
// SPDX-FileCopyrightText: 2025 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System;
Expand Down Expand Up @@ -242,6 +242,19 @@ public void Can_delete()
cache.Delete(_addresses[0]).Should().BeFalse();
}

[Test]
public void Delete_returns_value()
{
Cache cache = Create();
cache.Set(_addresses[0], _accounts[0]);

cache.Delete(_addresses[0], out Account? value).Should().BeTrue();
value.Should().Be(_accounts[0]);

cache.Delete(_addresses[0], out Account? noValue).Should().BeFalse();
noValue.Should().BeNull();
}

[Test]
public void Clear_should_free_all_capacity()
{
Expand Down
16 changes: 12 additions & 4 deletions src/Nethermind/Nethermind.Core/Caching/ClockCache.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// SPDX-FileCopyrightText: 2022 Demerzel Solutions Limited
// SPDX-FileCopyrightText: 2025 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System;
Expand Down Expand Up @@ -136,9 +136,15 @@ void ThrowInvalidOperationException()
}
}

public bool Delete(TKey key)
public bool Delete(TKey key) => Delete(key, out _);

public bool Delete(TKey key, [NotNullWhen(true)] out TValue? value)
{
if (MaxCapacity == 0) return false;
if (MaxCapacity == 0)
{
value = default;
return false;
}

using var lockRelease = _lock.Acquire();

Expand All @@ -148,9 +154,11 @@ public bool Delete(TKey key)
KeyToOffset[ov.Offset] = default;
ClearAccessed(ov.Offset);
FreeOffsets.Enqueue(ov.Offset);
return true;
value = ov.Value;
return ov.Value != null;
}

value = default;
return false;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
// SPDX-FileCopyrightText: 2025 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.Net;
using DotNetty.Buffers;
using FluentAssertions;
using Nethermind.Consensus;
Expand All @@ -19,6 +17,7 @@
using Nethermind.Network.P2P.Messages;
using Nethermind.Network.P2P.Subprotocols;
using Nethermind.Network.P2P.Subprotocols.Eth.V62.Messages;
using Nethermind.Network.P2P.Subprotocols.Eth.V66;
using Nethermind.Network.P2P.Subprotocols.Eth.V66.Messages;
using Nethermind.Network.P2P.Subprotocols.Eth.V68;
using Nethermind.Network.P2P.Subprotocols.Eth.V68.Messages;
Expand All @@ -31,6 +30,8 @@
using Nethermind.TxPool;
using NSubstitute;
using NUnit.Framework;
using System;
using System.Net;

namespace Nethermind.Network.Test.P2P.Subprotocols.Eth.V68;

Expand Down Expand Up @@ -146,6 +147,38 @@ public void Should_throw_when_sizes_doesnt_match(bool removeSize)
action.Should().Throw<SubprotocolException>();
}


[Test]
public void Should_disconnect_if_tx_size_is_wrong()
{
GenerateTxLists(4, out ArrayPoolList<byte> types, out ArrayPoolList<int> sizes, out ArrayPoolList<Hash256> hashes, out ArrayPoolList<Transaction> txs);
sizes[0] += 10;
using NewPooledTransactionHashesMessage68 hashesMsg = new(types, sizes, hashes);
using PooledTransactionsMessage txsMsg = new(1111, new(txs));

HandleIncomingStatusMessage();
HandleZeroMessage(hashesMsg, Eth68MessageCode.NewPooledTransactionHashes);
HandleZeroMessage(txsMsg, Eth66MessageCode.PooledTransactions);

_session.Received().InitiateDisconnect(DisconnectReason.BackgroundTaskFailure, "invalid pooled tx type or size");
}


[Test]
public void Should_disconnect_if_tx_type_is_wrong()
{
GenerateTxLists(4, out ArrayPoolList<byte> types, out ArrayPoolList<int> sizes, out ArrayPoolList<Hash256> hashes, out ArrayPoolList<Transaction> txs);
types[0]++;
using NewPooledTransactionHashesMessage68 hashesMsg = new(types, sizes, hashes);
using PooledTransactionsMessage txsMsg = new(1111, new(txs));

HandleIncomingStatusMessage();
HandleZeroMessage(hashesMsg, Eth68MessageCode.NewPooledTransactionHashes);
HandleZeroMessage(txsMsg, Eth66MessageCode.PooledTransactions);

_session.Received().InitiateDisconnect(DisconnectReason.BackgroundTaskFailure, "invalid pooled tx type or size");
}

[Test]
public void Should_process_huge_transaction()
{
Expand Down Expand Up @@ -273,20 +306,28 @@ private void HandleZeroMessage<T>(T msg, byte messageCode) where T : MessageBase
}

private void GenerateLists(int txCount, out ArrayPoolList<byte> types, out ArrayPoolList<int> sizes, out ArrayPoolList<Hash256> hashes)
{
GenerateTxLists(txCount, out types, out sizes, out hashes, out ArrayPoolList<Transaction> txs);
txs.Dispose();
}

private void GenerateTxLists(int txCount, out ArrayPoolList<byte> types, out ArrayPoolList<int> sizes, out ArrayPoolList<Hash256> hashes, out ArrayPoolList<Transaction> txs)
{
TxDecoder txDecoder = TxDecoder.Instance;
types = new(txCount);
sizes = new(txCount);
hashes = new(txCount);
txs = new(txCount);

for (int i = 0; i < txCount; ++i)
{
Transaction tx = Build.A.Transaction.WithType((TxType)(i % 3)).WithData(new byte[i])
Transaction tx = Build.A.Transaction.WithType((TxType)(i % 3)).SignedAndResolved().WithData(new byte[i])
.WithHash(i % 2 == 0 ? TestItem.KeccakA : TestItem.KeccakB).TestObject;

types.Add((byte)tx.Type);
sizes.Add(txDecoder.GetLength(tx, RlpBehaviors.None));
hashes.Add(tx.Hash);
txs.Add(tx);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
using Nethermind.Serialization.Rlp;
using Nethermind.Stats;
using Nethermind.Stats.Model;
using Nethermind.Synchronization;

namespace Nethermind.Network.P2P.ProtocolHandlers
{
Expand Down Expand Up @@ -90,6 +89,8 @@ protected T Deserialize<T>(IByteBuffer data) where T : P2PMessage
int originalReaderIndex = data.ReaderIndex;
T result = _serializer.Deserialize<T>(data);
if (data.IsReadable()) ThrowIncompleteDeserializationException(data, originalReaderIndex);
Logger.Warn($"{Counter} Got {typeof(T).Name}");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And here the same


return result;
}
catch (RlpLimitException e)
Expand All @@ -113,7 +114,7 @@ private static void ThrowIncompleteDeserializationException(IByteBuffer data, in
protected internal void Send<T>(T message) where T : P2PMessage
{
Interlocked.Increment(ref Counter);
if (Logger.IsTrace) Logger.Trace($"{Counter} Sending {typeof(T).Name}");
Logger.Warn($"{Counter} Sending {typeof(T).Name}");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Warn? Seems like overlooked revert

if (NetworkDiagTracer.IsEnabled)
{
string messageString = message.ToString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,7 @@ public Eth62ProtocolHandler(ISession session,
EnsureGossipPolicy();
}

public void DisableTxFiltering()
{
_floodController.IsEnabled = false;
}
public void DisableTxFiltering() => _floodController.IsEnabled = false;

public override byte ProtocolVersion => EthVersions.Eth62;
public override string ProtocolCode => Protocol.Eth;
Expand Down Expand Up @@ -242,14 +239,15 @@ protected void Handle(TransactionsMessage msg)
BackgroundTaskScheduler.ScheduleBackgroundTask((iList, 0), _handleSlow);
}

private ValueTask HandleSlow((IOwnedReadOnlyList<Transaction> txs, int startIndex) request, CancellationToken cancellationToken)
protected virtual ValueTask HandleSlow((IOwnedReadOnlyList<Transaction> txs, int startIndex) request, CancellationToken cancellationToken)
{
IOwnedReadOnlyList<Transaction> transactions = request.txs;
ReadOnlySpan<Transaction> transactionsSpan = transactions.AsSpan();
try
{
int startIdx = request.startIndex;
bool isTrace = Logger.IsTrace;

for (int i = startIdx; i < transactionsSpan.Length; i++)
{
if (cancellationToken.IsCancellationRequested)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ protected virtual void RequestPooledTransactions<GetPooledTransactionsMessage>(I
private ArrayPoolList<Hash256> AddMarkUnknownHashes(ReadOnlySpan<Hash256> hashes)
{
ArrayPoolList<Hash256> discoveredTxHashesAndSizes = new(hashes.Length);

for (int i = 0; i < hashes.Length; i++)
{
Hash256 hash = hashes[i];
Expand All @@ -240,7 +241,7 @@ private ArrayPoolList<Hash256> AddMarkUnknownHashes(ReadOnlySpan<Hash256> hashes

public virtual void HandleMessage(PooledTransactionRequestMessage message)
{
ArrayPoolList<Hash256> hashesToRetry = new(1) { new Hash256(message.TxHash) };
using ArrayPoolList<Hash256> hashesToRetry = new(1) { new Hash256(message.TxHash) };
RequestPooledTransactions<GetPooledTransactionsMessage>(hashesToRetry);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using Nethermind.Consensus;
using Nethermind.Consensus.Scheduler;
using Nethermind.Core;
using Nethermind.Core.Caching;
using Nethermind.Core.Collections;
using Nethermind.Core.Crypto;
using Nethermind.Core.Extensions;
Expand All @@ -20,6 +21,8 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace Nethermind.Network.P2P.Subprotocols.Eth.V68;

Expand All @@ -45,6 +48,8 @@ public class Eth68ProtocolHandler(ISession session,
? long.MaxValue
: txPoolConfig.MaxBlobTxSize.Value + (long)specProvider.GetFinalMaxBlobGasPerBlock();

private ClockCache<ValueHash256, (int, TxType)> TxShapeAnnouncements { get; } = new(MemoryAllowance.TxHashCacheSize / 10);

public override string Name => "eth68";

public override byte ProtocolVersion => EthVersions.Eth68;
Expand Down Expand Up @@ -125,6 +130,7 @@ protected void RequestPooledTransactions(IOwnedReadOnlyList<Hash256> hashes, IOw
Hash256 hash = hashes[index];
int txSize = sizes[index];
TxType txType = (TxType)types[index];
TxShapeAnnouncements.Set(hash, (txSize, txType));

long maxTxSize = txType.SupportsBlobs() ? _configuredMaxBlobTxSize : _configuredMaxTxSize;

Expand Down Expand Up @@ -239,4 +245,21 @@ private void SendMessage(IOwnedReadOnlyList<byte> types, IOwnedReadOnlyList<int>
NewPooledTransactionHashesMessage68 message = new(types, sizes, hashes);
Send(message);
}

protected override ValueTask HandleSlow((IOwnedReadOnlyList<Transaction> txs, int startIndex) request, CancellationToken cancellationToken)
{
int startIdx = request.startIndex;
for (int i = startIdx; i < request.txs.Count; i++)
{
if (!ValidateSizeAndType(request.txs[i]))
{
throw new SubprotocolException("invalid pooled tx type or size");
}
}

return base.HandleSlow(request, cancellationToken);
}

private bool ValidateSizeAndType(Transaction tx)
=> !TxShapeAnnouncements.Delete(tx.Hash!, out (int Size, TxType Type) txShape) || (tx.GetLength() == txShape.Size && tx.Type == txShape.Type);
}
40 changes: 21 additions & 19 deletions src/Nethermind/Nethermind.TxPool/RetryCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ public class RetryCache<TMessage, TResourceId>
{
private readonly int _timeoutMs;
private readonly int _checkMs;
private readonly int _requestingCacheSize;

private readonly ConcurrentDictionary<TResourceId, PooledSet<IMessageHandler<TMessage>>> _retryRequests = new();
private readonly ConcurrentQueue<(TResourceId ResourceId, DateTimeOffset ExpiresAfter)> _expiringQueue = new();
Expand All @@ -31,8 +30,7 @@ public RetryCache(ILogManager logManager, int timeoutMs = 2500, int requestingCa

_timeoutMs = timeoutMs;
_checkMs = _timeoutMs / 5;
_requestingCacheSize = requestingCacheSize;
_requestingResources = new(_requestingCacheSize);
_requestingResources = new(requestingCacheSize);

Task.Run(async () =>
{
Expand All @@ -46,23 +44,26 @@ public RetryCache(ILogManager logManager, int timeoutMs = 2500, int requestingCa

if (_retryRequests.TryRemove(item.ResourceId, out PooledSet<IMessageHandler<TMessage>>? requests))
{
if (requests.Count > 0)
using (requests)
{
_requestingResources.Set(item.ResourceId);
}
if (requests.Count > 0)
{
_requestingResources.Set(item.ResourceId);
}

if (_logger.IsTrace) _logger.Trace($"Sending retry requests for {item.ResourceId} after timeout");
if (_logger.IsWarn) _logger.Warn($"Sending retry requests for {item.ResourceId} after timeout");


foreach (IMessageHandler<TMessage> retryHandler in requests)
{
try
{
retryHandler.HandleMessage(TMessage.New(item.ResourceId));
}
catch (Exception ex)
foreach (IMessageHandler<TMessage> retryHandler in requests)
{
if (_logger.IsTrace) _logger.Error($"Failed to send retry request to {retryHandler} for {item.ResourceId}", ex);
try
{
retryHandler.HandleMessage(TMessage.New(item.ResourceId));
}
catch (Exception ex)
{
if (_logger.IsWarn) _logger.Error($"Failed to send retry request to {retryHandler} for {item.ResourceId}", ex);
}
}
}
}
Expand All @@ -79,15 +80,15 @@ public AnnounceResult Announced(TResourceId resourceId, IMessageHandler<TMessage

_retryRequests.AddOrUpdate(resourceId, (resourceId) =>
{
if (_logger.IsTrace) _logger.Trace($"Announced {resourceId} by {retryHandler}: NEW");
if (_logger.IsWarn) _logger.Warn($"Announced {resourceId} by {retryHandler}: NEW");

_expiringQueue.Enqueue((resourceId, DateTimeOffset.UtcNow.AddMilliseconds(_timeoutMs)));
added = true;

return [];
}, (resourceId, dict) =>
{
if (_logger.IsTrace) _logger.Trace($"Announced {resourceId} by {retryHandler}: UPDATE");
if (_logger.IsWarn) _logger.Warn($"Announced {resourceId} by {retryHandler}: UPDATE");

dict.Add(retryHandler);
return dict;
Expand All @@ -96,16 +97,17 @@ public AnnounceResult Announced(TResourceId resourceId, IMessageHandler<TMessage
return added ? AnnounceResult.New : AnnounceResult.Enqueued;
}

if (_logger.IsTrace) _logger.Trace($"Announced {resourceId} by {retryHandler}, but a retry is in progress already, immidietly firing");
if (_logger.IsWarn) _logger.Warn($"Announced {resourceId} by {retryHandler}, but a retry is in progress already, immidietly firing");

return AnnounceResult.New;
}

public void Received(TResourceId resourceId)
{
if (_logger.IsTrace) _logger.Trace($"Received {resourceId}");
if (_logger.IsWarn) _logger.Warn($"Received {resourceId}");

_retryRequests.TryRemove(resourceId, out PooledSet<IMessageHandler<TMessage>>? _);
_requestingResources.Delete(resourceId);
}
}

Expand Down
Loading