Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,17 @@

import java.util.Comparator;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Stream;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.TreeMultimap;
Expand Down Expand Up @@ -91,53 +92,54 @@ public TrieLogPruner(
}

public void initialize() {
preloadQueueWithTimeout();
preloadQueueWithTimeout(PRELOAD_TIMEOUT_IN_SECONDS);
}

private void preloadQueueWithTimeout() {
@VisibleForTesting
void preloadQueueWithTimeout(final int timeoutInSeconds) {

LOG.info("Trie log pruner queue preload starting...");
LOG.atInfo()
.setMessage("Attempting to load first {} trie logs from database...")
.addArgument(loadingLimit)
.log();

try (final ScheduledExecutorService preloadExecutor = Executors.newScheduledThreadPool(1)) {
try (final ExecutorService preloadExecutor = Executors.newSingleThreadExecutor()) {
final Future<?> future = preloadExecutor.submit(this::preloadQueue);

final AtomicBoolean timeoutOccurred = new AtomicBoolean(false);
final Runnable timeoutTask =
() -> {
timeoutOccurred.set(true);
LOG.atWarn()
.setMessage(
"Timeout occurred while loading and processing {} trie logs from database")
.addArgument(loadingLimit)
.log();
};

final ScheduledFuture<?> timeoutFuture =
preloadExecutor.schedule(timeoutTask, PRELOAD_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS);
LOG.atInfo()
.setMessage(
"Trie log pruning will timeout after {} seconds. If this is timing out, consider using `besu storage trie-log prune` subcommand, see https://besu.hyperledger.org/public-networks/how-to/bonsai-limit-trie-logs")
.addArgument(PRELOAD_TIMEOUT_IN_SECONDS)
.addArgument(timeoutInSeconds)
.log();

preloadQueue(timeoutOccurred, timeoutFuture);
try {
future.get(timeoutInSeconds, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException e) {
LOG.error("Error loading trie logs from database", e);
future.cancel(true);
} catch (TimeoutException e) {
future.cancel(true);
LOG.atWarn()
.setMessage("Timeout occurred while loading and processing {} trie logs from database")
.addArgument(loadingLimit)
.log();
}
}
LOG.info("Trie log pruner queue preload complete.");
}

private void preloadQueue(
final AtomicBoolean timeoutOccurred, final ScheduledFuture<?> timeoutFuture) {
private void preloadQueue() {

try (final Stream<byte[]> trieLogKeys = rootWorldStateStorage.streamTrieLogKeys(loadingLimit)) {

final AtomicLong addToPruneQueueCount = new AtomicLong();
final AtomicLong orphansPruned = new AtomicLong();
trieLogKeys.forEach(
blockHashAsBytes -> {
if (timeoutOccurred.get()) {
if (Thread.currentThread().isInterrupted()) {
throw new RuntimeException(
new TimeoutException("Timeout occurred while preloading trie log prune queue"));
new InterruptedException("Thread interrupted during trie log processing."));
}
final Hash blockHash = Hash.wrap(Bytes32.wrap(blockHashAsBytes));
final Optional<BlockHeader> header = blockchain.getBlockHeader(blockHash);
Expand All @@ -152,17 +154,19 @@ private void preloadQueue(
}
});

timeoutFuture.cancel(true);
LOG.atDebug().log("Pruned {} orphaned trie logs from database...", orphansPruned.intValue());
LOG.atInfo().log(
"Added {} trie logs to prune queue. Commencing pruning of eligible trie logs...",
addToPruneQueueCount.intValue());
int prunedCount = pruneFromQueue();
LOG.atInfo().log("Pruned {} trie logs.", prunedCount);
LOG.atInfo().log("Pruned {} trie logs", prunedCount);
} catch (Exception e) {
if (e.getCause() != null && e.getCause() instanceof TimeoutException) {
if (e instanceof InterruptedException
|| (e.getCause() != null && e.getCause() instanceof InterruptedException)) {
LOG.info("Operation interrupted, but will attempt to prune what's in the queue so far...");
int prunedCount = pruneFromQueue();
LOG.atInfo().log("Operation timed out, but still pruned {} trie logs.", prunedCount);
LOG.atInfo().log("...pruned {} trie logs", prunedCount);
Thread.currentThread().interrupt(); // Preserve interrupt status
} else {
LOG.error("Error loading trie logs from database, nothing pruned", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.trie.diffbased.bonsai.trielog;
package org.hyperledger.besu.ethereum.trie.diffbased.common.trielog;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
Expand All @@ -26,9 +26,6 @@
import org.hyperledger.besu.ethereum.core.BlockDataGenerator;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.trie.diffbased.bonsai.storage.BonsaiWorldStateKeyValueStorage;
import org.hyperledger.besu.ethereum.trie.diffbased.common.trielog.TrieLogAddedEvent;
import org.hyperledger.besu.ethereum.trie.diffbased.common.trielog.TrieLogLayer;
import org.hyperledger.besu.ethereum.trie.diffbased.common.trielog.TrieLogPruner;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;

import java.util.Optional;
Expand All @@ -43,6 +40,7 @@
import org.junit.jupiter.api.Test;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.mockito.internal.stubbing.answers.AnswersWithDelay;

public class TrieLogPrunerTest {

Expand Down Expand Up @@ -82,6 +80,53 @@ public void initialize_preloads_queue_and_prunes_orphaned_blocks() {
verify(worldState, times(1)).pruneTrieLog(header2.getBlockHash());
}

@Test
public void preloadQueueWithTimeout_handles_timeout_during_streamTrieLogKeys() {
// Given
final int timeoutInSeconds = 1;
final long timeoutInMillis = timeoutInSeconds * 1000;
final int loadingLimit = 2;
TrieLogPruner trieLogPruner =
new TrieLogPruner(
worldState, blockchain, executeAsync, 3, loadingLimit, false, new NoOpMetricsSystem());

// Simulate a long-running operation
when(worldState.streamTrieLogKeys(loadingLimit))
.thenAnswer(new AnswersWithDelay(timeoutInMillis * 2, invocation -> Stream.empty()));

// When
long startTime = System.currentTimeMillis();
trieLogPruner.preloadQueueWithTimeout(timeoutInSeconds);
long elapsedTime = System.currentTimeMillis() - startTime;

// Then
assertThat(elapsedTime).isLessThan(timeoutInMillis * 2);
}

@Test
public void preloadQueueWithTimeout_handles_timeout_during_getBlockHeader() {
// Given
final int timeoutInSeconds = 1;
final long timeoutInMillis = timeoutInSeconds * 1000;
TrieLogPruner trieLogPruner = setupPrunerAndFinalizedBlock(3, 1);

// Simulate a long-running operation
when(blockchain.getBlockHeader(any(Hash.class)))
// delay on first invocation, then return empty
.thenAnswer(new AnswersWithDelay(timeoutInMillis * 2, invocation -> Optional.empty()))
.thenReturn(Optional.empty());

// When
long startTime = System.currentTimeMillis();
trieLogPruner.preloadQueueWithTimeout(timeoutInSeconds);
long elapsedTime = System.currentTimeMillis() - startTime;

// Then
assertThat(elapsedTime).isLessThan(timeoutInMillis * 2);
verify(worldState, times(1)).pruneTrieLog(key(1));
verify(worldState, times(1)).pruneTrieLog(key(2));
}

@Test
public void trieLogs_pruned_in_reverse_order_within_pruning_window() {
// Given
Expand Down