Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
- Add `-Pcases` case name filtering to JMH benchmark suite [#9982](https://github.com/hyperledger/besu/pull/9982)
- Use JDK SHA-256 provider to leverage hardware SHA-NI instructions instead of BouncyCastle [#9924](https://github.com/hyperledger/besu/pull/9924)
- Support [EIP-7975](https://eips.ethereum.org/EIPS/eip-7975): eth/70 - partial block receipt lists
- Limit pooled tx requests by size and remove pre-eth/68 transaction announcement support [#9990](https://github.com/besu-eth/besu/pull/9990)

## 26.2.0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,13 @@

import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.datatypes.TransactionType;
import org.hyperledger.besu.ethereum.eth.EthProtocolVersion;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionAnnouncement;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Capability;
import org.hyperledger.besu.ethereum.rlp.RLPException;
import org.hyperledger.besu.ethereum.rlp.RLPInput;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.tuweni.bytes.Bytes;

public class TransactionAnnouncementDecoder {

Expand All @@ -44,24 +40,7 @@ public interface Decoder {
* @return the correct decoder
*/
public static Decoder getDecoder(final Capability capability) {
if (capability.getVersion() >= EthProtocolVersion.V68) {
return TransactionAnnouncementDecoder::decodeForEth68;
} else {
return TransactionAnnouncementDecoder::decodeForEth66;
}
}

/**
* Decode the list of transactions in the NewPooledTransactionHashesMessage
*
* @param input input used to decode the NewPooledTransactionHashesMessage before Eth/68
* <p>format: [hash_0: B_32, hash_1: B_32, ...]
* @return the list of TransactionAnnouncement decoded from the message. Only hash is present.
* size and type will return an Optional.empty()
*/
private static List<TransactionAnnouncement> decodeForEth66(final RLPInput input) {
final List<Hash> hashes = input.readList(rlp -> Hash.wrap(rlp.readBytes32()));
return hashes.stream().map(TransactionAnnouncement::new).collect(Collectors.toList());
return TransactionAnnouncementDecoder::decodeForEth68;
}

/**
Expand All @@ -84,21 +63,7 @@ private static List<TransactionAnnouncement> decodeForEth68(final RLPInput input
types.add(transactionType);
}

List<Long> sizes =
input.readList(
in -> {
// for backward compatibility with previous Besu implementation be lenient and support
// also unsigned int with leading zeros.
// ToDo: this could be replaced with the simpler `RLPInput::readUnsignedIntScalar`
// after some months it has been released, since most of the Besus
// will be using the new implementation.
final Bytes intBytes = in.readBytes();
if (intBytes.size() > 4) {
throw new RLPException(
"Expected max 4 bytes for unsigned int, but got " + intBytes.size() + " bytes");
}
return intBytes.toLong();
});
final List<Long> sizes = input.readList(RLPInput::readUnsignedIntScalar);

final List<Hash> hashes = input.readList(rlp -> Hash.wrap(rlp.readBytes32()));
input.leaveList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResult;
import org.hyperledger.besu.ethereum.eth.manager.peertask.task.GetPooledTransactionsFromPeerTask;
import org.hyperledger.besu.ethereum.eth.transactions.PeerTransactionTracker;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionAnnouncement;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolMetrics;

Expand Down Expand Up @@ -51,14 +52,16 @@ public class BufferedGetPooledTransactionsFromPeerFetcher {
private final String metricLabel;
private final ScheduledFuture<?> scheduledFuture;
private final EthPeer peer;
private final Queue<Hash> txAnnounces;
private final Queue<TransactionAnnouncement> txAnnounces;
private final int maxTransactionsMessageSize;

public BufferedGetPooledTransactionsFromPeerFetcher(
final EthContext ethContext,
final ScheduledFuture<?> scheduledFuture,
final EthPeer peer,
final TransactionPool transactionPool,
final PeerTransactionTracker transactionTracker,
final int maxTransactionsMessageSize,
final TransactionPoolMetrics metrics,
final String metricLabel) {
this.ethContext = ethContext;
Expand All @@ -70,6 +73,7 @@ public BufferedGetPooledTransactionsFromPeerFetcher(
this.metricLabel = metricLabel;
this.txAnnounces =
Queues.synchronizedQueue(EvictingQueue.create(DEFAULT_MAX_PENDING_TRANSACTIONS));
this.maxTransactionsMessageSize = maxTransactionsMessageSize;
}

public ScheduledFuture<?> getScheduledFuture() {
Expand Down Expand Up @@ -124,20 +128,36 @@ public void requestTransactions() {
}
}

public void addHashes(final Collection<Hash> hashes) {
txAnnounces.addAll(hashes);
public void addAnnouncements(final Collection<TransactionAnnouncement> announcements) {
txAnnounces.addAll(announcements);
}

private List<Hash> getTxHashesToRetrieve() {
final List<Hash> toRetrieve = new ArrayList<>(MAX_HASHES);
int discarded = 0;
long cumulativeSize = 0;
while (toRetrieve.size() < MAX_HASHES && !txAnnounces.isEmpty()) {
final Hash txHashAnnounced = txAnnounces.poll();
if (!transactionTracker.hasSeenTransaction(txHashAnnounced)) {
toRetrieve.add(txHashAnnounced);
final TransactionAnnouncement txAnnounced = txAnnounces.peek();
if (!transactionTracker.hasSeenTransaction(txAnnounced.hash())) {
if (cumulativeSize + txAnnounced.size() > maxTransactionsMessageSize) {
// defense in case maxTransactionsMessageSize is set too small
// this avoids an infinite loop if the first announcement is oversized
if (txAnnounced.size() > maxTransactionsMessageSize) {
LOG.warn(
"maxTransactionsMessageSize ({} bytes) is set too small to fetch tx announcement {}",
maxTransactionsMessageSize,
txAnnounced);
txAnnounces.remove();
}
// max size reached
break;
}
toRetrieve.add(txAnnounced.hash());
cumulativeSize += txAnnounced.size();
} else {
discarded++;
}
txAnnounces.remove();
Comment thread
fab-10 marked this conversation as resolved.
}

final int alreadySeenCount = discarded;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import static org.hyperledger.besu.ethereum.eth.encoding.TransactionAnnouncementDecoder.getDecoder;
import static org.hyperledger.besu.ethereum.eth.encoding.TransactionAnnouncementEncoder.getEncoder;

import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionAnnouncement;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.AbstractMessageData;
Expand Down Expand Up @@ -67,15 +66,10 @@ public static NewPooledTransactionHashesMessage readFrom(
return new NewPooledTransactionHashesMessage(message.getData(), capability);
}

@VisibleForTesting
public List<TransactionAnnouncement> pendingTransactions() {
public List<TransactionAnnouncement> pendingTransactionAnnouncements() {
if (pendingTransactions == null) {
pendingTransactions = getDecoder(capability).decode(RLP.input(data));
}
return pendingTransactions;
}

public List<Hash> pendingTransactionHashes() {
return pendingTransactions().stream().map(TransactionAnnouncement::getHash).toList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import static java.time.Instant.now;

import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.task.BufferedGetPooledTransactionsFromPeerFetcher;
Expand Down Expand Up @@ -48,20 +47,23 @@ public class NewPooledTransactionHashesMessageProcessor {
private final TransactionPoolConfiguration transactionPoolConfiguration;
private final EthContext ethContext;
private final TransactionPoolMetrics metrics;
private final int maxTransactionsMessageSize;

public NewPooledTransactionHashesMessageProcessor(
final PeerTransactionTracker transactionTracker,
final TransactionPool transactionPool,
final TransactionPoolConfiguration transactionPoolConfiguration,
final EthContext ethContext,
final TransactionPoolMetrics metrics) {
final TransactionPoolMetrics metrics,
final int maxTransactionsMessageSize) {
this.transactionTracker = transactionTracker;
this.transactionPool = transactionPool;
this.transactionPoolConfiguration = transactionPoolConfiguration;
this.ethContext = ethContext;
this.metrics = metrics;
metrics.initExpiredMessagesCounter(METRIC_LABEL);
this.scheduledTasks = new ConcurrentHashMap<>();
this.maxTransactionsMessageSize = maxTransactionsMessageSize;
}

void processNewPooledTransactionHashesMessage(
Expand All @@ -76,12 +78,12 @@ void processNewPooledTransactionHashesMessage(
} else {
LOG.atTrace()
.setMessage(
"Ignoring expired transactions message: peer={}, latency={}, queuedAt={}, keepAlive={}, hashes={}")
"Ignoring expired transactions message: peer={}, latency={}, queuedAt={}, keepAlive={}, announcements={}")
.addArgument(peer)
.addArgument(latency)
.addArgument(queueAt)
.addArgument(keepAlive)
.addArgument(transactionsMessage::pendingTransactionHashes)
.addArgument(transactionsMessage::pendingTransactionAnnouncements)
.log();
metrics.incrementExpiredMessages(METRIC_LABEL);
}
Expand All @@ -90,12 +92,14 @@ void processNewPooledTransactionHashesMessage(
private void processNewPooledTransactionHashesMessage(
final EthPeer peer, final NewPooledTransactionHashesMessage transactionsMessage) {
try {
final List<Hash> incomingTransactionHashes = transactionsMessage.pendingTransactionHashes();
final List<TransactionAnnouncement> incomingTransactionAnnouncements =
transactionsMessage.pendingTransactionAnnouncements();

LOG.atTrace()
.setMessage("Received pooled transaction hashes message: peer={}, incoming hashes={}")
.setMessage(
"Received pooled transaction hashes message: peer={}, incoming announcements={}")
.addArgument(peer)
.addArgument(incomingTransactionHashes)
.addArgument(incomingTransactionAnnouncements)
.log();

final BufferedGetPooledTransactionsFromPeerFetcher bufferedTask =
Expand All @@ -120,13 +124,14 @@ private void processNewPooledTransactionHashesMessage(
peer,
transactionPool,
transactionTracker,
maxTransactionsMessageSize,
metrics,
METRIC_LABEL);
});

bufferedTask.addHashes(
incomingTransactionHashes.stream()
.filter(hash -> transactionPool.getTransactionByHash(hash).isEmpty())
bufferedTask.addAnnouncements(
incomingTransactionAnnouncements.stream()
.filter(ann -> transactionPool.getTransactionByHash(ann.hash()).isEmpty())
.toList());
} catch (final RLPException ex) {
if (peer != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,8 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

public class TransactionAnnouncement {
private final Hash hash;
private final Optional<TransactionType> type;
private final Optional<Long> size;

public TransactionAnnouncement(final Hash hash) {
this.hash = checkNotNull(hash, "Hash cannot be null");
this.type = Optional.empty();
this.size = Optional.empty();
}

public record TransactionAnnouncement(Hash hash, TransactionType type, Long size) {
public TransactionAnnouncement(final Transaction transaction) {
Comment thread
fab-10 marked this conversation as resolved.
this(
checkNotNull(transaction, "Transaction cannot be null").getHash(),
Expand All @@ -45,20 +33,8 @@ public TransactionAnnouncement(final Transaction transaction) {

public TransactionAnnouncement(final Hash hash, final TransactionType type, final Long size) {
this.hash = checkNotNull(hash, "Hash cannot be null");
this.type = Optional.of(checkNotNull(type, "Type cannot be null"));
this.size = Optional.of(checkNotNull(size, "Size cannot be null"));
}

public Hash getHash() {
return hash;
}

public Optional<TransactionType> getType() {
return type;
}

public Optional<Long> getSize() {
return size;
this.type = checkNotNull(type, "Type cannot be null");
this.size = checkNotNull(size, "Size cannot be null");
}

public static List<TransactionAnnouncement> create(
Expand All @@ -74,23 +50,4 @@ public static List<TransactionAnnouncement> create(
}
return transactions;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final TransactionAnnouncement that = (TransactionAnnouncement) o;
return Objects.equals(size, that.size)
&& Objects.equals(type, that.type)
&& Objects.equals(hash, that.hash);
}

@Override
public int hashCode() {
return Objects.hash(hash, size, type);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ public static TransactionPool createTransactionPool(
transactionsMessageSender,
newPooledTransactionHashesMessageSender,
blobCache,
miningConfiguration);
miningConfiguration,
ethProtocolConfiguration);
}

static TransactionPool createTransactionPool(
Expand All @@ -99,7 +100,8 @@ static TransactionPool createTransactionPool(
final TransactionsMessageSender transactionsMessageSender,
final NewPooledTransactionHashesMessageSender newPooledTransactionHashesMessageSender,
final BlobCache blobCache,
final MiningConfiguration miningConfiguration) {
final MiningConfiguration miningConfiguration,
final EthProtocolConfiguration ethProtocolConfiguration) {

final TransactionPool transactionPool =
new TransactionPool(
Expand Down Expand Up @@ -139,7 +141,8 @@ static TransactionPool createTransactionPool(
transactionPool,
transactionPoolConfiguration,
ethContext,
metrics),
metrics,
ethProtocolConfiguration.getMaxTransactionsMessageSize()),
transactionPoolConfiguration.getUnstable().getTxMessageKeepAliveSeconds());

subscribeTransactionHandlers(
Expand Down
Loading
Loading