Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -500,10 +500,22 @@ private Void retryBlockCreationUntilUseful(
.log();
return null;
} catch (final Throwable e) {
LOG.warn(
"Something went wrong creating block for payload id {}, error {}",
payloadIdentifier,
logException(e));
if (isBlockCreationCancelled(payloadIdentifier)) {
// when the block creation is canceled, in some edge cases it is possible to have
// concurrency issues, so inform the user how to interpret that possibility
LOG.info(
"Got an exception after cancellation of block creation for payload id {}. "
+ "This is expected if you already saw the earlier "
+ "\"the completion of the block creation continues in a best effort mode, and could fail due to concurrency issues\" log. "
+ "If you do not see that earlier warning log please report this stack trace.",
payloadIdentifier,
e);
} else {
LOG.warn(
"Something went wrong creating block for payload id {}, error {}",
payloadIdentifier,
logException(e));
}
return null;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,99 @@ public void exceptionDuringBuildingBlockShouldNotBeInvalid()
verify(badBlockManager, never()).addBadBlock(any(), any());
}

/**
 * Checks that an exception raised by block creation after the proposal has already been
 * finalized does not escape the background block-creation task.
 *
 * <p>Setup: the spied {@code MergeBlockCreator} executes its real {@code createBlock} on the
 * first invocation (the empty block built by {@code preparePayload}). On the next invocation it
 * first calls {@code finalizeProposalById} with the payload id captured from {@code
 * putPayloadById}, and then throws a {@code RuntimeException} that stands in for a concurrency
 * failure. The test passes when waiting on {@code blockCreationTask} completes without throwing.
 */
@Test
public void exceptionThrownAfterBlockCreationCancellationIsHandledGracefully()
    throws ExecutionException, InterruptedException {

  // Filled in by the stubbed putPayloadById below; read by the failing createBlock answer so it
  // can finalize the right proposal.
  final AtomicReference<PayloadIdentifier> capturedPayloadId = new AtomicReference<>();
  // The factory lambda is built before the coordinator exists, so it reaches it through this.
  final AtomicReference<MergeCoordinator> coordinatorHolder = new AtomicReference<>();

  MergeCoordinator.MergeBlockCreatorFactory creatorFactory =
      (parentHeader, address) -> {
        final MergeBlockCreator realCreator =
            new MergeBlockCreator(
                miningConfiguration,
                unusedParent -> Bytes.EMPTY,
                transactionPool,
                protocolContext,
                protocolSchedule,
                parentHeader,
                ethScheduler);
        final MergeBlockCreator spiedCreator = spy(realCreator);

        // 1st createBlock: real implementation, so preparePayload can finish and kick off the
        // retry loop.
        // 2nd createBlock: finalize the proposal (when its id has been captured) and only then
        // throw, so the failure is raised after the finalization call.
        doCallRealMethod()
            .doAnswer(
                ignoredInvocation -> {
                  final PayloadIdentifier knownId = capturedPayloadId.get();
                  if (knownId != null) {
                    coordinatorHolder.get().finalizeProposalById(knownId);
                  }
                  throw new RuntimeException(
                      "simulated concurrency error after block creation was cancelled");
                })
            .when(spiedCreator)
            .createBlock(
                any(),
                any(Bytes32.class),
                anyLong(),
                eq(Optional.empty()),
                eq(Optional.empty()),
                eq(Optional.empty()),
                any());
        return spiedCreator;
      };

  final MergeCoordinator realCoordinator =
      new MergeCoordinator(
          protocolContext,
          protocolSchedule,
          ethScheduler,
          miningConfiguration,
          backwardSyncContext,
          creatorFactory);
  MergeCoordinator spiedCoordinator = spy(realCoordinator);
  coordinatorHolder.set(spiedCoordinator);

  // Only remember the first payload id seen; the proposal is intentionally not finalized here,
  // leaving that to the second createBlock invocation.
  doAnswer(
          call -> {
            final PayloadWrapper storedPayload = call.getArgument(0, PayloadWrapper.class);
            capturedPayloadId.compareAndSet(null, storedPayload.payloadIdentifier());
            return null;
          })
      .when(mergeContext)
      .putPayloadById(any());

  final long timestampSeconds = System.currentTimeMillis() / 1000;
  spiedCoordinator.preparePayload(
      genesisState.getBlock().getHeader(),
      timestampSeconds,
      Bytes32.ZERO,
      suggestedFeeRecipient,
      Optional.empty(),
      Optional.empty(),
      Optional.empty());

  // get() would rethrow (wrapped) anything that escaped the task; returning normally means the
  // simulated failure was absorbed by the task itself.
  blockCreationTask.get();
}

