From 67d83216dc6c41cf99e97d53b1f2ff15b19595f6 Mon Sep 17 00:00:00 2001 From: Fabio Di Fabio Date: Fri, 6 Mar 2026 15:33:35 +0100 Subject: [PATCH] Remove pre-eth/68 transaction announcement support and limit pooled tx requests by size Signed-off-by: Fabio Di Fabio --- CHANGELOG.md | 1 + .../TransactionAnnouncementDecoder.java | 39 +----- ...dGetPooledTransactionsFromPeerFetcher.java | 32 ++++- .../NewPooledTransactionHashesMessage.java | 8 +- ...oledTransactionHashesMessageProcessor.java | 25 ++-- .../transactions/TransactionAnnouncement.java | 49 +------ .../transactions/TransactionPoolFactory.java | 9 +- ...PooledTransactionsFromPeerFetcherTest.java | 129 +++++++++++++----- ...NewPooledTransactionHashesMessageTest.java | 9 +- ...TransactionHashesMessageProcessorTest.java | 83 +++-------- ...ledTransactionHashesMessageSenderTest.java | 23 ++-- .../TransactionPoolFactoryTest.java | 3 +- 12 files changed, 189 insertions(+), 221 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 391367f5894..83302dee4fc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -34,6 +34,7 @@ - Add `-Pcases` case name filtering to JMH benchmark suite [#9982](https://github.com/hyperledger/besu/pull/9982) - Use JDK SHA-256 provider to leverage hardware SHA-NI instructions instead of BouncyCastle [#9924](https://github.com/hyperledger/besu/pull/9924) - Support [EIP-7975](https://eips.ethereum.org/EIPS/eip-7975): eth/70 - partial block receipt lists +- Limit pooled tx requests by size and remove pre-eth/68 transaction announcement support [#9990](https://github.com/besu-eth/besu/pull/9990) ## 26.2.0 diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/encoding/TransactionAnnouncementDecoder.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/encoding/TransactionAnnouncementDecoder.java index 2e91386aa58..2d08cbaa96a 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/encoding/TransactionAnnouncementDecoder.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/encoding/TransactionAnnouncementDecoder.java @@ -16,7 +16,6 @@ import org.hyperledger.besu.datatypes.Hash; import org.hyperledger.besu.datatypes.TransactionType; -import org.hyperledger.besu.ethereum.eth.EthProtocolVersion; import org.hyperledger.besu.ethereum.eth.transactions.TransactionAnnouncement; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Capability; import org.hyperledger.besu.ethereum.rlp.RLPException; @@ -24,9 +23,6 @@ import java.util.ArrayList; import java.util.List; -import java.util.stream.Collectors; - -import org.apache.tuweni.bytes.Bytes; public class TransactionAnnouncementDecoder { @@ -44,24 +40,7 @@ public interface Decoder { * @return the correct decoder */ public static Decoder getDecoder(final Capability capability) { - if (capability.getVersion() >= EthProtocolVersion.V68) { - return TransactionAnnouncementDecoder::decodeForEth68; - } else { - return TransactionAnnouncementDecoder::decodeForEth66; - } - } - - /** - * Decode the list of transactions in the NewPooledTransactionHashesMessage - * - * @param input input used to decode the NewPooledTransactionHashesMessage before Eth/68 - *

format: [hash_0: B_32, hash_1: B_32, ...] - * @return the list of TransactionAnnouncement decoded from the message. Only hash is present. - * size and type will return an Optional.empty() - */ - private static List decodeForEth66(final RLPInput input) { - final List hashes = input.readList(rlp -> Hash.wrap(rlp.readBytes32())); - return hashes.stream().map(TransactionAnnouncement::new).collect(Collectors.toList()); + return TransactionAnnouncementDecoder::decodeForEth68; } /** @@ -84,21 +63,7 @@ private static List decodeForEth68(final RLPInput input types.add(transactionType); } - List sizes = - input.readList( - in -> { - // for backward compatibility with previous Besu implementation be lenient and support - // also unsigned int with leading zeros. - // ToDo: this could be replaced with the simpler `RLPInput::readUnsignedIntScalar` - // after some months it has been released, since most of the Besus - // will be using the new implementation. - final Bytes intBytes = in.readBytes(); - if (intBytes.size() > 4) { - throw new RLPException( - "Expected max 4 bytes for unsigned int, but got " + intBytes.size() + " bytes"); - } - return intBytes.toLong(); - }); + final List sizes = input.readList(RLPInput::readUnsignedIntScalar); final List hashes = input.readList(rlp -> Hash.wrap(rlp.readBytes32())); input.leaveList(); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/BufferedGetPooledTransactionsFromPeerFetcher.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/BufferedGetPooledTransactionsFromPeerFetcher.java index 5c9e144038e..5b7f9bcbacb 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/BufferedGetPooledTransactionsFromPeerFetcher.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/BufferedGetPooledTransactionsFromPeerFetcher.java @@ -24,6 +24,7 @@ import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResult; import org.hyperledger.besu.ethereum.eth.manager.peertask.task.GetPooledTransactionsFromPeerTask; import org.hyperledger.besu.ethereum.eth.transactions.PeerTransactionTracker; +import org.hyperledger.besu.ethereum.eth.transactions.TransactionAnnouncement; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolMetrics; @@ -51,7 +52,8 @@ public class BufferedGetPooledTransactionsFromPeerFetcher { private final String metricLabel; private final ScheduledFuture scheduledFuture; private final EthPeer peer; - private final Queue txAnnounces; + private final Queue txAnnounces; + private final int maxTransactionsMessageSize; public BufferedGetPooledTransactionsFromPeerFetcher( final EthContext ethContext, @@ -59,6 +61,7 @@ public BufferedGetPooledTransactionsFromPeerFetcher( final EthPeer peer, final TransactionPool transactionPool, final PeerTransactionTracker transactionTracker, + final int maxTransactionsMessageSize, final TransactionPoolMetrics metrics, final String metricLabel) { this.ethContext = ethContext; @@ -70,6 +73,7 @@ public BufferedGetPooledTransactionsFromPeerFetcher( this.metricLabel = metricLabel; this.txAnnounces = Queues.synchronizedQueue(EvictingQueue.create(DEFAULT_MAX_PENDING_TRANSACTIONS)); + this.maxTransactionsMessageSize = maxTransactionsMessageSize; } public ScheduledFuture getScheduledFuture() { @@ -124,20 +128,36 @@ public void requestTransactions() { } } - public void addHashes(final Collection hashes) { - txAnnounces.addAll(hashes); + public void addAnnouncements(final Collection announcements) { + txAnnounces.addAll(announcements); } private List getTxHashesToRetrieve() { final List toRetrieve = new ArrayList<>(MAX_HASHES); int discarded = 0; + long cumulativeSize = 0; while (toRetrieve.size() < MAX_HASHES && !txAnnounces.isEmpty()) { - final Hash txHashAnnounced = txAnnounces.poll(); - if (!transactionTracker.hasSeenTransaction(txHashAnnounced)) { - toRetrieve.add(txHashAnnounced); + final TransactionAnnouncement txAnnounced = txAnnounces.peek(); + if (!transactionTracker.hasSeenTransaction(txAnnounced.hash())) { + if (cumulativeSize + txAnnounced.size() > maxTransactionsMessageSize) { + // defense in case maxTransactionsMessageSize is set too small + // this avoids an infinite loop if the first announcement is oversized + if (txAnnounced.size() > maxTransactionsMessageSize) { + LOG.warn( + "maxTransactionsMessageSize ({} bytes) is set too small to fetch tx announcement {}", + maxTransactionsMessageSize, + txAnnounced); + txAnnounces.remove(); + } + // max size reached + break; + } + toRetrieve.add(txAnnounced.hash()); + cumulativeSize += txAnnounced.size(); } else { discarded++; } + txAnnounces.remove(); } final int alreadySeenCount = discarded; diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/messages/NewPooledTransactionHashesMessage.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/messages/NewPooledTransactionHashesMessage.java index 6056e7dab1c..3a81fc06d25 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/messages/NewPooledTransactionHashesMessage.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/messages/NewPooledTransactionHashesMessage.java @@ -17,7 +17,6 @@ import static org.hyperledger.besu.ethereum.eth.encoding.TransactionAnnouncementDecoder.getDecoder; import static org.hyperledger.besu.ethereum.eth.encoding.TransactionAnnouncementEncoder.getEncoder; -import org.hyperledger.besu.datatypes.Hash; import org.hyperledger.besu.ethereum.core.Transaction; import org.hyperledger.besu.ethereum.eth.transactions.TransactionAnnouncement; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.AbstractMessageData; @@ -67,15 +66,10 @@ public static NewPooledTransactionHashesMessage readFrom( return new NewPooledTransactionHashesMessage(message.getData(), capability); } - @VisibleForTesting - public List pendingTransactions() { + public List pendingTransactionAnnouncements() { if (pendingTransactions == null) { pendingTransactions = getDecoder(capability).decode(RLP.input(data)); } return pendingTransactions; } - - public List pendingTransactionHashes() { - return pendingTransactions().stream().map(TransactionAnnouncement::getHash).toList(); - } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageProcessor.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageProcessor.java index 7938900a03e..69f13822e23 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageProcessor.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageProcessor.java @@ -16,7 +16,6 @@ import static java.time.Instant.now; -import org.hyperledger.besu.datatypes.Hash; import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; import org.hyperledger.besu.ethereum.eth.manager.task.BufferedGetPooledTransactionsFromPeerFetcher; @@ -48,13 +47,15 @@ public class NewPooledTransactionHashesMessageProcessor { private final TransactionPoolConfiguration transactionPoolConfiguration; private final EthContext ethContext; private final TransactionPoolMetrics metrics; + private final int maxTransactionsMessageSize; public NewPooledTransactionHashesMessageProcessor( final PeerTransactionTracker transactionTracker, final TransactionPool transactionPool, final TransactionPoolConfiguration transactionPoolConfiguration, final EthContext ethContext, - final TransactionPoolMetrics metrics) { + final TransactionPoolMetrics metrics, + final int maxTransactionsMessageSize) { this.transactionTracker = transactionTracker; this.transactionPool = transactionPool; this.transactionPoolConfiguration = transactionPoolConfiguration; @@ -62,6 +63,7 @@ public NewPooledTransactionHashesMessageProcessor( this.metrics = metrics; metrics.initExpiredMessagesCounter(METRIC_LABEL); this.scheduledTasks = new ConcurrentHashMap<>(); + this.maxTransactionsMessageSize = maxTransactionsMessageSize; } void processNewPooledTransactionHashesMessage( @@ -76,12 +78,12 @@ void processNewPooledTransactionHashesMessage( } else { LOG.atTrace() .setMessage( - "Ignoring expired transactions message: peer={}, latency={}, queuedAt={}, keepAlive={}, hashes={}") + "Ignoring expired transactions message: peer={}, latency={}, queuedAt={}, keepAlive={}, announcements={}") .addArgument(peer) .addArgument(latency) .addArgument(queueAt) .addArgument(keepAlive) - .addArgument(transactionsMessage::pendingTransactionHashes) + .addArgument(transactionsMessage::pendingTransactionAnnouncements) .log(); metrics.incrementExpiredMessages(METRIC_LABEL); } @@ -90,12 +92,14 @@ void processNewPooledTransactionHashesMessage( private void processNewPooledTransactionHashesMessage( final EthPeer peer, final NewPooledTransactionHashesMessage transactionsMessage) { try { - final List incomingTransactionHashes = transactionsMessage.pendingTransactionHashes(); + final List incomingTransactionAnnouncements = + transactionsMessage.pendingTransactionAnnouncements(); LOG.atTrace() - .setMessage("Received pooled transaction hashes message: peer={}, incoming hashes={}") + .setMessage( + "Received pooled transaction hashes message: peer={}, incoming announcements={}") .addArgument(peer) - .addArgument(incomingTransactionHashes) + .addArgument(incomingTransactionAnnouncements) .log(); final BufferedGetPooledTransactionsFromPeerFetcher bufferedTask = @@ -120,13 +124,14 @@ private void processNewPooledTransactionHashesMessage( peer, transactionPool, transactionTracker, + maxTransactionsMessageSize, metrics, METRIC_LABEL); }); - bufferedTask.addHashes( - incomingTransactionHashes.stream() - .filter(hash -> transactionPool.getTransactionByHash(hash).isEmpty()) + bufferedTask.addAnnouncements( + incomingTransactionAnnouncements.stream() + .filter(ann -> transactionPool.getTransactionByHash(ann.hash()).isEmpty()) .toList()); } catch (final RLPException ex) { if (peer != null) { diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionAnnouncement.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionAnnouncement.java index 8a5aeb28490..b0af58d2689 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionAnnouncement.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionAnnouncement.java @@ -22,20 +22,8 @@ import java.util.ArrayList; import java.util.List; -import java.util.Objects; -import java.util.Optional; - -public class TransactionAnnouncement { - private final Hash hash; - private final Optional type; - private final Optional size; - - public TransactionAnnouncement(final Hash hash) { - this.hash = checkNotNull(hash, "Hash cannot be null"); - this.type = Optional.empty(); - this.size = Optional.empty(); - } +public record TransactionAnnouncement(Hash hash, TransactionType type, Long size) { public TransactionAnnouncement(final Transaction transaction) { this( checkNotNull(transaction, "Transaction cannot be null").getHash(), @@ -45,20 +33,8 @@ public TransactionAnnouncement(final Transaction transaction) { public TransactionAnnouncement(final Hash hash, final TransactionType type, final Long size) { this.hash = checkNotNull(hash, "Hash cannot be null"); - this.type = Optional.of(checkNotNull(type, "Type cannot be null")); - this.size = Optional.of(checkNotNull(size, "Size cannot be null")); - } - - public Hash getHash() { - return hash; - } - - public Optional getType() { - return type; - } - - public Optional getSize() { - return size; + this.type = checkNotNull(type, "Type cannot be null"); + this.size = checkNotNull(size, "Size cannot be null"); } public static List create( @@ -74,23 +50,4 @@ public static List create( } return transactions; } - - @Override - public boolean equals(final Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - final TransactionAnnouncement that = (TransactionAnnouncement) o; - return Objects.equals(size, that.size) - && Objects.equals(type, that.type) - && Objects.equals(hash, that.hash); - } - - @Override - public int hashCode() { - return Objects.hash(hash, size, type); - } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactory.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactory.java index b100437eaa8..de5b7376bf1 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactory.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactory.java @@ -84,7 +84,8 @@ public static TransactionPool createTransactionPool( transactionsMessageSender, newPooledTransactionHashesMessageSender, blobCache, - miningConfiguration); + miningConfiguration, + ethProtocolConfiguration); } static TransactionPool createTransactionPool( @@ -99,7 +100,8 @@ static TransactionPool createTransactionPool( final TransactionsMessageSender transactionsMessageSender, final NewPooledTransactionHashesMessageSender newPooledTransactionHashesMessageSender, final BlobCache blobCache, - final MiningConfiguration miningConfiguration) { + final MiningConfiguration miningConfiguration, + final EthProtocolConfiguration ethProtocolConfiguration) { final TransactionPool transactionPool = new TransactionPool( @@ -139,7 +141,8 @@ static TransactionPool createTransactionPool( transactionPool, transactionPoolConfiguration, ethContext, - metrics), + metrics, + ethProtocolConfiguration.getMaxTransactionsMessageSize()), transactionPoolConfiguration.getUnstable().getTxMessageKeepAliveSeconds()); subscribeTransactionHandlers( diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/BufferedGetPooledTransactionsFromPeerFetcherTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/BufferedGetPooledTransactionsFromPeerFetcherTest.java index 2ae4aa8340c..be815b1889b 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/BufferedGetPooledTransactionsFromPeerFetcherTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/BufferedGetPooledTransactionsFromPeerFetcherTest.java @@ -28,6 +28,7 @@ import org.hyperledger.besu.datatypes.Hash; import org.hyperledger.besu.ethereum.core.BlockDataGenerator; import org.hyperledger.besu.ethereum.core.Transaction; +import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration; import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; import org.hyperledger.besu.ethereum.eth.manager.EthPeers; @@ -35,7 +36,9 @@ import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor; import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResponseCode; import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResult; +import org.hyperledger.besu.ethereum.eth.manager.peertask.task.GetPooledTransactionsFromPeerTask; import org.hyperledger.besu.ethereum.eth.transactions.PeerTransactionTracker; +import org.hyperledger.besu.ethereum.eth.transactions.TransactionAnnouncement; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolMetrics; @@ -45,6 +48,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -52,6 +56,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; @@ -90,6 +95,7 @@ public void setup() { ethPeer, transactionPool, transactionTracker, + EthProtocolConfiguration.DEFAULT_MAX_TRANSACTIONS_MESSAGE_SIZE, new TransactionPoolMetrics(metricsSystem), "new_pooled_transaction_hashes"); } @@ -99,25 +105,18 @@ public void requestTransactionShouldStartTaskWhenUnknownTransaction() { final Transaction transaction = generator.transaction(); final List taskResult = List.of(transaction); final PeerTaskExecutorResult> peerTaskResult = - new PeerTaskExecutorResult>( + new PeerTaskExecutorResult<>( Optional.of(taskResult), PeerTaskExecutorResponseCode.SUCCESS, List.of(ethPeer)); when(peerTaskExecutor.executeAgainstPeer( - any( - org.hyperledger.besu.ethereum.eth.manager.peertask.task - .GetPooledTransactionsFromPeerTask.class), - eq(ethPeer))) + any(GetPooledTransactionsFromPeerTask.class), eq(ethPeer))) .thenReturn(peerTaskResult); - fetcher.addHashes(List.of(transaction.getHash())); + fetcher.addAnnouncements(List.of(new TransactionAnnouncement(transaction))); fetcher.requestTransactions(); verify(peerTaskExecutor) - .executeAgainstPeer( - any( - org.hyperledger.besu.ethereum.eth.manager.peertask.task - .GetPooledTransactionsFromPeerTask.class), - eq(ethPeer)); + .executeAgainstPeer(any(GetPooledTransactionsFromPeerTask.class), eq(ethPeer)); verifyNoMoreInteractions(peerTaskExecutor); verify(transactionPool, times(1)).addRemoteTransactions(taskResult); @@ -126,29 +125,25 @@ public void requestTransactionShouldStartTaskWhenUnknownTransaction() { @Test public void requestTransactionShouldSplitRequestIntoSeveralTasks() { - final Map transactionsByHash = + final Map transactionsByAnnouncement = IntStream.range(0, 257) .mapToObj(unused -> generator.transaction()) - .collect(Collectors.toMap((t) -> t.getHash(), (t) -> t)); - fetcher.addHashes(transactionsByHash.keySet()); + .collect(Collectors.toMap(TransactionAnnouncement::new, Function.identity())); + final Map transactionsByHash = + transactionsByAnnouncement.entrySet().stream() + .collect(Collectors.toMap(e -> e.getKey().hash(), Map.Entry::getValue)); + fetcher.addAnnouncements(transactionsByAnnouncement.keySet()); when(peerTaskExecutor.executeAgainstPeer( - any( - org.hyperledger.besu.ethereum.eth.manager.peertask.task - .GetPooledTransactionsFromPeerTask.class), - eq(ethPeer))) + any(GetPooledTransactionsFromPeerTask.class), eq(ethPeer))) .thenAnswer( (invocationOnMock) -> { - org.hyperledger.besu.ethereum.eth.manager.peertask.task - .GetPooledTransactionsFromPeerTask - task = - invocationOnMock.getArgument( - 0, - org.hyperledger.besu.ethereum.eth.manager.peertask.task - .GetPooledTransactionsFromPeerTask.class); + GetPooledTransactionsFromPeerTask task = + invocationOnMock.getArgument(0, GetPooledTransactionsFromPeerTask.class); + List resultTransactions = task.getHashes().stream().map(transactionsByHash::get).toList(); - return new PeerTaskExecutorResult>( + return new PeerTaskExecutorResult<>( Optional.of(resultTransactions), PeerTaskExecutorResponseCode.SUCCESS, List.of(ethPeer)); @@ -157,11 +152,77 @@ public void requestTransactionShouldSplitRequestIntoSeveralTasks() { fetcher.requestTransactions(); verify(peerTaskExecutor, times(2)) - .executeAgainstPeer( - any( - org.hyperledger.besu.ethereum.eth.manager.peertask.task - .GetPooledTransactionsFromPeerTask.class), - eq(ethPeer)); + .executeAgainstPeer(any(GetPooledTransactionsFromPeerTask.class), eq(ethPeer)); + verifyNoMoreInteractions(peerTaskExecutor); + } + + @Test + public void requestTransactionShouldSplitRequestWhenCumulativeSizeExceedsLimit() { + // DEFAULT_MAX_TRANSACTIONS_MESSAGE_SIZE = 1 MB (1,048,576 bytes). + // The inner check is: if (cumulative + txSize > limit) break. + // With 2 announcements of 600 KB each: + // ann1: 0 + 600KB ≤ 1MB → added (cumulative = 600KB) + // ann2: 600KB + 600KB > 1MB → break, ann2 is not removed + // First batch = [ann1]. ann2 stays queued and is returned as the second batch. + final long largeSize = 600L * 1024; + final List announcements = + IntStream.range(0, 2) + .mapToObj(unused -> generator.transaction()) + .map(tx -> new TransactionAnnouncement(tx.getHash(), tx.getType(), largeSize)) + .toList(); + + when(peerTaskExecutor.executeAgainstPeer( + any(GetPooledTransactionsFromPeerTask.class), eq(ethPeer))) + .thenReturn( + new PeerTaskExecutorResult<>( + Optional.of(List.of()), PeerTaskExecutorResponseCode.SUCCESS, List.of(ethPeer))); + + fetcher.addAnnouncements(announcements); + fetcher.requestTransactions(); + + final ArgumentCaptor taskCaptor = + ArgumentCaptor.forClass(GetPooledTransactionsFromPeerTask.class); + + verify(peerTaskExecutor, times(2)).executeAgainstPeer(taskCaptor.capture(), eq(ethPeer)); + verifyNoMoreInteractions(peerTaskExecutor); + + assertThat(taskCaptor.getAllValues().stream().map(GetPooledTransactionsFromPeerTask::getHashes)) + .containsExactly( + List.of(announcements.get(0).hash()), List.of(announcements.get(1).hash())); + } + + @Test + public void requestTransactionShouldDiscardOversizedAnnouncementAndNotLoopForever() { + // An announcement whose individual size exceeds the limit would be stuck at the head of the + // queue forever (peek → size check fails → break → peek again → ...). + // The fix: if txSize > limit, discard it (remove) before breaking. + final long oversizedSize = EthProtocolConfiguration.DEFAULT_MAX_TRANSACTIONS_MESSAGE_SIZE + 1L; + final Transaction oversizedTx = generator.transaction(); + final Transaction normalTx = generator.transaction(); + + final TransactionAnnouncement oversized = + new TransactionAnnouncement(oversizedTx.getHash(), oversizedTx.getType(), oversizedSize); + final TransactionAnnouncement normal = new TransactionAnnouncement(normalTx); + + fetcher.addAnnouncements(List.of(oversized, normal)); + + when(peerTaskExecutor.executeAgainstPeer( + any(GetPooledTransactionsFromPeerTask.class), eq(ethPeer))) + .thenReturn( + new PeerTaskExecutorResult<>( + Optional.of(List.of()), PeerTaskExecutorResponseCode.SUCCESS, List.of(ethPeer))); + + // First call: oversized is discarded then break → no request is made, method returns normally + fetcher.requestTransactions(); + verifyNoInteractions(peerTaskExecutor); + + // Second call: oversized is gone; normal tx is now at the head and gets fetched + fetcher.requestTransactions(); + + final ArgumentCaptor taskCaptor = + ArgumentCaptor.forClass(GetPooledTransactionsFromPeerTask.class); + verify(peerTaskExecutor, times(1)).executeAgainstPeer(taskCaptor.capture(), eq(ethPeer)); + assertThat(taskCaptor.getValue().getHashes()).containsExactly(normal.hash()); verifyNoMoreInteractions(peerTaskExecutor); } @@ -169,10 +230,10 @@ public void requestTransactionShouldSplitRequestIntoSeveralTasks() { public void requestTransactionShouldNotStartTaskWhenTransactionAlreadySeen() { final Transaction transaction = generator.transaction(); - final Hash hash = transaction.getHash(); - transactionTracker.markTransactionHashesAsSeen(ethPeer, List.of(hash)); + final TransactionAnnouncement announcement = new TransactionAnnouncement(transaction); + transactionTracker.markTransactionHashesAsSeen(ethPeer, List.of(announcement.hash())); - fetcher.addHashes(List.of(hash)); + fetcher.addAnnouncements(List.of(announcement)); fetcher.requestTransactions(); verifyNoInteractions(peerTaskExecutor); diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/messages/NewPooledTransactionHashesMessageTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/messages/NewPooledTransactionHashesMessageTest.java index d64ab02b5fa..7cb36baa211 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/messages/NewPooledTransactionHashesMessageTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/messages/NewPooledTransactionHashesMessageTest.java @@ -16,12 +16,11 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; -import static org.hyperledger.besu.ethereum.core.Transaction.toHashList; -import org.hyperledger.besu.datatypes.Hash; import org.hyperledger.besu.ethereum.core.BlockDataGenerator; import org.hyperledger.besu.ethereum.core.Transaction; import org.hyperledger.besu.ethereum.eth.EthProtocol; +import org.hyperledger.besu.ethereum.eth.transactions.TransactionAnnouncement; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.RawMessage; import java.util.List; @@ -37,8 +36,10 @@ public void roundTripNewPooledTransactionHashesMessage() { final NewPooledTransactionHashesMessage msg = NewPooledTransactionHashesMessage.create(transactions, EthProtocol.LATEST); assertThat(msg.getCode()).isEqualTo(EthProtocolMessages.NEW_POOLED_TRANSACTION_HASHES); - final List pendingHashes = msg.pendingTransactionHashes(); - assertThat(pendingHashes).isEqualTo(toHashList(transactions)); + final List pendingAnnouncements = + msg.pendingTransactionAnnouncements(); + assertThat(pendingAnnouncements) + .isEqualTo(transactions.stream().map(TransactionAnnouncement::new).toList()); } @Test diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageProcessorTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageProcessorTest.java index 4e0d26029f2..01aada53d39 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageProcessorTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageProcessorTest.java @@ -35,6 +35,7 @@ import org.hyperledger.besu.ethereum.core.BlockDataGenerator; import org.hyperledger.besu.ethereum.core.Transaction; import org.hyperledger.besu.ethereum.eth.EthProtocol; +import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration; import org.hyperledger.besu.ethereum.eth.encoding.TransactionAnnouncementDecoder; import org.hyperledger.besu.ethereum.eth.encoding.TransactionAnnouncementEncoder; import org.hyperledger.besu.ethereum.eth.manager.EthContext; @@ -100,7 +101,8 @@ public void setup() { transactionPool, transactionPoolConfiguration, ethContext, - new TransactionPoolMetrics(metricsSystem)); + new TransactionPoolMetrics(metricsSystem), + EthProtocolConfiguration.DEFAULT_MAX_TRANSACTIONS_MESSAGE_SIZE); when(ethContext.getScheduler()).thenReturn(ethScheduler); } @@ -226,7 +228,8 @@ void shouldCreateAndDecodeForEth68() { final NewPooledTransactionHashesMessage message = NewPooledTransactionHashesMessage.create(transactionList, EthProtocol.ETH68); - final List announcementList = message.pendingTransactions(); + final List announcementList = + message.pendingTransactionAnnouncements(); assertThat(announcementList).containsExactlyElementsOf(expectedTransactions); } @@ -271,72 +274,28 @@ void shouldDecodeBytesCorrectly_Eth68() { getDecoder(EthProtocol.ETH68).decode(RLP.input(bytes)); final TransactionAnnouncement frontier = announcementList.get(0); - assertThat(frontier.getHash()) + assertThat(frontier.hash()) .isEqualTo( Hash.fromHexString( "0x0000000000000000000000000000000000000000000000000000000000000001")); - assertThat(frontier.getType()).hasValue(TransactionType.FRONTIER); - assertThat(frontier.getSize()).hasValue(1L); + assertThat(frontier.type()).isEqualTo(TransactionType.FRONTIER); + assertThat(frontier.size()).isEqualTo(1L); final TransactionAnnouncement accessList = announcementList.get(1); - assertThat(accessList.getHash()) + assertThat(accessList.hash()) .isEqualTo( Hash.fromHexString( "0x0000000000000000000000000000000000000000000000000000000000000002")); - assertThat(accessList.getType()).hasValue(TransactionType.ACCESS_LIST); - assertThat(accessList.getSize()).hasValue(2L); + assertThat(accessList.type()).isEqualTo(TransactionType.ACCESS_LIST); + assertThat(accessList.size()).isEqualTo(2L); final TransactionAnnouncement eip1559 = announcementList.get(2); - assertThat(eip1559.getHash()) + assertThat(eip1559.hash()) .isEqualTo( Hash.fromHexString( "0x0000000000000000000000000000000000000000000000000000000000000003")); - assertThat(eip1559.getType()).hasValue(TransactionType.EIP1559); - assertThat(eip1559.getSize()).hasValue(3L); - } - - @Test - void shouldDecodeBytesCorrectly_PreviousImplementations_Eth68() { - /* - * [ - * "0x0000102"] - * ["0x00000001","0x00000002","0x00000003"], - * ["0x0000000000000000000000000000000000000000000000000000000000000001", - * "0x0000000000000000000000000000000000000000000000000000000000000002", - * "0x0000000000000000000000000000000000000000000000000000000000000003"] - * ] - */ - - final Bytes bytes = - Bytes.fromHexString( - "0xf87983000102cf840000000184000000028400000003f863a00000000000000000000000000000000000000000000000000000000000000001a00000000000000000000000000000000000000000000000000000000000000002a00000000000000000000000000000000000000000000000000000000000000003"); - - final List announcementList = - getDecoder(EthProtocol.ETH68).decode(RLP.input(bytes)); - - final TransactionAnnouncement frontier = announcementList.get(0); - assertThat(frontier.getHash()) - .isEqualTo( - Hash.fromHexString( - "0x0000000000000000000000000000000000000000000000000000000000000001")); - assertThat(frontier.getType()).hasValue(TransactionType.FRONTIER); - assertThat(frontier.getSize()).hasValue(1L); - - final TransactionAnnouncement accessList = announcementList.get(1); - assertThat(accessList.getHash()) - .isEqualTo( - Hash.fromHexString( - "0x0000000000000000000000000000000000000000000000000000000000000002")); - assertThat(accessList.getType()).hasValue(TransactionType.ACCESS_LIST); - assertThat(accessList.getSize()).hasValue(2L); - - final TransactionAnnouncement eip1559 = announcementList.get(2); - assertThat(eip1559.getHash()) - .isEqualTo( - Hash.fromHexString( - "0x0000000000000000000000000000000000000000000000000000000000000003")); - assertThat(eip1559.getType()).hasValue(TransactionType.EIP1559); - assertThat(eip1559.getSize()).hasValue(3L); + assertThat(eip1559.type()).isEqualTo(TransactionType.EIP1559); + assertThat(eip1559.size()).isEqualTo(3L); } @Test @@ -355,9 +314,9 @@ void shouldEncodeAndDecodeTransactionAnnouncement_Eth68() { for (final Transaction transaction : list) { final TransactionAnnouncement announcement = announcementList.get(list.indexOf(transaction)); - assertThat(announcement.getHash()).isEqualTo(transaction.getHash()); - assertThat(announcement.getType()).hasValue(transaction.getType()); - assertThat(announcement.getSize()).hasValue((long) transaction.getSizeForAnnouncement()); + assertThat(announcement.hash()).isEqualTo(transaction.getHash()); + assertThat(announcement.type()).isEqualTo(transaction.getType()); + assertThat(announcement.size()).isEqualTo(transaction.getSizeForAnnouncement()); } } @@ -423,15 +382,13 @@ void shouldThrowRLPExceptionWhenSizeSizeGreaterThanFourBytes() { TransactionAnnouncementDecoder.getDecoder(EthProtocol.ETH68) .decode(RLP.input(invalidMessageBytes))) .isInstanceOf(RLPException.class) - .hasMessageContaining("Expected max 4 bytes for unsigned int, but got 5 bytes"); + .hasMessageContaining( + "Cannot read a unsigned int scalar, expecting a maximum of 4 bytes but current element is 5 bytes long"); } @Test void shouldThrowNullPointerIfArgumentsAreNull() { final Hash hash = Hash.hash(Bytes.random(32)); - assertThatThrownBy(() -> new TransactionAnnouncement((Hash) null)) - .isInstanceOf(NullPointerException.class) - .hasMessage("Hash cannot be null"); assertThatThrownBy(() -> new TransactionAnnouncement(null, TransactionType.EIP1559, 0L)) .isInstanceOf(NullPointerException.class) @@ -445,7 +402,7 @@ void shouldThrowNullPointerIfArgumentsAreNull() { .isInstanceOf(NullPointerException.class) .hasMessage("Size cannot be null"); - assertThatThrownBy(() -> new TransactionAnnouncement((Transaction) null)) + assertThatThrownBy(() -> new TransactionAnnouncement(null)) .isInstanceOf(NullPointerException.class) .hasMessage("Transaction cannot be null"); } diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageSenderTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageSenderTest.java index 308b6e15ea0..f89dfbb7147 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageSenderTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageSenderTest.java @@ -16,7 +16,6 @@ import static com.google.common.collect.Sets.newHashSet; import static org.assertj.core.api.Assertions.assertThat; -import static org.hyperledger.besu.ethereum.core.Transaction.toHashList; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.mock; @@ -25,7 +24,6 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; -import org.hyperledger.besu.datatypes.Hash; import org.hyperledger.besu.ethereum.core.BlockDataGenerator; import org.hyperledger.besu.ethereum.core.Transaction; import org.hyperledger.besu.ethereum.eth.EthProtocol; @@ -115,8 +113,9 @@ public void shouldSendTransactionsInBatchesWithLimit() throws Exception { .hasSize(2) .allMatch( message -> message.getCode() == EthProtocolMessages.NEW_POOLED_TRANSACTION_HASHES); - final Set firstBatch = getTransactionsFromMessage(sentMessages.get(0)); - final Set secondBatch = getTransactionsFromMessage(sentMessages.get(1)); + final Set firstBatch = getTransactionsFromMessage(sentMessages.get(0)); + final Set secondBatch = + getTransactionsFromMessage(sentMessages.get(1)); final int expectedFirstBatchSize = 4096, expectedSecondBatchSize = 1904, toleranceDelta = 0; assertThat(firstBatch) @@ -127,23 +126,27 @@ public void shouldSendTransactionsInBatchesWithLimit() throws Exception { expectedSecondBatchSize - toleranceDelta, expectedSecondBatchSize + toleranceDelta); assertThat(Sets.union(firstBatch, secondBatch)) - .containsExactlyInAnyOrderElementsOf(toHashList(transactions)); + .containsExactlyInAnyOrderElementsOf( + transactions.stream().map(TransactionAnnouncement::new).toList()); } private MessageData transactionsMessageContaining(final Transaction... transactions) { return argThat( message -> { - final Set actualSentTransactions = getTransactionsFromMessage(message); - final Set expectedTransactions = - newHashSet(toHashList(Arrays.asList(transactions))); + final Set actualSentTransactions = + getTransactionsFromMessage(message); + final Set expectedTransactions = + Arrays.stream(transactions) + .map(TransactionAnnouncement::new) + .collect(Collectors.toSet()); return message.getCode() == EthProtocolMessages.NEW_POOLED_TRANSACTION_HASHES && actualSentTransactions.equals(expectedTransactions); }); } - private Set getTransactionsFromMessage(final MessageData message) { + private Set getTransactionsFromMessage(final MessageData message) { final NewPooledTransactionHashesMessage transactionsMessage = NewPooledTransactionHashesMessage.readFrom(message, EthProtocol.LATEST); - return newHashSet(transactionsMessage.pendingTransactionHashes()); + return newHashSet(transactionsMessage.pendingTransactionAnnouncements()); } } diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactoryTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactoryTest.java index 9bcd06f4fef..c940cf232ef 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactoryTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactoryTest.java @@ -413,7 +413,8 @@ private TransactionPool createTransactionPool( transactionsMessageSender, newPooledTransactionHashesMessageSender, new BlobCache(), - MiningConfiguration.newDefault()); + MiningConfiguration.newDefault(), + EthProtocolConfiguration.DEFAULT); } private TransactionPool createAndEnableTransactionPool(