diff --git a/consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/blockcreation/MergeCoordinator.java b/consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/blockcreation/MergeCoordinator.java index e1470212d35..32a03c6a3d3 100644 --- a/consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/blockcreation/MergeCoordinator.java +++ b/consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/blockcreation/MergeCoordinator.java @@ -500,10 +500,22 @@ private Void retryBlockCreationUntilUseful( .log(); return null; } catch (final Throwable e) { - LOG.warn( - "Something went wrong creating block for payload id {}, error {}", - payloadIdentifier, - logException(e)); + if (isBlockCreationCancelled(payloadIdentifier)) { + // when the block creation is canceled, in some edge cases it is possible to have + // concurrency issues, so inform the user how to interpret that possibility + LOG.info( + "Got an exception after cancellation of block creation for payload id {}. " + + "This is expected if you already saw the earlier " + + "\"the completion of the block creation continues in a best effort mode, and could fail due to concurrency issues\" log. 
" + + "If you do not see that earlier warning log please report this stack trace.", + payloadIdentifier, + e); + } else { + LOG.warn( + "Something went wrong creating block for payload id {}, error {}", + payloadIdentifier, + logException(e)); + } return null; } } diff --git a/consensus/merge/src/test/java/org/hyperledger/besu/consensus/merge/blockcreation/MergeCoordinatorTest.java b/consensus/merge/src/test/java/org/hyperledger/besu/consensus/merge/blockcreation/MergeCoordinatorTest.java index f3de4dd38ba..913094fba01 100644 --- a/consensus/merge/src/test/java/org/hyperledger/besu/consensus/merge/blockcreation/MergeCoordinatorTest.java +++ b/consensus/merge/src/test/java/org/hyperledger/besu/consensus/merge/blockcreation/MergeCoordinatorTest.java @@ -371,6 +371,99 @@ public void exceptionDuringBuildingBlockShouldNotBeInvalid() verify(badBlockManager, never()).addBadBlock(any(), any()); } + /** + * Verifies that a Throwable thrown inside retryBlockCreationUntilUseful, at a point where + * isBlockCreationCancelled is already true, is handled gracefully and does not propagate out of + * the background task. The scenario: createBlock for the empty block (synchronous in + * preparePayload) succeeds normally; then inside the retry loop the second createBlock call first + * cancels block creation via finalizeProposalById, then throws a RuntimeException simulating a + * race-condition error. The catch (Throwable) block in retryBlockCreationUntilUseful sees + * isBlockCreationCancelled == true and logs at INFO instead of propagating the exception. + */ + @Test + public void exceptionThrownAfterBlockCreationCancellationIsHandledGracefully() + throws ExecutionException, InterruptedException { + + final AtomicReference coordinatorRef = new AtomicReference<>(); + // Capture the payloadIdentifier from the empty-block putPayloadById call so the retry-loop + // spy can call finalizeProposalById with it. 
+ final AtomicReference payloadIdRef = new AtomicReference<>(); + + MergeCoordinator.MergeBlockCreatorFactory mergeBlockCreatorFactory = + (parentHeader, address) -> { + MergeBlockCreator beingSpiedOn = + spy( + new MergeBlockCreator( + miningConfiguration, + parent -> Bytes.EMPTY, + transactionPool, + protocolContext, + protocolSchedule, + parentHeader, + ethScheduler)); + + // First call (empty block, synchronous in preparePayload): run normally so that + // preparePayload completes and the retry loop is started. + // Second call (inside the retry loop): cancel block creation first so that + // isBlockCreationCancelled is true when the RuntimeException reaches the catch block. + doCallRealMethod() + .doAnswer( + inv -> { + PayloadIdentifier pid = payloadIdRef.get(); + if (pid != null) { + coordinatorRef.get().finalizeProposalById(pid); + } + throw new RuntimeException( + "simulated concurrency error after block creation was cancelled"); + }) + .when(beingSpiedOn) + .createBlock( + any(), + any(Bytes32.class), + anyLong(), + eq(Optional.empty()), + eq(Optional.empty()), + eq(Optional.empty()), + any()); + return beingSpiedOn; + }; + + MergeCoordinator coordinatorUnderTest = + spy( + new MergeCoordinator( + protocolContext, + protocolSchedule, + ethScheduler, + miningConfiguration, + backwardSyncContext, + mergeBlockCreatorFactory)); + coordinatorRef.set(coordinatorUnderTest); + + // Capture payloadId from the empty-block putPayloadById; do NOT finalize here so the + // retry loop gets a chance to start and exercise the Throwable catch path. 
+ doAnswer( + invocation -> { + payloadIdRef.compareAndSet( + null, invocation.getArgument(0, PayloadWrapper.class).payloadIdentifier()); + return null; + }) + .when(mergeContext) + .putPayloadById(any()); + + coordinatorUnderTest.preparePayload( + genesisState.getBlock().getHeader(), + System.currentTimeMillis() / 1000, + Bytes32.ZERO, + suggestedFeeRecipient, + Optional.empty(), + Optional.empty(), + Optional.empty()); + + // The RuntimeException must not be propagated: the catch (Throwable) block should handle it + // gracefully (logging at INFO since isBlockCreationCancelled is true) and return null. + blockCreationTask.get(); + } + @Test public void shouldNotRecordProposedBadBlockToBadBlockManager() throws ExecutionException, InterruptedException { diff --git a/ethereum/blockcreation/src/main/java/org/hyperledger/besu/ethereum/blockcreation/txselection/BlockTransactionSelector.java b/ethereum/blockcreation/src/main/java/org/hyperledger/besu/ethereum/blockcreation/txselection/BlockTransactionSelector.java index f3f6855ee35..204fb8ff985 100644 --- a/ethereum/blockcreation/src/main/java/org/hyperledger/besu/ethereum/blockcreation/txselection/BlockTransactionSelector.java +++ b/ethereum/blockcreation/src/main/java/org/hyperledger/besu/ethereum/blockcreation/txselection/BlockTransactionSelector.java @@ -246,9 +246,13 @@ private Map timeLimitedSelection private Map internalTimeLimitedSelection( final List candidateTransactions, final long remainingSelectionTime) { + final long startTimeNanos = System.nanoTime(); + validTxSelectionTimeoutResult = BLOCK_SELECTION_TIMEOUT; invalidTxSelectionTimeoutResult = BLOCK_SELECTION_TIMEOUT_INVALID_TX; + final CountDownLatch internalSelectionDone = new CountDownLatch(1); + final var selectionResults = new ConcurrentHashMap( candidateTransactions.size()); @@ -256,36 +260,43 @@ private Map internalTimeLimitedS currTxSelectionTask = new FutureTask<>( () -> { - LOG.atDebug() - .setMessage( - "Starting internal pool transaction 
selection, run time capped at {}ms, stats {}") - .addArgument(() -> nanosToMillis(remainingSelectionTime)) - .addArgument(blockSelectionContext.transactionPool()::logStats) - .log(); + try { + LOG.atDebug() + .setMessage( + "Starting internal pool transaction selection, run time capped at {}ms, stats {}") + .addArgument(() -> nanosToMillis(remainingSelectionTime)) + .addArgument(blockSelectionContext.transactionPool()::logStats) + .log(); - for (PendingTransaction candidateTx : candidateTransactions) { - final var selectionResult = evaluateTransaction(candidateTx); - selectionResults.put(candidateTx, selectionResult); - if (selectionResult.stop()) { - break; + for (PendingTransaction candidateTx : candidateTransactions) { + final var selectionResult = evaluateTransaction(candidateTx); + selectionResults.put(candidateTx, selectionResult); + if (selectionResult.stop()) { + break; + } } + } finally { + internalSelectionDone.countDown(); } }, null); + ethScheduler.scheduleBlockCreationTask( blockSelectionContext.pendingBlockHeader().getNumber(), currTxSelectionTask); try { currTxSelectionTask.get(remainingSelectionTime, TimeUnit.NANOSECONDS); - } catch (InterruptedException | ExecutionException | CancellationException e) { - if (isCancelled.get()) { - LOG.debug( - "Transaction selection cancelled during execution, finalizing with current progress"); - } else { - LOG.warn("Error during block transaction selection", e); - // force rollback - rollback(); - } + } catch (ExecutionException e) { + LOG.warn("Error during block transaction selection", e); + // force rollback + rollback(); + } catch (CancellationException e) { + LOG.debug( + "Transaction selection cancelled during execution, finalizing with current progress", e); + } catch (InterruptedException e) { + LOG.debug( + "Transaction selection interrupted during execution, finalizing with current progress", + e); } catch (TimeoutException e) { // synchronize since we want to be sure that there is no concurrent state 
update synchronized (isTimeout) { @@ -300,6 +311,17 @@ private Map internalTimeLimitedS nanosToMillis(remainingSelectionTime)); } + // in case of a cancellation or a timeout, it is possible that the thread that is processing the + // tx is still running, so to avoid concurrency issues accessing the world state during the + // following steps (e.g. withdrawals, EL request or rewards processing) we try to wait, + // for a max amount of time, for the cancellation to complete, before proceeding in + // a best effort mode that could potentially fail. + if (internalSelectionDone.getCount() != 0) { + final long elapsedSelectionTime = System.nanoTime() - startTimeNanos; + final long maxWaitTime = remainingSelectionTime - elapsedSelectionTime; + waitForCancellationToBeProcessed("Internal", internalSelectionDone, maxWaitTime); + } + return selectionResults; } @@ -331,13 +353,14 @@ private void pluginTimeLimitedSelection( try { currTxSelectionTask.get(pluginTxsSelectionMaxTimeNanos, TimeUnit.NANOSECONDS); - } catch (InterruptedException | ExecutionException | CancellationException e) { - if (isCancelled.get()) { - throw new CancellationException("Cancelled during plugin transaction selection"); - } + } catch (ExecutionException e) { LOG.error("Unhandled exception during plugin transaction selection", e); // force a rollback rollback(); + } catch (CancellationException e) { + LOG.debug("Cancelled during plugin transaction selection", e); + } catch (InterruptedException e) { + LOG.debug("Interrupted during plugin transaction selection", e); } catch (TimeoutException e) { // synchronize since we want to be sure that there is no concurrent state update synchronized (isTimeout) { @@ -346,43 +369,69 @@ private void pluginTimeLimitedSelection( // cancelling the task and interrupting the thread running it currTxSelectionTask.cancel(true); - final long elapsedPluginTxsSelectionTime = System.nanoTime() - startTime; LOG.warn( "Interrupting the plugin selection of transactions for block 
inclusion after {}ms," + " as it exceeds the maximum configured duration of {}ms", - nanosToMillis(elapsedPluginTxsSelectionTime), + nanosToMillis(System.nanoTime() - startTime), nanosToMillis(pluginTxsSelectionMaxTimeNanos)); + } - final var remainingSelectionTime = - blockTxsSelectionMaxTimeNanos - elapsedPluginTxsSelectionTime; + // in case of a cancellation or a timeout, it is possible that the thread that is processing the + // tx is still running, so to avoid concurrency issues accessing the world state during the + // following steps (e.g. withdrawals, EL request or rewards processing) we try to wait, + // for a max amount of time, for the cancellation to complete, before proceeding in + // a best effort mode that could potentially fail. + if (pluginSelectionDone.getCount() != 0) { + final long elapsedSelectionTime = System.nanoTime() - startTime; + final long maxWaitTime = blockTxsSelectionMaxTimeNanos - elapsedSelectionTime; + waitForCancellationToBeProcessed("Plugin", pluginSelectionDone, maxWaitTime); + } + } + + private void waitForCancellationToBeProcessed( + final String context, final CountDownLatch selectionDone, final long maxWaitTimeNanos) { + if (maxWaitTimeNanos <= 0) { + LOG.info( + "No time remains to wait for the cancellation of the {} selection to complete normally, " + + "the completion of the block creation continues in a best effort mode, and could fail due to concurrency issues", + context); + return; + } + final long waitStartTime = System.nanoTime(); + try { + // wait at max the specified time, for the thread to fully process the interrupt, + // before proceeding, to avoid overlapping executions. 
LOG.atTrace() .setMessage( - "Plugin transaction selection state {}, waiting {}ms for the thread to process the interrupt") + "{} transaction selection state {}, waiting at max {}ms for the thread to process the interrupt") + .addArgument(context) .addArgument(currTxSelectionTask::state) - .addArgument(() -> nanosToMillis(remainingSelectionTime)) + .addArgument(() -> nanosToMillis(maxWaitTimeNanos)) .log(); - try { - // need to wait for the thread to fully process the interrupt, - // before proceeding, to avoid overlapping executions. - pluginSelectionDone.await(remainingSelectionTime, TimeUnit.NANOSECONDS); - + if (selectionDone.await(maxWaitTimeNanos, TimeUnit.NANOSECONDS)) { LOG.atTrace() - .setMessage("Plugin selection cancellation processed in {}ms, task status {}") - .addArgument( - () -> - nanosToMillis((System.nanoTime() - startTime) - elapsedPluginTxsSelectionTime)) + .setMessage("{} selection cancellation processed in {}ms, task status {}") + .addArgument(context) + .addArgument(() -> nanosToMillis(System.nanoTime() - waitStartTime)) .addArgument(currTxSelectionTask.state()) .log(); - - } catch (InterruptedException ex) { - LOG.warn( - "Interrupted after waiting {}ms for the cancellation of plugin transaction selection task", - nanosToMillis(remainingSelectionTime), - ex); - throw new RuntimeException(ex); + } else { + LOG.info( + "Cancellation of {} selection not completed after waiting for {}ms, the completion of the block creation" + + " continues in a best effort mode, and could fail due to concurrency issues", + context, + nanosToMillis(maxWaitTimeNanos)); } + + } catch (InterruptedException ex) { + LOG.warn( + "{} interrupted after waiting {}ms for the cancellation of transaction selection task", + context, + nanosToMillis(maxWaitTimeNanos), + ex); + throw new RuntimeException(ex); } } diff --git a/ethereum/blockcreation/src/test/java/org/hyperledger/besu/ethereum/blockcreation/AbstractBlockTransactionSelectorTest.java 
b/ethereum/blockcreation/src/test/java/org/hyperledger/besu/ethereum/blockcreation/AbstractBlockTransactionSelectorTest.java index 7352ec4accc..8035aff6493 100644 --- a/ethereum/blockcreation/src/test/java/org/hyperledger/besu/ethereum/blockcreation/AbstractBlockTransactionSelectorTest.java +++ b/ethereum/blockcreation/src/test/java/org/hyperledger/besu/ethereum/blockcreation/AbstractBlockTransactionSelectorTest.java @@ -104,6 +104,8 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; @@ -1207,6 +1209,171 @@ public TransactionSelectionResult evaluateTransactionPostProcessing( await().atMost(Duration.ofMillis(500)).until(tecIsCancelled::get); } + /** + * When an external thread calls cancel() while the plugin selection task is genuinely running + * (sleeping), FutureTask.cancel(true) transitions the task to CANCELLED/INTERRUPTED state and + * get() throws CancellationException. The old code re-threw that exception; the new code catches + * it, logs at DEBUG, and returns gracefully. 
+ */ + @Test + public void externalCancellationDuringPluginSelectionIsHandledGracefully() { + final CountDownLatch pluginTaskStarted = new CountDownLatch(1); + final AtomicReference selectorRef = new AtomicReference<>(); + + final PluginTransactionSelectorFactory transactionSelectorFactory = + mock(PluginTransactionSelectorFactory.class); + when(transactionSelectorFactory.create(any(), any())) + .thenReturn( + new PluginTransactionSelector() { + @Override + public TransactionSelectionResult evaluateTransactionPreProcessing( + final TransactionEvaluationContext evaluationContext) { + pluginTaskStarted.countDown(); + try { + // simulate slow plugin work; will be interrupted by the external cancel() + Thread.sleep(5_000); + } catch (InterruptedException e) { + // expected when the task is externally cancelled + } + return SELECTED; + } + + @Override + public TransactionSelectionResult evaluateTransactionPostProcessing( + final TransactionEvaluationContext evaluationContext, + final org.hyperledger.besu.plugin.data.TransactionProcessingResult + processingResult) { + return SELECTED; + } + }); + + transactionSelectionService.registerPluginTransactionSelectorFactory( + transactionSelectorFactory); + + selectorRef.set( + createBlockSelectorAndSetupTxPool( + createMiningParameters( + transactionSelectionService, Wei.ZERO, PositiveNumber.fromInt(10_000)), + transactionProcessor, + createBlock(301_000), + AddressHelpers.ofValue(1), + Wei.ZERO, + transactionSelectionService)); + + final var tx = createTransaction(0, Wei.of(7), 100_000); + ensureTransactionIsValid(tx); + transactionPool.addRemoteTransactions(List.of(tx)); + + // Cancel from an external thread once the plugin task is running + final CompletableFuture cancellationTask = + CompletableFuture.runAsync( + () -> { + try { + if (!pluginTaskStarted.await(5, TimeUnit.SECONDS)) { + return; + } + selectorRef.get().cancel(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + + // must 
complete without throwing; the old code propagated CancellationException out of + // pluginTimeLimitedSelection, which would bubble up through buildTransactionListForBlock() + final var results = selectorRef.get().buildTransactionListForBlock(); + assertThat(results).isNotNull(); + cancellationTask.orTimeout(5, TimeUnit.SECONDS).join(); + } + + /** + * When no plugin factory is registered the plugin phase is a no-op, so processTransaction is only + * called during internal selection. This test verifies that the CountDownLatch added in + * internalTimeLimitedSelection causes buildTransactionListForBlock() to wait for the selection + * thread to fully finish before returning, even when cancel() is called mid-processing and the + * thread performs additional work after the interrupt. + * + *

<p>Coordination flow: + * + * <ol> + *
<li>Selection thread calls cancel() on itself and blocks on {@code cleanupCanFinish}. + *
<li>It signals {@code cleanupStarted} so the test thread knows the latch has not yet counted + * down and that {@code buildTransactionListForBlock()} is therefore still blocked. + *
<li>Test thread asserts the build future is still running, then releases {@code + * cleanupCanFinish}. + *
<li>Selection thread unblocks, sets {@code selectionThreadFinished}, and counts down the + * internal latch — allowing buildTransactionListForBlock() to return. + * </ol>
+ */ + @Test + public void internalSelectionLatchEnsuresBuildTransactionListWaitsForSelectionThreadToFinish() + throws InterruptedException { + final AtomicBoolean selectionThreadFinished = new AtomicBoolean(false); + final AtomicReference selectorRef = new AtomicReference<>(); + final CountDownLatch cleanupStarted = new CountDownLatch(1); + final CountDownLatch cleanupCanFinish = new CountDownLatch(1); + + final BlockTransactionSelector selector = + createBlockSelectorAndSetupTxPool( + // no plugin factory registered: plugin phase does nothing + createMiningParameters( + transactionSelectionService, Wei.ZERO, PositiveNumber.fromInt(5_000)), + transactionProcessor, + createBlock(500_000), + AddressHelpers.ofValue(1), + Wei.ZERO, + transactionSelectionService); + selectorRef.set(selector); + + final var tx = createTransaction(0, Wei.of(7), 100_000); + transactionPool.addRemoteTransactions(List.of(tx)); + + // On processing: cancel the selector (from within the selection thread), then block on + // cleanupCanFinish to simulate post-interrupt cleanup work, then mark as finished. 
+ final Answer slowCancelAnswer = + invocation -> { + selectorRef.get().cancel(); // triggers FutureTask.cancel(true) → interrupts this thread + try { + Thread.sleep(Long.MAX_VALUE); // immediately interrupted by cancel(true) + } catch (InterruptedException e) { + // interrupt received: signal that we are in cleanup and block until released + cleanupStarted.countDown(); + try { + cleanupCanFinish.await(5, TimeUnit.SECONDS); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + } + selectionThreadFinished.set(true); + return TransactionProcessingResult.invalid( + ValidationResult.invalid(EXECUTION_INTERRUPTED)); + }; + + when(transactionProcessor.processTransaction( + any(), any(), eq(tx), any(), any(), any(), any(), any())) + .thenAnswer(slowCancelAnswer); + when(transactionProcessor.processTransaction( + any(), any(), eq(tx), any(), any(), any(), any(), any(), any())) + .thenAnswer(slowCancelAnswer); + + // Run on a background thread so the test thread can drive cleanup timing. + final CompletableFuture buildFuture = + CompletableFuture.runAsync(selector::buildTransactionListForBlock); + + // Wait until the selection thread has started cleanup; at this point internalSelectionDone + // has NOT yet counted down, so buildFuture must still be blocked. + assertThat(cleanupStarted.await(5, TimeUnit.SECONDS)).isTrue(); + assertThat(buildFuture.isDone()).isFalse(); + + // Release the selection thread; it will set selectionThreadFinished and count down the latch, + // which unblocks buildTransactionListForBlock(). + cleanupCanFinish.countDown(); + buildFuture.orTimeout(5, TimeUnit.SECONDS).join(); + + // Without the CountDownLatch, buildTransactionListForBlock() would have returned before the + // selection thread reached selectionThreadFinished.set(true). + assertThat(selectionThreadFinished.get()).isTrue(); + } + private void internalBlockSelectionTimeoutSimulation( final boolean isPoa, final boolean preProcessingTooLate,