Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public enum RpcMethod {
CLIQUE_GET_SIGNER_METRICS("clique_getSignerMetrics"),
DEBUG_ACCOUNT_AT("debug_accountAt"),
DEBUG_METRICS("debug_metrics"),
DEBUG_RESYNC_WORLDSTATE("debug_resyncWorldState"),
Comment thread
garyschulte marked this conversation as resolved.
DEBUG_SET_HEAD("debug_setHead"),
DEBUG_REPLAY_BLOCK("debug_replayBlock"),
DEBUG_STORAGE_RANGE_AT("debug_storageRangeAt"),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright Hyperledger Besu Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods;

import org.hyperledger.besu.ethereum.api.jsonrpc.RpcMethod;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.JsonRpcRequestContext;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcSuccessResponse;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.core.Synchronizer;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;

public class DebugResyncWorldstate implements JsonRpcMethod {
private final Synchronizer synchronizer;
private final ProtocolSchedule protocolSchedule;
private final Blockchain blockchain;

public DebugResyncWorldstate(
final ProtocolSchedule protocolSchedule,
final Blockchain blockchain,
final Synchronizer synchronizer) {
this.synchronizer = synchronizer;
this.protocolSchedule = protocolSchedule;
this.blockchain = blockchain;
}

@Override
public String getName() {
return RpcMethod.DEBUG_RESYNC_WORLDSTATE.getMethodName();
}

@Override
public JsonRpcResponse response(final JsonRpcRequestContext request) {
protocolSchedule
.getByBlockNumber(blockchain.getChainHeadBlockNumber())
.getBadBlocksManager()
.reset();
return new JsonRpcSuccessResponse(
request.getRequest().getId(), synchronizer.resyncWorldState());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.DebugBatchSendRawTransaction;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.DebugGetBadBlocks;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.DebugMetrics;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.DebugResyncWorldstate;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.DebugSetHead;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.DebugStandardTraceBadBlockToFile;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.DebugStandardTraceBlockToFile;
Expand All @@ -36,6 +37,7 @@
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.processor.TransactionTracer;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.results.BlockResultFactory;
import org.hyperledger.besu.ethereum.api.query.BlockchainQueries;
import org.hyperledger.besu.ethereum.core.Synchronizer;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.mainnet.ScheduleBasedBlockHeaderFunctions;
Expand All @@ -53,6 +55,7 @@ public class DebugJsonRpcMethods extends ApiGroupJsonRpcMethods {
private final ProtocolSchedule protocolSchedule;
private final ObservableMetricsSystem metricsSystem;
private final TransactionPool transactionPool;
private final Synchronizer synchronizer;
private final Path dataDir;

DebugJsonRpcMethods(
Expand All @@ -61,12 +64,14 @@ public class DebugJsonRpcMethods extends ApiGroupJsonRpcMethods {
final ProtocolSchedule protocolSchedule,
final ObservableMetricsSystem metricsSystem,
final TransactionPool transactionPool,
final Synchronizer synchronizer,
final Path dataDir) {
this.blockchainQueries = blockchainQueries;
this.protocolContext = protocolContext;
this.protocolSchedule = protocolSchedule;
this.metricsSystem = metricsSystem;
this.transactionPool = transactionPool;
this.synchronizer = synchronizer;
this.dataDir = dataDir;
}

Expand All @@ -88,6 +93,7 @@ protected Map<String, JsonRpcMethod> create() {
new DebugAccountRange(blockchainQueries),
new DebugStorageRangeAt(blockchainQueries, blockReplay),
new DebugMetrics(metricsSystem),
new DebugResyncWorldstate(protocolSchedule, protocolContext.getBlockchain(), synchronizer),
new DebugTraceBlock(
() -> new BlockTracer(blockReplay),
ScheduleBasedBlockHeaderFunctions.create(protocolSchedule),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ public Map<String, JsonRpcMethod> methods(
protocolSchedule,
metricsSystem,
transactionPool,
synchronizer,
dataDir),
new EeaJsonRpcMethods(
blockchainQueries, protocolSchedule, transactionPool, privacyParameters),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ public void addBadBlock(final Block badBlock, final Optional<Throwable> cause) {
}
}

public void reset() {
this.badBlocks.invalidateAll();
this.badHeaders.invalidateAll();
this.latestValidHashes.invalidateAll();
}

Comment on lines +52 to +57

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is this called?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/**
* Return all invalid blocks
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ public interface Synchronizer {
*/
Optional<SyncStatus> getSyncStatus();

boolean resyncWorldState();

long subscribeSyncStatus(final BesuEvents.SyncStatusListener listener);

boolean unsubscribeSyncStatus(long observerId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -62,7 +63,8 @@ public class DefaultSynchronizer implements Synchronizer, UnverifiedForkchoiceLi
private final SyncState syncState;
private final AtomicBoolean running = new AtomicBoolean(false);
private final Optional<BlockPropagationManager> blockPropagationManager;
private final Optional<FastSyncDownloader<?>> fastSyncDownloader;
private final Function<Boolean, Optional<FastSyncDownloader<?>>> fastSyncFactory;
private Optional<FastSyncDownloader<?>> fastSyncDownloader;
private final Optional<FullSyncDownloader> fullSyncDownloader;
private final EthContext ethContext;
private final ProtocolContext protocolContext;
Expand Down Expand Up @@ -133,48 +135,57 @@ public DefaultSynchronizer(
terminationCondition));

if (SyncMode.FAST.equals(syncConfig.getSyncMode())) {
this.fastSyncDownloader =
FastDownloaderFactory.create(
pivotBlockSelector,
syncConfig,
dataDirectory,
protocolSchedule,
protocolContext,
metricsSystem,
ethContext,
worldStateStorage,
syncState,
clock);
this.fastSyncFactory =
(isResync) ->
FastDownloaderFactory.create(
pivotBlockSelector,
syncConfig,
dataDirectory,
protocolSchedule,
protocolContext,
metricsSystem,
ethContext,
worldStateStorage,
syncState,
clock,
isResync);
} else if (SyncMode.X_CHECKPOINT.equals(syncConfig.getSyncMode())) {
this.fastSyncDownloader =
CheckpointDownloaderFactory.createCheckpointDownloader(
new SnapPersistedContext(storageProvider),
pivotBlockSelector,
syncConfig,
dataDirectory,
protocolSchedule,
protocolContext,
metricsSystem,
ethContext,
worldStateStorage,
syncState,
clock);
this.fastSyncFactory =
(isResync) ->
CheckpointDownloaderFactory.createCheckpointDownloader(
new SnapPersistedContext(storageProvider),
pivotBlockSelector,
syncConfig,
dataDirectory,
protocolSchedule,
protocolContext,
metricsSystem,
ethContext,
worldStateStorage,
syncState,
clock,
isResync);
} else {
this.fastSyncDownloader =
SnapDownloaderFactory.createSnapDownloader(
new SnapPersistedContext(storageProvider),
pivotBlockSelector,
syncConfig,
dataDirectory,
protocolSchedule,
protocolContext,
metricsSystem,
ethContext,
worldStateStorage,
syncState,
clock);
this.fastSyncFactory =
(isResync) ->
SnapDownloaderFactory.createSnapDownloader(
new SnapPersistedContext(storageProvider),
pivotBlockSelector,
syncConfig,
dataDirectory,
protocolSchedule,
protocolContext,
metricsSystem,
ethContext,
worldStateStorage,
syncState,
clock,
isResync);
}

// create a non-resync fast sync downloader:
this.fastSyncDownloader = this.fastSyncFactory.apply(false);

metricsSystem.createLongGauge(
BesuMetricCategory.ETHEREUM,
"best_known_block_number",
Expand Down Expand Up @@ -209,7 +220,6 @@ public CompletableFuture<Void> start() {
CompletableFuture<Void> future;
if (fastSyncDownloader.isPresent()) {
future = fastSyncDownloader.get().start().thenCompose(this::handleSyncResult);

} else {
syncState.markInitialSyncPhaseAsDone();
enableFallbackNodeFinder();
Expand Down Expand Up @@ -305,6 +315,21 @@ public Optional<SyncStatus> getSyncStatus() {
return syncState.syncStatus();
}

@Override
public boolean resyncWorldState() {
// if sync is running currently, stop it and delete the fast sync state
if (fastSyncDownloader.isPresent() && running.get()) {
stop();
fastSyncDownloader.get().deleteFastSyncState();
}

// recreate fast sync with resync and start
this.syncState.markInitialSyncRestart();
this.fastSyncDownloader = this.fastSyncFactory.apply(true);
start();
return true;
}

@Override
public long subscribeSyncStatus(final SyncStatusListener listener) {
checkNotNull(listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,25 @@
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.task.WaitForPeersTask;
import org.hyperledger.besu.plugin.services.BesuEvents;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;

public class BackwardsSyncAlgorithm {
public class BackwardsSyncAlgorithm implements BesuEvents.InitialSyncCompletionListener {
private static final Logger LOG = getLogger(BackwardsSyncAlgorithm.class);

private final BackwardSyncContext context;
private final FinalBlockConfirmation finalBlockConfirmation;
private final AtomicReference<CountDownLatch> latch =
new AtomicReference<>(new CountDownLatch(1));
private volatile boolean finished = false;

public BackwardsSyncAlgorithm(
Expand Down Expand Up @@ -125,19 +129,16 @@ protected CompletableFuture<Void> executeForwardAsync() {

@VisibleForTesting
protected CompletableFuture<Void> waitForReady() {
final CountDownLatch latch = new CountDownLatch(1);
final long idTTD =
context.getSyncState().subscribeTTDReached(reached -> countDownIfReady(latch));
final long idIS =
context.getSyncState().subscribeCompletionReached(() -> countDownIfReady(latch));
return CompletableFuture.runAsync(() -> checkReadiness(latch, idTTD, idIS));
final long idTTD = context.getSyncState().subscribeTTDReached(reached -> countDownIfReady());
final long idIS = context.getSyncState().subscribeCompletionReached(this);
return CompletableFuture.runAsync(() -> checkReadiness(idTTD, idIS));
}

private void checkReadiness(final CountDownLatch latch, final long idTTD, final long idIS) {
private void checkReadiness(final long idTTD, final long idIS) {
try {
if (!context.isReady()) {
LOG.debug("Waiting for preconditions...");
final boolean await = latch.await(2, TimeUnit.MINUTES);
final boolean await = latch.get().await(2, TimeUnit.MINUTES);
if (await) {
LOG.debug("Preconditions meet, ensure at least one peer is connected");
waitForPeers(1).get();
Expand All @@ -156,9 +157,9 @@ private void checkReadiness(final CountDownLatch latch, final long idTTD, final
}
}

private void countDownIfReady(final CountDownLatch latch) {
private void countDownIfReady() {
if (context.isReady()) {
latch.countDown();
latch.get().countDown();
}
}

Expand All @@ -167,4 +168,14 @@ private CompletableFuture<Void> waitForPeers(final int count) {
WaitForPeersTask.create(context.getEthContext(), count, context.getMetricsSystem());
return waitForPeersTask.run();
}

@Override
public void onInitialSyncCompleted() {
countDownIfReady();
}

@Override
public void onInitialSyncRestart() {
latch.set(new CountDownLatch(1));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ public static Optional<FastSyncDownloader<?>> createCheckpointDownloader(
final EthContext ethContext,
final WorldStateStorage worldStateStorage,
final SyncState syncState,
final Clock clock) {
final Clock clock,
final boolean isResync) {

final Path fastSyncDataDirectory = dataDirectory.resolve(FAST_SYNC_FOLDER);
final FastSyncStateStorage fastSyncStateStorage =
Expand All @@ -79,7 +80,14 @@ public static Optional<FastSyncDownloader<?>> createCheckpointDownloader(

final FastSyncState fastSyncState =
fastSyncStateStorage.loadState(ScheduleBasedBlockHeaderFunctions.create(protocolSchedule));
if (fastSyncState.getPivotBlockHeader().isEmpty()

if (isResync) {
snapContext.clear();
worldStateStorage.clear();
}

if (!isResync
&& fastSyncState.getPivotBlockHeader().isEmpty()
&& protocolContext.getBlockchain().getChainHeadBlockNumber()
!= BlockHeader.GENESIS_BLOCK_NUMBER) {
LOG.info(
Expand Down
Loading