Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolMetrics;

import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
Expand Down Expand Up @@ -104,8 +105,11 @@ protected void internalRemove(
}

@Override
public PendingTransaction promote(final Predicate<PendingTransaction> promotionFilter) {
return null;
public List<PendingTransaction> promote(
final Predicate<PendingTransaction> promotionFilter,
final long freeSpace,
final int freeSlots) {
return List.of();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,22 +398,20 @@ public final void blockAdded(
nextLayer.blockAdded(feeMarket, blockHeader, maxConfirmedNonceBySender);
maxConfirmedNonceBySender.forEach(this::confirmed);
internalBlockAdded(blockHeader, feeMarket);
promoteTransactions();
}

protected abstract void internalBlockAdded(
final BlockHeader blockHeader, final FeeMarket feeMarket);

final void promoteTransactions() {
int freeSlots = maxTransactionsNumber() - pendingTransactions.size();
final int freeSlots = maxTransactionsNumber() - pendingTransactions.size();
final long freeSpace = cacheFreeSpace();

while (cacheFreeSpace() > 0 && freeSlots > 0) {
final var promotedTx = nextLayer.promote(this::promotionFilter);
if (promotedTx != null) {
processAdded(promotedTx);
--freeSlots;
} else {
break;
}
if (freeSlots > 0 && freeSpace > 0) {
nextLayer
.promote(this::promotionFilter, cacheFreeSpace(), freeSlots)
.forEach(this::processAdded);
}
}

Expand Down Expand Up @@ -444,8 +442,6 @@ private void confirmed(final Address sender, final long maxConfirmedNonce) {
internalConfirmed(senderTxs, sender, maxConfirmedNonce, highestNonceRemovedTx);
}
}

promoteTransactions();
}

protected abstract void internalConfirmed(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,11 @@ public OptionalLong getCurrentNonceFor(final Address sender) {
}

@Override
public PendingTransaction promote(final Predicate<PendingTransaction> promotionFilter) {
return null;
public List<PendingTransaction> promote(
final Predicate<PendingTransaction> promotionFilter,
final long freeSpace,
final int freeSlots) {
return List.of();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolMetrics;
import org.hyperledger.besu.ethereum.mainnet.feemarket.FeeMarket;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
Expand Down Expand Up @@ -139,30 +141,51 @@ public Stream<PendingTransaction> stream() {
}

@Override
public PendingTransaction promote(final Predicate<PendingTransaction> promotionFilter) {

final var maybePromotedTx =
orderByMaxFee.descendingSet().stream()
.filter(candidateTx -> promotionFilter.test(candidateTx))
.findFirst();

return maybePromotedTx
.map(
promotedTx -> {
final var senderTxs = txsBySender.get(promotedTx.getSender());
// we always promote the first tx of a sender, so remove the first entry
senderTxs.pollFirstEntry();
processRemove(senderTxs, promotedTx.getTransaction(), PROMOTED);

// now that we have space, promote from the next layer
promoteTransactions();

if (senderTxs.isEmpty()) {
txsBySender.remove(promotedTx.getSender());
}
return promotedTx;
})
.orElse(null);
public List<PendingTransaction> promote(
final Predicate<PendingTransaction> promotionFilter,
final long freeSpace,
final int freeSlots) {
long accumulatedSpace = 0;
final List<PendingTransaction> promotedTxs = new ArrayList<>();

// first find all txs that can be promoted
search:
for (final var senderFirstTx : orderByMaxFee.descendingSet()) {
final var senderTxs = txsBySender.get(senderFirstTx.getSender());
for (final var candidateTx : senderTxs.values()) {
if (promotionFilter.test(candidateTx)) {
accumulatedSpace += candidateTx.memorySize();
if (promotedTxs.size() < freeSlots && accumulatedSpace <= freeSpace) {
promotedTxs.add(candidateTx);
} else {
// no room for more txs the search is over exit the loops
break search;
}
} else {
// skip remaining txs for this sender to avoid gaps
break;
}
}
}

// then remove promoted txs from this layer
promotedTxs.forEach(
promotedTx -> {
final var sender = promotedTx.getSender();
final var senderTxs = txsBySender.get(sender);
senderTxs.remove(promotedTx.getNonce());
processRemove(senderTxs, promotedTx.getTransaction(), PROMOTED);
if (senderTxs.isEmpty()) {
txsBySender.remove(sender);
}
});

if (!promotedTxs.isEmpty()) {
// since we removed some txs we can try to promote from next layer
promoteTransactions();
}

return promotedTxs;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.hyperledger.besu.ethereum.mainnet.feemarket.FeeMarket;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -126,36 +125,86 @@ protected void internalReplaced(final PendingTransaction replacedTx) {
@Override
protected void internalBlockAdded(final BlockHeader blockHeader, final FeeMarket feeMarket) {}

/**
* We only want to promote transactions that have gap == 0, so there will be no gap in the prev
* layers. A promoted transaction is removed from this layer, and the gap data is updated for its
* sender.
*
* @param promotionFilter the prev layer's promotion filter
* @param freeSpace max amount of memory promoted txs can occupy
* @param freeSlots max number of promoted txs
* @return a list of transactions promoted to the prev layer
*/
@Override
public PendingTransaction promote(final Predicate<PendingTransaction> promotionFilter) {
final PendingTransaction promotedTx =
orderByGap.get(0).stream()
.map(txsBySender::get)
.map(NavigableMap::values)
.flatMap(Collection::stream)
.filter(promotionFilter)
.findFirst()
.orElse(null);

if (promotedTx != null) {
final Address sender = promotedTx.getSender();
final var senderTxs = txsBySender.get(sender);
senderTxs.pollFirstEntry();
processRemove(senderTxs, promotedTx.getTransaction(), PROMOTED);
if (senderTxs.isEmpty()) {
txsBySender.remove(sender);
orderByGap.get(0).remove(sender);
gapBySender.remove(sender);
} else {
final long firstNonce = senderTxs.firstKey();
final int newGap = (int) (firstNonce - (promotedTx.getNonce() + 1));
if (newGap != 0) {
updateGap(sender, 0, newGap);
public List<PendingTransaction> promote(
final Predicate<PendingTransaction> promotionFilter,
final long freeSpace,
final int freeSlots) {
long accumulatedSpace = 0;
final List<PendingTransaction> promotedTxs = new ArrayList<>();

final var zeroGapSenders = orderByGap.get(0);

search:
for (final var sender : zeroGapSenders) {
final var senderSeqTxs = getSequentialSubset(txsBySender.get(sender));

for (final var candidateTx : senderSeqTxs.values()) {

if (promotionFilter.test(candidateTx)) {
accumulatedSpace += candidateTx.memorySize();
if (promotedTxs.size() < freeSlots && accumulatedSpace <= freeSpace) {
promotedTxs.add(candidateTx);
} else {
// no room for more txs the search is over exit the loops
break search;
}
} else {
// skip remaining txs for this sender
break;
}
}
}

return promotedTx;
// remove promoted txs from this layer
promotedTxs.forEach(
promotedTx -> {
final var sender = promotedTx.getSender();
final var senderTxs = txsBySender.get(sender);
senderTxs.remove(promotedTx.getNonce());
processRemove(senderTxs, promotedTx.getTransaction(), PROMOTED);
if (senderTxs.isEmpty()) {
txsBySender.remove(sender);
orderByGap.get(0).remove(sender);
gapBySender.remove(sender);
} else {
final long firstNonce = senderTxs.firstKey();
final int newGap = (int) (firstNonce - (promotedTx.getNonce() + 1));
if (newGap != 0) {
updateGap(sender, 0, newGap);
}
}
});

if (!promotedTxs.isEmpty()) {
// since we removed some txs we can try to promote from next layer
promoteTransactions();
}

return promotedTxs;
}

private NavigableMap<Long, PendingTransaction> getSequentialSubset(
final NavigableMap<Long, PendingTransaction> senderTxs) {
long lastSequentialNonce = senderTxs.firstKey();
for (final long nonce : senderTxs.tailMap(lastSequentialNonce, false).keySet()) {
if (nonce == lastSequentialNonce + 1) {
++lastSequentialNonce;
} else {
break;
}
}
return senderTxs.headMap(lastSequentialNonce, true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ void blockAdded(
*/
OptionalLong getCurrentNonceFor(Address sender);

PendingTransaction promote(Predicate<PendingTransaction> promotionFilter);
List<PendingTransaction> promote(
Predicate<PendingTransaction> promotionFilter, final long freeSpace, final int freeSlots);

long subscribeToAdded(PendingTransactionAddedListener listener);

Expand Down