diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractPrioritizedTransactions.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractPrioritizedTransactions.java index 3d47ad6efd8..84fa0ac5bfc 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractPrioritizedTransactions.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractPrioritizedTransactions.java @@ -20,6 +20,7 @@ import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolMetrics; +import java.util.List; import java.util.Map; import java.util.NavigableMap; import java.util.TreeMap; @@ -104,8 +105,11 @@ protected void internalRemove( } @Override - public PendingTransaction promote(final Predicate promotionFilter) { - return null; + public List promote( + final Predicate promotionFilter, + final long freeSpace, + final int freeSlots) { + return List.of(); } @Override diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractTransactionsLayer.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractTransactionsLayer.java index 0feae3168de..270e64352ab 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractTransactionsLayer.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractTransactionsLayer.java @@ -398,22 +398,20 @@ public final void blockAdded( nextLayer.blockAdded(feeMarket, blockHeader, maxConfirmedNonceBySender); maxConfirmedNonceBySender.forEach(this::confirmed); internalBlockAdded(blockHeader, feeMarket); + promoteTransactions(); } protected abstract void internalBlockAdded( final BlockHeader blockHeader, final FeeMarket feeMarket); final void promoteTransactions() { - int freeSlots = maxTransactionsNumber() - pendingTransactions.size(); + final int freeSlots = maxTransactionsNumber() - pendingTransactions.size(); + final long freeSpace = cacheFreeSpace(); - while (cacheFreeSpace() > 0 && freeSlots > 0) { - final var promotedTx = nextLayer.promote(this::promotionFilter); - if (promotedTx != null) { - processAdded(promotedTx); - --freeSlots; - } else { - break; - } + if (freeSlots > 0 && freeSpace > 0) { + nextLayer + .promote(this::promotionFilter, cacheFreeSpace(), freeSlots) + .forEach(this::processAdded); } } @@ -444,8 +442,6 @@ private void confirmed(final Address sender, final long maxConfirmedNonce) { internalConfirmed(senderTxs, sender, maxConfirmedNonce, highestNonceRemovedTx); } } - - promoteTransactions(); } protected abstract void internalConfirmed( diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/EndLayer.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/EndLayer.java index 9a18e4937ea..12d3147b326 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/EndLayer.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/EndLayer.java @@ -115,8 +115,11 @@ public OptionalLong getCurrentNonceFor(final Address sender) { } @Override - public PendingTransaction promote(final Predicate promotionFilter) { - return null; + public List promote( + final Predicate promotionFilter, + final long freeSpace, + final int freeSlots) { + return List.of(); } @Override diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/ReadyTransactions.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/ReadyTransactions.java index 6527031c917..227d0e4daa3 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/ReadyTransactions.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/ReadyTransactions.java @@ -25,7 +25,9 @@ import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolMetrics; import org.hyperledger.besu.ethereum.mainnet.feemarket.FeeMarket; +import java.util.ArrayList; import java.util.Comparator; +import java.util.List; import java.util.Map; import java.util.NavigableMap; import java.util.NavigableSet; @@ -139,30 +141,51 @@ public Stream stream() { } @Override - public PendingTransaction promote(final Predicate promotionFilter) { - - final var maybePromotedTx = - orderByMaxFee.descendingSet().stream() - .filter(candidateTx -> promotionFilter.test(candidateTx)) - .findFirst(); - - return maybePromotedTx - .map( - promotedTx -> { - final var senderTxs = txsBySender.get(promotedTx.getSender()); - // we always promote the first tx of a sender, so remove the first entry - senderTxs.pollFirstEntry(); - processRemove(senderTxs, promotedTx.getTransaction(), PROMOTED); - - // now that we have space, promote from the next layer - promoteTransactions(); - - if (senderTxs.isEmpty()) { - txsBySender.remove(promotedTx.getSender()); - } - return promotedTx; - }) - .orElse(null); + public List promote( + final Predicate promotionFilter, + final long freeSpace, + final int freeSlots) { + long accumulatedSpace = 0; + final List promotedTxs = new ArrayList<>(); + + // first find all txs that can be promoted + search: + for (final var senderFirstTx : orderByMaxFee.descendingSet()) { + final var senderTxs = txsBySender.get(senderFirstTx.getSender()); + for (final var candidateTx : senderTxs.values()) { + if (promotionFilter.test(candidateTx)) { + accumulatedSpace += candidateTx.memorySize(); + if (promotedTxs.size() < freeSlots && accumulatedSpace <= freeSpace) { + promotedTxs.add(candidateTx); + } else { + // no room for more txs the search is over exit the loops + break search; + } + } else { + // skip remaining txs for this sender to avoid gaps + break; + } + } + } + + // then remove promoted txs from this layer + promotedTxs.forEach( + promotedTx -> { + final var sender = promotedTx.getSender(); + final var senderTxs = txsBySender.get(sender); + senderTxs.remove(promotedTx.getNonce()); + processRemove(senderTxs, promotedTx.getTransaction(), PROMOTED); + if (senderTxs.isEmpty()) { + txsBySender.remove(sender); + } + }); + + if (!promotedTxs.isEmpty()) { + // since we removed some txs we can try to promote from next layer + promoteTransactions(); + } + + return promotedTxs; } @Override diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/SparseTransactions.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/SparseTransactions.java index 00248dcf4ef..64a4142bccb 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/SparseTransactions.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/SparseTransactions.java @@ -27,7 +27,6 @@ import org.hyperledger.besu.ethereum.mainnet.feemarket.FeeMarket; import java.util.ArrayList; -import java.util.Collection; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; @@ -126,36 +125,86 @@ protected void internalReplaced(final PendingTransaction replacedTx) { @Override protected void internalBlockAdded(final BlockHeader blockHeader, final FeeMarket feeMarket) {} + /** + * We only want to promote transactions that have gap == 0, so there will be no gap in the prev + * layers. A promoted transaction is removed from this layer, and the gap data is updated for its + * sender. + * + * @param promotionFilter the prev layer's promotion filter + * @param freeSpace max amount of memory promoted txs can occupy + * @param freeSlots max number of promoted txs + * @return a list of transactions promoted to the prev layer + */ @Override - public PendingTransaction promote(final Predicate promotionFilter) { - final PendingTransaction promotedTx = - orderByGap.get(0).stream() - .map(txsBySender::get) - .map(NavigableMap::values) - .flatMap(Collection::stream) - .filter(promotionFilter) - .findFirst() - .orElse(null); - - if (promotedTx != null) { - final Address sender = promotedTx.getSender(); - final var senderTxs = txsBySender.get(sender); - senderTxs.pollFirstEntry(); - processRemove(senderTxs, promotedTx.getTransaction(), PROMOTED); - if (senderTxs.isEmpty()) { - txsBySender.remove(sender); - orderByGap.get(0).remove(sender); - gapBySender.remove(sender); - } else { - final long firstNonce = senderTxs.firstKey(); - final int newGap = (int) (firstNonce - (promotedTx.getNonce() + 1)); - if (newGap != 0) { - updateGap(sender, 0, newGap); + public List promote( + final Predicate promotionFilter, + final long freeSpace, + final int freeSlots) { + long accumulatedSpace = 0; + final List promotedTxs = new ArrayList<>(); + + final var zeroGapSenders = orderByGap.get(0); + + search: + for (final var sender : zeroGapSenders) { + final var senderSeqTxs = getSequentialSubset(txsBySender.get(sender)); + + for (final var candidateTx : senderSeqTxs.values()) { + + if (promotionFilter.test(candidateTx)) { + accumulatedSpace += candidateTx.memorySize(); + if (promotedTxs.size() < freeSlots && accumulatedSpace <= freeSpace) { + promotedTxs.add(candidateTx); + } else { + // no room for more txs the search is over exit the loops + break search; + } + } else { + // skip remaining txs for this sender + break; } } } - return promotedTx; + // remove promoted txs from this layer + promotedTxs.forEach( + promotedTx -> { + final var sender = promotedTx.getSender(); + final var senderTxs = txsBySender.get(sender); + senderTxs.remove(promotedTx.getNonce()); + processRemove(senderTxs, promotedTx.getTransaction(), PROMOTED); + if (senderTxs.isEmpty()) { + txsBySender.remove(sender); + orderByGap.get(0).remove(sender); + gapBySender.remove(sender); + } else { + final long firstNonce = senderTxs.firstKey(); + final int newGap = (int) (firstNonce - (promotedTx.getNonce() + 1)); + if (newGap != 0) { + updateGap(sender, 0, newGap); + } + } + }); + + if (!promotedTxs.isEmpty()) { + // since we removed some txs we can try to promote from next layer + promoteTransactions(); + } + + return promotedTxs; + } + + private NavigableMap getSequentialSubset( + final NavigableMap senderTxs) { + long lastSequentialNonce = senderTxs.firstKey(); + for (final long nonce : senderTxs.tailMap(lastSequentialNonce, false).keySet()) { + if (nonce == lastSequentialNonce + 1) { + ++lastSequentialNonce; + } else { + break; + } + } + return senderTxs.headMap(lastSequentialNonce, true); } @Override diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/TransactionsLayer.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/TransactionsLayer.java index 9c85dac1096..3f18aa57e9f 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/TransactionsLayer.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/TransactionsLayer.java @@ -66,7 +66,8 @@ void blockAdded( */ OptionalLong getCurrentNonceFor(Address sender); - PendingTransaction promote(Predicate promotionFilter); + List promote( + Predicate promotionFilter, final long freeSpace, final int freeSlots); long subscribeToAdded(PendingTransactionAddedListener listener);