From 3344dd30657a18c39e87096cd2763a2ac910d863 Mon Sep 17 00:00:00 2001 From: garyschulte Date: Tue, 3 Jan 2023 11:41:59 -0800 Subject: [PATCH 1/9] initial stab at a light touch worldstate resync behavior Signed-off-by: garyschulte --- .../CheckpointDownloaderFactory.java | 9 +++++++- .../worldstate/FastDownloaderFactory.java | 23 ++++++++++++++++++- .../sync/snapsync/SnapDownloaderFactory.java | 9 +++++++- 3 files changed, 38 insertions(+), 3 deletions(-) diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointDownloaderFactory.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointDownloaderFactory.java index 781162a36f2..1198ab31a97 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointDownloaderFactory.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointDownloaderFactory.java @@ -79,7 +79,14 @@ public static Optional> createCheckpointDownloader( final FastSyncState fastSyncState = fastSyncStateStorage.loadState(ScheduleBasedBlockHeaderFunctions.create(protocolSchedule)); - if (fastSyncState.getPivotBlockHeader().isEmpty() + + final boolean shouldResync = shouldResyncWorldstate(dataDirectory.resolve(FAST_SYNC_RESYNC)); + if (shouldResync) { + snapContext.clear(); + } + + if (!shouldResync + && fastSyncState.getPivotBlockHeader().isEmpty() && protocolContext.getBlockchain().getChainHeadBlockNumber() != BlockHeader.GENESIS_BLOCK_NUMBER) { LOG.info( diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastDownloaderFactory.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastDownloaderFactory.java index b7e2bbf4a76..67cd4a7f7dc 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastDownloaderFactory.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastDownloaderFactory.java @@ -40,6 +40,7 @@ import java.nio.file.Path; import java.time.Clock; import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Stream; import org.slf4j.Logger; @@ -48,6 +49,7 @@ public class FastDownloaderFactory { protected static final String FAST_SYNC_FOLDER = "fastsync"; + protected static final String FAST_SYNC_RESYNC = "resync"; private static final Logger LOG = LoggerFactory.getLogger(FastDownloaderFactory.class); @@ -80,7 +82,11 @@ public static Optional> create( final FastSyncState fastSyncState = fastSyncStateStorage.loadState(ScheduleBasedBlockHeaderFunctions.create(protocolSchedule)); - if (fastSyncState.getPivotBlockHeader().isEmpty() + + final boolean shouldResync = shouldResyncWorldstate(dataDirectory.resolve(FAST_SYNC_RESYNC)); + + if (!shouldResync + && fastSyncState.getPivotBlockHeader().isEmpty() && protocolContext.getBlockchain().getChainHeadBlockNumber() != BlockHeader.GENESIS_BLOCK_NUMBER) { LOG.info( @@ -157,6 +163,21 @@ protected static void ensureDirectoryExists(final File dir) { } } + static final AtomicBoolean shouldResync = new AtomicBoolean(false); + + protected static boolean shouldResyncWorldstate(final Path resync) { + try { + if (resync.toFile().exists() && !shouldResync.get()) { + deleteFile(resync); + LOG.info("Triggering resync of worldstate, removed {}", resync); + shouldResync.set(true); + } + } catch (Exception ex) { + LOG.error("Not starting resync, unable to remove resync file: {}", resync); + } + return shouldResync.get(); + } + private static InMemoryTasksPriorityQueues createWorldStateDownloaderTaskCollection( final MetricsSystem metricsSystem, final int worldStateTaskCacheSize) { diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapDownloaderFactory.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapDownloaderFactory.java index e977eb736c2..725de7249a6 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapDownloaderFactory.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapDownloaderFactory.java @@ -75,7 +75,14 @@ public static Optional> createSnapDownloader( final FastSyncState fastSyncState = fastSyncStateStorage.loadState(ScheduleBasedBlockHeaderFunctions.create(protocolSchedule)); - if (fastSyncState.getPivotBlockHeader().isEmpty() + + final boolean shouldResync = shouldResyncWorldstate(dataDirectory.resolve(FAST_SYNC_RESYNC)); + if (shouldResync) { + snapContext.clear(); + } + + if (!shouldResync + && fastSyncState.getPivotBlockHeader().isEmpty() && protocolContext.getBlockchain().getChainHeadBlockNumber() != BlockHeader.GENESIS_BLOCK_NUMBER) { LOG.info( From fa57cacb456ed9d6bf31790ff9dc167f52199eda Mon Sep 17 00:00:00 2001 From: garyschulte Date: Tue, 3 Jan 2023 12:36:42 -0800 Subject: [PATCH 2/9] added explicit worldstate clear to FastSyncDownloaderFactory Signed-off-by: garyschulte --- .../eth/sync/fastsync/worldstate/FastDownloaderFactory.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastDownloaderFactory.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastDownloaderFactory.java index 67cd4a7f7dc..b99b9274b19 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastDownloaderFactory.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastDownloaderFactory.java @@ -85,14 +85,16 @@ public static Optional> create( final boolean shouldResync = shouldResyncWorldstate(dataDirectory.resolve(FAST_SYNC_RESYNC)); - if (!shouldResync - && fastSyncState.getPivotBlockHeader().isEmpty() + if (shouldResync) { + worldStateStorage.clear(); + } else if (fastSyncState.getPivotBlockHeader().isEmpty() && protocolContext.getBlockchain().getChainHeadBlockNumber() != BlockHeader.GENESIS_BLOCK_NUMBER) { LOG.info( "Fast sync was requested, but cannot be enabled because the local blockchain is not empty."); return Optional.empty(); } + if (worldStateStorage instanceof BonsaiWorldStateKeyValueStorage) { worldStateStorage.clearFlatDatabase(); } else { From d300685c49b6da5ecf20ae62fb8641b2a3f1239b Mon Sep 17 00:00:00 2001 From: garyschulte Date: Wed, 4 Jan 2023 16:20:42 -0800 Subject: [PATCH 3/9] explicitly clear worldstate for snap and checkpoint rather than relying on subsequent truncation at snap sync start Signed-off-by: garyschulte --- .../eth/sync/checkpointsync/CheckpointDownloaderFactory.java | 1 + .../eth/sync/fastsync/worldstate/FastDownloaderFactory.java | 2 +- .../besu/ethereum/eth/sync/snapsync/SnapDownloaderFactory.java | 1 + 3 files changed, 3 insertions(+), 1 deletion(-) diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointDownloaderFactory.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointDownloaderFactory.java index 1198ab31a97..052b8d4c9a5 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointDownloaderFactory.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointDownloaderFactory.java @@ -83,6 +83,7 @@ public static Optional> createCheckpointDownloader( final boolean shouldResync = shouldResyncWorldstate(dataDirectory.resolve(FAST_SYNC_RESYNC)); if (shouldResync) { snapContext.clear(); + worldStateStorage.clear(); } if (!shouldResync diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastDownloaderFactory.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastDownloaderFactory.java index b99b9274b19..c59086b7dde 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastDownloaderFactory.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastDownloaderFactory.java @@ -49,7 +49,7 @@ public class FastDownloaderFactory { protected static final String FAST_SYNC_FOLDER = "fastsync"; - protected static final String FAST_SYNC_RESYNC = "resync"; + protected static final String FAST_SYNC_RESYNC = FAST_SYNC_FOLDER + "/resync"; private static final Logger LOG = LoggerFactory.getLogger(FastDownloaderFactory.class); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapDownloaderFactory.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapDownloaderFactory.java index 725de7249a6..1cb8ab03c5c 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapDownloaderFactory.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapDownloaderFactory.java @@ -79,6 +79,7 @@ public static Optional> createSnapDownloader( final boolean shouldResync = shouldResyncWorldstate(dataDirectory.resolve(FAST_SYNC_RESYNC)); if (shouldResync) { snapContext.clear(); + worldStateStorage.clear(); } if (!shouldResync From 7c150f6df0a3228a15d7ca0ba1776a3fe0a45a79 Mon Sep 17 00:00:00 2001 From: garyschulte Date: Sun, 8 Jan 2023 07:37:31 -0800 Subject: [PATCH 4/9] use debug rpc endpoint to resync worldstate Signed-off-by: garyschulte --- .../besu/ethereum/api/jsonrpc/RpcMethod.java | 1 + .../methods/DebugResyncWorldstate.java | 26 +++++ .../jsonrpc/methods/DebugJsonRpcMethods.java | 6 + .../methods/JsonRpcMethodsFactory.java | 1 + .../besu/ethereum/core/Synchronizer.java | 2 + .../eth/sync/DefaultSynchronizer.java | 109 +++++++++++------- .../CheckpointDownloaderFactory.java | 8 +- .../worldstate/FastDownloaderFactory.java | 23 +--- .../sync/snapsync/SnapDownloaderFactory.java | 8 +- .../fastsync/FastDownloaderFactoryTest.java | 15 ++- .../ethereum/retesteth/DummySynchronizer.java | 5 + 11 files changed, 131 insertions(+), 73 deletions(-) create mode 100644 ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/DebugResyncWorldstate.java diff --git a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/RpcMethod.java b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/RpcMethod.java index 4d8918bc6cd..b0d321d03fe 100644 --- a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/RpcMethod.java +++ b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/RpcMethod.java @@ -34,6 +34,7 @@ public enum RpcMethod { CLIQUE_GET_SIGNER_METRICS("clique_getSignerMetrics"), DEBUG_ACCOUNT_AT("debug_accountAt"), DEBUG_METRICS("debug_metrics"), + DEBUG_RESYNC_WORLDSTATE("debug_resyncWorldState"), DEBUG_SET_HEAD("debug_setHead"), DEBUG_REPLAY_BLOCK("debug_replayBlock"), DEBUG_STORAGE_RANGE_AT("debug_storageRangeAt"), diff --git a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/DebugResyncWorldstate.java b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/DebugResyncWorldstate.java new file mode 100644 index 00000000000..0c3380084d0 --- /dev/null +++ b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/DebugResyncWorldstate.java @@ -0,0 +1,26 @@ +package org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods; + +import org.hyperledger.besu.ethereum.api.jsonrpc.RpcMethod; +import org.hyperledger.besu.ethereum.api.jsonrpc.internal.JsonRpcRequestContext; +import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcResponse; +import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcSuccessResponse; +import org.hyperledger.besu.ethereum.core.Synchronizer; + +public class DebugResyncWorldstate implements JsonRpcMethod { + private final Synchronizer synchronizer; + + public DebugResyncWorldstate(final Synchronizer synchronizer) { + this.synchronizer = synchronizer; + } + + @Override + public String getName() { + return RpcMethod.DEBUG_RESYNC_WORLDSTATE.getMethodName(); + } + + @Override + public JsonRpcResponse response(final JsonRpcRequestContext request) { + return new JsonRpcSuccessResponse(request.getRequest().getId(), synchronizer.resyncWorldState()); + } + +} diff --git a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/methods/DebugJsonRpcMethods.java b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/methods/DebugJsonRpcMethods.java index c4a1a474ebd..ab6f87f253c 100644 --- a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/methods/DebugJsonRpcMethods.java +++ b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/methods/DebugJsonRpcMethods.java @@ -22,6 +22,7 @@ import org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.DebugBatchSendRawTransaction; import org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.DebugGetBadBlocks; import org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.DebugMetrics; +import org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.DebugResyncWorldstate; import org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.DebugSetHead; import org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.DebugStandardTraceBadBlockToFile; import org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.DebugStandardTraceBlockToFile; @@ -36,6 +37,7 @@ import org.hyperledger.besu.ethereum.api.jsonrpc.internal.processor.TransactionTracer; import org.hyperledger.besu.ethereum.api.jsonrpc.internal.results.BlockResultFactory; import org.hyperledger.besu.ethereum.api.query.BlockchainQueries; +import org.hyperledger.besu.ethereum.core.Synchronizer; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; import org.hyperledger.besu.ethereum.mainnet.ScheduleBasedBlockHeaderFunctions; @@ -53,6 +55,7 @@ public class DebugJsonRpcMethods extends ApiGroupJsonRpcMethods { private final ProtocolSchedule protocolSchedule; private final ObservableMetricsSystem metricsSystem; private final TransactionPool transactionPool; + private final Synchronizer synchronizer; private final Path dataDir; DebugJsonRpcMethods( @@ -61,12 +64,14 @@ public class DebugJsonRpcMethods extends ApiGroupJsonRpcMethods { final ProtocolSchedule protocolSchedule, final ObservableMetricsSystem metricsSystem, final TransactionPool transactionPool, + final Synchronizer synchronizer, final Path dataDir) { this.blockchainQueries = blockchainQueries; this.protocolContext = protocolContext; this.protocolSchedule = protocolSchedule; this.metricsSystem = metricsSystem; this.transactionPool = transactionPool; + this.synchronizer = synchronizer; this.dataDir = dataDir; } @@ -88,6 +93,7 @@ protected Map create() { new DebugAccountRange(blockchainQueries), new DebugStorageRangeAt(blockchainQueries, blockReplay), new DebugMetrics(metricsSystem), + new DebugResyncWorldstate(synchronizer), new DebugTraceBlock( () -> new BlockTracer(blockReplay), ScheduleBasedBlockHeaderFunctions.create(protocolSchedule), diff --git a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/methods/JsonRpcMethodsFactory.java b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/methods/JsonRpcMethodsFactory.java index 3e8c25fac99..2f250fea0ff 100644 --- a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/methods/JsonRpcMethodsFactory.java +++ b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/methods/JsonRpcMethodsFactory.java @@ -100,6 +100,7 @@ public Map methods( protocolSchedule, metricsSystem, transactionPool, + synchronizer, dataDir), new EeaJsonRpcMethods( blockchainQueries, protocolSchedule, transactionPool, privacyParameters), diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/Synchronizer.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/Synchronizer.java index 72f9de06dc5..1ca313d5303 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/Synchronizer.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/Synchronizer.java @@ -40,6 +40,8 @@ public interface Synchronizer { */ Optional getSyncStatus(); + boolean resyncWorldState(); + long subscribeSyncStatus(final BesuEvents.SyncStatusListener listener); boolean unsubscribeSyncStatus(long observerId); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java index 5ce55ce7177..3e232357a31 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java @@ -50,6 +50,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,7 +63,8 @@ public class DefaultSynchronizer implements Synchronizer, UnverifiedForkchoiceLi private final SyncState syncState; private final AtomicBoolean running = new AtomicBoolean(false); private final Optional blockPropagationManager; - private final Optional> fastSyncDownloader; + private final Function>> fastSyncFactory; + private Optional> fastSyncDownloader; private final Optional fullSyncDownloader; private final EthContext ethContext; private final ProtocolContext protocolContext; @@ -133,48 +135,57 @@ public DefaultSynchronizer( terminationCondition)); if (SyncMode.FAST.equals(syncConfig.getSyncMode())) { - this.fastSyncDownloader = - FastDownloaderFactory.create( - pivotBlockSelector, - syncConfig, - dataDirectory, - protocolSchedule, - protocolContext, - metricsSystem, - ethContext, - worldStateStorage, - syncState, - clock); + this.fastSyncFactory = + (isResync) -> + FastDownloaderFactory.create( + pivotBlockSelector, + syncConfig, + dataDirectory, + protocolSchedule, + protocolContext, + metricsSystem, + ethContext, + worldStateStorage, + syncState, + clock, + isResync); } else if (SyncMode.X_CHECKPOINT.equals(syncConfig.getSyncMode())) { - this.fastSyncDownloader = - CheckpointDownloaderFactory.createCheckpointDownloader( - new SnapPersistedContext(storageProvider), - pivotBlockSelector, - syncConfig, - dataDirectory, - protocolSchedule, - protocolContext, - metricsSystem, - ethContext, - worldStateStorage, - syncState, - clock); + this.fastSyncFactory = + (isResync) -> + CheckpointDownloaderFactory.createCheckpointDownloader( + new SnapPersistedContext(storageProvider), + pivotBlockSelector, + syncConfig, + dataDirectory, + protocolSchedule, + protocolContext, + metricsSystem, + ethContext, + worldStateStorage, + syncState, + clock, + isResync); } else { - this.fastSyncDownloader = - SnapDownloaderFactory.createSnapDownloader( - new SnapPersistedContext(storageProvider), - pivotBlockSelector, - syncConfig, - dataDirectory, - protocolSchedule, - protocolContext, - metricsSystem, - ethContext, - worldStateStorage, - syncState, - clock); + this.fastSyncFactory = + (isResync) -> + SnapDownloaderFactory.createSnapDownloader( + new SnapPersistedContext(storageProvider), + pivotBlockSelector, + syncConfig, + dataDirectory, + protocolSchedule, + protocolContext, + metricsSystem, + ethContext, + worldStateStorage, + syncState, + clock, + isResync); } + // create a non-resync fast sync downloader: + this.fastSyncDownloader = this.fastSyncFactory.apply(false); + metricsSystem.createLongGauge( BesuMetricCategory.ETHEREUM, "best_known_block_number", @@ -209,7 +220,6 @@ public CompletableFuture start() { CompletableFuture future; if (fastSyncDownloader.isPresent()) { future = fastSyncDownloader.get().start().thenCompose(this::handleSyncResult); - } else { syncState.markInitialSyncPhaseAsDone(); enableFallbackNodeFinder(); @@ -305,6 +315,25 @@ public Optional getSyncStatus() { return syncState.syncStatus(); } + @Override + public boolean resyncWorldState() { + // if we do not have a fast sync mode configured, return false + if (!fastSyncDownloader.isPresent()) { + return false; + } + + // if sync is running currently, stop it and delete the fast sync state + if (running.get()) { + stop(); + fastSyncDownloader.get().deleteFastSyncState(); + } + + // recreate fast sync with resync and start + this.fastSyncDownloader = this.fastSyncFactory.apply(true); + start(); + return true; + } + @Override public long subscribeSyncStatus(final SyncStatusListener listener) { checkNotNull(listener); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointDownloaderFactory.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointDownloaderFactory.java index 052b8d4c9a5..3627b358e02 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointDownloaderFactory.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/checkpointsync/CheckpointDownloaderFactory.java @@ -60,7 +60,8 @@ public static Optional> createCheckpointDownloader( final EthContext ethContext, final WorldStateStorage worldStateStorage, final SyncState syncState, - final Clock clock) { + final Clock clock, + final boolean isResync) { final Path fastSyncDataDirectory = dataDirectory.resolve(FAST_SYNC_FOLDER); final FastSyncStateStorage fastSyncStateStorage = @@ -80,13 +81,12 @@ public static Optional> createCheckpointDownloader( final FastSyncState fastSyncState = fastSyncStateStorage.loadState(ScheduleBasedBlockHeaderFunctions.create(protocolSchedule)); - final boolean shouldResync = shouldResyncWorldstate(dataDirectory.resolve(FAST_SYNC_RESYNC)); - if (shouldResync) { + if (isResync) { snapContext.clear(); worldStateStorage.clear(); } - if (!shouldResync + if (!isResync && fastSyncState.getPivotBlockHeader().isEmpty() && protocolContext.getBlockchain().getChainHeadBlockNumber() != BlockHeader.GENESIS_BLOCK_NUMBER) { diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastDownloaderFactory.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastDownloaderFactory.java index c59086b7dde..dbf9579b01f 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastDownloaderFactory.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastDownloaderFactory.java @@ -49,7 +49,6 @@ public class FastDownloaderFactory { protected static final String FAST_SYNC_FOLDER = "fastsync"; - protected static final String FAST_SYNC_RESYNC = FAST_SYNC_FOLDER + "/resync"; private static final Logger LOG = LoggerFactory.getLogger(FastDownloaderFactory.class); @@ -63,7 +62,8 @@ public static Optional> create( final EthContext ethContext, final WorldStateStorage worldStateStorage, final SyncState syncState, - final Clock clock) { + final Clock clock, + final boolean isResync) { final Path fastSyncDataDirectory = dataDirectory.resolve(FAST_SYNC_FOLDER); final FastSyncStateStorage fastSyncStateStorage = @@ -83,9 +83,7 @@ public static Optional> create( final FastSyncState fastSyncState = fastSyncStateStorage.loadState(ScheduleBasedBlockHeaderFunctions.create(protocolSchedule)); - final boolean shouldResync = shouldResyncWorldstate(dataDirectory.resolve(FAST_SYNC_RESYNC)); - - if (shouldResync) { + if (isResync) { worldStateStorage.clear(); } else if (fastSyncState.getPivotBlockHeader().isEmpty() && protocolContext.getBlockchain().getChainHeadBlockNumber() @@ -165,21 +163,6 @@ protected static void ensureDirectoryExists(final File dir) { } } - static final AtomicBoolean shouldResync = new AtomicBoolean(false); - - protected static boolean shouldResyncWorldstate(final Path resync) { - try { - if (resync.toFile().exists() && !shouldResync.get()) { - deleteFile(resync); - LOG.info("Triggering resync of worldstate, removed {}", resync); - shouldResync.set(true); - } - } catch (Exception ex) { - LOG.error("Not starting resync, unable to remove resync file: {}", resync); - } - return shouldResync.get(); - } - private static InMemoryTasksPriorityQueues createWorldStateDownloaderTaskCollection( final MetricsSystem metricsSystem, final int worldStateTaskCacheSize) { diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapDownloaderFactory.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapDownloaderFactory.java index 1cb8ab03c5c..9c85a743e7c 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapDownloaderFactory.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapDownloaderFactory.java @@ -56,7 +56,8 @@ public static Optional> createSnapDownloader( final EthContext ethContext, final WorldStateStorage worldStateStorage, final SyncState syncState, - final Clock clock) { + final Clock clock, + final boolean isResync) { final Path fastSyncDataDirectory = dataDirectory.resolve(FAST_SYNC_FOLDER); final FastSyncStateStorage fastSyncStateStorage = @@ -76,13 +77,12 @@ public static Optional> createSnapDownloader( final FastSyncState fastSyncState = fastSyncStateStorage.loadState(ScheduleBasedBlockHeaderFunctions.create(protocolSchedule)); - final boolean shouldResync = shouldResyncWorldstate(dataDirectory.resolve(FAST_SYNC_RESYNC)); - if (shouldResync) { + if (isResync) { snapContext.clear(); worldStateStorage.clear(); } - if (!shouldResync + if (!isResync && fastSyncState.getPivotBlockHeader().isEmpty() && protocolContext.getBlockchain().getChainHeadBlockNumber() != BlockHeader.GENESIS_BLOCK_NUMBER) { diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastDownloaderFactoryTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastDownloaderFactoryTest.java index 7cdcd8d3dc4..8f95a0be90d 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastDownloaderFactoryTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastDownloaderFactoryTest.java @@ -80,7 +80,8 @@ public void shouldThrowIfSyncModeChangedWhileFastSyncIncomplete() { ethContext, worldStateStorage, syncState, - clock)) + clock, + false)) .isInstanceOf(IllegalStateException.class); } @@ -101,7 +102,8 @@ public void shouldNotThrowIfSyncModeChangedWhileFastSyncComplete() { ethContext, worldStateStorage, syncState, - clock); + clock, + false); assertThat(result).isEmpty(); } @@ -125,7 +127,8 @@ public void shouldNotThrowWhenFastSyncModeRequested() throws NoSuchFieldExceptio ethContext, worldStateStorage, syncState, - clock); + clock, + false); verify(mutableBlockchain).getChainHeadBlockNumber(); } @@ -155,7 +158,8 @@ public void shouldClearWorldStateDuringFastSyncWhenStateQueDirectoryExists() thr ethContext, worldStateStorage, syncState, - clock); + clock, + false); verify(worldStateStorage).clear(); assertThat(Files.exists(stateQueueDir)).isFalse(); @@ -187,7 +191,8 @@ public void shouldCrashWhenStateQueueIsNotDirectory() throws IOException { ethContext, worldStateStorage, syncState, - clock)) + clock, + false)) .isInstanceOf(IllegalStateException.class); } diff --git a/ethereum/retesteth/src/main/java/org/hyperledger/besu/ethereum/retesteth/DummySynchronizer.java b/ethereum/retesteth/src/main/java/org/hyperledger/besu/ethereum/retesteth/DummySynchronizer.java index 1f94d49e246..b523d2ba542 100644 --- a/ethereum/retesteth/src/main/java/org/hyperledger/besu/ethereum/retesteth/DummySynchronizer.java +++ b/ethereum/retesteth/src/main/java/org/hyperledger/besu/ethereum/retesteth/DummySynchronizer.java @@ -44,6 +44,11 @@ public Optional getSyncStatus() { return Optional.empty(); } + @Override + public boolean resyncWorldState() { + return false; + } + @Override public long subscribeSyncStatus(final BesuEvents.SyncStatusListener listener) { return 0; From 9f34c6babe422389eb8f6d3a02c17f260314739f Mon Sep 17 00:00:00 2001 From: garyschulte Date: Mon, 9 Jan 2023 15:29:24 -0800 Subject: [PATCH 5/9] reset sync status on worldstate resync, disable/enable transaction handling during/after resync Signed-off-by: garyschulte --- .../methods/DebugResyncWorldstate.java | 4 +- .../eth/sync/DefaultSynchronizer.java | 8 +-- .../backwardsync/BackwardsSyncAlgorithm.java | 33 ++++++++---- .../worldstate/FastDownloaderFactory.java | 1 - .../ethereum/eth/sync/state/SyncState.java | 5 ++ ...PooledTransactionHashesMessageHandler.java | 20 +++++-- .../eth/transactions/TransactionPool.java | 22 ++++++-- .../transactions/TransactionPoolFactory.java | 54 ++++++++++++------- .../TransactionsMessageHandler.java | 20 +++++-- plugin-api/build.gradle | 2 +- .../besu/plugin/services/BesuEvents.java | 3 ++ 11 files changed, 120 insertions(+), 52 deletions(-) diff --git a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/DebugResyncWorldstate.java b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/DebugResyncWorldstate.java index 0c3380084d0..93f1f917a63 100644 --- a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/DebugResyncWorldstate.java +++ b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/DebugResyncWorldstate.java @@ -20,7 +20,7 @@ public String getName() { @Override public JsonRpcResponse response(final JsonRpcRequestContext request) { - return new JsonRpcSuccessResponse(request.getRequest().getId(), synchronizer.resyncWorldState()); + return new JsonRpcSuccessResponse( + request.getRequest().getId(), synchronizer.resyncWorldState()); } - } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java index 3e232357a31..3816fbeba2d 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java @@ -317,18 +317,14 @@ public Optional getSyncStatus() { @Override public boolean resyncWorldState() { - // if we do not have a fast sync mode configured, return false - if (!fastSyncDownloader.isPresent()) { - return false; - } - // if sync is running currently, stop it and delete the fast sync state - if (running.get()) { + if (fastSyncDownloader.isPresent() && running.get()) { stop(); fastSyncDownloader.get().deleteFastSyncState(); } // recreate fast sync with resync and start + this.syncState.markInitialSyncRestart(); this.fastSyncDownloader = this.fastSyncFactory.apply(true); start(); return true; diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardsSyncAlgorithm.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardsSyncAlgorithm.java index 648c706f1f9..7bcb8a8b8dd 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardsSyncAlgorithm.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardsSyncAlgorithm.java @@ -24,21 +24,25 @@ import org.hyperledger.besu.ethereum.core.Block; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.eth.manager.task.WaitForPeersTask; +import org.hyperledger.besu.plugin.services.BesuEvents; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; -public class BackwardsSyncAlgorithm { +public class BackwardsSyncAlgorithm implements BesuEvents.InitialSyncCompletionListener { private static final Logger LOG = getLogger(BackwardsSyncAlgorithm.class); private final BackwardSyncContext context; private final FinalBlockConfirmation finalBlockConfirmation; + private final AtomicReference latch = + new AtomicReference<>(new CountDownLatch(1)); private volatile boolean finished = false; public BackwardsSyncAlgorithm( @@ -125,19 +129,16 @@ protected CompletableFuture executeForwardAsync() { @VisibleForTesting protected CompletableFuture waitForReady() { - final CountDownLatch latch = new CountDownLatch(1); - final long idTTD = - context.getSyncState().subscribeTTDReached(reached -> countDownIfReady(latch)); - final long idIS = - context.getSyncState().subscribeCompletionReached(() -> countDownIfReady(latch)); - return CompletableFuture.runAsync(() -> checkReadiness(latch, idTTD, idIS)); + final long idTTD = context.getSyncState().subscribeTTDReached(reached -> countDownIfReady()); + final long idIS = context.getSyncState().subscribeCompletionReached(this); + return CompletableFuture.runAsync(() -> checkReadiness(idTTD, idIS)); } - private void checkReadiness(final CountDownLatch latch, final long idTTD, final long idIS) { + private void checkReadiness(final long idTTD, final long idIS) { try { if (!context.isReady()) { LOG.debug("Waiting for preconditions..."); - final boolean await = latch.await(2, TimeUnit.MINUTES); + final boolean await = latch.get().await(2, TimeUnit.MINUTES); if (await) { LOG.debug("Preconditions meet, ensure at least one peer is connected"); waitForPeers(1).get(); @@ -156,9 +157,9 @@ private void checkReadiness(final CountDownLatch latch, final long idTTD, final } } - private void countDownIfReady(final CountDownLatch latch) { + private void countDownIfReady() { if (context.isReady()) { - latch.countDown(); + latch.get().countDown(); } } @@ -167,4 +168,14 @@ private CompletableFuture waitForPeers(final int count) { WaitForPeersTask.create(context.getEthContext(), count, context.getMetricsSystem()); return waitForPeersTask.run(); } + + @Override + public void onInitialSyncCompleted() { + countDownIfReady(); + } + + @Override + public void onInitialSyncRestart() { + latch.set(new CountDownLatch(1)); + } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastDownloaderFactory.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastDownloaderFactory.java index dbf9579b01f..81e0b4d8bbb 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastDownloaderFactory.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/FastDownloaderFactory.java @@ -40,7 +40,6 @@ import java.nio.file.Path; import java.time.Clock; import java.util.Optional; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Stream; import org.slf4j.Logger; diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/state/SyncState.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/state/SyncState.java index 026453c1bee..3a6ea374892 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/state/SyncState.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/state/SyncState.java @@ -319,4 +319,9 @@ public void markInitialSyncPhaseAsDone() { public boolean isInitialSyncPhaseDone() { return isInitialSyncPhaseDone; } + + public void markInitialSyncRestart() { + isInitialSyncPhaseDone = false; + completionListenerSubscribers.forEach(InitialSyncCompletionListener::onInitialSyncRestart); + } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageHandler.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageHandler.java index 003183a86c8..ac2a98f9c52 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageHandler.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageHandler.java @@ -25,12 +25,14 @@ import java.time.Duration; import java.time.Instant; +import java.util.concurrent.atomic.AtomicBoolean; class NewPooledTransactionHashesMessageHandler implements EthMessages.MessageCallback { private final NewPooledTransactionHashesMessageProcessor transactionsMessageProcessor; private final EthScheduler scheduler; private final Duration txMsgKeepAlive; + private final AtomicBoolean isEnabled = new AtomicBoolean(true); public NewPooledTransactionHashesMessageHandler( final EthScheduler scheduler, @@ -47,9 +49,19 @@ public void exec(final EthMessage message) { final NewPooledTransactionHashesMessage transactionsMessage = NewPooledTransactionHashesMessage.readFrom(message.getData(), capability); final Instant startedAt = now(); - scheduler.scheduleTxWorkerTask( - () -> - transactionsMessageProcessor.processNewPooledTransactionHashesMessage( - message.getPeer(), transactionsMessage, startedAt, txMsgKeepAlive)); + if (isEnabled.get()) { + scheduler.scheduleTxWorkerTask( + () -> + transactionsMessageProcessor.processNewPooledTransactionHashesMessage( + message.getPeer(), transactionsMessage, startedAt, txMsgKeepAlive)); + } + } + + public void enable() { + isEnabled.set(true); + } + + public void disable() { + isEnabled.set(false); } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPool.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPool.java index cb34f3bc972..d020006feae 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPool.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPool.java @@ -54,6 +54,7 @@ import java.util.Collection; import java.util.List; import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import org.slf4j.Logger; @@ -79,6 +80,7 @@ public class TransactionPool implements BlockAddedObserver { private final MiningParameters miningParameters; private final LabelledMetric duplicateTransactionCounter; private final TransactionPoolConfiguration configuration; + private final AtomicBoolean isPoolEnabled = new AtomicBoolean(true); public TransactionPool( final AbstractPendingTransactionsSorter pendingTransactions, @@ -222,9 +224,11 @@ public void unsubscribeDroppedTransactions(final long id) { @Override public void onBlockAdded(final BlockAddedEvent event) { LOG.trace("Block added event {}", event); - event.getAddedTransactions().forEach(pendingTransactions::transactionAddedToBlock); - pendingTransactions.manageBlockAdded(event.getBlock()); - reAddTransactions(event.getRemovedTransactions()); + if (isPoolEnabled.get()) { + event.getAddedTransactions().forEach(pendingTransactions::transactionAddedToBlock); + pendingTransactions.manageBlockAdded(event.getBlock()); + reAddTransactions(event.getRemovedTransactions()); + } } private void reAddTransactions(final List reAddTransactions) { @@ -426,4 +430,16 @@ static ValidationResultAndAccount invalid(final TransactionInvalidReason reason) return new ValidationResultAndAccount(ValidationResult.invalid(reason)); } } + + public void enable() { + isPoolEnabled.set(true); + } + + public void disable() { + // should disable: + // block added listener / behavior + // transactionsMessageHandler + // pooledTransactionsMessageHandler + isPoolEnabled.set(false); + } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactory.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactory.java index b2aea31c7a1..7d1497f9c7c 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactory.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactory.java @@ -26,6 +26,7 @@ import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec; import org.hyperledger.besu.ethereum.mainnet.feemarket.FeeMarket; +import org.hyperledger.besu.plugin.services.BesuEvents; import org.hyperledger.besu.plugin.services.MetricsSystem; import java.time.Clock; @@ -116,31 +117,44 @@ static TransactionPool createTransactionPool( metricsSystem), transactionPoolConfiguration.getTxMessageKeepAliveSeconds()); - if (syncState.isInitialSyncPhaseDone()) { - enableTransactionPool( - protocolContext, - ethContext, - transactionTracker, - transactionPool, - transactionsMessageHandler, - pooledTransactionsMessageHandler); - } else { - syncState.subscribeCompletionReached( - () -> { - enableTransactionPool( - protocolContext, - ethContext, - transactionTracker, - transactionPool, - transactionsMessageHandler, - pooledTransactionsMessageHandler); - }); + subscribeTransactionHandlers( + protocolContext, + ethContext, + transactionTracker, + transactionPool, + transactionsMessageHandler, + pooledTransactionsMessageHandler); + + if (!syncState.isInitialSyncPhaseDone()) { + LOG.info("Disabling transaction handling during initial sync"); + pooledTransactionsMessageHandler.disable(); + transactionsMessageHandler.disable(); + transactionPool.disable(); } + syncState.subscribeCompletionReached( + new BesuEvents.InitialSyncCompletionListener() { + @Override + public void onInitialSyncCompleted() { + LOG.info("Enabling transaction handling following initial sync"); + transactionPool.enable(); + transactionsMessageHandler.enable(); + pooledTransactionsMessageHandler.enable(); + } + + @Override + public void onInitialSyncRestart() { + LOG.info("Disabling transaction handling during re-sync"); + pooledTransactionsMessageHandler.disable(); + transactionsMessageHandler.disable(); + transactionPool.disable(); + } + }); + return transactionPool; } - private static void enableTransactionPool( + private static void subscribeTransactionHandlers( final ProtocolContext protocolContext, final EthContext ethContext, final PeerTransactionTracker transactionTracker, diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionsMessageHandler.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionsMessageHandler.java index 372778fc2c2..af1b3a4d73a 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionsMessageHandler.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionsMessageHandler.java @@ -23,12 +23,14 @@ import java.time.Duration; import java.time.Instant; +import java.util.concurrent.atomic.AtomicBoolean; class TransactionsMessageHandler implements EthMessages.MessageCallback { private final TransactionsMessageProcessor transactionsMessageProcessor; private final EthScheduler scheduler; private final Duration txMsgKeepAlive; + private final AtomicBoolean isEnabled = new AtomicBoolean(true); public TransactionsMessageHandler( final EthScheduler scheduler, @@ -43,9 +45,19 @@ public TransactionsMessageHandler( public void exec(final EthMessage message) { final TransactionsMessage transactionsMessage = TransactionsMessage.readFrom(message.getData()); final Instant startedAt = now(); - scheduler.scheduleTxWorkerTask( - () -> - transactionsMessageProcessor.processTransactionsMessage( - message.getPeer(), transactionsMessage, startedAt, txMsgKeepAlive)); + if (isEnabled.get()) { + scheduler.scheduleTxWorkerTask( + () -> + transactionsMessageProcessor.processTransactionsMessage( + message.getPeer(), transactionsMessage, startedAt, txMsgKeepAlive)); + } + } + + public void disable() { + isEnabled.set(false); + } + + public void enable() { + isEnabled.set(true); } } diff --git a/plugin-api/build.gradle b/plugin-api/build.gradle index 648fd64349d..4a31cb83470 100644 --- a/plugin-api/build.gradle +++ b/plugin-api/build.gradle @@ -66,7 +66,7 @@ Calculated : ${currentHash} tasks.register('checkAPIChanges', FileStateChecker) { description = "Checks that the API for the Plugin-API project does not change without deliberate thought" files = sourceSets.main.allJava.files - knownHash = 'nBDCEeFH318uhGZEBmuTGOfYLI1+9tLDyjn/RDe5saI=' + knownHash = 'vFf6OIL506w5+DvYxs7btZcCnVpkuRw5WjzWUkrPeu4=' } check.dependsOn('checkAPIChanges') diff --git a/plugin-api/src/main/java/org/hyperledger/besu/plugin/services/BesuEvents.java b/plugin-api/src/main/java/org/hyperledger/besu/plugin/services/BesuEvents.java index 79fa42d5fda..3e60d244c7d 100644 --- a/plugin-api/src/main/java/org/hyperledger/besu/plugin/services/BesuEvents.java +++ b/plugin-api/src/main/java/org/hyperledger/besu/plugin/services/BesuEvents.java @@ -253,5 +253,8 @@ interface InitialSyncCompletionListener { /** Emitted when initial sync finishes */ void onInitialSyncCompleted(); + + /** Emitted when initial sync restarts */ + void onInitialSyncRestart(); } } From 8d42daaf67008068e77673de7fcdfbb88773634a Mon Sep 17 00:00:00 2001 From: garyschulte Date: Mon, 9 Jan 2023 16:12:17 -0800 Subject: [PATCH 6/9] reset bad blocks manager on resync Signed-off-by: garyschulte --- .../methods/DebugResyncWorldstate.java | 29 ++++++++++++++++++- .../jsonrpc/methods/DebugJsonRpcMethods.java | 2 +- .../besu/ethereum/chain/BadBlockManager.java | 6 ++++ 3 files changed, 35 insertions(+), 2 deletions(-) diff --git a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/DebugResyncWorldstate.java b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/DebugResyncWorldstate.java index 93f1f917a63..578fdbf0708 100644 --- a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/DebugResyncWorldstate.java +++ b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/DebugResyncWorldstate.java @@ -1,16 +1,39 @@ +/* + * Copyright Hyperledger Besu Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ package org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods; import org.hyperledger.besu.ethereum.api.jsonrpc.RpcMethod; import org.hyperledger.besu.ethereum.api.jsonrpc.internal.JsonRpcRequestContext; import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcResponse; import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcSuccessResponse; +import org.hyperledger.besu.ethereum.chain.Blockchain; import org.hyperledger.besu.ethereum.core.Synchronizer; +import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; public class DebugResyncWorldstate implements JsonRpcMethod { private final Synchronizer synchronizer; + private final ProtocolSchedule protocolSchedule; + private final Blockchain blockchain; - public DebugResyncWorldstate(final Synchronizer synchronizer) { + public DebugResyncWorldstate( + final ProtocolSchedule protocolSchedule, + final Blockchain blockchain, + final Synchronizer synchronizer) { this.synchronizer = synchronizer; + this.protocolSchedule = protocolSchedule; + this.blockchain = blockchain; } @Override @@ -20,6 +43,10 @@ public String getName() { @Override public JsonRpcResponse response(final JsonRpcRequestContext request) { + protocolSchedule + .getByBlockNumber(blockchain.getChainHeadBlockNumber()) + .getBadBlocksManager() + .reset(); return new JsonRpcSuccessResponse( request.getRequest().getId(), synchronizer.resyncWorldState()); } diff --git a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/methods/DebugJsonRpcMethods.java b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/methods/DebugJsonRpcMethods.java index ab6f87f253c..33cc16be0cf 100644 --- a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/methods/DebugJsonRpcMethods.java +++ b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/methods/DebugJsonRpcMethods.java @@ -93,7 +93,7 @@ protected Map create() { new DebugAccountRange(blockchainQueries), new DebugStorageRangeAt(blockchainQueries, blockReplay), new DebugMetrics(metricsSystem), - new DebugResyncWorldstate(synchronizer), + new DebugResyncWorldstate(protocolSchedule, protocolContext.getBlockchain(), synchronizer), new DebugTraceBlock( () -> new BlockTracer(blockReplay), ScheduleBasedBlockHeaderFunctions.create(protocolSchedule), diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/BadBlockManager.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/BadBlockManager.java index f2fd2a2501a..da7055b51c3 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/BadBlockManager.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/BadBlockManager.java @@ -49,6 +49,12 @@ public void addBadBlock(final Block badBlock, final Optional cause) { } } + public void reset() { + this.badBlocks.invalidateAll(); + this.badHeaders.invalidateAll(); + this.latestValidHashes.invalidateAll(); + } + /** * Return all invalid blocks * From e891c69cec55f53431ef5a57791953a02224e0a4 Mon Sep 17 00:00:00 2001 From: garyschulte Date: Mon, 9 Jan 2023 17:39:00 -0800 Subject: [PATCH 7/9] TransactionPoolFactoryTest's and accessor for isEnabled() Signed-off-by: garyschulte --- ...PooledTransactionHashesMessageHandler.java | 8 +- .../eth/transactions/TransactionPool.java | 8 +- .../transactions/TransactionPoolFactory.java | 18 ++-- .../TransactionsMessageHandler.java | 8 +- .../TransactionPoolFactoryTest.java | 98 +++++++++---------- 5 files changed, 74 insertions(+), 66 deletions(-) diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageHandler.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageHandler.java index ac2a98f9c52..a346b07e414 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageHandler.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/NewPooledTransactionHashesMessageHandler.java @@ -57,11 +57,15 @@ public void exec(final EthMessage message) { } } - public void enable() { + public void setEnabled() { isEnabled.set(true); } - public void disable() { + public void setDisabled() { isEnabled.set(false); } + + public boolean isEnabled() { + return isEnabled.get(); + } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPool.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPool.java index d020006feae..45caff8ab6d 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPool.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPool.java @@ -431,15 +431,19 @@ static ValidationResultAndAccount invalid(final TransactionInvalidReason reason) } } - public void enable() { + public void setEnabled() { isPoolEnabled.set(true); } - public void disable() { + public void setDisabled() { // should disable: // block added listener / behavior // transactionsMessageHandler // pooledTransactionsMessageHandler isPoolEnabled.set(false); } + + public boolean isEnabled() { + return isPoolEnabled.get(); + } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactory.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactory.java index 7d1497f9c7c..37eba842536 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactory.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactory.java @@ -127,9 +127,9 @@ static TransactionPool createTransactionPool( if (!syncState.isInitialSyncPhaseDone()) { LOG.info("Disabling transaction handling during initial sync"); - pooledTransactionsMessageHandler.disable(); - transactionsMessageHandler.disable(); - transactionPool.disable(); + pooledTransactionsMessageHandler.setDisabled(); + transactionsMessageHandler.setDisabled(); + transactionPool.setDisabled(); } syncState.subscribeCompletionReached( @@ -137,17 +137,17 @@ static TransactionPool createTransactionPool( @Override public void onInitialSyncCompleted() { LOG.info("Enabling transaction handling following initial sync"); - transactionPool.enable(); - transactionsMessageHandler.enable(); - pooledTransactionsMessageHandler.enable(); + transactionPool.setEnabled(); + transactionsMessageHandler.setEnabled(); + pooledTransactionsMessageHandler.setEnabled(); } @Override public void onInitialSyncRestart() { LOG.info("Disabling transaction handling during re-sync"); - pooledTransactionsMessageHandler.disable(); - transactionsMessageHandler.disable(); - transactionPool.disable(); + pooledTransactionsMessageHandler.setDisabled(); + transactionsMessageHandler.setDisabled(); + transactionPool.setDisabled(); } }); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionsMessageHandler.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionsMessageHandler.java index af1b3a4d73a..8193de1e0ce 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionsMessageHandler.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionsMessageHandler.java @@ -53,11 +53,15 @@ public void exec(final EthMessage message) { } } - public void disable() { + public void setDisabled() { isEnabled.set(false); } - public void enable() { + public void setEnabled() { isEnabled.set(true); } + + public boolean isEnabled() { + return isEnabled.get(); + } } diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactoryTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactoryTest.java index 9c45d60e604..cd17fd07adf 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactoryTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactoryTest.java @@ -20,9 +20,7 @@ import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; import org.hyperledger.besu.datatypes.Hash; @@ -37,13 +35,11 @@ import org.hyperledger.besu.ethereum.eth.manager.EthPeers; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; -import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; import org.hyperledger.besu.ethereum.eth.transactions.sorter.GasPricePendingTransactionsSorter; import org.hyperledger.besu.ethereum.forkid.ForkIdManager; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; -import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage; import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive; import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; import org.hyperledger.besu.testutil.TestClock; @@ -52,6 +48,7 @@ import java.util.Collections; import java.util.Optional; +import org.assertj.core.api.Condition; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -98,43 +95,6 @@ public void setup() { when(ethContext.getScheduler()).thenReturn(ethScheduler); } - @Test - public void disconnectNotInvokedBeforeInitialSyncIsDone() { - setupInitialSyncPhase(true); - final RespondingEthPeer ethPeer = - RespondingEthPeer.builder().ethProtocolManager(ethProtocolManager).build(); - assertThat(ethPeer.getEthPeer()).isNotNull(); - assertThat(ethPeer.getEthPeer().isDisconnected()).isFalse(); - ethPeer.disconnect(DisconnectMessage.DisconnectReason.CLIENT_QUITTING); - verifyNoInteractions(peerTransactionTracker); - } - - @Test - public void disconnectInvokedAfterInitialSyncIsDone() { - setupInitialSyncPhase(true); - final RespondingEthPeer ethPeer = - RespondingEthPeer.builder().ethProtocolManager(ethProtocolManager).build(); - assertThat(ethPeer.getEthPeer()).isNotNull(); - assertThat(ethPeer.getEthPeer().isDisconnected()).isFalse(); - - syncState.markInitialSyncPhaseAsDone(); - - ethPeer.disconnect(DisconnectMessage.DisconnectReason.CLIENT_QUITTING); - verify(peerTransactionTracker, times(1)).onDisconnect(ethPeer.getEthPeer()); - } - - @Test - public void disconnectInvokedIfNoInitialSync() { - setupInitialSyncPhase(false); - final RespondingEthPeer ethPeer = - RespondingEthPeer.builder().ethProtocolManager(ethProtocolManager).build(); - assertThat(ethPeer.getEthPeer()).isNotNull(); - assertThat(ethPeer.getEthPeer().isDisconnected()).isFalse(); - - ethPeer.disconnect(DisconnectMessage.DisconnectReason.CLIENT_QUITTING); - verify(peerTransactionTracker, times(1)).onDisconnect(ethPeer.getEthPeer()); - } - @Test public void notRegisteredToBlockAddedEventBeforeInitialSyncIsDone() { setupInitialSyncPhase(true); @@ -142,7 +102,7 @@ public void notRegisteredToBlockAddedEventBeforeInitialSyncIsDone() { ArgumentCaptor.forClass(BlockAddedObserver.class); verify(blockchain, atLeastOnce()).observeBlockAdded(blockAddedListeners.capture()); - assertThat(blockAddedListeners.getAllValues()).doesNotContain(pool); + assertThat(pool.isEnabled()).isFalse(); } @Test @@ -155,6 +115,7 @@ public void registeredToBlockAddedEventAfterInitialSyncIsDone() { verify(blockchain, atLeastOnce()).observeBlockAdded(blockAddedListeners.capture()); assertThat(blockAddedListeners.getAllValues()).contains(pool); + assertThat(pool.isEnabled()).isTrue(); } @Test @@ -166,18 +127,31 @@ public void registeredToBlockAddedEventIfNoInitialSync() { verify(blockchain, atLeastOnce()).observeBlockAdded(blockAddedListeners.capture()); assertThat(blockAddedListeners.getAllValues()).contains(pool); + assertThat(pool.isEnabled()).isTrue(); } @Test - public void incomingTransactionMessageHandlersNotRegisteredBeforeInitialSyncIsDone() { + public void incomingTransactionMessageHandlersDisabledBeforeInitialSyncIsDone() { setupInitialSyncPhase(true); ArgumentCaptor messageHandlers = ArgumentCaptor.forClass(EthMessages.MessageCallback.class); - verify(ethMessages, atLeast(0)).subscribe(anyInt(), messageHandlers.capture()); + verify(ethMessages, atLeast(2)).subscribe(anyInt(), messageHandlers.capture()); assertThat(messageHandlers.getAllValues()) - .doesNotHaveAnyElementsOfTypes( - TransactionsMessageHandler.class, NewPooledTransactionHashesMessageHandler.class); + .haveAtLeastOne( + new Condition<>( + h -> + h instanceof NewPooledTransactionHashesMessageHandler + && !((NewPooledTransactionHashesMessageHandler) h).isEnabled(), + "pooled transaction hashes handler should be disabled")); + + assertThat(messageHandlers.getAllValues()) + .haveAtLeastOne( + new Condition<>( + h -> + h instanceof TransactionsMessageHandler + && !((TransactionsMessageHandler) h).isEnabled(), + "transaction messages handler should be disabled")); } @Test @@ -187,12 +161,23 @@ public void incomingTransactionMessageHandlersRegisteredAfterInitialSyncIsDone() ArgumentCaptor messageHandlers = ArgumentCaptor.forClass(EthMessages.MessageCallback.class); - verify(ethMessages, atLeast(0)).subscribe(anyInt(), messageHandlers.capture()); + verify(ethMessages, atLeast(2)).subscribe(anyInt(), messageHandlers.capture()); assertThat(messageHandlers.getAllValues()) - .hasAtLeastOneElementOfType(TransactionsMessageHandler.class); + .haveAtLeastOne( + new Condition<>( + h -> + h instanceof NewPooledTransactionHashesMessageHandler + && ((NewPooledTransactionHashesMessageHandler) h).isEnabled(), + "pooled transaction hashes handler should be enabled")); + assertThat(messageHandlers.getAllValues()) - .hasAtLeastOneElementOfType(NewPooledTransactionHashesMessageHandler.class); + .haveAtLeastOne( + new Condition<>( + h -> + h instanceof TransactionsMessageHandler + && ((TransactionsMessageHandler) h).isEnabled(), + "transaction messages handler should be enabled")); } @Test @@ -204,9 +189,20 @@ public void incomingTransactionMessageHandlersRegisteredIfNoInitialSync() { verify(ethMessages, atLeast(0)).subscribe(anyInt(), messageHandlers.capture()); assertThat(messageHandlers.getAllValues()) - .hasAtLeastOneElementOfType(TransactionsMessageHandler.class); + .haveAtLeastOne( + new Condition<>( + h -> + h instanceof NewPooledTransactionHashesMessageHandler + && ((NewPooledTransactionHashesMessageHandler) h).isEnabled(), + "pooled transaction hashes handler should be enabled")); + assertThat(messageHandlers.getAllValues()) - .hasAtLeastOneElementOfType(NewPooledTransactionHashesMessageHandler.class); + .haveAtLeastOne( + new Condition<>( + h -> + h instanceof TransactionsMessageHandler + && ((TransactionsMessageHandler) h).isEnabled(), + "transaction messages handler should be enabled")); } private void setupInitialSyncPhase(final boolean hasInitialSyncPhase) { From 19a7145e84ccade964f789d0905a57052129758e Mon Sep 17 00:00:00 2001 From: garyschulte Date: Mon, 9 Jan 2023 17:51:14 -0800 Subject: [PATCH 8/9] remove todo comment Signed-off-by: garyschulte --- .../besu/ethereum/eth/transactions/TransactionPool.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPool.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPool.java index 45caff8ab6d..9c1758bc902 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPool.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPool.java @@ -436,10 +436,6 @@ public void setEnabled() { } public void setDisabled() { - // should disable: - // block added listener / behavior - // transactionsMessageHandler - // pooledTransactionsMessageHandler isPoolEnabled.set(false); } From d830f2a7e13b40443e480dd11b43e7179076e18e Mon Sep 17 00:00:00 2001 From: Fabio Di Fabio Date: Wed, 11 Jan 2023 18:35:56 +0100 Subject: [PATCH 9/9] Reset transaction pool state every time the initial sync is done Signed-off-by: Fabio Di Fabio --- .../ethereum/eth/transactions/PeerTransactionTracker.java | 5 +++++ .../besu/ethereum/eth/transactions/TransactionPool.java | 4 ++++ .../ethereum/eth/transactions/TransactionPoolFactory.java | 2 ++ .../sorter/AbstractPendingTransactionsSorter.java | 6 ++++++ .../sorter/BaseFeePendingTransactionsSorter.java | 7 +++++++ .../sorter/GasPricePendingTransactionsSorter.java | 6 ++++++ .../eth/transactions/sorter/LowestInvalidNonceCache.java | 5 +++++ 7 files changed, 35 insertions(+) diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/PeerTransactionTracker.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/PeerTransactionTracker.java index 07326672d03..2815a426b6e 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/PeerTransactionTracker.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/PeerTransactionTracker.java @@ -33,6 +33,11 @@ public class PeerTransactionTracker implements EthPeer.DisconnectCallback { private final Map> seenTransactions = new ConcurrentHashMap<>(); private final Map> transactionsToSend = new ConcurrentHashMap<>(); + public void reset() { + seenTransactions.clear(); + transactionsToSend.clear(); + } + public synchronized void markTransactionsAsSeen( final EthPeer peer, final Collection transactions) { markTransactionHashesAsSeen(peer, toHashList(transactions)); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPool.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPool.java index 9c1758bc902..3cdbddbfcef 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPool.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPool.java @@ -112,6 +112,10 @@ void handleConnect(final EthPeer peer) { transactionBroadcaster.relayTransactionPoolTo(peer); } + public void reset() { + pendingTransactions.reset(); + } + public ValidationResult addLocalTransaction( final Transaction transaction) { final ValidationResultAndAccount validationResult = validateLocalTransaction(transaction); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactory.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactory.java index 37eba842536..0e22531d55b 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactory.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactory.java @@ -137,6 +137,8 @@ static TransactionPool createTransactionPool( @Override public void onInitialSyncCompleted() { LOG.info("Enabling transaction handling following initial sync"); + transactionPool.reset(); + transactionTracker.reset(); transactionPool.setEnabled(); transactionsMessageHandler.setEnabled(); pooledTransactionsMessageHandler.setEnabled(); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/sorter/AbstractPendingTransactionsSorter.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/sorter/AbstractPendingTransactionsSorter.java index 59fde3c4795..b230d39bf25 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/sorter/AbstractPendingTransactionsSorter.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/sorter/AbstractPendingTransactionsSorter.java @@ -134,6 +134,12 @@ public AbstractPendingTransactionsSorter( pendingTransactions::size); } + public void reset() { + pendingTransactions.clear(); + transactionsBySender.clear(); + lowestInvalidKnownNonceCache.reset(); + } + public void evictOldTransactions() { final Instant removeTransactionsBefore = clock.instant().minus(poolConfig.getPendingTxRetentionPeriod(), ChronoUnit.HOURS); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/sorter/BaseFeePendingTransactionsSorter.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/sorter/BaseFeePendingTransactionsSorter.java index f5363815393..32708f26481 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/sorter/BaseFeePendingTransactionsSorter.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/sorter/BaseFeePendingTransactionsSorter.java @@ -93,6 +93,13 @@ public BaseFeePendingTransactionsSorter( .thenComparing(PendingTransaction::getSequence) .reversed()); + @Override + public void reset() { + super.reset(); + prioritizedTransactionsStaticRange.clear(); + prioritizedTransactionsDynamicRange.clear(); + } + @Override public void manageBlockAdded(final Block block) { block.getHeader().getBaseFee().ifPresent(this::updateBaseFee); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/sorter/GasPricePendingTransactionsSorter.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/sorter/GasPricePendingTransactionsSorter.java index 51743281fc4..1d9ad3184f1 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/sorter/GasPricePendingTransactionsSorter.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/sorter/GasPricePendingTransactionsSorter.java @@ -52,6 +52,12 @@ public GasPricePendingTransactionsSorter( super(poolConfig, clock, metricsSystem, chainHeadHeaderSupplier); } + @Override + public void reset() { + super.reset(); + prioritizedTransactions.clear(); + } + @Override public void manageBlockAdded(final Block block) { // nothing to do diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/sorter/LowestInvalidNonceCache.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/sorter/LowestInvalidNonceCache.java index 4e084889675..ca4c3902047 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/sorter/LowestInvalidNonceCache.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/sorter/LowestInvalidNonceCache.java @@ -126,6 +126,11 @@ public String toString() { + '}'; } + public void reset() { + lowestInvalidKnownNonceBySender.clear(); + evictionOrder.clear(); + } + private static class InvalidNonceStatus implements Comparable { final Address address;