diff --git a/CHANGELOG.md b/CHANGELOG.md
index 391367f5894..83302dee4fc 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -34,6 +34,7 @@
- Add `-Pcases` case name filtering to JMH benchmark suite [#9982](https://github.com/hyperledger/besu/pull/9982)
- Use JDK SHA-256 provider to leverage hardware SHA-NI instructions instead of BouncyCastle [#9924](https://github.com/hyperledger/besu/pull/9924)
- Support [EIP-7975](https://eips.ethereum.org/EIPS/eip-7975): eth/70 - partial block receipt lists
+- Limit pooled tx requests by size and remove pre-eth/68 transaction announcement support [#9990](https://github.com/besu-eth/besu/pull/9990)
## 26.2.0
diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/encoding/TransactionAnnouncementDecoder.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/encoding/TransactionAnnouncementDecoder.java
index 2e91386aa58..2d08cbaa96a 100644
--- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/encoding/TransactionAnnouncementDecoder.java
+++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/encoding/TransactionAnnouncementDecoder.java
@@ -16,7 +16,6 @@
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.datatypes.TransactionType;
-import org.hyperledger.besu.ethereum.eth.EthProtocolVersion;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionAnnouncement;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Capability;
import org.hyperledger.besu.ethereum.rlp.RLPException;
@@ -24,9 +23,6 @@
import java.util.ArrayList;
import java.util.List;
-import java.util.stream.Collectors;
-
-import org.apache.tuweni.bytes.Bytes;
public class TransactionAnnouncementDecoder {
@@ -44,24 +40,7 @@ public interface Decoder {
* @return the correct decoder
*/
public static Decoder getDecoder(final Capability capability) {
- if (capability.getVersion() >= EthProtocolVersion.V68) {
- return TransactionAnnouncementDecoder::decodeForEth68;
- } else {
- return TransactionAnnouncementDecoder::decodeForEth66;
- }
- }
-
- /**
- * Decode the list of transactions in the NewPooledTransactionHashesMessage
- *
- * @param input input used to decode the NewPooledTransactionHashesMessage before Eth/68
- *
format: [hash_0: B_32, hash_1: B_32, ...]
- * @return the list of TransactionAnnouncement decoded from the message. Only hash is present.
- * size and type will return an Optional.empty()
- */
- private static List decodeForEth66(final RLPInput input) {
- final List hashes = input.readList(rlp -> Hash.wrap(rlp.readBytes32()));
- return hashes.stream().map(TransactionAnnouncement::new).collect(Collectors.toList());
+ return TransactionAnnouncementDecoder::decodeForEth68;
}
/**
@@ -84,21 +63,7 @@ private static List decodeForEth68(final RLPInput input
types.add(transactionType);
}
- List sizes =
- input.readList(
- in -> {
- // for backward compatibility with previous Besu implementation be lenient and support
- // also unsigned int with leading zeros.
- // ToDo: this could be replaced with the simpler `RLPInput::readUnsignedIntScalar`
- // after some months it has been released, since most of the Besus
- // will be using the new implementation.
- final Bytes intBytes = in.readBytes();
- if (intBytes.size() > 4) {
- throw new RLPException(
- "Expected max 4 bytes for unsigned int, but got " + intBytes.size() + " bytes");
- }
- return intBytes.toLong();
- });
+ final List sizes = input.readList(RLPInput::readUnsignedIntScalar);
final List hashes = input.readList(rlp -> Hash.wrap(rlp.readBytes32()));
input.leaveList();
diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/BufferedGetPooledTransactionsFromPeerFetcher.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/BufferedGetPooledTransactionsFromPeerFetcher.java
index 5c9e144038e..5b7f9bcbacb 100644
--- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/BufferedGetPooledTransactionsFromPeerFetcher.java
+++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/BufferedGetPooledTransactionsFromPeerFetcher.java
@@ -24,6 +24,7 @@
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResult;
import org.hyperledger.besu.ethereum.eth.manager.peertask.task.GetPooledTransactionsFromPeerTask;
import org.hyperledger.besu.ethereum.eth.transactions.PeerTransactionTracker;
+import org.hyperledger.besu.ethereum.eth.transactions.TransactionAnnouncement;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolMetrics;
@@ -51,7 +52,8 @@ public class BufferedGetPooledTransactionsFromPeerFetcher {
private final String metricLabel;
private final ScheduledFuture> scheduledFuture;
private final EthPeer peer;
- private final Queue txAnnounces;
+ private final Queue txAnnounces;
+ private final int maxTransactionsMessageSize;
public BufferedGetPooledTransactionsFromPeerFetcher(
final EthContext ethContext,
@@ -59,6 +61,7 @@ public BufferedGetPooledTransactionsFromPeerFetcher(
final EthPeer peer,
final TransactionPool transactionPool,
final PeerTransactionTracker transactionTracker,
+ final int maxTransactionsMessageSize,
final TransactionPoolMetrics metrics,
final String metricLabel) {
this.ethContext = ethContext;
@@ -70,6 +73,7 @@ public BufferedGetPooledTransactionsFromPeerFetcher(
this.metricLabel = metricLabel;
this.txAnnounces =
Queues.synchronizedQueue(EvictingQueue.create(DEFAULT_MAX_PENDING_TRANSACTIONS));
+ this.maxTransactionsMessageSize = maxTransactionsMessageSize;
}
public ScheduledFuture> getScheduledFuture() {
@@ -124,20 +128,36 @@ public void requestTransactions() {
}
}
- public void addHashes(final Collection hashes) {
- txAnnounces.addAll(hashes);
+ public void addAnnouncements(final Collection announcements) {
+ txAnnounces.addAll(announcements);
}
private List getTxHashesToRetrieve() {
final List toRetrieve = new ArrayList<>(MAX_HASHES);
int discarded = 0;
+ long cumulativeSize = 0;
while (toRetrieve.size() < MAX_HASHES && !txAnnounces.isEmpty()) {
- final Hash txHashAnnounced = txAnnounces.poll();
- if (!transactionTracker.hasSeenTransaction(txHashAnnounced)) {
- toRetrieve.add(txHashAnnounced);
+ final TransactionAnnouncement txAnnounced = txAnnounces.peek();
+ if (!transactionTracker.hasSeenTransaction(txAnnounced.hash())) {
+ if (cumulativeSize + txAnnounced.size() > maxTransactionsMessageSize) {
+ // defense in case maxTransactionsMessageSize is set too small
+ // this avoids an infinite loop if the first announcement is oversized
+ if (txAnnounced.size() > maxTransactionsMessageSize) {
+ LOG.warn(
+ "maxTransactionsMessageSize ({} bytes) is set too small to fetch tx announcement {}",
+ maxTransactionsMessageSize,
+ txAnnounced);
+ txAnnounces.remove();
+ }
+ // max size reached
+ break;
+ }
+ toRetrieve.add(txAnnounced.hash());
+ cumulativeSize += txAnnounced.size();
} else {
discarded++;
}
+ txAnnounces.remove();
}
final int alreadySeenCount = discarded;
diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/messages/NewPooledTransactionHashesMessage.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/messages/NewPooledTransactionHashesMessage.java
index 6056e7dab1c..3a81fc06d25 100644
--- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/messages/NewPooledTransactionHashesMessage.java
+++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/messages/NewPooledTransactionHashesMessage.java
@@ -17,7 +17,6 @@
import static org.hyperledger.besu.ethereum.eth.encoding.TransactionAnnouncementDecoder.getDecoder;
import static org.hyperledger.besu.ethereum.eth.encoding.TransactionAnnouncementEncoder.getEncoder;
-import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionAnnouncement;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.AbstractMessageData;
@@ -67,15 +66,10 @@ public static NewPooledTransactionHashesMessage readFrom(
return new NewPooledTransactionHashesMessage(message.getData(), capability);
}
- @VisibleForTesting
- public List pendingTransactions() {
+ public List pendingTransactionAnnouncements() {
if (pendingTransactions == null) {
pendingTransactions = getDecoder(capability).decode(RLP.input(data));
}
return pendingTransactions;
}
-
- public List pendingTransactionHashes() {
- return pendingTransactions().stream().map(TransactionAnnouncement::getHash).toList();
- }
}
diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageProcessor.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageProcessor.java
index 7938900a03e..69f13822e23 100644
--- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageProcessor.java
+++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageProcessor.java
@@ -16,7 +16,6 @@
import static java.time.Instant.now;
-import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.task.BufferedGetPooledTransactionsFromPeerFetcher;
@@ -48,13 +47,15 @@ public class NewPooledTransactionHashesMessageProcessor {
private final TransactionPoolConfiguration transactionPoolConfiguration;
private final EthContext ethContext;
private final TransactionPoolMetrics metrics;
+ private final int maxTransactionsMessageSize;
public NewPooledTransactionHashesMessageProcessor(
final PeerTransactionTracker transactionTracker,
final TransactionPool transactionPool,
final TransactionPoolConfiguration transactionPoolConfiguration,
final EthContext ethContext,
- final TransactionPoolMetrics metrics) {
+ final TransactionPoolMetrics metrics,
+ final int maxTransactionsMessageSize) {
this.transactionTracker = transactionTracker;
this.transactionPool = transactionPool;
this.transactionPoolConfiguration = transactionPoolConfiguration;
@@ -62,6 +63,7 @@ public NewPooledTransactionHashesMessageProcessor(
this.metrics = metrics;
metrics.initExpiredMessagesCounter(METRIC_LABEL);
this.scheduledTasks = new ConcurrentHashMap<>();
+ this.maxTransactionsMessageSize = maxTransactionsMessageSize;
}
void processNewPooledTransactionHashesMessage(
@@ -76,12 +78,12 @@ void processNewPooledTransactionHashesMessage(
} else {
LOG.atTrace()
.setMessage(
- "Ignoring expired transactions message: peer={}, latency={}, queuedAt={}, keepAlive={}, hashes={}")
+ "Ignoring expired transactions message: peer={}, latency={}, queuedAt={}, keepAlive={}, announcements={}")
.addArgument(peer)
.addArgument(latency)
.addArgument(queueAt)
.addArgument(keepAlive)
- .addArgument(transactionsMessage::pendingTransactionHashes)
+ .addArgument(transactionsMessage::pendingTransactionAnnouncements)
.log();
metrics.incrementExpiredMessages(METRIC_LABEL);
}
@@ -90,12 +92,14 @@ void processNewPooledTransactionHashesMessage(
private void processNewPooledTransactionHashesMessage(
final EthPeer peer, final NewPooledTransactionHashesMessage transactionsMessage) {
try {
- final List incomingTransactionHashes = transactionsMessage.pendingTransactionHashes();
+ final List incomingTransactionAnnouncements =
+ transactionsMessage.pendingTransactionAnnouncements();
LOG.atTrace()
- .setMessage("Received pooled transaction hashes message: peer={}, incoming hashes={}")
+ .setMessage(
+ "Received pooled transaction hashes message: peer={}, incoming announcements={}")
.addArgument(peer)
- .addArgument(incomingTransactionHashes)
+ .addArgument(incomingTransactionAnnouncements)
.log();
final BufferedGetPooledTransactionsFromPeerFetcher bufferedTask =
@@ -120,13 +124,14 @@ private void processNewPooledTransactionHashesMessage(
peer,
transactionPool,
transactionTracker,
+ maxTransactionsMessageSize,
metrics,
METRIC_LABEL);
});
- bufferedTask.addHashes(
- incomingTransactionHashes.stream()
- .filter(hash -> transactionPool.getTransactionByHash(hash).isEmpty())
+ bufferedTask.addAnnouncements(
+ incomingTransactionAnnouncements.stream()
+ .filter(ann -> transactionPool.getTransactionByHash(ann.hash()).isEmpty())
.toList());
} catch (final RLPException ex) {
if (peer != null) {
diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionAnnouncement.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionAnnouncement.java
index 8a5aeb28490..b0af58d2689 100644
--- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionAnnouncement.java
+++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionAnnouncement.java
@@ -22,20 +22,8 @@
import java.util.ArrayList;
import java.util.List;
-import java.util.Objects;
-import java.util.Optional;
-
-public class TransactionAnnouncement {
- private final Hash hash;
- private final Optional type;
- private final Optional size;
-
- public TransactionAnnouncement(final Hash hash) {
- this.hash = checkNotNull(hash, "Hash cannot be null");
- this.type = Optional.empty();
- this.size = Optional.empty();
- }
+public record TransactionAnnouncement(Hash hash, TransactionType type, Long size) {
public TransactionAnnouncement(final Transaction transaction) {
this(
checkNotNull(transaction, "Transaction cannot be null").getHash(),
@@ -45,20 +33,8 @@ public TransactionAnnouncement(final Transaction transaction) {
public TransactionAnnouncement(final Hash hash, final TransactionType type, final Long size) {
this.hash = checkNotNull(hash, "Hash cannot be null");
- this.type = Optional.of(checkNotNull(type, "Type cannot be null"));
- this.size = Optional.of(checkNotNull(size, "Size cannot be null"));
- }
-
- public Hash getHash() {
- return hash;
- }
-
- public Optional getType() {
- return type;
- }
-
- public Optional getSize() {
- return size;
+ this.type = checkNotNull(type, "Type cannot be null");
+ this.size = checkNotNull(size, "Size cannot be null");
}
public static List create(
@@ -74,23 +50,4 @@ public static List create(
}
return transactions;
}
-
- @Override
- public boolean equals(final Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- final TransactionAnnouncement that = (TransactionAnnouncement) o;
- return Objects.equals(size, that.size)
- && Objects.equals(type, that.type)
- && Objects.equals(hash, that.hash);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(hash, size, type);
- }
}
diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactory.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactory.java
index b100437eaa8..de5b7376bf1 100644
--- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactory.java
+++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactory.java
@@ -84,7 +84,8 @@ public static TransactionPool createTransactionPool(
transactionsMessageSender,
newPooledTransactionHashesMessageSender,
blobCache,
- miningConfiguration);
+ miningConfiguration,
+ ethProtocolConfiguration);
}
static TransactionPool createTransactionPool(
@@ -99,7 +100,8 @@ static TransactionPool createTransactionPool(
final TransactionsMessageSender transactionsMessageSender,
final NewPooledTransactionHashesMessageSender newPooledTransactionHashesMessageSender,
final BlobCache blobCache,
- final MiningConfiguration miningConfiguration) {
+ final MiningConfiguration miningConfiguration,
+ final EthProtocolConfiguration ethProtocolConfiguration) {
final TransactionPool transactionPool =
new TransactionPool(
@@ -139,7 +141,8 @@ static TransactionPool createTransactionPool(
transactionPool,
transactionPoolConfiguration,
ethContext,
- metrics),
+ metrics,
+ ethProtocolConfiguration.getMaxTransactionsMessageSize()),
transactionPoolConfiguration.getUnstable().getTxMessageKeepAliveSeconds());
subscribeTransactionHandlers(
diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/BufferedGetPooledTransactionsFromPeerFetcherTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/BufferedGetPooledTransactionsFromPeerFetcherTest.java
index 2ae4aa8340c..be815b1889b 100644
--- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/BufferedGetPooledTransactionsFromPeerFetcherTest.java
+++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/BufferedGetPooledTransactionsFromPeerFetcherTest.java
@@ -28,6 +28,7 @@
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.BlockDataGenerator;
import org.hyperledger.besu.ethereum.core.Transaction;
+import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
@@ -35,7 +36,9 @@
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResponseCode;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResult;
+import org.hyperledger.besu.ethereum.eth.manager.peertask.task.GetPooledTransactionsFromPeerTask;
import org.hyperledger.besu.ethereum.eth.transactions.PeerTransactionTracker;
+import org.hyperledger.besu.ethereum.eth.transactions.TransactionAnnouncement;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolMetrics;
@@ -45,6 +48,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -52,6 +56,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
@@ -90,6 +95,7 @@ public void setup() {
ethPeer,
transactionPool,
transactionTracker,
+ EthProtocolConfiguration.DEFAULT_MAX_TRANSACTIONS_MESSAGE_SIZE,
new TransactionPoolMetrics(metricsSystem),
"new_pooled_transaction_hashes");
}
@@ -99,25 +105,18 @@ public void requestTransactionShouldStartTaskWhenUnknownTransaction() {
final Transaction transaction = generator.transaction();
final List taskResult = List.of(transaction);
final PeerTaskExecutorResult> peerTaskResult =
- new PeerTaskExecutorResult>(
+ new PeerTaskExecutorResult<>(
Optional.of(taskResult), PeerTaskExecutorResponseCode.SUCCESS, List.of(ethPeer));
when(peerTaskExecutor.executeAgainstPeer(
- any(
- org.hyperledger.besu.ethereum.eth.manager.peertask.task
- .GetPooledTransactionsFromPeerTask.class),
- eq(ethPeer)))
+ any(GetPooledTransactionsFromPeerTask.class), eq(ethPeer)))
.thenReturn(peerTaskResult);
- fetcher.addHashes(List.of(transaction.getHash()));
+ fetcher.addAnnouncements(List.of(new TransactionAnnouncement(transaction)));
fetcher.requestTransactions();
verify(peerTaskExecutor)
- .executeAgainstPeer(
- any(
- org.hyperledger.besu.ethereum.eth.manager.peertask.task
- .GetPooledTransactionsFromPeerTask.class),
- eq(ethPeer));
+ .executeAgainstPeer(any(GetPooledTransactionsFromPeerTask.class), eq(ethPeer));
verifyNoMoreInteractions(peerTaskExecutor);
verify(transactionPool, times(1)).addRemoteTransactions(taskResult);
@@ -126,29 +125,25 @@ public void requestTransactionShouldStartTaskWhenUnknownTransaction() {
@Test
public void requestTransactionShouldSplitRequestIntoSeveralTasks() {
- final Map transactionsByHash =
+ final Map transactionsByAnnouncement =
IntStream.range(0, 257)
.mapToObj(unused -> generator.transaction())
- .collect(Collectors.toMap((t) -> t.getHash(), (t) -> t));
- fetcher.addHashes(transactionsByHash.keySet());
+ .collect(Collectors.toMap(TransactionAnnouncement::new, Function.identity()));
+ final Map transactionsByHash =
+ transactionsByAnnouncement.entrySet().stream()
+ .collect(Collectors.toMap(e -> e.getKey().hash(), Map.Entry::getValue));
+ fetcher.addAnnouncements(transactionsByAnnouncement.keySet());
when(peerTaskExecutor.executeAgainstPeer(
- any(
- org.hyperledger.besu.ethereum.eth.manager.peertask.task
- .GetPooledTransactionsFromPeerTask.class),
- eq(ethPeer)))
+ any(GetPooledTransactionsFromPeerTask.class), eq(ethPeer)))
.thenAnswer(
(invocationOnMock) -> {
- org.hyperledger.besu.ethereum.eth.manager.peertask.task
- .GetPooledTransactionsFromPeerTask
- task =
- invocationOnMock.getArgument(
- 0,
- org.hyperledger.besu.ethereum.eth.manager.peertask.task
- .GetPooledTransactionsFromPeerTask.class);
+ GetPooledTransactionsFromPeerTask task =
+ invocationOnMock.getArgument(0, GetPooledTransactionsFromPeerTask.class);
+
List resultTransactions =
task.getHashes().stream().map(transactionsByHash::get).toList();
- return new PeerTaskExecutorResult>(
+ return new PeerTaskExecutorResult<>(
Optional.of(resultTransactions),
PeerTaskExecutorResponseCode.SUCCESS,
List.of(ethPeer));
@@ -157,11 +152,77 @@ public void requestTransactionShouldSplitRequestIntoSeveralTasks() {
fetcher.requestTransactions();
verify(peerTaskExecutor, times(2))
- .executeAgainstPeer(
- any(
- org.hyperledger.besu.ethereum.eth.manager.peertask.task
- .GetPooledTransactionsFromPeerTask.class),
- eq(ethPeer));
+ .executeAgainstPeer(any(GetPooledTransactionsFromPeerTask.class), eq(ethPeer));
+ verifyNoMoreInteractions(peerTaskExecutor);
+ }
+
+ @Test
+ public void requestTransactionShouldSplitRequestWhenCumulativeSizeExceedsLimit() {
+ // DEFAULT_MAX_TRANSACTIONS_MESSAGE_SIZE = 1 MB (1,048,576 bytes).
+ // The inner check is: if (cumulative + txSize > limit) break.
+ // With 2 announcements of 600 KB each:
+ // ann1: 0 + 600KB ≤ 1MB → added (cumulative = 600KB)
+ // ann2: 600KB + 600KB > 1MB → break, ann2 is not removed
+ // First batch = [ann1]. ann2 stays queued and is returned as the second batch.
+ final long largeSize = 600L * 1024;
+ final List announcements =
+ IntStream.range(0, 2)
+ .mapToObj(unused -> generator.transaction())
+ .map(tx -> new TransactionAnnouncement(tx.getHash(), tx.getType(), largeSize))
+ .toList();
+
+ when(peerTaskExecutor.executeAgainstPeer(
+ any(GetPooledTransactionsFromPeerTask.class), eq(ethPeer)))
+ .thenReturn(
+ new PeerTaskExecutorResult<>(
+ Optional.of(List.of()), PeerTaskExecutorResponseCode.SUCCESS, List.of(ethPeer)));
+
+ fetcher.addAnnouncements(announcements);
+ fetcher.requestTransactions();
+
+ final ArgumentCaptor taskCaptor =
+ ArgumentCaptor.forClass(GetPooledTransactionsFromPeerTask.class);
+
+ verify(peerTaskExecutor, times(2)).executeAgainstPeer(taskCaptor.capture(), eq(ethPeer));
+ verifyNoMoreInteractions(peerTaskExecutor);
+
+ assertThat(taskCaptor.getAllValues().stream().map(GetPooledTransactionsFromPeerTask::getHashes))
+ .containsExactly(
+ List.of(announcements.get(0).hash()), List.of(announcements.get(1).hash()));
+ }
+
+ @Test
+ public void requestTransactionShouldDiscardOversizedAnnouncementAndNotLoopForever() {
+ // An announcement whose individual size exceeds the limit would be stuck at the head of the
+ // queue forever (peek → size check fails → break → peek again → ...).
+ // The fix: if txSize > limit, discard it (remove) before breaking.
+ final long oversizedSize = EthProtocolConfiguration.DEFAULT_MAX_TRANSACTIONS_MESSAGE_SIZE + 1L;
+ final Transaction oversizedTx = generator.transaction();
+ final Transaction normalTx = generator.transaction();
+
+ final TransactionAnnouncement oversized =
+ new TransactionAnnouncement(oversizedTx.getHash(), oversizedTx.getType(), oversizedSize);
+ final TransactionAnnouncement normal = new TransactionAnnouncement(normalTx);
+
+ fetcher.addAnnouncements(List.of(oversized, normal));
+
+ when(peerTaskExecutor.executeAgainstPeer(
+ any(GetPooledTransactionsFromPeerTask.class), eq(ethPeer)))
+ .thenReturn(
+ new PeerTaskExecutorResult<>(
+ Optional.of(List.of()), PeerTaskExecutorResponseCode.SUCCESS, List.of(ethPeer)));
+
+ // First call: oversized is discarded then break → no request is made, method returns normally
+ fetcher.requestTransactions();
+ verifyNoInteractions(peerTaskExecutor);
+
+ // Second call: oversized is gone; normal tx is now at the head and gets fetched
+ fetcher.requestTransactions();
+
+ final ArgumentCaptor taskCaptor =
+ ArgumentCaptor.forClass(GetPooledTransactionsFromPeerTask.class);
+ verify(peerTaskExecutor, times(1)).executeAgainstPeer(taskCaptor.capture(), eq(ethPeer));
+ assertThat(taskCaptor.getValue().getHashes()).containsExactly(normal.hash());
verifyNoMoreInteractions(peerTaskExecutor);
}
@@ -169,10 +230,10 @@ public void requestTransactionShouldSplitRequestIntoSeveralTasks() {
public void requestTransactionShouldNotStartTaskWhenTransactionAlreadySeen() {
final Transaction transaction = generator.transaction();
- final Hash hash = transaction.getHash();
- transactionTracker.markTransactionHashesAsSeen(ethPeer, List.of(hash));
+ final TransactionAnnouncement announcement = new TransactionAnnouncement(transaction);
+ transactionTracker.markTransactionHashesAsSeen(ethPeer, List.of(announcement.hash()));
- fetcher.addHashes(List.of(hash));
+ fetcher.addAnnouncements(List.of(announcement));
fetcher.requestTransactions();
verifyNoInteractions(peerTaskExecutor);
diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/messages/NewPooledTransactionHashesMessageTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/messages/NewPooledTransactionHashesMessageTest.java
index d64ab02b5fa..7cb36baa211 100644
--- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/messages/NewPooledTransactionHashesMessageTest.java
+++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/messages/NewPooledTransactionHashesMessageTest.java
@@ -16,12 +16,11 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
-import static org.hyperledger.besu.ethereum.core.Transaction.toHashList;
-import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.BlockDataGenerator;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.EthProtocol;
+import org.hyperledger.besu.ethereum.eth.transactions.TransactionAnnouncement;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.RawMessage;
import java.util.List;
@@ -37,8 +36,10 @@ public void roundTripNewPooledTransactionHashesMessage() {
final NewPooledTransactionHashesMessage msg =
NewPooledTransactionHashesMessage.create(transactions, EthProtocol.LATEST);
assertThat(msg.getCode()).isEqualTo(EthProtocolMessages.NEW_POOLED_TRANSACTION_HASHES);
- final List pendingHashes = msg.pendingTransactionHashes();
- assertThat(pendingHashes).isEqualTo(toHashList(transactions));
+ final List pendingAnnouncements =
+ msg.pendingTransactionAnnouncements();
+ assertThat(pendingAnnouncements)
+ .isEqualTo(transactions.stream().map(TransactionAnnouncement::new).toList());
}
@Test
diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageProcessorTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageProcessorTest.java
index 4e0d26029f2..01aada53d39 100644
--- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageProcessorTest.java
+++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageProcessorTest.java
@@ -35,6 +35,7 @@
import org.hyperledger.besu.ethereum.core.BlockDataGenerator;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.EthProtocol;
+import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration;
import org.hyperledger.besu.ethereum.eth.encoding.TransactionAnnouncementDecoder;
import org.hyperledger.besu.ethereum.eth.encoding.TransactionAnnouncementEncoder;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
@@ -100,7 +101,8 @@ public void setup() {
transactionPool,
transactionPoolConfiguration,
ethContext,
- new TransactionPoolMetrics(metricsSystem));
+ new TransactionPoolMetrics(metricsSystem),
+ EthProtocolConfiguration.DEFAULT_MAX_TRANSACTIONS_MESSAGE_SIZE);
when(ethContext.getScheduler()).thenReturn(ethScheduler);
}
@@ -226,7 +228,8 @@ void shouldCreateAndDecodeForEth68() {
final NewPooledTransactionHashesMessage message =
NewPooledTransactionHashesMessage.create(transactionList, EthProtocol.ETH68);
- final List announcementList = message.pendingTransactions();
+ final List announcementList =
+ message.pendingTransactionAnnouncements();
assertThat(announcementList).containsExactlyElementsOf(expectedTransactions);
}
@@ -271,72 +274,28 @@ void shouldDecodeBytesCorrectly_Eth68() {
getDecoder(EthProtocol.ETH68).decode(RLP.input(bytes));
final TransactionAnnouncement frontier = announcementList.get(0);
- assertThat(frontier.getHash())
+ assertThat(frontier.hash())
.isEqualTo(
Hash.fromHexString(
"0x0000000000000000000000000000000000000000000000000000000000000001"));
- assertThat(frontier.getType()).hasValue(TransactionType.FRONTIER);
- assertThat(frontier.getSize()).hasValue(1L);
+ assertThat(frontier.type()).isEqualTo(TransactionType.FRONTIER);
+ assertThat(frontier.size()).isEqualTo(1L);
final TransactionAnnouncement accessList = announcementList.get(1);
- assertThat(accessList.getHash())
+ assertThat(accessList.hash())
.isEqualTo(
Hash.fromHexString(
"0x0000000000000000000000000000000000000000000000000000000000000002"));
- assertThat(accessList.getType()).hasValue(TransactionType.ACCESS_LIST);
- assertThat(accessList.getSize()).hasValue(2L);
+ assertThat(accessList.type()).isEqualTo(TransactionType.ACCESS_LIST);
+ assertThat(accessList.size()).isEqualTo(2L);
final TransactionAnnouncement eip1559 = announcementList.get(2);
- assertThat(eip1559.getHash())
+ assertThat(eip1559.hash())
.isEqualTo(
Hash.fromHexString(
"0x0000000000000000000000000000000000000000000000000000000000000003"));
- assertThat(eip1559.getType()).hasValue(TransactionType.EIP1559);
- assertThat(eip1559.getSize()).hasValue(3L);
- }
-
- @Test
- void shouldDecodeBytesCorrectly_PreviousImplementations_Eth68() {
- /*
- * [
- * "0x0000102"]
- * ["0x00000001","0x00000002","0x00000003"],
- * ["0x0000000000000000000000000000000000000000000000000000000000000001",
- * "0x0000000000000000000000000000000000000000000000000000000000000002",
- * "0x0000000000000000000000000000000000000000000000000000000000000003"]
- * ]
- */
-
- final Bytes bytes =
- Bytes.fromHexString(
- "0xf87983000102cf840000000184000000028400000003f863a00000000000000000000000000000000000000000000000000000000000000001a00000000000000000000000000000000000000000000000000000000000000002a00000000000000000000000000000000000000000000000000000000000000003");
-
- final List announcementList =
- getDecoder(EthProtocol.ETH68).decode(RLP.input(bytes));
-
- final TransactionAnnouncement frontier = announcementList.get(0);
- assertThat(frontier.getHash())
- .isEqualTo(
- Hash.fromHexString(
- "0x0000000000000000000000000000000000000000000000000000000000000001"));
- assertThat(frontier.getType()).hasValue(TransactionType.FRONTIER);
- assertThat(frontier.getSize()).hasValue(1L);
-
- final TransactionAnnouncement accessList = announcementList.get(1);
- assertThat(accessList.getHash())
- .isEqualTo(
- Hash.fromHexString(
- "0x0000000000000000000000000000000000000000000000000000000000000002"));
- assertThat(accessList.getType()).hasValue(TransactionType.ACCESS_LIST);
- assertThat(accessList.getSize()).hasValue(2L);
-
- final TransactionAnnouncement eip1559 = announcementList.get(2);
- assertThat(eip1559.getHash())
- .isEqualTo(
- Hash.fromHexString(
- "0x0000000000000000000000000000000000000000000000000000000000000003"));
- assertThat(eip1559.getType()).hasValue(TransactionType.EIP1559);
- assertThat(eip1559.getSize()).hasValue(3L);
+ assertThat(eip1559.type()).isEqualTo(TransactionType.EIP1559);
+ assertThat(eip1559.size()).isEqualTo(3L);
}
@Test
@@ -355,9 +314,9 @@ void shouldEncodeAndDecodeTransactionAnnouncement_Eth68() {
for (final Transaction transaction : list) {
final TransactionAnnouncement announcement = announcementList.get(list.indexOf(transaction));
- assertThat(announcement.getHash()).isEqualTo(transaction.getHash());
- assertThat(announcement.getType()).hasValue(transaction.getType());
- assertThat(announcement.getSize()).hasValue((long) transaction.getSizeForAnnouncement());
+ assertThat(announcement.hash()).isEqualTo(transaction.getHash());
+ assertThat(announcement.type()).isEqualTo(transaction.getType());
+ assertThat(announcement.size()).isEqualTo(transaction.getSizeForAnnouncement());
}
}
@@ -423,15 +382,13 @@ void shouldThrowRLPExceptionWhenSizeSizeGreaterThanFourBytes() {
TransactionAnnouncementDecoder.getDecoder(EthProtocol.ETH68)
.decode(RLP.input(invalidMessageBytes)))
.isInstanceOf(RLPException.class)
- .hasMessageContaining("Expected max 4 bytes for unsigned int, but got 5 bytes");
+ .hasMessageContaining(
+ "Cannot read a unsigned int scalar, expecting a maximum of 4 bytes but current element is 5 bytes long");
}
@Test
void shouldThrowNullPointerIfArgumentsAreNull() {
final Hash hash = Hash.hash(Bytes.random(32));
- assertThatThrownBy(() -> new TransactionAnnouncement((Hash) null))
- .isInstanceOf(NullPointerException.class)
- .hasMessage("Hash cannot be null");
assertThatThrownBy(() -> new TransactionAnnouncement(null, TransactionType.EIP1559, 0L))
.isInstanceOf(NullPointerException.class)
@@ -445,7 +402,7 @@ void shouldThrowNullPointerIfArgumentsAreNull() {
.isInstanceOf(NullPointerException.class)
.hasMessage("Size cannot be null");
- assertThatThrownBy(() -> new TransactionAnnouncement((Transaction) null))
+ assertThatThrownBy(() -> new TransactionAnnouncement(null))
.isInstanceOf(NullPointerException.class)
.hasMessage("Transaction cannot be null");
}
diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageSenderTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageSenderTest.java
index 308b6e15ea0..f89dfbb7147 100644
--- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageSenderTest.java
+++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageSenderTest.java
@@ -16,7 +16,6 @@
import static com.google.common.collect.Sets.newHashSet;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.hyperledger.besu.ethereum.core.Transaction.toHashList;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.mock;
@@ -25,7 +24,6 @@
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
-import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.BlockDataGenerator;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.EthProtocol;
@@ -115,8 +113,9 @@ public void shouldSendTransactionsInBatchesWithLimit() throws Exception {
.hasSize(2)
.allMatch(
message -> message.getCode() == EthProtocolMessages.NEW_POOLED_TRANSACTION_HASHES);
- final Set firstBatch = getTransactionsFromMessage(sentMessages.get(0));
- final Set secondBatch = getTransactionsFromMessage(sentMessages.get(1));
+ final Set firstBatch = getTransactionsFromMessage(sentMessages.get(0));
+ final Set secondBatch =
+ getTransactionsFromMessage(sentMessages.get(1));
final int expectedFirstBatchSize = 4096, expectedSecondBatchSize = 1904, toleranceDelta = 0;
assertThat(firstBatch)
@@ -127,23 +126,27 @@ public void shouldSendTransactionsInBatchesWithLimit() throws Exception {
expectedSecondBatchSize - toleranceDelta, expectedSecondBatchSize + toleranceDelta);
assertThat(Sets.union(firstBatch, secondBatch))
- .containsExactlyInAnyOrderElementsOf(toHashList(transactions));
+ .containsExactlyInAnyOrderElementsOf(
+ transactions.stream().map(TransactionAnnouncement::new).toList());
}
private MessageData transactionsMessageContaining(final Transaction... transactions) {
return argThat(
message -> {
- final Set actualSentTransactions = getTransactionsFromMessage(message);
- final Set expectedTransactions =
- newHashSet(toHashList(Arrays.asList(transactions)));
+ final Set actualSentTransactions =
+ getTransactionsFromMessage(message);
+ final Set expectedTransactions =
+ Arrays.stream(transactions)
+ .map(TransactionAnnouncement::new)
+ .collect(Collectors.toSet());
return message.getCode() == EthProtocolMessages.NEW_POOLED_TRANSACTION_HASHES
&& actualSentTransactions.equals(expectedTransactions);
});
}
- private Set getTransactionsFromMessage(final MessageData message) {
+ private Set getTransactionsFromMessage(final MessageData message) {
final NewPooledTransactionHashesMessage transactionsMessage =
NewPooledTransactionHashesMessage.readFrom(message, EthProtocol.LATEST);
- return newHashSet(transactionsMessage.pendingTransactionHashes());
+ return newHashSet(transactionsMessage.pendingTransactionAnnouncements());
}
}
diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactoryTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactoryTest.java
index 9bcd06f4fef..c940cf232ef 100644
--- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactoryTest.java
+++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactoryTest.java
@@ -413,7 +413,8 @@ private TransactionPool createTransactionPool(
transactionsMessageSender,
newPooledTransactionHashesMessageSender,
new BlobCache(),
- MiningConfiguration.newDefault());
+ MiningConfiguration.newDefault(),
+ EthProtocolConfiguration.DEFAULT);
}
private TransactionPool createAndEnableTransactionPool(