@Test
public void shouldNotRecordProposedBadBlockToBadBlockManager()
throws ExecutionException, InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,46 +246,57 @@ private Map<PendingTransaction, TransactionSelectionResult> timeLimitedSelection

private Map<PendingTransaction, TransactionSelectionResult> internalTimeLimitedSelection(
final List<PendingTransaction> candidateTransactions, final long remainingSelectionTime) {
final long startTimeNanos = System.nanoTime();

validTxSelectionTimeoutResult = BLOCK_SELECTION_TIMEOUT;
invalidTxSelectionTimeoutResult = BLOCK_SELECTION_TIMEOUT_INVALID_TX;

final CountDownLatch internalSelectionDone = new CountDownLatch(1);

final var selectionResults =
new ConcurrentHashMap<PendingTransaction, TransactionSelectionResult>(
candidateTransactions.size());

currTxSelectionTask =
new FutureTask<>(
() -> {
LOG.atDebug()
.setMessage(
"Starting internal pool transaction selection, run time capped at {}ms, stats {}")
.addArgument(() -> nanosToMillis(remainingSelectionTime))
.addArgument(blockSelectionContext.transactionPool()::logStats)
.log();
try {
LOG.atDebug()
.setMessage(
"Starting internal pool transaction selection, run time capped at {}ms, stats {}")
.addArgument(() -> nanosToMillis(remainingSelectionTime))
.addArgument(blockSelectionContext.transactionPool()::logStats)
.log();

for (PendingTransaction candidateTx : candidateTransactions) {
final var selectionResult = evaluateTransaction(candidateTx);
selectionResults.put(candidateTx, selectionResult);
if (selectionResult.stop()) {
break;
for (PendingTransaction candidateTx : candidateTransactions) {
final var selectionResult = evaluateTransaction(candidateTx);
selectionResults.put(candidateTx, selectionResult);
if (selectionResult.stop()) {
break;
}
}
} finally {
internalSelectionDone.countDown();
}
},
null);

ethScheduler.scheduleBlockCreationTask(
blockSelectionContext.pendingBlockHeader().getNumber(), currTxSelectionTask);

try {
currTxSelectionTask.get(remainingSelectionTime, TimeUnit.NANOSECONDS);
} catch (InterruptedException | ExecutionException | CancellationException e) {
if (isCancelled.get()) {
LOG.debug(
"Transaction selection cancelled during execution, finalizing with current progress");
} else {
LOG.warn("Error during block transaction selection", e);
// force rollback
rollback();
}
} catch (ExecutionException e) {
LOG.warn("Error during block transaction selection", e);
// force rollback
rollback();
} catch (CancellationException e) {
LOG.debug(
"Transaction selection cancelled during execution, finalizing with current progress", e);
} catch (InterruptedException e) {
LOG.debug(
"Transaction selection interrupted during execution, finalizing with current progress",
e);
Comment thread
fab-10 marked this conversation as resolved.
} catch (TimeoutException e) {
// synchronize since we want to be sure that there is no concurrent state update
synchronized (isTimeout) {
Expand All @@ -300,6 +311,17 @@ private Map<PendingTransaction, TransactionSelectionResult> internalTimeLimitedS
nanosToMillis(remainingSelectionTime));
}

// in case of a cancellation or a timeout, it is possible that the thread that is processing the
// tx is still running, so to avoid concurrency issues accessing the world state during the
// following steps (e.g. withdrawals, EL request or rewards processing) we try to wait,
// for a max amount of time, for the cancellation to complete, before proceeding in
// a best effort mode that could potentially fail.
if (internalSelectionDone.getCount() != 0) {
final long elapsedSelectionTime = System.nanoTime() - startTimeNanos;
final long maxWaitTime = remainingSelectionTime - elapsedSelectionTime;
waitForCancellationToBeProcessed("Internal", internalSelectionDone, maxWaitTime);
}

return selectionResults;
}

Expand Down Expand Up @@ -331,13 +353,14 @@ private void pluginTimeLimitedSelection(

try {
currTxSelectionTask.get(pluginTxsSelectionMaxTimeNanos, TimeUnit.NANOSECONDS);
} catch (InterruptedException | ExecutionException | CancellationException e) {
if (isCancelled.get()) {
throw new CancellationException("Cancelled during plugin transaction selection");
}
} catch (ExecutionException e) {
LOG.error("Unhandled exception during plugin transaction selection", e);
// force a rollback
rollback();
} catch (CancellationException e) {
LOG.debug("Cancelled during plugin transaction selection", e);
} catch (InterruptedException e) {
LOG.debug("Interrupted during plugin transaction selection", e);
} catch (TimeoutException e) {
// synchronize since we want to be sure that there is no concurrent state update
synchronized (isTimeout) {
Expand All @@ -346,43 +369,69 @@ private void pluginTimeLimitedSelection(

// cancelling the task and interrupting the thread running it
currTxSelectionTask.cancel(true);
final long elapsedPluginTxsSelectionTime = System.nanoTime() - startTime;
LOG.warn(
"Interrupting the plugin selection of transactions for block inclusion after {}ms,"
+ " as it exceeds the maximum configured duration of {}ms",
nanosToMillis(elapsedPluginTxsSelectionTime),
nanosToMillis(System.nanoTime() - startTime),
nanosToMillis(pluginTxsSelectionMaxTimeNanos));
}

final var remainingSelectionTime =
blockTxsSelectionMaxTimeNanos - elapsedPluginTxsSelectionTime;
// in case of a cancellation or a timeout, it is possible that the thread that is processing the
// tx is still running, so to avoid concurrency issues accessing the world state during the
// following steps (e.g. withdrawals, EL request or rewards processing) we try to wait,
// for a max amount of time, for the cancellation to complete, before proceeding in
// a best effort mode that could potentially fail.
if (pluginSelectionDone.getCount() != 0) {
final long elapsedSelectionTime = System.nanoTime() - startTime;
final long maxWaitTime = blockTxsSelectionMaxTimeNanos - elapsedSelectionTime;
waitForCancellationToBeProcessed("Plugin", pluginSelectionDone, maxWaitTime);
}
}

private void waitForCancellationToBeProcessed(
final String context, final CountDownLatch selectionDone, final long maxWaitTimeNanos) {
if (maxWaitTimeNanos <= 0) {
LOG.info(
"No time remains to wait for the cancellation of the {} selection to complete normally, "
+ "the completion of the block creation continues in a best effort mode, and could fail due to concurrency issues",
context);
return;
}

final long waitStartTime = System.nanoTime();
try {
// wait at max the specified time, for the thread to fully process the interrupt,
// before proceeding, to avoid overlapping executions.
LOG.atTrace()
.setMessage(
"Plugin transaction selection state {}, waiting {}ms for the thread to process the interrupt")
"{} transaction selection state {}, waiting at max {}ms for the thread to process the interrupt")
.addArgument(context)
.addArgument(currTxSelectionTask::state)
.addArgument(() -> nanosToMillis(remainingSelectionTime))
.addArgument(() -> nanosToMillis(maxWaitTimeNanos))
.log();

try {
// need to wait for the thread to fully process the interrupt,
// before proceeding, to avoid overlapping executions.
pluginSelectionDone.await(remainingSelectionTime, TimeUnit.NANOSECONDS);

if (selectionDone.await(maxWaitTimeNanos, TimeUnit.NANOSECONDS)) {
LOG.atTrace()
.setMessage("Plugin selection cancellation processed in {}ms, task status {}")
.addArgument(
() ->
nanosToMillis((System.nanoTime() - startTime) - elapsedPluginTxsSelectionTime))
.setMessage("{} selection cancellation processed in {}ms, task status {}")
.addArgument(context)
.addArgument(() -> nanosToMillis(System.nanoTime() - waitStartTime))
.addArgument(currTxSelectionTask.state())
.log();

} catch (InterruptedException ex) {
LOG.warn(
"Interrupted after waiting {}ms for the cancellation of plugin transaction selection task",
nanosToMillis(remainingSelectionTime),
ex);
throw new RuntimeException(ex);
} else {
LOG.info(
"Cancellation of {} selection not completed after waiting for {}ms, the completion of the block creation"
+ " continues in a best effort mode, and could fail due to concurrency issues",
context,
nanosToMillis(maxWaitTimeNanos));
}

} catch (InterruptedException ex) {
LOG.warn(
"{} interrupted after waiting {}ms for the cancellation of transaction selection task",
context,
nanosToMillis(maxWaitTimeNanos),
ex);
throw new RuntimeException(ex);
Comment thread
fab-10 marked this conversation as resolved.
}
}

Expand Down
Loading
Loading