diff --git a/CHANGELOG.md b/CHANGELOG.md index 80b56358bdd..fdfb1212d50 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ - Add access to an immutable world view to start/end transaction hooks in the tracing API[#5836](https://github.com/hyperledger/besu/pull/5836) - Layered transaction pool implementation is now stable and enabled by default. If you want still to use the legacy implementation, use `--tx-pool=legacy` [#5772](https://github.com/hyperledger/besu) - Tune G1GC to reduce Besu memory footprint, and new `besu-untuned` start scripts to run without any specific G1GC flags [#5879](https://github.com/hyperledger/besu/pull/5879) +- Reduce `engine_forkchoiceUpdatedV?` response time by asynchronously process block added events in the transaction pool [#5909](https://github.com/hyperledger/besu/pull/5909) ### Bug Fixes - do not create ignorable storage on revert storage-variables subcommand [#5830](https://github.com/hyperledger/besu/pull/5830) diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthScheduler.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthScheduler.java index 91818b9f5e2..9df6edb2c9d 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthScheduler.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthScheduler.java @@ -83,7 +83,7 @@ public EthScheduler( metricsSystem), MonitoredExecutors.newCachedThreadPool( EthScheduler.class.getSimpleName() + "-Services", metricsSystem), - MonitoredExecutors.newBoundedThreadPool( + MonitoredExecutors.newFixedThreadPool( EthScheduler.class.getSimpleName() + "-Computation", 1, computationWorkerCount, @@ -133,6 +133,10 @@ public void scheduleTxWorkerTask(final Runnable command) { txWorkerExecutor.execute(command); } + public CompletableFuture scheduleServiceTask(final Supplier task) { + return CompletableFuture.supplyAsync(task, servicesExecutor); + } + public CompletableFuture scheduleServiceTask(final EthTask task) { final CompletableFuture serviceFuture = task.runAsync(servicesExecutor); pendingFutures.add(serviceFuture); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPool.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPool.java index fd3d1a4c16a..f6c8161bf0c 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPool.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPool.java @@ -62,7 +62,9 @@ import java.util.Map; import java.util.Optional; import java.util.OptionalLong; +import java.util.Queue; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -104,6 +106,8 @@ public class TransactionPool implements BlockAddedObserver { new PendingTransactionsListenersProxy(); private volatile OptionalLong subscribeConnectId = OptionalLong.empty(); private final SaveRestoreManager saveRestoreManager = new SaveRestoreManager(); + private final Lock blockAddedLock = new ReentrantLock(); + private final Queue blockAddedQueue = new ConcurrentLinkedQueue<>(); public TransactionPool( final Supplier pendingTransactionsSupplier, @@ -321,16 +325,47 @@ public void unsubscribeDroppedTransactions(final long id) { @Override public void onBlockAdded(final BlockAddedEvent event) { if (isPoolEnabled.get()) { - LOG.trace("Block added event {}", event); + final long started = System.currentTimeMillis(); if (event.getEventType().equals(BlockAddedEvent.EventType.HEAD_ADVANCED) || event.getEventType().equals(BlockAddedEvent.EventType.CHAIN_REORG)) { - pendingTransactions.manageBlockAdded( - event.getBlock().getHeader(), - event.getAddedTransactions(), - event.getRemovedTransactions(), - protocolSchedule.getByBlockHeader(event.getBlock().getHeader()).getFeeMarket()); - reAddTransactions(event.getRemovedTransactions()); + // add the event to the processing queue + blockAddedQueue.add(event); + + // we want to process the added block asynchronously, + // but at the same time we must ensure that blocks are processed in order one at time + ethContext + .getScheduler() + .scheduleServiceTask( + () -> { + while (!blockAddedQueue.isEmpty()) { + if (blockAddedLock.tryLock()) { + // no other thread is processing the queue, so start processing it + try { + BlockAddedEvent e = blockAddedQueue.poll(); + // check again since another thread could have stolen our task + if (e != null) { + pendingTransactions.manageBlockAdded( + e.getBlock().getHeader(), + e.getAddedTransactions(), + e.getRemovedTransactions(), + protocolSchedule + .getByBlockHeader(e.getBlock().getHeader()) + .getFeeMarket()); + reAddTransactions(e.getRemovedTransactions()); + LOG.atDebug() + .setMessage("Block added event {} processed in {}ms") + .addArgument(e) + .addArgument(() -> System.currentTimeMillis() - started) + .log(); + } + } finally { + blockAddedLock.unlock(); + } + } + } + return null; + }); } } } diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/AbstractTransactionPoolTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/AbstractTransactionPoolTest.java index 1fcd45f84fe..7fad9f1a057 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/AbstractTransactionPoolTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/AbstractTransactionPoolTest.java @@ -33,6 +33,7 @@ import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.nullable; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -98,6 +99,7 @@ import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Supplier; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -229,6 +231,9 @@ public void setUp() { final EthScheduler ethScheduler = mock(EthScheduler.class); syncTaskCapture = ArgumentCaptor.forClass(Runnable.class); doNothing().when(ethScheduler).scheduleSyncWorkerTask(syncTaskCapture.capture()); + doAnswer(invocation -> ((Supplier) invocation.getArguments()[0]).get()) + .when(ethScheduler) + .scheduleServiceTask(any(Supplier.class)); doReturn(ethScheduler).when(ethContext).getScheduler(); peerTransactionTracker = new PeerTransactionTracker();